Add data node unit tests (#7622)

Issue: #6357
Signed-off-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
sunby 2021-09-09 16:40:00 +08:00 committed by GitHub
parent be81b69e91
commit 06cee85a39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 131 additions and 102 deletions

View File

@ -61,6 +61,8 @@ const (
ConnectEtcdMaxRetryTime = 1000
)
const illegalRequestErrStr = "Illegal request"
// DataNode communicates with outside services and unioun all
// services in datanode package.
//
@ -362,12 +364,12 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha
}
switch {
case node.State.Load() != internalpb.StateCode_Healthy:
status.Reason = fmt.Sprintf("DataNode %d not healthy, please re-send message", node.NodeID)
case !node.isHealthy():
status.Reason = msgDataNodeIsUnhealthy(node.NodeID)
return status, nil
case len(in.GetVchannels()) == 0:
status.Reason = "Illegal request"
status.Reason = illegalRequestErrStr
return status, nil
default:

View File

@ -18,9 +18,11 @@ import (
"math/rand"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
@ -32,7 +34,6 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -55,49 +56,65 @@ func TestDataNode(t *testing.T) {
node.Register()
t.Run("Test WatchDmChannels", func(t *testing.T) {
t.Skip()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node1 := newIDLEDataNodeMock(ctx)
node1.Init()
node1.Start()
vchannels := []*datapb.VchannelInfo{}
for _, ch := range []string{"datanode-01-test-WatchDmChannel",
"datanode-02-test-WatchDmChannels"} {
log.Debug("InsertChannels", zap.String("name", ch))
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: ch,
UnflushedSegments: []*datapb.SegmentInfo{},
node1.Register()
defer func() {
err := node1.Stop()
assert.Nil(t, err)
}()
cases := []struct {
desc string
channels []string
expect bool
failReason string
}{
{"test watch channel normally", []string{"datanode-01-test-WatchDmChannel", "datanode-02-test-WatchDmChannels"}, true, ""},
{"test send empty request", []string{}, false, illegalRequestErrStr},
}
for _, testcase := range cases {
vchannels := []*datapb.VchannelInfo{}
for _, ch := range testcase.channels {
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: ch,
UnflushedSegments: []*datapb.SegmentInfo{},
}
vchannels = append(vchannels, vchan)
}
req := &datapb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
Vchannels: vchannels,
}
resp, err := node1.WatchDmChannels(context.TODO(), req)
assert.NoError(t, err)
if testcase.expect {
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.NotNil(t, node1.vchan2FlushCh)
assert.NotNil(t, node1.vchan2SyncService)
sync, ok := node1.vchan2SyncService[testcase.channels[0]]
assert.True(t, ok)
assert.NotNil(t, sync)
assert.Equal(t, UniqueID(1), sync.collectionID)
assert.Equal(t, len(testcase.channels), len(node1.vchan2SyncService))
assert.Equal(t, len(node1.vchan2FlushCh), len(node1.vchan2SyncService))
} else {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, testcase.failReason, resp.Reason)
}
vchannels = append(vchannels, vchan)
}
req := &datapb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
Vchannels: vchannels,
}
_, err := node1.WatchDmChannels(node.ctx, req)
assert.NoError(t, err)
assert.NotNil(t, node1.vchan2FlushCh)
assert.NotNil(t, node1.vchan2SyncService)
sync, ok := node1.vchan2SyncService["datanode-01-test-WatchDmChannel"]
assert.True(t, ok)
assert.NotNil(t, sync)
assert.Equal(t, UniqueID(1), sync.collectionID)
assert.Equal(t, 2, len(node1.vchan2SyncService))
assert.Equal(t, len(node1.vchan2FlushCh), len(node1.vchan2SyncService))
_, err = node1.WatchDmChannels(node1.ctx, req)
assert.NoError(t, err)
assert.Equal(t, 2, len(node1.vchan2SyncService))
cancel()
<-node1.ctx.Done()
node1.Stop()
})
t.Run("Test GetComponentStates", func(t *testing.T) {
@ -138,14 +155,29 @@ func TestDataNode(t *testing.T) {
})
t.Run("Test FlushSegments", func(t *testing.T) {
t.Skipf("Fix latter")
dmChannelName := "fake-dm-channel-test-HEALTHDataNodeMock"
node1 := newHEALTHDataNodeMock(dmChannelName)
node1 := newIDLEDataNodeMock(context.TODO())
node1.Init()
node1.Start()
node1.Register()
defer func() {
err := node1.Stop()
assert.Nil(t, err)
}()
sync, ok := node1.vchan2SyncService[dmChannelName]
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: dmChannelName,
UnflushedSegments: []*datapb.SegmentInfo{},
FlushedSegments: []int64{},
}
err := node1.NewDataSyncService(vchan)
assert.Nil(t, err)
service, ok := node1.vchan2SyncService[dmChannelName]
assert.True(t, ok)
sync.replica.addNewSegment(0, 1, 1, dmChannelName, nil, nil)
service.replica.addNewSegment(0, 1, 1, dmChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
req := &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{},
@ -154,53 +186,60 @@ func TestDataNode(t *testing.T) {
SegmentIDs: []int64{0},
}
status, err := node1.FlushSegments(node1.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
wg := sync.WaitGroup{}
wg.Add(2)
timeTickMsgPack := msgstream.MsgPack{}
go func() {
defer wg.Done()
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(0),
EndTimestamp: Timestamp(0),
HashValues: []uint32{0},
},
TimeTickMsg: internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: UniqueID(0),
Timestamp: math.MaxUint64,
SourceID: 0,
},
},
}
timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
// pulsar produce
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": Params.PulsarAddress,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.NoError(t, err)
insertStream, _ := msFactory.NewMsgStream(node1.ctx)
insertStream.AsProducer([]string{dmChannelName})
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
defer func() {
<-node1.ctx.Done()
node1.Stop()
status, err := node1.FlushSegments(node1.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
}()
go func() {
defer wg.Done()
timeTickMsgPack := msgstream.MsgPack{}
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(0),
EndTimestamp: Timestamp(0),
HashValues: []uint32{0},
},
TimeTickMsg: internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: UniqueID(0),
Timestamp: math.MaxUint64,
SourceID: 0,
},
},
}
timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
// pulsar produce
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": Params.PulsarAddress,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.NoError(t, err)
insertStream, err := msFactory.NewMsgStream(node1.ctx)
assert.NoError(t, err)
insertStream.AsProducer([]string{dmChannelName})
insertStream.Start()
defer insertStream.Close()
err = insertStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = insertStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
}()
wg.Wait()
})
t.Run("Test GetTimeTickChannel", func(t *testing.T) {

View File

@ -43,7 +43,6 @@ const ctxTimeInMillisecond = 5000
const debug = false
func newIDLEDataNodeMock(ctx context.Context) *DataNode {
msFactory := msgstream.NewPmsFactory()
node := NewDataNode(ctx, msFactory)
@ -52,7 +51,6 @@ func newIDLEDataNodeMock(ctx context.Context) *DataNode {
collectionID: 1,
collectionName: "collection-1",
}
node.SetRootCoordInterface(rc)
ds := &DataCoordFactory{}
@ -90,16 +88,6 @@ func newHEALTHDataNodeMock(dmChannelName string) *DataNode {
ds := &DataCoordFactory{}
node.SetDataCoordInterface(ds)
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: dmChannelName,
UnflushedSegments: []*datapb.SegmentInfo{},
FlushedSegments: []int64{},
}
node.Start()
_ = node.NewDataSyncService(vchan)
return node
}