From 2b982b5d8fbe900df14afcf64473dea00e7977fd Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 10 Feb 2023 11:04:32 +0800 Subject: [PATCH] Fix DataNode ut never meet condition (#22093) See also: #22079 Signed-off-by: yangxuan --- internal/datanode/compaction_executor_test.go | 5 +- internal/datanode/data_sync_service_test.go | 125 ++++++++---------- 2 files changed, 60 insertions(+), 70 deletions(-) diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go index 668919d720..d4123aa11b 100644 --- a/internal/datanode/compaction_executor_test.go +++ b/internal/datanode/compaction_executor_test.go @@ -27,9 +27,12 @@ import ( func TestCompactionExecutor(t *testing.T) { t.Run("Test execute", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) ex := newCompactionExecutor() - go ex.start(context.TODO()) + go ex.start(ctx) ex.execute(newMockCompactor(true)) + + cancel() }) t.Run("Test stopTask", func(t *testing.T) { diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 3beb6e8f6d..c0bb595080 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/commonpb" @@ -201,7 +202,6 @@ func TestDataSyncService_Start(t *testing.T) { // init data node insertChannelName := "by-dev-rootcoord-dml" - ddlChannelName := "by-dev-rootcoord-ddl" Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) @@ -322,19 +322,13 @@ func TestDataSyncService_Start(t *testing.T) { insertStream, _ := factory.NewMsgStream(ctx) insertStream.AsProducer([]string{insertChannelName}) - ddStream, _ := factory.NewMsgStream(ctx) - ddStream.AsProducer([]string{ddlChannelName}) - var insertMsgStream msgstream.MsgStream = insertStream - var ddMsgStream msgstream.MsgStream = ddStream err = insertMsgStream.Produce(&msgPack) assert.NoError(t, err) _, err = insertMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) - _, err = ddMsgStream.Broadcast(&timeTickMsgPack) - assert.NoError(t, err) select { case flushPack := <-sync.flushListener: @@ -353,42 +347,34 @@ func TestDataSyncService_Close(t *testing.T) { defer cancel() os.RemoveAll("/tmp/milvus") - - // init data node - insertChannelName := "by-dev-rootcoord-dml2" - ddlChannelName := "by-dev-rootcoord-ddl2" - - Factory := &MetaFactory{} - collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) - mockRootCoord := &RootCoordFactory{ - pkType: schemapb.DataType_Int64, - } - - flushChan := make(chan flushMsg, 100) - resendTTChan := make(chan resendTTMsg, 100) - cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) - defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm) - - allocFactory := NewAllocatorFactory(1) - factory := dependency.NewDefaultFactory(true) defer os.RemoveAll("/tmp/milvus") - paramtable.Get().Remove(Params.DataNodeCfg.FlushInsertBufferSize.Key) + // init data node + var ( + insertChannelName = "by-dev-rootcoord-dml2" + + metaFactory = &MetaFactory{} + mockRootCoord = &RootCoordFactory{pkType: schemapb.DataType_Int64} + + collMeta = metaFactory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) + cm = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) + ) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) + ufs := []*datapb.SegmentInfo{{ CollectionID: collMeta.ID, PartitionID: 1, InsertChannel: insertChannelName, - ID: 0, - NumOfRows: 0, + ID: 1, + NumOfRows: 1, DmlPosition: &internalpb.MsgPosition{}, }} fs := []*datapb.SegmentInfo{{ CollectionID: collMeta.ID, PartitionID: 1, InsertChannel: insertChannelName, - ID: 1, - NumOfRows: 0, + ID: 0, + NumOfRows: 1, DmlPosition: &internalpb.MsgPosition{}, }} var ufsIds []int64 @@ -406,10 +392,16 @@ func TestDataSyncService_Close(t *testing.T) { FlushedSegmentIds: fsIds, } - signalCh := make(chan string, 100) + var ( + flushChan = make(chan flushMsg, 100) + resendTTChan = make(chan resendTTMsg, 100) + signalCh = make(chan string, 100) - dataCoord := &DataCoordFactory{} - dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{ + allocFactory = NewAllocatorFactory(1) + factory = dependency.NewDefaultFactory(true) + mockDataCoord = &DataCoordFactory{} + ) + mockDataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{ 0: { ID: 0, CollectionID: collMeta.ID, @@ -425,15 +417,22 @@ func TestDataSyncService_Close(t *testing.T) { }, } - sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor(), 0) + // No Auto flush + paramtable.Get().Reset(Params.DataNodeCfg.FlushInsertBufferSize.Key) + + channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm) + sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), 0) assert.Nil(t, err) sync.flushListener = make(chan *segmentFlushPack, 10) defer close(sync.flushListener) + sync.start() - dataFactory := NewDataFactory() - ts := tsoutil.GetCurrentTime() + var ( + dataFactory = NewDataFactory() + ts = tsoutil.GetCurrentTime() + ) insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2, insertChannelName, ts) msgPack := msgstream.MsgPack{ BeginTs: ts, @@ -448,9 +447,7 @@ func TestDataSyncService_Close(t *testing.T) { } // 400 is the actual data - int64Pks := []primaryKey{ - newInt64PrimaryKey(400), - } + int64Pks := []primaryKey{newInt64PrimaryKey(400)} deleteMessages := dataFactory.GenMsgStreamDeleteMsgWithTs(0, int64Pks, insertChannelName, ts+1) inMsgs := make([]msgstream.TsMsg, 0) inMsgs = append(inMsgs, deleteMessages) @@ -501,11 +498,7 @@ func TestDataSyncService_Close(t *testing.T) { insertStream, _ := factory.NewMsgStream(ctx) insertStream.AsProducer([]string{insertChannelName}) - ddStream, _ := factory.NewMsgStream(ctx) - ddStream.AsProducer([]string{ddlChannelName}) - var insertMsgStream msgstream.MsgStream = insertStream - var ddMsgStream msgstream.MsgStream = ddStream err = insertMsgStream.Produce(&msgPack) assert.NoError(t, err) @@ -515,34 +508,28 @@ func TestDataSyncService_Close(t *testing.T) { _, err = insertMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) - _, err = ddMsgStream.Broadcast(&timeTickMsgPack) - assert.NoError(t, err) - // wait for delete - for sync.delBufferManager.GetEntriesNum(1) == 0 { - time.Sleep(100) - } + // wait for delete, no auto flush leads to all data in buffer. + require.Eventually(t, func() bool { return sync.delBufferManager.GetEntriesNum(1) == 1 }, + 5*time.Second, 100*time.Millisecond) + assert.Equal(t, 0, len(sync.flushListener)) - // close and wait for flush + // close will trigger a force sync sync.close() - for { - select { - case flushPack, ok := <-sync.flushListener: - assert.True(t, ok) - if flushPack.segmentID == 1 { - assert.True(t, len(flushPack.insertLogs) == 12) - assert.True(t, len(flushPack.statsLogs) == 1) - assert.True(t, len(flushPack.deltaLogs) == 1) - return - } - if flushPack.segmentID == 0 { - assert.True(t, len(flushPack.insertLogs) == 0) - assert.True(t, len(flushPack.statsLogs) == 0) - assert.True(t, len(flushPack.deltaLogs) == 0) - } - case <-sync.ctx.Done(): - } - } + assert.Eventually(t, func() bool { return len(sync.flushListener) == 1 }, + 5*time.Second, 100*time.Millisecond) + flushPack, ok := <-sync.flushListener + assert.True(t, ok) + assert.Equal(t, UniqueID(1), flushPack.segmentID) + assert.True(t, len(flushPack.insertLogs) == 12) + assert.True(t, len(flushPack.statsLogs) == 1) + assert.True(t, len(flushPack.deltaLogs) == 1) + + <-sync.ctx.Done() + + // Double close is safe + sync.close() + <-sync.ctx.Done() } func genBytes() (rawData []byte) {