2021-04-19 13:47:10 +08:00
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
2021-06-22 16:44:09 +08:00
package querycoord
2021-04-15 15:15:46 +08:00
import (
"context"
2021-06-23 17:44:12 +08:00
"errors"
2021-04-15 15:15:46 +08:00
"fmt"
2021-10-11 09:54:37 +08:00
"sync"
2021-04-15 15:15:46 +08:00
2021-06-19 11:45:09 +08:00
"github.com/golang/protobuf/proto"
2021-04-15 15:15:46 +08:00
"go.uber.org/zap"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
2021-07-02 10:40:13 +08:00
"github.com/milvus-io/milvus/internal/proto/proxypb"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/proto/querypb"
2021-11-05 14:47:19 +08:00
"github.com/milvus-io/milvus/internal/rootcoord"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/types"
2021-06-30 16:18:13 +08:00
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
2021-04-15 15:15:46 +08:00
)
2021-06-19 11:45:09 +08:00
// String prefixes used by the query coordinator for its task bookkeeping.
// NOTE(review): presumably these are key prefixes for persisted task records;
// confirm against the task scheduler that consumes them.
const (
	// triggerTaskPrefix is the prefix associated with trigger tasks.
	triggerTaskPrefix = "queryCoord-triggerTask"
	// activeTaskPrefix is the prefix associated with active tasks.
	activeTaskPrefix = "queryCoord-activeTask"
	// taskInfoPrefix is the prefix associated with task info records.
	taskInfoPrefix = "queryCoord-taskInfo"
	// loadBalanceInfoPrefix is the prefix associated with load-balance info records.
	loadBalanceInfoPrefix = "queryCoord-loadBalanceInfo"
)
2021-10-11 09:54:37 +08:00
const (
	// MaxRetryNum is the maximum number of times that each task can be retried.
	MaxRetryNum = 5

	// MaxSendSizeToEtcd limits the size of every loadSegmentReq to 200k.
	// The value used previously, 2097152, was the default size limit of etcd
	// messages that can be sent and received.
	MaxSendSizeToEtcd = 200000
)
2021-06-19 11:45:09 +08:00
// taskState enumerates the lifecycle states a task can be in.
type taskState int

