From 082ee1a70928b8cbd3699387621e092895d77f17 Mon Sep 17 00:00:00 2001 From: congqixia Date: Sat, 13 Jan 2024 10:46:53 +0800 Subject: [PATCH] enhance: Use newer checkpoint when packing LoadSegmentRequest (#29922) See also: #29650 Either segment dml position & channel checkpoint could be newer in some cases. This PR make PackLoadSegments use the newer one improving load performance during cases where there are lots of upsert. --------- Signed-off-by: Congqi Xia --- internal/querycoordv2/utils/types.go | 11 ++++++++--- internal/querycoordv2/utils/types_test.go | 12 +++++++++++- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/internal/querycoordv2/utils/types.go b/internal/querycoordv2/utils/types.go index aa4481cd34..095fc65f2b 100644 --- a/internal/querycoordv2/utils/types.go +++ b/internal/querycoordv2/utils/types.go @@ -60,14 +60,19 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met // packSegmentLoadInfo packs SegmentLoadInfo for given segment, // packs with index if withIndex is true, this fetch indexes from IndexCoord func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.MsgPosition, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo { - posTime := tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp()) + checkpoint := segment.GetDmlPosition() + if channelCheckpoint.GetTimestamp() > checkpoint.GetTimestamp() { + checkpoint = channelCheckpoint + } + + posTime := tsoutil.PhysicalTime(checkpoint.GetTimestamp()) tsLag := time.Since(posTime) if tsLag >= 10*time.Minute { log.Warn("delta position is quite stale", zap.Int64("collectionID", segment.GetCollectionID()), zap.Int64("segmentID", segment.GetID()), zap.String("channel", segment.InsertChannel), - zap.Uint64("posTs", channelCheckpoint.GetTimestamp()), + zap.Uint64("posTs", checkpoint.GetTimestamp()), zap.Time("posTime", posTime), zap.Duration("tsLag", tsLag)) } @@ -82,7 +87,7 @@ func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.M InsertChannel: segment.InsertChannel, IndexInfos: indexes, StartPosition: segment.GetStartPosition(), - DeltaPosition: channelCheckpoint, + DeltaPosition: checkpoint, Level: segment.GetLevel(), StorageVersion: segment.GetStorageVersion(), } diff --git a/internal/querycoordv2/utils/types_test.go b/internal/querycoordv2/utils/types_test.go index dc6a899531..e4f0d1cb11 100644 --- a/internal/querycoordv2/utils/types_test.go +++ b/internal/querycoordv2/utils/types_test.go @@ -35,6 +35,7 @@ func Test_packLoadSegmentRequest(t *testing.T) { t0 := tsoutil.ComposeTSByTime(time.Now().Add(-20*time.Minute), 0) t1 := tsoutil.ComposeTSByTime(time.Now().Add(-8*time.Minute), 0) t2 := tsoutil.ComposeTSByTime(time.Now().Add(-5*time.Minute), 0) + t3 := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0) segmentInfo := &datapb.SegmentInfo{ ID: 0, @@ -64,12 +65,21 @@ func Test_packLoadSegmentRequest(t *testing.T) { assert.Equal(t, t2, req.GetDeltaPosition().Timestamp) }) + t.Run("test channel cp after segment dml position", func(t *testing.T) { + channel := proto.Clone(channel).(*datapb.VchannelInfo) + channel.SeekPosition.Timestamp = t3 + req := PackSegmentLoadInfo(segmentInfo, channel.GetSeekPosition(), nil) + assert.NotNil(t, req.GetDeltaPosition()) + assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName) + assert.Equal(t, t3, req.GetDeltaPosition().Timestamp) + }) + t.Run("test tsLag > 10minutes", func(t *testing.T) { channel := proto.Clone(channel).(*datapb.VchannelInfo) channel.SeekPosition.Timestamp = t0 req := PackSegmentLoadInfo(segmentInfo, channel.GetSeekPosition(), nil) assert.NotNil(t, req.GetDeltaPosition()) assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName) - assert.Equal(t, t0, req.GetDeltaPosition().Timestamp) + assert.Equal(t, segmentInfo.GetDmlPosition().GetTimestamp(), req.GetDeltaPosition().GetTimestamp()) }) }