From d7cd1f2a6dcae231c27085b9638a8daa43fa338b Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 4 Sep 2023 17:29:48 +0800 Subject: [PATCH] Fix bug for get index state with compaction segment (#26822) Signed-off-by: cai.zhang --- internal/datacoord/compaction.go | 2 +- internal/datacoord/index_service.go | 4 ++-- internal/datacoord/meta.go | 4 +++- internal/datacoord/meta_test.go | 30 ++++++++++++++++------------- 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 6a6472f029..221fa9bfe4 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -252,7 +252,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error { // Also prepare metric updates. - _, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result) + _, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan, result) if err != nil { return err } diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 2b6701f4d8..f6d85ca764 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -393,7 +393,7 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In segIdx, ok := seg.segmentIndexes[index.IndexID] if !ok { - if seg.GetStartPosition().GetTimestamp() <= ts && !seg.GetCreatedByCompaction() { + if seg.GetLastExpireTime() <= ts { cntUnissued++ } pendingIndexRows += seg.GetNumOfRows() @@ -405,7 +405,7 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In // if realTime, calculate current statistics // if not realTime, skip segments created after index create - if !realTime && (seg.GetCreatedByCompaction() || seg.GetStartPosition().GetTimestamp() > ts) { + if !realTime && seg.GetLastExpireTime() > ts { continue } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index a0514a9b3c..a75f5f78cf 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -980,9 +980,10 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { // - the segment info of compactedFrom segments after compaction to alter // - the segment info of compactedTo segment after compaction to add // The compactedTo segment could contain 0 numRows -func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs, +func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionResult) ([]*SegmentInfo, []*SegmentInfo, *SegmentInfo, *segMetricMutation, error) { log.Info("meta update: prepare for complete compaction mutation") + compactionLogs := plan.GetSegmentBinlogs() m.Lock() defer m.Unlock() @@ -1057,6 +1058,7 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac DmlPosition: dmlPosition, CreatedByCompaction: true, CompactionFrom: compactionFrom, + LastExpireTime: plan.GetStartTime(), } segment := NewSegmentInfo(segmentInfo) metricMutation.addNewSeg(segment.GetState(), segment.GetNumOfRows()) diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index d9aa8ccd79..3db3490764 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -651,19 +651,22 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) { segments: prepareSegments, } - inCompactionLogs := []*datapb.CompactionSegmentBinlogs{ - { - SegmentID: 1, - FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")}, - }, - { - SegmentID: 2, - FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")}, + plan := &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 1, + FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")}, + }, + { + SegmentID: 2, + FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")}, + }, }, + StartTime: 15, } inCompactionResult := &datapb.CompactionResult{ @@ -673,7 +676,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) { Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")}, NumOfRows: 2, } - beforeCompact, afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(inCompactionLogs, inCompactionResult) + beforeCompact, afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(plan, inCompactionResult) assert.NoError(t, err) assert.NotNil(t, beforeCompact) assert.NotNil(t, afterCompact) @@ -704,6 +707,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) { assert.EqualValues(t, inCompactionResult.GetField2StatslogPaths(), newSegment.GetStatslogs()) assert.EqualValues(t, inCompactionResult.GetDeltalogs(), newSegment.GetDeltalogs()) assert.NotZero(t, newSegment.lastFlushTime) + assert.Equal(t, uint64(15), newSegment.GetLastExpireTime()) } func Test_meta_SetSegmentCompacting(t *testing.T) {