mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
enhance: Use newer checkpoint when packing LoadSegmentRequest (#29922)
See also: #29650 Either segment dml position & channel checkpoint could be newer in some cases. This PR make PackLoadSegments use the newer one improving load performance during cases where there are lots of upsert. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
ed89c6a2ee
commit
082ee1a709
@ -60,14 +60,19 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met
|
||||
// packSegmentLoadInfo packs SegmentLoadInfo for given segment,
|
||||
// packs with index if withIndex is true, this fetch indexes from IndexCoord
|
||||
func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.MsgPosition, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo {
|
||||
posTime := tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())
|
||||
checkpoint := segment.GetDmlPosition()
|
||||
if channelCheckpoint.GetTimestamp() > checkpoint.GetTimestamp() {
|
||||
checkpoint = channelCheckpoint
|
||||
}
|
||||
|
||||
posTime := tsoutil.PhysicalTime(checkpoint.GetTimestamp())
|
||||
tsLag := time.Since(posTime)
|
||||
if tsLag >= 10*time.Minute {
|
||||
log.Warn("delta position is quite stale",
|
||||
zap.Int64("collectionID", segment.GetCollectionID()),
|
||||
zap.Int64("segmentID", segment.GetID()),
|
||||
zap.String("channel", segment.InsertChannel),
|
||||
zap.Uint64("posTs", channelCheckpoint.GetTimestamp()),
|
||||
zap.Uint64("posTs", checkpoint.GetTimestamp()),
|
||||
zap.Time("posTime", posTime),
|
||||
zap.Duration("tsLag", tsLag))
|
||||
}
|
||||
@ -82,7 +87,7 @@ func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.M
|
||||
InsertChannel: segment.InsertChannel,
|
||||
IndexInfos: indexes,
|
||||
StartPosition: segment.GetStartPosition(),
|
||||
DeltaPosition: channelCheckpoint,
|
||||
DeltaPosition: checkpoint,
|
||||
Level: segment.GetLevel(),
|
||||
StorageVersion: segment.GetStorageVersion(),
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ func Test_packLoadSegmentRequest(t *testing.T) {
|
||||
t0 := tsoutil.ComposeTSByTime(time.Now().Add(-20*time.Minute), 0)
|
||||
t1 := tsoutil.ComposeTSByTime(time.Now().Add(-8*time.Minute), 0)
|
||||
t2 := tsoutil.ComposeTSByTime(time.Now().Add(-5*time.Minute), 0)
|
||||
t3 := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0)
|
||||
|
||||
segmentInfo := &datapb.SegmentInfo{
|
||||
ID: 0,
|
||||
@ -64,12 +65,21 @@ func Test_packLoadSegmentRequest(t *testing.T) {
|
||||
assert.Equal(t, t2, req.GetDeltaPosition().Timestamp)
|
||||
})
|
||||
|
||||
t.Run("test channel cp after segment dml position", func(t *testing.T) {
|
||||
channel := proto.Clone(channel).(*datapb.VchannelInfo)
|
||||
channel.SeekPosition.Timestamp = t3
|
||||
req := PackSegmentLoadInfo(segmentInfo, channel.GetSeekPosition(), nil)
|
||||
assert.NotNil(t, req.GetDeltaPosition())
|
||||
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
|
||||
assert.Equal(t, t3, req.GetDeltaPosition().Timestamp)
|
||||
})
|
||||
|
||||
t.Run("test tsLag > 10minutes", func(t *testing.T) {
|
||||
channel := proto.Clone(channel).(*datapb.VchannelInfo)
|
||||
channel.SeekPosition.Timestamp = t0
|
||||
req := PackSegmentLoadInfo(segmentInfo, channel.GetSeekPosition(), nil)
|
||||
assert.NotNil(t, req.GetDeltaPosition())
|
||||
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
|
||||
assert.Equal(t, t0, req.GetDeltaPosition().Timestamp)
|
||||
assert.Equal(t, segmentInfo.GetDmlPosition().GetTimestamp(), req.GetDeltaPosition().GetTimestamp())
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user