diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index baf83ffb83..c5c7d77ef4 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -254,13 +254,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) { } log = log.With(zap.Int64("shardLeader", leader)) - deltaPositions, err := getSegmentDeltaPositions(ctx, ex.targetMgr, ex.broker, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetInsertChannel()) - if err != nil { - log.Warn("failed to get delta positions of segment", zap.Error(err)) - return - } - - req := packLoadSegmentRequest(task, action, schema, loadMeta, loadInfo, deltaPositions) + req := packLoadSegmentRequest(task, action, schema, loadMeta, loadInfo, segment) loadTask := NewLoadSegmentsTask(task, step, req) ex.merger.Add(loadTask) log.Info("load segment task committed") diff --git a/internal/querycoordv2/task/merge_task.go b/internal/querycoordv2/task/merge_task.go index 4bf35e6f1b..fd19db25c4 100644 --- a/internal/querycoordv2/task/merge_task.go +++ b/internal/querycoordv2/task/merge_task.go @@ -45,19 +45,19 @@ func (task *LoadSegmentsTask) Merge(other MergeableTask[segmentIndex, *querypb.L task.tasks = append(task.tasks, otherTask.tasks...) task.steps = append(task.steps, otherTask.steps...) task.req.Infos = append(task.req.Infos, otherTask.req.GetInfos()...) - deltaPositions := make(map[string]*internalpb.MsgPosition) + positions := make(map[string]*internalpb.MsgPosition) for _, position := range task.req.DeltaPositions { - deltaPositions[position.GetChannelName()] = position + positions[position.GetChannelName()] = position } for _, position := range otherTask.req.GetDeltaPositions() { - merged, ok := deltaPositions[position.GetChannelName()] + merged, ok := positions[position.GetChannelName()] if !ok || merged.GetTimestamp() > position.GetTimestamp() { merged = position } - deltaPositions[position.GetChannelName()] = merged + positions[position.GetChannelName()] = merged } - task.req.DeltaPositions = make([]*internalpb.MsgPosition, 0, len(deltaPositions)) - for _, position := range deltaPositions { + task.req.DeltaPositions = make([]*internalpb.MsgPosition, 0, len(positions)) + for _, position := range positions { task.req.DeltaPositions = append(task.req.DeltaPositions, position) } } diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index 53f6765404..fcc89e2616 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -58,8 +58,13 @@ func packLoadSegmentRequest( schema *schemapb.CollectionSchema, loadMeta *querypb.LoadMetaInfo, loadInfo *querypb.SegmentLoadInfo, - deltaPositions []*internalpb.MsgPosition, + segment *datapb.SegmentInfo, ) *querypb.LoadSegmentsRequest { + deltaPosition := segment.GetDmlPosition() + if deltaPosition == nil { + deltaPosition = segment.GetStartPosition() + } + return &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadSegments, @@ -70,7 +75,7 @@ func packLoadSegmentRequest( LoadMeta: loadMeta, CollectionID: task.CollectionID(), ReplicaID: task.ReplicaID(), - DeltaPositions: deltaPositions, + DeltaPositions: []*internalpb.MsgPosition{deltaPosition}, DstNodeID: action.Node(), Version: time.Now().UnixNano(), NeedTransfer: true,