// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querycoord

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/rootcoord"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/trace"
	"github.com/opentracing/opentracing-go"
)

// timeoutForRPC bounds every outgoing coordinator RPC issued by tasks in this file.
const timeoutForRPC = 10 * time.Second

// etcd key prefixes under which trigger tasks, active (child) tasks, task state
// and load-balance info are persisted.
const (
	triggerTaskPrefix     = "queryCoord-triggerTask"
	activeTaskPrefix      = "queryCoord-activeTask"
	taskInfoPrefix        = "queryCoord-taskInfo"
	loadBalanceInfoPrefix = "queryCoord-loadBalanceInfo"
)

const (
	// MaxRetryNum is the maximum number of times that each task can be retried
	MaxRetryNum = 5
	// MaxSendSizeToEtcd is the default limit size of etcd messages that can be sent and received
	// MaxSendSizeToEtcd = 2097152
	// Limit size of every loadSegmentReq to 200k
	MaxSendSizeToEtcd = 200000
)

// taskState is the lifecycle state of a task.
// NOTE(review): the value 2 is skipped (taskDone == 3) — presumably a removed
// historical state; do not renumber, values may be persisted in etcd.
type taskState int

const (
	taskUndo    taskState = 0
	taskDoing   taskState = 1
	taskDone    taskState = 3
	taskExpired taskState = 4
	taskFailed  taskState = 5
)

// task is the common contract for every queryCoord task (both trigger tasks
// such as loadCollection and their internal child tasks such as loadSegment).
type task interface {
	traceCtx() context.Context
	getTaskID() UniqueID // return ReqId
	setTaskID(id UniqueID)
	msgBase() *commonpb.MsgBase
	msgType() commonpb.MsgType
	timestamp() Timestamp
	getTriggerCondition() querypb.TriggerCondition
	preExecute(ctx context.Context) error
	execute(ctx context.Context) error
	postExecute(ctx context.Context) error
	reschedule(ctx context.Context) ([]task, error)
	rollBack(ctx context.Context) []task
	waitToFinish() error
	notify(err error)
	taskPriority() querypb.TriggerCondition
	setParentTask(t task)
	getParentTask() task
	getChildTask() []task
	addChildTask(t task)
	removeChildTaskByID(taskID UniqueID)
	isValid() bool
	marshal() ([]byte, error)
	getState() taskState
	setState(state taskState)
	isRetryable() bool
	setResultInfo(err error)
	getResultInfo() *commonpb.Status
	updateTaskProcess()
}

// baseTask carries the state shared by all task implementations: result status,
// lifecycle state, retry budget, trigger condition and the parent/child links.
// result, state and childTasks each have a dedicated mutex; retryCount does not
// (NOTE(review): it is decremented from deferred closures without a lock —
// confirm tasks are executed single-threaded).
type baseTask struct {
	condition
	ctx    context.Context
	cancel context.CancelFunc
	result *commonpb.Status
	resultMu sync.RWMutex
	state    taskState
	stateMu  sync.RWMutex
	retryCount int
	//sync.RWMutex

	taskID           UniqueID
	triggerCondition querypb.TriggerCondition
	parentTask       task
	childTasks       []task
	childTasksMu     sync.RWMutex
}

// newBaseTask creates a baseTask in the taskUndo state with a child context,
// a fresh wait/notify condition and a full retry budget of MaxRetryNum.
func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *baseTask {
	childCtx, cancel := context.WithCancel(ctx)
	condition := newTaskCondition(childCtx)

	baseTask := &baseTask{
		ctx:              childCtx,
		cancel:           cancel,
		condition:        condition,
		state:            taskUndo,
		retryCount:       MaxRetryNum,
		triggerCondition: triggerType,
		childTasks:       []task{},
	}

	return baseTask
}

// getTaskID function returns the unique taskID of the trigger task
func (bt *baseTask) getTaskID() UniqueID {
	return bt.taskID
}

// setTaskID function sets the trigger task with a unique id, which is allocated by tso
func (bt *baseTask) setTaskID(id UniqueID) {
	bt.taskID = id
}

// traceCtx returns the task-scoped context created in newBaseTask.
func (bt *baseTask) traceCtx() context.Context {
	return bt.ctx
}

// getTriggerCondition reports what triggered this task (grpc request, node down, ...).
func (bt *baseTask) getTriggerCondition() querypb.TriggerCondition {
	return bt.triggerCondition
}

// taskPriority mirrors the trigger condition; the scheduler uses it for ordering.
func (bt *baseTask) taskPriority() querypb.TriggerCondition {
	return bt.triggerCondition
}

func (bt *baseTask) setParentTask(t task) {
	bt.parentTask = t
}

func (bt *baseTask) getParentTask() task {
	return bt.parentTask
}

// GetChildTask function returns all the child tasks of the trigger task
// Child task may be loadSegmentTask, watchDmChannelTask or watchQueryChannelTask
func (bt *baseTask) getChildTask() []task {
	bt.childTasksMu.RLock()
	defer bt.childTasksMu.RUnlock()
	// NOTE(review): returns the internal slice, not a copy — callers share the
	// backing array with concurrent addChildTask/removeChildTaskByID writers.
	return bt.childTasks
}

func (bt *baseTask) addChildTask(t task) {
	bt.childTasksMu.Lock()
	defer bt.childTasksMu.Unlock()
	bt.childTasks = append(bt.childTasks, t)
}

// removeChildTaskByID rebuilds the child list without the given taskID.
func (bt *baseTask) removeChildTaskByID(taskID UniqueID) {
	bt.childTasksMu.Lock()
	defer bt.childTasksMu.Unlock()
	result := make([]task, 0)
	for _, t := range bt.childTasks {
		if t.getTaskID() != taskID {
			result = append(result, t)
		}
	}
	bt.childTasks = result
}

// isValid reports whether the task can still be executed; the base
// implementation is always true and node-bound tasks override it.
func (bt *baseTask) isValid() bool {
	return true
}

// reschedule is a no-op for the base task; reschedulable tasks override it.
func (bt *baseTask) reschedule(ctx context.Context) ([]task, error) {
	return nil, nil
}

// State returns the state of task, such as taskUndo, taskDoing, taskDone, taskExpired, taskFailed
func (bt *baseTask) getState() taskState {
	bt.stateMu.RLock()
	defer bt.stateMu.RUnlock()
	return bt.state
}

func (bt *baseTask) setState(state taskState) {
	bt.stateMu.Lock()
	defer bt.stateMu.Unlock()
	bt.state = state
}

// isRetryable reports whether the retry budget (decremented once per execute)
// is not yet exhausted.
func (bt *baseTask) isRetryable() bool {
	return bt.retryCount > 0
}

// setResultInfo records success (err == nil) or accumulates the error reason.
func (bt *baseTask) setResultInfo(err error) {
	bt.resultMu.Lock()
	defer bt.resultMu.Unlock()

	if bt.result == nil {
		bt.result = &commonpb.Status{}
	}
	if err == nil {
		bt.result.ErrorCode = commonpb.ErrorCode_Success
		bt.result.Reason = ""
		return
	}

	bt.result.ErrorCode = commonpb.ErrorCode_UnexpectedError
	// NOTE(review): the first appended error yields a Reason with a leading
	// ", " because the existing Reason is empty — cosmetic, but worth fixing.
	bt.result.Reason = bt.result.Reason + ", " + err.Error()
}

// getResultInfo returns a deep copy of the current result status.
func (bt *baseTask) getResultInfo() *commonpb.Status {
	bt.resultMu.RLock()
	defer bt.resultMu.RUnlock()
	return proto.Clone(bt.result).(*commonpb.Status)
}

func (bt *baseTask) updateTaskProcess() {
	// TODO::
}

func (bt *baseTask) rollBack(ctx context.Context) []task {
	//TODO::
	return nil
}

// loadCollectionTask loads all partitions of a collection onto query nodes by
// fanning out loadSegment / watchDmChannel / watchDeltaChannel child tasks.
type loadCollectionTask struct {
	*baseTask
	*querypb.LoadCollectionRequest
	rootCoord  types.RootCoord
	dataCoord  types.DataCoord
	indexCoord types.IndexCoord
	cluster    Cluster
	meta       Meta
}

func (lct *loadCollectionTask) msgBase() *commonpb.MsgBase {
	return lct.Base
}

func (lct *loadCollectionTask) marshal() ([]byte, error) {
	return proto.Marshal(lct.LoadCollectionRequest)
}

func (lct *loadCollectionTask) msgType() commonpb.MsgType {
	return lct.Base.MsgType
}

func (lct *loadCollectionTask) timestamp() Timestamp {
	return lct.Base.Timestamp
}

// updateTaskProcess marks the collection 100% loaded in meta once every child
// task has reached taskDone.
func (lct *loadCollectionTask) updateTaskProcess() {
	collectionID := lct.CollectionID
	childTasks := lct.getChildTask()
	allDone := true
	for _, t := range childTasks {
		if t.getState() != taskDone {
			allDone = false
		}
	}
	if allDone {
		err := lct.meta.setLoadPercentage(collectionID, 0, 100, querypb.LoadType_loadCollection)
		if err != nil {
			// NOTE(review): err is not attached to the log entry (missing zap.Error(err)).
			log.Error("loadCollectionTask: set load percentage to meta's collectionInfo", zap.Int64("collectionID", collectionID))
			lct.setResultInfo(err)
		}
	}
}

// preExecute resets the result status to success before the task runs.
func (lct *loadCollectionTask) preExecute(ctx context.Context) error {
	collectionID := lct.CollectionID
	schema := lct.Schema
	lct.setResultInfo(nil)
	log.Debug("start do loadCollectionTask",
		zap.Int64("msgID", lct.getTaskID()),
		zap.Int64("collectionID", collectionID),
		zap.Stringer("schema", schema))
	return nil
}

