2021-11-17 19:49:32 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 13:47:10 +08:00
// with the License. You may obtain a copy of the License at
//
2021-11-17 19:49:32 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 13:47:10 +08:00
//
2021-11-17 19:49:32 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 13:47:10 +08:00
2021-06-22 16:44:09 +08:00
package querycoord
2021-04-15 15:15:46 +08:00
import (
"context"
2021-06-23 17:44:12 +08:00
"errors"
2021-04-15 15:15:46 +08:00
"fmt"
2021-10-11 09:54:37 +08:00
"sync"
2021-12-03 15:15:32 +08:00
"time"
2021-04-15 15:15:46 +08:00
2021-06-19 11:45:09 +08:00
"github.com/golang/protobuf/proto"
2021-04-15 15:15:46 +08:00
"go.uber.org/zap"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
2021-07-02 10:40:13 +08:00
"github.com/milvus-io/milvus/internal/proto/proxypb"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/proto/querypb"
2021-11-05 14:47:19 +08:00
"github.com/milvus-io/milvus/internal/rootcoord"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/types"
2021-04-15 15:15:46 +08:00
)
2021-12-03 15:15:32 +08:00
const timeoutForRPC = 10 * time . Second
2021-06-19 11:45:09 +08:00
const (
2021-06-22 16:44:09 +08:00
triggerTaskPrefix = "queryCoord-triggerTask"
activeTaskPrefix = "queryCoord-activeTask"
taskInfoPrefix = "queryCoord-taskInfo"
loadBalanceInfoPrefix = "queryCoord-loadBalanceInfo"
2021-06-19 11:45:09 +08:00
)
2021-10-11 09:54:37 +08:00
const (
2021-10-15 20:25:08 +08:00
// MaxRetryNum is the maximum number of times that each task can be retried
2021-10-11 09:54:37 +08:00
MaxRetryNum = 5
2021-10-21 10:53:09 +08:00
// MaxSendSizeToEtcd is the default limit size of etcd messages that can be sent and received
2021-10-27 19:32:21 +08:00
// MaxSendSizeToEtcd = 2097152
// Limit size of every loadSegmentReq to 200k
MaxSendSizeToEtcd = 200000
2021-10-11 09:54:37 +08:00
)
2021-06-19 11:45:09 +08:00
type taskState int
const (
taskUndo taskState = 0
taskDoing taskState = 1
taskDone taskState = 3
taskExpired taskState = 4
2021-10-11 09:54:37 +08:00
taskFailed taskState = 5
2021-06-19 11:45:09 +08:00
)
2021-04-15 15:15:46 +08:00
type task interface {
2021-10-14 20:18:33 +08:00
traceCtx ( ) context . Context
getTaskID ( ) UniqueID // return ReqId
setTaskID ( id UniqueID )
msgBase ( ) * commonpb . MsgBase
msgType ( ) commonpb . MsgType
timestamp ( ) Timestamp
getTriggerCondition ( ) querypb . TriggerCondition
preExecute ( ctx context . Context ) error
execute ( ctx context . Context ) error
postExecute ( ctx context . Context ) error
reschedule ( ctx context . Context ) ( [ ] task , error )
rollBack ( ctx context . Context ) [ ] task
waitToFinish ( ) error
notify ( err error )
taskPriority ( ) querypb . TriggerCondition
setParentTask ( t task )
getParentTask ( ) task
getChildTask ( ) [ ] task
addChildTask ( t task )
removeChildTaskByID ( taskID UniqueID )
isValid ( ) bool
marshal ( ) ( [ ] byte , error )
getState ( ) taskState
setState ( state taskState )
isRetryable ( ) bool
setResultInfo ( err error )
getResultInfo ( ) * commonpb . Status
updateTaskProcess ( )
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
type baseTask struct {
2021-10-21 10:51:14 +08:00
condition
2021-10-11 09:54:37 +08:00
ctx context . Context
cancel context . CancelFunc
result * commonpb . Status
resultMu sync . RWMutex
state taskState
stateMu sync . RWMutex
retryCount int
//sync.RWMutex
2021-06-15 12:41:40 +08:00
taskID UniqueID
triggerCondition querypb . TriggerCondition
parentTask task
childTasks [ ] task
2021-10-11 09:54:37 +08:00
childTasksMu sync . RWMutex
}
2021-10-18 21:34:47 +08:00
func newBaseTask ( ctx context . Context , triggerType querypb . TriggerCondition ) * baseTask {
2021-10-11 09:54:37 +08:00
childCtx , cancel := context . WithCancel ( ctx )
2021-10-21 10:51:14 +08:00
condition := newTaskCondition ( childCtx )
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
baseTask := & baseTask {
2021-10-11 09:54:37 +08:00
ctx : childCtx ,
cancel : cancel ,
2021-10-21 10:51:14 +08:00
condition : condition ,
2021-10-11 09:54:37 +08:00
state : taskUndo ,
retryCount : MaxRetryNum ,
triggerCondition : triggerType ,
childTasks : [ ] task { } ,
}
return baseTask
2021-06-15 12:41:40 +08:00
}
2021-10-14 20:18:33 +08:00
// getTaskID function returns the unique taskID of the trigger task
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getTaskID ( ) UniqueID {
2021-06-15 12:41:40 +08:00
return bt . taskID
}
2021-10-14 20:18:33 +08:00
// setTaskID function sets the trigger task with a unique id, which is allocated by tso
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) setTaskID ( id UniqueID ) {
2021-06-15 12:41:40 +08:00
bt . taskID = id
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) traceCtx ( ) context . Context {
2021-04-15 15:15:46 +08:00
return bt . ctx
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getTriggerCondition ( ) querypb . TriggerCondition {
2021-10-11 09:54:37 +08:00
return bt . triggerCondition
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) taskPriority ( ) querypb . TriggerCondition {
2021-06-15 12:41:40 +08:00
return bt . triggerCondition
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) setParentTask ( t task ) {
2021-10-11 09:54:37 +08:00
bt . parentTask = t
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getParentTask ( ) task {
2021-06-15 12:41:40 +08:00
return bt . parentTask
}
2021-10-13 21:26:33 +08:00
// GetChildTask function returns all the child tasks of the trigger task
// Child task may be loadSegmentTask, watchDmChannelTask or watchQueryChannelTask
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getChildTask ( ) [ ] task {
2021-10-11 09:54:37 +08:00
bt . childTasksMu . RLock ( )
defer bt . childTasksMu . RUnlock ( )
2021-06-15 12:41:40 +08:00
return bt . childTasks
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) addChildTask ( t task ) {
2021-10-11 09:54:37 +08:00
bt . childTasksMu . Lock ( )
defer bt . childTasksMu . Unlock ( )
2021-06-15 12:41:40 +08:00
bt . childTasks = append ( bt . childTasks , t )
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) removeChildTaskByID ( taskID UniqueID ) {
2021-10-11 09:54:37 +08:00
bt . childTasksMu . Lock ( )
defer bt . childTasksMu . Unlock ( )
result := make ( [ ] task , 0 )
for _ , t := range bt . childTasks {
2021-10-14 20:18:33 +08:00
if t . getTaskID ( ) != taskID {
2021-10-11 09:54:37 +08:00
result = append ( result , t )
}
}
bt . childTasks = result
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) isValid ( ) bool {
2021-06-19 11:45:09 +08:00
return true
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) reschedule ( ctx context . Context ) ( [ ] task , error ) {
2021-06-19 11:45:09 +08:00
return nil , nil
}
2021-10-12 23:40:36 +08:00
// State returns the state of task, such as taskUndo, taskDoing, taskDone, taskExpired, taskFailed
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getState ( ) taskState {
2021-10-11 09:54:37 +08:00
bt . stateMu . RLock ( )
defer bt . stateMu . RUnlock ( )
2021-06-19 11:45:09 +08:00
return bt . state
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) setState ( state taskState ) {
2021-10-11 09:54:37 +08:00
bt . stateMu . Lock ( )
defer bt . stateMu . Unlock ( )
2021-06-19 11:45:09 +08:00
bt . state = state
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) isRetryable ( ) bool {
2021-10-11 09:54:37 +08:00
return bt . retryCount > 0
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) setResultInfo ( err error ) {
2021-10-11 09:54:37 +08:00
bt . resultMu . Lock ( )
defer bt . resultMu . Unlock ( )
if bt . result == nil {
bt . result = & commonpb . Status { }
}
if err == nil {
bt . result . ErrorCode = commonpb . ErrorCode_Success
bt . result . Reason = ""
return
}
bt . result . ErrorCode = commonpb . ErrorCode_UnexpectedError
bt . result . Reason = bt . result . Reason + ", " + err . Error ( )
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) getResultInfo ( ) * commonpb . Status {
2021-10-11 09:54:37 +08:00
bt . resultMu . RLock ( )
defer bt . resultMu . RUnlock ( )
return proto . Clone ( bt . result ) . ( * commonpb . Status )
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) updateTaskProcess ( ) {
2021-10-11 09:54:37 +08:00
// TODO::
}
2021-10-18 21:34:47 +08:00
func ( bt * baseTask ) rollBack ( ctx context . Context ) [ ] task {
2021-10-11 09:54:37 +08:00
//TODO::
return nil
}
2021-10-18 21:34:47 +08:00
type loadCollectionTask struct {
* baseTask
2021-04-15 15:15:46 +08:00
* querypb . LoadCollectionRequest
2021-11-17 09:47:12 +08:00
rootCoord types . RootCoord
dataCoord types . DataCoord
indexCoord types . IndexCoord
cluster Cluster
meta Meta
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return lct . Base
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( lct . LoadCollectionRequest )
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) msgType ( ) commonpb . MsgType {
2021-04-15 15:15:46 +08:00
return lct . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) timestamp ( ) Timestamp {
2021-04-15 15:15:46 +08:00
return lct . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) updateTaskProcess ( ) {
2021-10-11 09:54:37 +08:00
collectionID := lct . CollectionID
2021-10-14 20:18:33 +08:00
childTasks := lct . getChildTask ( )
2021-10-11 09:54:37 +08:00
allDone := true
for _ , t := range childTasks {
2021-10-14 20:18:33 +08:00
if t . getState ( ) != taskDone {
2021-10-11 09:54:37 +08:00
allDone = false
}
}
if allDone {
err := lct . meta . setLoadPercentage ( collectionID , 0 , 100 , querypb . LoadType_loadCollection )
if err != nil {
log . Error ( "loadCollectionTask: set load percentage to meta's collectionInfo" , zap . Int64 ( "collectionID" , collectionID ) )
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( err )
2021-10-11 09:54:37 +08:00
}
}
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) preExecute ( ctx context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := lct . CollectionID
schema := lct . Schema
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do loadCollectionTask" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lct . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Stringer ( "schema" , schema ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
lct . retryCount --
} ( )
2021-04-15 15:15:46 +08:00
collectionID := lct . CollectionID
showPartitionRequest := & milvuspb . ShowPartitionsRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_ShowPartitions ,
} ,
CollectionID : collectionID ,
}
2021-12-03 15:15:32 +08:00
ctx2 , cancel2 := context . WithTimeout ( ctx , timeoutForRPC )
defer cancel2 ( )
showPartitionResponse , err := lct . rootCoord . ShowPartitions ( ctx2 , showPartitionRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadCollectionTask: showPartition failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
2021-12-13 10:29:27 +08:00
lct . setResultInfo ( err )
return err
}
if showPartitionResponse . Status . ErrorCode != commonpb . ErrorCode_Success {
err = errors . New ( showPartitionResponse . Status . Reason )
2021-12-21 11:57:39 +08:00
log . Error ( "loadCollectionTask: showPartition failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-24 21:10:13 +08:00
2021-12-21 11:57:39 +08:00
toLoadPartitionIDs := showPartitionResponse . PartitionIDs
log . Debug ( "loadCollectionTask: get collection's all partitionIDs" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , toLoadPartitionIDs ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) )
2021-06-23 17:44:12 +08:00
2021-06-26 16:08:11 +08:00
loadSegmentReqs := make ( [ ] * querypb . LoadSegmentsRequest , 0 )
watchDmChannelReqs := make ( [ ] * querypb . WatchDmChannelsRequest , 0 )
2021-12-21 11:57:39 +08:00
var deltaChannelInfos [ ] * datapb . VchannelInfo
var dmChannelInfos [ ] * datapb . VchannelInfo
2021-06-23 17:44:12 +08:00
for _ , partitionID := range toLoadPartitionIDs {
2021-12-21 11:57:39 +08:00
vChannelInfos , binlogs , err := getRecoveryInfo ( lct . ctx , lct . dataCoord , collectionID , partitionID )
2021-06-15 12:41:40 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadCollectionTask: getRecoveryInfo failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( err )
2021-06-15 12:41:40 +08:00
return err
}
2021-04-15 15:15:46 +08:00
2021-12-13 10:29:27 +08:00
for _ , segmentBingLog := range binlogs {
2021-06-15 12:41:40 +08:00
segmentID := segmentBingLog . SegmentID
segmentLoadInfo := & querypb . SegmentLoadInfo {
2021-06-26 16:08:11 +08:00
SegmentID : segmentID ,
2021-06-15 12:41:40 +08:00
PartitionID : partitionID ,
CollectionID : collectionID ,
2021-06-26 16:08:11 +08:00
BinlogPaths : segmentBingLog . FieldBinlogs ,
2021-09-07 11:35:18 +08:00
NumOfRows : segmentBingLog . NumOfRows ,
2021-10-22 14:31:13 +08:00
Statslogs : segmentBingLog . Statslogs ,
Deltalogs : segmentBingLog . Deltalogs ,
2021-04-15 15:15:46 +08:00
}
2021-06-26 16:08:11 +08:00
2021-11-17 09:47:12 +08:00
indexInfo , err := getIndexInfo ( ctx , & querypb . SegmentInfo {
CollectionID : collectionID ,
SegmentID : segmentID ,
} , lct . rootCoord , lct . indexCoord )
if err == nil && indexInfo . enableIndex {
2021-11-17 14:37:11 +08:00
segmentLoadInfo . EnableIndex = true
2021-11-17 09:47:12 +08:00
segmentLoadInfo . IndexPathInfos = indexInfo . infos
}
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lct . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
2021-06-26 16:08:11 +08:00
loadSegmentReq := & querypb . LoadSegmentsRequest {
2021-12-15 16:53:12 +08:00
Base : msgBase ,
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : lct . Schema ,
CollectionID : collectionID ,
2021-06-26 16:08:11 +08:00
}
loadSegmentReqs = append ( loadSegmentReqs , loadSegmentReq )
2021-04-15 15:15:46 +08:00
}
2021-06-15 12:41:40 +08:00
2021-12-21 11:57:39 +08:00
for _ , info := range vChannelInfos {
deltaChannelInfo , err := generateWatchDeltaChannelInfo ( info )
2021-11-26 22:47:17 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadCollectionTask: generateWatchDeltaChannelInfo failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . String ( "channelName" , info . ChannelName ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
lct . setResultInfo ( err )
2021-11-26 22:47:17 +08:00
return err
2021-11-05 14:47:19 +08:00
}
2021-12-21 11:57:39 +08:00
deltaChannelInfos = append ( deltaChannelInfos , deltaChannelInfo )
dmChannelInfos = append ( dmChannelInfos , info )
2021-04-15 15:15:46 +08:00
}
2021-11-13 08:49:08 +08:00
}
2021-12-21 11:57:39 +08:00
mergedDeltaChannels := mergeWatchDeltaChannelInfo ( deltaChannelInfos )
2021-11-13 08:49:08 +08:00
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
2021-12-21 11:57:39 +08:00
err = lct . meta . setDeltaChannel ( collectionID , mergedDeltaChannels )
2021-12-01 16:39:31 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadCollectionTask: set delta channel info failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
lct . setResultInfo ( err )
2021-12-01 16:39:31 +08:00
return err
}
2021-04-15 15:15:46 +08:00
2021-12-21 11:57:39 +08:00
//TODO:: queryNode receive dm message according partitionID cache
//TODO:: queryNode add partitionID to cache if receive create partition message from dmChannel
mergedDmChannel := mergeDmChannelInfo ( dmChannelInfos )
for _ , info := range mergedDmChannel {
msgBase := proto . Clone ( lct . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
watchRequest := & querypb . WatchDmChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
//PartitionIDs: toLoadPartitionIDs,
Infos : [ ] * datapb . VchannelInfo { info } ,
Schema : lct . Schema ,
}
watchDmChannelReqs = append ( watchDmChannelReqs , watchRequest )
}
internalTasks , err := assignInternalTask ( ctx , lct , lct . meta , lct . cluster , loadSegmentReqs , watchDmChannelReqs , false , nil , nil )
2021-09-29 09:56:04 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadCollectionTask: assign child task failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
2021-10-14 20:18:33 +08:00
lct . setResultInfo ( err )
2021-09-29 09:56:04 +08:00
return err
}
2021-11-11 12:56:42 +08:00
for _ , internalTask := range internalTasks {
lct . addChildTask ( internalTask )
2021-12-21 11:57:39 +08:00
log . Debug ( "loadCollectionTask: add a childTask" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) )
}
log . Debug ( "loadCollectionTask: assign child task done" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) )
err = lct . meta . addCollection ( collectionID , querypb . LoadType_loadCollection , lct . Schema )
if err != nil {
log . Error ( "loadCollectionTask: add collection to meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
lct . setResultInfo ( err )
return err
}
err = lct . meta . addPartitions ( collectionID , toLoadPartitionIDs )
if err != nil {
log . Error ( "loadCollectionTask: add partitions to meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , toLoadPartitionIDs ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
lct . setResultInfo ( err )
return err
2021-11-11 12:56:42 +08:00
}
2021-06-15 12:41:40 +08:00
2021-04-15 15:15:46 +08:00
log . Debug ( "LoadCollection execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lct . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) )
2021-04-15 15:15:46 +08:00
return nil
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) postExecute ( ctx context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := lct . CollectionID
2021-06-23 17:44:12 +08:00
if lct . result . ErrorCode != commonpb . ErrorCode_Success {
2021-10-11 09:54:37 +08:00
lct . childTasks = [ ] task { }
err := lct . meta . releaseCollection ( collectionID )
2021-06-30 17:48:19 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadCollectionTask: occur error when release collection info from meta" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
panic ( err )
2021-06-23 17:44:12 +08:00
}
}
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadCollectionTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lct . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lct * loadCollectionTask ) rollBack ( ctx context . Context ) [ ] task {
2021-12-21 11:57:39 +08:00
onlineNodeIDs := lct . cluster . onlineNodeIDs ( )
2021-10-11 09:54:37 +08:00
resultTasks := make ( [ ] task , 0 )
2021-12-21 11:57:39 +08:00
for _ , nodeID := range onlineNodeIDs {
2021-10-11 09:54:37 +08:00
//brute force rollBack, should optimize
2021-12-21 11:57:39 +08:00
msgBase := proto . Clone ( lct . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_ReleaseCollection
2021-10-11 09:54:37 +08:00
req := & querypb . ReleaseCollectionRequest {
2021-12-21 11:57:39 +08:00
Base : msgBase ,
2021-10-11 09:54:37 +08:00
DbID : lct . DbID ,
CollectionID : lct . CollectionID ,
NodeID : nodeID ,
}
2021-12-15 16:53:12 +08:00
baseTask := newBaseTask ( ctx , querypb . TriggerCondition_GrpcRequest )
2021-10-14 20:18:33 +08:00
baseTask . setParentTask ( lct )
2021-10-18 21:34:47 +08:00
releaseCollectionTask := & releaseCollectionTask {
baseTask : baseTask ,
2021-10-11 09:54:37 +08:00
ReleaseCollectionRequest : req ,
cluster : lct . cluster ,
}
resultTasks = append ( resultTasks , releaseCollectionTask )
}
2021-12-21 11:57:39 +08:00
err := lct . meta . releaseCollection ( lct . CollectionID )
if err != nil {
log . Error ( "releaseCollectionTask: release collectionInfo from meta failed" , zap . Int64 ( "collectionID" , lct . CollectionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) , zap . Error ( err ) )
panic ( err )
}
log . Debug ( "loadCollectionTask: generate rollBack task for loadCollectionTask" , zap . Int64 ( "collectionID" , lct . CollectionID ) , zap . Int64 ( "msgID" , lct . Base . MsgID ) )
2021-10-11 09:54:37 +08:00
return resultTasks
}
2021-10-18 21:34:47 +08:00
// releaseCollectionTask will release all the data of this collection on query nodes
type releaseCollectionTask struct {
* baseTask
2021-04-15 15:15:46 +08:00
* querypb . ReleaseCollectionRequest
2021-09-15 20:40:07 +08:00
cluster Cluster
2021-08-02 22:39:25 +08:00
meta Meta
2021-07-02 10:40:13 +08:00
rootCoord types . RootCoord
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return rct . Base
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( rct . ReleaseCollectionRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) msgType ( ) commonpb . MsgType {
2021-04-15 15:15:46 +08:00
return rct . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) timestamp ( ) Timestamp {
2021-04-15 15:15:46 +08:00
return rct . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) preExecute ( context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := rct . CollectionID
2021-10-14 20:18:33 +08:00
rct . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do releaseCollectionTask" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rct . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
rct . retryCount --
} ( )
2021-04-15 15:15:46 +08:00
collectionID := rct . CollectionID
2021-10-11 09:54:37 +08:00
2021-09-18 18:45:51 +08:00
// if nodeID ==0, it means that the release request has not been assigned to the specified query node
2021-06-19 11:45:09 +08:00
if rct . NodeID <= 0 {
2021-07-02 10:40:13 +08:00
releaseDQLMessageStreamReq := & proxypb . ReleaseDQLMessageStreamRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_RemoveQueryChannels ,
MsgID : rct . Base . MsgID ,
Timestamp : rct . Base . Timestamp ,
SourceID : rct . Base . SourceID ,
} ,
DbID : rct . DbID ,
CollectionID : rct . CollectionID ,
}
2021-12-03 15:15:32 +08:00
ctx2 , cancel2 := context . WithTimeout ( rct . ctx , timeoutForRPC )
defer cancel2 ( )
res , err := rct . rootCoord . ReleaseDQLMessageStream ( ctx2 , releaseDQLMessageStreamReq )
2021-12-13 10:29:27 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "releaseCollectionTask: release collection end, releaseDQLMessageStream occur error" , zap . Int64 ( "collectionID" , rct . CollectionID ) , zap . Int64 ( "msgID" , rct . Base . MsgID ) , zap . Error ( err ) )
2021-12-13 10:29:27 +08:00
rct . setResultInfo ( err )
return err
}
if res . ErrorCode != commonpb . ErrorCode_Success {
err = errors . New ( res . Reason )
2021-12-21 11:57:39 +08:00
log . Error ( "releaseCollectionTask: release collection end, releaseDQLMessageStream occur error" , zap . Int64 ( "collectionID" , rct . CollectionID ) , zap . Int64 ( "msgID" , rct . Base . MsgID ) , zap . Error ( err ) )
2021-10-14 20:18:33 +08:00
rct . setResultInfo ( err )
2021-07-02 10:40:13 +08:00
return err
}
2021-12-21 11:57:39 +08:00
onlineNodeIDs := rct . cluster . onlineNodeIDs ( )
for _ , nodeID := range onlineNodeIDs {
2021-06-19 11:45:09 +08:00
req := proto . Clone ( rct . ReleaseCollectionRequest ) . ( * querypb . ReleaseCollectionRequest )
req . NodeID = nodeID
2021-12-15 16:53:12 +08:00
baseTask := newBaseTask ( ctx , querypb . TriggerCondition_GrpcRequest )
2021-10-14 20:18:33 +08:00
baseTask . setParentTask ( rct )
2021-10-18 21:34:47 +08:00
releaseCollectionTask := & releaseCollectionTask {
baseTask : baseTask ,
2021-06-19 11:45:09 +08:00
ReleaseCollectionRequest : req ,
2021-06-15 12:41:40 +08:00
cluster : rct . cluster ,
}
2021-10-11 09:54:37 +08:00
2021-10-14 20:18:33 +08:00
rct . addChildTask ( releaseCollectionTask )
2021-10-18 21:34:47 +08:00
log . Debug ( "releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask" , zap . Any ( "task" , releaseCollectionTask ) )
2021-06-15 12:41:40 +08:00
}
2021-12-21 11:57:39 +08:00
//TODO::xige-16 delete collection info from should wait all internal release task execute done
// if some query nodes release collection failed, the collection data will can't be removed from query node
err = rct . meta . releaseCollection ( collectionID )
if err != nil {
log . Error ( "releaseCollectionTask: release collectionInfo from meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , rct . Base . MsgID ) , zap . Error ( err ) )
rct . setResultInfo ( err )
return err
}
2021-06-15 12:41:40 +08:00
} else {
2021-08-02 22:39:25 +08:00
err := rct . cluster . releaseCollection ( ctx , rct . NodeID , rct . ReleaseCollectionRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "releaseCollectionTask: release collection end, node occur error" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "nodeID" , rct . NodeID ) )
2021-10-14 20:18:33 +08:00
rct . setResultInfo ( err )
2021-06-23 17:44:12 +08:00
return err
}
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
log . Debug ( "releaseCollectionTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rct . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "nodeID" , rct . NodeID ) )
2021-04-15 15:15:46 +08:00
return nil
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) postExecute ( context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := rct . CollectionID
2021-10-11 09:54:37 +08:00
if rct . result . ErrorCode != commonpb . ErrorCode_Success {
rct . childTasks = [ ] task { }
}
2021-04-15 15:15:46 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "releaseCollectionTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rct . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "nodeID" , rct . NodeID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rct * releaseCollectionTask ) rollBack ( ctx context . Context ) [ ] task {
2021-10-11 09:54:37 +08:00
//TODO::
//if taskID == 0, recovery meta
//if taskID != 0, recovery collection on queryNode
return nil
}
2021-10-18 21:34:47 +08:00
// loadPartitionTask will load all the data of this partition to query nodes
type loadPartitionTask struct {
* baseTask
2021-04-15 15:15:46 +08:00
* querypb . LoadPartitionsRequest
2021-11-17 09:47:12 +08:00
rootCoord types . RootCoord
dataCoord types . DataCoord
indexCoord types . IndexCoord
cluster Cluster
meta Meta
addCol bool
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return lpt . Base
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( lpt . LoadPartitionsRequest )
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) msgType ( ) commonpb . MsgType {
2021-04-15 15:15:46 +08:00
return lpt . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) timestamp ( ) Timestamp {
2021-04-15 15:15:46 +08:00
return lpt . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) updateTaskProcess ( ) {
2021-04-15 15:15:46 +08:00
collectionID := lpt . CollectionID
2021-10-11 09:54:37 +08:00
partitionIDs := lpt . PartitionIDs
2021-10-14 20:18:33 +08:00
childTasks := lpt . getChildTask ( )
2021-10-11 09:54:37 +08:00
allDone := true
for _ , t := range childTasks {
2021-10-14 20:18:33 +08:00
if t . getState ( ) != taskDone {
2021-10-11 09:54:37 +08:00
allDone = false
}
}
if allDone {
for _ , id := range partitionIDs {
err := lpt . meta . setLoadPercentage ( collectionID , id , 100 , querypb . LoadType_LoadPartition )
if err != nil {
log . Error ( "loadPartitionTask: set load percentage to meta's collectionInfo" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , id ) )
2021-10-14 20:18:33 +08:00
lpt . setResultInfo ( err )
2021-10-11 09:54:37 +08:00
}
}
2021-06-23 17:44:12 +08:00
}
2021-10-11 09:54:37 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) preExecute ( context . Context ) error {
2021-10-11 09:54:37 +08:00
collectionID := lpt . CollectionID
2021-10-14 20:18:33 +08:00
lpt . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do loadPartitionTask" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lpt . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
lpt . retryCount --
} ( )
2021-04-15 15:15:46 +08:00
collectionID := lpt . CollectionID
partitionIDs := lpt . PartitionIDs
2021-12-21 11:57:39 +08:00
var loadSegmentReqs [ ] * querypb . LoadSegmentsRequest
var watchDmChannelReqs [ ] * querypb . WatchDmChannelsRequest
var deltaChannelInfos [ ] * datapb . VchannelInfo
var dmChannelInfos [ ] * datapb . VchannelInfo
2021-04-15 15:15:46 +08:00
for _ , partitionID := range partitionIDs {
2021-12-21 11:57:39 +08:00
vChannelInfos , binlogs , err := getRecoveryInfo ( lpt . ctx , lpt . dataCoord , collectionID , partitionID )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadPartitionTask: getRecoveryInfo failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) , zap . Error ( err ) )
2021-10-14 20:18:33 +08:00
lpt . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-15 12:41:40 +08:00
2021-12-13 10:29:27 +08:00
for _ , segmentBingLog := range binlogs {
2021-06-15 12:41:40 +08:00
segmentID := segmentBingLog . SegmentID
segmentLoadInfo := & querypb . SegmentLoadInfo {
SegmentID : segmentID ,
2021-04-15 15:15:46 +08:00
PartitionID : partitionID ,
2021-06-15 12:41:40 +08:00
CollectionID : collectionID ,
2021-06-26 16:08:11 +08:00
BinlogPaths : segmentBingLog . FieldBinlogs ,
2021-09-07 11:35:18 +08:00
NumOfRows : segmentBingLog . NumOfRows ,
2021-10-22 14:31:13 +08:00
Statslogs : segmentBingLog . Statslogs ,
Deltalogs : segmentBingLog . Deltalogs ,
2021-06-26 16:08:11 +08:00
}
2021-11-17 09:47:12 +08:00
indexInfo , err := getIndexInfo ( ctx , & querypb . SegmentInfo {
CollectionID : collectionID ,
SegmentID : segmentID ,
} , lpt . rootCoord , lpt . indexCoord )
if err == nil && indexInfo . enableIndex {
2021-11-17 14:37:11 +08:00
segmentLoadInfo . EnableIndex = true
2021-11-17 09:47:12 +08:00
segmentLoadInfo . IndexPathInfos = indexInfo . infos
}
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lpt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
2021-06-26 16:08:11 +08:00
loadSegmentReq := & querypb . LoadSegmentsRequest {
2021-12-15 16:53:12 +08:00
Base : msgBase ,
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : lpt . Schema ,
CollectionID : collectionID ,
2021-04-15 15:15:46 +08:00
}
2021-06-26 16:08:11 +08:00
loadSegmentReqs = append ( loadSegmentReqs , loadSegmentReq )
2021-06-15 12:41:40 +08:00
}
2021-12-21 11:57:39 +08:00
for _ , info := range vChannelInfos {
deltaChannelInfo , err := generateWatchDeltaChannelInfo ( info )
2021-11-25 18:49:16 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadPartitionTask: generateWatchDeltaChannelInfo failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . String ( "channelName" , info . ChannelName ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) , zap . Error ( err ) )
lpt . setResultInfo ( err )
2021-11-25 18:49:16 +08:00
return err
2021-11-05 14:47:19 +08:00
}
2021-12-21 11:57:39 +08:00
deltaChannelInfos = append ( deltaChannelInfos , deltaChannelInfo )
dmChannelInfos = append ( dmChannelInfos , info )
2021-11-05 14:47:19 +08:00
}
2021-11-13 08:49:08 +08:00
}
2021-12-21 11:57:39 +08:00
mergedDeltaChannels := mergeWatchDeltaChannelInfo ( deltaChannelInfos )
2021-11-13 08:49:08 +08:00
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
2021-12-21 11:57:39 +08:00
err := lpt . meta . setDeltaChannel ( collectionID , mergedDeltaChannels )
2021-12-01 16:39:31 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadPartitionTask: set delta channel info failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) , zap . Error ( err ) )
lpt . setResultInfo ( err )
2021-12-01 16:39:31 +08:00
return err
}
2021-12-21 11:57:39 +08:00
mergedDmChannel := mergeDmChannelInfo ( dmChannelInfos )
for _ , info := range mergedDmChannel {
msgBase := proto . Clone ( lpt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
watchRequest := & querypb . WatchDmChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
PartitionIDs : partitionIDs ,
Infos : [ ] * datapb . VchannelInfo { info } ,
Schema : lpt . Schema ,
}
watchDmChannelReqs = append ( watchDmChannelReqs , watchRequest )
}
internalTasks , err := assignInternalTask ( ctx , lpt , lpt . meta , lpt . cluster , loadSegmentReqs , watchDmChannelReqs , false , nil , nil )
2021-09-29 09:56:04 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadPartitionTask: assign child task failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) , zap . Error ( err ) )
2021-10-14 20:18:33 +08:00
lpt . setResultInfo ( err )
2021-09-29 09:56:04 +08:00
return err
}
2021-11-11 12:56:42 +08:00
for _ , internalTask := range internalTasks {
lpt . addChildTask ( internalTask )
2021-12-21 11:57:39 +08:00
log . Debug ( "loadPartitionTask: add a childTask" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) )
}
log . Debug ( "loadPartitionTask: assign child task done" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) )
err = lpt . meta . addCollection ( collectionID , querypb . LoadType_LoadPartition , lpt . Schema )
if err != nil {
log . Error ( "loadPartitionTask: add collection to meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) , zap . Error ( err ) )
lpt . setResultInfo ( err )
return err
}
err = lpt . meta . addPartitions ( collectionID , partitionIDs )
if err != nil {
log . Error ( "loadPartitionTask: add partition to meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) , zap . Error ( err ) )
lpt . setResultInfo ( err )
return err
2021-11-11 12:56:42 +08:00
}
2021-04-15 15:15:46 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadPartitionTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lpt . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
2021-12-21 11:57:39 +08:00
zap . Int64s ( "partitionIDs" , partitionIDs ) ,
zap . Int64 ( "msgID" , lpt . Base . MsgID ) )
2021-04-15 15:15:46 +08:00
return nil
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) postExecute ( ctx context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := lpt . CollectionID
partitionIDs := lpt . PartitionIDs
2021-06-23 17:44:12 +08:00
if lpt . result . ErrorCode != commonpb . ErrorCode_Success {
2021-10-11 09:54:37 +08:00
lpt . childTasks = [ ] task { }
2021-12-21 11:57:39 +08:00
err := lpt . meta . releaseCollection ( collectionID )
if err != nil {
log . Error ( "loadPartitionTask: occur error when release collection info from meta" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) , zap . Error ( err ) )
panic ( err )
2021-06-23 17:44:12 +08:00
}
}
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadPartitionTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , lpt . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lpt * loadPartitionTask ) rollBack ( ctx context . Context ) [ ] task {
2021-12-21 11:57:39 +08:00
collectionID := lpt . CollectionID
2021-10-11 09:54:37 +08:00
resultTasks := make ( [ ] task , 0 )
//brute force rollBack, should optimize
2021-12-21 11:57:39 +08:00
onlineNodeIDs := lpt . cluster . onlineNodeIDs ( )
for _ , nodeID := range onlineNodeIDs {
req := & querypb . ReleaseCollectionRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_ReleaseCollection ,
MsgID : lpt . Base . MsgID ,
Timestamp : lpt . Base . Timestamp ,
SourceID : lpt . Base . SourceID ,
} ,
DbID : lpt . DbID ,
CollectionID : collectionID ,
NodeID : nodeID ,
2021-10-11 09:54:37 +08:00
}
2021-12-21 11:57:39 +08:00
baseTask := newBaseTask ( ctx , querypb . TriggerCondition_GrpcRequest )
baseTask . setParentTask ( lpt )
releaseCollectionTask := & releaseCollectionTask {
baseTask : baseTask ,
ReleaseCollectionRequest : req ,
cluster : lpt . cluster ,
2021-10-11 09:54:37 +08:00
}
2021-12-21 11:57:39 +08:00
resultTasks = append ( resultTasks , releaseCollectionTask )
2021-10-11 09:54:37 +08:00
}
2021-12-21 11:57:39 +08:00
err := lpt . meta . releaseCollection ( collectionID )
if err != nil {
log . Error ( "loadPartitionTask: release collection info from meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) , zap . Error ( err ) )
panic ( err )
}
log . Debug ( "loadPartitionTask: generate rollBack task for loadPartitionTask" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , lpt . Base . MsgID ) )
2021-10-11 09:54:37 +08:00
return resultTasks
}
2021-10-18 21:34:47 +08:00
// releasePartitionTask will release all the data of this partition on query nodes
type releasePartitionTask struct {
* baseTask
2021-04-15 15:15:46 +08:00
* querypb . ReleasePartitionsRequest
2021-09-15 20:40:07 +08:00
cluster Cluster
2021-12-21 11:57:39 +08:00
meta Meta
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return rpt . Base
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( rpt . ReleasePartitionsRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) msgType ( ) commonpb . MsgType {
2021-04-15 15:15:46 +08:00
return rpt . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) timestamp ( ) Timestamp {
2021-04-15 15:15:46 +08:00
return rpt . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) preExecute ( context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := rpt . CollectionID
2021-12-21 11:57:39 +08:00
partitionIDs := rpt . PartitionIDs
2021-10-14 20:18:33 +08:00
rpt . setResultInfo ( nil )
2021-04-15 15:15:46 +08:00
log . Debug ( "start do releasePartitionTask" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rpt . getTaskID ( ) ) ,
2021-12-21 11:57:39 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
rpt . retryCount --
} ( )
2021-04-15 15:15:46 +08:00
collectionID := rpt . CollectionID
partitionIDs := rpt . PartitionIDs
2021-10-11 09:54:37 +08:00
2021-09-23 21:06:02 +08:00
// if nodeID ==0, it means that the release request has not been assigned to the specified query node
2021-06-19 11:45:09 +08:00
if rpt . NodeID <= 0 {
2021-12-21 11:57:39 +08:00
onlineNodeIDs := rpt . cluster . onlineNodeIDs ( )
for _ , nodeID := range onlineNodeIDs {
2021-06-19 11:45:09 +08:00
req := proto . Clone ( rpt . ReleasePartitionsRequest ) . ( * querypb . ReleasePartitionsRequest )
req . NodeID = nodeID
2021-12-15 16:53:12 +08:00
baseTask := newBaseTask ( ctx , querypb . TriggerCondition_GrpcRequest )
2021-10-14 20:18:33 +08:00
baseTask . setParentTask ( rpt )
2021-10-18 21:34:47 +08:00
releasePartitionTask := & releasePartitionTask {
baseTask : baseTask ,
2021-06-19 11:45:09 +08:00
ReleasePartitionsRequest : req ,
2021-06-15 12:41:40 +08:00
cluster : rpt . cluster ,
2021-12-21 11:57:39 +08:00
meta : rpt . meta ,
2021-06-15 12:41:40 +08:00
}
2021-10-14 20:18:33 +08:00
rpt . addChildTask ( releasePartitionTask )
2021-12-21 11:57:39 +08:00
log . Debug ( "releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , rpt . Base . MsgID ) )
}
//TODO::xige-16 delete partition info from meta should wait all internal release task execute done
// if some query nodes release partitions failed, the partition data will can't be removed from query node
err := rpt . meta . releasePartitions ( collectionID , partitionIDs )
if err != nil {
log . Error ( "releasePartitionTask: release collectionInfo from meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "msgID" , rpt . Base . MsgID ) , zap . Error ( err ) )
rpt . setResultInfo ( err )
return err
2021-06-15 12:41:40 +08:00
}
} else {
2021-08-02 22:39:25 +08:00
err := rpt . cluster . releasePartitions ( ctx , rpt . NodeID , rpt . ReleasePartitionsRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Warn ( "ReleasePartitionsTask: release partition end, node occur error" , zap . Int64 ( "collectionID" , collectionID ) , zap . String ( "nodeID" , fmt . Sprintln ( rpt . NodeID ) ) )
2021-10-14 20:18:33 +08:00
rpt . setResultInfo ( err )
2021-06-23 17:44:12 +08:00
return err
}
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
log . Debug ( "releasePartitionTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rpt . getTaskID ( ) ) ,
2021-04-15 15:15:46 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
2021-06-15 12:41:40 +08:00
zap . Int64s ( "partitionIDs" , partitionIDs ) ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "nodeID" , rpt . NodeID ) )
2021-04-15 15:15:46 +08:00
return nil
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) postExecute ( context . Context ) error {
2021-04-15 15:15:46 +08:00
collectionID := rpt . CollectionID
partitionIDs := rpt . PartitionIDs
2021-10-11 09:54:37 +08:00
if rpt . result . ErrorCode != commonpb . ErrorCode_Success {
rpt . childTasks = [ ] task { }
}
2021-06-15 12:41:40 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "releasePartitionTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "msgID" , rpt . getTaskID ( ) ) ,
2021-06-15 12:41:40 +08:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Int64s ( "partitionIDs" , partitionIDs ) ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "nodeID" , rpt . NodeID ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rpt * releasePartitionTask ) rollBack ( ctx context . Context ) [ ] task {
2021-10-11 09:54:37 +08:00
//TODO::
//if taskID == 0, recovery meta
//if taskID != 0, recovery partition on queryNode
return nil
}
2021-10-18 21:34:47 +08:00
type loadSegmentTask struct {
* baseTask
2021-06-15 12:41:40 +08:00
* querypb . LoadSegmentsRequest
2021-10-11 09:54:37 +08:00
meta Meta
cluster Cluster
excludeNodeIDs [ ] int64
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return lst . Base
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( lst . LoadSegmentsRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) isValid ( ) bool {
2021-10-22 19:07:15 +08:00
online , err := lst . cluster . isOnline ( lst . DstNodeID )
2021-06-30 17:48:19 +08:00
if err != nil {
return false
}
2021-10-11 09:54:37 +08:00
return lst . ctx != nil && online
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) msgType ( ) commonpb . MsgType {
2021-06-15 12:41:40 +08:00
return lst . Base . MsgType
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) timestamp ( ) Timestamp {
2021-06-15 12:41:40 +08:00
return lst . Base . Timestamp
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) updateTaskProcess ( ) {
2021-10-14 20:18:33 +08:00
parentTask := lst . getParentTask ( )
2021-10-11 09:54:37 +08:00
if parentTask == nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "loadSegmentTask: parentTask should not be nil" )
2021-10-11 09:54:37 +08:00
return
}
2021-10-14 20:18:33 +08:00
parentTask . updateTaskProcess ( )
2021-10-11 09:54:37 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) preExecute ( context . Context ) error {
2021-06-15 12:41:40 +08:00
segmentIDs := make ( [ ] UniqueID , 0 )
for _ , info := range lst . Infos {
segmentIDs = append ( segmentIDs , info . SegmentID )
}
2021-10-14 20:18:33 +08:00
lst . setResultInfo ( nil )
2021-06-15 12:41:40 +08:00
log . Debug ( "start do loadSegmentTask" ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "segmentIDs" , segmentIDs ) ,
2021-10-22 19:07:15 +08:00
zap . Int64 ( "loaded nodeID" , lst . DstNodeID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lst . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
lst . retryCount --
} ( )
2021-10-22 19:07:15 +08:00
err := lst . cluster . loadSegments ( ctx , lst . DstNodeID , lst . LoadSegmentsRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "loadSegmentTask: loadSegment occur error" , zap . Int64 ( "taskID" , lst . getTaskID ( ) ) )
2021-10-14 20:18:33 +08:00
lst . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-15 12:41:40 +08:00
2021-06-19 11:45:09 +08:00
log . Debug ( "loadSegmentTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lst . getTaskID ( ) ) )
2021-06-15 12:41:40 +08:00
return nil
}
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) postExecute ( context . Context ) error {
2021-06-19 11:45:09 +08:00
log . Debug ( "loadSegmentTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lst . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lst * loadSegmentTask ) reschedule ( ctx context . Context ) ( [ ] task , error ) {
2021-11-11 12:56:42 +08:00
loadSegmentReqs := make ( [ ] * querypb . LoadSegmentsRequest , 0 )
2021-06-19 11:45:09 +08:00
for _ , info := range lst . Infos {
2021-11-11 12:56:42 +08:00
msgBase := proto . Clone ( lst . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
req := & querypb . LoadSegmentsRequest {
2021-12-15 16:53:12 +08:00
Base : msgBase ,
Infos : [ ] * querypb . SegmentLoadInfo { info } ,
Schema : lst . Schema ,
SourceNodeID : lst . SourceNodeID ,
CollectionID : lst . CollectionID ,
2021-11-11 12:56:42 +08:00
}
loadSegmentReqs = append ( loadSegmentReqs , req )
}
if lst . excludeNodeIDs == nil {
lst . excludeNodeIDs = [ ] int64 { }
2021-06-19 11:45:09 +08:00
}
2021-10-22 19:07:15 +08:00
lst . excludeNodeIDs = append ( lst . excludeNodeIDs , lst . DstNodeID )
2021-11-13 08:49:08 +08:00
2021-11-11 12:56:42 +08:00
//TODO:: wait or not according msgType
2021-12-21 11:57:39 +08:00
reScheduledTasks , err := assignInternalTask ( ctx , lst . getParentTask ( ) , lst . meta , lst . cluster , loadSegmentReqs , nil , false , lst . excludeNodeIDs , nil )
2021-10-11 09:54:37 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadSegment reschedule failed" , zap . Int64s ( "excludeNodes" , lst . excludeNodeIDs ) , zap . Int64 ( "taskID" , lst . getTaskID ( ) ) , zap . Error ( err ) )
2021-10-11 09:54:37 +08:00
return nil , err
}
2021-06-19 11:45:09 +08:00
2021-11-11 12:56:42 +08:00
return reScheduledTasks , nil
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
type releaseSegmentTask struct {
* baseTask
2021-06-15 12:41:40 +08:00
* querypb . ReleaseSegmentsRequest
2021-09-15 20:40:07 +08:00
cluster Cluster
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return rst . Base
}
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( rst . ReleaseSegmentsRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) isValid ( ) bool {
2021-10-11 09:54:37 +08:00
online , err := rst . cluster . isOnline ( rst . NodeID )
2021-06-30 17:48:19 +08:00
if err != nil {
return false
}
2021-10-11 09:54:37 +08:00
return rst . ctx != nil && online
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) msgType ( ) commonpb . MsgType {
2021-06-15 12:41:40 +08:00
return rst . Base . MsgType
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) timestamp ( ) Timestamp {
2021-06-15 12:41:40 +08:00
return rst . Base . Timestamp
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) preExecute ( context . Context ) error {
2021-06-15 12:41:40 +08:00
segmentIDs := rst . SegmentIDs
2021-10-14 20:18:33 +08:00
rst . setResultInfo ( nil )
2021-06-15 12:41:40 +08:00
log . Debug ( "start do releaseSegmentTask" ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "segmentIDs" , segmentIDs ) ,
zap . Int64 ( "loaded nodeID" , rst . NodeID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , rst . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
rst . retryCount --
} ( )
2021-08-02 22:39:25 +08:00
err := rst . cluster . releaseSegments ( rst . ctx , rst . NodeID , rst . ReleaseSegmentsRequest )
2021-06-15 12:41:40 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "releaseSegmentTask: releaseSegment occur error" , zap . Int64 ( "taskID" , rst . getTaskID ( ) ) )
2021-10-14 20:18:33 +08:00
rst . setResultInfo ( err )
2021-06-15 12:41:40 +08:00
return err
}
log . Debug ( "releaseSegmentTask Execute done" ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "segmentIDs" , rst . SegmentIDs ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , rst . getTaskID ( ) ) )
2021-06-15 12:41:40 +08:00
return nil
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( rst * releaseSegmentTask ) postExecute ( context . Context ) error {
2021-06-15 12:41:40 +08:00
segmentIDs := rst . SegmentIDs
log . Debug ( "releaseSegmentTask postExecute done" ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "segmentIDs" , segmentIDs ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , rst . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
type watchDmChannelTask struct {
* baseTask
2021-06-15 12:41:40 +08:00
* querypb . WatchDmChannelsRequest
2021-10-11 09:54:37 +08:00
meta Meta
cluster Cluster
excludeNodeIDs [ ] int64
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return wdt . Base
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( wdt . WatchDmChannelsRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) isValid ( ) bool {
2021-10-11 09:54:37 +08:00
online , err := wdt . cluster . isOnline ( wdt . NodeID )
2021-06-30 17:48:19 +08:00
if err != nil {
return false
}
2021-10-11 09:54:37 +08:00
return wdt . ctx != nil && online
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) msgType ( ) commonpb . MsgType {
2021-06-15 12:41:40 +08:00
return wdt . Base . MsgType
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) timestamp ( ) Timestamp {
2021-06-15 12:41:40 +08:00
return wdt . Base . Timestamp
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) updateTaskProcess ( ) {
2021-10-14 20:18:33 +08:00
parentTask := wdt . getParentTask ( )
2021-10-11 09:54:37 +08:00
if parentTask == nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "watchDmChannelTask: parentTask should not be nil" )
2021-10-11 09:54:37 +08:00
return
}
2021-10-14 20:18:33 +08:00
parentTask . updateTaskProcess ( )
2021-10-11 09:54:37 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) preExecute ( context . Context ) error {
2021-06-15 12:41:40 +08:00
channelInfos := wdt . Infos
channels := make ( [ ] string , 0 )
for _ , info := range channelInfos {
channels = append ( channels , info . ChannelName )
2021-04-15 15:15:46 +08:00
}
2021-10-14 20:18:33 +08:00
wdt . setResultInfo ( nil )
2021-06-15 12:41:40 +08:00
log . Debug ( "start do watchDmChannelTask" ,
2021-06-19 11:45:09 +08:00
zap . Strings ( "dmChannels" , channels ) ,
zap . Int64 ( "loaded nodeID" , wdt . NodeID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
wdt . retryCount --
} ( )
2021-08-02 22:39:25 +08:00
err := wdt . cluster . watchDmChannels ( wdt . ctx , wdt . NodeID , wdt . WatchDmChannelsRequest )
2021-04-15 15:15:46 +08:00
if err != nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "watchDmChannelTask: watchDmChannel occur error" , zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
2021-10-14 20:18:33 +08:00
wdt . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-15 12:41:40 +08:00
2021-06-19 11:45:09 +08:00
log . Debug ( "watchDmChannelsTask Execute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
2021-06-15 12:41:40 +08:00
return nil
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) postExecute ( context . Context ) error {
2021-06-19 11:45:09 +08:00
log . Debug ( "watchDmChannelTask postExecute done" ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wdt * watchDmChannelTask ) reschedule ( ctx context . Context ) ( [ ] task , error ) {
2021-06-19 11:45:09 +08:00
collectionID := wdt . CollectionID
2021-11-11 12:56:42 +08:00
watchDmChannelReqs := make ( [ ] * querypb . WatchDmChannelsRequest , 0 )
2021-06-19 11:45:09 +08:00
for _ , info := range wdt . Infos {
2021-11-11 12:56:42 +08:00
msgBase := proto . Clone ( wdt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
req := & querypb . WatchDmChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
2021-12-17 20:12:42 +08:00
PartitionIDs : wdt . PartitionIDs ,
2021-11-11 12:56:42 +08:00
Infos : [ ] * datapb . VchannelInfo { info } ,
Schema : wdt . Schema ,
ExcludeInfos : wdt . ExcludeInfos ,
}
watchDmChannelReqs = append ( watchDmChannelReqs , req )
2021-06-19 11:45:09 +08:00
}
2021-11-11 12:56:42 +08:00
if wdt . excludeNodeIDs == nil {
wdt . excludeNodeIDs = [ ] int64 { }
}
2021-10-11 09:54:37 +08:00
wdt . excludeNodeIDs = append ( wdt . excludeNodeIDs , wdt . NodeID )
2021-12-21 11:57:39 +08:00
reScheduledTasks , err := assignInternalTask ( ctx , wdt . parentTask , wdt . meta , wdt . cluster , nil , watchDmChannelReqs , false , wdt . excludeNodeIDs , nil )
2021-10-11 09:54:37 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "watchDmChannel reschedule failed" , zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) , zap . Int64s ( "excludeNodes" , wdt . excludeNodeIDs ) , zap . Error ( err ) )
2021-10-11 09:54:37 +08:00
return nil , err
}
2021-06-19 11:45:09 +08:00
2021-11-11 12:56:42 +08:00
return reScheduledTasks , nil
2021-06-19 11:45:09 +08:00
}
2021-11-05 14:47:19 +08:00
type watchDeltaChannelTask struct {
* baseTask
* querypb . WatchDeltaChannelsRequest
meta Meta
cluster Cluster
excludeNodeIDs [ ] int64
}
func ( wdt * watchDeltaChannelTask ) msgBase ( ) * commonpb . MsgBase {
return wdt . Base
}
func ( wdt * watchDeltaChannelTask ) marshal ( ) ( [ ] byte , error ) {
return proto . Marshal ( wdt . WatchDeltaChannelsRequest )
}
func ( wdt * watchDeltaChannelTask ) msgType ( ) commonpb . MsgType {
return wdt . Base . MsgType
}
func ( wdt * watchDeltaChannelTask ) timestamp ( ) Timestamp {
return wdt . Base . Timestamp
}
func ( wdt * watchDeltaChannelTask ) updateTaskProcess ( ) {
parentTask := wdt . getParentTask ( )
if parentTask == nil {
log . Warn ( "watchDeltaChannel: parentTask should not be nil" )
return
}
parentTask . updateTaskProcess ( )
}
func ( wdt * watchDeltaChannelTask ) preExecute ( context . Context ) error {
channelInfos := wdt . Infos
channels := make ( [ ] string , 0 )
for _ , info := range channelInfos {
channels = append ( channels , info . ChannelName )
}
wdt . setResultInfo ( nil )
log . Debug ( "start do watchDeltaChannelTask" ,
zap . Strings ( "deltaChannels" , channels ) ,
zap . Int64 ( "loaded nodeID" , wdt . NodeID ) ,
zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
return nil
}
func ( wdt * watchDeltaChannelTask ) execute ( ctx context . Context ) error {
defer func ( ) {
wdt . retryCount --
} ( )
err := wdt . cluster . watchDeltaChannels ( wdt . ctx , wdt . NodeID , wdt . WatchDeltaChannelsRequest )
if err != nil {
log . Warn ( "watchDeltaChannelTask: watchDeltaChannel occur error" , zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) , zap . Error ( err ) )
wdt . setResultInfo ( err )
return err
}
log . Debug ( "watchDeltaChannelsTask Execute done" ,
zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
return nil
}
func ( wdt * watchDeltaChannelTask ) postExecute ( context . Context ) error {
log . Debug ( "watchDeltaChannelTask postExecute done" ,
zap . Int64 ( "taskID" , wdt . getTaskID ( ) ) )
return nil
}
2021-10-18 21:34:47 +08:00
type watchQueryChannelTask struct {
* baseTask
2021-06-15 12:41:40 +08:00
* querypb . AddQueryChannelRequest
2021-09-15 20:40:07 +08:00
cluster Cluster
2021-06-15 12:41:40 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return wqt . Base
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( wqt . AddQueryChannelRequest )
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) isValid ( ) bool {
2021-10-11 09:54:37 +08:00
online , err := wqt . cluster . isOnline ( wqt . NodeID )
2021-06-30 17:48:19 +08:00
if err != nil {
return false
}
2021-10-11 09:54:37 +08:00
return wqt . ctx != nil && online
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) msgType ( ) commonpb . MsgType {
2021-06-19 11:45:09 +08:00
return wqt . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) timestamp ( ) Timestamp {
2021-06-19 11:45:09 +08:00
return wqt . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) updateTaskProcess ( ) {
2021-10-14 20:18:33 +08:00
parentTask := wqt . getParentTask ( )
2021-10-11 09:54:37 +08:00
if parentTask == nil {
2021-10-18 21:34:47 +08:00
log . Warn ( "watchQueryChannelTask: parentTask should not be nil" )
2021-10-11 09:54:37 +08:00
return
2021-08-02 22:39:25 +08:00
}
2021-10-14 20:18:33 +08:00
parentTask . updateTaskProcess ( )
2021-10-11 09:54:37 +08:00
}
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) preExecute ( context . Context ) error {
2021-10-14 20:18:33 +08:00
wqt . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do watchQueryChannelTask" ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "collectionID" , wqt . CollectionID ) ,
2021-12-15 16:53:12 +08:00
zap . String ( "queryChannel" , wqt . QueryChannel ) ,
zap . String ( "queryResultChannel" , wqt . QueryResultChannel ) ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "loaded nodeID" , wqt . NodeID ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wqt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
wqt . retryCount --
} ( )
2021-08-02 22:39:25 +08:00
err := wqt . cluster . addQueryChannel ( wqt . ctx , wqt . NodeID , wqt . AddQueryChannelRequest )
2021-06-15 12:41:40 +08:00
if err != nil {
2021-10-24 15:16:00 +08:00
log . Warn ( "watchQueryChannelTask: watchQueryChannel occur error" , zap . Int64 ( "taskID" , wqt . getTaskID ( ) ) , zap . Error ( err ) )
2021-10-14 20:18:33 +08:00
wqt . setResultInfo ( err )
2021-04-15 15:15:46 +08:00
return err
}
2021-06-15 12:41:40 +08:00
log . Debug ( "watchQueryChannelTask Execute done" ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "collectionID" , wqt . CollectionID ) ,
2021-12-15 16:53:12 +08:00
zap . String ( "queryChannel" , wqt . QueryChannel ) ,
zap . String ( "queryResultChannel" , wqt . QueryResultChannel ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wqt . getTaskID ( ) ) )
2021-06-15 12:41:40 +08:00
return nil
}
2021-06-16 11:09:56 +08:00
2021-10-18 21:34:47 +08:00
func ( wqt * watchQueryChannelTask ) postExecute ( context . Context ) error {
log . Debug ( "watchQueryChannelTask postExecute done" ,
2021-06-19 11:45:09 +08:00
zap . Int64 ( "collectionID" , wqt . CollectionID ) ,
2021-12-15 16:53:12 +08:00
zap . String ( "queryChannel" , wqt . QueryChannel ) ,
zap . String ( "queryResultChannel" , wqt . QueryResultChannel ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , wqt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-10-24 22:39:09 +08:00
//****************************handoff task********************************//
2021-10-18 21:34:47 +08:00
type handoffTask struct {
2021-10-24 22:39:09 +08:00
* baseTask
* querypb . HandoffSegmentsRequest
dataCoord types . DataCoord
cluster Cluster
meta Meta
}
func ( ht * handoffTask ) msgBase ( ) * commonpb . MsgBase {
return ht . Base
}
func ( ht * handoffTask ) marshal ( ) ( [ ] byte , error ) {
return proto . Marshal ( ht . HandoffSegmentsRequest )
}
func ( ht * handoffTask ) msgType ( ) commonpb . MsgType {
return ht . Base . MsgType
}
func ( ht * handoffTask ) timestamp ( ) Timestamp {
return ht . Base . Timestamp
}
func ( ht * handoffTask ) preExecute ( context . Context ) error {
ht . setResultInfo ( nil )
segmentIDs := make ( [ ] UniqueID , 0 )
segmentInfos := ht . HandoffSegmentsRequest . SegmentInfos
for _ , info := range segmentInfos {
segmentIDs = append ( segmentIDs , info . SegmentID )
}
log . Debug ( "start do handoff segments task" ,
zap . Int64s ( "segmentIDs" , segmentIDs ) )
return nil
}
func ( ht * handoffTask ) execute ( ctx context . Context ) error {
segmentInfos := ht . HandoffSegmentsRequest . SegmentInfos
for _ , segmentInfo := range segmentInfos {
collectionID := segmentInfo . CollectionID
partitionID := segmentInfo . PartitionID
segmentID := segmentInfo . SegmentID
collectionInfo , err := ht . meta . getCollectionInfoByID ( collectionID )
if err != nil {
log . Debug ( "handoffTask: collection has not been loaded into memory" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "segmentID" , segmentID ) )
continue
}
2021-10-26 13:04:18 +08:00
if collectionInfo . LoadType == querypb . LoadType_loadCollection && ht . meta . hasReleasePartition ( collectionID , partitionID ) {
log . Debug ( "handoffTask: partition has not been released" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) )
continue
}
2021-10-24 22:39:09 +08:00
partitionLoaded := false
for _ , id := range collectionInfo . PartitionIDs {
if id == partitionID {
partitionLoaded = true
}
}
if collectionInfo . LoadType != querypb . LoadType_loadCollection && ! partitionLoaded {
log . Debug ( "handoffTask: partition has not been loaded into memory" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) , zap . Int64 ( "segmentID" , segmentID ) )
continue
}
2021-12-10 10:13:44 +08:00
// segment which is compacted from should exist in query node
2021-11-08 21:00:02 +08:00
for _ , compactedSegID := range segmentInfo . CompactionFrom {
_ , err = ht . meta . getSegmentInfoByID ( compactedSegID )
if err != nil {
log . Error ( "handoffTask: compacted segment has not been loaded into memory" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) , zap . Int64 ( "segmentID" , segmentID ) )
ht . setResultInfo ( err )
return err
}
}
// segment which is compacted to should not exist in query node
2021-10-24 22:39:09 +08:00
_ , err = ht . meta . getSegmentInfoByID ( segmentID )
if err != nil {
2021-12-13 10:29:27 +08:00
dmChannelInfos , binlogs , err := getRecoveryInfo ( ht . ctx , ht . dataCoord , collectionID , partitionID )
2021-10-24 22:39:09 +08:00
if err != nil {
2021-12-13 10:29:27 +08:00
log . Error ( "handoffTask: getRecoveryInfo failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) , zap . Error ( err ) )
2021-10-24 22:39:09 +08:00
ht . setResultInfo ( err )
return err
}
findBinlog := false
var loadSegmentReq * querypb . LoadSegmentsRequest
2021-11-13 08:49:08 +08:00
var watchDeltaChannels [ ] * datapb . VchannelInfo
2021-12-13 10:29:27 +08:00
for _ , segmentBinlogs := range binlogs {
2021-10-24 22:39:09 +08:00
if segmentBinlogs . SegmentID == segmentID {
findBinlog = true
segmentLoadInfo := & querypb . SegmentLoadInfo {
2021-11-08 21:00:02 +08:00
SegmentID : segmentID ,
PartitionID : partitionID ,
CollectionID : collectionID ,
BinlogPaths : segmentBinlogs . FieldBinlogs ,
NumOfRows : segmentBinlogs . NumOfRows ,
CompactionFrom : segmentInfo . CompactionFrom ,
2021-11-17 14:37:11 +08:00
EnableIndex : segmentInfo . EnableIndex ,
2021-11-17 09:47:12 +08:00
IndexPathInfos : segmentInfo . IndexPathInfos ,
2021-10-24 22:39:09 +08:00
}
msgBase := proto . Clone ( ht . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
loadSegmentReq = & querypb . LoadSegmentsRequest {
2021-12-21 11:57:39 +08:00
Base : msgBase ,
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : collectionInfo . Schema ,
CollectionID : collectionID ,
2021-10-24 22:39:09 +08:00
}
}
}
2021-12-13 10:29:27 +08:00
for _ , info := range dmChannelInfos {
2021-11-26 22:47:17 +08:00
deltaChannel , err := generateWatchDeltaChannelInfo ( info )
if err != nil {
return err
2021-11-05 14:47:19 +08:00
}
2021-11-26 22:47:17 +08:00
watchDeltaChannels = append ( watchDeltaChannels , deltaChannel )
2021-11-05 14:47:19 +08:00
}
2021-10-24 22:39:09 +08:00
if ! findBinlog {
err = fmt . Errorf ( "segmnet has not been flushed, segmentID is %d" , segmentID )
ht . setResultInfo ( err )
return err
}
2021-11-26 22:47:17 +08:00
mergedDeltaChannels := mergeWatchDeltaChannelInfo ( watchDeltaChannels )
2021-11-13 08:49:08 +08:00
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
2021-12-21 11:57:39 +08:00
err = ht . meta . setDeltaChannel ( collectionID , mergedDeltaChannels )
2021-12-01 16:39:31 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "handoffTask: set delta channel info to meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "segmentID" , segmentID ) , zap . Error ( err ) )
ht . setResultInfo ( err )
2021-12-01 16:39:31 +08:00
return err
}
2021-12-21 11:57:39 +08:00
internalTasks , err := assignInternalTask ( ctx , ht , ht . meta , ht . cluster , [ ] * querypb . LoadSegmentsRequest { loadSegmentReq } , nil , true , nil , nil )
2021-10-24 22:39:09 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "handoffTask: assign child task failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "segmentID" , segmentID ) , zap . Error ( err ) )
2021-10-24 22:39:09 +08:00
ht . setResultInfo ( err )
return err
}
2021-11-11 12:56:42 +08:00
for _ , internalTask := range internalTasks {
ht . addChildTask ( internalTask )
2021-12-21 11:57:39 +08:00
log . Debug ( "handoffTask: add a childTask" , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) , zap . Int64 ( "segmentID" , segmentID ) )
2021-11-11 12:56:42 +08:00
}
2021-10-24 22:39:09 +08:00
} else {
err = fmt . Errorf ( "sealed segment has been exist on query node, segmentID is %d" , segmentID )
2021-12-21 11:57:39 +08:00
log . Error ( "handoffTask: handoff segment failed" , zap . Int64 ( "segmentID" , segmentID ) , zap . Error ( err ) )
2021-10-24 22:39:09 +08:00
ht . setResultInfo ( err )
return err
}
}
log . Debug ( "handoffTask: assign child task done" , zap . Any ( "segmentInfos" , segmentInfos ) )
log . Debug ( "handoffTask Execute done" ,
zap . Int64 ( "taskID" , ht . getTaskID ( ) ) )
return nil
}
func ( ht * handoffTask ) postExecute ( context . Context ) error {
if ht . result . ErrorCode != commonpb . ErrorCode_Success {
ht . childTasks = [ ] task { }
}
log . Debug ( "handoffTask postExecute done" ,
zap . Int64 ( "taskID" , ht . getTaskID ( ) ) )
return nil
}
func ( ht * handoffTask ) rollBack ( ctx context . Context ) [ ] task {
resultTasks := make ( [ ] task , 0 )
childTasks := ht . getChildTask ( )
for _ , childTask := range childTasks {
if childTask . msgType ( ) == commonpb . MsgType_LoadSegments {
// TODO:: add release segment to rollBack, no release does not affect correctness of query
}
}
return resultTasks
2021-06-19 11:45:09 +08:00
}
2021-06-15 12:41:40 +08:00
2021-10-18 21:34:47 +08:00
type loadBalanceTask struct {
* baseTask
2021-06-19 11:45:09 +08:00
* querypb . LoadBalanceRequest
2021-11-17 09:47:12 +08:00
rootCoord types . RootCoord
dataCoord types . DataCoord
indexCoord types . IndexCoord
cluster Cluster
meta Meta
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) msgBase ( ) * commonpb . MsgBase {
2021-06-26 16:08:11 +08:00
return lbt . Base
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) marshal ( ) ( [ ] byte , error ) {
2021-08-03 22:03:25 +08:00
return proto . Marshal ( lbt . LoadBalanceRequest )
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) msgType ( ) commonpb . MsgType {
2021-06-19 11:45:09 +08:00
return lbt . Base . MsgType
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) timestamp ( ) Timestamp {
2021-06-19 11:45:09 +08:00
return lbt . Base . Timestamp
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) preExecute ( context . Context ) error {
2021-10-14 20:18:33 +08:00
lbt . setResultInfo ( nil )
2021-10-18 21:34:47 +08:00
log . Debug ( "start do loadBalanceTask" ,
2021-11-06 15:22:56 +08:00
zap . Int32 ( "trigger type" , int32 ( lbt . triggerCondition ) ) ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "sourceNodeIDs" , lbt . SourceNodeIDs ) ,
zap . Any ( "balanceReason" , lbt . BalanceReason ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lbt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-06-19 11:45:09 +08:00
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) execute ( ctx context . Context ) error {
2021-10-11 09:54:37 +08:00
defer func ( ) {
lbt . retryCount --
} ( )
2021-06-19 11:45:09 +08:00
2021-12-15 16:53:12 +08:00
if lbt . triggerCondition == querypb . TriggerCondition_NodeDown {
2021-12-21 11:57:39 +08:00
segmentID2Info := make ( map [ UniqueID ] * querypb . SegmentInfo )
dmChannel2WatchInfo := make ( map [ string ] * querypb . DmChannelWatchInfo )
loadSegmentReqs := make ( [ ] * querypb . LoadSegmentsRequest , 0 )
watchDmChannelReqs := make ( [ ] * querypb . WatchDmChannelsRequest , 0 )
recoveredCollectionIDs := make ( map [ UniqueID ] struct { } )
2021-06-19 11:45:09 +08:00
for _ , nodeID := range lbt . SourceNodeIDs {
2021-12-21 11:57:39 +08:00
segmentInfos := lbt . meta . getSegmentInfosByNode ( nodeID )
for _ , segmentInfo := range segmentInfos {
segmentID2Info [ segmentInfo . SegmentID ] = segmentInfo
recoveredCollectionIDs [ segmentInfo . CollectionID ] = struct { } { }
}
dmChannelWatchInfos := lbt . meta . getDmChannelInfosByNodeID ( nodeID )
for _ , watchInfo := range dmChannelWatchInfos {
dmChannel2WatchInfo [ watchInfo . DmChannel ] = watchInfo
recoveredCollectionIDs [ watchInfo . CollectionID ] = struct { } { }
}
}
for collectionID := range recoveredCollectionIDs {
collectionInfo , err := lbt . meta . getCollectionInfoByID ( collectionID )
if err != nil {
log . Error ( "loadBalanceTask: get collectionInfo from meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Error ( err ) )
lbt . setResultInfo ( err )
return err
}
schema := collectionInfo . Schema
var deltaChannelInfos [ ] * datapb . VchannelInfo
var dmChannelInfos [ ] * datapb . VchannelInfo
var toRecoverPartitionIDs [ ] UniqueID
if collectionInfo . LoadType == querypb . LoadType_loadCollection {
showPartitionRequest := & milvuspb . ShowPartitionsRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_ShowPartitions ,
} ,
CollectionID : collectionID ,
}
showPartitionResponse , err := lbt . rootCoord . ShowPartitions ( ctx , showPartitionRequest )
2021-06-30 17:48:19 +08:00
if err != nil {
2021-10-14 20:18:33 +08:00
lbt . setResultInfo ( err )
2021-10-11 09:54:37 +08:00
return err
2021-06-30 17:48:19 +08:00
}
2021-12-21 11:57:39 +08:00
//TODO:: queryNode receive dm message according partitionID cache
//TODO:: queryNode add partitionID to cache if receive create partition message from dmChannel
toRecoverPartitionIDs = showPartitionResponse . PartitionIDs
} else {
toRecoverPartitionIDs = collectionInfo . PartitionIDs
}
log . Debug ( "loadBalanceTask: get collection's all partitionIDs" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , toRecoverPartitionIDs ) )
2021-06-19 11:45:09 +08:00
2021-12-21 11:57:39 +08:00
for _ , partitionID := range toRecoverPartitionIDs {
vChannelInfos , binlogs , err := getRecoveryInfo ( lbt . ctx , lbt . dataCoord , collectionID , partitionID )
2021-06-19 11:45:09 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadBalanceTask: getRecoveryInfo failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) , zap . Error ( err ) )
2021-10-14 20:18:33 +08:00
lbt . setResultInfo ( err )
2021-06-19 11:45:09 +08:00
return err
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-12-21 11:57:39 +08:00
for _ , segmentBingLog := range binlogs {
segmentID := segmentBingLog . SegmentID
if _ , ok := segmentID2Info [ segmentID ] ; ok {
2021-06-30 17:48:19 +08:00
segmentLoadInfo := & querypb . SegmentLoadInfo {
SegmentID : segmentID ,
PartitionID : partitionID ,
CollectionID : collectionID ,
BinlogPaths : segmentBingLog . FieldBinlogs ,
2021-09-07 11:35:18 +08:00
NumOfRows : segmentBingLog . NumOfRows ,
2021-10-22 14:31:13 +08:00
Statslogs : segmentBingLog . Statslogs ,
Deltalogs : segmentBingLog . Deltalogs ,
2021-06-30 17:48:19 +08:00
}
2021-11-17 09:47:12 +08:00
indexInfo , err := getIndexInfo ( ctx , & querypb . SegmentInfo {
CollectionID : collectionID ,
SegmentID : segmentID ,
} , lbt . rootCoord , lbt . indexCoord )
if err == nil && indexInfo . enableIndex {
2021-11-17 14:37:11 +08:00
segmentLoadInfo . EnableIndex = true
2021-11-17 09:47:12 +08:00
segmentLoadInfo . IndexPathInfos = indexInfo . infos
}
2021-06-30 17:48:19 +08:00
2021-07-13 14:16:00 +08:00
msgBase := proto . Clone ( lbt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
2021-06-30 17:48:19 +08:00
loadSegmentReq := & querypb . LoadSegmentsRequest {
2021-12-15 16:53:12 +08:00
Base : msgBase ,
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : schema ,
2021-12-21 11:57:39 +08:00
CollectionID : collectionID ,
2021-06-30 17:48:19 +08:00
}
loadSegmentReqs = append ( loadSegmentReqs , loadSegmentReq )
}
2021-12-21 11:57:39 +08:00
}
2021-06-30 17:48:19 +08:00
2021-12-21 11:57:39 +08:00
for _ , info := range vChannelInfos {
deltaChannel , err := generateWatchDeltaChannelInfo ( info )
if err != nil {
log . Error ( "loadBalanceTask: generateWatchDeltaChannelInfo failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . String ( "channelName" , info . ChannelName ) , zap . Error ( err ) )
lbt . setResultInfo ( err )
return err
2021-11-05 14:47:19 +08:00
}
2021-12-21 11:57:39 +08:00
deltaChannelInfos = append ( deltaChannelInfos , deltaChannel )
dmChannelInfos = append ( dmChannelInfos , info )
}
}
2021-11-05 14:47:19 +08:00
2021-12-21 11:57:39 +08:00
mergedDeltaChannel := mergeWatchDeltaChannelInfo ( deltaChannelInfos )
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
err = lbt . meta . setDeltaChannel ( collectionID , mergedDeltaChannel )
if err != nil {
log . Error ( "loadBalanceTask: set delta channel info meta failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Error ( err ) )
lbt . setResultInfo ( err )
return err
}
mergedDmChannel := mergeDmChannelInfo ( dmChannelInfos )
2021-12-22 16:29:06 +08:00
for channelName , vChannelInfo := range mergedDmChannel {
if _ , ok := dmChannel2WatchInfo [ channelName ] ; ok {
msgBase := proto . Clone ( lbt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDmChannels
watchRequest := & querypb . WatchDmChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
Infos : [ ] * datapb . VchannelInfo { vChannelInfo } ,
Schema : schema ,
}
2021-12-21 11:57:39 +08:00
2021-12-22 16:29:06 +08:00
if collectionInfo . LoadType == querypb . LoadType_LoadPartition {
watchRequest . PartitionIDs = toRecoverPartitionIDs
}
2021-11-11 12:56:42 +08:00
2021-12-22 16:29:06 +08:00
watchDmChannelReqs = append ( watchDmChannelReqs , watchRequest )
2021-09-29 09:56:04 +08:00
}
2021-06-15 12:41:40 +08:00
}
2021-04-15 15:15:46 +08:00
}
2021-12-21 11:57:39 +08:00
internalTasks , err := assignInternalTask ( ctx , lbt , lbt . meta , lbt . cluster , loadSegmentReqs , watchDmChannelReqs , true , lbt . SourceNodeIDs , lbt . DstNodeIDs )
if err != nil {
log . Warn ( "loadBalanceTask: assign child task failed" , zap . Int64s ( "sourceNodeIDs" , lbt . SourceNodeIDs ) )
lbt . setResultInfo ( err )
return err
}
for _ , internalTask := range internalTasks {
lbt . addChildTask ( internalTask )
log . Debug ( "loadBalanceTask: add a childTask" , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) , zap . Any ( "task" , internalTask ) )
}
log . Debug ( "loadBalanceTask: assign child task done" , zap . Int64s ( "sourceNodeIDs" , lbt . SourceNodeIDs ) )
2021-04-15 15:15:46 +08:00
}
2021-12-15 16:53:12 +08:00
if lbt . triggerCondition == querypb . TriggerCondition_LoadBalance {
2021-11-06 15:22:56 +08:00
if len ( lbt . SourceNodeIDs ) == 0 {
err := errors . New ( "loadBalanceTask: empty source Node list to balance" )
log . Error ( err . Error ( ) )
lbt . setResultInfo ( err )
return err
}
balancedSegmentInfos := make ( map [ UniqueID ] * querypb . SegmentInfo )
balancedSegmentIDs := make ( [ ] UniqueID , 0 )
for _ , nodeID := range lbt . SourceNodeIDs {
nodeExist := lbt . cluster . hasNode ( nodeID )
if ! nodeExist {
err := fmt . Errorf ( "loadBalanceTask: query node %d is not exist to balance" , nodeID )
log . Error ( err . Error ( ) )
lbt . setResultInfo ( err )
return err
}
segmentInfos := lbt . meta . getSegmentInfosByNode ( nodeID )
for _ , info := range segmentInfos {
balancedSegmentInfos [ info . SegmentID ] = info
balancedSegmentIDs = append ( balancedSegmentIDs , info . SegmentID )
}
}
// check balanced sealedSegmentIDs in request whether exist in query node
for _ , segmentID := range lbt . SealedSegmentIDs {
if _ , ok := balancedSegmentInfos [ segmentID ] ; ! ok {
err := fmt . Errorf ( "loadBalanceTask: unloaded segment %d" , segmentID )
log . Warn ( err . Error ( ) )
lbt . setResultInfo ( err )
return err
}
}
if len ( lbt . SealedSegmentIDs ) != 0 {
balancedSegmentIDs = lbt . SealedSegmentIDs
}
col2PartitionIDs := make ( map [ UniqueID ] [ ] UniqueID )
par2Segments := make ( map [ UniqueID ] [ ] * querypb . SegmentInfo )
for _ , segmentID := range balancedSegmentIDs {
info := balancedSegmentInfos [ segmentID ]
collectionID := info . CollectionID
partitionID := info . PartitionID
if _ , ok := col2PartitionIDs [ collectionID ] ; ! ok {
col2PartitionIDs [ collectionID ] = make ( [ ] UniqueID , 0 )
}
if _ , ok := par2Segments [ partitionID ] ; ! ok {
col2PartitionIDs [ collectionID ] = append ( col2PartitionIDs [ collectionID ] , partitionID )
par2Segments [ partitionID ] = make ( [ ] * querypb . SegmentInfo , 0 )
}
par2Segments [ partitionID ] = append ( par2Segments [ partitionID ] , info )
}
2021-12-21 11:57:39 +08:00
loadSegmentReqs := make ( [ ] * querypb . LoadSegmentsRequest , 0 )
2021-11-06 15:22:56 +08:00
for collectionID , partitionIDs := range col2PartitionIDs {
2021-11-13 08:49:08 +08:00
var watchDeltaChannels [ ] * datapb . VchannelInfo
2021-11-06 15:22:56 +08:00
collectionInfo , err := lbt . meta . getCollectionInfoByID ( collectionID )
if err != nil {
log . Error ( "loadBalanceTask: can't find collectionID in meta" , zap . Int64 ( "collectionID" , collectionID ) , zap . Error ( err ) )
lbt . setResultInfo ( err )
return err
}
for _ , partitionID := range partitionIDs {
2021-12-13 10:29:27 +08:00
dmChannelInfos , binlogs , err := getRecoveryInfo ( lbt . ctx , lbt . dataCoord , collectionID , partitionID )
2021-11-06 15:22:56 +08:00
if err != nil {
2021-12-13 10:29:27 +08:00
log . Error ( "loadBalanceTask: getRecoveryInfo failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64 ( "partitionID" , partitionID ) , zap . Error ( err ) )
2021-11-06 15:22:56 +08:00
lbt . setResultInfo ( err )
return err
}
segmentID2Binlog := make ( map [ UniqueID ] * datapb . SegmentBinlogs )
2021-12-13 10:29:27 +08:00
for _ , binlog := range binlogs {
2021-11-06 15:22:56 +08:00
segmentID2Binlog [ binlog . SegmentID ] = binlog
}
for _ , segmentInfo := range par2Segments [ partitionID ] {
segmentID := segmentInfo . SegmentID
if _ , ok := segmentID2Binlog [ segmentID ] ; ! ok {
log . Warn ( "loadBalanceTask: can't find binlog of segment to balance, may be has been compacted" , zap . Int64 ( "segmentID" , segmentID ) )
continue
}
segmentBingLog := segmentID2Binlog [ segmentID ]
segmentLoadInfo := & querypb . SegmentLoadInfo {
SegmentID : segmentID ,
PartitionID : partitionID ,
CollectionID : collectionID ,
BinlogPaths : segmentBingLog . FieldBinlogs ,
NumOfRows : segmentBingLog . NumOfRows ,
Statslogs : segmentBingLog . Statslogs ,
Deltalogs : segmentBingLog . Deltalogs ,
}
2021-11-17 09:47:12 +08:00
indexInfo , err := getIndexInfo ( ctx , & querypb . SegmentInfo {
CollectionID : collectionID ,
SegmentID : segmentID ,
} , lbt . rootCoord , lbt . indexCoord )
if err == nil && indexInfo . enableIndex {
2021-11-17 14:37:11 +08:00
segmentLoadInfo . EnableIndex = true
2021-11-17 09:47:12 +08:00
segmentLoadInfo . IndexPathInfos = indexInfo . infos
}
2021-11-06 15:22:56 +08:00
msgBase := proto . Clone ( lbt . Base ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_LoadSegments
loadSegmentReq := & querypb . LoadSegmentsRequest {
2021-12-21 11:57:39 +08:00
Base : msgBase ,
Infos : [ ] * querypb . SegmentLoadInfo { segmentLoadInfo } ,
Schema : collectionInfo . Schema ,
CollectionID : collectionID ,
2021-11-06 15:22:56 +08:00
}
loadSegmentReqs = append ( loadSegmentReqs , loadSegmentReq )
}
2021-12-13 10:29:27 +08:00
for _ , info := range dmChannelInfos {
2021-11-26 22:47:17 +08:00
deltaChannel , err := generateWatchDeltaChannelInfo ( info )
if err != nil {
return err
2021-11-06 15:22:56 +08:00
}
2021-11-26 22:47:17 +08:00
watchDeltaChannels = append ( watchDeltaChannels , deltaChannel )
2021-11-06 15:22:56 +08:00
}
}
2021-11-26 22:47:17 +08:00
mergedDeltaChannels := mergeWatchDeltaChannelInfo ( watchDeltaChannels )
2021-11-13 08:49:08 +08:00
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
2021-12-21 11:57:39 +08:00
err = lbt . meta . setDeltaChannel ( collectionID , mergedDeltaChannels )
2021-11-06 15:22:56 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "loadBalanceTask: set delta channel info to meta failed" , zap . Error ( err ) )
2021-11-06 15:22:56 +08:00
lbt . setResultInfo ( err )
return err
}
2021-12-21 11:57:39 +08:00
}
internalTasks , err := assignInternalTask ( ctx , lbt , lbt . meta , lbt . cluster , loadSegmentReqs , nil , false , lbt . SourceNodeIDs , lbt . DstNodeIDs )
if err != nil {
log . Error ( "loadBalanceTask: assign child task failed" , zap . Any ( "balance request" , lbt . LoadBalanceRequest ) )
lbt . setResultInfo ( err )
return err
}
for _ , internalTask := range internalTasks {
lbt . addChildTask ( internalTask )
log . Debug ( "loadBalanceTask: add a childTask" , zap . Int32 ( "task type" , int32 ( internalTask . msgType ( ) ) ) , zap . Any ( "balance request" , lbt . LoadBalanceRequest ) )
2021-11-06 15:22:56 +08:00
}
log . Debug ( "loadBalanceTask: assign child task done" , zap . Any ( "balance request" , lbt . LoadBalanceRequest ) )
}
2021-06-19 11:45:09 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadBalanceTask Execute done" ,
2021-11-06 15:22:56 +08:00
zap . Int32 ( "trigger type" , int32 ( lbt . triggerCondition ) ) ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "sourceNodeIDs" , lbt . SourceNodeIDs ) ,
zap . Any ( "balanceReason" , lbt . BalanceReason ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lbt . getTaskID ( ) ) )
2021-06-19 11:45:09 +08:00
return nil
}
2021-10-18 21:34:47 +08:00
func ( lbt * loadBalanceTask ) postExecute ( context . Context ) error {
2021-11-06 15:22:56 +08:00
if lbt . result . ErrorCode != commonpb . ErrorCode_Success {
lbt . childTasks = [ ] task { }
}
2021-12-15 16:53:12 +08:00
if lbt . triggerCondition == querypb . TriggerCondition_NodeDown {
2021-10-11 09:54:37 +08:00
for _ , id := range lbt . SourceNodeIDs {
err := lbt . cluster . removeNodeInfo ( id )
if err != nil {
2021-11-06 15:22:56 +08:00
//TODO:: clear node info after removeNodeInfo failed
2021-10-18 21:34:47 +08:00
log . Error ( "loadBalanceTask: occur error when removing node info from cluster" , zap . Int64 ( "nodeID" , id ) )
2021-10-11 09:54:37 +08:00
}
2021-06-22 14:10:09 +08:00
}
}
2021-10-11 09:54:37 +08:00
2021-10-18 21:34:47 +08:00
log . Debug ( "loadBalanceTask postExecute done" ,
2021-11-06 15:22:56 +08:00
zap . Int32 ( "trigger type" , int32 ( lbt . triggerCondition ) ) ,
2021-06-19 11:45:09 +08:00
zap . Int64s ( "sourceNodeIDs" , lbt . SourceNodeIDs ) ,
zap . Any ( "balanceReason" , lbt . BalanceReason ) ,
2021-10-14 20:18:33 +08:00
zap . Int64 ( "taskID" , lbt . getTaskID ( ) ) )
2021-06-30 16:18:13 +08:00
return nil
2021-04-15 15:15:46 +08:00
}
2021-06-30 16:18:13 +08:00
func assignInternalTask ( ctx context . Context ,
2021-12-21 11:57:39 +08:00
parentTask task , meta Meta , cluster Cluster ,
2021-06-26 16:08:11 +08:00
loadSegmentRequests [ ] * querypb . LoadSegmentsRequest ,
2021-10-11 09:54:37 +08:00
watchDmChannelRequests [ ] * querypb . WatchDmChannelsRequest ,
2021-11-12 18:49:10 +08:00
wait bool , excludeNodeIDs [ ] int64 , includeNodeIDs [ ] int64 ) ( [ ] task , error ) {
2021-11-18 19:35:15 +08:00
log . Debug ( "assignInternalTask: start assign task to query node" )
2021-11-11 12:56:42 +08:00
internalTasks := make ( [ ] task , 0 )
2021-11-12 18:49:10 +08:00
err := cluster . allocateSegmentsToQueryNode ( ctx , loadSegmentRequests , wait , excludeNodeIDs , includeNodeIDs )
2021-10-11 09:54:37 +08:00
if err != nil {
2021-11-11 12:56:42 +08:00
log . Error ( "assignInternalTask: assign segment to node failed" , zap . Any ( "load segments requests" , loadSegmentRequests ) )
return nil , err
2021-10-11 09:54:37 +08:00
}
2021-12-21 11:57:39 +08:00
log . Debug ( "assignInternalTask: assign segment to node success" )
2021-11-11 12:56:42 +08:00
err = cluster . allocateChannelsToQueryNode ( ctx , watchDmChannelRequests , wait , excludeNodeIDs )
2021-10-11 09:54:37 +08:00
if err != nil {
2021-11-11 12:56:42 +08:00
log . Error ( "assignInternalTask: assign dmChannel to node failed" , zap . Any ( "watch dmChannel requests" , watchDmChannelRequests ) )
return nil , err
2021-10-11 09:54:37 +08:00
}
2021-12-21 11:57:39 +08:00
log . Debug ( "assignInternalTask: assign dmChannel to node success" )
watchQueryChannelInfo := make ( map [ int64 ] [ ] UniqueID )
watchDeltaChannelInfo := make ( map [ int64 ] [ ] UniqueID )
addChannelWatchInfoFn := func ( nodeID int64 , collectionID UniqueID , watchInfo map [ int64 ] [ ] UniqueID ) {
if _ , ok := watchInfo [ nodeID ] ; ! ok {
watchInfo [ nodeID ] = [ ] UniqueID { }
}
findColID := false
for _ , colID := range watchInfo [ nodeID ] {
if colID == collectionID {
findColID = true
}
}
if ! findColID {
watchInfo [ nodeID ] = append ( watchInfo [ nodeID ] , collectionID )
}
}
2021-06-26 16:08:11 +08:00
2021-12-21 11:57:39 +08:00
mergedLoadSegmentReqs := make ( map [ UniqueID ] map [ int64 ] [ ] * querypb . LoadSegmentsRequest )
sizeCounts := make ( map [ UniqueID ] map [ int64 ] int )
2021-11-11 12:56:42 +08:00
for _ , req := range loadSegmentRequests {
nodeID := req . DstNodeID
2021-12-21 11:57:39 +08:00
collectionID := req . CollectionID
if _ , ok := mergedLoadSegmentReqs [ collectionID ] ; ! ok {
mergedLoadSegmentReqs [ collectionID ] = make ( map [ int64 ] [ ] * querypb . LoadSegmentsRequest )
sizeCounts [ collectionID ] = make ( map [ int64 ] int )
}
mergedLoadSegmentReqsPerCol := mergedLoadSegmentReqs [ collectionID ]
sizeCountsPerCol := sizeCounts [ collectionID ]
2021-11-11 12:56:42 +08:00
sizeOfReq := getSizeOfLoadSegmentReq ( req )
2021-12-21 11:57:39 +08:00
if _ , ok := mergedLoadSegmentReqsPerCol [ nodeID ] ; ! ok {
mergedLoadSegmentReqsPerCol [ nodeID ] = [ ] * querypb . LoadSegmentsRequest { }
mergedLoadSegmentReqsPerCol [ nodeID ] = append ( mergedLoadSegmentReqsPerCol [ nodeID ] , req )
sizeCountsPerCol [ nodeID ] = sizeOfReq
2021-10-20 22:32:36 +08:00
} else {
2021-12-21 11:57:39 +08:00
if sizeCountsPerCol [ nodeID ] + sizeOfReq > MaxSendSizeToEtcd {
mergedLoadSegmentReqsPerCol [ nodeID ] = append ( mergedLoadSegmentReqsPerCol [ nodeID ] , req )
sizeCountsPerCol [ nodeID ] = sizeOfReq
2021-10-20 22:32:36 +08:00
} else {
2021-12-21 11:57:39 +08:00
lastReq := mergedLoadSegmentReqsPerCol [ nodeID ] [ len ( mergedLoadSegmentReqsPerCol [ nodeID ] ) - 1 ]
2021-11-11 12:56:42 +08:00
lastReq . Infos = append ( lastReq . Infos , req . Infos ... )
2021-12-21 11:57:39 +08:00
sizeCountsPerCol [ nodeID ] += sizeOfReq
2021-10-20 22:32:36 +08:00
}
2021-06-26 16:08:11 +08:00
}
}
2021-12-21 11:57:39 +08:00
for collectionID , loadSegmentsReqsPerCol := range mergedLoadSegmentReqs {
for nodeID , loadSegmentReqs := range loadSegmentsReqsPerCol {
for _ , req := range loadSegmentReqs {
baseTask := newBaseTask ( ctx , parentTask . getTriggerCondition ( ) )
baseTask . setParentTask ( parentTask )
loadSegmentTask := & loadSegmentTask {
baseTask : baseTask ,
LoadSegmentsRequest : req ,
meta : meta ,
cluster : cluster ,
excludeNodeIDs : excludeNodeIDs ,
}
internalTasks = append ( internalTasks , loadSegmentTask )
2021-10-19 10:40:35 +08:00
}
2021-11-05 14:47:19 +08:00
2021-12-21 11:57:39 +08:00
if ! cluster . hasWatchedQueryChannel ( parentTask . traceCtx ( ) , nodeID , collectionID ) {
addChannelWatchInfoFn ( nodeID , collectionID , watchQueryChannelInfo )
}
if ! cluster . hasWatchedDeltaChannel ( parentTask . traceCtx ( ) , nodeID , collectionID ) {
addChannelWatchInfoFn ( nodeID , collectionID , watchDeltaChannelInfo )
2021-11-05 14:47:19 +08:00
}
}
2021-06-26 16:08:11 +08:00
}
2021-11-11 12:56:42 +08:00
for _ , req := range watchDmChannelRequests {
nodeID := req . NodeID
2021-10-14 20:18:33 +08:00
baseTask := newBaseTask ( ctx , parentTask . getTriggerCondition ( ) )
baseTask . setParentTask ( parentTask )
2021-10-18 21:34:47 +08:00
watchDmChannelTask := & watchDmChannelTask {
baseTask : baseTask ,
2021-11-11 12:56:42 +08:00
WatchDmChannelsRequest : req ,
2021-06-26 16:08:11 +08:00
meta : meta ,
cluster : cluster ,
2021-11-11 12:56:42 +08:00
excludeNodeIDs : excludeNodeIDs ,
2021-06-26 16:08:11 +08:00
}
2021-11-11 12:56:42 +08:00
internalTasks = append ( internalTasks , watchDmChannelTask )
2021-12-21 11:57:39 +08:00
if ! cluster . hasWatchedQueryChannel ( parentTask . traceCtx ( ) , nodeID , req . CollectionID ) {
addChannelWatchInfoFn ( nodeID , req . CollectionID , watchQueryChannelInfo )
2021-11-11 12:56:42 +08:00
}
2021-06-26 16:08:11 +08:00
}
2021-12-21 11:57:39 +08:00
for nodeID , collectionIDs := range watchQueryChannelInfo {
for _ , collectionID := range collectionIDs {
2021-12-21 13:50:54 +08:00
queryChannelInfo := meta . getQueryChannelInfoByID ( collectionID )
2021-10-14 20:18:33 +08:00
msgBase := proto . Clone ( parentTask . msgBase ( ) ) . ( * commonpb . MsgBase )
2021-07-13 14:16:00 +08:00
msgBase . MsgType = commonpb . MsgType_WatchQueryChannels
2021-06-26 16:08:11 +08:00
addQueryChannelRequest := & querypb . AddQueryChannelRequest {
2021-10-22 19:07:15 +08:00
Base : msgBase ,
NodeID : nodeID ,
CollectionID : collectionID ,
2021-12-15 16:53:12 +08:00
QueryChannel : queryChannelInfo . QueryChannel ,
QueryResultChannel : queryChannelInfo . QueryResultChannel ,
2021-10-22 19:07:15 +08:00
GlobalSealedSegments : queryChannelInfo . GlobalSealedSegments ,
2021-06-26 16:08:11 +08:00
}
2021-10-14 20:18:33 +08:00
baseTask := newBaseTask ( ctx , parentTask . getTriggerCondition ( ) )
baseTask . setParentTask ( parentTask )
2021-10-18 21:34:47 +08:00
watchQueryChannelTask := & watchQueryChannelTask {
baseTask : baseTask ,
2021-06-26 16:08:11 +08:00
AddQueryChannelRequest : addQueryChannelRequest ,
cluster : cluster ,
}
2021-11-11 12:56:42 +08:00
internalTasks = append ( internalTasks , watchQueryChannelTask )
2021-06-26 16:08:11 +08:00
}
}
2021-12-21 11:57:39 +08:00
for nodeID , collectionIDs := range watchDeltaChannelInfo {
for _ , collectionID := range collectionIDs {
deltaChannelInfo , err := meta . getDeltaChannelsByCollectionID ( collectionID )
if err != nil {
return nil , err
}
msgBase := proto . Clone ( parentTask . msgBase ( ) ) . ( * commonpb . MsgBase )
msgBase . MsgType = commonpb . MsgType_WatchDeltaChannels
watchDeltaRequest := & querypb . WatchDeltaChannelsRequest {
Base : msgBase ,
CollectionID : collectionID ,
Infos : deltaChannelInfo ,
}
watchDeltaRequest . NodeID = nodeID
baseTask := newBaseTask ( ctx , parentTask . getTriggerCondition ( ) )
baseTask . setParentTask ( parentTask )
watchDeltaTask := & watchDeltaChannelTask {
baseTask : baseTask ,
WatchDeltaChannelsRequest : watchDeltaRequest ,
meta : meta ,
cluster : cluster ,
excludeNodeIDs : [ ] int64 { } ,
}
internalTasks = append ( internalTasks , watchDeltaTask )
}
}
2021-11-11 12:56:42 +08:00
return internalTasks , nil
2021-06-26 16:08:11 +08:00
}
2021-10-19 10:40:35 +08:00
func getSizeOfLoadSegmentReq ( req * querypb . LoadSegmentsRequest ) int {
2021-10-20 19:27:27 +08:00
return proto . Size ( req )
2021-10-19 10:40:35 +08:00
}
2021-11-25 17:37:15 +08:00
func generateWatchDeltaChannelInfo ( info * datapb . VchannelInfo ) ( * datapb . VchannelInfo , error ) {
2021-12-23 18:39:11 +08:00
deltaChannelName , err := rootcoord . ConvertChannelName ( info . ChannelName , Params . QueryCoordCfg . DmlChannelPrefix , Params . QueryCoordCfg . DeltaChannelPrefix )
2021-11-25 17:37:15 +08:00
if err != nil {
return nil , err
}
deltaChannel := proto . Clone ( info ) . ( * datapb . VchannelInfo )
deltaChannel . ChannelName = deltaChannelName
deltaChannel . UnflushedSegments = nil
deltaChannel . FlushedSegments = nil
deltaChannel . DroppedSegments = nil
return deltaChannel , nil
}
2021-11-25 18:49:16 +08:00
func mergeWatchDeltaChannelInfo ( infos [ ] * datapb . VchannelInfo ) [ ] * datapb . VchannelInfo {
minPositions := make ( map [ string ] int )
for index , info := range infos {
_ , ok := minPositions [ info . ChannelName ]
if ! ok {
minPositions [ info . ChannelName ] = index
}
minTimeStampIndex := minPositions [ info . ChannelName ]
if info . SeekPosition . GetTimestamp ( ) < infos [ minTimeStampIndex ] . SeekPosition . GetTimestamp ( ) {
minPositions [ info . ChannelName ] = index
}
}
var result [ ] * datapb . VchannelInfo
for _ , index := range minPositions {
result = append ( result , infos [ index ] )
}
2021-12-01 16:39:31 +08:00
log . Debug ( "merge delta channels finished" ,
zap . Any ( "origin info length" , len ( infos ) ) ,
zap . Any ( "merged info length" , len ( result ) ) ,
)
2021-11-25 18:49:16 +08:00
return result
}
2021-12-13 10:29:27 +08:00
func getRecoveryInfo ( ctx context . Context , dataCoord types . DataCoord , collectionID UniqueID , partitionID UniqueID ) ( [ ] * datapb . VchannelInfo , [ ] * datapb . SegmentBinlogs , error ) {
ctx2 , cancel2 := context . WithTimeout ( ctx , timeoutForRPC )
defer cancel2 ( )
getRecoveryInfoRequest := & datapb . GetRecoveryInfoRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_GetRecoveryInfo ,
} ,
CollectionID : collectionID ,
PartitionID : partitionID ,
}
recoveryInfo , err := dataCoord . GetRecoveryInfo ( ctx2 , getRecoveryInfoRequest )
if err != nil {
return nil , nil , err
}
if recoveryInfo . Status . ErrorCode != commonpb . ErrorCode_Success {
err = errors . New ( recoveryInfo . Status . Reason )
return nil , nil , err
}
return recoveryInfo . Channels , recoveryInfo . Binlogs , nil
}
2021-12-21 11:57:39 +08:00
func mergeDmChannelInfo ( infos [ ] * datapb . VchannelInfo ) map [ string ] * datapb . VchannelInfo {
minPositions := make ( map [ string ] * datapb . VchannelInfo )
for _ , info := range infos {
if _ , ok := minPositions [ info . ChannelName ] ; ! ok {
minPositions [ info . ChannelName ] = info
continue
}
minPositionInfo := minPositions [ info . ChannelName ]
if info . SeekPosition . GetTimestamp ( ) < minPositionInfo . SeekPosition . GetTimestamp ( ) {
minPositionInfo . SeekPosition = info . SeekPosition
minPositionInfo . DroppedSegments = append ( minPositionInfo . DroppedSegments , info . DroppedSegments ... )
minPositionInfo . UnflushedSegments = append ( minPositionInfo . UnflushedSegments , info . UnflushedSegments ... )
minPositionInfo . FlushedSegments = append ( minPositionInfo . FlushedSegments , info . FlushedSegments ... )
}
}
return minPositions
}