2021-04-19 10:09:43 +08:00
|
|
|
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
|
2021-06-22 14:40:07 +08:00
|
|
|
package proxy
|
2020-11-03 14:53:36 +08:00
|
|
|
|
|
|
|
import (
|
|
|
|
"container/list"
|
2020-11-05 18:01:33 +08:00
|
|
|
"context"
|
2021-03-08 19:39:36 +08:00
|
|
|
"errors"
|
2021-06-11 09:50:34 +08:00
|
|
|
"fmt"
|
2021-01-23 20:58:46 +08:00
|
|
|
"strconv"
|
2020-11-03 14:53:36 +08:00
|
|
|
"sync"
|
2020-11-11 09:54:01 +08:00
|
|
|
|
2021-03-08 19:39:36 +08:00
|
|
|
"go.uber.org/zap"
|
2021-03-05 10:15:27 +08:00
|
|
|
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/allocator"
|
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
|
|
"github.com/milvus-io/milvus/internal/msgstream"
|
2021-06-15 10:19:38 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/trace"
|
2021-02-23 09:58:06 +08:00
|
|
|
"github.com/opentracing/opentracing-go"
|
|
|
|
oplog "github.com/opentracing/opentracing-go/log"
|
2020-11-03 14:53:36 +08:00
|
|
|
)
|
|
|
|
|
2020-11-14 11:24:49 +08:00
|
|
|
type TaskQueue interface {
|
|
|
|
utChan() <-chan int
|
2021-04-07 10:06:17 +08:00
|
|
|
utEmpty() bool
|
2020-11-14 11:24:49 +08:00
|
|
|
utFull() bool
|
|
|
|
addUnissuedTask(t task) error
|
|
|
|
FrontUnissuedTask() task
|
|
|
|
PopUnissuedTask() task
|
|
|
|
AddActiveTask(t task)
|
2021-06-15 10:19:38 +08:00
|
|
|
PopActiveTask(tID UniqueID) task
|
2020-11-14 11:24:49 +08:00
|
|
|
getTaskByReqID(reqID UniqueID) task
|
|
|
|
TaskDoneTest(ts Timestamp) bool
|
|
|
|
Enqueue(t task) error
|
|
|
|
}
|
|
|
|
|
2020-11-07 16:18:23 +08:00
|
|
|
type BaseTaskQueue struct {
|
2020-11-03 14:53:36 +08:00
|
|
|
unissuedTasks *list.List
|
2021-06-15 10:19:38 +08:00
|
|
|
activeTasks map[UniqueID]task
|
2021-05-31 11:40:31 +08:00
|
|
|
utLock sync.RWMutex
|
|
|
|
atLock sync.RWMutex
|
2020-11-14 11:24:49 +08:00
|
|
|
|
|
|
|
// maxTaskNum should keep still
|
|
|
|
maxTaskNum int64
|
|
|
|
|
|
|
|
utBufChan chan int // to block scheduler
|
2020-11-16 17:01:10 +08:00
|
|
|
|
|
|
|
sched *TaskScheduler
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2020-11-14 11:24:49 +08:00
|
|
|
func (queue *BaseTaskQueue) utChan() <-chan int {
|
|
|
|
return queue.utBufChan
|
|
|
|
}
|
|
|
|
|
2021-04-07 10:06:17 +08:00
|
|
|
func (queue *BaseTaskQueue) utEmpty() bool {
|
2021-05-31 11:40:31 +08:00
|
|
|
queue.utLock.RLock()
|
|
|
|
defer queue.utLock.RUnlock()
|
2020-11-14 11:24:49 +08:00
|
|
|
return queue.unissuedTasks.Len() == 0
|
|
|
|
}
|
|
|
|
|
|
|
|
func (queue *BaseTaskQueue) utFull() bool {
|
|
|
|
return int64(queue.unissuedTasks.Len()) >= queue.maxTaskNum
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2020-11-14 11:24:49 +08:00
|
|
|
func (queue *BaseTaskQueue) addUnissuedTask(t task) error {
|
2020-11-03 14:53:36 +08:00
|
|
|
queue.utLock.Lock()
|
|
|
|
defer queue.utLock.Unlock()
|
2020-11-14 11:24:49 +08:00
|
|
|
|
|
|
|
if queue.utFull() {
|
|
|
|
return errors.New("task queue is full")
|
|
|
|
}
|
2020-11-03 14:53:36 +08:00
|
|
|
queue.unissuedTasks.PushBack(t)
|
2020-11-14 11:24:49 +08:00
|
|
|
queue.utBufChan <- 1
|
|
|
|
return nil
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
func (queue *BaseTaskQueue) FrontUnissuedTask() task {
|
2021-05-31 11:40:31 +08:00
|
|
|
queue.utLock.RLock()
|
|
|
|
defer queue.utLock.RUnlock()
|
2020-11-14 11:24:49 +08:00
|
|
|
|
2020-11-03 14:53:36 +08:00
|
|
|
if queue.unissuedTasks.Len() <= 0 {
|
2021-03-08 19:39:36 +08:00
|
|
|
log.Warn("sorry, but the unissued task list is empty!")
|
2020-11-03 14:53:36 +08:00
|
|
|
return nil
|
|
|
|
}
|
2020-11-14 11:24:49 +08:00
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
return queue.unissuedTasks.Front().Value.(task)
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
func (queue *BaseTaskQueue) PopUnissuedTask() task {
|
2020-11-03 14:53:36 +08:00
|
|
|
queue.utLock.Lock()
|
|
|
|
defer queue.utLock.Unlock()
|
2020-11-14 11:24:49 +08:00
|
|
|
|
2020-11-03 14:53:36 +08:00
|
|
|
if queue.unissuedTasks.Len() <= 0 {
|
2021-03-08 19:39:36 +08:00
|
|
|
log.Warn("sorry, but the unissued task list is empty!")
|
2020-11-03 14:53:36 +08:00
|
|
|
return nil
|
|
|
|
}
|
2020-11-14 11:24:49 +08:00
|
|
|
|
2020-11-03 14:53:36 +08:00
|
|
|
ft := queue.unissuedTasks.Front()
|
2020-11-14 11:24:49 +08:00
|
|
|
queue.unissuedTasks.Remove(ft)
|
|
|
|
|
|
|
|
return ft.Value.(task)
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
func (queue *BaseTaskQueue) AddActiveTask(t task) {
|
2020-11-03 14:53:36 +08:00
|
|
|
queue.atLock.Lock()
|
2020-11-14 14:39:52 +08:00
|
|
|
defer queue.atLock.Unlock()
|
2021-06-15 10:19:38 +08:00
|
|
|
tID := t.ID()
|
|
|
|
_, ok := queue.activeTasks[tID]
|
2020-11-03 14:53:36 +08:00
|
|
|
if ok {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy task with tID already in active task list!", zap.Any("ID", tID))
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
2020-11-14 11:24:49 +08:00
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
queue.activeTasks[tID] = t
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
|
2020-11-03 14:53:36 +08:00
|
|
|
queue.atLock.Lock()
|
2020-11-14 14:39:52 +08:00
|
|
|
defer queue.atLock.Unlock()
|
2021-06-15 10:19:38 +08:00
|
|
|
t, ok := queue.activeTasks[tID]
|
2020-11-03 14:53:36 +08:00
|
|
|
if ok {
|
2021-06-15 10:19:38 +08:00
|
|
|
delete(queue.activeTasks, tID)
|
2020-11-03 14:53:36 +08:00
|
|
|
return t
|
|
|
|
}
|
2020-11-14 11:24:49 +08:00
|
|
|
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy task not in active task list! ts", zap.Any("tID", tID))
|
2021-06-15 10:19:38 +08:00
|
|
|
return t
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2020-11-12 17:58:05 +08:00
|
|
|
func (queue *BaseTaskQueue) getTaskByReqID(reqID UniqueID) task {
|
2021-05-31 11:40:31 +08:00
|
|
|
queue.utLock.RLock()
|
|
|
|
defer queue.utLock.RUnlock()
|
2020-11-07 16:18:23 +08:00
|
|
|
for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
|
2020-11-12 17:58:05 +08:00
|
|
|
if e.Value.(task).ID() == reqID {
|
2020-11-12 11:18:23 +08:00
|
|
|
return e.Value.(task)
|
2020-11-07 16:18:23 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-31 11:40:31 +08:00
|
|
|
queue.atLock.RLock()
|
|
|
|
defer queue.atLock.RUnlock()
|
2021-06-15 10:19:38 +08:00
|
|
|
for tID := range queue.activeTasks {
|
|
|
|
if tID == reqID {
|
|
|
|
return queue.activeTasks[tID]
|
2020-11-07 16:18:23 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
|
2021-05-31 11:40:31 +08:00
|
|
|
queue.utLock.RLock()
|
|
|
|
defer queue.utLock.RUnlock()
|
2020-11-03 14:53:36 +08:00
|
|
|
for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
|
2020-11-21 16:54:20 +08:00
|
|
|
if e.Value.(task).EndTs() < ts {
|
2020-11-03 14:53:36 +08:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-31 11:40:31 +08:00
|
|
|
queue.atLock.RLock()
|
|
|
|
defer queue.atLock.RUnlock()
|
2021-06-15 10:19:38 +08:00
|
|
|
for _, task := range queue.activeTasks {
|
|
|
|
if task.BeginTs() < ts {
|
2020-11-03 14:53:36 +08:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2020-11-14 11:24:49 +08:00
|
|
|
func (queue *BaseTaskQueue) Enqueue(t task) error {
|
2021-01-22 09:36:18 +08:00
|
|
|
err := t.OnEnqueue()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-03-15 19:47:13 +08:00
|
|
|
ts, err := queue.sched.tsoAllocator.AllocOne()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-11-16 17:01:10 +08:00
|
|
|
t.SetTs(ts)
|
2020-11-23 16:52:17 +08:00
|
|
|
|
2021-03-15 19:47:13 +08:00
|
|
|
reqID, err := queue.sched.idAllocator.AllocOne()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-11-23 16:52:17 +08:00
|
|
|
t.SetID(reqID)
|
|
|
|
|
2020-11-14 11:24:49 +08:00
|
|
|
return queue.addUnissuedTask(t)
|
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
type DdTaskQueue struct {
|
2020-11-07 16:18:23 +08:00
|
|
|
BaseTaskQueue
|
|
|
|
lock sync.Mutex
|
|
|
|
}
|
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
type pChanStatInfo struct {
|
|
|
|
pChanStatistics
|
|
|
|
refCnt int
|
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
type DmTaskQueue struct {
|
2020-11-07 16:18:23 +08:00
|
|
|
BaseTaskQueue
|
2021-06-15 10:19:38 +08:00
|
|
|
statsLock sync.RWMutex
|
|
|
|
pChanStatisticsInfos map[pChan]*pChanStatInfo
|
2020-11-07 16:18:23 +08:00
|
|
|
}
|
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
func (queue *DmTaskQueue) Enqueue(t task) error {
|
|
|
|
err := t.OnEnqueue()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-05-31 11:40:31 +08:00
|
|
|
}
|
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
ts, err := queue.sched.tsoAllocator.AllocOne()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
t.SetTs(ts)
|
2021-06-08 19:25:37 +08:00
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
reqID, err := queue.sched.idAllocator.AllocOne()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
t.SetID(reqID)
|
|
|
|
|
|
|
|
return queue.addUnissuedTask(t)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (queue *DmTaskQueue) addUnissuedTask(t task) error {
|
|
|
|
queue.utLock.Lock()
|
|
|
|
defer queue.utLock.Unlock()
|
|
|
|
|
|
|
|
if queue.utFull() {
|
|
|
|
return errors.New("task queue is full")
|
|
|
|
}
|
|
|
|
queue.unissuedTasks.PushBack(t)
|
|
|
|
queue.addPChanStats(t)
|
|
|
|
queue.utBufChan <- 1
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (queue *DmTaskQueue) PopActiveTask(tID UniqueID) task {
|
|
|
|
queue.atLock.Lock()
|
|
|
|
defer queue.atLock.Unlock()
|
|
|
|
t, ok := queue.activeTasks[tID]
|
|
|
|
if ok {
|
|
|
|
delete(queue.activeTasks, tID)
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy DmTaskQueue popPChanStats", zap.Any("tID", t.ID()))
|
2021-06-15 10:19:38 +08:00
|
|
|
queue.popPChanStats(t)
|
|
|
|
} else {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy task not in active task list!", zap.Any("tID", tID))
|
2021-06-15 10:19:38 +08:00
|
|
|
}
|
|
|
|
return t
|
|
|
|
}
|
|
|
|
|
|
|
|
func (queue *DmTaskQueue) addPChanStats(t task) error {
|
|
|
|
if dmT, ok := t.(dmlTask); ok {
|
|
|
|
stats, err := dmT.getPChanStats()
|
2021-06-08 19:25:37 +08:00
|
|
|
if err != nil {
|
2021-06-15 10:19:38 +08:00
|
|
|
return err
|
2021-06-08 19:25:37 +08:00
|
|
|
}
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy DmTaskQueue addPChanStats", zap.Any("tID", t.ID()),
|
2021-06-15 10:19:38 +08:00
|
|
|
zap.Any("stats", stats))
|
|
|
|
queue.statsLock.Lock()
|
|
|
|
for cName, stat := range stats {
|
|
|
|
info, ok := queue.pChanStatisticsInfos[cName]
|
|
|
|
if !ok {
|
|
|
|
info = &pChanStatInfo{
|
|
|
|
pChanStatistics: stat,
|
|
|
|
refCnt: 1,
|
|
|
|
}
|
|
|
|
queue.pChanStatisticsInfos[cName] = info
|
|
|
|
} else {
|
|
|
|
if info.minTs > stat.minTs {
|
|
|
|
info.minTs = stat.minTs
|
|
|
|
}
|
|
|
|
if info.maxTs < stat.maxTs {
|
|
|
|
info.maxTs = stat.maxTs
|
|
|
|
}
|
|
|
|
info.refCnt++
|
|
|
|
}
|
2021-06-08 19:25:37 +08:00
|
|
|
}
|
2021-06-15 10:19:38 +08:00
|
|
|
queue.statsLock.Unlock()
|
|
|
|
} else {
|
2021-06-22 14:40:07 +08:00
|
|
|
return fmt.Errorf("Proxy addUnissuedTask reflect to dmlTask failed, tID:%v", t.ID())
|
2021-06-08 19:25:37 +08:00
|
|
|
}
|
2021-06-15 10:19:38 +08:00
|
|
|
return nil
|
|
|
|
}
|
2021-06-08 19:25:37 +08:00
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
func (queue *DmTaskQueue) popPChanStats(t task) error {
|
|
|
|
if dmT, ok := t.(dmlTask); ok {
|
|
|
|
channels, err := dmT.getChannels()
|
2021-05-31 11:40:31 +08:00
|
|
|
if err != nil {
|
2021-06-15 10:19:38 +08:00
|
|
|
return err
|
2021-05-31 11:40:31 +08:00
|
|
|
}
|
2021-06-15 10:19:38 +08:00
|
|
|
queue.statsLock.Lock()
|
|
|
|
for _, cName := range channels {
|
|
|
|
info, ok := queue.pChanStatisticsInfos[cName]
|
|
|
|
if ok {
|
|
|
|
info.refCnt--
|
|
|
|
if info.refCnt <= 0 {
|
|
|
|
delete(queue.pChanStatisticsInfos, cName)
|
|
|
|
}
|
|
|
|
}
|
2021-05-31 11:40:31 +08:00
|
|
|
}
|
2021-06-15 10:19:38 +08:00
|
|
|
queue.statsLock.Unlock()
|
|
|
|
} else {
|
2021-06-22 14:40:07 +08:00
|
|
|
return fmt.Errorf("Proxy DmTaskQueue popPChanStats reflect to dmlTask failed, tID:%v", t.ID())
|
2021-05-31 11:40:31 +08:00
|
|
|
}
|
2021-06-15 10:19:38 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (queue *DmTaskQueue) getPChanStatsInfo() (map[pChan]*pChanStatistics, error) {
|
2021-05-31 11:40:31 +08:00
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
ret := make(map[pChan]*pChanStatistics)
|
|
|
|
queue.statsLock.RLock()
|
|
|
|
defer queue.statsLock.RUnlock()
|
|
|
|
for cName, info := range queue.pChanStatisticsInfos {
|
|
|
|
ret[cName] = &pChanStatistics{
|
|
|
|
minTs: info.minTs,
|
|
|
|
maxTs: info.maxTs,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ret, nil
|
2021-05-31 11:40:31 +08:00
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
type DqTaskQueue struct {
|
2020-11-07 16:18:23 +08:00
|
|
|
BaseTaskQueue
|
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
func (queue *DdTaskQueue) Enqueue(t task) error {
|
2020-11-03 14:53:36 +08:00
|
|
|
queue.lock.Lock()
|
|
|
|
defer queue.lock.Unlock()
|
2020-11-26 16:01:31 +08:00
|
|
|
return queue.BaseTaskQueue.Enqueue(t)
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2020-11-16 17:01:10 +08:00
|
|
|
func NewDdTaskQueue(sched *TaskScheduler) *DdTaskQueue {
|
2020-11-12 11:18:23 +08:00
|
|
|
return &DdTaskQueue{
|
|
|
|
BaseTaskQueue: BaseTaskQueue{
|
|
|
|
unissuedTasks: list.New(),
|
2021-06-15 10:19:38 +08:00
|
|
|
activeTasks: make(map[UniqueID]task),
|
2020-11-14 11:24:49 +08:00
|
|
|
maxTaskNum: 1024,
|
|
|
|
utBufChan: make(chan int, 1024),
|
2020-11-16 17:01:10 +08:00
|
|
|
sched: sched,
|
2020-11-12 11:18:23 +08:00
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-16 17:01:10 +08:00
|
|
|
func NewDmTaskQueue(sched *TaskScheduler) *DmTaskQueue {
|
2020-11-12 11:18:23 +08:00
|
|
|
return &DmTaskQueue{
|
|
|
|
BaseTaskQueue: BaseTaskQueue{
|
|
|
|
unissuedTasks: list.New(),
|
2021-06-15 10:19:38 +08:00
|
|
|
activeTasks: make(map[UniqueID]task),
|
2020-11-14 11:24:49 +08:00
|
|
|
maxTaskNum: 1024,
|
|
|
|
utBufChan: make(chan int, 1024),
|
2020-11-16 17:01:10 +08:00
|
|
|
sched: sched,
|
2020-11-12 11:18:23 +08:00
|
|
|
},
|
2021-06-15 10:19:38 +08:00
|
|
|
pChanStatisticsInfos: make(map[pChan]*pChanStatInfo),
|
2020-11-12 11:18:23 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-16 17:01:10 +08:00
|
|
|
func NewDqTaskQueue(sched *TaskScheduler) *DqTaskQueue {
|
2020-11-12 11:18:23 +08:00
|
|
|
return &DqTaskQueue{
|
|
|
|
BaseTaskQueue: BaseTaskQueue{
|
|
|
|
unissuedTasks: list.New(),
|
2021-06-15 10:19:38 +08:00
|
|
|
activeTasks: make(map[UniqueID]task),
|
2020-11-14 11:24:49 +08:00
|
|
|
maxTaskNum: 1024,
|
|
|
|
utBufChan: make(chan int, 1024),
|
2020-11-16 17:01:10 +08:00
|
|
|
sched: sched,
|
2020-11-12 11:18:23 +08:00
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-05 18:01:33 +08:00
|
|
|
type TaskScheduler struct {
|
2020-11-14 11:24:49 +08:00
|
|
|
DdQueue TaskQueue
|
2021-05-31 11:40:31 +08:00
|
|
|
DmQueue *DmTaskQueue
|
2020-11-14 11:24:49 +08:00
|
|
|
DqQueue TaskQueue
|
2020-11-03 14:53:36 +08:00
|
|
|
|
2020-11-12 12:04:12 +08:00
|
|
|
idAllocator *allocator.IDAllocator
|
2021-05-21 13:11:21 +08:00
|
|
|
tsoAllocator *TimestampAllocator
|
2020-11-11 09:54:01 +08:00
|
|
|
|
2020-11-05 18:01:33 +08:00
|
|
|
wg sync.WaitGroup
|
|
|
|
ctx context.Context
|
|
|
|
cancel context.CancelFunc
|
2021-02-08 14:30:54 +08:00
|
|
|
|
|
|
|
msFactory msgstream.Factory
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
|
|
|
|
2020-11-11 09:54:01 +08:00
|
|
|
func NewTaskScheduler(ctx context.Context,
|
2020-11-12 12:04:12 +08:00
|
|
|
idAllocator *allocator.IDAllocator,
|
2021-05-21 13:11:21 +08:00
|
|
|
tsoAllocator *TimestampAllocator,
|
2021-02-08 14:30:54 +08:00
|
|
|
factory msgstream.Factory) (*TaskScheduler, error) {
|
2020-11-11 09:54:01 +08:00
|
|
|
ctx1, cancel := context.WithCancel(ctx)
|
|
|
|
s := &TaskScheduler{
|
|
|
|
idAllocator: idAllocator,
|
|
|
|
tsoAllocator: tsoAllocator,
|
|
|
|
ctx: ctx1,
|
|
|
|
cancel: cancel,
|
2021-02-08 14:30:54 +08:00
|
|
|
msFactory: factory,
|
2020-11-11 09:54:01 +08:00
|
|
|
}
|
2020-11-16 17:01:10 +08:00
|
|
|
s.DdQueue = NewDdTaskQueue(s)
|
|
|
|
s.DmQueue = NewDmTaskQueue(s)
|
|
|
|
s.DqQueue = NewDqTaskQueue(s)
|
2020-11-11 09:54:01 +08:00
|
|
|
|
|
|
|
return s, nil
|
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
func (sched *TaskScheduler) scheduleDdTask() task {
|
2020-11-03 14:53:36 +08:00
|
|
|
return sched.DdQueue.PopUnissuedTask()
|
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
func (sched *TaskScheduler) scheduleDmTask() task {
|
2020-11-03 14:53:36 +08:00
|
|
|
return sched.DmQueue.PopUnissuedTask()
|
|
|
|
}
|
|
|
|
|
2020-11-12 11:18:23 +08:00
|
|
|
func (sched *TaskScheduler) scheduleDqTask() task {
|
2020-11-03 14:53:36 +08:00
|
|
|
return sched.DqQueue.PopUnissuedTask()
|
|
|
|
}
|
|
|
|
|
2020-11-12 17:58:05 +08:00
|
|
|
func (sched *TaskScheduler) getTaskByReqID(collMeta UniqueID) task {
|
|
|
|
if t := sched.DdQueue.getTaskByReqID(collMeta); t != nil {
|
2020-11-07 16:18:23 +08:00
|
|
|
return t
|
|
|
|
}
|
2020-11-12 17:58:05 +08:00
|
|
|
if t := sched.DmQueue.getTaskByReqID(collMeta); t != nil {
|
2020-11-07 16:18:23 +08:00
|
|
|
return t
|
|
|
|
}
|
2020-11-12 17:58:05 +08:00
|
|
|
if t := sched.DqQueue.getTaskByReqID(collMeta); t != nil {
|
2020-11-07 16:18:23 +08:00
|
|
|
return t
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-11-14 11:24:49 +08:00
|
|
|
func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
2021-03-25 14:41:46 +08:00
|
|
|
span, ctx := trace.StartSpanFromContext(t.TraceCtx(),
|
2021-02-23 09:58:06 +08:00
|
|
|
opentracing.Tags{
|
|
|
|
"Type": t.Name(),
|
|
|
|
"ID": t.ID(),
|
|
|
|
})
|
|
|
|
defer span.Finish()
|
|
|
|
span.LogFields(oplog.Int64("scheduler process PreExecute", t.ID()))
|
|
|
|
err := t.PreExecute(ctx)
|
2020-11-07 16:18:23 +08:00
|
|
|
|
2020-11-14 11:24:49 +08:00
|
|
|
defer func() {
|
2020-11-12 11:18:23 +08:00
|
|
|
t.Notify(err)
|
2020-11-14 11:24:49 +08:00
|
|
|
}()
|
|
|
|
if err != nil {
|
2021-02-23 09:58:06 +08:00
|
|
|
trace.LogError(span, err)
|
2020-11-14 11:24:49 +08:00
|
|
|
return
|
|
|
|
}
|
2020-11-07 16:18:23 +08:00
|
|
|
|
2021-02-23 09:58:06 +08:00
|
|
|
span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID()))
|
2020-11-14 11:24:49 +08:00
|
|
|
q.AddActiveTask(t)
|
2021-02-23 09:58:06 +08:00
|
|
|
|
2020-11-16 17:01:10 +08:00
|
|
|
defer func() {
|
2021-02-23 09:58:06 +08:00
|
|
|
span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID()))
|
2021-06-15 10:19:38 +08:00
|
|
|
q.PopActiveTask(t.ID())
|
2020-11-16 17:01:10 +08:00
|
|
|
}()
|
2021-02-23 09:58:06 +08:00
|
|
|
span.LogFields(oplog.Int64("scheduler process Execute", t.ID()))
|
|
|
|
err = t.Execute(ctx)
|
2020-11-14 11:24:49 +08:00
|
|
|
if err != nil {
|
2021-02-23 09:58:06 +08:00
|
|
|
trace.LogError(span, err)
|
2020-11-14 11:24:49 +08:00
|
|
|
return
|
2020-11-07 16:18:23 +08:00
|
|
|
}
|
2021-02-23 09:58:06 +08:00
|
|
|
span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID()))
|
|
|
|
err = t.PostExecute(ctx)
|
2020-11-05 18:01:33 +08:00
|
|
|
}
|
|
|
|
|
2020-11-14 11:24:49 +08:00
|
|
|
func (sched *TaskScheduler) definitionLoop() {
|
2020-11-05 18:01:33 +08:00
|
|
|
defer sched.wg.Done()
|
|
|
|
for {
|
2020-11-14 11:24:49 +08:00
|
|
|
select {
|
|
|
|
case <-sched.ctx.Done():
|
2020-11-05 18:01:33 +08:00
|
|
|
return
|
2020-11-14 11:24:49 +08:00
|
|
|
case <-sched.DdQueue.utChan():
|
2021-04-07 10:06:17 +08:00
|
|
|
if !sched.DdQueue.utEmpty() {
|
2020-11-14 11:24:49 +08:00
|
|
|
t := sched.scheduleDdTask()
|
|
|
|
sched.processTask(t, sched.DdQueue)
|
|
|
|
}
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
2020-11-14 11:24:49 +08:00
|
|
|
}
|
|
|
|
}
|
2020-11-05 18:01:33 +08:00
|
|
|
|
2020-11-14 11:24:49 +08:00
|
|
|
func (sched *TaskScheduler) manipulationLoop() {
|
|
|
|
defer sched.wg.Done()
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-sched.ctx.Done():
|
|
|
|
return
|
|
|
|
case <-sched.DmQueue.utChan():
|
2021-04-07 10:06:17 +08:00
|
|
|
if !sched.DmQueue.utEmpty() {
|
2020-11-14 11:24:49 +08:00
|
|
|
t := sched.scheduleDmTask()
|
|
|
|
go sched.processTask(t, sched.DmQueue)
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
2020-11-14 11:24:49 +08:00
|
|
|
}
|
2020-11-05 18:01:33 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sched *TaskScheduler) queryLoop() {
|
|
|
|
defer sched.wg.Done()
|
2020-11-07 16:18:23 +08:00
|
|
|
|
|
|
|
for {
|
2020-11-14 11:24:49 +08:00
|
|
|
select {
|
|
|
|
case <-sched.ctx.Done():
|
2020-11-07 16:18:23 +08:00
|
|
|
return
|
2020-11-14 11:24:49 +08:00
|
|
|
case <-sched.DqQueue.utChan():
|
2021-04-07 10:06:17 +08:00
|
|
|
if !sched.DqQueue.utEmpty() {
|
2020-11-14 11:24:49 +08:00
|
|
|
t := sched.scheduleDqTask()
|
|
|
|
go sched.processTask(t, sched.DqQueue)
|
2020-11-16 17:01:10 +08:00
|
|
|
} else {
|
2021-03-08 19:39:36 +08:00
|
|
|
log.Debug("query queue is empty ...")
|
2020-11-07 16:18:23 +08:00
|
|
|
}
|
2020-11-14 11:24:49 +08:00
|
|
|
}
|
2020-11-07 16:18:23 +08:00
|
|
|
}
|
2020-11-05 18:01:33 +08:00
|
|
|
}
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
type resultBufHeader struct {
|
2021-06-02 10:17:32 +08:00
|
|
|
usedVChans map[interface{}]struct{} // set of vChan
|
|
|
|
receivedVChansSet map[interface{}]struct{} // set of vChan
|
|
|
|
receivedSealedSegmentIDsSet map[interface{}]struct{} // set of UniqueID
|
|
|
|
receivedGlobalSegmentIDsSet map[interface{}]struct{} // set of UniqueID
|
2021-06-15 10:19:38 +08:00
|
|
|
haveError bool
|
2021-06-02 10:17:32 +08:00
|
|
|
}
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
type searchResultBuf struct {
|
|
|
|
resultBufHeader
|
|
|
|
resultBuf []*internalpb.SearchResults
|
|
|
|
}
|
|
|
|
|
|
|
|
type queryResultBuf struct {
|
|
|
|
resultBufHeader
|
|
|
|
resultBuf []*internalpb.RetrieveResults
|
|
|
|
}
|
|
|
|
|
2021-06-02 10:17:32 +08:00
|
|
|
func newSearchResultBuf() *searchResultBuf {
|
|
|
|
return &searchResultBuf{
|
2021-06-16 20:15:59 +08:00
|
|
|
resultBufHeader: resultBufHeader{
|
|
|
|
usedVChans: make(map[interface{}]struct{}),
|
|
|
|
receivedVChansSet: make(map[interface{}]struct{}),
|
|
|
|
receivedSealedSegmentIDsSet: make(map[interface{}]struct{}),
|
|
|
|
receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}),
|
|
|
|
haveError: false,
|
|
|
|
},
|
|
|
|
resultBuf: make([]*internalpb.SearchResults, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func newQueryResultBuf() *queryResultBuf {
|
|
|
|
return &queryResultBuf{
|
|
|
|
resultBufHeader: resultBufHeader{
|
|
|
|
usedVChans: make(map[interface{}]struct{}),
|
|
|
|
receivedVChansSet: make(map[interface{}]struct{}),
|
|
|
|
receivedSealedSegmentIDsSet: make(map[interface{}]struct{}),
|
|
|
|
receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}),
|
|
|
|
haveError: false,
|
|
|
|
},
|
|
|
|
resultBuf: make([]*internalpb.RetrieveResults, 0),
|
2021-06-02 10:17:32 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func setContain(m1, m2 map[interface{}]struct{}) bool {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy task_scheduler setContain", zap.Any("len(m1)", len(m1)),
|
2021-06-11 09:50:34 +08:00
|
|
|
zap.Any("len(m2)", len(m2)))
|
2021-06-02 10:17:32 +08:00
|
|
|
if len(m1) < len(m2) {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
for k2 := range m2 {
|
|
|
|
_, ok := m1[k2]
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy task_scheduler setContain", zap.Any("k2", fmt.Sprintf("%v", k2)),
|
2021-06-11 09:50:34 +08:00
|
|
|
zap.Any("ok", ok))
|
2021-06-02 10:17:32 +08:00
|
|
|
if !ok {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
func (sr *resultBufHeader) readyToReduce() bool {
|
2021-06-15 10:19:38 +08:00
|
|
|
if sr.haveError {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy searchResultBuf readyToReduce", zap.Any("haveError", true))
|
2021-06-15 10:19:38 +08:00
|
|
|
return true
|
|
|
|
}
|
2021-06-11 09:50:34 +08:00
|
|
|
|
|
|
|
receivedVChansSetStrMap := make(map[string]int)
|
|
|
|
|
|
|
|
for x := range sr.receivedVChansSet {
|
|
|
|
receivedVChansSetStrMap[x.(string)] = 1
|
|
|
|
}
|
|
|
|
|
|
|
|
usedVChansSetStrMap := make(map[string]int)
|
|
|
|
for x := range sr.usedVChans {
|
|
|
|
usedVChansSetStrMap[x.(string)] = 1
|
|
|
|
}
|
|
|
|
|
|
|
|
sealedSegmentIDsStrMap := make(map[int64]int)
|
|
|
|
|
|
|
|
for x := range sr.receivedSealedSegmentIDsSet {
|
|
|
|
sealedSegmentIDsStrMap[x.(int64)] = 1
|
2021-06-02 10:17:32 +08:00
|
|
|
}
|
|
|
|
|
2021-06-11 09:50:34 +08:00
|
|
|
sealedGlobalSegmentIDsStrMap := make(map[int64]int)
|
|
|
|
for x := range sr.receivedGlobalSegmentIDsSet {
|
|
|
|
sealedGlobalSegmentIDsStrMap[x.(int64)] = 1
|
|
|
|
}
|
|
|
|
|
|
|
|
ret1 := setContain(sr.receivedVChansSet, sr.usedVChans)
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy searchResultBuf readyToReduce", zap.Any("receivedVChansSet", receivedVChansSetStrMap),
|
2021-06-11 09:50:34 +08:00
|
|
|
zap.Any("usedVChans", usedVChansSetStrMap),
|
|
|
|
zap.Any("receivedSealedSegmentIDsSet", sealedSegmentIDsStrMap),
|
|
|
|
zap.Any("receivedGlobalSegmentIDsSet", sealedGlobalSegmentIDsStrMap),
|
|
|
|
zap.Any("ret1", ret1),
|
2021-06-23 18:00:37 +08:00
|
|
|
)
|
|
|
|
if !ret1 {
|
2021-06-11 09:50:34 +08:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
ret := setContain(sr.receivedSealedSegmentIDsSet, sr.receivedGlobalSegmentIDsSet)
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy searchResultBuf readyToReduce", zap.Any("ret", ret))
|
2021-06-11 09:50:34 +08:00
|
|
|
return ret
|
2021-06-02 10:17:32 +08:00
|
|
|
}
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
func (sr *resultBufHeader) addPartialResult(vchans []vChan, searchSegIDs, globalSegIDs []UniqueID) {
|
2021-06-02 10:17:32 +08:00
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
for _, vchan := range vchans {
|
2021-06-02 10:17:32 +08:00
|
|
|
sr.receivedVChansSet[vchan] = struct{}{}
|
|
|
|
}
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
for _, sealedSegment := range searchSegIDs {
|
2021-06-02 10:17:32 +08:00
|
|
|
sr.receivedSealedSegmentIDsSet[sealedSegment] = struct{}{}
|
|
|
|
}
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
for _, globalSegment := range globalSegIDs {
|
2021-06-02 10:17:32 +08:00
|
|
|
sr.receivedGlobalSegmentIDsSet[globalSegment] = struct{}{}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
func (sr *searchResultBuf) addPartialResult(result *internalpb.SearchResults) {
|
|
|
|
sr.resultBuf = append(sr.resultBuf, result)
|
|
|
|
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
sr.haveError = true
|
|
|
|
return
|
|
|
|
}
|
|
|
|
sr.resultBufHeader.addPartialResult(result.ChannelIDsSearched, result.SealedSegmentIDsSearched,
|
|
|
|
result.GlobalSealedSegmentIDs)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (qr *queryResultBuf) addPartialResult(result *internalpb.RetrieveResults) {
|
|
|
|
qr.resultBuf = append(qr.resultBuf, result)
|
|
|
|
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
|
|
|
|
qr.haveError = true
|
|
|
|
return
|
|
|
|
}
|
|
|
|
qr.resultBufHeader.addPartialResult(result.ChannelIDsRetrieved, result.SealedSegmentIDsRetrieved,
|
|
|
|
result.GlobalSealedSegmentIDs)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sched *TaskScheduler) collectResultLoop() {
|
2020-11-21 16:54:20 +08:00
|
|
|
defer sched.wg.Done()
|
|
|
|
|
2021-03-19 20:16:04 +08:00
|
|
|
queryResultMsgStream, _ := sched.msFactory.NewQueryMsgStream(sched.ctx)
|
2021-03-05 18:16:50 +08:00
|
|
|
queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames, Params.ProxySubName)
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy", zap.Strings("SearchResultChannelNames", Params.SearchResultChannelNames),
|
2021-06-11 09:50:34 +08:00
|
|
|
zap.Any("ProxySubName", Params.ProxySubName))
|
2021-05-24 18:19:43 +08:00
|
|
|
|
2020-11-21 16:54:20 +08:00
|
|
|
queryResultMsgStream.Start()
|
|
|
|
defer queryResultMsgStream.Close()
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
searchResultBufs := make(map[UniqueID]*searchResultBuf)
|
|
|
|
searchResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore queryResult
|
|
|
|
queryResultBufs := make(map[UniqueID]*queryResultBuf)
|
|
|
|
queryResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore queryResult
|
2020-11-21 16:54:20 +08:00
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case msgPack, ok := <-queryResultMsgStream.Chan():
|
|
|
|
if !ok {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop exit Chan closed")
|
2020-11-21 16:54:20 +08:00
|
|
|
return
|
|
|
|
}
|
|
|
|
if msgPack == nil {
|
|
|
|
continue
|
|
|
|
}
|
2021-06-18 10:33:58 +08:00
|
|
|
|
2020-11-21 16:54:20 +08:00
|
|
|
for _, tsMsg := range msgPack.Msgs {
|
2021-03-25 14:41:46 +08:00
|
|
|
sp, ctx := trace.StartSpanFromContext(tsMsg.TraceCtx())
|
|
|
|
tsMsg.SetTraceCtx(ctx)
|
2021-05-24 18:19:43 +08:00
|
|
|
if searchResultMsg, srOk := tsMsg.(*msgstream.SearchResultMsg); srOk {
|
|
|
|
reqID := searchResultMsg.Base.MsgID
|
|
|
|
reqIDStr := strconv.FormatInt(reqID, 10)
|
2021-06-16 20:15:59 +08:00
|
|
|
ignoreThisResult, ok := searchResultBufFlags[reqID]
|
2021-06-15 10:19:38 +08:00
|
|
|
if !ok {
|
2021-06-16 20:15:59 +08:00
|
|
|
searchResultBufFlags[reqID] = false
|
2021-06-15 10:19:38 +08:00
|
|
|
ignoreThisResult = false
|
|
|
|
}
|
|
|
|
if ignoreThisResult {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop Got a SearchResultMsg, but we should ignore", zap.Any("ReqID", reqID))
|
2021-06-15 10:19:38 +08:00
|
|
|
continue
|
|
|
|
}
|
2021-05-24 18:19:43 +08:00
|
|
|
t := sched.getTaskByReqID(reqID)
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop Got a SearchResultMsg", zap.Any("ReqID", reqID), zap.Any("t", t))
|
2021-05-24 18:19:43 +08:00
|
|
|
if t == nil {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
|
2021-06-16 20:15:59 +08:00
|
|
|
delete(searchResultBufs, reqID)
|
|
|
|
searchResultBufFlags[reqID] = true
|
2021-05-24 18:19:43 +08:00
|
|
|
continue
|
|
|
|
}
|
2021-01-23 20:58:46 +08:00
|
|
|
|
2021-06-02 10:17:32 +08:00
|
|
|
st, ok := t.(*SearchTask)
|
|
|
|
if !ok {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop type assert t as SearchTask failed", zap.Any("t", t))
|
2021-06-16 20:15:59 +08:00
|
|
|
delete(searchResultBufs, reqID)
|
|
|
|
searchResultBufFlags[reqID] = true
|
2021-06-02 10:17:32 +08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
resultBuf, ok := searchResultBufs[reqID]
|
2021-05-24 18:19:43 +08:00
|
|
|
if !ok {
|
2021-06-15 10:19:38 +08:00
|
|
|
resultBuf = newSearchResultBuf()
|
2021-06-02 10:17:32 +08:00
|
|
|
vchans, err := st.getVChannels()
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
|
2021-06-11 09:50:34 +08:00
|
|
|
zap.Error(err))
|
2021-06-02 10:17:32 +08:00
|
|
|
if err != nil {
|
2021-06-16 20:15:59 +08:00
|
|
|
delete(searchResultBufs, reqID)
|
2021-06-02 10:17:32 +08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
for _, vchan := range vchans {
|
2021-06-15 10:19:38 +08:00
|
|
|
resultBuf.usedVChans[vchan] = struct{}{}
|
2021-06-02 10:17:32 +08:00
|
|
|
}
|
2021-06-11 09:50:34 +08:00
|
|
|
pchans, err := st.getChannels()
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans),
|
2021-06-11 09:50:34 +08:00
|
|
|
zap.Error(err))
|
|
|
|
if err != nil {
|
2021-06-16 20:15:59 +08:00
|
|
|
delete(searchResultBufs, reqID)
|
2021-06-11 09:50:34 +08:00
|
|
|
continue
|
|
|
|
}
|
2021-06-16 20:15:59 +08:00
|
|
|
searchResultBufs[reqID] = resultBuf
|
2021-05-24 18:19:43 +08:00
|
|
|
}
|
2021-06-15 10:19:38 +08:00
|
|
|
resultBuf.addPartialResult(&searchResultMsg.SearchResults)
|
2021-01-23 20:58:46 +08:00
|
|
|
|
2021-05-24 18:19:43 +08:00
|
|
|
//t := sched.getTaskByReqID(reqID)
|
|
|
|
{
|
|
|
|
colName := t.(*SearchTask).query.CollectionName
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(searchResultBufs[reqID].resultBuf)))
|
2021-05-24 18:19:43 +08:00
|
|
|
}
|
2021-06-02 10:17:32 +08:00
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
if resultBuf.readyToReduce() {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce")
|
2021-06-16 20:15:59 +08:00
|
|
|
searchResultBufFlags[reqID] = true
|
2021-06-15 10:19:38 +08:00
|
|
|
st.resultBuf <- resultBuf.resultBuf
|
2021-06-16 20:15:59 +08:00
|
|
|
delete(searchResultBufs, reqID)
|
2021-05-24 18:19:43 +08:00
|
|
|
}
|
2021-06-02 10:17:32 +08:00
|
|
|
|
2021-05-24 18:19:43 +08:00
|
|
|
sp.Finish()
|
2021-01-23 20:58:46 +08:00
|
|
|
}
|
2021-06-16 20:15:59 +08:00
|
|
|
if queryResultMsg, rtOk := tsMsg.(*msgstream.RetrieveResultMsg); rtOk {
|
|
|
|
//reqID := retrieveResultMsg.Base.MsgID
|
|
|
|
//reqIDStr := strconv.FormatInt(reqID, 10)
|
|
|
|
//t := sched.getTaskByReqID(reqID)
|
|
|
|
//if t == nil {
|
2021-06-22 14:40:07 +08:00
|
|
|
// log.Debug("proxy", zap.String("RetrieveResult GetTaskByReqID failed, reqID = ", reqIDStr))
|
2021-06-16 20:15:59 +08:00
|
|
|
// delete(queryResultBufs, reqID)
|
|
|
|
// continue
|
|
|
|
//}
|
|
|
|
//
|
|
|
|
//_, ok = queryResultBufs[reqID]
|
|
|
|
//if !ok {
|
|
|
|
// queryResultBufs[reqID] = make([]*internalpb.RetrieveResults, 0)
|
|
|
|
//}
|
|
|
|
//queryResultBufs[reqID] = append(queryResultBufs[reqID], &retrieveResultMsg.RetrieveResults)
|
|
|
|
//
|
|
|
|
//{
|
|
|
|
// colName := t.(*RetrieveTask).retrieve.CollectionName
|
|
|
|
// log.Debug("Getcollection", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID])))
|
|
|
|
//}
|
|
|
|
//if len(queryResultBufs[reqID]) == queryNodeNum {
|
|
|
|
// t := sched.getTaskByReqID(reqID)
|
|
|
|
// if t != nil {
|
|
|
|
// rt, ok := t.(*RetrieveTask)
|
|
|
|
// if ok {
|
|
|
|
// rt.resultBuf <- queryResultBufs[reqID]
|
|
|
|
// delete(queryResultBufs, reqID)
|
|
|
|
// }
|
|
|
|
// } else {
|
|
|
|
// }
|
|
|
|
//}
|
|
|
|
|
|
|
|
reqID := queryResultMsg.Base.MsgID
|
2021-05-24 18:19:43 +08:00
|
|
|
reqIDStr := strconv.FormatInt(reqID, 10)
|
2021-06-16 20:15:59 +08:00
|
|
|
ignoreThisResult, ok := queryResultBufFlags[reqID]
|
|
|
|
if !ok {
|
|
|
|
queryResultBufFlags[reqID] = false
|
|
|
|
ignoreThisResult = false
|
|
|
|
}
|
|
|
|
if ignoreThisResult {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop Got a queryResultMsg, but we should ignore", zap.Any("ReqID", reqID))
|
2021-06-16 20:15:59 +08:00
|
|
|
continue
|
|
|
|
}
|
2020-11-21 16:54:20 +08:00
|
|
|
t := sched.getTaskByReqID(reqID)
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop Got a queryResultMsg", zap.Any("ReqID", reqID), zap.Any("t", t))
|
2021-05-24 18:19:43 +08:00
|
|
|
if t == nil {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
|
2021-06-16 20:15:59 +08:00
|
|
|
delete(queryResultBufs, reqID)
|
|
|
|
queryResultBufFlags[reqID] = true
|
2021-05-24 18:19:43 +08:00
|
|
|
continue
|
|
|
|
}
|
2021-01-23 20:58:46 +08:00
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
st, ok := t.(*RetrieveTask)
|
2021-05-24 18:19:43 +08:00
|
|
|
if !ok {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop type assert t as RetrieveTask failed", zap.Any("t", t))
|
2021-06-16 20:15:59 +08:00
|
|
|
delete(queryResultBufs, reqID)
|
|
|
|
queryResultBufFlags[reqID] = true
|
|
|
|
continue
|
2021-05-24 18:19:43 +08:00
|
|
|
}
|
|
|
|
|
2021-06-16 20:15:59 +08:00
|
|
|
resultBuf, ok := queryResultBufs[reqID]
|
|
|
|
if !ok {
|
|
|
|
resultBuf = newQueryResultBuf()
|
|
|
|
vchans, err := st.getVChannels()
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
|
2021-06-16 20:15:59 +08:00
|
|
|
zap.Error(err))
|
|
|
|
if err != nil {
|
|
|
|
delete(queryResultBufs, reqID)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
for _, vchan := range vchans {
|
|
|
|
resultBuf.usedVChans[vchan] = struct{}{}
|
|
|
|
}
|
|
|
|
pchans, err := st.getChannels()
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans),
|
2021-06-16 20:15:59 +08:00
|
|
|
zap.Error(err))
|
|
|
|
if err != nil {
|
|
|
|
delete(queryResultBufs, reqID)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
queryResultBufs[reqID] = resultBuf
|
|
|
|
}
|
|
|
|
resultBuf.addPartialResult(&queryResultMsg.RetrieveResults)
|
|
|
|
|
|
|
|
//t := sched.getTaskByReqID(reqID)
|
2021-05-24 18:19:43 +08:00
|
|
|
{
|
|
|
|
colName := t.(*RetrieveTask).retrieve.CollectionName
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID].resultBuf)))
|
2021-05-24 18:19:43 +08:00
|
|
|
}
|
2021-06-16 20:15:59 +08:00
|
|
|
|
|
|
|
if resultBuf.readyToReduce() {
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce")
|
2021-06-16 20:15:59 +08:00
|
|
|
queryResultBufFlags[reqID] = true
|
|
|
|
st.resultBuf <- resultBuf.resultBuf
|
|
|
|
delete(queryResultBufs, reqID)
|
2020-11-21 16:54:20 +08:00
|
|
|
}
|
2021-05-24 18:19:43 +08:00
|
|
|
sp.Finish()
|
2020-11-21 16:54:20 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
case <-sched.ctx.Done():
|
2021-06-22 14:40:07 +08:00
|
|
|
log.Debug("Proxy collectResultLoop is closed ...")
|
2020-11-21 16:54:20 +08:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-11 09:54:01 +08:00
|
|
|
func (sched *TaskScheduler) Start() error {
|
2020-11-14 11:24:49 +08:00
|
|
|
sched.wg.Add(1)
|
2020-11-05 18:01:33 +08:00
|
|
|
go sched.definitionLoop()
|
2020-11-14 11:24:49 +08:00
|
|
|
|
|
|
|
sched.wg.Add(1)
|
2020-11-05 18:01:33 +08:00
|
|
|
go sched.manipulationLoop()
|
2020-11-14 11:24:49 +08:00
|
|
|
|
|
|
|
sched.wg.Add(1)
|
2020-11-05 18:01:33 +08:00
|
|
|
go sched.queryLoop()
|
|
|
|
|
2020-11-21 16:54:20 +08:00
|
|
|
sched.wg.Add(1)
|
2021-06-16 20:15:59 +08:00
|
|
|
go sched.collectResultLoop()
|
2020-11-21 16:54:20 +08:00
|
|
|
|
2020-11-03 14:53:36 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-11-05 18:01:33 +08:00
|
|
|
func (sched *TaskScheduler) Close() {
|
|
|
|
sched.cancel()
|
|
|
|
sched.wg.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sched *TaskScheduler) TaskDoneTest(ts Timestamp) bool {
|
2020-11-03 14:53:36 +08:00
|
|
|
ddTaskDone := sched.DdQueue.TaskDoneTest(ts)
|
|
|
|
dmTaskDone := sched.DmQueue.TaskDoneTest(ts)
|
2021-06-15 10:19:38 +08:00
|
|
|
return ddTaskDone && dmTaskDone
|
2020-11-03 14:53:36 +08:00
|
|
|
}
|
2021-05-31 11:40:31 +08:00
|
|
|
|
2021-06-15 10:19:38 +08:00
|
|
|
func (sched *TaskScheduler) getPChanStatistics() (map[pChan]*pChanStatistics, error) {
|
|
|
|
return sched.DmQueue.getPChanStatsInfo()
|
2021-05-31 11:40:31 +08:00
|
|
|
}
|