enhance: Handoff growing segment after sorted (#37385)

issue: #33744 

1. Segments generated from inserts will be loaded as growing until they
are sorted by primary key.
2. This PR may increase memory pressure on the delegator, so we need to
test the performance of the stats task. In local testing, the stats task
processes data faster than the insert rate.

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2024-11-07 16:08:24 +08:00 committed by GitHub
parent e47bf21305
commit 4dc684126e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 140 additions and 40 deletions

View File

@ -431,7 +431,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
var operators []UpdateOperator
for _, segID := range t.GetTaskProto().GetResultSegments() {
operators = append(operators, UpdateSegmentVisible(segID))
operators = append(operators, SetSegmentIsInvisible(segID, false))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
}

View File

@ -145,8 +145,8 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// Skip bulk insert segments.
continue
}
if s.GetIsInvisible() {
// skip invisible segments
if s.GetIsInvisible() && s.GetCreatedByCompaction() {
// skip invisible compaction segments
continue
}
@ -154,11 +154,10 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
droppedIDs.Insert(s.GetID())
case !isFlushState(s.GetState()):
case !isFlushState(s.GetState()) || s.GetIsInvisible():
growingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())
default:
flushedIDs.Insert(s.GetID())
}
@ -185,7 +184,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// ================================================
isValid := func(ids ...UniqueID) bool {
for _, id := range ids {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() {
return false
}
}

View File

@ -572,13 +572,13 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
// | | | | | |
// \ | / \ / |
// \ | / \ / |
// [13u] [14i, 15u] 12i
// | | | |
// \ / \ /
// \ / \ /
// [16u] [17u]
// [13u] [14i, 15u] 12i [19u](unsorted)
// | | | | |
// \ / \ / |
// \ / \ / |
// [16u] [17u] [18u](unsorted) [20u](sorted) [21i](unsorted)
// all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6]
// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6]
// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6, 20, 21], growing: [18]
svr := newTestServer(t)
defer closeTestServer(t, svr)
schema := newTestSchema()
@ -702,8 +702,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8))
assert.NoError(t, err)
@ -719,8 +720,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9))
assert.NoError(t, err)
@ -736,8 +738,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10))
assert.NoError(t, err)
@ -753,8 +756,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{4, 5, 6},
NumOfRows: 2048,
CompactionFrom: []int64{4, 5, 6},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg11))
assert.NoError(t, err)
@ -770,8 +774,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{4, 5, 6},
NumOfRows: 100,
CompactionFrom: []int64{4, 5, 6},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg12))
assert.NoError(t, err)
@ -787,8 +792,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2047,
CompactionFrom: []int64{7, 8, 9},
NumOfRows: 2047,
CompactionFrom: []int64{7, 8, 9},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg13))
assert.NoError(t, err)
@ -804,8 +810,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{10, 11},
NumOfRows: 100,
CompactionFrom: []int64{10, 11},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg14))
assert.NoError(t, err)
@ -821,8 +828,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{10, 11},
NumOfRows: 2048,
CompactionFrom: []int64{10, 11},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg15))
assert.NoError(t, err)
@ -838,8 +846,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{13, 14},
NumOfRows: 2048,
CompactionFrom: []int64{13, 14},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg16))
assert.NoError(t, err)
@ -855,15 +864,98 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{12, 15},
NumOfRows: 2048,
CompactionFrom: []int64{12, 15},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17))
assert.NoError(t, err)
seg18 := &datapb.SegmentInfo{
ID: 18,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{},
IsInvisible: true,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg18))
assert.NoError(t, err)
seg19 := &datapb.SegmentInfo{
ID: 19,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{},
IsInvisible: true,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg19))
assert.NoError(t, err)
seg20 := &datapb.SegmentInfo{
ID: 20,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{19},
CreatedByCompaction: true,
IsInvisible: false,
IsSorted: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg20))
assert.NoError(t, err)
seg21 := &datapb.SegmentInfo{
ID: 21,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{},
IsInvisible: false,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg21))
assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6, 20, 21}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{18}, vchan.UnflushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2, 19}, vchan.DroppedSegmentIds)
})
}

View File

@ -791,7 +791,7 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator {
}
}
func UpdateSegmentVisible(segmentID int64) UpdateOperator {
func SetSegmentIsInvisible(segmentID int64, isInvisible bool) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
@ -799,7 +799,7 @@ func UpdateSegmentVisible(segmentID int64) UpdateOperator {
zap.Int64("segmentID", segmentID))
return false
}
segment.IsInvisible = false
segment.IsInvisible = isInvisible
return true
}
}
@ -1979,6 +1979,11 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
resultInvisible := oldSegment.GetIsInvisible()
if !oldSegment.GetCreatedByCompaction() {
resultInvisible = false
}
segmentInfo := &datapb.SegmentInfo{
CollectionID: oldSegment.GetCollectionID(),
PartitionID: oldSegment.GetPartitionID(),
@ -1994,7 +1999,8 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
LastLevel: oldSegment.GetLastLevel(),
PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(),
LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(),
IsInvisible: oldSegment.GetIsInvisible(),
CreatedByCompaction: oldSegment.GetCreatedByCompaction(),
IsInvisible: resultInvisible,
ID: result.GetSegmentID(),
NumOfRows: result.GetNumRows(),
Binlogs: result.GetInsertLogs(),
@ -2002,7 +2008,6 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
TextStatsLogs: result.GetTextStatsLogs(),
Bm25Statslogs: result.GetBm25Logs(),
Deltalogs: nil,
CreatedByCompaction: true,
CompactionFrom: []int64{oldSegmentID},
IsSorted: true,
}

View File

@ -1011,8 +1011,12 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush")
}
// set segment to SegmentState_Flushed
if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
log.Error("flush segment complete failed", zap.Error(err))
var operators []UpdateOperator
operators = append(operators, SetSegmentIsInvisible(segmentID, true))
operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Flushed))
err := s.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("flush segment complete failed", zap.Error(err))
return err
}
select {