From 373722d1cabd8195505501dccdbf1d06d25f9506 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Fri, 29 Oct 2021 18:36:40 +0800 Subject: [PATCH] Load segment before watch dmCahnnel when load collection (#10831) Signed-off-by: xige-16 --- internal/querycoord/task_scheduler.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index b611333040..b3b7031aa3 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -644,9 +644,12 @@ func (scheduler *TaskScheduler) scheduleLoop() { childTasks := triggerTask.getChildTask() if len(childTasks) != 0 { - activateTasks := make([]task, len(childTasks)) - copy(activateTasks, childTasks) - processInternalTaskFn(activateTasks, triggerTask) + // process loadSegment before watchDmChannel, avoid delete not taking effect + highPriorityTasks, lowPriorityTasks := sortInternalTaskByPriority(childTasks, commonpb.MsgType_LoadSegments) + processInternalTaskFn(highPriorityTasks, triggerTask) + if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success { + processInternalTaskFn(lowPriorityTasks, triggerTask) + } if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success { err = updateSegmentInfoFromTask(scheduler.ctx, triggerTask, scheduler.meta) if err != nil { @@ -966,3 +969,17 @@ func reverseSealedSegmentChangeInfo(changeInfosMap map[UniqueID]*querypb.SealedS return result } + +func sortInternalTaskByPriority(tasks []task, taskType commonpb.MsgType) ([]task, []task) { + highPriorityTasks := make([]task, 0) + lowPriorityTasks := make([]task, 0) + for _, t := range tasks { + if t.msgType() == taskType { + highPriorityTasks = append(highPriorityTasks, t) + } else { + lowPriorityTasks = append(lowPriorityTasks, t) + } + } + + return highPriorityTasks, lowPriorityTasks +}