Load segment before watch dmCahnnel when load collection (#10831)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
xige-16 2021-10-29 18:36:40 +08:00 committed by GitHub
parent 0049837dc4
commit 373722d1ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -644,9 +644,12 @@ func (scheduler *TaskScheduler) scheduleLoop() {
childTasks := triggerTask.getChildTask()
if len(childTasks) != 0 {
activateTasks := make([]task, len(childTasks))
copy(activateTasks, childTasks)
processInternalTaskFn(activateTasks, triggerTask)
// process loadSegment before watchDmChannel, avoid delete not taking effect
highPriorityTasks, lowPriorityTasks := sortInternalTaskByPriority(childTasks, commonpb.MsgType_LoadSegments)
processInternalTaskFn(highPriorityTasks, triggerTask)
if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success {
processInternalTaskFn(lowPriorityTasks, triggerTask)
}
if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success {
err = updateSegmentInfoFromTask(scheduler.ctx, triggerTask, scheduler.meta)
if err != nil {
@ -966,3 +969,17 @@ func reverseSealedSegmentChangeInfo(changeInfosMap map[UniqueID]*querypb.SealedS
return result
}
func sortInternalTaskByPriority(tasks []task, taskType commonpb.MsgType) ([]task, []task) {
highPriorityTasks := make([]task, 0)
lowPriorityTasks := make([]task, 0)
for _, t := range tasks {
if t.msgType() == taskType {
highPriorityTasks = append(highPriorityTasks, t)
} else {
lowPriorityTasks = append(lowPriorityTasks, t)
}
}
return highPriorityTasks, lowPriorityTasks
}