// Task states. The numeric values are spelled out explicitly; note that the
// value 2 is not assigned to any state.
const (
	// taskUndo is the initial state given to a task by newBaseTask.
	taskUndo taskState = 0
	// taskDoing marks a task as in progress.
	taskDoing taskState = 1
	// taskDone marks a finished task; updateTaskProcess checks children against it.
	taskDone taskState = 3
	// taskExpired marks an expired task.
	taskExpired taskState = 4
	// taskFailed marks a failed task.
	taskFailed taskState = 5
)
2021-04-15 15:15:46 +08:00
type task interface {
2021-10-14 20:18:33 +08:00
traceCtx ( ) context . Context
getTaskID ( ) UniqueID // return ReqId
setTaskID ( id UniqueID )
msgBase ( ) * commonpb . MsgBase
msgType ( ) commonpb . MsgType
timestamp ( ) Timestamp
getTriggerCondition ( ) querypb . TriggerCondition
preExecute ( ctx context . Context ) error
execute ( ctx context . Context ) error
postExecute ( ctx context . Context ) error
reschedule ( ctx context . Context ) ( [ ] task , error )
rollBack ( ctx context . Context ) [ ] task
waitToFinish ( ) error
notify ( err error )
taskPriority ( ) querypb . TriggerCondition
setParentTask ( t task )
getParentTask ( ) task
getChildTask ( ) [ ] task
addChildTask ( t task )
removeChildTaskByID ( taskID UniqueID )
isValid ( ) bool
marshal ( ) ( [ ] byte , error )
getState ( ) taskState
setState ( state taskState )
isRetryable ( ) bool
setResultInfo ( err error )
getResultInfo ( ) * commonpb . Status
updateTaskProcess ( )
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
type baseTask struct {
2021-10-21 10:51:14 +08:00
condition
2021-10-11 09:54:37 +08:00
ctx context . Context
cancel context . CancelFunc
result * commonpb . Status
resultMu sync . RWMutex
state taskState
stateMu sync . RWMutex
retryCount int
//sync.RWMutex
2021-06-15 12:41:40 +08:00
taskID UniqueID
triggerCondition querypb . TriggerCondition
parentTask task
childTasks [ ] task
2021-10-11 09:54:37 +08:00
childTasksMu sync . RWMutex
}
2021-10-18 21:34:47 +08:00
func newBaseTask ( ctx context . Context , triggerType querypb . TriggerCondition ) * baseTask {
2021-10-11 09:54:37 +08:00
childCtx , cancel := context . WithCancel ( ctx )
2021-10-21 10:51:14 +08:00
condition := newTaskCondition ( childCtx )
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
baseTask := & baseTask {
2021-10-11 09:54:37 +08:00
ctx : childCtx ,
cancel : cancel ,
2021-10-21 10:51:14 +08:00
condition : condition ,
2021-10-11 09:54:37 +08:00
state : taskUndo ,
retryCount : MaxRetryNum ,
triggerCondition : triggerType ,
childTasks : [ ] task { } ,
}
return baseTask
2021-06-15 12:41:40 +08:00
}
2021-10-14 20:18:33 +08:00
// getTaskID function returns the unique taskID of the trigger task
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getTaskID ( ) UniqueID {
2021-06-15 12:41:40 +08:00
return bt . taskID
}
2021-10-14 20:18:33 +08:00
// setTaskID function sets the trigger task with a unique id, which is allocated by tso
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) setTaskID ( id UniqueID ) {
2021-06-15 12:41:40 +08:00
bt . taskID = id
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) traceCtx ( ) context . Context {
2021-04-15 15:15:46 +08:00
return bt . ctx
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getTriggerCondition ( ) querypb . TriggerCondition {
2021-10-11 09:54:37 +08:00
return bt . triggerCondition
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) taskPriority ( ) querypb . TriggerCondition {
2021-06-15 12:41:40 +08:00
return bt . triggerCondition
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) setParentTask ( t task ) {
2021-10-11 09:54:37 +08:00
bt . parentTask = t
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getParentTask ( ) task {
2021-06-15 12:41:40 +08:00
return bt . parentTask
}
2021-10-13 21:26:33 +08:00
// GetChildTask function returns all the child tasks of the trigger task
// Child task may be loadSegmentTask, watchDmChannelTask or watchQueryChannelTask
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getChildTask ( ) [ ] task {
2021-10-11 09:54:37 +08:00
bt . childTasksMu . RLock ( )
defer bt . childTasksMu . RUnlock ( )
2021-06-15 12:41:40 +08:00
return bt . childTasks
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) addChildTask ( t task ) {
2021-10-11 09:54:37 +08:00
bt . childTasksMu . Lock ( )
defer bt . childTasksMu . Unlock ( )
2021-06-15 12:41:40 +08:00
bt . childTasks = append ( bt . childTasks , t )
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) removeChildTaskByID ( taskID UniqueID ) {
2021-10-11 09:54:37 +08:00
bt . childTasksMu . Lock ( )
defer bt . childTasksMu . Unlock ( )
result := make ( [ ] task , 0 )
for _ , t := range bt . childTasks {
2021-10-14 20:18:33 +08:00
if t . getTaskID ( ) != taskID {
2021-10-11 09:54:37 +08:00
result = append ( result , t )
}
}
bt . childTasks = result
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) isValid ( ) bool {
2021-06-19 11:45:09 +08:00
return true
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) reschedule ( ctx context . Context ) ( [ ] task , error ) {
2021-06-19 11:45:09 +08:00
return nil , nil
}
2021-10-12 23:40:36 +08:00
// State returns the state of task, such as taskUndo, taskDoing, taskDone, taskExpired, taskFailed
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getState ( ) taskState {
2021-10-11 09:54:37 +08:00
bt . stateMu . RLock ( )
defer bt . stateMu . RUnlock ( )
2021-06-19 11:45:09 +08:00
return bt . state
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) setState ( state taskState ) {
2021-10-11 09:54:37 +08:00
bt . stateMu . Lock ( )
defer bt . stateMu . Unlock ( )
2021-06-19 11:45:09 +08:00
bt . state = state
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) isRetryable ( ) bool {
2021-10-11 09:54:37 +08:00
return bt . retryCount > 0
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) setResultInfo ( err error ) {
2021-10-11 09:54:37 +08:00
bt . resultMu . Lock ( )
defer bt . resultMu . Unlock ( )
if bt . result == nil {
bt . result = & commonpb . Status { }
}
if err == nil {
bt . result . ErrorCode = commonpb . ErrorCode_Success
bt . result . Reason = ""
return
}
bt . result . ErrorCode = commonpb . ErrorCode_UnexpectedError
bt . result . Reason = bt . result . Reason + ", " + err . Error ( )
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getResultInfo ( ) * commonpb . Status {
2021-10-11 09:54:37 +08:00
bt . resultMu . RLock ( )
defer bt . resultMu . RUnlock ( )
return proto . Clone ( bt . result ) . ( * commonpb . Status )
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) updateTaskProcess ( ) {
2021-10-11 09:54:37 +08:00
// TODO::
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) rollBack ( ctx context . Context ) [ ] task {
2021-10-11 09:54:37 +08:00
//TODO::
return nil
}
2021-10-18 21:34:47 +08:00
type loadCollectionTask struct {
* baseTask
2021-04-15 15:15:46 +08:00
* querypb . LoadCollectionRequest
2021-11-17 09:47:12 +08:00
rootCoord types . RootCoord
dataCoord types . DataCoord
indexCoord types . IndexCoord
cluster Cluster
meta Meta
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return lct . Base
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( lct . LoadCollectionRequest )
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) msgType ( ) commonpb . MsgType {
2021-04-15 15:15:46 +08:00
return lct . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) timestamp ( ) Timestamp {
2021-04-15 15:15:46 +08:00
return lct . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) updateTaskProcess ( ) {
2021-10-11 09:54:37 +08:00
collectionID := lct . CollectionID
2021-10-14 20:18:33 +08:00
childTasks := lct . getChildTask ( )
2021-10-11 09:54:37 +08:00
allDone := true
for _ , t := range childTasks {
2021-10-14 20:18:33 +08:00
if t . getState ( ) != taskDone {
2021-10-11 09:54:37 +08:00
allDone = false
}
}
if allDone {
err := lct . meta . setLoadPercentage ( collectionID , 0 , 100 , querypb . LoadType_loadCollection )
if err != nil {
log . Error ( "loadCollectionTask: set load percentage to meta's collectionInfo" , zap . Int64 ( "collectionID" , collectionID ) )
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( err )
2021-10-11 09:54:37 +08:00
}
}
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) preExecute ( ctx context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := lct . CollectionID
schema := lct . Schema
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do loadCollectionTask" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lct . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Stringer ( "schema" , schema ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
lct . retryCount --
} ( )
2021-04-15 15:15:46 +08:00
collectionID := lct . CollectionID
showPartitionRequest := & milvuspb . ShowPartitionsRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_ShowPartitions ,
} ,
CollectionID : collectionID ,
}
2021-07-01 15:24:17 +08:00
showPartitionResponse , err := lct . rootCoord . ShowPartitions ( ctx , showPartitionRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-24 21:10:13 +08:00
log . Debug ( "loadCollectionTask: get collection's all partitionIDs" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , showPartitionResponse . PartitionIDs ) )
partitionIDs := showPartitionResponse . PartitionIDs
2021-06-23 17:44:12 +08:00
toLoadPartitionIDs := make ( [ ] UniqueID , 0 )
2021-06-24 21:10:13 +08:00
hasCollection := lct . meta . hasCollection ( collectionID )
2021-06-26 16:08:11 +08:00
watchPartition := false
2021-06-24 21:10:13 +08:00
if hasCollection {
2021-06-26 16:08:11 +08:00
watchPartition = true
2021-08-02 22:39:25 +08:00
loadType , _ := lct . meta . getLoadType ( collectionID )
if loadType == querypb . LoadType_loadCollection {
2021-06-24 21:10:13 +08:00
for _ , partitionID := range partitionIDs {
hasReleasePartition := lct . meta . hasReleasePartition ( collectionID , partitionID )
if hasReleasePartition {
toLoadPartitionIDs = append ( toLoadPartitionIDs , partitionID )
}
}
} else {
for _ , partitionID := range partitionIDs {
hasPartition := lct . meta . hasPartition ( collectionID , partitionID )
if ! hasPartition {
toLoadPartitionIDs = append ( toLoadPartitionIDs , partitionID )
}
}
2021-06-23 17:44:12 +08:00
}
2021-06-24 21:10:13 +08:00
} else {
toLoadPartitionIDs = partitionIDs
2021-06-23 17:44:12 +08:00
}
2021-06-24 21:10:13 +08:00
2021-06-23 17:44:12 +08:00
log . Debug ( "loadCollectionTask: toLoadPartitionIDs" , zap . Int64s ( "partitionIDs" , toLoadPartitionIDs ) )
2021-06-24 21:10:13 +08:00
lct . meta . addCollection ( collectionID , lct . Schema )
2021-08-02 22:39:25 +08:00
lct . meta . setLoadType ( collectionID , querypb . LoadType_loadCollection )
2021-06-24 21:10:13 +08:00
for _ , id := range toLoadPartitionIDs {
lct . meta . addPartition ( collectionID , id )
}
2021-06-23 17:44:12 +08:00
2021-06-26 16:08:11 +08:00
loadSegmentReqs := make ( [ ] * querypb . LoadSegmentsRequest , 0 )
watchDmChannelReqs := make ( [ ] * querypb . WatchDmChannelsRequest , 0 )
2021-06-15 12:41:40 +08:00
channelsToWatch := make ( [ ] string , 0 )
segmentsToLoad := make ( [ ] UniqueID , 0 )
2021-11-13 08:49:08 +08:00
var watchDeltaChannels [ ] * datapb . VchannelInfo
2021-06-23 17:44:12 +08:00
for _ , partitionID := range toLoadPartitionIDs {
2021-06-16 11:09:56 +08:00
getRecoveryInfoRequest := & datapb . GetRecoveryInfoRequest {
2021-06-15 12:41:40 +08:00
Base : lct . Base ,
CollectionID : collectionID ,
PartitionID : partitionID ,
}
2021-06-30 16:18:13 +08:00
recoveryInfo , err := lct . dataCoord . GetRecoveryInfo ( ctx , getRecoveryInfoRequest )
2021-06-15 12:41:40 +08:00
if err != nil {
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( err )
2021-06-15 12:41:40 +08:00
return err
}
2021-04-15 15:15:46 +08:00
2021-06-15 12:41:40 +08:00
for _ , segmentBingLog := range recoveryInfo . Binlogs {
segmentID := segmentBingLog . SegmentID
segmentLoadInfo := & querypb . SegmentLoadInfo {
2021-06-26 16:08:11 +08:00
SegmentID : segmentID ,
2021-06-15 12:41:40 +08:00
PartitionID : partitionID ,
CollectionID : collectionID ,
2021-06-26 16:08:11 +08:00
BinlogPaths : segmentBingLog . FieldBinlogs ,
2021-09-07 11:35:18 +08:00
NumOfRows : segmentBingLog . NumOfRows ,
2021-10-22 14:31:13 +08:00
Statslogs : segmentBingLog . Statslogs ,
Deltalogs : segmentBingLog . Deltalogs ,
2021-04-15 15:15:46 +08:00
}
2021-06-26 16:08:11 +08:00
2021-11-17 09:47:12 +08:00
indexInfo , err := getIndexInfo ( ctx , & querypb . SegmentInfo {
CollectionID : collectionID ,
SegmentID : segmentID ,
} , lct . rootCoord , lct . indexCoord )
if err == nil && indexInfo . enableIndex {
2021-11-17 14:37:11 +08:00
segmentLoadInfo . EnableIndex = true
2021-11-17 09:47:12 +08:00
segmentLoadInfo . IndexPathInfos = indexInfo . infos
}
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lct . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
2021-06-26 16:08:11 +08:00
loadSegmentReq := & querypb . LoadSegmentsRequest {
2021-07-13 14:16:00 +08:00
Base : msgBase ,
2021-06-26 16:08:11 +08:00
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : lct . Schema ,
LoadCondition : querypb . TriggerCondition_grpcRequest ,
2021-11-11 12:56:42 +08:00
CollectionID : collectionID ,
2021-06-26 16:08:11 +08:00
}
2021-06-15 12:41:40 +08:00
segmentsToLoad = append ( segmentsToLoad , segmentID )
2021-06-26 16:08:11 +08:00
loadSegmentReqs = append ( loadSegmentReqs , loadSegmentReq )
2021-04-15 15:15:46 +08:00
}
2021-06-15 12:41:40 +08:00
2021-11-13 08:49:08 +08:00
if len ( watchDeltaChannels ) != len ( recoveryInfo . Channels ) {
2021-11-05 14:47:19 +08:00
for _ , info := range recoveryInfo . Channels {
2021-11-13 08:49:08 +08:00
deltaChannelName , err := rootcoord . ConvertChannelName ( info . ChannelName , Params . DmlChannelPrefix , Params . DeltaChannelPrefix )
2021-11-05 14:47:19 +08:00
if err != nil {
return err
}
2021-11-13 08:49:08 +08:00
deltaChannel := proto . Clone ( info ) . ( * datapb . VchannelInfo )
deltaChannel . ChannelName = deltaChannelName
watchDeltaChannels = append ( watchDeltaChannels , deltaChannel )
2021-11-05 14:47:19 +08:00
}
}
2021-06-15 12:41:40 +08:00
for _ , info := range recoveryInfo . Channels {
channel := info . ChannelName
2021-06-26 16:08:11 +08:00
if ! watchPartition {
merged := false
for index , channelName := range channelsToWatch {
if channel == channelName {
merged = true
oldInfo := watchDmChannelReqs [ index ] . Infos [ 0 ]
newInfo := mergeVChannelInfo ( oldInfo , info )
watchDmChannelReqs [ index ] . Infos = [ ] * datapb . VchannelInfo { newInfo }
break
}
}
if ! merged {
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lct . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
2021-06-26 16:08:11 +08:00
watchRequest := & querypb . WatchDmChannelsRequest {
2021-07-13 14:16:00 +08:00
Base : msgBase ,
2021-06-26 16:08:11 +08:00
CollectionID : collectionID ,
Infos : [ ] * datapb . VchannelInfo { info } ,
Schema : lct . Schema ,
}
channelsToWatch = append ( channelsToWatch , channel )
watchDmChannelReqs = append ( watchDmChannelReqs , watchRequest )
}
} else {
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lct . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
2021-06-15 12:41:40 +08:00
watchRequest := & querypb . WatchDmChannelsRequest {
2021-07-13 14:16:00 +08:00
Base : msgBase ,
2021-06-15 12:41:40 +08:00
CollectionID : collectionID ,
2021-06-26 16:08:11 +08:00
PartitionID : partitionID ,
2021-06-16 11:09:56 +08:00
Infos : [ ] * datapb . VchannelInfo { info } ,
2021-06-15 12:41:40 +08:00
Schema : lct . Schema ,
}
channelsToWatch = append ( channelsToWatch , channel )
2021-06-26 16:08:11 +08:00
watchDmChannelReqs = append ( watchDmChannelReqs , watchRequest )
2021-06-15 12:41:40 +08:00
}
2021-04-15 15:15:46 +08:00
}
2021-11-05 14:47:19 +08:00
2021-04-15 15:15:46 +08:00
}
2021-11-13 08:49:08 +08:00
msgBase := proto . Clone ( lct . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDeltaChannels
watchDeltaChannelReq := & querypb . WatchDeltaChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
Infos : watchDeltaChannels ,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lct . meta . setDeltaChannel ( watchDeltaChannelReq . CollectionID , watchDeltaChannelReq . Infos )
2021-04-15 15:15:46 +08:00
2021-11-13 08:49:08 +08:00
internalTasks , err := assignInternalTask ( ctx , collectionID , lct , lct . meta , lct . cluster , loadSegmentReqs , watchDmChannelReqs , watchDeltaChannelReq , false , nil , nil )
2021-09-29 09:56:04 +08:00
if err != nil {
2021-10-11 09:54:37 +08:00
log . Warn ( "loadCollectionTask: assign child task failed" , zap . Int64 ( "collectionID" , collectionID ) )
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( err )
2021-09-29 09:56:04 +08:00
return err
}
2021-11-11 12:56:42 +08:00
for _ , internalTask := range internalTasks {
lct . addChildTask ( internalTask )
log . Debug ( "loadCollectionTask: add a childTask" , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) , zap . Int64 ( "collectionID" , collectionID ) , zap . Any ( "task" , internalTask ) )
}
2021-06-26 16:08:11 +08:00
log . Debug ( "loadCollectionTask: assign child task done" , zap . Int64 ( "collectionID" , collectionID ) )
2021-06-15 12:41:40 +08:00
2021-04-15 15:15:46 +08:00
log . Debug ( "LoadCollection execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lct . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) )
2021-04-15 15:15:46 +08:00
return nil
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) postExecute ( ctx context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := lct . CollectionID
2021-06-23 17:44:12 +08:00
if lct . result . ErrorCode != commonpb . ErrorCode_Success {
2021-10-11 09:54:37 +08:00
lct . childTasks = [ ] task { }
err := lct . meta . releaseCollection ( collectionID )
2021-06-30 17:48:19 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Error ( "loadCollectionTask: occur error when release collection info from meta" , zap . Error ( err ) )
2021-06-23 17:44:12 +08:00
}
}
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadCollectionTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lct . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) rollBack ( ctx context . Context ) [ ] task {
2021-10-11 09:54:37 +08:00
nodes , _ := lct . cluster . onlineNodes ( )
resultTasks := make ( [ ] task , 0 )
//TODO::call rootCoord.ReleaseDQLMessageStream
for nodeID := range nodes {
//brute force rollBack, should optimize
req := & querypb . ReleaseCollectionRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_ReleaseCollection ,
MsgID : lct . Base . MsgID ,
Timestamp : lct . Base . Timestamp ,
SourceID : lct . Base . SourceID ,
} ,
DbID : lct . DbID ,
CollectionID : lct . CollectionID ,
NodeID : nodeID ,
}
baseTask := newBaseTask ( ctx , querypb . TriggerCondition_grpcRequest )
2021-10-14 20:18:33 +08:00
baseTask . setParentTask ( lct )
2021-10-18 21:34:47 +08:00
releaseCollectionTask := & releaseCollectionTask {
baseTask : baseTask ,
2021-10-11 09:54:37 +08:00
ReleaseCollectionRequest : req ,
cluster : lct . cluster ,
}
resultTasks = append ( resultTasks , releaseCollectionTask )
}
log . Debug ( "loadCollectionTask: rollBack loadCollectionTask" , zap . Any ( "loadCollectionTask" , lct ) , zap . Any ( "rollBack task" , resultTasks ) )
return resultTasks
}
2021-10-18 21:34:47 +08:00
// releaseCollectionTask will release all the data of this collection on query nodes
type releaseCollectionTask struct {
* baseTask
2021-04-15 15:15:46 +08:00
* querypb . ReleaseCollectionRequest
2021-09-15 20:40:07 +08:00
cluster Cluster
2021-08-02 22:39:25 +08:00
meta Meta
2021-07-02 10:40:13 +08:00
rootCoord types . RootCoord
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return rct . Base
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( rct . ReleaseCollectionRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) msgType ( ) commonpb . MsgType {
2021-04-15 15:15:46 +08:00
return rct . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) timestamp ( ) Timestamp {
2021-04-15 15:15:46 +08:00
return rct . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) preExecute ( context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := rct . CollectionID
2021-10-14 20:18:33 +08:00
rct . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do releaseCollectionTask" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rct . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
rct . retryCount --
} ( )
2021-04-15 15:15:46 +08:00
collectionID := rct . CollectionID
2021-10-11 09:54:37 +08:00
2021-09-18 18:45:51 +08:00
// if nodeID ==0, it means that the release request has not been assigned to the specified query node
2021-06-19 11:45:09 +08:00
if rct . NodeID <= 0 {
2021-07-02 10:40:13 +08:00
rct . meta . releaseCollection ( collectionID )
releaseDQLMessageStreamReq := & proxypb . ReleaseDQLMessageStreamRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_RemoveQueryChannels ,
MsgID : rct . Base . MsgID ,
Timestamp : rct . Base . Timestamp ,
SourceID : rct . Base . SourceID ,
} ,
DbID : rct . DbID ,
CollectionID : rct . CollectionID ,
}
res , err := rct . rootCoord . ReleaseDQLMessageStream ( rct . ctx , releaseDQLMessageStreamReq )
2021-10-11 09:54:37 +08:00
if res . ErrorCode != commonpb . ErrorCode_Success || err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "releaseCollectionTask: release collection end, releaseDQLMessageStream occur error" , zap . Int64 ( "collectionID" , rct . CollectionID ) )
2021-07-02 10:40:13 +08:00
err = errors . New ( "rootCoord releaseDQLMessageStream failed" )
2021-10-14 20:18:33 +08:00
rct . setResultInfo ( err )
2021-07-02 10:40:13 +08:00
return err
}
2021-09-15 20:40:07 +08:00
nodes , err := rct . cluster . onlineNodes ( )
2021-06-30 17:48:19 +08:00
if err != nil {
log . Debug ( err . Error ( ) )
}
for nodeID := range nodes {
2021-06-19 11:45:09 +08:00
req := proto . Clone ( rct . ReleaseCollectionRequest ) . ( * querypb . ReleaseCollectionRequest )
req . NodeID = nodeID
2021-10-11 09:54:37 +08:00
baseTask := newBaseTask ( ctx , querypb . TriggerCondition_grpcRequest )
2021-10-14 20:18:33 +08:00
baseTask . setParentTask ( rct )
2021-10-18 21:34:47 +08:00
releaseCollectionTask := & releaseCollectionTask {
baseTask : baseTask ,
2021-06-19 11:45:09 +08:00
ReleaseCollectionRequest : req ,
2021-06-15 12:41:40 +08:00
cluster : rct . cluster ,
}
2021-10-11 09:54:37 +08:00
2021-10-14 20:18:33 +08:00
rct . addChildTask ( releaseCollectionTask )
2021-10-18 21:34:47 +08:00
log . Debug ( "releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask" , zap . Any ( "task" , releaseCollectionTask ) )
2021-06-15 12:41:40 +08:00
}
} else {
2021-08-02 22:39:25 +08:00
err := rct . cluster . releaseCollection ( ctx , rct . NodeID , rct . ReleaseCollectionRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "releaseCollectionTask: release collection end, node occur error" , zap . Int64 ( "nodeID" , rct . NodeID ) )
2021-10-14 20:18:33 +08:00
rct . setResultInfo ( err )
2021-06-23 17:44:12 +08:00
return err
}
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
log . Debug ( "releaseCollectionTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rct . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "nodeID" , rct . NodeID ) )
2021-04-15 15:15:46 +08:00
return nil
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) postExecute ( context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := rct . CollectionID
2021-10-11 09:54:37 +08:00
if rct . result . ErrorCode != commonpb . ErrorCode_Success {
rct . childTasks = [ ] task { }
}
2021-04-15 15:15:46 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "releaseCollectionTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rct . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "nodeID" , rct . NodeID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) rollBack ( ctx context . Context ) [ ] task {
2021-10-11 09:54:37 +08:00
//TODO::
//if taskID == 0, recovery meta
//if taskID != 0, recovery collection on queryNode
return nil
}
2021-10-18 21:34:47 +08:00
// loadPartitionTask will load all the data of this partition to query nodes
type loadPartitionTask struct {
* baseTask
2021-04-15 15:15:46 +08:00
* querypb . LoadPartitionsRequest
2021-11-17 09:47:12 +08:00
rootCoord types . RootCoord
dataCoord types . DataCoord
indexCoord types . IndexCoord
cluster Cluster
meta Meta
addCol bool
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return lpt . Base
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( lpt . LoadPartitionsRequest )
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) msgType ( ) commonpb . MsgType {
2021-04-15 15:15:46 +08:00
return lpt . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) timestamp ( ) Timestamp {
2021-04-15 15:15:46 +08:00
return lpt . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) updateTaskProcess ( ) {
2021-04-15 15:15:46 +08:00
collectionID := lpt . CollectionID
2021-10-11 09:54:37 +08:00
partitionIDs := lpt . PartitionIDs
2021-10-14 20:18:33 +08:00
childTasks := lpt . getChildTask ( )
2021-10-11 09:54:37 +08:00
allDone := true
for _ , t := range childTasks {
2021-10-14 20:18:33 +08:00
if t . getState ( ) != taskDone {
2021-10-11 09:54:37 +08:00
allDone = false
}
}
if allDone {
for _ , id := range partitionIDs {
err := lpt . meta . setLoadPercentage ( collectionID , id , 100 , querypb . LoadType_LoadPartition )
if err != nil {
log . Error ( "loadPartitionTask: set load percentage to meta's collectionInfo" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , id ) )
2021-10-14 20:18:33 +08:00
lpt . setResultInfo ( err )
2021-10-11 09:54:37 +08:00
}
}
2021-06-23 17:44:12 +08:00
}
2021-10-11 09:54:37 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) preExecute ( context . Context ) error {
2021-10-11 09:54:37 +08:00
collectionID := lpt . CollectionID
2021-10-14 20:18:33 +08:00
lpt . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do loadPartitionTask" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lpt . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
lpt . retryCount --
} ( )
2021-04-15 15:15:46 +08:00
collectionID := lpt . CollectionID
partitionIDs := lpt . PartitionIDs
2021-06-23 17:44:12 +08:00
if ! lpt . meta . hasCollection ( collectionID ) {
lpt . meta . addCollection ( collectionID , lpt . Schema )
lpt . addCol = true
}
2021-06-19 11:45:09 +08:00
for _ , id := range partitionIDs {
lpt . meta . addPartition ( collectionID , id )
}
2021-04-15 15:15:46 +08:00
2021-06-15 12:41:40 +08:00
segmentsToLoad := make ( [ ] UniqueID , 0 )
2021-06-26 16:08:11 +08:00
loadSegmentReqs := make ( [ ] * querypb . LoadSegmentsRequest , 0 )
2021-06-15 12:41:40 +08:00
channelsToWatch := make ( [ ] string , 0 )
2021-06-26 16:08:11 +08:00
watchDmReqs := make ( [ ] * querypb . WatchDmChannelsRequest , 0 )
2021-11-13 08:49:08 +08:00
var watchDeltaChannels [ ] * datapb . VchannelInfo
2021-04-15 15:15:46 +08:00
for _ , partitionID := range partitionIDs {
2021-06-16 11:09:56 +08:00
getRecoveryInfoRequest := & datapb . GetRecoveryInfoRequest {
2021-06-15 12:41:40 +08:00
Base : lpt . Base ,
2021-04-15 15:15:46 +08:00
CollectionID : collectionID ,
PartitionID : partitionID ,
}
2021-06-30 16:18:13 +08:00
recoveryInfo , err := lpt . dataCoord . GetRecoveryInfo ( ctx , getRecoveryInfoRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-10-14 20:18:33 +08:00
lpt . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-15 12:41:40 +08:00
for _ , segmentBingLog := range recoveryInfo . Binlogs {
segmentID := segmentBingLog . SegmentID
segmentLoadInfo := & querypb . SegmentLoadInfo {
SegmentID : segmentID ,
2021-04-15 15:15:46 +08:00
PartitionID : partitionID ,
2021-06-15 12:41:40 +08:00
CollectionID : collectionID ,
2021-06-26 16:08:11 +08:00
BinlogPaths : segmentBingLog . FieldBinlogs ,
2021-09-07 11:35:18 +08:00
NumOfRows : segmentBingLog . NumOfRows ,
2021-10-22 14:31:13 +08:00
Statslogs : segmentBingLog . Statslogs ,
Deltalogs : segmentBingLog . Deltalogs ,
2021-06-26 16:08:11 +08:00
}
2021-11-17 09:47:12 +08:00
indexInfo , err := getIndexInfo ( ctx , & querypb . SegmentInfo {
CollectionID : collectionID ,
SegmentID : segmentID ,
} , lpt . rootCoord , lpt . indexCoord )
if err == nil && indexInfo . enableIndex {
2021-11-17 14:37:11 +08:00
segmentLoadInfo . EnableIndex = true
2021-11-17 09:47:12 +08:00
segmentLoadInfo . IndexPathInfos = indexInfo . infos
}
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lpt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
2021-06-26 16:08:11 +08:00
loadSegmentReq := & querypb . LoadSegmentsRequest {
2021-07-13 14:16:00 +08:00
Base : msgBase ,
2021-06-26 16:08:11 +08:00
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : lpt . Schema ,
LoadCondition : querypb . TriggerCondition_grpcRequest ,
2021-11-11 12:56:42 +08:00
CollectionID : collectionID ,
2021-04-15 15:15:46 +08:00
}
2021-06-15 12:41:40 +08:00
segmentsToLoad = append ( segmentsToLoad , segmentID )
2021-06-26 16:08:11 +08:00
loadSegmentReqs = append ( loadSegmentReqs , loadSegmentReq )
2021-06-15 12:41:40 +08:00
}
2021-11-13 08:49:08 +08:00
if len ( watchDeltaChannels ) != len ( recoveryInfo . Channels ) {
2021-11-05 14:47:19 +08:00
for _ , info := range recoveryInfo . Channels {
2021-11-13 08:49:08 +08:00
deltaChannelName , err := rootcoord . ConvertChannelName ( info . ChannelName , Params . DmlChannelPrefix , Params . DeltaChannelPrefix )
2021-11-05 14:47:19 +08:00
if err != nil {
return err
}
2021-11-13 08:49:08 +08:00
deltaChannel := proto . Clone ( info ) . ( * datapb . VchannelInfo )
deltaChannel . ChannelName = deltaChannelName
watchDeltaChannels = append ( watchDeltaChannels , deltaChannel )
2021-11-05 14:47:19 +08:00
}
}
2021-06-15 12:41:40 +08:00
for _ , info := range recoveryInfo . Channels {
2021-11-05 14:47:19 +08:00
// watch dml channels
2021-06-15 12:41:40 +08:00
channel := info . ChannelName
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lpt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
2021-06-26 16:08:11 +08:00
watchDmRequest := & querypb . WatchDmChannelsRequest {
2021-07-13 14:16:00 +08:00
Base : msgBase ,
2021-06-15 12:41:40 +08:00
CollectionID : collectionID ,
PartitionID : partitionID ,
2021-06-16 11:09:56 +08:00
Infos : [ ] * datapb . VchannelInfo { info } ,
2021-06-15 12:41:40 +08:00
Schema : lpt . Schema ,
2021-04-15 15:15:46 +08:00
}
2021-06-15 12:41:40 +08:00
channelsToWatch = append ( channelsToWatch , channel )
2021-06-26 16:08:11 +08:00
watchDmReqs = append ( watchDmReqs , watchDmRequest )
2021-10-18 21:34:47 +08:00
log . Debug ( "loadPartitionTask: set watchDmChannelsRequests" , zap . Any ( "request" , watchDmRequest ) , zap . Int64 ( "collectionID" , collectionID ) )
2021-11-05 14:47:19 +08:00
2021-04-15 15:15:46 +08:00
}
}
2021-11-13 08:49:08 +08:00
msgBase := proto . Clone ( lpt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDeltaChannels
watchDeltaChannelReq := & querypb . WatchDeltaChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
Infos : watchDeltaChannels ,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lpt . meta . setDeltaChannel ( watchDeltaChannelReq . CollectionID , watchDeltaChannelReq . Infos )
internalTasks , err := assignInternalTask ( ctx , collectionID , lpt , lpt . meta , lpt . cluster , loadSegmentReqs , watchDmReqs , watchDeltaChannelReq , false , nil , nil )
2021-09-29 09:56:04 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "loadPartitionTask: assign child task failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-10-14 20:18:33 +08:00
lpt . setResultInfo ( err )
2021-09-29 09:56:04 +08:00
return err
}
2021-11-11 12:56:42 +08:00
for _ , internalTask := range internalTasks {
lpt . addChildTask ( internalTask )
log . Debug ( "loadPartitionTask: add a childTask" , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) , zap . Int64 ( "collectionID" , collectionID ) , zap . Any ( "task" , internalTask ) )
}
2021-10-18 21:34:47 +08:00
log . Debug ( "loadPartitionTask: assign child task done" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-04-15 15:15:46 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadPartitionTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lpt . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Int64s ( "partitionIDs" , partitionIDs ) )
return nil
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) postExecute ( ctx context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := lpt . CollectionID
partitionIDs := lpt . PartitionIDs
2021-06-23 17:44:12 +08:00
if lpt . result . ErrorCode != commonpb . ErrorCode_Success {
2021-10-11 09:54:37 +08:00
lpt . childTasks = [ ] task { }
2021-06-23 17:44:12 +08:00
if lpt . addCol {
2021-10-11 09:54:37 +08:00
err := lpt . meta . releaseCollection ( collectionID )
2021-06-30 17:48:19 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Error ( "loadPartitionTask: occur error when release collection info from meta" , zap . Error ( err ) )
2021-06-23 17:44:12 +08:00
}
} else {
2021-10-11 09:54:37 +08:00
for _ , partitionID := range partitionIDs {
err := lpt . meta . releasePartition ( collectionID , partitionID )
if err != nil {
2021-10-18 21:34:47 +08:00
log . Error ( "loadPartitionTask: occur error when release partition info from meta" , zap . Error ( err ) )
2021-06-23 17:44:12 +08:00
}
}
}
}
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadPartitionTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lpt . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) rollBack ( ctx context . Context ) [ ] task {
2021-10-11 09:54:37 +08:00
partitionIDs := lpt . PartitionIDs
resultTasks := make ( [ ] task , 0 )
//brute force rollBack, should optimize
if lpt . addCol {
nodes , _ := lpt . cluster . onlineNodes ( )
for nodeID := range nodes {
req := & querypb . ReleaseCollectionRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_ReleaseCollection ,
MsgID : lpt . Base . MsgID ,
Timestamp : lpt . Base . Timestamp ,
SourceID : lpt . Base . SourceID ,
} ,
DbID : lpt . DbID ,
CollectionID : lpt . CollectionID ,
NodeID : nodeID ,
}
baseTask := newBaseTask ( ctx , querypb . TriggerCondition_grpcRequest )
2021-10-14 20:18:33 +08:00
baseTask . setParentTask ( lpt )
2021-10-18 21:34:47 +08:00
releaseCollectionTask := & releaseCollectionTask {
baseTask : baseTask ,
2021-10-11 09:54:37 +08:00
ReleaseCollectionRequest : req ,
cluster : lpt . cluster ,
}
resultTasks = append ( resultTasks , releaseCollectionTask )
}
} else {
nodes , _ := lpt . cluster . onlineNodes ( )
for nodeID := range nodes {
req := & querypb . ReleasePartitionsRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_ReleasePartitions ,
MsgID : lpt . Base . MsgID ,
Timestamp : lpt . Base . Timestamp ,
SourceID : lpt . Base . SourceID ,
} ,
DbID : lpt . DbID ,
CollectionID : lpt . CollectionID ,
PartitionIDs : partitionIDs ,
NodeID : nodeID ,
}
baseTask := newBaseTask ( ctx , querypb . TriggerCondition_grpcRequest )
2021-10-14 20:18:33 +08:00
baseTask . setParentTask ( lpt )
2021-10-18 21:34:47 +08:00
releasePartitionTask := & releasePartitionTask {
baseTask : baseTask ,
2021-10-11 09:54:37 +08:00
ReleasePartitionsRequest : req ,
cluster : lpt . cluster ,
}
resultTasks = append ( resultTasks , releasePartitionTask )
}
}
log . Debug ( "loadPartitionTask: rollBack loadPartitionTask" , zap . Any ( "loadPartitionTask" , lpt ) , zap . Any ( "rollBack task" , resultTasks ) )
return resultTasks
}
2021-10-18 21:34:47 +08:00
// releasePartitionTask will release all the data of this partition on query nodes
type releasePartitionTask struct {
* baseTask
2021-04-15 15:15:46 +08:00
* querypb . ReleasePartitionsRequest
2021-09-15 20:40:07 +08:00
cluster Cluster
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return rpt . Base
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( rpt . ReleasePartitionsRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) msgType ( ) commonpb . MsgType {
2021-04-15 15:15:46 +08:00
return rpt . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) timestamp ( ) Timestamp {
2021-04-15 15:15:46 +08:00
return rpt . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) preExecute ( context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := rpt . CollectionID
2021-10-14 20:18:33 +08:00
rpt . setResultInfo ( nil )
2021-04-15 15:15:46 +08:00
log . Debug ( "start do releasePartitionTask" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rpt . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
rpt . retryCount --
} ( )
2021-04-15 15:15:46 +08:00
collectionID := rpt . CollectionID
partitionIDs := rpt . PartitionIDs
2021-10-11 09:54:37 +08:00
2021-09-23 21:06:02 +08:00
// if nodeID ==0, it means that the release request has not been assigned to the specified query node
2021-06-19 11:45:09 +08:00
if rpt . NodeID <= 0 {
2021-09-15 20:40:07 +08:00
nodes , err := rpt . cluster . onlineNodes ( )
2021-06-30 17:48:19 +08:00
if err != nil {
log . Debug ( err . Error ( ) )
}
for nodeID := range nodes {
2021-06-19 11:45:09 +08:00
req := proto . Clone ( rpt . ReleasePartitionsRequest ) . ( * querypb . ReleasePartitionsRequest )
req . NodeID = nodeID
2021-10-11 09:54:37 +08:00
baseTask := newBaseTask ( ctx , querypb . TriggerCondition_grpcRequest )
2021-10-14 20:18:33 +08:00
baseTask . setParentTask ( rpt )
2021-10-18 21:34:47 +08:00
releasePartitionTask := & releasePartitionTask {
baseTask : baseTask ,
2021-06-19 11:45:09 +08:00
ReleasePartitionsRequest : req ,
2021-06-15 12:41:40 +08:00
cluster : rpt . cluster ,
}
2021-10-14 20:18:33 +08:00
rpt . addChildTask ( releasePartitionTask )
2021-10-18 21:34:47 +08:00
log . Debug ( "releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask" , zap . Any ( "task" , releasePartitionTask ) )
2021-06-15 12:41:40 +08:00
}
} else {
2021-08-02 22:39:25 +08:00
err := rpt . cluster . releasePartitions ( ctx , rpt . NodeID , rpt . ReleasePartitionsRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-10-11 09:54:37 +08:00
log . Warn ( "ReleasePartitionsTask: release partition end, node occur error" , zap . String ( "nodeID" , fmt . Sprintln ( rpt . NodeID ) ) )
2021-10-14 20:18:33 +08:00
rpt . setResultInfo ( err )
2021-06-23 17:44:12 +08:00
return err
}
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
log . Debug ( "releasePartitionTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rpt . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
2021-06-15 12:41:40 +08:00
zap . Int64s ( "partitionIDs" , partitionIDs ) ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "nodeID" , rpt . NodeID ) )
2021-04-15 15:15:46 +08:00
return nil
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) postExecute ( context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := rpt . CollectionID
partitionIDs := rpt . PartitionIDs
2021-10-11 09:54:37 +08:00
if rpt . result . ErrorCode != commonpb . ErrorCode_Success {
rpt . childTasks = [ ] task { }
}
2021-06-15 12:41:40 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "releasePartitionTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rpt . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Int64s ( "partitionIDs" , partitionIDs ) ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "nodeID" , rpt . NodeID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) rollBack ( ctx context . Context ) [ ] task {
2021-10-11 09:54:37 +08:00
//TODO::
//if taskID == 0, recovery meta
//if taskID != 0, recovery partition on queryNode
return nil
}
2021-10-18 21:34:47 +08:00
type loadSegmentTask struct {
* baseTask
2021-06-15 12:41:40 +08:00
* querypb . LoadSegmentsRequest
2021-10-11 09:54:37 +08:00
meta Meta
cluster Cluster
excludeNodeIDs [ ] int64
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return lst . Base
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( lst . LoadSegmentsRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) isValid ( ) bool {
2021-10-22 19:07:15 +08:00
online , err := lst . cluster . isOnline ( lst . DstNodeID )
2021-06-30 17:48:19 +08:00
if err != nil {
return false
}
2021-10-11 09:54:37 +08:00
return lst . ctx != nil && online
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) msgType ( ) commonpb . MsgType {
2021-06-15 12:41:40 +08:00
return lst . Base . MsgType
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) timestamp ( ) Timestamp {
2021-06-15 12:41:40 +08:00
return lst . Base . Timestamp
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) updateTaskProcess ( ) {
2021-10-14 20:18:33 +08:00
parentTask := lst . getParentTask ( )
2021-10-11 09:54:37 +08:00
if parentTask == nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "loadSegmentTask: parentTask should not be nil" )
2021-10-11 09:54:37 +08:00
return
}
2021-10-14 20:18:33 +08:00
parentTask . updateTaskProcess ( )
2021-10-11 09:54:37 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) preExecute ( context . Context ) error {
2021-06-15 12:41:40 +08:00
segmentIDs := make ( [ ] UniqueID , 0 )
for _ , info := range lst . Infos {
segmentIDs = append ( segmentIDs , info . SegmentID )
}
2021-10-14 20:18:33 +08:00
lst . setResultInfo ( nil )
2021-06-15 12:41:40 +08:00
log . Debug ( "start do loadSegmentTask" ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "segmentIDs" , segmentIDs ) ,
2021-10-22 19:07:15 +08:00
zap . Int64 ( "loaded nodeID" , lst . DstNodeID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lst . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
lst . retryCount --
} ( )
2021-10-22 19:07:15 +08:00
err := lst . cluster . loadSegments ( ctx , lst . DstNodeID , lst . LoadSegmentsRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "loadSegmentTask: loadSegment occur error" , zap . Int64 ( "taskID" , lst . getTaskID ( ) ) )
2021-10-14 20:18:33 +08:00
lst . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-15 12:41:40 +08:00
2021-06-19 11:45:09 +08:00
log . Debug ( "loadSegmentTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lst . getTaskID ( ) ) )
2021-06-15 12:41:40 +08:00
return nil
}
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) postExecute ( context . Context ) error {
2021-06-19 11:45:09 +08:00
log . Debug ( "loadSegmentTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lst . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) reschedule ( ctx context . Context ) ( [ ] task , error ) {
2021-11-11 12:56:42 +08:00
loadSegmentReqs := make ( [ ] * querypb . LoadSegmentsRequest , 0 )
collectionID := lst . CollectionID
2021-06-19 11:45:09 +08:00
for _ , info := range lst . Infos {
2021-11-11 12:56:42 +08:00
msgBase := proto . Clone ( lst . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
req := & querypb . LoadSegmentsRequest {
Base : msgBase ,
Infos : [ ] * querypb . SegmentLoadInfo { info } ,
Schema : lst . Schema ,
LoadCondition : lst . triggerCondition ,
SourceNodeID : lst . SourceNodeID ,
CollectionID : lst . CollectionID ,
}
loadSegmentReqs = append ( loadSegmentReqs , req )
}
if lst . excludeNodeIDs == nil {
lst . excludeNodeIDs = [ ] int64 { }
2021-06-19 11:45:09 +08:00
}
2021-10-22 19:07:15 +08:00
lst . excludeNodeIDs = append ( lst . excludeNodeIDs , lst . DstNodeID )
2021-11-13 08:49:08 +08:00
deltaChannelInfos , err := lst . meta . getDeltaChannelsByCollectionID ( collectionID )
if err != nil {
return nil , err
}
msgBase := proto . Clone ( lst . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDeltaChannels
watchDeltaRequest := & querypb . WatchDeltaChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
Infos : deltaChannelInfos ,
}
log . Debug ( "assignInternalTask: add a watchDeltaChannelTask childTask" , zap . Any ( "task" , watchDeltaRequest ) )
2021-11-11 12:56:42 +08:00
//TODO:: wait or not according msgType
2021-11-12 18:49:10 +08:00
reScheduledTasks , err := assignInternalTask ( ctx , collectionID , lst . getParentTask ( ) , lst . meta , lst . cluster , loadSegmentReqs , nil , nil , false , lst . excludeNodeIDs , nil )
2021-10-11 09:54:37 +08:00
if err != nil {
log . Error ( "loadSegment reschedule failed" , zap . Int64s ( "excludeNodes" , lst . excludeNodeIDs ) , zap . Error ( err ) )
return nil , err
}
2021-06-19 11:45:09 +08:00
2021-11-11 12:56:42 +08:00
return reScheduledTasks , nil
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
type releaseSegmentTask struct {
* baseTask
2021-06-15 12:41:40 +08:00
* querypb . ReleaseSegmentsRequest
2021-09-15 20:40:07 +08:00
cluster Cluster
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return rst . Base
}
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( rst . ReleaseSegmentsRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) isValid ( ) bool {
2021-10-11 09:54:37 +08:00
online , err := rst . cluster . isOnline ( rst . NodeID )
2021-06-30 17:48:19 +08:00
if err != nil {
return false
}
2021-10-11 09:54:37 +08:00
return rst . ctx != nil && online
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) msgType ( ) commonpb . MsgType {
2021-06-15 12:41:40 +08:00
return rst . Base . MsgType
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) timestamp ( ) Timestamp {
2021-06-15 12:41:40 +08:00
return rst . Base . Timestamp
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) preExecute ( context . Context ) error {
2021-06-15 12:41:40 +08:00
segmentIDs := rst . SegmentIDs
2021-10-14 20:18:33 +08:00
rst . setResultInfo ( nil )
2021-06-15 12:41:40 +08:00
log . Debug ( "start do releaseSegmentTask" ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "segmentIDs" , segmentIDs ) ,
zap . Int64 ( "loaded nodeID" , rst . NodeID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , rst . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
rst . retryCount --
} ( )
2021-08-02 22:39:25 +08:00
err := rst . cluster . releaseSegments ( rst . ctx , rst . NodeID , rst . ReleaseSegmentsRequest )
2021-06-15 12:41:40 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "releaseSegmentTask: releaseSegment occur error" , zap . Int64 ( "taskID" , rst . getTaskID ( ) ) )
2021-10-14 20:18:33 +08:00
rst . setResultInfo ( err )
2021-06-15 12:41:40 +08:00
return err
}
log . Debug ( "releaseSegmentTask Execute done" ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "segmentIDs" , rst . SegmentIDs ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , rst . getTaskID ( ) ) )
2021-06-15 12:41:40 +08:00
return nil
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) postExecute ( context . Context ) error {
2021-06-15 12:41:40 +08:00
segmentIDs := rst . SegmentIDs
log . Debug ( "releaseSegmentTask postExecute done" ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "segmentIDs" , segmentIDs ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , rst . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
type watchDmChannelTask struct {
* baseTask
2021-06-15 12:41:40 +08:00
* querypb . WatchDmChannelsRequest
2021-10-11 09:54:37 +08:00
meta Meta
cluster Cluster
excludeNodeIDs [ ] int64
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return wdt . Base
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( wdt . WatchDmChannelsRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) isValid ( ) bool {
2021-10-11 09:54:37 +08:00
online , err := wdt . cluster . isOnline ( wdt . NodeID )
2021-06-30 17:48:19 +08:00
if err != nil {
return false
}
2021-10-11 09:54:37 +08:00
return wdt . ctx != nil && online
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) msgType ( ) commonpb . MsgType {
2021-06-15 12:41:40 +08:00
return wdt . Base . MsgType
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) timestamp ( ) Timestamp {
2021-06-15 12:41:40 +08:00
return wdt . Base . Timestamp
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) updateTaskProcess ( ) {
2021-10-14 20:18:33 +08:00
parentTask := wdt . getParentTask ( )
2021-10-11 09:54:37 +08:00
if parentTask == nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "watchDmChannelTask: parentTask should not be nil" )
2021-10-11 09:54:37 +08:00
return
}
2021-10-14 20:18:33 +08:00
parentTask . updateTaskProcess ( )
2021-10-11 09:54:37 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) preExecute ( context . Context ) error {
2021-06-15 12:41:40 +08:00
channelInfos := wdt . Infos
channels := make ( [ ] string , 0 )
for _ , info := range channelInfos {
channels = append ( channels , info . ChannelName )
2021-04-15 15:15:46 +08:00
}
2021-10-14 20:18:33 +08:00
wdt . setResultInfo ( nil )
2021-06-15 12:41:40 +08:00
log . Debug ( "start do watchDmChannelTask" ,
2021-06-19 11:45:09 +08:00
zap . Strings ( "dmChannels" , channels ) ,
zap . Int64 ( "loaded nodeID" , wdt . NodeID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
wdt . retryCount --
} ( )
2021-08-02 22:39:25 +08:00
err := wdt . cluster . watchDmChannels ( wdt . ctx , wdt . NodeID , wdt . WatchDmChannelsRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "watchDmChannelTask: watchDmChannel occur error" , zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
2021-10-14 20:18:33 +08:00
wdt . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-15 12:41:40 +08:00
2021-06-19 11:45:09 +08:00
log . Debug ( "watchDmChannelsTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
2021-06-15 12:41:40 +08:00
return nil
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) postExecute ( context . Context ) error {
2021-06-19 11:45:09 +08:00
log . Debug ( "watchDmChannelTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) reschedule ( ctx context . Context ) ( [ ] task , error ) {
2021-06-19 11:45:09 +08:00
collectionID := wdt . CollectionID
2021-11-11 12:56:42 +08:00
watchDmChannelReqs := make ( [ ] * querypb . WatchDmChannelsRequest , 0 )
2021-06-19 11:45:09 +08:00
for _ , info := range wdt . Infos {
2021-11-11 12:56:42 +08:00
msgBase := proto . Clone ( wdt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
req := & querypb . WatchDmChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
PartitionID : wdt . PartitionID ,
Infos : [ ] * datapb . VchannelInfo { info } ,
Schema : wdt . Schema ,
ExcludeInfos : wdt . ExcludeInfos ,
}
watchDmChannelReqs = append ( watchDmChannelReqs , req )
2021-06-19 11:45:09 +08:00
}
2021-11-11 12:56:42 +08:00
if wdt . excludeNodeIDs == nil {
wdt . excludeNodeIDs = [ ] int64 { }
}
2021-10-11 09:54:37 +08:00
wdt . excludeNodeIDs = append ( wdt . excludeNodeIDs , wdt . NodeID )
2021-11-11 12:56:42 +08:00
//TODO:: wait or not according msgType
2021-11-12 18:49:10 +08:00
reScheduledTasks , err := assignInternalTask ( ctx , collectionID , wdt . parentTask , wdt . meta , wdt . cluster , nil , watchDmChannelReqs , nil , false , wdt . excludeNodeIDs , nil )
2021-10-11 09:54:37 +08:00
if err != nil {
log . Error ( "watchDmChannel reschedule failed" , zap . Int64s ( "excludeNodes" , wdt . excludeNodeIDs ) , zap . Error ( err ) )
return nil , err
}
2021-06-19 11:45:09 +08:00
2021-11-11 12:56:42 +08:00
return reScheduledTasks , nil
2021-06-19 11:45:09 +08:00
}
2021-11-05 14:47:19 +08:00
// watchDeltaChannelTask sends a WatchDeltaChannelsRequest to the query node
// named by the request's NodeID.
type watchDeltaChannelTask struct {
	// Shared task state (ctx, result, retry counter, parent link).
	*baseTask
	// The watch request forwarded to the query node.
	*querypb.WatchDeltaChannelsRequest
	// Not read by the methods visible in this part of the file.
	meta Meta
	// Used to send the watch call.
	cluster Cluster
	// Not read by the methods visible in this part of the file.
	excludeNodeIDs []int64
}
// msgBase returns the MsgBase of the embedded watch-delta-channels request.
func (wdt *watchDeltaChannelTask) msgBase() *commonpb.MsgBase {
	return wdt.WatchDeltaChannelsRequest.Base
}
// marshal serializes the embedded watch-delta-channels request with protobuf.
func (wdt *watchDeltaChannelTask) marshal() ([]byte, error) {
	req := wdt.WatchDeltaChannelsRequest
	return proto.Marshal(req)
}
// msgType returns the message type stored in the request's MsgBase.
func (wdt *watchDeltaChannelTask) msgType() commonpb.MsgType {
	return wdt.WatchDeltaChannelsRequest.Base.MsgType
}
// timestamp returns the timestamp stored in the request's MsgBase.
func (wdt *watchDeltaChannelTask) timestamp() Timestamp {
	return wdt.WatchDeltaChannelsRequest.Base.Timestamp
}
// updateTaskProcess delegates progress tracking to the parent task. A missing
// parent is logged as a warning and otherwise ignored.
func (wdt *watchDeltaChannelTask) updateTaskProcess() {
	if parent := wdt.getParentTask(); parent != nil {
		parent.updateTaskProcess()
		return
	}
	log.Warn("watchDeltaChannel: parentTask should not be nil")
}
// preExecute clears any previously recorded result and logs the delta channel
// names and target node of the watch about to start. It always returns nil.
func (wdt *watchDeltaChannelTask) preExecute(context.Context) error {
	names := make([]string, 0, len(wdt.Infos))
	for i := range wdt.Infos {
		names = append(names, wdt.Infos[i].ChannelName)
	}
	wdt.setResultInfo(nil)
	log.Debug(
		"start do watchDeltaChannelTask",
		zap.Strings("deltaChannels", names),
		zap.Int64("loaded nodeID", wdt.NodeID),
		zap.Int64("taskID", wdt.getTaskID()),
	)
	return nil
}
// execute sends the watch-delta-channels request to the target node through
// the cluster, using the task's own context (wdt.ctx). On failure the error
// is logged, recorded with setResultInfo and returned. retryCount is
// decremented on every call.
func (wdt *watchDeltaChannelTask) execute(ctx context.Context) error {
	defer func() { wdt.retryCount-- }()

	if err := wdt.cluster.watchDeltaChannels(wdt.ctx, wdt.NodeID, wdt.WatchDeltaChannelsRequest); err != nil {
		log.Warn(
			"watchDeltaChannelTask: watchDeltaChannel occur error",
			zap.Int64("taskID", wdt.getTaskID()),
			zap.Error(err),
		)
		wdt.setResultInfo(err)
		return err
	}

	log.Debug("watchDeltaChannelsTask Execute done", zap.Int64("taskID", wdt.getTaskID()))
	return nil
}
// postExecute only logs completion; it always returns nil.
func (wdt *watchDeltaChannelTask) postExecute(context.Context) error {
	taskID := wdt.getTaskID()
	log.Debug("watchDeltaChannelTask postExecute done", zap.Int64("taskID", taskID))
	return nil
}
2021-10-18 21:34:47 +08:00
type watchQueryChannelTask struct {
* baseTask
2021-06-15 12:41:40 +08:00
* querypb . AddQueryChannelRequest
2021-09-15 20:40:07 +08:00
cluster Cluster
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return wqt . Base
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( wqt . AddQueryChannelRequest )
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) isValid ( ) bool {
2021-10-11 09:54:37 +08:00
online , err := wqt . cluster . isOnline ( wqt . NodeID )
2021-06-30 17:48:19 +08:00
if err != nil {
return false
}
2021-10-11 09:54:37 +08:00
return wqt . ctx != nil && online
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) msgType ( ) commonpb . MsgType {
2021-06-19 11:45:09 +08:00
return wqt . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) timestamp ( ) Timestamp {
2021-06-19 11:45:09 +08:00
return wqt . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) updateTaskProcess ( ) {
2021-10-14 20:18:33 +08:00
parentTask := wqt . getParentTask ( )
2021-10-11 09:54:37 +08:00
if parentTask == nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "watchQueryChannelTask: parentTask should not be nil" )
2021-10-11 09:54:37 +08:00
return
2021-08-02 22:39:25 +08:00
}
2021-10-14 20:18:33 +08:00
parentTask . updateTaskProcess ( )
2021-10-11 09:54:37 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) preExecute ( context . Context ) error {
2021-10-14 20:18:33 +08:00
wqt . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do watchQueryChannelTask" ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "collectionID" , wqt . CollectionID ) ,
zap . String ( "queryChannel" , wqt . RequestChannelID ) ,
zap . String ( "queryResultChannel" , wqt . ResultChannelID ) ,
zap . Int64 ( "loaded nodeID" , wqt . NodeID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wqt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
wqt . retryCount --
} ( )
2021-08-02 22:39:25 +08:00
err := wqt . cluster . addQueryChannel ( wqt . ctx , wqt . NodeID , wqt . AddQueryChannelRequest )
2021-06-15 12:41:40 +08:00
if err != nil {
2021-10-24 15:16:00 +08:00
log . Warn ( "watchQueryChannelTask: watchQueryChannel occur error" , zap . Int64 ( "taskID" , wqt . getTaskID ( ) ) , zap . Error ( err ) )
2021-10-14 20:18:33 +08:00
wqt . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-15 12:41:40 +08:00
log . Debug ( "watchQueryChannelTask Execute done" ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "collectionID" , wqt . CollectionID ) ,
zap . String ( "queryChannel" , wqt . RequestChannelID ) ,
zap . String ( "queryResultChannel" , wqt . ResultChannelID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wqt . getTaskID ( ) ) )
2021-06-15 12:41:40 +08:00
return nil
}
2021-06-16 11:09:56 +08:00
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) postExecute ( context . Context ) error {
log . Debug ( "watchQueryChannelTask postExecute done" ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "collectionID" , wqt . CollectionID ) ,
zap . String ( "queryChannel" , wqt . RequestChannelID ) ,
zap . String ( "queryResultChannel" , wqt . ResultChannelID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wqt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-10-24 22:39:09 +08:00
//****************************handoff task********************************//
2021-10-18 21:34:47 +08:00
type handoffTask struct {
2021-10-24 22:39:09 +08:00
* baseTask
* querypb . HandoffSegmentsRequest
dataCoord types . DataCoord
cluster Cluster
meta Meta
}
// msgBase returns the message base of the embedded HandoffSegmentsRequest.
func (ht *handoffTask) msgBase() *commonpb.MsgBase {
	return ht.HandoffSegmentsRequest.Base
}
// marshal serializes the embedded HandoffSegmentsRequest with proto.Marshal.
func (ht *handoffTask) marshal() ([]byte, error) {
	request := ht.HandoffSegmentsRequest
	return proto.Marshal(request)
}
// msgType returns the MsgType stored in the request's message base.
func (ht *handoffTask) msgType() commonpb.MsgType {
	return ht.HandoffSegmentsRequest.Base.MsgType
}
// timestamp returns the Timestamp stored in the request's message base.
func (ht *handoffTask) timestamp() Timestamp {
	return ht.HandoffSegmentsRequest.Base.Timestamp
}
// preExecute resets the task's result info and logs the IDs of all segments
// named in the handoff request. The context argument is unused and the method
// always returns nil.
func (ht *handoffTask) preExecute(context.Context) error {
	ht.setResultInfo(nil)

	infos := ht.HandoffSegmentsRequest.SegmentInfos
	ids := make([]UniqueID, 0, len(infos))
	for i := range infos {
		ids = append(ids, infos[i].SegmentID)
	}

	log.Debug("start do handoff segments task", zap.Int64s("segmentIDs", ids))
	return nil
}
func ( ht * handoffTask ) execute ( ctx context . Context ) error {
segmentInfos := ht . HandoffSegmentsRequest . SegmentInfos
for _ , segmentInfo := range segmentInfos {
collectionID := segmentInfo . CollectionID
partitionID := segmentInfo . PartitionID
segmentID := segmentInfo . SegmentID
collectionInfo , err := ht . meta . getCollectionInfoByID ( collectionID )
if err != nil {
log . Debug ( "handoffTask: collection has not been loaded into memory" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "segmentID" , segmentID ) )
continue
}
2021-10-26 13:04:18 +08:00
if collectionInfo . LoadType == querypb . LoadType_loadCollection && ht . meta . hasReleasePartition ( collectionID , partitionID ) {
log . Debug ( "handoffTask: partition has not been released" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) )
continue
}
2021-10-24 22:39:09 +08:00
partitionLoaded := false
for _ , id := range collectionInfo . PartitionIDs {
if id == partitionID {
partitionLoaded = true
}
}
if collectionInfo . LoadType != querypb . LoadType_loadCollection && ! partitionLoaded {
log . Debug ( "handoffTask: partition has not been loaded into memory" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) , zap . Int64 ( "segmentID" , segmentID ) )
continue
}
2021-11-08 21:00:02 +08:00
// segment which is compacted from should be exist in query node
for _ , compactedSegID := range segmentInfo . CompactionFrom {
_ , err = ht . meta . getSegmentInfoByID ( compactedSegID )
if err != nil {
log . Error ( "handoffTask: compacted segment has not been loaded into memory" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) , zap . Int64 ( "segmentID" , segmentID ) )
ht . setResultInfo ( err )
return err
}
}
// segment which is compacted to should not exist in query node
2021-10-24 22:39:09 +08:00
_ , err = ht . meta . getSegmentInfoByID ( segmentID )
if err != nil {
getRecoveryInfoRequest := & datapb . GetRecoveryInfoRequest {
Base : ht . Base ,
CollectionID : collectionID ,
PartitionID : partitionID ,
}
recoveryInfo , err := ht . dataCoord . GetRecoveryInfo ( ctx , getRecoveryInfoRequest )
if err != nil {
ht . setResultInfo ( err )
return err
}
findBinlog := false
var loadSegmentReq * querypb . LoadSegmentsRequest
2021-11-13 08:49:08 +08:00
var watchDeltaChannels [ ] * datapb . VchannelInfo
2021-10-24 22:39:09 +08:00
for _ , segmentBinlogs := range recoveryInfo . Binlogs {
if segmentBinlogs . SegmentID == segmentID {
findBinlog = true
segmentLoadInfo := & querypb . SegmentLoadInfo {
2021-11-08 21:00:02 +08:00
SegmentID : segmentID ,
PartitionID : partitionID ,
CollectionID : collectionID ,
BinlogPaths : segmentBinlogs . FieldBinlogs ,
NumOfRows : segmentBinlogs . NumOfRows ,
CompactionFrom : segmentInfo . CompactionFrom ,
2021-11-17 14:37:11 +08:00
EnableIndex : segmentInfo . EnableIndex ,
2021-11-17 09:47:12 +08:00
IndexPathInfos : segmentInfo . IndexPathInfos ,
2021-10-24 22:39:09 +08:00
}
msgBase := proto . Clone ( ht . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
loadSegmentReq = & querypb . LoadSegmentsRequest {
Base : msgBase ,
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : collectionInfo . Schema ,
LoadCondition : querypb . TriggerCondition_handoff ,
}
}
}
2021-11-13 08:49:08 +08:00
if len ( watchDeltaChannels ) != len ( recoveryInfo . Channels ) {
2021-11-05 14:47:19 +08:00
for _ , info := range recoveryInfo . Channels {
2021-11-13 08:49:08 +08:00
deltaChannelName , err := rootcoord . ConvertChannelName ( info . ChannelName , Params . DmlChannelPrefix , Params . DeltaChannelPrefix )
2021-11-05 14:47:19 +08:00
if err != nil {
return err
}
2021-11-13 08:49:08 +08:00
deltaChannel := proto . Clone ( info ) . ( * datapb . VchannelInfo )
deltaChannel . ChannelName = deltaChannelName
watchDeltaChannels = append ( watchDeltaChannels , deltaChannel )
2021-11-05 14:47:19 +08:00
}
}
2021-10-24 22:39:09 +08:00
if ! findBinlog {
err = fmt . Errorf ( "segmnet has not been flushed, segmentID is %d" , segmentID )
ht . setResultInfo ( err )
return err
}
2021-11-13 08:49:08 +08:00
msgBase := proto . Clone ( ht . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDeltaChannels
watchDeltaChannelReq := & querypb . WatchDeltaChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
Infos : watchDeltaChannels ,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
ht . meta . setDeltaChannel ( watchDeltaChannelReq . CollectionID , watchDeltaChannelReq . Infos )
internalTasks , err := assignInternalTask ( ctx , collectionID , ht , ht . meta , ht . cluster , [ ] * querypb . LoadSegmentsRequest { loadSegmentReq } , nil , watchDeltaChannelReq , true , nil , nil )
2021-10-24 22:39:09 +08:00
if err != nil {
log . Error ( "handoffTask: assign child task failed" , zap . Any ( "segmentInfo" , segmentInfo ) )
ht . setResultInfo ( err )
return err
}
2021-11-11 12:56:42 +08:00
for _ , internalTask := range internalTasks {
ht . addChildTask ( internalTask )
log . Debug ( "handoffTask: add a childTask" , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) , zap . Int64 ( "segmentID" , segmentID ) , zap . Any ( "task" , internalTask ) )
}
2021-10-24 22:39:09 +08:00
} else {
err = fmt . Errorf ( "sealed segment has been exist on query node, segmentID is %d" , segmentID )
log . Error ( "handoffTask: sealed segment has been exist on query node" , zap . Int64 ( "segmentID" , segmentID ) )
ht . setResultInfo ( err )
return err
}
}
log . Debug ( "handoffTask: assign child task done" , zap . Any ( "segmentInfos" , segmentInfos ) )
log . Debug ( "handoffTask Execute done" ,
zap . Int64 ( "taskID" , ht . getTaskID ( ) ) )
return nil
}
// postExecute discards all child tasks when the recorded result is not a
// success, then logs completion. The context argument is unused and the
// method always returns nil.
func (ht *handoffTask) postExecute(context.Context) error {
	failed := ht.result.ErrorCode != commonpb.ErrorCode_Success
	if failed {
		ht.childTasks = []task{}
	}

	taskID := ht.getTaskID()
	log.Debug("handoffTask postExecute done", zap.Int64("taskID", taskID))
	return nil
}
func ( ht * handoffTask ) rollBack ( ctx context . Context ) [ ] task {
resultTasks := make ( [ ] task , 0 )
childTasks := ht . getChildTask ( )
for _ , childTask := range childTasks {
if childTask . msgType ( ) == commonpb . MsgType_LoadSegments {
// TODO:: add release segment to rollBack, no release does not affect correctness of query
}
}
return resultTasks
2021-06-19 11:45:09 +08:00
}
2021-06-15 12:41:40 +08:00
2021-10-18 21:34:47 +08:00
type loadBalanceTask struct {
* baseTask
2021-06-19 11:45:09 +08:00
* querypb . LoadBalanceRequest
2021-11-17 09:47:12 +08:00
rootCoord types . RootCoord
dataCoord types . DataCoord
indexCoord types . IndexCoord
cluster Cluster
meta Meta
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return lbt . Base
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( lbt . LoadBalanceRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) msgType ( ) commonpb . MsgType {
2021-06-19 11:45:09 +08:00
return lbt . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) timestamp ( ) Timestamp {
2021-06-19 11:45:09 +08:00
return lbt . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) preExecute ( context . Context ) error {
2021-10-14 20:18:33 +08:00
lbt . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do loadBalanceTask" ,
2021-11-06 15:22:56 +08:00
zap . Int32 ( "trigger type" , int32 ( lbt . triggerCondition ) ) ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "sourceNodeIDs" , lbt . SourceNodeIDs ) ,
zap . Any ( "balanceReason" , lbt . BalanceReason ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lbt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
lbt . retryCount --
} ( )
2021-06-19 11:45:09 +08:00
if lbt . triggerCondition == querypb . TriggerCondition_nodeDown {
for _ , nodeID := range lbt . SourceNodeIDs {
2021-08-02 22:39:25 +08:00
collectionInfos := lbt . cluster . getCollectionInfosByID ( lbt . ctx , nodeID )
for _ , info := range collectionInfos {
collectionID := info . CollectionID
2021-06-30 17:48:19 +08:00
metaInfo , err := lbt . meta . getCollectionInfoByID ( collectionID )
if err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "loadBalanceTask: getCollectionInfoByID occur error" , zap . String ( "error" , err . Error ( ) ) )
2021-10-14 20:18:33 +08:00
lbt . setResultInfo ( err )
2021-10-11 09:54:37 +08:00
return err
2021-06-30 17:48:19 +08:00
}
2021-08-02 22:39:25 +08:00
loadType := metaInfo . LoadType
2021-06-30 17:48:19 +08:00
schema := metaInfo . Schema
2021-06-19 11:45:09 +08:00
partitionIDs := info . PartitionIDs
segmentsToLoad := make ( [ ] UniqueID , 0 )
2021-06-30 17:48:19 +08:00
loadSegmentReqs := make ( [ ] * querypb . LoadSegmentsRequest , 0 )
2021-06-19 11:45:09 +08:00
channelsToWatch := make ( [ ] string , 0 )
2021-06-30 17:48:19 +08:00
watchDmChannelReqs := make ( [ ] * querypb . WatchDmChannelsRequest , 0 )
2021-11-13 08:49:08 +08:00
var watchDeltaChannels [ ] * datapb . VchannelInfo
2021-06-19 11:45:09 +08:00
dmChannels , err := lbt . meta . getDmChannelsByNodeID ( collectionID , nodeID )
if err != nil {
2021-10-14 20:18:33 +08:00
lbt . setResultInfo ( err )
2021-06-19 11:45:09 +08:00
return err
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
for _ , partitionID := range partitionIDs {
getRecoveryInfo := & datapb . GetRecoveryInfoRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_LoadBalanceSegments ,
} ,
CollectionID : collectionID ,
PartitionID : partitionID ,
}
2021-07-01 15:24:17 +08:00
recoveryInfo , err := lbt . dataCoord . GetRecoveryInfo ( ctx , getRecoveryInfo )
2021-06-19 11:45:09 +08:00
if err != nil {
2021-10-14 20:18:33 +08:00
lbt . setResultInfo ( err )
2021-06-19 11:45:09 +08:00
return err
}
2021-06-30 17:48:19 +08:00
for _ , segmentBingLog := range recoveryInfo . Binlogs {
segmentID := segmentBingLog . SegmentID
segmentLoadInfo := & querypb . SegmentLoadInfo {
SegmentID : segmentID ,
PartitionID : partitionID ,
CollectionID : collectionID ,
BinlogPaths : segmentBingLog . FieldBinlogs ,
2021-09-07 11:35:18 +08:00
NumOfRows : segmentBingLog . NumOfRows ,
2021-10-22 14:31:13 +08:00
Statslogs : segmentBingLog . Statslogs ,
Deltalogs : segmentBingLog . Deltalogs ,
2021-06-30 17:48:19 +08:00
}
2021-11-17 09:47:12 +08:00
indexInfo , err := getIndexInfo ( ctx , & querypb . SegmentInfo {
CollectionID : collectionID ,
SegmentID : segmentID ,
} , lbt . rootCoord , lbt . indexCoord )
if err == nil && indexInfo . enableIndex {
2021-11-17 14:37:11 +08:00
segmentLoadInfo . EnableIndex = true
2021-11-17 09:47:12 +08:00
segmentLoadInfo . IndexPathInfos = indexInfo . infos
}
2021-06-30 17:48:19 +08:00
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lbt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
2021-06-30 17:48:19 +08:00
loadSegmentReq := & querypb . LoadSegmentsRequest {
2021-07-13 14:16:00 +08:00
Base : msgBase ,
2021-06-30 17:48:19 +08:00
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : schema ,
LoadCondition : querypb . TriggerCondition_nodeDown ,
2021-10-22 19:07:15 +08:00
SourceNodeID : nodeID ,
2021-06-30 17:48:19 +08:00
}
segmentsToLoad = append ( segmentsToLoad , segmentID )
loadSegmentReqs = append ( loadSegmentReqs , loadSegmentReq )
}
2021-11-13 08:49:08 +08:00
if len ( watchDeltaChannels ) != len ( recoveryInfo . Channels ) {
2021-11-05 14:47:19 +08:00
for _ , info := range recoveryInfo . Channels {
2021-11-13 08:49:08 +08:00
deltaChannelName , err := rootcoord . ConvertChannelName ( info . ChannelName , Params . DmlChannelPrefix , Params . DeltaChannelPrefix )
if err != nil {
return err
2021-11-05 14:47:19 +08:00
}
2021-11-13 08:49:08 +08:00
deltaChannel := proto . Clone ( info ) . ( * datapb . VchannelInfo )
deltaChannel . ChannelName = deltaChannelName
watchDeltaChannels = append ( watchDeltaChannels , deltaChannel )
2021-11-05 14:47:19 +08:00
}
}
2021-06-19 11:45:09 +08:00
for _ , channelInfo := range recoveryInfo . Channels {
for _ , channel := range dmChannels {
if channelInfo . ChannelName == channel {
2021-08-02 22:39:25 +08:00
if loadType == querypb . LoadType_loadCollection {
2021-06-30 17:48:19 +08:00
merged := false
for index , channelName := range channelsToWatch {
if channel == channelName {
merged = true
oldInfo := watchDmChannelReqs [ index ] . Infos [ 0 ]
newInfo := mergeVChannelInfo ( oldInfo , channelInfo )
watchDmChannelReqs [ index ] . Infos = [ ] * datapb . VchannelInfo { newInfo }
break
}
}
if ! merged {
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lbt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
2021-06-30 17:48:19 +08:00
watchRequest := & querypb . WatchDmChannelsRequest {
2021-07-13 14:16:00 +08:00
Base : msgBase ,
2021-06-30 17:48:19 +08:00
CollectionID : collectionID ,
Infos : [ ] * datapb . VchannelInfo { channelInfo } ,
Schema : schema ,
}
2021-06-19 11:45:09 +08:00
channelsToWatch = append ( channelsToWatch , channel )
2021-06-30 17:48:19 +08:00
watchDmChannelReqs = append ( watchDmChannelReqs , watchRequest )
2021-06-19 11:45:09 +08:00
}
} else {
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lbt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
2021-06-30 17:48:19 +08:00
watchRequest := & querypb . WatchDmChannelsRequest {
2021-07-13 14:16:00 +08:00
Base : msgBase ,
2021-06-30 17:48:19 +08:00
CollectionID : collectionID ,
PartitionID : partitionID ,
Infos : [ ] * datapb . VchannelInfo { channelInfo } ,
Schema : schema ,
}
2021-06-19 11:45:09 +08:00
channelsToWatch = append ( channelsToWatch , channel )
2021-06-30 17:48:19 +08:00
watchDmChannelReqs = append ( watchDmChannelReqs , watchRequest )
2021-06-19 11:45:09 +08:00
}
break
}
}
}
2021-06-15 12:41:40 +08:00
}
2021-11-13 08:49:08 +08:00
msgBase := proto . Clone ( lbt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDeltaChannels
watchDeltaChannelReq := & querypb . WatchDeltaChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
Infos : watchDeltaChannels ,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lbt . meta . setDeltaChannel ( watchDeltaChannelReq . CollectionID , watchDeltaChannelReq . Infos )
2021-11-11 12:56:42 +08:00
2021-11-13 08:49:08 +08:00
internalTasks , err := assignInternalTask ( ctx , collectionID , lbt , lbt . meta , lbt . cluster , loadSegmentReqs , watchDmChannelReqs , watchDeltaChannelReq , true , lbt . SourceNodeIDs , lbt . DstNodeIDs )
2021-09-29 09:56:04 +08:00
if err != nil {
2021-10-11 09:54:37 +08:00
log . Warn ( "loadBalanceTask: assign child task failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-10-14 20:18:33 +08:00
lbt . setResultInfo ( err )
2021-09-29 09:56:04 +08:00
return err
}
2021-11-11 12:56:42 +08:00
for _ , internalTask := range internalTasks {
lbt . addChildTask ( internalTask )
log . Debug ( "loadBalanceTask: add a childTask" , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) , zap . Any ( "task" , internalTask ) )
}
2021-06-30 17:48:19 +08:00
log . Debug ( "loadBalanceTask: assign child task done" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-06-15 12:41:40 +08:00
}
2021-04-15 15:15:46 +08:00
}
}
2021-11-06 15:22:56 +08:00
//TODO:: use request.DstNodeIDs to balance
if lbt . triggerCondition == querypb . TriggerCondition_loadBalance {
if len ( lbt . SourceNodeIDs ) == 0 {
err := errors . New ( "loadBalanceTask: empty source Node list to balance" )
log . Error ( err . Error ( ) )
lbt . setResultInfo ( err )
return err
}
balancedSegmentInfos := make ( map [ UniqueID ] * querypb . SegmentInfo )
balancedSegmentIDs := make ( [ ] UniqueID , 0 )
for _ , nodeID := range lbt . SourceNodeIDs {
nodeExist := lbt . cluster . hasNode ( nodeID )
if ! nodeExist {
err := fmt . Errorf ( "loadBalanceTask: query node %d is not exist to balance" , nodeID )
log . Error ( err . Error ( ) )
lbt . setResultInfo ( err )
return err
}
segmentInfos := lbt . meta . getSegmentInfosByNode ( nodeID )
for _ , info := range segmentInfos {
balancedSegmentInfos [ info . SegmentID ] = info
balancedSegmentIDs = append ( balancedSegmentIDs , info . SegmentID )
}
}
// check balanced sealedSegmentIDs in request whether exist in query node
for _ , segmentID := range lbt . SealedSegmentIDs {
if _ , ok := balancedSegmentInfos [ segmentID ] ; ! ok {
err := fmt . Errorf ( "loadBalanceTask: unloaded segment %d" , segmentID )
log . Warn ( err . Error ( ) )
lbt . setResultInfo ( err )
return err
}
}
if len ( lbt . SealedSegmentIDs ) != 0 {
balancedSegmentIDs = lbt . SealedSegmentIDs
}
col2PartitionIDs := make ( map [ UniqueID ] [ ] UniqueID )
par2Segments := make ( map [ UniqueID ] [ ] * querypb . SegmentInfo )
for _ , segmentID := range balancedSegmentIDs {
info := balancedSegmentInfos [ segmentID ]
collectionID := info . CollectionID
partitionID := info . PartitionID
if _ , ok := col2PartitionIDs [ collectionID ] ; ! ok {
col2PartitionIDs [ collectionID ] = make ( [ ] UniqueID , 0 )
}
if _ , ok := par2Segments [ partitionID ] ; ! ok {
col2PartitionIDs [ collectionID ] = append ( col2PartitionIDs [ collectionID ] , partitionID )
par2Segments [ partitionID ] = make ( [ ] * querypb . SegmentInfo , 0 )
}
par2Segments [ partitionID ] = append ( par2Segments [ partitionID ] , info )
}
for collectionID , partitionIDs := range col2PartitionIDs {
segmentsToLoad := make ( [ ] UniqueID , 0 )
loadSegmentReqs := make ( [ ] * querypb . LoadSegmentsRequest , 0 )
2021-11-13 08:49:08 +08:00
var watchDeltaChannels [ ] * datapb . VchannelInfo
2021-11-06 15:22:56 +08:00
collectionInfo , err := lbt . meta . getCollectionInfoByID ( collectionID )
if err != nil {
log . Error ( "loadBalanceTask: can't find collectionID in meta" , zap . Int64 ( "collectionID" , collectionID ) , zap . Error ( err ) )
lbt . setResultInfo ( err )
return err
}
for _ , partitionID := range partitionIDs {
getRecoveryInfoRequest := & datapb . GetRecoveryInfoRequest {
Base : lbt . Base ,
CollectionID : collectionID ,
PartitionID : partitionID ,
}
recoveryInfo , err := lbt . dataCoord . GetRecoveryInfo ( ctx , getRecoveryInfoRequest )
if err != nil {
lbt . setResultInfo ( err )
return err
}
segmentID2Binlog := make ( map [ UniqueID ] * datapb . SegmentBinlogs )
for _ , binlog := range recoveryInfo . Binlogs {
segmentID2Binlog [ binlog . SegmentID ] = binlog
}
for _ , segmentInfo := range par2Segments [ partitionID ] {
segmentID := segmentInfo . SegmentID
if _ , ok := segmentID2Binlog [ segmentID ] ; ! ok {
log . Warn ( "loadBalanceTask: can't find binlog of segment to balance, may be has been compacted" , zap . Int64 ( "segmentID" , segmentID ) )
continue
}
segmentBingLog := segmentID2Binlog [ segmentID ]
segmentLoadInfo := & querypb . SegmentLoadInfo {
SegmentID : segmentID ,
PartitionID : partitionID ,
CollectionID : collectionID ,
BinlogPaths : segmentBingLog . FieldBinlogs ,
NumOfRows : segmentBingLog . NumOfRows ,
Statslogs : segmentBingLog . Statslogs ,
Deltalogs : segmentBingLog . Deltalogs ,
}
2021-11-17 09:47:12 +08:00
indexInfo , err := getIndexInfo ( ctx , & querypb . SegmentInfo {
CollectionID : collectionID ,
SegmentID : segmentID ,
} , lbt . rootCoord , lbt . indexCoord )
if err == nil && indexInfo . enableIndex {
2021-11-17 14:37:11 +08:00
segmentLoadInfo . EnableIndex = true
2021-11-17 09:47:12 +08:00
segmentLoadInfo . IndexPathInfos = indexInfo . infos
}
2021-11-06 15:22:56 +08:00
msgBase := proto . Clone ( lbt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
loadSegmentReq := & querypb . LoadSegmentsRequest {
Base : msgBase ,
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : collectionInfo . Schema ,
LoadCondition : querypb . TriggerCondition_grpcRequest ,
}
segmentsToLoad = append ( segmentsToLoad , segmentID )
loadSegmentReqs = append ( loadSegmentReqs , loadSegmentReq )
}
2021-11-13 08:49:08 +08:00
if len ( watchDeltaChannels ) != len ( recoveryInfo . Channels ) {
2021-11-06 15:22:56 +08:00
for _ , info := range recoveryInfo . Channels {
2021-11-13 08:49:08 +08:00
deltaChannelName , err := rootcoord . ConvertChannelName ( info . ChannelName , Params . DmlChannelPrefix , Params . DeltaChannelPrefix )
2021-11-06 15:22:56 +08:00
if err != nil {
return err
}
2021-11-13 08:49:08 +08:00
deltaChannel := proto . Clone ( info ) . ( * datapb . VchannelInfo )
deltaChannel . ChannelName = deltaChannelName
watchDeltaChannels = append ( watchDeltaChannels , deltaChannel )
2021-11-06 15:22:56 +08:00
}
}
}
2021-11-13 08:49:08 +08:00
msgBase := proto . Clone ( lbt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDeltaChannels
watchDeltaChannelReq := & querypb . WatchDeltaChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
Infos : watchDeltaChannels ,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lbt . meta . setDeltaChannel ( watchDeltaChannelReq . CollectionID , watchDeltaChannelReq . Infos )
2021-11-06 15:22:56 +08:00
// TODO:: assignInternalTask with multi collection
2021-11-13 08:49:08 +08:00
internalTasks , err := assignInternalTask ( ctx , collectionID , lbt , lbt . meta , lbt . cluster , loadSegmentReqs , nil , watchDeltaChannelReq , false , lbt . SourceNodeIDs , lbt . DstNodeIDs )
2021-11-06 15:22:56 +08:00
if err != nil {
2021-11-11 12:56:42 +08:00
log . Warn ( "loadBalanceTask: assign child task failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-11-06 15:22:56 +08:00
lbt . setResultInfo ( err )
return err
}
2021-11-11 12:56:42 +08:00
for _ , internalTask := range internalTasks {
lbt . addChildTask ( internalTask )
log . Debug ( "loadBalanceTask: add a childTask" , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) , zap . Any ( "task" , internalTask ) )
}
2021-11-06 15:22:56 +08:00
}
log . Debug ( "loadBalanceTask: assign child task done" , zap . Any ( "balance request" , lbt . LoadBalanceRequest ) )
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadBalanceTask Execute done" ,
2021-11-06 15:22:56 +08:00
zap . Int32 ( "trigger type" , int32 ( lbt . triggerCondition ) ) ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "sourceNodeIDs" , lbt . SourceNodeIDs ) ,
zap . Any ( "balanceReason" , lbt . BalanceReason ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lbt . getTaskID ( ) ) )
2021-06-19 11:45:09 +08:00
return nil
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) postExecute ( context . Context ) error {
2021-11-06 15:22:56 +08:00
if lbt . result . ErrorCode != commonpb . ErrorCode_Success {
lbt . childTasks = [ ] task { }
}
if lbt . triggerCondition == querypb . TriggerCondition_nodeDown {
2021-10-11 09:54:37 +08:00
for _ , id := range lbt . SourceNodeIDs {
err := lbt . cluster . removeNodeInfo ( id )
if err != nil {
2021-11-06 15:22:56 +08:00
//TODO:: clear node info after removeNodeInfo failed
2021-10-18 21:34:47 +08:00
log . Error ( "loadBalanceTask: occur error when removing node info from cluster" , zap . Int64 ( "nodeID" , id ) )
2021-10-11 09:54:37 +08:00
}
2021-06-22 14:10:09 +08:00
}
}
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadBalanceTask postExecute done" ,
2021-11-06 15:22:56 +08:00
zap . Int32 ( "trigger type" , int32 ( lbt . triggerCondition ) ) ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "sourceNodeIDs" , lbt . SourceNodeIDs ) ,
zap . Any ( "balanceReason" , lbt . BalanceReason ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lbt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-06-16 11:09:56 +08:00
func mergeVChannelInfo ( info1 * datapb . VchannelInfo , info2 * datapb . VchannelInfo ) * datapb . VchannelInfo {
2021-06-15 12:41:40 +08:00
collectionID := info1 . CollectionID
channelName := info1 . ChannelName
2021-06-16 11:09:56 +08:00
var seekPosition * internalpb . MsgPosition
if info1 . SeekPosition == nil || info2 . SeekPosition == nil {
seekPosition = & internalpb . MsgPosition {
ChannelName : channelName ,
}
} else {
seekPosition = info1 . SeekPosition
if info1 . SeekPosition . Timestamp > info2 . SeekPosition . Timestamp {
seekPosition = info2 . SeekPosition
}
2021-06-15 12:41:40 +08:00
}
2021-06-16 11:09:56 +08:00
checkPoints := make ( [ ] * datapb . SegmentInfo , 0 )
checkPoints = append ( checkPoints , info1 . UnflushedSegments ... )
checkPoints = append ( checkPoints , info2 . UnflushedSegments ... )
2021-06-15 12:41:40 +08:00
2021-10-08 19:09:12 +08:00
flushedSegments := make ( [ ] * datapb . SegmentInfo , 0 )
2021-06-15 12:41:40 +08:00
flushedSegments = append ( flushedSegments , info1 . FlushedSegments ... )
flushedSegments = append ( flushedSegments , info2 . FlushedSegments ... )
2021-06-16 11:09:56 +08:00
return & datapb . VchannelInfo {
CollectionID : collectionID ,
ChannelName : channelName ,
SeekPosition : seekPosition ,
UnflushedSegments : checkPoints ,
FlushedSegments : flushedSegments ,
2021-06-15 12:41:40 +08:00
}
}
2021-10-11 09:54:37 +08:00
2021-06-30 16:18:13 +08:00
func assignInternalTask ( ctx context . Context ,
2021-11-11 12:56:42 +08:00
collectionID UniqueID , parentTask task , meta Meta , cluster Cluster ,
2021-06-26 16:08:11 +08:00
loadSegmentRequests [ ] * querypb . LoadSegmentsRequest ,
2021-10-11 09:54:37 +08:00
watchDmChannelRequests [ ] * querypb . WatchDmChannelsRequest ,
2021-11-13 08:49:08 +08:00
watchDeltaChannelRequest * querypb . WatchDeltaChannelsRequest ,
2021-11-12 18:49:10 +08:00
wait bool , excludeNodeIDs [ ] int64 , includeNodeIDs [ ] int64 ) ( [ ] task , error ) {
2021-06-30 16:18:13 +08:00
sp , _ := trace . StartSpanFromContext ( ctx )
defer sp . Finish ( )
2021-11-11 12:56:42 +08:00
internalTasks := make ( [ ] task , 0 )
2021-11-12 18:49:10 +08:00
err := cluster . allocateSegmentsToQueryNode ( ctx , loadSegmentRequests , wait , excludeNodeIDs , includeNodeIDs )
2021-10-11 09:54:37 +08:00
if err != nil {
2021-11-11 12:56:42 +08:00
log . Error ( "assignInternalTask: assign segment to node failed" , zap . Any ( "load segments requests" , loadSegmentRequests ) )
return nil , err
2021-10-11 09:54:37 +08:00
}
2021-11-11 12:56:42 +08:00
log . Debug ( "assignInternalTask: assign segment to node success" , zap . Any ( "load segments requests" , loadSegmentRequests ) )
err = cluster . allocateChannelsToQueryNode ( ctx , watchDmChannelRequests , wait , excludeNodeIDs )
2021-10-11 09:54:37 +08:00
if err != nil {
2021-11-11 12:56:42 +08:00
log . Error ( "assignInternalTask: assign dmChannel to node failed" , zap . Any ( "watch dmChannel requests" , watchDmChannelRequests ) )
return nil , err
2021-10-11 09:54:37 +08:00
}
2021-11-11 12:56:42 +08:00
log . Debug ( "assignInternalTask: assign dmChannel to node success" , zap . Any ( "watch dmChannel requests" , watchDmChannelRequests ) )
2021-06-26 16:08:11 +08:00
watchQueryChannelInfo := make ( map [ int64 ] bool )
2021-10-19 10:40:35 +08:00
node2Segments := make ( map [ int64 ] [ ] * querypb . LoadSegmentsRequest )
sizeCounts := make ( map [ int64 ] int )
2021-11-11 12:56:42 +08:00
for _ , req := range loadSegmentRequests {
nodeID := req . DstNodeID
sizeOfReq := getSizeOfLoadSegmentReq ( req )
2021-06-26 16:08:11 +08:00
if _ , ok := node2Segments [ nodeID ] ; ! ok {
2021-10-19 10:40:35 +08:00
node2Segments [ nodeID ] = make ( [ ] * querypb . LoadSegmentsRequest , 0 )
2021-11-11 12:56:42 +08:00
node2Segments [ nodeID ] = append ( node2Segments [ nodeID ] , req )
2021-10-20 22:32:36 +08:00
sizeCounts [ nodeID ] = sizeOfReq
} else {
2021-10-28 17:26:28 +08:00
if sizeCounts [ nodeID ] + sizeOfReq > MaxSendSizeToEtcd {
2021-11-11 12:56:42 +08:00
node2Segments [ nodeID ] = append ( node2Segments [ nodeID ] , req )
2021-10-20 22:32:36 +08:00
sizeCounts [ nodeID ] = sizeOfReq
} else {
lastReq := node2Segments [ nodeID ] [ len ( node2Segments [ nodeID ] ) - 1 ]
2021-11-11 12:56:42 +08:00
lastReq . Infos = append ( lastReq . Infos , req . Infos ... )
2021-10-20 22:32:36 +08:00
sizeCounts [ nodeID ] += sizeOfReq
}
2021-06-26 16:08:11 +08:00
}
2021-10-19 10:40:35 +08:00
2021-10-14 20:18:33 +08:00
if cluster . hasWatchedQueryChannel ( parentTask . traceCtx ( ) , nodeID , collectionID ) {
2021-06-26 16:08:11 +08:00
watchQueryChannelInfo [ nodeID ] = true
continue
}
watchQueryChannelInfo [ nodeID ] = false
}
2021-10-19 10:40:35 +08:00
for nodeID , loadSegmentsReqs := range node2Segments {
for _ , req := range loadSegmentsReqs {
ctx = opentracing . ContextWithSpan ( context . Background ( ) , sp )
baseTask := newBaseTask ( ctx , parentTask . getTriggerCondition ( ) )
baseTask . setParentTask ( parentTask )
loadSegmentTask := & loadSegmentTask {
baseTask : baseTask ,
LoadSegmentsRequest : req ,
meta : meta ,
cluster : cluster ,
2021-11-11 12:56:42 +08:00
excludeNodeIDs : excludeNodeIDs ,
2021-10-19 10:40:35 +08:00
}
2021-11-11 12:56:42 +08:00
internalTasks = append ( internalTasks , loadSegmentTask )
2021-06-26 16:08:11 +08:00
}
2021-11-05 14:47:19 +08:00
2021-11-13 08:49:08 +08:00
if watchDeltaChannelRequest != nil {
2021-11-05 14:47:19 +08:00
ctx = opentracing . ContextWithSpan ( context . Background ( ) , sp )
2021-11-13 08:49:08 +08:00
watchDeltaRequest := proto . Clone ( watchDeltaChannelRequest ) . ( * querypb . WatchDeltaChannelsRequest )
2021-11-05 14:47:19 +08:00
watchDeltaRequest . NodeID = nodeID
baseTask := newBaseTask ( ctx , parentTask . getTriggerCondition ( ) )
baseTask . setParentTask ( parentTask )
watchDeltaTask := & watchDeltaChannelTask {
baseTask : baseTask ,
WatchDeltaChannelsRequest : watchDeltaRequest ,
meta : meta ,
cluster : cluster ,
excludeNodeIDs : [ ] int64 { } ,
}
2021-11-11 12:56:42 +08:00
internalTasks = append ( internalTasks , watchDeltaTask )
2021-11-05 14:47:19 +08:00
}
2021-06-26 16:08:11 +08:00
}
2021-11-11 12:56:42 +08:00
for _ , req := range watchDmChannelRequests {
nodeID := req . NodeID
2021-06-30 16:18:13 +08:00
ctx = opentracing . ContextWithSpan ( context . Background ( ) , sp )
2021-10-14 20:18:33 +08:00
baseTask := newBaseTask ( ctx , parentTask . getTriggerCondition ( ) )
baseTask . setParentTask ( parentTask )
2021-10-18 21:34:47 +08:00
watchDmChannelTask := & watchDmChannelTask {
baseTask : baseTask ,
2021-11-11 12:56:42 +08:00
WatchDmChannelsRequest : req ,
2021-06-26 16:08:11 +08:00
meta : meta ,
cluster : cluster ,
2021-11-11 12:56:42 +08:00
excludeNodeIDs : excludeNodeIDs ,
2021-06-26 16:08:11 +08:00
}
2021-11-11 12:56:42 +08:00
internalTasks = append ( internalTasks , watchDmChannelTask )
if cluster . hasWatchedQueryChannel ( parentTask . traceCtx ( ) , nodeID , collectionID ) {
watchQueryChannelInfo [ nodeID ] = true
continue
}
watchQueryChannelInfo [ nodeID ] = false
2021-06-26 16:08:11 +08:00
}
for nodeID , watched := range watchQueryChannelInfo {
if ! watched {
2021-06-30 16:18:13 +08:00
ctx = opentracing . ContextWithSpan ( context . Background ( ) , sp )
2021-10-22 19:07:15 +08:00
queryChannelInfo , err := meta . getQueryChannelInfoByID ( collectionID )
2021-09-29 09:56:04 +08:00
if err != nil {
2021-11-11 12:56:42 +08:00
return nil , err
2021-09-29 09:56:04 +08:00
}
2021-06-26 16:08:11 +08:00
2021-10-14 20:18:33 +08:00
msgBase := proto . Clone ( parentTask . msgBase ( ) ) . ( * commonpb . MsgBase )
2021-07-13 14:16:00 +08:00
msgBase . MsgType = commonpb . MsgType_WatchQueryChannels
2021-06-26 16:08:11 +08:00
addQueryChannelRequest := & querypb . AddQueryChannelRequest {
2021-10-22 19:07:15 +08:00
Base : msgBase ,
NodeID : nodeID ,
CollectionID : collectionID ,
RequestChannelID : queryChannelInfo . QueryChannelID ,
ResultChannelID : queryChannelInfo . QueryResultChannelID ,
GlobalSealedSegments : queryChannelInfo . GlobalSealedSegments ,
SeekPosition : queryChannelInfo . SeekPosition ,
2021-06-26 16:08:11 +08:00
}
2021-10-14 20:18:33 +08:00
baseTask := newBaseTask ( ctx , parentTask . getTriggerCondition ( ) )
baseTask . setParentTask ( parentTask )
2021-10-18 21:34:47 +08:00
watchQueryChannelTask := & watchQueryChannelTask {
baseTask : baseTask ,
2021-06-26 16:08:11 +08:00
AddQueryChannelRequest : addQueryChannelRequest ,
cluster : cluster ,
}
2021-11-11 12:56:42 +08:00
internalTasks = append ( internalTasks , watchQueryChannelTask )
2021-06-26 16:08:11 +08:00
}
}
2021-11-11 12:56:42 +08:00
return internalTasks , nil
2021-06-26 16:08:11 +08:00
}
2021-10-19 10:40:35 +08:00
func getSizeOfLoadSegmentReq ( req * querypb . LoadSegmentsRequest ) int {
2021-10-20 19:27:27 +08:00
return proto . Size ( req )
2021-10-19 10:40:35 +08:00
}