// execute resolves the collection's partitions via rootCoord, decides which
// partitions still need loading, builds loadSegment / watchDmChannel /
// watchDeltaChannel requests from dataCoord recovery info, and assigns them
// as child tasks. Consumes one retry on every invocation.
func (lct *loadCollectionTask) execute(ctx context.Context) error {
	defer func() {
		lct.retryCount--
	}()
	collectionID := lct.CollectionID

	showPartitionRequest := &milvuspb.ShowPartitionsRequest{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_ShowPartitions,
		},
		CollectionID: collectionID,
	}
	ctx2, cancel2 := context.WithTimeout(ctx, timeoutForRPC)
	defer cancel2()
	showPartitionResponse, err := lct.rootCoord.ShowPartitions(ctx2, showPartitionRequest)
	if err != nil {
		log.Error("loadCollectionTask: showPartition failed", zap.Int64("collectionID", collectionID), zap.Error(err))
		lct.setResultInfo(err)
		return err
	}
	if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
		err = errors.New(showPartitionResponse.Status.Reason)
		log.Error("loadCollectionTask: showPartition failed", zap.Int64("collectionID", collectionID), zap.Error(err))
		lct.setResultInfo(err)
		return err
	}
	log.Debug("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", showPartitionResponse.PartitionIDs))

	partitionIDs := showPartitionResponse.PartitionIDs
	toLoadPartitionIDs := make([]UniqueID, 0)
	hasCollection := lct.meta.hasCollection(collectionID)
	watchPartition := false
	if hasCollection {
		// The collection already exists in meta: only load partitions that are
		// missing (or that were explicitly released after a full-collection load).
		watchPartition = true
		loadType, _ := lct.meta.getLoadType(collectionID)
		if loadType == querypb.LoadType_loadCollection {
			for _, partitionID := range partitionIDs {
				hasReleasePartition := lct.meta.hasReleasePartition(collectionID, partitionID)
				if hasReleasePartition {
					toLoadPartitionIDs = append(toLoadPartitionIDs, partitionID)
				}
			}
		} else {
			for _, partitionID := range partitionIDs {
				hasPartition := lct.meta.hasPartition(collectionID, partitionID)
				if !hasPartition {
					toLoadPartitionIDs = append(toLoadPartitionIDs, partitionID)
				}
			}
		}
	} else {
		toLoadPartitionIDs = partitionIDs
	}
	log.Debug("loadCollectionTask: toLoadPartitionIDs", zap.Int64s("partitionIDs", toLoadPartitionIDs))
	lct.meta.addCollection(collectionID, lct.Schema)
	lct.meta.setLoadType(collectionID, querypb.LoadType_loadCollection)
	for _, id := range toLoadPartitionIDs {
		lct.meta.addPartition(collectionID, id)
	}

	loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0)
	watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0)
	channelsToWatch := make([]string, 0)
	var watchDeltaChannels []*datapb.VchannelInfo
	for _, partitionID := range toLoadPartitionIDs {
		dmChannelInfos, binlogs, err := getRecoveryInfo(lct.ctx, lct.dataCoord, collectionID, partitionID)
		if err != nil {
			log.Error("loadCollectionTask: getRecoveryInfo failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
			lct.setResultInfo(err)
			return err
		}

		// One loadSegment request per segment; kept small so each request stays
		// under MaxSendSizeToEtcd when persisted.
		for _, segmentBingLog := range binlogs {
			segmentID := segmentBingLog.SegmentID
			segmentLoadInfo := &querypb.SegmentLoadInfo{
				SegmentID:    segmentID,
				PartitionID:  partitionID,
				CollectionID: collectionID,
				BinlogPaths:  segmentBingLog.FieldBinlogs,
				NumOfRows:    segmentBingLog.NumOfRows,
				Statslogs:    segmentBingLog.Statslogs,
				Deltalogs:    segmentBingLog.Deltalogs,
			}

			// Index info is best-effort: on error the segment is simply loaded
			// without index paths.
			indexInfo, err := getIndexInfo(ctx, &querypb.SegmentInfo{
				CollectionID: collectionID,
				SegmentID:    segmentID,
			}, lct.rootCoord, lct.indexCoord)

			if err == nil && indexInfo.enableIndex {
				segmentLoadInfo.EnableIndex = true
				segmentLoadInfo.IndexPathInfos = indexInfo.infos
			}

			msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
			msgBase.MsgType = commonpb.MsgType_LoadSegments
			loadSegmentReq := &querypb.LoadSegmentsRequest{
				Base:         msgBase,
				Infos:        []*querypb.SegmentLoadInfo{segmentLoadInfo},
				Schema:       lct.Schema,
				CollectionID: collectionID,
			}

			loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
		}

		for _, info := range dmChannelInfos {
			deltaChannel, err := generateWatchDeltaChannelInfo(info)
			if err != nil {
				return err
			}
			watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
		}

		for _, info := range dmChannelInfos {
			channel := info.ChannelName
			if !watchPartition {
				// Whole-collection load: merge VchannelInfos of different
				// partitions that share the same dm channel into one request.
				merged := false
				for index, channelName := range channelsToWatch {
					if channel == channelName {
						merged = true
						oldInfo := watchDmChannelReqs[index].Infos[0]
						newInfo := mergeVChannelInfo(oldInfo, info)
						watchDmChannelReqs[index].Infos = []*datapb.VchannelInfo{newInfo}
						break
					}
				}
				if !merged {
					msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
					msgBase.MsgType = commonpb.MsgType_WatchDmChannels
					watchRequest := &querypb.WatchDmChannelsRequest{
						Base:         msgBase,
						CollectionID: collectionID,
						Infos:        []*datapb.VchannelInfo{info},
						Schema:       lct.Schema,
					}
					channelsToWatch = append(channelsToWatch, channel)
					watchDmChannelReqs = append(watchDmChannelReqs, watchRequest)
				}
			} else {
				// Partition-scoped watch: one request per channel occurrence,
				// carrying the partition filter.
				msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
				msgBase.MsgType = commonpb.MsgType_WatchDmChannels
				watchRequest := &querypb.WatchDmChannelsRequest{
					Base:         msgBase,
					CollectionID: collectionID,
					PartitionIDs: toLoadPartitionIDs,
					Infos:        []*datapb.VchannelInfo{info},
					Schema:       lct.Schema,
				}
				channelsToWatch = append(channelsToWatch, channel)
				watchDmChannelReqs = append(watchDmChannelReqs, watchRequest)
			}
		}
	}
	mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels)
	msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
	msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
	watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
		Base:         msgBase,
		CollectionID: collectionID,
		Infos:        mergedDeltaChannels,
	}
	// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
	err = lct.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
	if err != nil {
		return err
	}

	internalTasks, err := assignInternalTask(ctx, collectionID, lct, lct.meta, lct.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReq, false, nil, nil)
	if err != nil {
		log.Warn("loadCollectionTask: assign child task failed", zap.Int64("collectionID", collectionID))
		lct.setResultInfo(err)
		return err
	}
	for _, internalTask := range internalTasks {
		lct.addChildTask(internalTask)
		log.Debug("loadCollectionTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("collectionID", collectionID), zap.Any("task", internalTask))
	}
	log.Debug("loadCollectionTask: assign child task done", zap.Int64("collectionID", collectionID))

	log.Debug("LoadCollection execute done",
		zap.Int64("msgID", lct.getTaskID()),
		zap.Int64("collectionID", collectionID))
	return nil
}

// postExecute drops all child tasks and rolls the collection out of meta when
// the task finished with a non-success result.
// NOTE(review): lct.result is read without resultMu and lct.childTasks is
// assigned without childTasksMu here — confirm postExecute cannot race with
// setResultInfo/addChildTask.
func (lct *loadCollectionTask) postExecute(ctx context.Context) error {
	collectionID := lct.CollectionID
	if lct.result.ErrorCode != commonpb.ErrorCode_Success {
		lct.childTasks = []task{}
		err := lct.meta.releaseCollection(collectionID)
		if err != nil {
			log.Error("loadCollectionTask: occur error when release collection info from meta", zap.Error(err))
		}
	}

	log.Debug("loadCollectionTask postExecute done",
		zap.Int64("msgID", lct.getTaskID()),
		zap.Int64("collectionID", collectionID))
	return nil
}

// rollBack generates a releaseCollection task per online node to undo a
// partially-applied load.
func (lct *loadCollectionTask) rollBack(ctx context.Context) []task {
	nodes, _ := lct.cluster.onlineNodes()
	resultTasks := make([]task, 0)
	//TODO::call rootCoord.ReleaseDQLMessageStream
	for nodeID := range nodes {
		//brute force rollBack, should optimize
		req := &querypb.ReleaseCollectionRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_ReleaseCollection,
				MsgID:     lct.Base.MsgID,
				Timestamp: lct.Base.Timestamp,
				SourceID:  lct.Base.SourceID,
			},
			DbID:         lct.DbID,
			CollectionID: lct.CollectionID,
			NodeID:       nodeID,
		}
		baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
		baseTask.setParentTask(lct)
		releaseCollectionTask := &releaseCollectionTask{
			baseTask:                 baseTask,
			ReleaseCollectionRequest: req,
			cluster:                  lct.cluster,
		}
		resultTasks = append(resultTasks, releaseCollectionTask)
	}
	log.Debug("loadCollectionTask: rollBack loadCollectionTask", zap.Any("loadCollectionTask", lct), zap.Any("rollBack task", resultTasks))
	return resultTasks
}

// releaseCollectionTask will release all the data of this collection on query nodes
type releaseCollectionTask struct {
	*baseTask
	*querypb.ReleaseCollectionRequest
	cluster   Cluster
	meta      Meta
	rootCoord types.RootCoord
}

func (rct *releaseCollectionTask) msgBase() *commonpb.MsgBase {
	return rct.Base
}

