mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
Fix update rollBackTask after triggerTask fail (#12228)
Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
parent
7b870e3392
commit
99397535be
@ -588,48 +588,6 @@ func (scheduler *TaskScheduler) scheduleLoop() {
|
||||
activeTaskWg.Wait()
|
||||
}
|
||||
|
||||
rollBackInterTaskFn := func(triggerTask task, originInternalTasks []task, rollBackTasks []task) error {
|
||||
saves := make(map[string]string)
|
||||
removes := make([]string, 0)
|
||||
childTaskIDs := make([]int64, 0)
|
||||
for _, t := range originInternalTasks {
|
||||
childTaskIDs = append(childTaskIDs, t.getTaskID())
|
||||
taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, t.getTaskID())
|
||||
removes = append(removes, taskKey)
|
||||
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.getTaskID())
|
||||
removes = append(removes, stateKey)
|
||||
}
|
||||
|
||||
for _, t := range rollBackTasks {
|
||||
id, err := scheduler.taskIDAllocator()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.setTaskID(id)
|
||||
taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, t.getTaskID())
|
||||
blobs, err := t.marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
saves[taskKey] = string(blobs)
|
||||
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.getTaskID())
|
||||
saves[stateKey] = strconv.Itoa(int(taskUndo))
|
||||
}
|
||||
|
||||
err := scheduler.client.MultiSaveAndRemove(saves, removes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, taskID := range childTaskIDs {
|
||||
triggerTask.removeChildTaskByID(taskID)
|
||||
}
|
||||
for _, t := range rollBackTasks {
|
||||
triggerTask.addChildTask(t)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
removeTaskFromKVFn := func(triggerTask task) error {
|
||||
childTasks := triggerTask.getChildTask()
|
||||
for _, t := range childTasks {
|
||||
@ -701,15 +659,10 @@ func (scheduler *TaskScheduler) scheduleLoop() {
|
||||
log.Debug("scheduleLoop: start rollBack after triggerTask failed",
|
||||
zap.Int64("triggerTaskID", triggerTask.getTaskID()),
|
||||
zap.Any("rollBackTasks", rollBackTasks))
|
||||
err = rollBackInterTaskFn(triggerTask, childTasks, rollBackTasks)
|
||||
if err != nil {
|
||||
log.Error("scheduleLoop: rollBackInternalTask error",
|
||||
zap.Int64("triggerTaskID", triggerTask.getTaskID()),
|
||||
zap.Error(err))
|
||||
|
||||
} else {
|
||||
processInternalTaskFn(rollBackTasks, triggerTask)
|
||||
}
|
||||
// there is no need to save rollBacked internal task to etcd
|
||||
// After queryCoord recover, it will retry failed childTask
|
||||
// if childTask still execute failed, then reProduce rollBacked tasks
|
||||
processInternalTaskFn(rollBackTasks, triggerTask)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user