From e38c6f6c44b991fb68402094b2158f8c358ebff7 Mon Sep 17 00:00:00 2001 From: yah01 Date: Mon, 16 May 2022 15:41:56 +0800 Subject: [PATCH] Fix load the same segments multiple times for manual LoadBalance (#16921) Signed-off-by: yah01 --- internal/querycoord/task.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index dadd01c9c3..e5ebb37e92 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -2068,9 +2068,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { log.Info("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("task", internalTask)) } log.Info("loadBalanceTask: assign child task done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs)) - } - - if lbt.triggerCondition == querypb.TriggerCondition_LoadBalance { + } else if lbt.triggerCondition == querypb.TriggerCondition_LoadBalance { if err := lbt.checkForManualLoadBalance(); err != nil { lbt.setResultInfo(err) return err @@ -2084,7 +2082,6 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { balancedSegmentInfos := make(map[UniqueID]*querypb.SegmentInfo) balancedSegmentIDs := make([]UniqueID, 0) - for _, nodeID := range lbt.SourceNodeIDs { nodeExist := lbt.cluster.hasNode(nodeID) if !nodeExist { @@ -2159,20 +2156,18 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { continue } - for _, replica := range segmentInfo.ReplicaIds { - segmentBingLog := segmentID2Binlog[segmentID] - segmentLoadInfo := lbt.broker.generateSegmentLoadInfo(ctx, collectionID, partitionID, segmentBingLog, true, collectionInfo.Schema) - msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) - msgBase.MsgType = commonpb.MsgType_LoadSegments - loadSegmentReq := &querypb.LoadSegmentsRequest{ - Base: msgBase, - Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo}, - Schema: collectionInfo.Schema, - CollectionID: collectionID, - ReplicaID: replica, - } - loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) + segmentBingLog := segmentID2Binlog[segmentID] + segmentLoadInfo := lbt.broker.generateSegmentLoadInfo(ctx, collectionID, partitionID, segmentBingLog, true, collectionInfo.Schema) + msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) + msgBase.MsgType = commonpb.MsgType_LoadSegments + loadSegmentReq := &querypb.LoadSegmentsRequest{ + Base: msgBase, + Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo}, + Schema: collectionInfo.Schema, + CollectionID: collectionID, + ReplicaID: lbt.replicaID, } + loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } for _, info := range dmChannelInfos {