mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 11:29:48 +08:00
Use segment's position as the delta position (#19577)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
d827391af1
commit
2e9a08aecf
@ -254,13 +254,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) {
|
||||
}
|
||||
log = log.With(zap.Int64("shardLeader", leader))
|
||||
|
||||
deltaPositions, err := getSegmentDeltaPositions(ctx, ex.targetMgr, ex.broker, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetInsertChannel())
|
||||
if err != nil {
|
||||
log.Warn("failed to get delta positions of segment", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
req := packLoadSegmentRequest(task, action, schema, loadMeta, loadInfo, deltaPositions)
|
||||
req := packLoadSegmentRequest(task, action, schema, loadMeta, loadInfo, segment)
|
||||
loadTask := NewLoadSegmentsTask(task, step, req)
|
||||
ex.merger.Add(loadTask)
|
||||
log.Info("load segment task committed")
|
||||
|
@ -45,19 +45,19 @@ func (task *LoadSegmentsTask) Merge(other MergeableTask[segmentIndex, *querypb.L
|
||||
task.tasks = append(task.tasks, otherTask.tasks...)
|
||||
task.steps = append(task.steps, otherTask.steps...)
|
||||
task.req.Infos = append(task.req.Infos, otherTask.req.GetInfos()...)
|
||||
deltaPositions := make(map[string]*internalpb.MsgPosition)
|
||||
positions := make(map[string]*internalpb.MsgPosition)
|
||||
for _, position := range task.req.DeltaPositions {
|
||||
deltaPositions[position.GetChannelName()] = position
|
||||
positions[position.GetChannelName()] = position
|
||||
}
|
||||
for _, position := range otherTask.req.GetDeltaPositions() {
|
||||
merged, ok := deltaPositions[position.GetChannelName()]
|
||||
merged, ok := positions[position.GetChannelName()]
|
||||
if !ok || merged.GetTimestamp() > position.GetTimestamp() {
|
||||
merged = position
|
||||
}
|
||||
deltaPositions[position.GetChannelName()] = merged
|
||||
positions[position.GetChannelName()] = merged
|
||||
}
|
||||
task.req.DeltaPositions = make([]*internalpb.MsgPosition, 0, len(deltaPositions))
|
||||
for _, position := range deltaPositions {
|
||||
task.req.DeltaPositions = make([]*internalpb.MsgPosition, 0, len(positions))
|
||||
for _, position := range positions {
|
||||
task.req.DeltaPositions = append(task.req.DeltaPositions, position)
|
||||
}
|
||||
}
|
||||
|
@ -58,8 +58,13 @@ func packLoadSegmentRequest(
|
||||
schema *schemapb.CollectionSchema,
|
||||
loadMeta *querypb.LoadMetaInfo,
|
||||
loadInfo *querypb.SegmentLoadInfo,
|
||||
deltaPositions []*internalpb.MsgPosition,
|
||||
segment *datapb.SegmentInfo,
|
||||
) *querypb.LoadSegmentsRequest {
|
||||
deltaPosition := segment.GetDmlPosition()
|
||||
if deltaPosition == nil {
|
||||
deltaPosition = segment.GetStartPosition()
|
||||
}
|
||||
|
||||
return &querypb.LoadSegmentsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_LoadSegments,
|
||||
@ -70,7 +75,7 @@ func packLoadSegmentRequest(
|
||||
LoadMeta: loadMeta,
|
||||
CollectionID: task.CollectionID(),
|
||||
ReplicaID: task.ReplicaID(),
|
||||
DeltaPositions: deltaPositions,
|
||||
DeltaPositions: []*internalpb.MsgPosition{deltaPosition},
|
||||
DstNodeID: action.Node(),
|
||||
Version: time.Now().UnixNano(),
|
||||
NeedTransfer: true,
|
||||
|
Loading…
Reference in New Issue
Block a user