func (rct *releaseCollectionTask) marshal() ([]byte, error) {
	return proto.Marshal(rct.ReleaseCollectionRequest)
}

func (rct *releaseCollectionTask) msgType() commonpb.MsgType {
	return rct.Base.MsgType
}

func (rct *releaseCollectionTask) timestamp() Timestamp {
	return rct.Base.Timestamp
}

// preExecute resets the result status to success before the task runs.
func (rct *releaseCollectionTask) preExecute(context.Context) error {
	collectionID := rct.CollectionID
	rct.setResultInfo(nil)
	log.Debug("start do releaseCollectionTask",
		zap.Int64("msgID", rct.getTaskID()),
		zap.Int64("collectionID", collectionID))
	return nil
}

// execute behaves in two modes: when NodeID <= 0 (not yet assigned to a node)
// it releases the collection from meta, asks rootCoord to tear down the DQL
// message stream, and fans out one child releaseCollectionTask per online
// node; otherwise it releases the collection on the single target node.
func (rct *releaseCollectionTask) execute(ctx context.Context) error {
	defer func() {
		rct.retryCount--
	}()
	collectionID := rct.CollectionID

	// if nodeID ==0, it means that the release request has not been assigned to the specified query node
	if rct.NodeID <= 0 {
		// NOTE(review): the error returned by releaseCollection is ignored here.
		rct.meta.releaseCollection(collectionID)
		releaseDQLMessageStreamReq := &proxypb.ReleaseDQLMessageStreamRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_RemoveQueryChannels,
				MsgID:     rct.Base.MsgID,
				Timestamp: rct.Base.Timestamp,
				SourceID:  rct.Base.SourceID,
			},
			DbID:         rct.DbID,
			CollectionID: rct.CollectionID,
		}
		ctx2, cancel2 := context.WithTimeout(rct.ctx, timeoutForRPC)
		defer cancel2()
		res, err := rct.rootCoord.ReleaseDQLMessageStream(ctx2, releaseDQLMessageStreamReq)
		if err != nil {
			log.Error("releaseCollectionTask: release collection end, releaseDQLMessageStream occur error", zap.Int64("collectionID", rct.CollectionID), zap.Error(err))
			rct.setResultInfo(err)
			return err
		}
		if res.ErrorCode != commonpb.ErrorCode_Success {
			log.Error("releaseCollectionTask: release collection end, releaseDQLMessageStream occur error", zap.Int64("collectionID", rct.CollectionID), zap.String("error", res.Reason))
			err = errors.New(res.Reason)
			rct.setResultInfo(err)
			return err
		}

		nodes, err := rct.cluster.onlineNodes()
		if err != nil {
			log.Debug(err.Error())
		}
		for nodeID := range nodes {
			req := proto.Clone(rct.ReleaseCollectionRequest).(*querypb.ReleaseCollectionRequest)
			req.NodeID = nodeID
			baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
			baseTask.setParentTask(rct)
			releaseCollectionTask := &releaseCollectionTask{
				baseTask:                 baseTask,
				ReleaseCollectionRequest: req,
				cluster:                  rct.cluster,
			}
			rct.addChildTask(releaseCollectionTask)
			log.Debug("releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask))
		}
	} else {
		err := rct.cluster.releaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest)
		if err != nil {
			log.Warn("releaseCollectionTask: release collection end, node occur error", zap.Int64("nodeID", rct.NodeID))
			rct.setResultInfo(err)
			return err
		}
	}

	log.Debug("releaseCollectionTask Execute done",
		zap.Int64("msgID", rct.getTaskID()),
		zap.Int64("collectionID", collectionID),
		zap.Int64("nodeID", rct.NodeID))
	return nil
}

// postExecute drops the child tasks if the task ended with an error.
// NOTE(review): same unsynchronized result/childTasks access as
// loadCollectionTask.postExecute.
func (rct *releaseCollectionTask) postExecute(context.Context) error {
	collectionID := rct.CollectionID
	if rct.result.ErrorCode != commonpb.ErrorCode_Success {
		rct.childTasks = []task{}
	}

	log.Debug("releaseCollectionTask postExecute done",
		zap.Int64("msgID", rct.getTaskID()),
		zap.Int64("collectionID", collectionID),
		zap.Int64("nodeID", rct.NodeID))
	return nil
}

func (rct *releaseCollectionTask) rollBack(ctx context.Context) []task {
	//TODO::
	//if taskID == 0, recovery meta
	//if taskID != 0, recovery collection on queryNode
	return nil
}

// loadPartitionTask will load all the data of this partition to query nodes
type loadPartitionTask struct {
	*baseTask
	*querypb.LoadPartitionsRequest
	rootCoord  types.RootCoord
	dataCoord  types.DataCoord
	indexCoord types.IndexCoord
	cluster    Cluster
	meta       Meta
	// addCol records whether this task created the collection entry in meta,
	// so postExecute/rollBack know whether to release the whole collection or
	// only the partitions.
	addCol bool
}

func (lpt *loadPartitionTask) msgBase() *commonpb.MsgBase {
	return lpt.Base
}

func (lpt *loadPartitionTask) marshal() ([]byte, error) {
	return proto.Marshal(lpt.LoadPartitionsRequest)
}

func (lpt *loadPartitionTask) msgType() commonpb.MsgType {
	return lpt.Base.MsgType
}

func (lpt *loadPartitionTask) timestamp() Timestamp {
	return lpt.Base.Timestamp
}

