From c73219a54dacd49dacef9a13f57fc29c373737ac Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Fri, 16 Jun 2023 14:14:39 +0800
Subject: [PATCH] Limit the number of concurrent sync tasks and allow only one
 sync task for the same segment (#24881)

Signed-off-by: bigsheeper
---
 configs/milvus.yaml                           |  1 +
 internal/datanode/compactor_test.go           |  5 ++
 .../datanode/flow_graph_insert_buffer_node.go | 16 +++++-
 .../flow_graph_insert_buffer_node_test.go     | 56 +++++++++++++++++++
 internal/datanode/flush_manager.go            | 20 ++++++-
 internal/datanode/segment.go                  | 17 ++++++
 pkg/util/paramtable/component_param.go        | 10 ++++
 pkg/util/paramtable/component_param_test.go   |  3 +
 8 files changed, 125 insertions(+), 3 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 7dd7f890c8..bfb2d96406 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -331,6 +331,7 @@ dataNode:
     flowGraph:
       maxQueueLength: 1024 # Maximum length of task queue in flowgraph
       maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
+    maxParallelSyncTaskNum: 2 # Maximum number of sync tasks executed in parallel in each flush manager
   segment:
     insertBufSize: 16777216 # Max buffer size to flush for a single segment.
     deleteBufBytes: 67108864 # Max buffer size to flush del for a single channel
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index 9e390e715b..879663dab6 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -969,6 +969,7 @@ type mockFlushManager struct {
 	returnError      bool
 	recordFlushedSeg bool
 	flushedSegIDs    []UniqueID
+	full             bool
 	injectOverCount  struct {
 		sync.RWMutex
 		value int
@@ -994,6 +995,10 @@ func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID,
 	return nil
 }
 
+func (mfm *mockFlushManager) isFull() bool {
+	return mfm.full
+}
+
 func (mfm *mockFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) {
 	go func() {
 		time.Sleep(time.Second * time.Duration(mfm.sleepSeconds))
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index ed3c697a27..ad02f058ff 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -465,14 +465,26 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
 	ibNode.channel.(*ChannelMeta).needToSync.Store(false)
 
 	for _, task := range syncTasks {
-		log.Info("insertBufferNode syncing BufferData",
-			zap.Int64("segmentID", task.segmentID),
+		log := log.With(zap.Int64("segmentID", task.segmentID),
 			zap.Bool("flushed", task.flushed),
 			zap.Bool("dropped", task.dropped),
 			zap.Bool("auto", task.auto),
 			zap.Any("position", endPosition),
 			zap.String("channel", ibNode.channelName),
 		)
+		// check if task pool is full
+		if !task.dropped && !task.flushed && ibNode.flushManager.isFull() {
+			log.Warn("task pool is full, skip it")
+			continue
+		}
+		// check if segment is syncing
+		segment := ibNode.channel.getSegment(task.segmentID)
+		if !task.dropped && !task.flushed && segment.isSyncing() {
+			log.Info("segment is syncing, skip it")
+			continue
+		}
+		segment.setSyncing(true)
+		log.Info("insertBufferNode syncing BufferData")
 		// use the flushed pk stats to take current stat
 		var pkStats *storage.PrimaryKeyStats
 		// TODO, this has to be async flush, no need to block here.
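
The hunk above is the behavioral core of the patch: before submitting a sync task, Sync now skips auto-triggered tasks (neither flushed nor dropped) whenever the flush manager's task pool is already full or the segment already has a sync in flight, and marks the segment as syncing otherwise; flush and drop tasks bypass both guards. The standalone Go sketch below illustrates only that gating pattern — the type names, the in-flight counter, and the submit helper are illustrative stand-ins, not the actual Milvus implementation.

package main

import (
	"fmt"
	"sync/atomic"
)

// syncTask is a simplified stand-in for the flow graph's per-segment sync task.
type syncTask struct {
	segmentID int64
	flushed   bool
	dropped   bool
}

// segment carries only the per-segment "syncing" flag this patch introduces.
type segment struct {
	syncing atomic.Bool
}

// pool stands in for the flush manager; full() plays the role of isFull().
type pool struct {
	inFlight int32
	limit    int32
}

func (p *pool) full() bool {
	return atomic.LoadInt32(&p.inFlight) >= p.limit
}

// submit applies the same gating rules as the patched Sync loop: auto sync
// tasks are skipped when the pool is full or the segment is already syncing,
// while flush/drop tasks are always admitted.
func submit(p *pool, segments map[int64]*segment, task syncTask) bool {
	seg := segments[task.segmentID]
	if !task.dropped && !task.flushed && p.full() {
		fmt.Printf("segment %d: task pool is full, skip\n", task.segmentID)
		return false
	}
	if !task.dropped && !task.flushed && seg.syncing.Load() {
		fmt.Printf("segment %d: already syncing, skip\n", task.segmentID)
		return false
	}
	seg.syncing.Store(true)
	atomic.AddInt32(&p.inFlight, 1)
	fmt.Printf("segment %d: sync submitted\n", task.segmentID)
	return true
}

func main() {
	segments := map[int64]*segment{1: {}, 2: {}}
	p := &pool{limit: 2}

	submit(p, segments, syncTask{segmentID: 1})                // admitted
	submit(p, segments, syncTask{segmentID: 1})                // skipped: segment 1 is still syncing
	submit(p, segments, syncTask{segmentID: 2})                // admitted, pool is now full
	submit(p, segments, syncTask{segmentID: 2, flushed: true}) // flush tasks bypass both guards
}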
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 25d430df09..e48da3f8a8 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -263,6 +263,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
 
 	// test flushBufferData failed
+	segment := channel.getSegment(UniqueID(1))
+	segment.setSyncing(false)
 	setFlowGraphRetryOpt(retry.Attempts(1))
 	inMsg = genFlowGraphInsertMsg(insertChannelName)
 	iBNode.flushManager = &mockFlushManager{returnError: true}
@@ -1144,3 +1146,57 @@ func TestInsertBufferNode_collectSegmentsToSync(t *testing.T) {
 		})
 	}
 }
+
+func TestInsertBufferNode_task_pool_is_full(t *testing.T) {
+	ctx := context.Background()
+	flushCh := make(chan flushMsg, 100)
+
+	cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
+	defer func() {
+		err := cm.RemoveWithPrefix(ctx, cm.RootPath())
+		assert.NoError(t, err)
+	}()
+
+	channelName := "test_task_pool_is_full_mock_ch1"
+	collection := UniqueID(0)
+	segmentID := UniqueID(100)
+
+	channel := newChannel(channelName, collection, nil, &RootCoordFactory{pkType: schemapb.DataType_Int64}, cm)
+	err := channel.addSegment(addSegmentReq{
+		segType:  datapb.SegmentType_New,
+		segID:    segmentID,
+		collID:   collection,
+		startPos: new(msgpb.MsgPosition),
+		endPos:   new(msgpb.MsgPosition),
+	})
+	assert.NoError(t, err)
+	channel.setCurInsertBuffer(segmentID, &BufferData{size: 100})
+
+	fManager := &mockFlushManager{
+		full: true,
+	}
+
+	dManager := &DeltaBufferManager{
+		channel:    channel,
+		delBufHeap: &PriorityQueue{},
+	}
+
+	node := &insertBufferNode{
+		flushChan:        flushCh,
+		channelName:      channelName,
+		channel:          channel,
+		flushManager:     fManager,
+		delBufferManager: dManager,
+	}
+
+	inMsg := genFlowGraphInsertMsg(channelName)
+	inMsg.BaseMsg = flowgraph.NewBaseMsg(true) // trigger sync task
+
+	segmentsToSync := node.Sync(&inMsg, []UniqueID{segmentID}, nil)
+	assert.Len(t, segmentsToSync, 0)
+	fManager.full = false
+	segment := channel.getSegment(segmentID)
+	segment.setSyncing(true)
+	segmentsToSync = node.Sync(&inMsg, []UniqueID{segmentID}, nil)
+	assert.Len(t, segmentsToSync, 0)
+}
diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go
index 8409a3eca3..245d42ee78 100644
--- a/internal/datanode/flush_manager.go
+++ b/internal/datanode/flush_manager.go
@@ -51,6 +51,8 @@ type flushManager interface {
 	flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error)
 	// notify flush manager del buffer data
 	flushDelData(data *DelDataBuf, segmentID UniqueID, pos *msgpb.MsgPosition) error
+	// isFull return true if the task pool is full
+	isFull() bool
 	// injectFlush injects compaction or other blocking task before flush sync
 	injectFlush(injection *taskInjection, segments ...UniqueID)
 	// startDropping changes flush manager into dropping mode
@@ -150,7 +152,7 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskR
 // postTask handles clean up work after a task is done
 func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc) {
 	// delete task from working map
-	q.working.Delete(string(pack.pos.MsgID))
+	q.working.Delete(getSyncTaskID(pack.pos))
 	// after descreasing working count, check whether flush queue is empty
 	q.injectMut.Lock()
 	q.runningTasks--
@@ -417,6 +419,20 @@ func (m *rendezvousFlushManager) serializePkStatsLog(segmentID int64, flushed bo
 	return blob, stats, nil
 }
 
+// isFull return true if the task pool is full
+func (m *rendezvousFlushManager) isFull() bool {
+	var num int
+	m.dispatcher.Range(func(_, q any) bool {
+		queue := q.(*orderFlushQueue)
+		queue.working.Range(func(_, _ any) bool {
+			num++
+			return true
+		})
+		return true
+	})
+	return num >= Params.DataNodeCfg.MaxParallelSyncTaskNum.GetAsInt()
+}
+
 // flushBufferData notifies flush manager insert buffer data.
 // This method will be retired on errors. Final errors will be propagated upstream and logged.
 func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error) {
@@ -945,6 +961,8 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
 			dsService.flushingSegCache.Remove(req.GetSegmentID())
 			dsService.channel.evictHistoryInsertBuffer(req.GetSegmentID(), pack.pos)
 			dsService.channel.evictHistoryDeleteBuffer(req.GetSegmentID(), pack.pos)
+			segment := dsService.channel.getSegment(req.GetSegmentID())
+			segment.setSyncing(false)
 			// dsService.channel.saveBinlogPath(fieldStats)
 		}
 	}
diff --git a/internal/datanode/segment.go b/internal/datanode/segment.go
index 9e8d54af92..bb77eb3b86 100644
--- a/internal/datanode/segment.go
+++ b/internal/datanode/segment.go
@@ -54,6 +54,23 @@ type Segment struct {
 	lastSyncTs  Timestamp
 	startPos    *msgpb.MsgPosition // TODO readonly
 	lazyLoading atomic.Value
+	syncing     atomic.Value
+}
+
+func (s *Segment) isSyncing() bool {
+	if s != nil {
+		b, ok := s.syncing.Load().(bool)
+		if ok {
+			return b
+		}
+	}
+	return false
+}
+
+func (s *Segment) setSyncing(syncing bool) {
+	if s != nil {
+		s.syncing.Store(syncing)
+	}
 }
 
 func (s *Segment) isLoadingLazy() bool {
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 93dd489059..bff5c2960d 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2131,6 +2131,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 type dataNodeConfig struct {
 	FlowGraphMaxQueueLength ParamItem `refreshable:"false"`
 	FlowGraphMaxParallelism ParamItem `refreshable:"false"`
+	MaxParallelSyncTaskNum  ParamItem `refreshable:"false"`
 
 	// segment
 	FlushInsertBufferSize ParamItem `refreshable:"true"`
@@ -2176,6 +2177,15 @@ func (p *dataNodeConfig) init(base *BaseTable) {
 	}
 	p.FlowGraphMaxParallelism.Init(base.mgr)
 
+	p.MaxParallelSyncTaskNum = ParamItem{
+		Key:          "dataNode.dataSync.maxParallelSyncTaskNum",
+		Version:      "2.3.0",
+		DefaultValue: "2",
+		Doc:          "Maximum number of sync tasks executed in parallel in each flush manager",
+		Export:       true,
+	}
+	p.MaxParallelSyncTaskNum.Init(base.mgr)
+
 	p.FlushInsertBufferSize = ParamItem{
 		Key:     "dataNode.segment.insertBufSize",
 		Version: "2.0.0",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index f7a5f8da9a..303caccbbb 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -367,6 +367,9 @@ func TestComponentParam(t *testing.T) {
 		maxParallelism := Params.FlowGraphMaxParallelism.GetAsInt()
 		t.Logf("flowGraphMaxParallelism: %d", maxParallelism)
 
+		maxParallelSyncTaskNum := Params.MaxParallelSyncTaskNum.GetAsInt()
+		t.Logf("maxParallelSyncTaskNum: %d", maxParallelSyncTaskNum)
+
 		size := Params.FlushInsertBufferSize.GetAsInt()
 		t.Logf("FlushInsertBufferSize: %d", size)
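
Taken together, the flush_manager.go, segment.go, and component_param.go hunks form one lifecycle: isFull() walks every queue registered in the dispatcher and counts the entries in each working map against the new dataNode.dataSync.maxParallelSyncTaskNum limit (default 2), postTask removes a finished task from the working map, and flushNotifyFunc clears the segment's syncing flag so the segment becomes eligible for the next auto sync. The sketch below is a self-contained approximation of that counting-and-release pattern; the manager and queue names and the hard-coded limit are illustrative, not the real Milvus types.

package main

import (
	"fmt"
	"sync"
)

// maxParallelSyncTaskNum plays the role of the new
// dataNode.dataSync.maxParallelSyncTaskNum config item (default 2).
const maxParallelSyncTaskNum = 2

// manager keeps one sync.Map of in-flight task IDs per channel, similar to
// the flush manager's dispatcher of per-channel queues.
type manager struct {
	queues sync.Map // channel name -> *sync.Map of in-flight task IDs
}

// isFull counts in-flight tasks across all channels against the limit,
// in the spirit of rendezvousFlushManager.isFull.
func (m *manager) isFull() bool {
	var num int
	m.queues.Range(func(_, q any) bool {
		q.(*sync.Map).Range(func(_, _ any) bool {
			num++
			return true
		})
		return true
	})
	return num >= maxParallelSyncTaskNum
}

// start records a task as in flight for the given channel.
func (m *manager) start(channel, taskID string) {
	q, _ := m.queues.LoadOrStore(channel, &sync.Map{})
	q.(*sync.Map).Store(taskID, struct{}{})
}

// finish removes the task, mirroring postTask deleting it from the working
// map; a real caller would also reset the segment's syncing flag here, as
// flushNotifyFunc does with segment.setSyncing(false).
func (m *manager) finish(channel, taskID string) {
	if q, ok := m.queues.Load(channel); ok {
		q.(*sync.Map).Delete(taskID)
	}
}

func main() {
	m := &manager{}
	m.start("ch1", "task-1")
	fmt.Println("full after 1 task:", m.isFull()) // false
	m.start("ch2", "task-2")
	fmt.Println("full after 2 tasks:", m.isFull()) // true
	m.finish("ch1", "task-1")
	fmt.Println("full after finishing 1:", m.isFull()) // false
}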