diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 0efdbf5fec..a29f1a9d0c 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -431,7 +431,7 @@ func (t *clusteringCompactionTask) processIndexing() error { func (t *clusteringCompactionTask) markResultSegmentsVisible() error { var operators []UpdateOperator for _, segID := range t.GetTaskProto().GetResultSegments() { - operators = append(operators, UpdateSegmentVisible(segID)) + operators = append(operators, SetSegmentIsInvisible(segID, false)) operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID())) } diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 2c78982996..86693188ce 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -145,8 +145,8 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . // Skip bulk insert segments. continue } - if s.GetIsInvisible() { - // skip invisible segments + if s.GetIsInvisible() && s.GetCreatedByCompaction() { + // skip invisible compaction segments continue } @@ -154,11 +154,10 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . switch { case s.GetState() == commonpb.SegmentState_Dropped: droppedIDs.Insert(s.GetID()) - case !isFlushState(s.GetState()): + case !isFlushState(s.GetState()) || s.GetIsInvisible(): growingIDs.Insert(s.GetID()) case s.GetLevel() == datapb.SegmentLevel_L0: levelZeroIDs.Insert(s.GetID()) - default: flushedIDs.Insert(s.GetID()) } @@ -185,7 +184,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . // ================================================ isValid := func(ids ...UniqueID) bool { for _, id := range ids { - if seg, ok := validSegmentInfos[id]; !ok || seg == nil { + if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() { return false } } diff --git a/internal/datacoord/handler_test.go b/internal/datacoord/handler_test.go index 788c76a773..88f17c2987 100644 --- a/internal/datacoord/handler_test.go +++ b/internal/datacoord/handler_test.go @@ -572,13 +572,13 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { // | | | | | | // \ | / \ / | // \ | / \ / | - // [13u] [14i, 15u] 12i - // | | | | - // \ / \ / - // \ / \ / - // [16u] [17u] + // [13u] [14i, 15u] 12i [19u](unsorted) + // | | | | | + // \ / \ / | + // \ / \ / | + // [16u] [17u] [18u](unsorted) [20u](sorted) [21i](unsorted) // all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6] - // should be returned: flushed: [7, 8, 9, 10, 4, 5, 6] + // should be returned: flushed: [7, 8, 9, 10, 4, 5, 6, 20, 21], growing: [18] svr := newTestServer(t) defer closeTestServer(t, svr) schema := newTestSchema() @@ -702,8 +702,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 100, - CompactionFrom: []int64{1, 2, 3}, + NumOfRows: 100, + CompactionFrom: []int64{1, 2, 3}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8)) assert.NoError(t, err) @@ -719,8 +720,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 100, - CompactionFrom: []int64{1, 2, 3}, + NumOfRows: 100, + CompactionFrom: []int64{1, 2, 3}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9)) assert.NoError(t, err) @@ -736,8 +738,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 100, - CompactionFrom: []int64{1, 2, 3}, + NumOfRows: 100, + CompactionFrom: []int64{1, 2, 3}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10)) assert.NoError(t, err) @@ -753,8 +756,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 2048, - CompactionFrom: []int64{4, 5, 6}, + NumOfRows: 2048, + CompactionFrom: []int64{4, 5, 6}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg11)) assert.NoError(t, err) @@ -770,8 +774,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 100, - CompactionFrom: []int64{4, 5, 6}, + NumOfRows: 100, + CompactionFrom: []int64{4, 5, 6}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg12)) assert.NoError(t, err) @@ -787,8 +792,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 2047, - CompactionFrom: []int64{7, 8, 9}, + NumOfRows: 2047, + CompactionFrom: []int64{7, 8, 9}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg13)) assert.NoError(t, err) @@ -804,8 +810,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 100, - CompactionFrom: []int64{10, 11}, + NumOfRows: 100, + CompactionFrom: []int64{10, 11}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg14)) assert.NoError(t, err) @@ -821,8 +828,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 2048, - CompactionFrom: []int64{10, 11}, + NumOfRows: 2048, + CompactionFrom: []int64{10, 11}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg15)) assert.NoError(t, err) @@ -838,8 +846,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 2048, - CompactionFrom: []int64{13, 14}, + NumOfRows: 2048, + CompactionFrom: []int64{13, 14}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg16)) assert.NoError(t, err) @@ -855,15 +864,98 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { MsgGroup: "", Timestamp: 1, }, - NumOfRows: 2048, - CompactionFrom: []int64{12, 15}, + NumOfRows: 2048, + CompactionFrom: []int64{12, 15}, + CreatedByCompaction: true, } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17)) assert.NoError(t, err) + seg18 := &datapb.SegmentInfo{ + ID: 18, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + CompactionFrom: []int64{}, + IsInvisible: true, + IsSorted: false, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg18)) + assert.NoError(t, err) + seg19 := &datapb.SegmentInfo{ + ID: 19, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + Level: datapb.SegmentLevel_L1, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + CompactionFrom: []int64{}, + IsInvisible: true, + IsSorted: false, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg19)) + assert.NoError(t, err) + seg20 := &datapb.SegmentInfo{ + ID: 20, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + CompactionFrom: []int64{19}, + CreatedByCompaction: true, + IsInvisible: false, + IsSorted: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg20)) + assert.NoError(t, err) + seg21 := &datapb.SegmentInfo{ + ID: 21, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{}, + IsInvisible: false, + IsSorted: false, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg21)) + assert.NoError(t, err) vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) - assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds) - assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds) + assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6, 20, 21}, vchan.FlushedSegmentIds) + assert.ElementsMatch(t, []int64{18}, vchan.UnflushedSegmentIds) + assert.ElementsMatch(t, []int64{1, 2, 19}, vchan.DroppedSegmentIds) }) } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 257e8980cb..ac384cc047 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -791,7 +791,7 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator { } } -func UpdateSegmentVisible(segmentID int64) UpdateOperator { +func SetSegmentIsInvisible(segmentID int64, isInvisible bool) UpdateOperator { return func(modPack *updateSegmentPack) bool { segment := modPack.Get(segmentID) if segment == nil { @@ -799,7 +799,7 @@ func UpdateSegmentVisible(segmentID int64) UpdateOperator { zap.Int64("segmentID", segmentID)) return false } - segment.IsInvisible = false + segment.IsInvisible = isInvisible return true } } @@ -1979,6 +1979,11 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats // metrics mutation for compaction from segments updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) + resultInvisible := oldSegment.GetIsInvisible() + if !oldSegment.GetCreatedByCompaction() { + resultInvisible = false + } + segmentInfo := &datapb.SegmentInfo{ CollectionID: oldSegment.GetCollectionID(), PartitionID: oldSegment.GetPartitionID(), @@ -1994,7 +1999,8 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats LastLevel: oldSegment.GetLastLevel(), PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(), LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(), - IsInvisible: oldSegment.GetIsInvisible(), + CreatedByCompaction: oldSegment.GetCreatedByCompaction(), + IsInvisible: resultInvisible, ID: result.GetSegmentID(), NumOfRows: result.GetNumRows(), Binlogs: result.GetInsertLogs(), @@ -2002,7 +2008,6 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats TextStatsLogs: result.GetTextStatsLogs(), Bm25Statslogs: result.GetBm25Logs(), Deltalogs: nil, - CreatedByCompaction: true, CompactionFrom: []int64{oldSegmentID}, IsSorted: true, } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 21e34ce602..c735681951 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -1011,8 +1011,12 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error { return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush") } // set segment to SegmentState_Flushed - if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil { - log.Error("flush segment complete failed", zap.Error(err)) + var operators []UpdateOperator + operators = append(operators, SetSegmentIsInvisible(segmentID, true)) + operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Flushed)) + err := s.meta.UpdateSegmentsInfo(operators...) + if err != nil { + log.Warn("flush segment complete failed", zap.Error(err)) return err } select {