// updateTaskProcess marks every requested partition 100% loaded in meta once
// all child tasks have reached taskDone.
func (lpt *loadPartitionTask) updateTaskProcess() {
	collectionID := lpt.CollectionID
	partitionIDs := lpt.PartitionIDs
	childTasks := lpt.getChildTask()
	allDone := true
	for _, t := range childTasks {
		if t.getState() != taskDone {
			allDone = false
		}
	}
	if allDone {
		for _, id := range partitionIDs {
			err := lpt.meta.setLoadPercentage(collectionID, id, 100, querypb.LoadType_LoadPartition)
			if err != nil {
				// NOTE(review): err is not attached to the log entry (missing zap.Error(err)).
				log.Error("loadPartitionTask: set load percentage to meta's collectionInfo", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", id))
				lpt.setResultInfo(err)
			}
		}
	}
}

// preExecute resets the result status to success before the task runs.
func (lpt *loadPartitionTask) preExecute(context.Context) error {
	collectionID := lpt.CollectionID
	lpt.setResultInfo(nil)
	log.Debug("start do loadPartitionTask",
		zap.Int64("msgID", lpt.getTaskID()),
		zap.Int64("collectionID", collectionID))
	return nil
}

// execute registers the collection/partitions in meta, builds loadSegment /
// watchDmChannel / watchDeltaChannel requests from dataCoord recovery info and
// assigns them as child tasks. Consumes one retry on every invocation.
func (lpt *loadPartitionTask) execute(ctx context.Context) error {
	defer func() {
		lpt.retryCount--
	}()
	collectionID := lpt.CollectionID
	partitionIDs := lpt.PartitionIDs

	if !lpt.meta.hasCollection(collectionID) {
		lpt.meta.addCollection(collectionID, lpt.Schema)
		lpt.meta.setLoadType(collectionID, querypb.LoadType_LoadPartition)
		lpt.addCol = true
	}
	for _, id := range partitionIDs {
		lpt.meta.addPartition(collectionID, id)
	}

	loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0)
	channelsToWatch := make([]string, 0)
	watchDmReqs := make([]*querypb.WatchDmChannelsRequest, 0)
	var watchDeltaChannels []*datapb.VchannelInfo
	for _, partitionID := range partitionIDs {
		dmChannelInfos, binlogs, err := getRecoveryInfo(lpt.ctx, lpt.dataCoord, collectionID, partitionID)
		if err != nil {
			log.Error("loadPartitionTask: getRecoveryInfo failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
			lpt.setResultInfo(err)
			return err
		}

		for _, segmentBingLog := range binlogs {
			segmentID := segmentBingLog.SegmentID
			segmentLoadInfo := &querypb.SegmentLoadInfo{
				SegmentID:    segmentID,
				PartitionID:  partitionID,
				CollectionID: collectionID,
				BinlogPaths:  segmentBingLog.FieldBinlogs,
				NumOfRows:    segmentBingLog.NumOfRows,
				Statslogs:    segmentBingLog.Statslogs,
				Deltalogs:    segmentBingLog.Deltalogs,
			}

			// Index info is best-effort: on error the segment loads without index paths.
			indexInfo, err := getIndexInfo(ctx, &querypb.SegmentInfo{
				CollectionID: collectionID,
				SegmentID:    segmentID,
			}, lpt.rootCoord, lpt.indexCoord)

			if err == nil && indexInfo.enableIndex {
				segmentLoadInfo.EnableIndex = true
				segmentLoadInfo.IndexPathInfos = indexInfo.infos
			}

			msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase)
			msgBase.MsgType = commonpb.MsgType_LoadSegments
			loadSegmentReq := &querypb.LoadSegmentsRequest{
				Base:         msgBase,
				Infos:        []*querypb.SegmentLoadInfo{segmentLoadInfo},
				Schema:       lpt.Schema,
				CollectionID: collectionID,
			}
			loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
		}

		for _, info := range dmChannelInfos {
			deltaChannel, err := generateWatchDeltaChannelInfo(info)
			if err != nil {
				return err
			}
			watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
		}

		for _, info := range dmChannelInfos {
			// watch dml channels
			channel := info.ChannelName
			msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase)
			msgBase.MsgType = commonpb.MsgType_WatchDmChannels
			watchDmRequest := &querypb.WatchDmChannelsRequest{
				Base:         msgBase,
				CollectionID: collectionID,
				PartitionIDs: partitionIDs,
				Infos:        []*datapb.VchannelInfo{info},
				Schema:       lpt.Schema,
			}
			channelsToWatch = append(channelsToWatch, channel)
			watchDmReqs = append(watchDmReqs, watchDmRequest)
			log.Debug("loadPartitionTask: set watchDmChannelsRequests", zap.Any("request", watchDmRequest), zap.Int64("collectionID", collectionID))
		}
	}
	mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels)
	msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase)
	msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
	watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
		Base:         msgBase,
		CollectionID: collectionID,
		Infos:        mergedDeltaChannels,
	}
	// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
	err := lpt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
	if err != nil {
		return err
	}

	internalTasks, err := assignInternalTask(ctx, collectionID, lpt, lpt.meta, lpt.cluster, loadSegmentReqs, watchDmReqs, watchDeltaChannelReq, false, nil, nil)
	if err != nil {
		log.Warn("loadPartitionTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
		lpt.setResultInfo(err)
		return err
	}
	for _, internalTask := range internalTasks {
		lpt.addChildTask(internalTask)
		log.Debug("loadPartitionTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("collectionID", collectionID), zap.Any("task", internalTask))
	}
	log.Debug("loadPartitionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))

	log.Debug("loadPartitionTask Execute done",
		zap.Int64("msgID", lpt.getTaskID()),
		zap.Int64("collectionID", collectionID),
		zap.Int64s("partitionIDs", partitionIDs))
	return nil
}

// postExecute undoes meta changes on failure: releases the whole collection if
// this task created it (addCol), otherwise only the requested partitions.
// NOTE(review): unsynchronized result/childTasks access, as in the other tasks.
func (lpt *loadPartitionTask) postExecute(ctx context.Context) error {
	collectionID := lpt.CollectionID
	partitionIDs := lpt.PartitionIDs
	if lpt.result.ErrorCode != commonpb.ErrorCode_Success {
		lpt.childTasks = []task{}
		if lpt.addCol {
			err := lpt.meta.releaseCollection(collectionID)
			if err != nil {
				log.Error("loadPartitionTask: occur error when release collection info from meta", zap.Error(err))
			}
		} else {
			for _, partitionID := range partitionIDs {
				err := lpt.meta.releasePartition(collectionID, partitionID)
				if err != nil {
					log.Error("loadPartitionTask: occur error when release partition info from meta", zap.Error(err))
				}
			}
		}
	}

	log.Debug("loadPartitionTask postExecute done",
		zap.Int64("msgID", lpt.getTaskID()),
		zap.Int64("collectionID", collectionID),
		zap.Int64s("partitionIDs", partitionIDs))
	return nil
}

// rollBack generates per-node release tasks mirroring postExecute's logic:
// releaseCollection tasks if this load created the collection, otherwise
// releasePartitions tasks.
func (lpt *loadPartitionTask) rollBack(ctx context.Context) []task {
	partitionIDs := lpt.PartitionIDs
	resultTasks := make([]task, 0)
	//brute force rollBack, should optimize
	if lpt.addCol {
		nodes, _ := lpt.cluster.onlineNodes()
		for nodeID := range nodes {
			req := &querypb.ReleaseCollectionRequest{
				Base: &commonpb.MsgBase{
					MsgType:   commonpb.MsgType_ReleaseCollection,
					MsgID:     lpt.Base.MsgID,
					Timestamp: lpt.Base.Timestamp,
					SourceID:  lpt.Base.SourceID,
				},
				DbID:         lpt.DbID,
				CollectionID: lpt.CollectionID,
				NodeID:       nodeID,
			}
			baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
			baseTask.setParentTask(lpt)
			releaseCollectionTask := &releaseCollectionTask{
				baseTask:                 baseTask,
				ReleaseCollectionRequest: req,
				cluster:                  lpt.cluster,
			}
			resultTasks = append(resultTasks, releaseCollectionTask)
		}
	} else {
		nodes, _ := lpt.cluster.onlineNodes()
		for nodeID := range nodes {
			req := &querypb.ReleasePartitionsRequest{
				Base: &commonpb.MsgBase{
					MsgType:   commonpb.MsgType_ReleasePartitions,
					MsgID:     lpt.Base.MsgID,
					Timestamp: lpt.Base.Timestamp,
					SourceID:  lpt.Base.SourceID,
				},
				DbID:         lpt.DbID,
				CollectionID: lpt.CollectionID,
				PartitionIDs: partitionIDs,
				NodeID:       nodeID,
			}
			baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
			baseTask.setParentTask(lpt)
			releasePartitionTask := &releasePartitionTask{
				baseTask:                 baseTask,
				ReleasePartitionsRequest: req,
				cluster:                  lpt.cluster,
			}
			resultTasks = append(resultTasks, releasePartitionTask)
		}
	}
	log.Debug("loadPartitionTask: rollBack loadPartitionTask", zap.Any("loadPartitionTask", lpt), zap.Any("rollBack task", resultTasks))
	return resultTasks
}

// releasePartitionTask will release all the data of this partition on query nodes
type releasePartitionTask struct {
	*baseTask
	*querypb.ReleasePartitionsRequest
	cluster Cluster
}

func (rpt *releasePartitionTask) msgBase() *commonpb.MsgBase {
	return rpt.Base
}

func (rpt *releasePartitionTask) marshal() ([]byte, error) {
	return proto.Marshal(rpt.ReleasePartitionsRequest)
}

func (rpt *releasePartitionTask) msgType() commonpb.MsgType {
	return rpt.Base.MsgType
}

func (rpt *releasePartitionTask) timestamp() Timestamp {
	return rpt.Base.Timestamp
}

// preExecute resets the result status to success before the task runs.
func (rpt *releasePartitionTask) preExecute(context.Context) error {
	collectionID := rpt.CollectionID
	rpt.setResultInfo(nil)
	log.Debug("start do releasePartitionTask",
		zap.Int64("msgID", rpt.getTaskID()),
		zap.Int64("collectionID", collectionID))
	return nil
}

// execute fans out one child releasePartitionTask per online node when the
// task is not yet bound to a node (NodeID <= 0); otherwise it releases the
// partitions on the single target node.
func (rpt *releasePartitionTask) execute(ctx context.Context) error {
	defer func() {
		rpt.retryCount--
	}()
	collectionID := rpt.CollectionID
	partitionIDs := rpt.PartitionIDs

	// if nodeID ==0, it means that the release request has not been assigned to the specified query node
	if rpt.NodeID <= 0 {
		nodes, err := rpt.cluster.onlineNodes()
		if err != nil {
			log.Debug(err.Error())
		}
		for nodeID := range nodes {
			req := proto.Clone(rpt.ReleasePartitionsRequest).(*querypb.ReleasePartitionsRequest)
			req.NodeID = nodeID
			baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
			baseTask.setParentTask(rpt)
			releasePartitionTask := &releasePartitionTask{
				baseTask:                 baseTask,
				ReleasePartitionsRequest: req,
				cluster:                  rpt.cluster,
			}
			rpt.addChildTask(releasePartitionTask)
			log.Debug("releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Any("task", releasePartitionTask))
		}
	} else {
		err := rpt.cluster.releasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest)
		if err != nil {
			log.Warn("ReleasePartitionsTask: release partition end, node occur error", zap.String("nodeID", fmt.Sprintln(rpt.NodeID)))
			rpt.setResultInfo(err)
			return err
		}
	}

	log.Debug("releasePartitionTask Execute done",
		zap.Int64("msgID", rpt.getTaskID()),
		zap.Int64("collectionID", collectionID),
		zap.Int64s("partitionIDs", partitionIDs),
		zap.Int64("nodeID", rpt.NodeID))
	return nil
}

// postExecute drops the child tasks if the task ended with an error.
func (rpt *releasePartitionTask) postExecute(context.Context) error {
	collectionID := rpt.CollectionID
	partitionIDs := rpt.PartitionIDs
	if rpt.result.ErrorCode != commonpb.ErrorCode_Success {
		rpt.childTasks = []task{}
	}

	log.Debug("releasePartitionTask postExecute done",
		zap.Int64("msgID", rpt.getTaskID()),
		zap.Int64("collectionID", collectionID),
		zap.Int64s("partitionIDs", partitionIDs),
		zap.Int64("nodeID", rpt.NodeID))
	return nil
}

func (rpt *releasePartitionTask) rollBack(ctx context.Context) []task {
	//TODO::
	//if taskID == 0, recovery meta
	//if taskID != 0, recovery partition on queryNode
	return nil
}

// loadSegmentTask is an internal (child) task that loads specific segments on
// one destination node; excludeNodeIDs accumulates nodes already tried so a
// reschedule avoids them.
type loadSegmentTask struct {
	*baseTask
	*querypb.LoadSegmentsRequest
	meta           Meta
	cluster        Cluster
	excludeNodeIDs []int64
}

func (lst *loadSegmentTask) msgBase() *commonpb.MsgBase {
	return lst.Base
}

func (lst *loadSegmentTask) marshal() ([]byte, error) {
	return proto.Marshal(lst.LoadSegmentsRequest)
}

// isValid reports whether the destination node is still online.
func (lst *loadSegmentTask) isValid() bool {
	online, err := lst.cluster.isOnline(lst.DstNodeID)
	if err != nil {
		return false
	}

	return lst.ctx != nil && online
}

func (lst *loadSegmentTask) msgType() commonpb.MsgType {
	return lst.Base.MsgType
}

func (lst *loadSegmentTask) timestamp() Timestamp {
	return lst.Base.Timestamp
}

// updateTaskProcess delegates progress accounting to the parent trigger task.
func (lst *loadSegmentTask) updateTaskProcess() {
	parentTask := lst.getParentTask()
	if parentTask == nil {
		log.Warn("loadSegmentTask: parentTask should not be nil")
		return
	}
	parentTask.updateTaskProcess()
}

// preExecute resets the result status to success before the task runs.
func (lst *loadSegmentTask) preExecute(context.Context) error {
	segmentIDs := make([]UniqueID, 0)
	for _, info := range lst.Infos {
		segmentIDs = append(segmentIDs, info.SegmentID)
	}
	lst.setResultInfo(nil)
	log.Debug("start do loadSegmentTask",
		zap.Int64s("segmentIDs", segmentIDs),
		zap.Int64("loaded nodeID", lst.DstNodeID),
		zap.Int64("taskID", lst.getTaskID()))
	return nil
}

// execute sends the loadSegments request to the destination node.
// Consumes one retry on every invocation.
func (lst *loadSegmentTask) execute(ctx context.Context) error {
	defer func() {
		lst.retryCount--
	}()

	err := lst.cluster.loadSegments(ctx, lst.DstNodeID, lst.LoadSegmentsRequest)
	if err != nil {
		log.Warn("loadSegmentTask: loadSegment occur error", zap.Int64("taskID", lst.getTaskID()))
		lst.setResultInfo(err)
		return err
	}

	log.Debug("loadSegmentTask Execute done",
		zap.Int64("taskID", lst.getTaskID()))
	return nil
}

func (lst *loadSegmentTask) postExecute(context.Context) error {
	log.Debug("loadSegmentTask postExecute done",
		zap.Int64("taskID", lst.getTaskID()))
	return nil
}

// reschedule re-assigns this task's segments to other nodes, excluding the
// current destination node (and any previously excluded ones).
func (lst *loadSegmentTask) reschedule(ctx context.Context) ([]task, error) {
	loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0)
	collectionID := lst.CollectionID
	for _, info := range lst.Infos {
		msgBase := proto.Clone(lst.Base).(*commonpb.MsgBase)
		msgBase.MsgType = commonpb.MsgType_LoadSegments
		req := &querypb.LoadSegmentsRequest{
			Base:         msgBase,
			Infos:        []*querypb.SegmentLoadInfo{info},
			Schema:       lst.Schema,
			SourceNodeID: lst.SourceNodeID,
			CollectionID: lst.CollectionID,
		}
		loadSegmentReqs = append(loadSegmentReqs, req)
	}
	if lst.excludeNodeIDs == nil {
		lst.excludeNodeIDs = []int64{}
	}
	lst.excludeNodeIDs = append(lst.excludeNodeIDs, lst.DstNodeID)

	// Delta channel meta was persisted during the trigger task's execute, so
	// it is available here for the rebuilt watch request.
	deltaChannelInfos, err := lst.meta.getDeltaChannelsByCollectionID(collectionID)
	if err != nil {
		return nil, err
	}
	msgBase := proto.Clone(lst.Base).(*commonpb.MsgBase)
	msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
	watchDeltaRequest := &querypb.WatchDeltaChannelsRequest{
		Base:         msgBase,
		CollectionID: collectionID,
		Infos:        deltaChannelInfos,
	}
	log.Debug("assignInternalTask: add a watchDeltaChannelTask childTask", zap.Any("task", watchDeltaRequest))
	//TODO:: wait or not according msgType
	reScheduledTasks, err := assignInternalTask(ctx, collectionID, lst.getParentTask(), lst.meta, lst.cluster, loadSegmentReqs, nil, nil, false, lst.excludeNodeIDs, nil)
	if err != nil {
		log.Error("loadSegment reschedule failed", zap.Int64s("excludeNodes", lst.excludeNodeIDs), zap.Error(err))
		return nil, err
	}

	return reScheduledTasks, nil
}

// releaseSegmentTask is an internal task that releases specific segments on
// one node.
type releaseSegmentTask struct {
	*baseTask
	*querypb.ReleaseSegmentsRequest
	cluster Cluster
}

func (rst *releaseSegmentTask) msgBase() *commonpb.MsgBase {
	return rst.Base
}

func (rst *releaseSegmentTask) marshal() ([]byte, error) {
	return proto.Marshal(rst.ReleaseSegmentsRequest)
}

// isValid reports whether the target node is still online.
func (rst *releaseSegmentTask) isValid() bool {
	online, err := rst.cluster.isOnline(rst.NodeID)
	if err != nil {
		return false
	}
	return rst.ctx != nil && online
}

func (rst *releaseSegmentTask) msgType() commonpb.MsgType {
	return rst.Base.MsgType
}

func (rst *releaseSegmentTask) timestamp() Timestamp {
	return rst.Base.Timestamp
}

// preExecute resets the result status to success before the task runs.
func (rst *releaseSegmentTask) preExecute(context.Context) error {
	segmentIDs := rst.SegmentIDs
	rst.setResultInfo(nil)
	log.Debug("start do releaseSegmentTask",
		zap.Int64s("segmentIDs", segmentIDs),
		zap.Int64("loaded nodeID", rst.NodeID),
		zap.Int64("taskID", rst.getTaskID()))
	return nil
}

// execute sends the releaseSegments request to the target node.
// Consumes one retry on every invocation.
func (rst *releaseSegmentTask) execute(ctx context.Context) error {
	defer func() {
		rst.retryCount--
	}()

	err := rst.cluster.releaseSegments(rst.ctx, rst.NodeID, rst.ReleaseSegmentsRequest)
	if err != nil {
		log.Warn("releaseSegmentTask: releaseSegment occur error", zap.Int64("taskID", rst.getTaskID()))
		rst.setResultInfo(err)
		return err
	}

	log.Debug("releaseSegmentTask Execute done",
		zap.Int64s("segmentIDs", rst.SegmentIDs),
		zap.Int64("taskID", rst.getTaskID()))
	return nil
}

func (rst *releaseSegmentTask) postExecute(context.Context) error {
	segmentIDs := rst.SegmentIDs
	log.Debug("releaseSegmentTask postExecute done",
		zap.Int64s("segmentIDs", segmentIDs),
		zap.Int64("taskID", rst.getTaskID()))
	return nil
}

// watchDmChannelTask is an internal task that instructs one node to watch a
// set of dm channels; excludeNodeIDs accumulates nodes already tried.
type watchDmChannelTask struct {
	*baseTask
	*querypb.WatchDmChannelsRequest
	meta           Meta
	cluster        Cluster
	excludeNodeIDs []int64
}

func (wdt *watchDmChannelTask) msgBase() *commonpb.MsgBase {
	return wdt.Base
}

func (wdt *watchDmChannelTask) marshal() ([]byte, error) {
	return proto.Marshal(wdt.WatchDmChannelsRequest)
}

// isValid reports whether the target node is still online.
func (wdt *watchDmChannelTask) isValid() bool {
	online, err := wdt.cluster.isOnline(wdt.NodeID)
	if err != nil {
		return false
	}
	return wdt.ctx != nil && online
}

func (wdt *watchDmChannelTask) msgType() commonpb.MsgType {
	return wdt.Base.MsgType
}

func (wdt *watchDmChannelTask) timestamp() Timestamp {
	return wdt.Base.Timestamp
}

// updateTaskProcess delegates progress accounting to the parent trigger task.
func (wdt *watchDmChannelTask) updateTaskProcess() {
	parentTask := wdt.getParentTask()
	if parentTask == nil {
		log.Warn("watchDmChannelTask: parentTask should not be nil")
		return
	}
	parentTask.updateTaskProcess()
}

// preExecute resets the result status to success before the task runs.
func (wdt *watchDmChannelTask) preExecute(context.Context) error {
	channelInfos := wdt.Infos
	channels := make([]string, 0)
	for _, info := range channelInfos {
		channels = append(channels, info.ChannelName)
	}
	wdt.setResultInfo(nil)
	log.Debug("start do watchDmChannelTask",
		zap.Strings("dmChannels", channels),
		zap.Int64("loaded nodeID", wdt.NodeID),
		zap.Int64("taskID", wdt.getTaskID()))
	return nil
}

// execute sends the watchDmChannels request to the target node.
// Consumes one retry on every invocation.
func (wdt *watchDmChannelTask) execute(ctx context.Context) error {
	defer func() {
		wdt.retryCount--
	}()

	err := wdt.cluster.watchDmChannels(wdt.ctx, wdt.NodeID, wdt.WatchDmChannelsRequest)
	if err != nil {
		log.Warn("watchDmChannelTask: watchDmChannel occur error", zap.Int64("taskID", wdt.getTaskID()))
		wdt.setResultInfo(err)
		return err
	}

	log.Debug("watchDmChannelsTask Execute done",
		zap.Int64("taskID", wdt.getTaskID()))
	return nil
}

func (wdt *watchDmChannelTask) postExecute(context.Context) error {
	log.Debug("watchDmChannelTask postExecute done",
		zap.Int64("taskID", wdt.getTaskID()))
	return nil
}

// reschedule re-assigns this task's channel watches to other nodes, excluding
// the current node (and any previously excluded ones).
func (wdt *watchDmChannelTask) reschedule(ctx context.Context) ([]task, error) {
	collectionID := wdt.CollectionID
	watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0)
	for _, info := range wdt.Infos {
		msgBase := proto.Clone(wdt.Base).(*commonpb.MsgBase)
		msgBase.MsgType = commonpb.MsgType_WatchDmChannels
		req := &querypb.WatchDmChannelsRequest{
			Base:         msgBase,
			CollectionID: collectionID,
			PartitionIDs: wdt.PartitionIDs,
			Infos:        []*datapb.VchannelInfo{info},
			Schema:       wdt.Schema,
			ExcludeInfos: wdt.ExcludeInfos,
		}
		watchDmChannelReqs = append(watchDmChannelReqs, req)
	}

	if wdt.excludeNodeIDs == nil {
		wdt.excludeNodeIDs = []int64{}
	}
	wdt.excludeNodeIDs = append(wdt.excludeNodeIDs, wdt.NodeID)

	//TODO:: wait or not according msgType
	reScheduledTasks, err := assignInternalTask(ctx, collectionID, wdt.parentTask, wdt.meta, wdt.cluster, nil, watchDmChannelReqs, nil, false, wdt.excludeNodeIDs, nil)
	if err != nil {
		log.Error("watchDmChannel reschedule failed", zap.Int64s("excludeNodes", wdt.excludeNodeIDs), zap.Error(err))
		return nil, err
	}

	return reScheduledTasks, nil
}

