Limit the number of concurrent sync tasks and allow only one sync task for the same segment (#24881)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Author: yihao.dai, 2023-06-16 14:14:39 +08:00, committed by GitHub
parent cbfbc4e63a
commit c73219a54d
8 changed files with 125 additions and 3 deletions

@ -331,6 +331,7 @@ dataNode:
flowGraph:
maxQueueLength: 1024 # Maximum length of task queue in flowgraph
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
maxParallelSyncTaskNum: 2 # Maximum number of sync tasks executed in parallel in each flush manager
segment:
insertBufSize: 16777216 # Max buffer size to flush for a single segment.
deleteBufBytes: 67108864 # Max buffer size to flush del for a single channel
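
What the new knob means in practice: when the number of in-flight sync tasks reaches maxParallelSyncTaskNum, an ordinary (non-flush, non-drop) sync task is skipped rather than queued, and the still-buffered data is picked up on a later flush pass. The standalone Go sketch below illustrates that non-blocking cap pattern; syncLimiter and its methods are illustrative assumptions, not the actual datanode implementation, which derives the in-flight count from the flush manager (see isFull further down).

package main

import "fmt"

// syncLimiter caps concurrency with a buffered channel used as a semaphore.
type syncLimiter struct {
	slots chan struct{}
}

func newSyncLimiter(max int) *syncLimiter {
	return &syncLimiter{slots: make(chan struct{}, max)}
}

// tryAcquire returns false instead of blocking when the cap is reached,
// mirroring the "skip it and retry later" behavior introduced by this PR.
func (l *syncLimiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot once a sync task finishes.
func (l *syncLimiter) release() { <-l.slots }

func main() {
	limiter := newSyncLimiter(2) // dataNode.dataSync.maxParallelSyncTaskNum
	for task := 0; task < 4; task++ {
		if !limiter.tryAcquire() {
			fmt.Printf("task %d skipped: sync pool is full\n", task)
			continue
		}
		fmt.Printf("task %d admitted\n", task)
		// limiter.release() would be called when the task completes.
	}
}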

@ -969,6 +969,7 @@ type mockFlushManager struct {
returnError bool
recordFlushedSeg bool
flushedSegIDs []UniqueID
full bool
injectOverCount struct {
sync.RWMutex
value int
@ -994,6 +995,10 @@ func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID,
return nil
}
func (mfm *mockFlushManager) isFull() bool {
return mfm.full
}
func (mfm *mockFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) {
go func() {
time.Sleep(time.Second * time.Duration(mfm.sleepSeconds))

@ -465,14 +465,26 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
ibNode.channel.(*ChannelMeta).needToSync.Store(false)
for _, task := range syncTasks {
log.Info("insertBufferNode syncing BufferData",
zap.Int64("segmentID", task.segmentID),
log := log.With(zap.Int64("segmentID", task.segmentID),
zap.Bool("flushed", task.flushed),
zap.Bool("dropped", task.dropped),
zap.Bool("auto", task.auto),
zap.Any("position", endPosition),
zap.String("channel", ibNode.channelName),
)
// check if task pool is full
if !task.dropped && !task.flushed && ibNode.flushManager.isFull() {
log.Warn("task pool is full, skip it")
continue
}
// check if segment is syncing
segment := ibNode.channel.getSegment(task.segmentID)
if !task.dropped && !task.flushed && segment.isSyncing() {
log.Info("segment is syncing, skip it")
continue
}
segment.setSyncing(true)
log.Info("insertBufferNode syncing BufferData")
// use the flushed pk stats to take current stat
var pkStats *storage.PrimaryKeyStats
// TODO, this has to be async flush, no need to block here.
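
A minimal sketch of the two guards Sync now applies before handing a task to the flush manager, written against simplified stand-in types (syncTask, segmentState) rather than the datanode's own. Flush and drop requests bypass both checks, matching the !task.dropped && !task.flushed conditions above; in the real code the segment is also marked as syncing right before the task is submitted.

package main

import "fmt"

type syncTask struct {
	segmentID int64
	flushed   bool
	dropped   bool
}

type segmentState struct {
	syncing bool
}

// shouldSkip reports whether a sync task should be deferred, and why.
func shouldSkip(task syncTask, poolFull bool, seg *segmentState) (bool, string) {
	// Flush and drop requests must not be deferred, so the guards only
	// apply to ordinary auto-sync tasks.
	if task.dropped || task.flushed {
		return false, ""
	}
	if poolFull {
		return true, "task pool is full"
	}
	if seg.syncing {
		return true, "segment is already syncing"
	}
	return false, ""
}

func main() {
	seg := &segmentState{syncing: true}

	auto := syncTask{segmentID: 100}
	if skip, reason := shouldSkip(auto, false, seg); skip {
		fmt.Printf("segment %d deferred: %s\n", auto.segmentID, reason)
	}

	flush := syncTask{segmentID: 100, flushed: true}
	if skip, _ := shouldSkip(flush, true, seg); !skip {
		fmt.Println("flush requests bypass both guards even when the pool is full")
	}
}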

@ -263,6 +263,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
// test flushBufferData failed
segment := channel.getSegment(UniqueID(1))
segment.setSyncing(false)
setFlowGraphRetryOpt(retry.Attempts(1))
inMsg = genFlowGraphInsertMsg(insertChannelName)
iBNode.flushManager = &mockFlushManager{returnError: true}
@ -1144,3 +1146,57 @@ func TestInsertBufferNode_collectSegmentsToSync(t *testing.T) {
})
}
}
func TestInsertBufferNode_task_pool_is_full(t *testing.T) {
ctx := context.Background()
flushCh := make(chan flushMsg, 100)
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer func() {
err := cm.RemoveWithPrefix(ctx, cm.RootPath())
assert.NoError(t, err)
}()
channelName := "test_task_pool_is_full_mock_ch1"
collection := UniqueID(0)
segmentID := UniqueID(100)
channel := newChannel(channelName, collection, nil, &RootCoordFactory{pkType: schemapb.DataType_Int64}, cm)
err := channel.addSegment(addSegmentReq{
segType: datapb.SegmentType_New,
segID: segmentID,
collID: collection,
startPos: new(msgpb.MsgPosition),
endPos: new(msgpb.MsgPosition),
})
assert.NoError(t, err)
channel.setCurInsertBuffer(segmentID, &BufferData{size: 100})
fManager := &mockFlushManager{
full: true,
}
dManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
node := &insertBufferNode{
flushChan: flushCh,
channelName: channelName,
channel: channel,
flushManager: fManager,
delBufferManager: dManager,
}
inMsg := genFlowGraphInsertMsg(channelName)
inMsg.BaseMsg = flowgraph.NewBaseMsg(true) // trigger sync task
segmentsToSync := node.Sync(&inMsg, []UniqueID{segmentID}, nil)
assert.Len(t, segmentsToSync, 0)
fManager.full = false
segment := channel.getSegment(segmentID)
segment.setSyncing(true)
segmentsToSync = node.Sync(&inMsg, []UniqueID{segmentID}, nil)
assert.Len(t, segmentsToSync, 0)
}
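
The test exercises both guards in turn: with full: true the mock flush manager reports a saturated pool, so Sync skips the segment and returns nothing; after that flag is cleared, the segment's own syncing marker still defers the task, so the second call also returns an empty slice.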

@ -51,6 +51,8 @@ type flushManager interface {
flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error)
// notify flush manager del buffer data
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *msgpb.MsgPosition) error
// isFull returns true if the task pool is full
isFull() bool
// injectFlush injects compaction or other blocking task before flush sync
injectFlush(injection *taskInjection, segments ...UniqueID)
// startDropping changes flush manager into dropping mode
@ -150,7 +152,7 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *msgpb.MsgPosition) *flushTaskR
// postTask handles clean up work after a task is done
func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInjectionFunc) {
// delete task from working map
q.working.Delete(string(pack.pos.MsgID))
q.working.Delete(getSyncTaskID(pack.pos))
// after decreasing working count, check whether flush queue is empty
q.injectMut.Lock()
q.runningTasks--
@ -417,6 +419,20 @@ func (m *rendezvousFlushManager) serializePkStatsLog(segmentID int64, flushed bo
return blob, stats, nil
}
// isFull returns true if the task pool is full
func (m *rendezvousFlushManager) isFull() bool {
var num int
m.dispatcher.Range(func(_, q any) bool {
queue := q.(*orderFlushQueue)
queue.working.Range(func(_, _ any) bool {
num++
return true
})
return true
})
return num >= Params.DataNodeCfg.MaxParallelSyncTaskNum.GetAsInt()
}
// flushBufferData notifies the flush manager of insert buffer data.
// This method will be retried on errors. Final errors will be propagated upstream and logged.
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error) {
@ -945,6 +961,8 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
dsService.flushingSegCache.Remove(req.GetSegmentID())
dsService.channel.evictHistoryInsertBuffer(req.GetSegmentID(), pack.pos)
dsService.channel.evictHistoryDeleteBuffer(req.GetSegmentID(), pack.pos)
segment := dsService.channel.getSegment(req.GetSegmentID())
segment.setSyncing(false)
// dsService.channel.saveBinlogPath(fieldStats)
}
}
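
For readers unfamiliar with the nested sync.Map iteration in isFull above, here is a self-contained sketch of the counting scheme: each per-segment flush queue keeps its running tasks in a working map, and the manager sums the entries across all queues and compares the total against the configured cap. flushManager and flushQueue below are stand-ins, not the actual rendezvousFlushManager and orderFlushQueue types.

package main

import (
	"fmt"
	"sync"
)

// flushQueue stands in for a per-segment flush queue; its in-flight sync
// tasks live in the working map.
type flushQueue struct {
	working sync.Map // task ID -> in-flight sync task
}

// flushManager stands in for the manager that owns one queue per segment.
type flushManager struct {
	dispatcher sync.Map // segment ID -> *flushQueue
	maxTasks   int      // dataNode.dataSync.maxParallelSyncTaskNum
}

// isFull sums the in-flight tasks of every queue and compares against the cap.
func (m *flushManager) isFull() bool {
	var num int
	m.dispatcher.Range(func(_, q any) bool {
		q.(*flushQueue).working.Range(func(_, _ any) bool {
			num++
			return true
		})
		return true
	})
	return num >= m.maxTasks
}

func main() {
	m := &flushManager{maxTasks: 2}
	q := &flushQueue{}
	q.working.Store("task-1", struct{}{})
	q.working.Store("task-2", struct{}{})
	m.dispatcher.Store(int64(100), q)
	fmt.Println("pool full:", m.isFull()) // true: 2 in-flight tasks >= cap of 2
}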

@ -54,6 +54,23 @@ type Segment struct {
lastSyncTs Timestamp
startPos *msgpb.MsgPosition // TODO readonly
lazyLoading atomic.Value
syncing atomic.Value
}
func (s *Segment) isSyncing() bool {
if s != nil {
b, ok := s.syncing.Load().(bool)
if ok {
return b
}
}
return false
}
func (s *Segment) setSyncing(syncing bool) {
if s != nil {
s.syncing.Store(syncing)
}
}
func (s *Segment) isLoadingLazy() bool {
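
A standalone sketch of the nil-safe syncing flag added to Segment above, using atomic.Value just as the new accessors do; the segment type here is only a placeholder. The flag is set by the insert buffer node before a sync task is submitted and cleared in flushNotifyFunc once the sync lands, which is what enforces at most one in-flight sync per segment.

package main

import (
	"fmt"
	"sync/atomic"
)

type segment struct {
	syncing atomic.Value // stores a bool; the zero value means "not syncing"
}

// isSyncing is safe to call on a nil segment and on an unset flag.
func (s *segment) isSyncing() bool {
	if s == nil {
		return false
	}
	b, ok := s.syncing.Load().(bool)
	return ok && b
}

// setSyncing flips the flag; a nil segment is silently ignored.
func (s *segment) setSyncing(v bool) {
	if s != nil {
		s.syncing.Store(v)
	}
}

func main() {
	var missing *segment
	fmt.Println(missing.isSyncing()) // false: safe even when the segment lookup returned nil

	seg := &segment{}
	fmt.Println(seg.isSyncing()) // false: unset flag defaults to not syncing
	seg.setSyncing(true)
	fmt.Println(seg.isSyncing()) // true: a second sync for this segment would be skipped
	seg.setSyncing(false)        // cleared by the flush notify callback once the sync completes
}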

@ -2131,6 +2131,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
type dataNodeConfig struct {
FlowGraphMaxQueueLength ParamItem `refreshable:"false"`
FlowGraphMaxParallelism ParamItem `refreshable:"false"`
MaxParallelSyncTaskNum ParamItem `refreshable:"false"`
// segment
FlushInsertBufferSize ParamItem `refreshable:"true"`
@ -2176,6 +2177,15 @@ func (p *dataNodeConfig) init(base *BaseTable) {
}
p.FlowGraphMaxParallelism.Init(base.mgr)
p.MaxParallelSyncTaskNum = ParamItem{
Key: "dataNode.dataSync.maxParallelSyncTaskNum",
Version: "2.3.0",
DefaultValue: "2",
Doc: "Maximum number of sync tasks executed in parallel in each flush manager",
Export: true,
}
p.MaxParallelSyncTaskNum.Init(base.mgr)
p.FlushInsertBufferSize = ParamItem{
Key: "dataNode.segment.insertBufSize",
Version: "2.0.0",

@ -367,6 +367,9 @@ func TestComponentParam(t *testing.T) {
maxParallelism := Params.FlowGraphMaxParallelism.GetAsInt()
t.Logf("flowGraphMaxParallelism: %d", maxParallelism)
maxParallelSyncTaskNum := Params.MaxParallelSyncTaskNum.GetAsInt()
t.Logf("maxParallelSyncTaskNum: %d", maxParallelSyncTaskNum)
size := Params.FlushInsertBufferSize.GetAsInt()
t.Logf("FlushInsertBufferSize: %d", size)