milvus/internal/querycoord/task_scheduler.go

828 lines
24 KiB
Go
Raw Normal View History

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querycoord
import (
"container/list"
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"sync"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
oplog "github.com/opentracing/opentracing-go/log"
)
// TaskQueue is used to cache triggerTasks
type TaskQueue struct {
tasks *list.List
maxTask int64
taskChan chan int // to block scheduler
sync.Mutex
}
// Chan returns the taskChan of taskQueue
func (queue *TaskQueue) Chan() <-chan int {
return queue.taskChan
}
func (queue *TaskQueue) taskEmpty() bool {
queue.Lock()
defer queue.Unlock()
return queue.tasks.Len() == 0
}
func (queue *TaskQueue) taskFull() bool {
return int64(queue.tasks.Len()) >= queue.maxTask
}
func (queue *TaskQueue) addTask(t task) {
queue.Lock()
defer queue.Unlock()
if queue.tasks.Len() == 0 {
queue.taskChan <- 1
queue.tasks.PushBack(t)
return
}
for e := queue.tasks.Back(); e != nil; e = e.Prev() {
if t.TaskPriority() > e.Value.(task).TaskPriority() {
if e.Prev() == nil {
queue.taskChan <- 1
queue.tasks.InsertBefore(t, e)
break
}
continue
}
//TODO:: take care of timestamp
queue.taskChan <- 1
queue.tasks.InsertAfter(t, e)
break
}
}
func (queue *TaskQueue) addTaskToFront(t task) {
queue.taskChan <- 1
if queue.tasks.Len() == 0 {
queue.tasks.PushBack(t)
} else {
queue.tasks.PushFront(t)
}
}
// PopTask pops a trigger task from task list
func (queue *TaskQueue) PopTask() task {
queue.Lock()
defer queue.Unlock()
if queue.tasks.Len() <= 0 {
log.Warn("sorry, but the unissued task list is empty!")
return nil
}
ft := queue.tasks.Front()
queue.tasks.Remove(ft)
return ft.Value.(task)
}
// NewTaskQueue creates a new task queue for scheduler to cache trigger tasks
func NewTaskQueue() *TaskQueue {
return &TaskQueue{
tasks: list.New(),
maxTask: 1024,
taskChan: make(chan int, 1024),
}
}
// TaskScheduler controls the scheduling of trigger tasks and internal tasks
type TaskScheduler struct {
triggerTaskQueue *TaskQueue
activateTaskChan chan task
meta Meta
cluster Cluster
taskIDAllocator func() (UniqueID, error)
client *etcdkv.EtcdKV
stopActivateTaskLoopChan chan int
rootCoord types.RootCoord
dataCoord types.DataCoord
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
// NewTaskScheduler reloads tasks from kv and returns a new taskScheduler
func NewTaskScheduler(ctx context.Context, meta Meta, cluster Cluster, kv *etcdkv.EtcdKV, rootCoord types.RootCoord, dataCoord types.DataCoord) (*TaskScheduler, error) {
ctx1, cancel := context.WithCancel(ctx)
taskChan := make(chan task, 1024)
stopTaskLoopChan := make(chan int, 1)
s := &TaskScheduler{
ctx: ctx1,
cancel: cancel,
meta: meta,
cluster: cluster,
activateTaskChan: taskChan,
client: kv,
stopActivateTaskLoopChan: stopTaskLoopChan,
rootCoord: rootCoord,
dataCoord: dataCoord,
}
s.triggerTaskQueue = NewTaskQueue()
//init id allocator
etcdKV, err := tsoutil.NewTSOKVBase(Params.EtcdEndpoints, Params.KvRootPath, "queryCoordTaskID")
if err != nil {
return nil, err
}
idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", etcdKV)
if err := idAllocator.Initialize(); err != nil {
log.Debug("query coordinator idAllocator initialize failed", zap.Error(err))
return nil, err
}
s.taskIDAllocator = func() (UniqueID, error) {
return idAllocator.AllocOne()
}
err = s.reloadFromKV()
if err != nil {
log.Error("reload task from kv failed", zap.Error(err))
return nil, err
}
return s, nil
}
func (scheduler *TaskScheduler) reloadFromKV() error {
triggerTaskIDKeys, triggerTaskValues, err := scheduler.client.LoadWithPrefix(triggerTaskPrefix)
if err != nil {
return err
}
activeTaskIDKeys, activeTaskValues, err := scheduler.client.LoadWithPrefix(activeTaskPrefix)
if err != nil {
return err
}
taskInfoKeys, taskInfoValues, err := scheduler.client.LoadWithPrefix(taskInfoPrefix)
if err != nil {
return err
}
triggerTasks := make(map[int64]task)
for index := range triggerTaskIDKeys {
taskID, err := strconv.ParseInt(filepath.Base(triggerTaskIDKeys[index]), 10, 64)
if err != nil {
return err
}
t, err := scheduler.unmarshalTask(taskID, triggerTaskValues[index])
if err != nil {
return err
}
triggerTasks[taskID] = t
}
activeTasks := make(map[int64]task)
for index := range activeTaskIDKeys {
taskID, err := strconv.ParseInt(filepath.Base(activeTaskIDKeys[index]), 10, 64)
if err != nil {
return err
}
t, err := scheduler.unmarshalTask(taskID, activeTaskValues[index])
if err != nil {
return err
}
activeTasks[taskID] = t
}
taskInfos := make(map[int64]taskState)
for index := range taskInfoKeys {
taskID, err := strconv.ParseInt(filepath.Base(taskInfoKeys[index]), 10, 64)
if err != nil {
return err
}
value, err := strconv.ParseInt(taskInfoValues[index], 10, 64)
if err != nil {
return err
}
state := taskState(value)
taskInfos[taskID] = state
if _, ok := triggerTasks[taskID]; !ok {
log.Error("reloadFromKV: taskStateInfo and triggerTaskInfo are inconsistent")
continue
}
triggerTasks[taskID].SetState(state)
}
var doneTriggerTask task = nil
for _, t := range triggerTasks {
if t.State() == taskDone {
doneTriggerTask = t
for _, childTask := range activeTasks {
childTask.SetParentTask(t) //replace child task after reScheduler
t.AddChildTask(childTask)
}
t.SetResultInfo(nil)
continue
}
scheduler.triggerTaskQueue.addTask(t)
}
if doneTriggerTask != nil {
scheduler.triggerTaskQueue.addTaskToFront(doneTriggerTask)
}
return nil
}
func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, error) {
header := commonpb.MsgHeader{}
err := proto.Unmarshal([]byte(t), &header)
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal message header, err %s ", err.Error())
}
var newTask task
baseTask := newBaseTask(scheduler.ctx, querypb.TriggerCondition_grpcRequest)
switch header.Base.MsgType {
case commonpb.MsgType_LoadCollection:
loadReq := querypb.LoadCollectionRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return nil, err
}
loadCollectionTask := &LoadCollectionTask{
BaseTask: baseTask,
LoadCollectionRequest: &loadReq,
2021-06-21 17:28:03 +08:00
rootCoord: scheduler.rootCoord,
dataCoord: scheduler.dataCoord,
cluster: scheduler.cluster,
meta: scheduler.meta,
}
newTask = loadCollectionTask
case commonpb.MsgType_LoadPartitions:
loadReq := querypb.LoadPartitionsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return nil, err
}
loadPartitionTask := &LoadPartitionTask{
BaseTask: baseTask,
LoadPartitionsRequest: &loadReq,
dataCoord: scheduler.dataCoord,
cluster: scheduler.cluster,
meta: scheduler.meta,
}
newTask = loadPartitionTask
case commonpb.MsgType_ReleaseCollection:
loadReq := querypb.ReleaseCollectionRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return nil, err
}
releaseCollectionTask := &ReleaseCollectionTask{
BaseTask: baseTask,
ReleaseCollectionRequest: &loadReq,
cluster: scheduler.cluster,
meta: scheduler.meta,
rootCoord: scheduler.rootCoord,
}
newTask = releaseCollectionTask
case commonpb.MsgType_ReleasePartitions:
loadReq := querypb.ReleasePartitionsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return nil, err
}
releasePartitionTask := &ReleasePartitionTask{
BaseTask: baseTask,
ReleasePartitionsRequest: &loadReq,
cluster: scheduler.cluster,
}
newTask = releasePartitionTask
case commonpb.MsgType_LoadSegments:
//TODO::trigger condition may be different
loadReq := querypb.LoadSegmentsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return nil, err
}
loadSegmentTask := &LoadSegmentTask{
BaseTask: baseTask,
LoadSegmentsRequest: &loadReq,
cluster: scheduler.cluster,
meta: scheduler.meta,
excludeNodeIDs: []int64{},
}
newTask = loadSegmentTask
case commonpb.MsgType_ReleaseSegments:
//TODO::trigger condition may be different
loadReq := querypb.ReleaseSegmentsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return nil, err
}
releaseSegmentTask := &ReleaseSegmentTask{
BaseTask: baseTask,
ReleaseSegmentsRequest: &loadReq,
cluster: scheduler.cluster,
}
newTask = releaseSegmentTask
case commonpb.MsgType_WatchDmChannels:
//TODO::trigger condition may be different
loadReq := querypb.WatchDmChannelsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return nil, err
}
watchDmChannelTask := &WatchDmChannelTask{
BaseTask: baseTask,
WatchDmChannelsRequest: &loadReq,
cluster: scheduler.cluster,
meta: scheduler.meta,
excludeNodeIDs: []int64{},
}
newTask = watchDmChannelTask
case commonpb.MsgType_WatchQueryChannels:
//TODO::trigger condition may be different
loadReq := querypb.AddQueryChannelRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return nil, err
}
watchQueryChannelTask := &WatchQueryChannelTask{
BaseTask: baseTask,
AddQueryChannelRequest: &loadReq,
cluster: scheduler.cluster,
}
newTask = watchQueryChannelTask
case commonpb.MsgType_LoadBalanceSegments:
//TODO::trigger condition may be different
loadReq := querypb.LoadBalanceRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return nil, err
}
loadBalanceTask := &LoadBalanceTask{
BaseTask: baseTask,
LoadBalanceRequest: &loadReq,
2021-06-21 17:28:03 +08:00
rootCoord: scheduler.rootCoord,
dataCoord: scheduler.dataCoord,
cluster: scheduler.cluster,
meta: scheduler.meta,
}
newTask = loadBalanceTask
default:
err = errors.New("inValid msg type when unMarshal task")
log.Error(err.Error())
return nil, err
}
newTask.SetID(taskID)
return newTask, nil
}
// Enqueue pushs a trigger task to triggerTaskQueue and assigns task id
func (scheduler *TaskScheduler) Enqueue(t task) error {
id, err := scheduler.taskIDAllocator()
if err != nil {
log.Error("allocator trigger taskID failed", zap.Error(err))
return err
}
t.SetID(id)
kvs := make(map[string]string)
taskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, t.ID())
blobs, err := t.Marshal()
if err != nil {
log.Error("error when save marshal task", zap.Int64("taskID", t.ID()), zap.Error(err))
return err
}
kvs[taskKey] = string(blobs)
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())
kvs[stateKey] = strconv.Itoa(int(taskUndo))
err = scheduler.client.MultiSave(kvs)
if err != nil {
//TODO::clean etcd meta
log.Error("error when save trigger task to etcd", zap.Int64("taskID", t.ID()), zap.Error(err))
return err
}
t.SetState(taskUndo)
scheduler.triggerTaskQueue.addTask(t)
log.Debug("EnQueue a triggerTask and save to etcd", zap.Int64("taskID", t.ID()))
return nil
}
func (scheduler *TaskScheduler) processTask(t task) error {
var taskInfoKey string
// assign taskID for childTask and update triggerTask's childTask to etcd
updateKVFn := func(parentTask task) error {
kvs := make(map[string]string)
kvs[taskInfoKey] = strconv.Itoa(int(taskDone))
for _, childTask := range parentTask.GetChildTask() {
id, err := scheduler.taskIDAllocator()
if err != nil {
return err
}
childTask.SetID(id)
childTaskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, childTask.ID())
blobs, err := childTask.Marshal()
if err != nil {
return err
}
kvs[childTaskKey] = string(blobs)
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, childTask.ID())
kvs[stateKey] = strconv.Itoa(int(taskUndo))
}
err := scheduler.client.MultiSave(kvs)
if err != nil {
return err
}
return nil
}
span, ctx := trace.StartSpanFromContext(t.TraceCtx(),
opentracing.Tags{
"Type": t.Type(),
"ID": t.ID(),
})
var err error
defer span.Finish()
defer func() {
//task postExecute
span.LogFields(oplog.Int64("processTask: scheduler process PostExecute", t.ID()))
t.PostExecute(ctx)
}()
// task preExecute
span.LogFields(oplog.Int64("processTask: scheduler process PreExecute", t.ID()))
t.PreExecute(ctx)
taskInfoKey = fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())
err = scheduler.client.Save(taskInfoKey, strconv.Itoa(int(taskDoing)))
if err != nil {
trace.LogError(span, err)
t.SetResultInfo(err)
return err
}
t.SetState(taskDoing)
// task execute
span.LogFields(oplog.Int64("processTask: scheduler process Execute", t.ID()))
err = t.Execute(ctx)
if err != nil {
trace.LogError(span, err)
return err
}
err = updateKVFn(t)
if err != nil {
trace.LogError(span, err)
t.SetResultInfo(err)
return err
}
log.Debug("processTask: update etcd success", zap.Int64("parent taskID", t.ID()))
if t.Type() == commonpb.MsgType_LoadCollection || t.Type() == commonpb.MsgType_LoadPartitions {
t.Notify(nil)
}
t.SetState(taskDone)
t.UpdateTaskProcess()
return nil
}
func (scheduler *TaskScheduler) scheduleLoop() {
defer scheduler.wg.Done()
activeTaskWg := &sync.WaitGroup{}
var triggerTask task
processInternalTaskFn := func(activateTasks []task, triggerTask task) {
log.Debug("scheduleLoop: num of child task", zap.Int("num child task", len(activateTasks)))
for _, childTask := range activateTasks {
if childTask != nil {
log.Debug("scheduleLoop: add a activate task to activateChan", zap.Int64("taskID", childTask.ID()))
scheduler.activateTaskChan <- childTask
activeTaskWg.Add(1)
go scheduler.waitActivateTaskDone(activeTaskWg, childTask, triggerTask)
}
}
activeTaskWg.Wait()
}
rollBackInterTaskFn := func(triggerTask task, originInternalTasks []task, rollBackTasks []task) error {
saves := make(map[string]string)
removes := make([]string, 0)
childTaskIDs := make([]int64, 0)
for _, t := range originInternalTasks {
childTaskIDs = append(childTaskIDs, t.ID())
taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, t.ID())
removes = append(removes, taskKey)
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())
removes = append(removes, stateKey)
}
for _, t := range rollBackTasks {
id, err := scheduler.taskIDAllocator()
if err != nil {
return err
}
t.SetID(id)
taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, t.ID())
blobs, err := t.Marshal()
if err != nil {
return err
}
saves[taskKey] = string(blobs)
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())
saves[stateKey] = strconv.Itoa(int(taskUndo))
}
err := scheduler.client.MultiSaveAndRemove(saves, removes)
if err != nil {
return err
}
for _, taskID := range childTaskIDs {
triggerTask.RemoveChildTaskByID(taskID)
}
for _, t := range rollBackTasks {
triggerTask.AddChildTask(t)
}
return nil
}
removeTaskFromKVFn := func(triggerTask task) error {
keys := make([]string, 0)
taskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, triggerTask.ID())
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, triggerTask.ID())
keys = append(keys, taskKey)
keys = append(keys, stateKey)
childTasks := triggerTask.GetChildTask()
for _, t := range childTasks {
taskKey = fmt.Sprintf("%s/%d", activeTaskPrefix, t.ID())
stateKey = fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())
keys = append(keys, taskKey)
keys = append(keys, stateKey)
}
err := scheduler.client.MultiRemove(keys)
if err != nil {
return err
}
return nil
}
for {
var err error
select {
case <-scheduler.ctx.Done():
scheduler.stopActivateTaskLoopChan <- 1
return
case <-scheduler.triggerTaskQueue.Chan():
triggerTask = scheduler.triggerTaskQueue.PopTask()
log.Debug("scheduleLoop: pop a triggerTask from triggerTaskQueue", zap.Int64("triggerTaskID", triggerTask.ID()))
alreadyNotify := true
if triggerTask.State() == taskUndo || triggerTask.State() == taskDoing {
err = scheduler.processTask(triggerTask)
if err != nil {
log.Debug("scheduleLoop: process triggerTask failed", zap.Int64("triggerTaskID", triggerTask.ID()), zap.Error(err))
alreadyNotify = false
}
}
if triggerTask.Type() != commonpb.MsgType_LoadCollection && triggerTask.Type() != commonpb.MsgType_LoadPartitions {
alreadyNotify = false
}
childTasks := triggerTask.GetChildTask()
if len(childTasks) != 0 {
activateTasks := make([]task, len(childTasks))
copy(activateTasks, childTasks)
processInternalTaskFn(activateTasks, triggerTask)
resultStatus := triggerTask.GetResultInfo()
if resultStatus.ErrorCode != commonpb.ErrorCode_Success {
rollBackTasks := triggerTask.RollBack(scheduler.ctx)
log.Debug("scheduleLoop: start rollBack after triggerTask failed",
zap.Int64("triggerTaskID", triggerTask.ID()),
zap.Any("rollBackTasks", rollBackTasks))
err = rollBackInterTaskFn(triggerTask, childTasks, rollBackTasks)
if err != nil {
log.Error("scheduleLoop: rollBackInternalTask error",
zap.Int64("triggerTaskID", triggerTask.ID()),
zap.Error(err))
triggerTask.SetResultInfo(err)
} else {
processInternalTaskFn(rollBackTasks, triggerTask)
}
}
}
err = removeTaskFromKVFn(triggerTask)
if err != nil {
log.Error("scheduleLoop: error when remove trigger and internal tasks from etcd", zap.Int64("triggerTaskID", triggerTask.ID()), zap.Error(err))
triggerTask.SetResultInfo(err)
} else {
log.Debug("scheduleLoop: trigger task done and delete from etcd", zap.Int64("triggerTaskID", triggerTask.ID()))
}
resultStatus := triggerTask.GetResultInfo()
if resultStatus.ErrorCode != commonpb.ErrorCode_Success {
triggerTask.SetState(taskFailed)
if !alreadyNotify {
triggerTask.Notify(errors.New(resultStatus.Reason))
}
} else {
triggerTask.UpdateTaskProcess()
triggerTask.SetState(taskExpired)
if !alreadyNotify {
triggerTask.Notify(nil)
}
}
}
}
}
func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, triggerTask task) {
defer wg.Done()
var err error
redoFunc1 := func() {
if !t.IsValid() || !t.IsRetryable() {
log.Debug("waitActivateTaskDone: reSchedule the activate task",
zap.Int64("taskID", t.ID()),
zap.Int64("triggerTaskID", triggerTask.ID()))
reScheduledTasks, err := t.Reschedule(scheduler.ctx)
if err != nil {
log.Error("waitActivateTaskDone: reschedule task error",
zap.Int64("taskID", t.ID()),
zap.Int64("triggerTaskID", triggerTask.ID()),
zap.Error(err))
triggerTask.SetResultInfo(err)
return
}
removes := make([]string, 0)
taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, t.ID())
removes = append(removes, taskKey)
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())
removes = append(removes, stateKey)
saves := make(map[string]string)
for _, rt := range reScheduledTasks {
if rt != nil {
id, err := scheduler.taskIDAllocator()
if err != nil {
log.Error("waitActivateTaskDone: allocate id error",
zap.Int64("triggerTaskID", triggerTask.ID()),
zap.Error(err))
triggerTask.SetResultInfo(err)
return
}
rt.SetID(id)
log.Debug("waitActivateTaskDone: reScheduler set id", zap.Int64("id", rt.ID()))
taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, rt.ID())
blobs, err := rt.Marshal()
if err != nil {
log.Error("waitActivateTaskDone: error when marshal active task",
zap.Int64("triggerTaskID", triggerTask.ID()),
zap.Error(err))
triggerTask.SetResultInfo(err)
return
}
saves[taskKey] = string(blobs)
stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, rt.ID())
saves[stateKey] = strconv.Itoa(int(taskUndo))
}
}
//TODO::queryNode auto watch queryChannel, then update etcd use same id directly
err = scheduler.client.MultiSaveAndRemove(saves, removes)
if err != nil {
log.Error("waitActivateTaskDone: error when save and remove task from etcd", zap.Int64("triggerTaskID", triggerTask.ID()))
triggerTask.SetResultInfo(err)
return
}
triggerTask.RemoveChildTaskByID(t.ID())
log.Debug("waitActivateTaskDone: delete failed active task and save reScheduled task to etcd",
zap.Int64("triggerTaskID", triggerTask.ID()),
zap.Int64("failed taskID", t.ID()),
zap.Any("reScheduled tasks", reScheduledTasks))
for _, rt := range reScheduledTasks {
if rt != nil {
triggerTask.AddChildTask(rt)
log.Debug("waitActivateTaskDone: add a reScheduled active task to activateChan", zap.Int64("taskID", rt.ID()))
scheduler.activateTaskChan <- rt
wg.Add(1)
go scheduler.waitActivateTaskDone(wg, rt, triggerTask)
}
}
//delete task from etcd
} else {
log.Debug("waitActivateTaskDone: retry the active task",
zap.Int64("taskID", t.ID()),
zap.Int64("triggerTaskID", triggerTask.ID()))
scheduler.activateTaskChan <- t
wg.Add(1)
go scheduler.waitActivateTaskDone(wg, t, triggerTask)
}
}
redoFunc2 := func(err error) {
if t.IsValid() {
if !t.IsRetryable() {
log.Error("waitActivateTaskDone: activate task failed after retry",
zap.Int64("taskID", t.ID()),
zap.Int64("triggerTaskID", triggerTask.ID()))
triggerTask.SetResultInfo(err)
return
}
log.Debug("waitActivateTaskDone: retry the active task",
zap.Int64("taskID", t.ID()),
zap.Int64("triggerTaskID", triggerTask.ID()))
scheduler.activateTaskChan <- t
wg.Add(1)
go scheduler.waitActivateTaskDone(wg, t, triggerTask)
}
}
err = t.WaitToFinish()
if err != nil {
log.Debug("waitActivateTaskDone: activate task return err",
zap.Int64("taskID", t.ID()),
zap.Int64("triggerTaskID", triggerTask.ID()),
zap.Error(err))
switch t.Type() {
case commonpb.MsgType_LoadSegments:
redoFunc1()
case commonpb.MsgType_WatchDmChannels:
redoFunc1()
case commonpb.MsgType_WatchQueryChannels:
redoFunc2(err)
case commonpb.MsgType_ReleaseSegments:
redoFunc2(err)
case commonpb.MsgType_ReleaseCollection:
redoFunc2(err)
case commonpb.MsgType_ReleasePartitions:
redoFunc2(err)
default:
//TODO:: case commonpb.MsgType_RemoveDmChannels:
}
} else {
log.Debug("waitActivateTaskDone: one activate task done",
zap.Int64("taskID", t.ID()),
zap.Int64("triggerTaskID", triggerTask.ID()))
}
}
func (scheduler *TaskScheduler) processActivateTaskLoop() {
defer scheduler.wg.Done()
for {
select {
case <-scheduler.stopActivateTaskLoopChan:
log.Debug("processActivateTaskLoop, ctx done")
return
case t := <-scheduler.activateTaskChan:
if t == nil {
log.Error("processActivateTaskLoop: pop a nil active task", zap.Int64("taskID", t.ID()))
continue
}
log.Debug("processActivateTaskLoop: pop a active task from activateChan", zap.Int64("taskID", t.ID()))
go func() {
err := scheduler.processTask(t)
t.Notify(err)
}()
}
}
}
func (scheduler *TaskScheduler) Start() error {
scheduler.wg.Add(2)
go scheduler.scheduleLoop()
go scheduler.processActivateTaskLoop()
return nil
}
func (scheduler *TaskScheduler) Close() {
scheduler.cancel()
scheduler.wg.Wait()
}