diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index b6354a06fe..680b4de648 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -60,6 +60,13 @@ func (queue *taskQueue) taskEmpty() bool { return queue.tasks.Len() == 0 } +func (queue *taskQueue) Len() int { + queue.Lock() + defer queue.Unlock() + + return queue.tasks.Len() +} + func (queue *taskQueue) taskFull() bool { return int64(queue.tasks.Len()) >= queue.maxTask } @@ -123,6 +130,9 @@ func (queue *taskQueue) addTask(t task) { func (queue *taskQueue) addTaskToFront(t task) { queue.taskChan <- 1 + + queue.Lock() + defer queue.Unlock() if queue.tasks.Len() == 0 { queue.tasks.PushBack(t) } else { @@ -272,23 +282,27 @@ func (scheduler *TaskScheduler) reloadFromKV() error { triggerTasks[taskID].setState(state) } - var doneTriggerTask task - for _, t := range triggerTasks { - if t.getState() == taskDone { - doneTriggerTask = t - for _, childTask := range activeTasks { - childTask.setParentTask(t) //replace child task after reScheduler - t.addChildTask(childTask) + // triggerTaskQueue's size is 1024, if the size of triggerTasks if large than 1024 the loop will be blocked, + // so run it in a standalone goroutine + go func() { + var doneTriggerTask task + for _, t := range triggerTasks { + if t.getState() == taskDone { + doneTriggerTask = t + for _, childTask := range activeTasks { + childTask.setParentTask(t) //replace child task after reScheduler + t.addChildTask(childTask) + } + t.setResultInfo(nil) + continue } - t.setResultInfo(nil) - continue + scheduler.triggerTaskQueue.addTask(t) } - scheduler.triggerTaskQueue.addTask(t) - } - if doneTriggerTask != nil { - scheduler.triggerTaskQueue.addTaskToFront(doneTriggerTask) - } + if doneTriggerTask != nil { + scheduler.triggerTaskQueue.addTaskToFront(doneTriggerTask) + } + }() return nil } diff --git a/internal/querycoord/task_scheduler_test.go b/internal/querycoord/task_scheduler_test.go index b5379c2124..8f38f5faaa 100644 --- a/internal/querycoord/task_scheduler_test.go +++ b/internal/querycoord/task_scheduler_test.go @@ -462,6 +462,13 @@ func TestReloadTaskFromKV(t *testing.T) { taskScheduler.reloadFromKV() + // wait for the addtask goroutine finished + assert.Eventually(t, + func() bool { + return taskScheduler.triggerTaskQueue.Len() == len(kvs)-2 + }, + 10*time.Second, 100*time.Millisecond) + task := taskScheduler.triggerTaskQueue.popTask() assert.Equal(t, taskDone, task.getState()) assert.Equal(t, 1, len(task.getChildTask()))