// watchDeltaChannelTask is an internal task that instructs one node to watch a
// set of delta channels.
type watchDeltaChannelTask struct {
	*baseTask
	*querypb.WatchDeltaChannelsRequest
	meta           Meta
	cluster        Cluster
	excludeNodeIDs []int64
}

func (wdt *watchDeltaChannelTask) msgBase() *commonpb.MsgBase {
	return wdt.Base
}
func (wdt *watchDeltaChannelTask) marshal() ([]byte, error) { return proto.Marshal(wdt.WatchDeltaChannelsRequest) } func (wdt *watchDeltaChannelTask) msgType() commonpb.MsgType { return wdt.Base.MsgType } func (wdt *watchDeltaChannelTask) timestamp() Timestamp { return wdt.Base.Timestamp } func (wdt *watchDeltaChannelTask) updateTaskProcess() { parentTask := wdt.getParentTask() if parentTask == nil { log.Warn("watchDeltaChannel: parentTask should not be nil") return } parentTask.updateTaskProcess() } func (wdt *watchDeltaChannelTask) preExecute(context.Context) error { channelInfos := wdt.Infos channels := make([]string, 0) for _, info := range channelInfos { channels = append(channels, info.ChannelName) } wdt.setResultInfo(nil) log.Debug("start do watchDeltaChannelTask", zap.Strings("deltaChannels", channels), zap.Int64("loaded nodeID", wdt.NodeID), zap.Int64("taskID", wdt.getTaskID())) return nil } func (wdt *watchDeltaChannelTask) execute(ctx context.Context) error { defer func() { wdt.retryCount-- }() err := wdt.cluster.watchDeltaChannels(wdt.ctx, wdt.NodeID, wdt.WatchDeltaChannelsRequest) if err != nil { log.Warn("watchDeltaChannelTask: watchDeltaChannel occur error", zap.Int64("taskID", wdt.getTaskID()), zap.Error(err)) wdt.setResultInfo(err) return err } log.Debug("watchDeltaChannelsTask Execute done", zap.Int64("taskID", wdt.getTaskID())) return nil } func (wdt *watchDeltaChannelTask) postExecute(context.Context) error { log.Debug("watchDeltaChannelTask postExecute done", zap.Int64("taskID", wdt.getTaskID())) return nil } type watchQueryChannelTask struct { *baseTask *querypb.AddQueryChannelRequest cluster Cluster } func (wqt *watchQueryChannelTask) msgBase() *commonpb.MsgBase { return wqt.Base } func (wqt *watchQueryChannelTask) marshal() ([]byte, error) { return proto.Marshal(wqt.AddQueryChannelRequest) } func (wqt *watchQueryChannelTask) isValid() bool { online, err := wqt.cluster.isOnline(wqt.NodeID) if err != nil { return false } return wqt.ctx != nil 
&& online } func (wqt *watchQueryChannelTask) msgType() commonpb.MsgType { return wqt.Base.MsgType } func (wqt *watchQueryChannelTask) timestamp() Timestamp { return wqt.Base.Timestamp } func (wqt *watchQueryChannelTask) updateTaskProcess() { parentTask := wqt.getParentTask() if parentTask == nil { log.Warn("watchQueryChannelTask: parentTask should not be nil") return } parentTask.updateTaskProcess() } func (wqt *watchQueryChannelTask) preExecute(context.Context) error { wqt.setResultInfo(nil) log.Debug("start do watchQueryChannelTask", zap.Int64("collectionID", wqt.CollectionID), zap.String("queryChannel", wqt.QueryChannel), zap.String("queryResultChannel", wqt.QueryResultChannel), zap.Int64("loaded nodeID", wqt.NodeID), zap.Int64("taskID", wqt.getTaskID())) return nil } func (wqt *watchQueryChannelTask) execute(ctx context.Context) error { defer func() { wqt.retryCount-- }() err := wqt.cluster.addQueryChannel(wqt.ctx, wqt.NodeID, wqt.AddQueryChannelRequest) if err != nil { log.Warn("watchQueryChannelTask: watchQueryChannel occur error", zap.Int64("taskID", wqt.getTaskID()), zap.Error(err)) wqt.setResultInfo(err) return err } log.Debug("watchQueryChannelTask Execute done", zap.Int64("collectionID", wqt.CollectionID), zap.String("queryChannel", wqt.QueryChannel), zap.String("queryResultChannel", wqt.QueryResultChannel), zap.Int64("taskID", wqt.getTaskID())) return nil } func (wqt *watchQueryChannelTask) postExecute(context.Context) error { log.Debug("watchQueryChannelTask postExecute done", zap.Int64("collectionID", wqt.CollectionID), zap.String("queryChannel", wqt.QueryChannel), zap.String("queryResultChannel", wqt.QueryResultChannel), zap.Int64("taskID", wqt.getTaskID())) return nil } //****************************handoff task********************************// type handoffTask struct { *baseTask *querypb.HandoffSegmentsRequest dataCoord types.DataCoord cluster Cluster meta Meta } func (ht *handoffTask) msgBase() *commonpb.MsgBase { return ht.Base } func (ht 
*handoffTask) marshal() ([]byte, error) { return proto.Marshal(ht.HandoffSegmentsRequest) } func (ht *handoffTask) msgType() commonpb.MsgType { return ht.Base.MsgType } func (ht *handoffTask) timestamp() Timestamp { return ht.Base.Timestamp } func (ht *handoffTask) preExecute(context.Context) error { ht.setResultInfo(nil) segmentIDs := make([]UniqueID, 0) segmentInfos := ht.HandoffSegmentsRequest.SegmentInfos for _, info := range segmentInfos { segmentIDs = append(segmentIDs, info.SegmentID) } log.Debug("start do handoff segments task", zap.Int64s("segmentIDs", segmentIDs)) return nil } func (ht *handoffTask) execute(ctx context.Context) error { segmentInfos := ht.HandoffSegmentsRequest.SegmentInfos for _, segmentInfo := range segmentInfos { collectionID := segmentInfo.CollectionID partitionID := segmentInfo.PartitionID segmentID := segmentInfo.SegmentID collectionInfo, err := ht.meta.getCollectionInfoByID(collectionID) if err != nil { log.Debug("handoffTask: collection has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID)) continue } if collectionInfo.LoadType == querypb.LoadType_loadCollection && ht.meta.hasReleasePartition(collectionID, partitionID) { log.Debug("handoffTask: partition has not been released", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID)) continue } partitionLoaded := false for _, id := range collectionInfo.PartitionIDs { if id == partitionID { partitionLoaded = true } } if collectionInfo.LoadType != querypb.LoadType_loadCollection && !partitionLoaded { log.Debug("handoffTask: partition has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID)) continue } // segment which is compacted from should exist in query node for _, compactedSegID := range segmentInfo.CompactionFrom { _, err = ht.meta.getSegmentInfoByID(compactedSegID) if err != nil { log.Error("handoffTask: 
compacted segment has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID)) ht.setResultInfo(err) return err } } // segment which is compacted to should not exist in query node _, err = ht.meta.getSegmentInfoByID(segmentID) if err != nil { dmChannelInfos, binlogs, err := getRecoveryInfo(ht.ctx, ht.dataCoord, collectionID, partitionID) if err != nil { log.Error("handoffTask: getRecoveryInfo failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) ht.setResultInfo(err) return err } findBinlog := false var loadSegmentReq *querypb.LoadSegmentsRequest var watchDeltaChannels []*datapb.VchannelInfo for _, segmentBinlogs := range binlogs { if segmentBinlogs.SegmentID == segmentID { findBinlog = true segmentLoadInfo := &querypb.SegmentLoadInfo{ SegmentID: segmentID, PartitionID: partitionID, CollectionID: collectionID, BinlogPaths: segmentBinlogs.FieldBinlogs, NumOfRows: segmentBinlogs.NumOfRows, CompactionFrom: segmentInfo.CompactionFrom, EnableIndex: segmentInfo.EnableIndex, IndexPathInfos: segmentInfo.IndexPathInfos, } msgBase := proto.Clone(ht.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_LoadSegments loadSegmentReq = &querypb.LoadSegmentsRequest{ Base: msgBase, Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo}, Schema: collectionInfo.Schema, } } } for _, info := range dmChannelInfos { deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { return err } watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } if !findBinlog { err = fmt.Errorf("segmnet has not been flushed, segmentID is %d", segmentID) ht.setResultInfo(err) return err } mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(ht.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, 
CollectionID: collectionID, Infos: mergedDeltaChannels, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule err = ht.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) if err != nil { return err } internalTasks, err := assignInternalTask(ctx, collectionID, ht, ht.meta, ht.cluster, []*querypb.LoadSegmentsRequest{loadSegmentReq}, nil, watchDeltaChannelReq, true, nil, nil) if err != nil { log.Error("handoffTask: assign child task failed", zap.Any("segmentInfo", segmentInfo)) ht.setResultInfo(err) return err } for _, internalTask := range internalTasks { ht.addChildTask(internalTask) log.Debug("handoffTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("segmentID", segmentID), zap.Any("task", internalTask)) } } else { err = fmt.Errorf("sealed segment has been exist on query node, segmentID is %d", segmentID) log.Error("handoffTask: sealed segment has been exist on query node", zap.Int64("segmentID", segmentID)) ht.setResultInfo(err) return err } } log.Debug("handoffTask: assign child task done", zap.Any("segmentInfos", segmentInfos)) log.Debug("handoffTask Execute done", zap.Int64("taskID", ht.getTaskID())) return nil } func (ht *handoffTask) postExecute(context.Context) error { if ht.result.ErrorCode != commonpb.ErrorCode_Success { ht.childTasks = []task{} } log.Debug("handoffTask postExecute done", zap.Int64("taskID", ht.getTaskID())) return nil } func (ht *handoffTask) rollBack(ctx context.Context) []task { resultTasks := make([]task, 0) childTasks := ht.getChildTask() for _, childTask := range childTasks { if childTask.msgType() == commonpb.MsgType_LoadSegments { // TODO:: add release segment to rollBack, no release does not affect correctness of query } } return resultTasks } type loadBalanceTask struct { *baseTask *querypb.LoadBalanceRequest rootCoord types.RootCoord dataCoord types.DataCoord indexCoord types.IndexCoord cluster Cluster meta Meta 
} func (lbt *loadBalanceTask) msgBase() *commonpb.MsgBase { return lbt.Base } func (lbt *loadBalanceTask) marshal() ([]byte, error) { return proto.Marshal(lbt.LoadBalanceRequest) } func (lbt *loadBalanceTask) msgType() commonpb.MsgType { return lbt.Base.MsgType } func (lbt *loadBalanceTask) timestamp() Timestamp { return lbt.Base.Timestamp } func (lbt *loadBalanceTask) preExecute(context.Context) error { lbt.setResultInfo(nil) log.Debug("start do loadBalanceTask", zap.Int32("trigger type", int32(lbt.triggerCondition)), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), zap.Int64("taskID", lbt.getTaskID())) return nil } func (lbt *loadBalanceTask) execute(ctx context.Context) error { defer func() { lbt.retryCount-- }() if lbt.triggerCondition == querypb.TriggerCondition_NodeDown { for _, nodeID := range lbt.SourceNodeIDs { collectionInfos := lbt.cluster.getCollectionInfosByID(lbt.ctx, nodeID) for _, info := range collectionInfos { collectionID := info.CollectionID metaInfo, err := lbt.meta.getCollectionInfoByID(collectionID) if err != nil { log.Warn("loadBalanceTask: getCollectionInfoByID occur error", zap.String("error", err.Error())) lbt.setResultInfo(err) return err } loadType := metaInfo.LoadType schema := metaInfo.Schema partitionIDs := info.PartitionIDs segmentsToLoad := make([]UniqueID, 0) loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0) channelsToWatch := make([]string, 0) watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0) var watchDeltaChannels []*datapb.VchannelInfo dmChannels, err := lbt.meta.getDmChannelsByNodeID(collectionID, nodeID) if err != nil { lbt.setResultInfo(err) return err } log.Debug("loadBalanceTask: partitions to recover", zap.Int64s("partitionIDs", partitionIDs)) for _, partitionID := range partitionIDs { dmChannelInfos, binlogs, err := getRecoveryInfo(lbt.ctx, lbt.dataCoord, collectionID, partitionID) if err != nil { log.Error("loadBalanceTask: getRecoveryInfo failed", 
zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) lbt.setResultInfo(err) return err } for _, segmentBingLog := range binlogs { segmentID := segmentBingLog.SegmentID segmentLoadInfo := &querypb.SegmentLoadInfo{ SegmentID: segmentID, PartitionID: partitionID, CollectionID: collectionID, BinlogPaths: segmentBingLog.FieldBinlogs, NumOfRows: segmentBingLog.NumOfRows, Statslogs: segmentBingLog.Statslogs, Deltalogs: segmentBingLog.Deltalogs, } indexInfo, err := getIndexInfo(ctx, &querypb.SegmentInfo{ CollectionID: collectionID, SegmentID: segmentID, }, lbt.rootCoord, lbt.indexCoord) if err == nil && indexInfo.enableIndex { segmentLoadInfo.EnableIndex = true segmentLoadInfo.IndexPathInfos = indexInfo.infos } msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_LoadSegments loadSegmentReq := &querypb.LoadSegmentsRequest{ Base: msgBase, Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo}, Schema: schema, SourceNodeID: nodeID, } segmentsToLoad = append(segmentsToLoad, segmentID) loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } for _, info := range dmChannelInfos { deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { return err } watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } for _, channelInfo := range dmChannelInfos { for _, channel := range dmChannels { if channelInfo.ChannelName == channel { if loadType == querypb.LoadType_loadCollection { merged := false for index, channelName := range channelsToWatch { if channel == channelName { merged = true oldInfo := watchDmChannelReqs[index].Infos[0] newInfo := mergeVChannelInfo(oldInfo, channelInfo) watchDmChannelReqs[index].Infos = []*datapb.VchannelInfo{newInfo} break } } if !merged { msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDmChannels watchRequest := &querypb.WatchDmChannelsRequest{ Base: msgBase, CollectionID: collectionID, Infos: 
[]*datapb.VchannelInfo{channelInfo}, Schema: schema, } channelsToWatch = append(channelsToWatch, channel) watchDmChannelReqs = append(watchDmChannelReqs, watchRequest) } } else { msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDmChannels watchRequest := &querypb.WatchDmChannelsRequest{ Base: msgBase, CollectionID: collectionID, PartitionIDs: partitionIDs, Infos: []*datapb.VchannelInfo{channelInfo}, Schema: schema, } channelsToWatch = append(channelsToWatch, channel) watchDmChannelReqs = append(watchDmChannelReqs, watchRequest) } break } } } } mergedDeltaChannel := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, Infos: mergedDeltaChannel, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule err = lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) if err != nil { return err } internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReq, true, lbt.SourceNodeIDs, lbt.DstNodeIDs) if err != nil { log.Warn("loadBalanceTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) lbt.setResultInfo(err) return err } for _, internalTask := range internalTasks { lbt.addChildTask(internalTask) log.Debug("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("task", internalTask)) } log.Debug("loadBalanceTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) } } } //TODO:: use request.DstNodeIDs to balance if lbt.triggerCondition == querypb.TriggerCondition_LoadBalance { if len(lbt.SourceNodeIDs) == 0 { err 
:= errors.New("loadBalanceTask: empty source Node list to balance") log.Error(err.Error()) lbt.setResultInfo(err) return err } balancedSegmentInfos := make(map[UniqueID]*querypb.SegmentInfo) balancedSegmentIDs := make([]UniqueID, 0) for _, nodeID := range lbt.SourceNodeIDs { nodeExist := lbt.cluster.hasNode(nodeID) if !nodeExist { err := fmt.Errorf("loadBalanceTask: query node %d is not exist to balance", nodeID) log.Error(err.Error()) lbt.setResultInfo(err) return err } segmentInfos := lbt.meta.getSegmentInfosByNode(nodeID) for _, info := range segmentInfos { balancedSegmentInfos[info.SegmentID] = info balancedSegmentIDs = append(balancedSegmentIDs, info.SegmentID) } } // check balanced sealedSegmentIDs in request whether exist in query node for _, segmentID := range lbt.SealedSegmentIDs { if _, ok := balancedSegmentInfos[segmentID]; !ok { err := fmt.Errorf("loadBalanceTask: unloaded segment %d", segmentID) log.Warn(err.Error()) lbt.setResultInfo(err) return err } } if len(lbt.SealedSegmentIDs) != 0 { balancedSegmentIDs = lbt.SealedSegmentIDs } col2PartitionIDs := make(map[UniqueID][]UniqueID) par2Segments := make(map[UniqueID][]*querypb.SegmentInfo) for _, segmentID := range balancedSegmentIDs { info := balancedSegmentInfos[segmentID] collectionID := info.CollectionID partitionID := info.PartitionID if _, ok := col2PartitionIDs[collectionID]; !ok { col2PartitionIDs[collectionID] = make([]UniqueID, 0) } if _, ok := par2Segments[partitionID]; !ok { col2PartitionIDs[collectionID] = append(col2PartitionIDs[collectionID], partitionID) par2Segments[partitionID] = make([]*querypb.SegmentInfo, 0) } par2Segments[partitionID] = append(par2Segments[partitionID], info) } for collectionID, partitionIDs := range col2PartitionIDs { segmentsToLoad := make([]UniqueID, 0) loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0) var watchDeltaChannels []*datapb.VchannelInfo collectionInfo, err := lbt.meta.getCollectionInfoByID(collectionID) if err != nil { 
log.Error("loadBalanceTask: can't find collectionID in meta", zap.Int64("collectionID", collectionID), zap.Error(err)) lbt.setResultInfo(err) return err } for _, partitionID := range partitionIDs { dmChannelInfos, binlogs, err := getRecoveryInfo(lbt.ctx, lbt.dataCoord, collectionID, partitionID) if err != nil { log.Error("loadBalanceTask: getRecoveryInfo failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) lbt.setResultInfo(err) return err } segmentID2Binlog := make(map[UniqueID]*datapb.SegmentBinlogs) for _, binlog := range binlogs { segmentID2Binlog[binlog.SegmentID] = binlog } for _, segmentInfo := range par2Segments[partitionID] { segmentID := segmentInfo.SegmentID if _, ok := segmentID2Binlog[segmentID]; !ok { log.Warn("loadBalanceTask: can't find binlog of segment to balance, may be has been compacted", zap.Int64("segmentID", segmentID)) continue } segmentBingLog := segmentID2Binlog[segmentID] segmentLoadInfo := &querypb.SegmentLoadInfo{ SegmentID: segmentID, PartitionID: partitionID, CollectionID: collectionID, BinlogPaths: segmentBingLog.FieldBinlogs, NumOfRows: segmentBingLog.NumOfRows, Statslogs: segmentBingLog.Statslogs, Deltalogs: segmentBingLog.Deltalogs, } indexInfo, err := getIndexInfo(ctx, &querypb.SegmentInfo{ CollectionID: collectionID, SegmentID: segmentID, }, lbt.rootCoord, lbt.indexCoord) if err == nil && indexInfo.enableIndex { segmentLoadInfo.EnableIndex = true segmentLoadInfo.IndexPathInfos = indexInfo.infos } msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_LoadSegments loadSegmentReq := &querypb.LoadSegmentsRequest{ Base: msgBase, Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo}, Schema: collectionInfo.Schema, } segmentsToLoad = append(segmentsToLoad, segmentID) loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } for _, info := range dmChannelInfos { deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { return err } 
watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, Infos: mergedDeltaChannels, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule err = lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) if err != nil { return err } // TODO:: assignInternalTask with multi collection internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, nil, watchDeltaChannelReq, false, lbt.SourceNodeIDs, lbt.DstNodeIDs) if err != nil { log.Warn("loadBalanceTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) lbt.setResultInfo(err) return err } for _, internalTask := range internalTasks { lbt.addChildTask(internalTask) log.Debug("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("task", internalTask)) } } log.Debug("loadBalanceTask: assign child task done", zap.Any("balance request", lbt.LoadBalanceRequest)) } log.Debug("loadBalanceTask Execute done", zap.Int32("trigger type", int32(lbt.triggerCondition)), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), zap.Int64("taskID", lbt.getTaskID())) return nil } func (lbt *loadBalanceTask) postExecute(context.Context) error { if lbt.result.ErrorCode != commonpb.ErrorCode_Success { lbt.childTasks = []task{} } if lbt.triggerCondition == querypb.TriggerCondition_NodeDown { for _, id := range lbt.SourceNodeIDs { err := lbt.cluster.removeNodeInfo(id) if err != nil { //TODO:: clear node info after removeNodeInfo failed log.Error("loadBalanceTask: occur error when removing node info 
from cluster", zap.Int64("nodeID", id)) } } } log.Debug("loadBalanceTask postExecute done", zap.Int32("trigger type", int32(lbt.triggerCondition)), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), zap.Int64("taskID", lbt.getTaskID())) return nil } func mergeVChannelInfo(info1 *datapb.VchannelInfo, info2 *datapb.VchannelInfo) *datapb.VchannelInfo { collectionID := info1.CollectionID channelName := info1.ChannelName var seekPosition *internalpb.MsgPosition if info1.SeekPosition == nil || info2.SeekPosition == nil { seekPosition = &internalpb.MsgPosition{ ChannelName: channelName, } } else { seekPosition = info1.SeekPosition if info1.SeekPosition.Timestamp > info2.SeekPosition.Timestamp { seekPosition = info2.SeekPosition } } checkPoints := make([]*datapb.SegmentInfo, 0) checkPoints = append(checkPoints, info1.UnflushedSegments...) checkPoints = append(checkPoints, info2.UnflushedSegments...) flushedSegments := make([]*datapb.SegmentInfo, 0) flushedSegments = append(flushedSegments, info1.FlushedSegments...) flushedSegments = append(flushedSegments, info2.FlushedSegments...) 
return &datapb.VchannelInfo{ CollectionID: collectionID, ChannelName: channelName, SeekPosition: seekPosition, UnflushedSegments: checkPoints, FlushedSegments: flushedSegments, } } func assignInternalTask(ctx context.Context, collectionID UniqueID, parentTask task, meta Meta, cluster Cluster, loadSegmentRequests []*querypb.LoadSegmentsRequest, watchDmChannelRequests []*querypb.WatchDmChannelsRequest, watchDeltaChannelRequest *querypb.WatchDeltaChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64) ([]task, error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() log.Debug("assignInternalTask: start assign task to query node") internalTasks := make([]task, 0) err := cluster.allocateSegmentsToQueryNode(ctx, loadSegmentRequests, wait, excludeNodeIDs, includeNodeIDs) if err != nil { log.Error("assignInternalTask: assign segment to node failed", zap.Any("load segments requests", loadSegmentRequests)) return nil, err } log.Debug("assignInternalTask: assign segment to node success", zap.Any("load segments requests", loadSegmentRequests)) err = cluster.allocateChannelsToQueryNode(ctx, watchDmChannelRequests, wait, excludeNodeIDs) if err != nil { log.Error("assignInternalTask: assign dmChannel to node failed", zap.Any("watch dmChannel requests", watchDmChannelRequests)) return nil, err } log.Debug("assignInternalTask: assign dmChannel to node success", zap.Any("watch dmChannel requests", watchDmChannelRequests)) watchQueryChannelInfo := make(map[int64]bool) node2Segments := make(map[int64][]*querypb.LoadSegmentsRequest) sizeCounts := make(map[int64]int) for _, req := range loadSegmentRequests { nodeID := req.DstNodeID sizeOfReq := getSizeOfLoadSegmentReq(req) if _, ok := node2Segments[nodeID]; !ok { node2Segments[nodeID] = make([]*querypb.LoadSegmentsRequest, 0) node2Segments[nodeID] = append(node2Segments[nodeID], req) sizeCounts[nodeID] = sizeOfReq } else { if sizeCounts[nodeID]+sizeOfReq > MaxSendSizeToEtcd { node2Segments[nodeID] = 
append(node2Segments[nodeID], req) sizeCounts[nodeID] = sizeOfReq } else { lastReq := node2Segments[nodeID][len(node2Segments[nodeID])-1] lastReq.Infos = append(lastReq.Infos, req.Infos...) sizeCounts[nodeID] += sizeOfReq } } if cluster.hasWatchedQueryChannel(parentTask.traceCtx(), nodeID, collectionID) { watchQueryChannelInfo[nodeID] = true continue } watchQueryChannelInfo[nodeID] = false } for nodeID, loadSegmentsReqs := range node2Segments { for _, req := range loadSegmentsReqs { ctx = opentracing.ContextWithSpan(context.Background(), sp) baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) loadSegmentTask := &loadSegmentTask{ baseTask: baseTask, LoadSegmentsRequest: req, meta: meta, cluster: cluster, excludeNodeIDs: excludeNodeIDs, } internalTasks = append(internalTasks, loadSegmentTask) } if watchDeltaChannelRequest != nil && len(loadSegmentsReqs) != 0 { ctx = opentracing.ContextWithSpan(context.Background(), sp) watchDeltaRequest := proto.Clone(watchDeltaChannelRequest).(*querypb.WatchDeltaChannelsRequest) watchDeltaRequest.NodeID = nodeID baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) watchDeltaTask := &watchDeltaChannelTask{ baseTask: baseTask, WatchDeltaChannelsRequest: watchDeltaRequest, meta: meta, cluster: cluster, excludeNodeIDs: []int64{}, } internalTasks = append(internalTasks, watchDeltaTask) } } for _, req := range watchDmChannelRequests { nodeID := req.NodeID ctx = opentracing.ContextWithSpan(context.Background(), sp) baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) watchDmChannelTask := &watchDmChannelTask{ baseTask: baseTask, WatchDmChannelsRequest: req, meta: meta, cluster: cluster, excludeNodeIDs: excludeNodeIDs, } internalTasks = append(internalTasks, watchDmChannelTask) if cluster.hasWatchedQueryChannel(parentTask.traceCtx(), nodeID, collectionID) { watchQueryChannelInfo[nodeID] = true continue } 
watchQueryChannelInfo[nodeID] = false } for nodeID, watched := range watchQueryChannelInfo { if !watched { ctx = opentracing.ContextWithSpan(context.Background(), sp) queryChannelInfo, err := meta.getQueryChannelInfoByID(collectionID) if err != nil { return nil, err } msgBase := proto.Clone(parentTask.msgBase()).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchQueryChannels addQueryChannelRequest := &querypb.AddQueryChannelRequest{ Base: msgBase, NodeID: nodeID, CollectionID: collectionID, QueryChannel: queryChannelInfo.QueryChannel, QueryResultChannel: queryChannelInfo.QueryResultChannel, GlobalSealedSegments: queryChannelInfo.GlobalSealedSegments, SeekPosition: queryChannelInfo.SeekPosition, } baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) watchQueryChannelTask := &watchQueryChannelTask{ baseTask: baseTask, AddQueryChannelRequest: addQueryChannelRequest, cluster: cluster, } internalTasks = append(internalTasks, watchQueryChannelTask) } } return internalTasks, nil } func getSizeOfLoadSegmentReq(req *querypb.LoadSegmentsRequest) int { return proto.Size(req) } func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) { deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) if err != nil { return nil, err } deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) deltaChannel.ChannelName = deltaChannelName deltaChannel.UnflushedSegments = nil deltaChannel.FlushedSegments = nil deltaChannel.DroppedSegments = nil return deltaChannel, nil } func mergeWatchDeltaChannelInfo(infos []*datapb.VchannelInfo) []*datapb.VchannelInfo { minPositions := make(map[string]int) for index, info := range infos { _, ok := minPositions[info.ChannelName] if !ok { minPositions[info.ChannelName] = index } minTimeStampIndex := minPositions[info.ChannelName] if info.SeekPosition.GetTimestamp() < 
infos[minTimeStampIndex].SeekPosition.GetTimestamp() { minPositions[info.ChannelName] = index } } var result []*datapb.VchannelInfo for _, index := range minPositions { result = append(result, infos[index]) } log.Debug("merge delta channels finished", zap.Any("origin info length", len(infos)), zap.Any("merged info length", len(result)), ) return result } func getRecoveryInfo(ctx context.Context, dataCoord types.DataCoord, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error) { ctx2, cancel2 := context.WithTimeout(ctx, timeoutForRPC) defer cancel2() getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_GetRecoveryInfo, }, CollectionID: collectionID, PartitionID: partitionID, } recoveryInfo, err := dataCoord.GetRecoveryInfo(ctx2, getRecoveryInfoRequest) if err != nil { return nil, nil, err } if recoveryInfo.Status.ErrorCode != commonpb.ErrorCode_Success { err = errors.New(recoveryInfo.Status.Reason) return nil, nil, err } return recoveryInfo.Channels, recoveryInfo.Binlogs, nil }