2021-11-10 23:55:48 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 10:09:43 +08:00
// with the License. You may obtain a copy of the License at
//
2021-11-10 23:55:48 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 10:09:43 +08:00
//
2021-11-10 23:55:48 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 10:09:43 +08:00
2021-06-22 14:40:07 +08:00
package proxy
2020-11-03 14:53:36 +08:00
import (
"container/list"
2020-11-05 18:01:33 +08:00
"context"
2021-03-08 19:39:36 +08:00
"errors"
2021-06-11 09:50:34 +08:00
"fmt"
2021-01-23 20:58:46 +08:00
"strconv"
2020-11-03 14:53:36 +08:00
"sync"
2020-11-11 09:54:01 +08:00
2021-12-22 10:23:00 +08:00
"github.com/milvus-io/milvus/internal/util/typeutil"
2021-09-06 20:49:04 +08:00
"github.com/milvus-io/milvus/internal/util/funcutil"
2021-03-08 19:39:36 +08:00
"go.uber.org/zap"
2021-03-05 10:15:27 +08:00
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
2021-06-15 10:19:38 +08:00
"github.com/milvus-io/milvus/internal/proto/commonpb"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/proto/internalpb"
2021-11-05 14:57:44 +08:00
"github.com/milvus-io/milvus/internal/util/mqclient"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/util/trace"
2021-02-23 09:58:06 +08:00
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
2020-11-03 14:53:36 +08:00
)
2021-09-06 20:49:04 +08:00
type taskQueue interface {
2020-11-14 11:24:49 +08:00
utChan ( ) <- chan int
2021-04-07 10:06:17 +08:00
utEmpty ( ) bool
2020-11-14 11:24:49 +08:00
utFull ( ) bool
addUnissuedTask ( t task ) error
FrontUnissuedTask ( ) task
PopUnissuedTask ( ) task
AddActiveTask ( t task )
2021-06-15 10:19:38 +08:00
PopActiveTask ( tID UniqueID ) task
2020-11-14 11:24:49 +08:00
getTaskByReqID ( reqID UniqueID ) task
Enqueue ( t task ) error
2021-09-15 19:09:50 +08:00
setMaxTaskNum ( num int64 )
getMaxTaskNum ( ) int64
2020-11-14 11:24:49 +08:00
}
2021-09-06 20:49:04 +08:00
type baseTaskQueue struct {
2020-11-03 14:53:36 +08:00
unissuedTasks * list . List
2021-06-15 10:19:38 +08:00
activeTasks map [ UniqueID ] task
2021-05-31 11:40:31 +08:00
utLock sync . RWMutex
atLock sync . RWMutex
2020-11-14 11:24:49 +08:00
// maxTaskNum should keep still
2021-09-15 19:09:50 +08:00
maxTaskNum int64
maxTaskNumMtx sync . RWMutex
2020-11-14 11:24:49 +08:00
utBufChan chan int // to block scheduler
2020-11-16 17:01:10 +08:00
2021-09-06 20:49:04 +08:00
tsoAllocatorIns tsoAllocator
idAllocatorIns idAllocatorInterface
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) utChan ( ) <- chan int {
2020-11-14 11:24:49 +08:00
return queue . utBufChan
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) utEmpty ( ) bool {
2021-05-31 11:40:31 +08:00
queue . utLock . RLock ( )
defer queue . utLock . RUnlock ( )
2020-11-14 11:24:49 +08:00
return queue . unissuedTasks . Len ( ) == 0
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) utFull ( ) bool {
2021-09-15 19:09:50 +08:00
return int64 ( queue . unissuedTasks . Len ( ) ) >= queue . getMaxTaskNum ( )
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) addUnissuedTask ( t task ) error {
2020-11-03 14:53:36 +08:00
queue . utLock . Lock ( )
defer queue . utLock . Unlock ( )
2020-11-14 11:24:49 +08:00
if queue . utFull ( ) {
return errors . New ( "task queue is full" )
}
2020-11-03 14:53:36 +08:00
queue . unissuedTasks . PushBack ( t )
2020-11-14 11:24:49 +08:00
queue . utBufChan <- 1
return nil
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) FrontUnissuedTask ( ) task {
2021-05-31 11:40:31 +08:00
queue . utLock . RLock ( )
defer queue . utLock . RUnlock ( )
2020-11-14 11:24:49 +08:00
2020-11-03 14:53:36 +08:00
if queue . unissuedTasks . Len ( ) <= 0 {
return nil
}
2020-11-14 11:24:49 +08:00
2020-11-12 11:18:23 +08:00
return queue . unissuedTasks . Front ( ) . Value . ( task )
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) PopUnissuedTask ( ) task {
2020-11-03 14:53:36 +08:00
queue . utLock . Lock ( )
defer queue . utLock . Unlock ( )
2020-11-14 11:24:49 +08:00
2020-11-03 14:53:36 +08:00
if queue . unissuedTasks . Len ( ) <= 0 {
return nil
}
2020-11-14 11:24:49 +08:00
2020-11-03 14:53:36 +08:00
ft := queue . unissuedTasks . Front ( )
2020-11-14 11:24:49 +08:00
queue . unissuedTasks . Remove ( ft )
return ft . Value . ( task )
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) AddActiveTask ( t task ) {
2020-11-03 14:53:36 +08:00
queue . atLock . Lock ( )
2020-11-14 14:39:52 +08:00
defer queue . atLock . Unlock ( )
2021-06-15 10:19:38 +08:00
tID := t . ID ( )
_ , ok := queue . activeTasks [ tID ]
2020-11-03 14:53:36 +08:00
if ok {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy task with tID already in active task list!" , zap . Any ( "ID" , tID ) )
2020-11-03 14:53:36 +08:00
}
2020-11-14 11:24:49 +08:00
2021-06-15 10:19:38 +08:00
queue . activeTasks [ tID ] = t
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) PopActiveTask ( tID UniqueID ) task {
2020-11-03 14:53:36 +08:00
queue . atLock . Lock ( )
2020-11-14 14:39:52 +08:00
defer queue . atLock . Unlock ( )
2021-06-15 10:19:38 +08:00
t , ok := queue . activeTasks [ tID ]
2020-11-03 14:53:36 +08:00
if ok {
2021-06-15 10:19:38 +08:00
delete ( queue . activeTasks , tID )
2020-11-03 14:53:36 +08:00
return t
}
2020-11-14 11:24:49 +08:00
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy task not in active task list! ts" , zap . Any ( "tID" , tID ) )
2021-06-15 10:19:38 +08:00
return t
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) getTaskByReqID ( reqID UniqueID ) task {
2021-05-31 11:40:31 +08:00
queue . utLock . RLock ( )
2020-11-07 16:18:23 +08:00
for e := queue . unissuedTasks . Front ( ) ; e != nil ; e = e . Next ( ) {
2020-11-12 17:58:05 +08:00
if e . Value . ( task ) . ID ( ) == reqID {
2021-11-22 12:19:14 +08:00
queue . utLock . RUnlock ( )
2020-11-12 11:18:23 +08:00
return e . Value . ( task )
2020-11-07 16:18:23 +08:00
}
}
2021-11-22 12:19:14 +08:00
queue . utLock . RUnlock ( )
2020-11-07 16:18:23 +08:00
2021-05-31 11:40:31 +08:00
queue . atLock . RLock ( )
2021-11-22 12:19:14 +08:00
for tID , t := range queue . activeTasks {
2021-06-15 10:19:38 +08:00
if tID == reqID {
2021-11-22 12:19:14 +08:00
queue . atLock . RUnlock ( )
return t
2020-11-07 16:18:23 +08:00
}
}
2021-11-22 12:19:14 +08:00
queue . atLock . RUnlock ( )
2020-11-07 16:18:23 +08:00
return nil
}
2021-09-06 20:49:04 +08:00
func ( queue * baseTaskQueue ) Enqueue ( t task ) error {
2021-01-22 09:36:18 +08:00
err := t . OnEnqueue ( )
if err != nil {
return err
}
2021-09-06 20:49:04 +08:00
ts , err := queue . tsoAllocatorIns . AllocOne ( )
2021-03-15 19:47:13 +08:00
if err != nil {
return err
}
2020-11-16 17:01:10 +08:00
t . SetTs ( ts )
2020-11-23 16:52:17 +08:00
2021-09-06 20:49:04 +08:00
reqID , err := queue . idAllocatorIns . AllocOne ( )
2021-03-15 19:47:13 +08:00
if err != nil {
return err
}
2020-11-23 16:52:17 +08:00
t . SetID ( reqID )
2020-11-14 11:24:49 +08:00
return queue . addUnissuedTask ( t )
}
2021-09-15 19:09:50 +08:00
// setMaxTaskNum replaces the backlog limit with num under maxTaskNumMtx.
func (queue *baseTaskQueue) setMaxTaskNum(num int64) {
	queue.maxTaskNumMtx.Lock()
	queue.maxTaskNum = num
	queue.maxTaskNumMtx.Unlock()
}
// getMaxTaskNum returns the current backlog limit, read under maxTaskNumMtx.
func (queue *baseTaskQueue) getMaxTaskNum() int64 {
	queue.maxTaskNumMtx.RLock()
	limit := queue.maxTaskNum
	queue.maxTaskNumMtx.RUnlock()
	return limit
}
2021-09-06 20:49:04 +08:00
func newBaseTaskQueue ( tsoAllocatorIns tsoAllocator , idAllocatorIns idAllocatorInterface ) * baseTaskQueue {
return & baseTaskQueue {
unissuedTasks : list . New ( ) ,
activeTasks : make ( map [ UniqueID ] task ) ,
utLock : sync . RWMutex { } ,
atLock : sync . RWMutex { } ,
2021-12-23 18:39:11 +08:00
maxTaskNum : Params . ProxyCfg . MaxTaskNum ,
utBufChan : make ( chan int , Params . ProxyCfg . MaxTaskNum ) ,
2021-09-06 20:49:04 +08:00
tsoAllocatorIns : tsoAllocatorIns ,
idAllocatorIns : idAllocatorIns ,
}
}
type ddTaskQueue struct {
* baseTaskQueue
2020-11-07 16:18:23 +08:00
lock sync . Mutex
}
2021-06-15 10:19:38 +08:00
type pChanStatInfo struct {
pChanStatistics
2021-07-17 16:31:29 +08:00
tsSet map [ Timestamp ] struct { }
2021-06-15 10:19:38 +08:00
}
2021-09-06 20:49:04 +08:00
type dmTaskQueue struct {
* baseTaskQueue
2021-07-15 17:47:55 +08:00
lock sync . Mutex
2021-06-15 10:19:38 +08:00
statsLock sync . RWMutex
pChanStatisticsInfos map [ pChan ] * pChanStatInfo
2020-11-07 16:18:23 +08:00
}
2021-09-06 20:49:04 +08:00
func ( queue * dmTaskQueue ) Enqueue ( t task ) error {
2021-11-05 09:14:02 +08:00
queue . statsLock . Lock ( )
defer queue . statsLock . Unlock ( )
2021-09-06 20:49:04 +08:00
err := queue . baseTaskQueue . Enqueue ( t )
2021-06-15 10:19:38 +08:00
if err != nil {
return err
}
2021-11-24 16:01:17 +08:00
err = queue . addPChanStats ( t )
if err != nil {
return err
}
2021-06-15 10:19:38 +08:00
return nil
}
2021-09-06 20:49:04 +08:00
func ( queue * dmTaskQueue ) PopActiveTask ( tID UniqueID ) task {
2021-06-15 10:19:38 +08:00
queue . atLock . Lock ( )
defer queue . atLock . Unlock ( )
t , ok := queue . activeTasks [ tID ]
if ok {
2021-11-05 09:14:02 +08:00
queue . statsLock . Lock ( )
defer queue . statsLock . Unlock ( )
2021-06-15 10:19:38 +08:00
delete ( queue . activeTasks , tID )
2021-09-06 20:49:04 +08:00
log . Debug ( "Proxy dmTaskQueue popPChanStats" , zap . Any ( "tID" , t . ID ( ) ) )
2021-06-15 10:19:38 +08:00
queue . popPChanStats ( t )
} else {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy task not in active task list!" , zap . Any ( "tID" , tID ) )
2021-06-15 10:19:38 +08:00
}
return t
}
2021-09-06 20:49:04 +08:00
func ( queue * dmTaskQueue ) addPChanStats ( t task ) error {
2021-06-15 10:19:38 +08:00
if dmT , ok := t . ( dmlTask ) ; ok {
stats , err := dmT . getPChanStats ( )
2021-06-08 19:25:37 +08:00
if err != nil {
2021-09-06 20:49:04 +08:00
log . Debug ( "Proxy dmTaskQueue addPChanStats" , zap . Any ( "tID" , t . ID ( ) ) ,
2021-07-27 14:34:48 +08:00
zap . Any ( "stats" , stats ) , zap . Error ( err ) )
2021-06-15 10:19:38 +08:00
return err
2021-06-08 19:25:37 +08:00
}
2021-06-15 10:19:38 +08:00
for cName , stat := range stats {
info , ok := queue . pChanStatisticsInfos [ cName ]
if ! ok {
info = & pChanStatInfo {
pChanStatistics : stat ,
2021-07-17 16:31:29 +08:00
tsSet : map [ Timestamp ] struct { } {
stat . minTs : { } ,
} ,
2021-06-15 10:19:38 +08:00
}
queue . pChanStatisticsInfos [ cName ] = info
} else {
if info . minTs > stat . minTs {
2021-07-15 17:47:55 +08:00
queue . pChanStatisticsInfos [ cName ] . minTs = stat . minTs
2021-06-15 10:19:38 +08:00
}
if info . maxTs < stat . maxTs {
2021-07-15 17:47:55 +08:00
queue . pChanStatisticsInfos [ cName ] . maxTs = stat . maxTs
2021-06-15 10:19:38 +08:00
}
2021-07-17 16:31:29 +08:00
queue . pChanStatisticsInfos [ cName ] . tsSet [ info . minTs ] = struct { } { }
2021-06-15 10:19:38 +08:00
}
2021-06-08 19:25:37 +08:00
}
2021-06-15 10:19:38 +08:00
} else {
2021-07-27 14:34:48 +08:00
return fmt . Errorf ( "proxy addUnissuedTask reflect to dmlTask failed, tID:%v" , t . ID ( ) )
2021-06-08 19:25:37 +08:00
}
2021-06-15 10:19:38 +08:00
return nil
}
2021-06-08 19:25:37 +08:00
2021-09-06 20:49:04 +08:00
func ( queue * dmTaskQueue ) popPChanStats ( t task ) error {
2021-06-15 10:19:38 +08:00
if dmT , ok := t . ( dmlTask ) ; ok {
channels , err := dmT . getChannels ( )
2021-05-31 11:40:31 +08:00
if err != nil {
2021-06-15 10:19:38 +08:00
return err
2021-05-31 11:40:31 +08:00
}
2021-06-15 10:19:38 +08:00
for _ , cName := range channels {
info , ok := queue . pChanStatisticsInfos [ cName ]
if ok {
2021-07-17 16:31:29 +08:00
delete ( queue . pChanStatisticsInfos [ cName ] . tsSet , info . minTs )
if len ( queue . pChanStatisticsInfos [ cName ] . tsSet ) <= 0 {
2021-06-15 10:19:38 +08:00
delete ( queue . pChanStatisticsInfos , cName )
2021-07-17 16:31:29 +08:00
} else if queue . pChanStatisticsInfos [ cName ] . minTs == info . minTs {
minTs := info . maxTs
for ts := range queue . pChanStatisticsInfos [ cName ] . tsSet {
if ts < minTs {
minTs = ts
}
}
queue . pChanStatisticsInfos [ cName ] . minTs = minTs
2021-06-15 10:19:38 +08:00
}
}
2021-05-31 11:40:31 +08:00
}
2021-06-15 10:19:38 +08:00
} else {
2021-11-29 12:25:17 +08:00
return fmt . Errorf ( "proxy dmTaskQueue popPChanStats reflect to dmlTask failed, tID:%v" , t . ID ( ) )
2021-05-31 11:40:31 +08:00
}
2021-06-15 10:19:38 +08:00
return nil
}
2021-09-06 20:49:04 +08:00
func ( queue * dmTaskQueue ) getPChanStatsInfo ( ) ( map [ pChan ] * pChanStatistics , error ) {
2021-05-31 11:40:31 +08:00
2021-06-15 10:19:38 +08:00
ret := make ( map [ pChan ] * pChanStatistics )
queue . statsLock . RLock ( )
defer queue . statsLock . RUnlock ( )
for cName , info := range queue . pChanStatisticsInfos {
ret [ cName ] = & pChanStatistics {
minTs : info . minTs ,
maxTs : info . maxTs ,
}
}
return ret , nil
2021-05-31 11:40:31 +08:00
}
2021-09-06 20:49:04 +08:00
type dqTaskQueue struct {
* baseTaskQueue
2020-11-07 16:18:23 +08:00
}
2021-09-06 20:49:04 +08:00
func ( queue * ddTaskQueue ) Enqueue ( t task ) error {
2020-11-03 14:53:36 +08:00
queue . lock . Lock ( )
defer queue . lock . Unlock ( )
2021-09-06 20:49:04 +08:00
return queue . baseTaskQueue . Enqueue ( t )
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func newDdTaskQueue ( tsoAllocatorIns tsoAllocator , idAllocatorIns idAllocatorInterface ) * ddTaskQueue {
return & ddTaskQueue {
baseTaskQueue : newBaseTaskQueue ( tsoAllocatorIns , idAllocatorIns ) ,
2020-11-12 11:18:23 +08:00
}
}
2021-09-06 20:49:04 +08:00
func newDmTaskQueue ( tsoAllocatorIns tsoAllocator , idAllocatorIns idAllocatorInterface ) * dmTaskQueue {
return & dmTaskQueue {
baseTaskQueue : newBaseTaskQueue ( tsoAllocatorIns , idAllocatorIns ) ,
2021-06-15 10:19:38 +08:00
pChanStatisticsInfos : make ( map [ pChan ] * pChanStatInfo ) ,
2020-11-12 11:18:23 +08:00
}
}
2021-09-06 20:49:04 +08:00
func newDqTaskQueue ( tsoAllocatorIns tsoAllocator , idAllocatorIns idAllocatorInterface ) * dqTaskQueue {
return & dqTaskQueue {
baseTaskQueue : newBaseTaskQueue ( tsoAllocatorIns , idAllocatorIns ) ,
2020-11-12 11:18:23 +08:00
}
}
2021-09-06 20:49:04 +08:00
type taskScheduler struct {
2021-09-15 15:46:06 +08:00
ddQueue * ddTaskQueue
2021-09-06 20:49:04 +08:00
dmQueue * dmTaskQueue
2021-09-15 15:46:06 +08:00
dqQueue * dqTaskQueue
2020-11-11 09:54:01 +08:00
2020-11-05 18:01:33 +08:00
wg sync . WaitGroup
ctx context . Context
cancel context . CancelFunc
2021-02-08 14:30:54 +08:00
msFactory msgstream . Factory
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func newTaskScheduler ( ctx context . Context ,
idAllocatorIns idAllocatorInterface ,
tsoAllocatorIns tsoAllocator ,
factory msgstream . Factory ) ( * taskScheduler , error ) {
2020-11-11 09:54:01 +08:00
ctx1 , cancel := context . WithCancel ( ctx )
2021-09-06 20:49:04 +08:00
s := & taskScheduler {
ctx : ctx1 ,
cancel : cancel ,
msFactory : factory ,
2020-11-11 09:54:01 +08:00
}
2021-09-06 20:49:04 +08:00
s . ddQueue = newDdTaskQueue ( tsoAllocatorIns , idAllocatorIns )
s . dmQueue = newDmTaskQueue ( tsoAllocatorIns , idAllocatorIns )
s . dqQueue = newDqTaskQueue ( tsoAllocatorIns , idAllocatorIns )
2020-11-11 09:54:01 +08:00
return s , nil
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) scheduleDdTask ( ) task {
return sched . ddQueue . PopUnissuedTask ( )
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) scheduleDmTask ( ) task {
return sched . dmQueue . PopUnissuedTask ( )
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) scheduleDqTask ( ) task {
return sched . dqQueue . PopUnissuedTask ( )
2020-11-03 14:53:36 +08:00
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) getTaskByReqID ( collMeta UniqueID ) task {
if t := sched . ddQueue . getTaskByReqID ( collMeta ) ; t != nil {
2020-11-07 16:18:23 +08:00
return t
}
2021-09-06 20:49:04 +08:00
if t := sched . dmQueue . getTaskByReqID ( collMeta ) ; t != nil {
2020-11-07 16:18:23 +08:00
return t
}
2021-09-06 20:49:04 +08:00
if t := sched . dqQueue . getTaskByReqID ( collMeta ) ; t != nil {
2020-11-07 16:18:23 +08:00
return t
}
return nil
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) processTask ( t task , q taskQueue ) {
2021-03-25 14:41:46 +08:00
span , ctx := trace . StartSpanFromContext ( t . TraceCtx ( ) ,
2021-02-23 09:58:06 +08:00
opentracing . Tags {
"Type" : t . Name ( ) ,
"ID" : t . ID ( ) ,
} )
defer span . Finish ( )
2021-10-09 22:50:39 +08:00
traceID , _ , _ := trace . InfoFromSpan ( span )
2021-07-15 17:47:55 +08:00
span . LogFields ( oplog . Int64 ( "scheduler process AddActiveTask" , t . ID ( ) ) )
q . AddActiveTask ( t )
defer func ( ) {
span . LogFields ( oplog . Int64 ( "scheduler process PopActiveTask" , t . ID ( ) ) )
q . PopActiveTask ( t . ID ( ) )
} ( )
2021-02-23 09:58:06 +08:00
span . LogFields ( oplog . Int64 ( "scheduler process PreExecute" , t . ID ( ) ) )
2021-07-15 17:47:55 +08:00
2021-02-23 09:58:06 +08:00
err := t . PreExecute ( ctx )
2020-11-07 16:18:23 +08:00
2020-11-14 11:24:49 +08:00
defer func ( ) {
2020-11-12 11:18:23 +08:00
t . Notify ( err )
2020-11-14 11:24:49 +08:00
} ( )
if err != nil {
2021-02-23 09:58:06 +08:00
trace . LogError ( span , err )
2021-10-09 22:50:39 +08:00
log . Error ( "Failed to pre-execute task: " + err . Error ( ) ,
zap . String ( "traceID" , traceID ) )
2020-11-14 11:24:49 +08:00
return
}
2020-11-07 16:18:23 +08:00
2021-02-23 09:58:06 +08:00
span . LogFields ( oplog . Int64 ( "scheduler process Execute" , t . ID ( ) ) )
err = t . Execute ( ctx )
2020-11-14 11:24:49 +08:00
if err != nil {
2021-02-23 09:58:06 +08:00
trace . LogError ( span , err )
2021-10-09 22:50:39 +08:00
log . Error ( "Failed to execute task: " + err . Error ( ) ,
zap . String ( "traceID" , traceID ) )
2020-11-14 11:24:49 +08:00
return
2020-11-07 16:18:23 +08:00
}
2021-07-15 17:47:55 +08:00
2021-02-23 09:58:06 +08:00
span . LogFields ( oplog . Int64 ( "scheduler process PostExecute" , t . ID ( ) ) )
err = t . PostExecute ( ctx )
2021-10-09 22:50:39 +08:00
if err != nil {
trace . LogError ( span , err )
log . Error ( "Failed to post-execute task: " + err . Error ( ) ,
zap . String ( "traceID" , traceID ) )
return
}
2020-11-05 18:01:33 +08:00
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) definitionLoop ( ) {
2020-11-05 18:01:33 +08:00
defer sched . wg . Done ( )
for {
2020-11-14 11:24:49 +08:00
select {
case <- sched . ctx . Done ( ) :
2020-11-05 18:01:33 +08:00
return
2021-09-06 20:49:04 +08:00
case <- sched . ddQueue . utChan ( ) :
if ! sched . ddQueue . utEmpty ( ) {
2020-11-14 11:24:49 +08:00
t := sched . scheduleDdTask ( )
2021-09-06 20:49:04 +08:00
sched . processTask ( t , sched . ddQueue )
2020-11-14 11:24:49 +08:00
}
2020-11-03 14:53:36 +08:00
}
2020-11-14 11:24:49 +08:00
}
}
2020-11-05 18:01:33 +08:00
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) manipulationLoop ( ) {
2020-11-14 11:24:49 +08:00
defer sched . wg . Done ( )
for {
select {
case <- sched . ctx . Done ( ) :
return
2021-09-06 20:49:04 +08:00
case <- sched . dmQueue . utChan ( ) :
if ! sched . dmQueue . utEmpty ( ) {
2020-11-14 11:24:49 +08:00
t := sched . scheduleDmTask ( )
2021-09-06 20:49:04 +08:00
go sched . processTask ( t , sched . dmQueue )
2020-11-03 14:53:36 +08:00
}
2020-11-14 11:24:49 +08:00
}
2020-11-05 18:01:33 +08:00
}
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) queryLoop ( ) {
2020-11-05 18:01:33 +08:00
defer sched . wg . Done ( )
2020-11-07 16:18:23 +08:00
for {
2020-11-14 11:24:49 +08:00
select {
case <- sched . ctx . Done ( ) :
2020-11-07 16:18:23 +08:00
return
2021-09-06 20:49:04 +08:00
case <- sched . dqQueue . utChan ( ) :
if ! sched . dqQueue . utEmpty ( ) {
2020-11-14 11:24:49 +08:00
t := sched . scheduleDqTask ( )
2021-09-06 20:49:04 +08:00
go sched . processTask ( t , sched . dqQueue )
2020-11-16 17:01:10 +08:00
} else {
2021-03-08 19:39:36 +08:00
log . Debug ( "query queue is empty ..." )
2020-11-07 16:18:23 +08:00
}
2020-11-14 11:24:49 +08:00
}
2020-11-07 16:18:23 +08:00
}
2020-11-05 18:01:33 +08:00
}
2021-06-16 20:15:59 +08:00
type resultBufHeader struct {
2021-12-22 10:23:00 +08:00
msgID UniqueID
2021-06-02 10:17:32 +08:00
usedVChans map [ interface { } ] struct { } // set of vChan
receivedVChansSet map [ interface { } ] struct { } // set of vChan
receivedSealedSegmentIDsSet map [ interface { } ] struct { } // set of UniqueID
receivedGlobalSegmentIDsSet map [ interface { } ] struct { } // set of UniqueID
2021-06-15 10:19:38 +08:00
haveError bool
2021-06-02 10:17:32 +08:00
}
2021-06-16 20:15:59 +08:00
// searchResultBuf accumulates the partial search results received for one
// request, together with the shared completeness bookkeeping.
type searchResultBuf struct {
	resultBufHeader

	// resultBuf holds every partial result received, in arrival order.
	resultBuf []*internalpb.SearchResults
}
// queryResultBuf accumulates the partial retrieve results received for one
// request, together with the shared completeness bookkeeping.
type queryResultBuf struct {
	resultBufHeader

	// resultBuf holds every partial result received, in arrival order.
	resultBuf []*internalpb.RetrieveResults
}
2021-12-22 10:23:00 +08:00
func newSearchResultBuf ( msgID UniqueID ) * searchResultBuf {
2021-06-02 10:17:32 +08:00
return & searchResultBuf {
2021-06-16 20:15:59 +08:00
resultBufHeader : resultBufHeader {
usedVChans : make ( map [ interface { } ] struct { } ) ,
receivedVChansSet : make ( map [ interface { } ] struct { } ) ,
receivedSealedSegmentIDsSet : make ( map [ interface { } ] struct { } ) ,
receivedGlobalSegmentIDsSet : make ( map [ interface { } ] struct { } ) ,
haveError : false ,
} ,
resultBuf : make ( [ ] * internalpb . SearchResults , 0 ) ,
}
}
2021-12-22 10:23:00 +08:00
func newQueryResultBuf ( msgID UniqueID ) * queryResultBuf {
2021-06-16 20:15:59 +08:00
return & queryResultBuf {
resultBufHeader : resultBufHeader {
usedVChans : make ( map [ interface { } ] struct { } ) ,
receivedVChansSet : make ( map [ interface { } ] struct { } ) ,
receivedSealedSegmentIDsSet : make ( map [ interface { } ] struct { } ) ,
receivedGlobalSegmentIDsSet : make ( map [ interface { } ] struct { } ) ,
haveError : false ,
} ,
resultBuf : make ( [ ] * internalpb . RetrieveResults , 0 ) ,
2021-06-02 10:17:32 +08:00
}
}
2021-06-16 20:15:59 +08:00
func ( sr * resultBufHeader ) readyToReduce ( ) bool {
2021-06-15 10:19:38 +08:00
if sr . haveError {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy searchResultBuf readyToReduce" , zap . Any ( "haveError" , true ) )
2021-06-15 10:19:38 +08:00
return true
}
2021-06-11 09:50:34 +08:00
2021-12-22 10:23:00 +08:00
log . Debug ( "check if result buf is ready to reduce" ,
zap . String ( "role" , typeutil . ProxyRole ) ,
zap . Int64 ( "MsgID" , sr . msgID ) ,
zap . Any ( "receivedVChansSet" , funcutil . SetToSlice ( sr . receivedVChansSet ) ) ,
zap . Any ( "usedVChans" , funcutil . SetToSlice ( sr . usedVChans ) ) ,
zap . Any ( "receivedSealedSegmentIDsSet" , funcutil . SetToSlice ( sr . receivedSealedSegmentIDsSet ) ) ,
zap . Any ( "receivedGlobalSegmentIDsSet" , funcutil . SetToSlice ( sr . receivedGlobalSegmentIDsSet ) ) )
2021-06-11 09:50:34 +08:00
2021-09-06 20:49:04 +08:00
ret1 := funcutil . SetContain ( sr . receivedVChansSet , sr . usedVChans )
2021-06-23 18:00:37 +08:00
if ! ret1 {
2021-06-11 09:50:34 +08:00
return false
}
2021-12-22 10:23:00 +08:00
return funcutil . SetContain ( sr . receivedSealedSegmentIDsSet , sr . receivedGlobalSegmentIDsSet )
2021-06-02 10:17:32 +08:00
}
2021-06-16 20:15:59 +08:00
func ( sr * resultBufHeader ) addPartialResult ( vchans [ ] vChan , searchSegIDs , globalSegIDs [ ] UniqueID ) {
2021-06-02 10:17:32 +08:00
2021-06-16 20:15:59 +08:00
for _ , vchan := range vchans {
2021-06-02 10:17:32 +08:00
sr . receivedVChansSet [ vchan ] = struct { } { }
}
2021-06-16 20:15:59 +08:00
for _ , sealedSegment := range searchSegIDs {
2021-06-02 10:17:32 +08:00
sr . receivedSealedSegmentIDsSet [ sealedSegment ] = struct { } { }
}
2021-06-16 20:15:59 +08:00
for _ , globalSegment := range globalSegIDs {
2021-06-02 10:17:32 +08:00
sr . receivedGlobalSegmentIDsSet [ globalSegment ] = struct { } { }
}
}
2021-06-16 20:15:59 +08:00
// addPartialResult appends result to the buffer. A result with a non-success
// status only sets haveError; a successful one also has its searched channels
// and segment IDs recorded in the header sets.
func (sr *searchResultBuf) addPartialResult(result *internalpb.SearchResults) {
	sr.resultBuf = append(sr.resultBuf, result)

	if result.Status.ErrorCode == commonpb.ErrorCode_Success {
		sr.resultBufHeader.addPartialResult(
			result.ChannelIDsSearched,
			result.SealedSegmentIDsSearched,
			result.GlobalSealedSegmentIDs)
		return
	}
	sr.haveError = true
}
// addPartialResult appends result to the buffer. A result with a non-success
// status only sets haveError; a successful one also has its retrieved channels
// and segment IDs recorded in the header sets.
func (qr *queryResultBuf) addPartialResult(result *internalpb.RetrieveResults) {
	qr.resultBuf = append(qr.resultBuf, result)

	if result.Status.ErrorCode == commonpb.ErrorCode_Success {
		qr.resultBufHeader.addPartialResult(
			result.ChannelIDsRetrieved,
			result.SealedSegmentIDsRetrieved,
			result.GlobalSealedSegmentIDs)
		return
	}
	qr.haveError = true
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) collectResultLoop ( ) {
2020-11-21 16:54:20 +08:00
defer sched . wg . Done ( )
2021-03-19 20:16:04 +08:00
queryResultMsgStream , _ := sched . msFactory . NewQueryMsgStream ( sched . ctx )
2021-11-05 14:57:44 +08:00
// proxy didn't need to walk through all the search results in channel, because it no longer has client connections.
2021-12-23 18:39:11 +08:00
queryResultMsgStream . AsConsumerWithPosition ( Params . ProxyCfg . SearchResultChannelNames , Params . ProxyCfg . ProxySubName , mqclient . SubscriptionPositionLatest )
log . Debug ( "Proxy" , zap . Strings ( "SearchResultChannelNames" , Params . ProxyCfg . SearchResultChannelNames ) ,
zap . Any ( "ProxySubName" , Params . ProxyCfg . ProxySubName ) )
2021-05-24 18:19:43 +08:00
2020-11-21 16:54:20 +08:00
queryResultMsgStream . Start ( )
defer queryResultMsgStream . Close ( )
2021-06-16 20:15:59 +08:00
searchResultBufs := make ( map [ UniqueID ] * searchResultBuf )
2021-12-23 18:39:11 +08:00
searchResultBufFlags := newIDCache ( Params . ProxyCfg . BufFlagExpireTime , Params . ProxyCfg . BufFlagCleanupInterval ) // if value is true, we can ignore searchResult
2021-06-16 20:15:59 +08:00
queryResultBufs := make ( map [ UniqueID ] * queryResultBuf )
2021-12-23 18:39:11 +08:00
queryResultBufFlags := newIDCache ( Params . ProxyCfg . BufFlagExpireTime , Params . ProxyCfg . BufFlagCleanupInterval ) // if value is true, we can ignore queryResult
2020-11-21 16:54:20 +08:00
for {
select {
case msgPack , ok := <- queryResultMsgStream . Chan ( ) :
if ! ok {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop exit Chan closed" )
2020-11-21 16:54:20 +08:00
return
}
if msgPack == nil {
continue
}
2021-06-18 10:33:58 +08:00
2020-11-21 16:54:20 +08:00
for _ , tsMsg := range msgPack . Msgs {
2021-03-25 14:41:46 +08:00
sp , ctx := trace . StartSpanFromContext ( tsMsg . TraceCtx ( ) )
tsMsg . SetTraceCtx ( ctx )
2021-05-24 18:19:43 +08:00
if searchResultMsg , srOk := tsMsg . ( * msgstream . SearchResultMsg ) ; srOk {
reqID := searchResultMsg . Base . MsgID
reqIDStr := strconv . FormatInt ( reqID , 10 )
2021-11-16 14:13:12 +08:00
ignoreThisResult , ok := searchResultBufFlags . Get ( reqID )
2021-06-15 10:19:38 +08:00
if ! ok {
2021-11-16 14:13:12 +08:00
searchResultBufFlags . Set ( reqID , false )
2021-06-15 10:19:38 +08:00
ignoreThisResult = false
}
if ignoreThisResult {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop Got a SearchResultMsg, but we should ignore" , zap . Any ( "ReqID" , reqID ) )
2021-06-15 10:19:38 +08:00
continue
}
2021-05-24 18:19:43 +08:00
t := sched . getTaskByReqID ( reqID )
2021-07-23 20:33:34 +08:00
log . Debug ( "Proxy collectResultLoop Got a SearchResultMsg" , zap . Any ( "ReqID" , reqID ) )
2021-05-24 18:19:43 +08:00
if t == nil {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop GetTaskByReqID failed" , zap . String ( "reqID" , reqIDStr ) )
2021-06-16 20:15:59 +08:00
delete ( searchResultBufs , reqID )
2021-11-16 14:13:12 +08:00
searchResultBufFlags . Set ( reqID , true )
2021-05-24 18:19:43 +08:00
continue
}
2021-01-23 20:58:46 +08:00
2021-09-11 11:36:22 +08:00
st , ok := t . ( * searchTask )
2021-06-02 10:17:32 +08:00
if ! ok {
2021-09-11 11:36:22 +08:00
log . Debug ( "Proxy collectResultLoop type assert t as searchTask failed" , zap . Any ( "ReqID" , reqID ) )
2021-06-16 20:15:59 +08:00
delete ( searchResultBufs , reqID )
2021-11-16 14:13:12 +08:00
searchResultBufFlags . Set ( reqID , true )
2021-06-02 10:17:32 +08:00
continue
}
2021-06-16 20:15:59 +08:00
resultBuf , ok := searchResultBufs [ reqID ]
2021-05-24 18:19:43 +08:00
if ! ok {
2021-12-22 10:23:00 +08:00
resultBuf = newSearchResultBuf ( reqID )
2021-06-02 10:17:32 +08:00
vchans , err := st . getVChannels ( )
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop, first receive" , zap . Any ( "reqID" , reqID ) , zap . Any ( "vchans" , vchans ) ,
2021-06-11 09:50:34 +08:00
zap . Error ( err ) )
2021-06-02 10:17:32 +08:00
if err != nil {
2021-06-16 20:15:59 +08:00
delete ( searchResultBufs , reqID )
2021-06-02 10:17:32 +08:00
continue
}
for _ , vchan := range vchans {
2021-06-15 10:19:38 +08:00
resultBuf . usedVChans [ vchan ] = struct { } { }
2021-06-02 10:17:32 +08:00
}
2021-06-11 09:50:34 +08:00
pchans , err := st . getChannels ( )
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop, first receive" , zap . Any ( "reqID" , reqID ) , zap . Any ( "pchans" , pchans ) ,
2021-06-11 09:50:34 +08:00
zap . Error ( err ) )
if err != nil {
2021-06-16 20:15:59 +08:00
delete ( searchResultBufs , reqID )
2021-06-11 09:50:34 +08:00
continue
}
2021-06-16 20:15:59 +08:00
searchResultBufs [ reqID ] = resultBuf
2021-05-24 18:19:43 +08:00
}
2021-06-15 10:19:38 +08:00
resultBuf . addPartialResult ( & searchResultMsg . SearchResults )
2021-01-23 20:58:46 +08:00
2021-05-24 18:19:43 +08:00
//t := sched.getTaskByReqID(reqID)
{
2021-09-11 11:36:22 +08:00
colName := t . ( * searchTask ) . query . CollectionName
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop" , zap . String ( "collection name" , colName ) , zap . String ( "reqID" , reqIDStr ) , zap . Int ( "answer cnt" , len ( searchResultBufs [ reqID ] . resultBuf ) ) )
2021-05-24 18:19:43 +08:00
}
2021-06-02 10:17:32 +08:00
2021-06-15 10:19:38 +08:00
if resultBuf . readyToReduce ( ) {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop readyToReduce and assign to reduce" )
2021-11-16 14:13:12 +08:00
searchResultBufFlags . Set ( reqID , true )
2021-06-15 10:19:38 +08:00
st . resultBuf <- resultBuf . resultBuf
2021-06-16 20:15:59 +08:00
delete ( searchResultBufs , reqID )
2021-05-24 18:19:43 +08:00
}
2021-06-02 10:17:32 +08:00
2021-05-24 18:19:43 +08:00
sp . Finish ( )
2021-01-23 20:58:46 +08:00
}
2021-06-16 20:15:59 +08:00
if queryResultMsg , rtOk := tsMsg . ( * msgstream . RetrieveResultMsg ) ; rtOk {
//reqID := retrieveResultMsg.Base.MsgID
//reqIDStr := strconv.FormatInt(reqID, 10)
//t := sched.getTaskByReqID(reqID)
//if t == nil {
2021-06-22 14:40:07 +08:00
// log.Debug("proxy", zap.String("RetrieveResult GetTaskByReqID failed, reqID = ", reqIDStr))
2021-06-16 20:15:59 +08:00
// delete(queryResultBufs, reqID)
// continue
//}
//
//_, ok = queryResultBufs[reqID]
//if !ok {
// queryResultBufs[reqID] = make([]*internalpb.RetrieveResults, 0)
//}
//queryResultBufs[reqID] = append(queryResultBufs[reqID], &retrieveResultMsg.RetrieveResults)
//
//{
// colName := t.(*RetrieveTask).retrieve.CollectionName
// log.Debug("Getcollection", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID])))
//}
//if len(queryResultBufs[reqID]) == queryNodeNum {
// t := sched.getTaskByReqID(reqID)
// if t != nil {
// rt, ok := t.(*RetrieveTask)
// if ok {
// rt.resultBuf <- queryResultBufs[reqID]
// delete(queryResultBufs, reqID)
// }
// } else {
// }
//}
reqID := queryResultMsg . Base . MsgID
2021-05-24 18:19:43 +08:00
reqIDStr := strconv . FormatInt ( reqID , 10 )
2021-11-16 14:13:12 +08:00
ignoreThisResult , ok := queryResultBufFlags . Get ( reqID )
2021-06-16 20:15:59 +08:00
if ! ok {
2021-11-16 14:13:12 +08:00
queryResultBufFlags . Set ( reqID , false )
2021-06-16 20:15:59 +08:00
ignoreThisResult = false
}
if ignoreThisResult {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop Got a queryResultMsg, but we should ignore" , zap . Any ( "ReqID" , reqID ) )
2021-06-16 20:15:59 +08:00
continue
}
2020-11-21 16:54:20 +08:00
t := sched . getTaskByReqID ( reqID )
2021-07-23 20:33:34 +08:00
log . Debug ( "Proxy collectResultLoop Got a queryResultMsg" , zap . Any ( "ReqID" , reqID ) )
2021-05-24 18:19:43 +08:00
if t == nil {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop GetTaskByReqID failed" , zap . String ( "reqID" , reqIDStr ) )
2021-06-16 20:15:59 +08:00
delete ( queryResultBufs , reqID )
2021-11-16 14:13:12 +08:00
queryResultBufFlags . Set ( reqID , true )
2021-05-24 18:19:43 +08:00
continue
}
2021-01-23 20:58:46 +08:00
2021-09-11 11:36:22 +08:00
st , ok := t . ( * queryTask )
2021-05-24 18:19:43 +08:00
if ! ok {
2021-09-11 11:36:22 +08:00
log . Debug ( "Proxy collectResultLoop type assert t as queryTask failed" )
2021-06-16 20:15:59 +08:00
delete ( queryResultBufs , reqID )
2021-11-16 14:13:12 +08:00
queryResultBufFlags . Set ( reqID , true )
2021-06-16 20:15:59 +08:00
continue
2021-05-24 18:19:43 +08:00
}
2021-06-16 20:15:59 +08:00
resultBuf , ok := queryResultBufs [ reqID ]
if ! ok {
2021-12-22 10:23:00 +08:00
resultBuf = newQueryResultBuf ( reqID )
2021-06-16 20:15:59 +08:00
vchans , err := st . getVChannels ( )
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop, first receive" , zap . Any ( "reqID" , reqID ) , zap . Any ( "vchans" , vchans ) ,
2021-06-16 20:15:59 +08:00
zap . Error ( err ) )
if err != nil {
delete ( queryResultBufs , reqID )
continue
}
for _ , vchan := range vchans {
resultBuf . usedVChans [ vchan ] = struct { } { }
}
pchans , err := st . getChannels ( )
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop, first receive" , zap . Any ( "reqID" , reqID ) , zap . Any ( "pchans" , pchans ) ,
2021-06-16 20:15:59 +08:00
zap . Error ( err ) )
if err != nil {
delete ( queryResultBufs , reqID )
continue
}
queryResultBufs [ reqID ] = resultBuf
}
resultBuf . addPartialResult ( & queryResultMsg . RetrieveResults )
//t := sched.getTaskByReqID(reqID)
2021-05-24 18:19:43 +08:00
{
2021-09-11 11:36:22 +08:00
colName := t . ( * queryTask ) . query . CollectionName
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop" , zap . String ( "collection name" , colName ) , zap . String ( "reqID" , reqIDStr ) , zap . Int ( "answer cnt" , len ( queryResultBufs [ reqID ] . resultBuf ) ) )
2021-05-24 18:19:43 +08:00
}
2021-06-16 20:15:59 +08:00
if resultBuf . readyToReduce ( ) {
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop readyToReduce and assign to reduce" )
2021-11-16 14:13:12 +08:00
queryResultBufFlags . Set ( reqID , true )
2021-06-16 20:15:59 +08:00
st . resultBuf <- resultBuf . resultBuf
delete ( queryResultBufs , reqID )
2020-11-21 16:54:20 +08:00
}
2021-05-24 18:19:43 +08:00
sp . Finish ( )
2020-11-21 16:54:20 +08:00
}
}
case <- sched . ctx . Done ( ) :
2021-06-22 14:40:07 +08:00
log . Debug ( "Proxy collectResultLoop is closed ..." )
2020-11-21 16:54:20 +08:00
return
}
}
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) Start ( ) error {
2020-11-14 11:24:49 +08:00
sched . wg . Add ( 1 )
2020-11-05 18:01:33 +08:00
go sched . definitionLoop ( )
2020-11-14 11:24:49 +08:00
sched . wg . Add ( 1 )
2020-11-05 18:01:33 +08:00
go sched . manipulationLoop ( )
2020-11-14 11:24:49 +08:00
sched . wg . Add ( 1 )
2020-11-05 18:01:33 +08:00
go sched . queryLoop ( )
2020-11-21 16:54:20 +08:00
sched . wg . Add ( 1 )
2021-06-16 20:15:59 +08:00
go sched . collectResultLoop ( )
2020-11-21 16:54:20 +08:00
2020-11-03 14:53:36 +08:00
return nil
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) Close ( ) {
2020-11-05 18:01:33 +08:00
sched . cancel ( )
sched . wg . Wait ( )
}
2021-09-06 20:49:04 +08:00
func ( sched * taskScheduler ) getPChanStatistics ( ) ( map [ pChan ] * pChanStatistics , error ) {
return sched . dmQueue . getPChanStatsInfo ( )
2021-05-31 11:40:31 +08:00
}