fix: Correct dropped segment num metrics (#37410)
See also: #31891

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
commit 51ed2a61c8
parent f813fb4563
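Before this change, completeMixCompactionMutation only called metricMutation.addNewSeg for compactTo segments with a positive row count; a zero-row result was flipped to Dropped but never recorded, so the dropped-segment count metric drifted. The fix marks zero-row results as Dropped first and then records the metric mutation unconditionally. A minimal, self-contained sketch of the corrected flow follows — the types and the accumulator internals here are simplified stand-ins, not the real datapb/commonpb definitions; only the addNewSeg call shape and the ordering come from the diff below:

	package main

	import "fmt"

	// Simplified stand-in for commonpb.SegmentState.
	type segmentState string

	const (
		segmentStateFlushed segmentState = "Flushed"
		segmentStateDropped segmentState = "Dropped"
	)

	// metricMutation plays the role of the metric-mutation accumulator in
	// the diff: per-state segment-count deltas plus a row-count delta.
	type metricMutation struct {
		stateChange    map[segmentState]int64
		rowCountChange int64
	}

	func (m *metricMutation) addNewSeg(state segmentState, numRows int64) {
		m.stateChange[state]++
		m.rowCountChange += numRows
	}

	// completeCompactTo shows the corrected order of operations: decide the
	// final state first, then record metrics for every compactTo segment.
	func completeCompactTo(mm *metricMutation, state segmentState, numRows int64) segmentState {
		if numRows == 0 {
			// Zero-row results are discarded, so persist them as Dropped.
			state = segmentStateDropped
		}
		// The old code skipped this call entirely when numRows == 0, so the
		// Dropped count was never updated; it now runs unconditionally.
		mm.addNewSeg(state, numRows)
		return state
	}

	func main() {
		mm := &metricMutation{stateChange: map[segmentState]int64{}}
		finalState := completeCompactTo(mm, segmentStateFlushed, 0)
		fmt.Println(finalState, mm.stateChange[segmentStateDropped]) // Dropped 1
	}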
internal/datacoord/meta.go
@@ -1551,14 +1551,13 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
 			IsSorted:            compactToSegment.GetIsSorted(),
 		})
 
-		// L1 segment with NumRows=0 will be discarded, so no need to change the metric
-		if compactToSegmentInfo.GetNumOfRows() > 0 {
-			// metrics mutation for compactTo segments
-			metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetIsSorted(), compactToSegmentInfo.GetNumOfRows())
-		} else {
+		if compactToSegmentInfo.GetNumOfRows() == 0 {
 			compactToSegmentInfo.State = commonpb.SegmentState_Dropped
 		}
 
+		// metrics mutation for compactTo segments
+		metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetIsSorted(), compactToSegmentInfo.GetNumOfRows())
+
 		log.Info("Add a new compactTo segment",
 			zap.Int64("compactTo", compactToSegmentInfo.GetID()),
 			zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()),
@@ -1582,6 +1581,7 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
 	for _, seg := range compactToInfos {
 		binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg})
 	}
 
+	// alter compactTo before compactFrom segments to avoid data lost if service crash during AlterSegments
 	if err := m.catalog.AlterSegments(m.ctx, compactToInfos, binlogs...); err != nil {
 		log.Warn("fail to alter compactTo segments", zap.Error(err))
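The comment added in this hunk states the recovery argument: compactTo segments are persisted via AlterSegments before the compactFrom segments are altered, so the idea is that a crash between the two writes leaves both the compaction output and its inputs in the catalog, rather than dropping the inputs while the output is still unrecorded.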
internal/datacoord/meta_test.go
@@ -182,103 +182,176 @@ func (suite *MetaBasicSuite) TestCollection() {
 }
 
 func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
-	latestSegments := NewSegmentsInfo()
-	for segID, segment := range map[UniqueID]*SegmentInfo{
-		1: {SegmentInfo: &datapb.SegmentInfo{
-			ID:           1,
-			CollectionID: 100,
-			PartitionID:  10,
-			State:        commonpb.SegmentState_Flushed,
-			Level:        datapb.SegmentLevel_L1,
-			Binlogs:      []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
-			Statslogs:    []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
-			// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
-			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
-			NumOfRows: 2,
-		}},
-		2: {SegmentInfo: &datapb.SegmentInfo{
-			ID:           2,
-			CollectionID: 100,
-			PartitionID:  10,
-			State:        commonpb.SegmentState_Flushed,
-			Level:        datapb.SegmentLevel_L1,
-			Binlogs:      []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)},
-			Statslogs:    []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)},
-			// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
-			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)},
-			NumOfRows: 2,
-		}},
-	} {
-		latestSegments.SetSegment(segID, segment)
-	}
+	getLatestSegments := func() *SegmentsInfo {
+		latestSegments := NewSegmentsInfo()
+		for segID, segment := range map[UniqueID]*SegmentInfo{
+			1: {SegmentInfo: &datapb.SegmentInfo{
+				ID:           1,
+				CollectionID: 100,
+				PartitionID:  10,
+				State:        commonpb.SegmentState_Flushed,
+				Level:        datapb.SegmentLevel_L1,
+				Binlogs:      []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
+				Statslogs:    []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
+				// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
+				Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
+				NumOfRows: 2,
+			}},
+			2: {SegmentInfo: &datapb.SegmentInfo{
+				ID:           2,
+				CollectionID: 100,
+				PartitionID:  10,
+				State:        commonpb.SegmentState_Flushed,
+				Level:        datapb.SegmentLevel_L1,
+				Binlogs:      []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)},
+				Statslogs:    []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)},
+				// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
+				Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)},
+				NumOfRows: 2,
+			}},
+		} {
+			latestSegments.SetSegment(segID, segment)
+		}
+
+		return latestSegments
+	}
 
 	mockChMgr := mocks.NewChunkManager(suite.T())
-	m := &meta{
-		catalog:      &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
-		segments:     latestSegments,
-		chunkManager: mockChMgr,
-	}
-
-	compactToSeg := &datapb.CompactionSegment{
-		SegmentID:           3,
-		InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)},
-		Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)},
-		NumOfRows:           2,
-	}
-
-	result := &datapb.CompactionPlanResult{
-		Segments: []*datapb.CompactionSegment{compactToSeg},
-	}
-	task := &datapb.CompactionTask{
-		InputSegments: []UniqueID{1, 2},
-		Type:          datapb.CompactionType_MixCompaction,
-	}
-
-	infos, mutation, err := m.CompleteCompactionMutation(task, result)
-	assert.NoError(suite.T(), err)
-	suite.Equal(1, len(infos))
-	info := infos[0]
-	suite.NoError(err)
-	suite.NotNil(info)
-	suite.NotNil(mutation)
-
-	// check newSegment
-	suite.EqualValues(3, info.GetID())
-	suite.Equal(datapb.SegmentLevel_L1, info.GetLevel())
-	suite.Equal(commonpb.SegmentState_Flushed, info.GetState())
-
-	binlogs := info.GetBinlogs()
-	for _, fbinlog := range binlogs {
-		for _, blog := range fbinlog.GetBinlogs() {
-			suite.Empty(blog.GetLogPath())
-			suite.EqualValues(50000, blog.GetLogID())
-		}
-	}
-
-	statslogs := info.GetStatslogs()
-	for _, fbinlog := range statslogs {
-		for _, blog := range fbinlog.GetBinlogs() {
-			suite.Empty(blog.GetLogPath())
-			suite.EqualValues(50001, blog.GetLogID())
-		}
-	}
-
-	// check compactFrom segments
-	for _, segID := range []int64{1, 2} {
-		seg := m.GetSegment(segID)
-		suite.Equal(commonpb.SegmentState_Dropped, seg.GetState())
-		suite.NotEmpty(seg.GetDroppedAt())
-
-		suite.EqualValues(segID, seg.GetID())
-		suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs())
-		suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs())
-		suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs())
-	}
-
-	// check mutation metrics
-	suite.Equal(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()]))
-	suite.EqualValues(-2, mutation.rowCountChange)
-	suite.EqualValues(2, mutation.rowCountAccChange)
+
+	suite.Run("test complete with compactTo 0 num of rows", func() {
+		latestSegments := getLatestSegments()
+		compactToSeg := &datapb.CompactionSegment{
+			SegmentID:           4,
+			InsertLogs:          []*datapb.FieldBinlog{},
+			Field2StatslogPaths: []*datapb.FieldBinlog{},
+			NumOfRows:           0,
+		}
+
+		result := &datapb.CompactionPlanResult{
+			Segments: []*datapb.CompactionSegment{compactToSeg},
+		}
+		task := &datapb.CompactionTask{
+			InputSegments: []UniqueID{1, 2},
+			Type:          datapb.CompactionType_MixCompaction,
+		}
+		m := &meta{
+			catalog:      &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
+			segments:     latestSegments,
+			chunkManager: mockChMgr,
+		}
+
+		infos, mutation, err := m.CompleteCompactionMutation(task, result)
+		assert.NoError(suite.T(), err)
+		suite.Equal(1, len(infos))
+		info := infos[0]
+		suite.NoError(err)
+		suite.NotNil(info)
+		suite.NotNil(mutation)
+
+		// check compact to segments
+		suite.EqualValues(4, info.GetID())
+		suite.Equal(datapb.SegmentLevel_L1, info.GetLevel())
+		suite.Equal(commonpb.SegmentState_Dropped, info.GetState())
+
+		suite.Empty(info.GetBinlogs())
+		suite.Empty(info.GetStatslogs())
+
+		// check compactFrom segments
+		for _, segID := range []int64{1, 2} {
+			seg := m.GetSegment(segID)
+			suite.Equal(commonpb.SegmentState_Dropped, seg.GetState())
+			suite.NotEmpty(seg.GetDroppedAt())
+
+			suite.EqualValues(segID, seg.GetID())
+			suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs())
+			suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs())
+			suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs())
+		}
+
+		// check mutation metrics
+		suite.EqualValues(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()]))
+		suite.EqualValues(-4, mutation.rowCountChange)
+		suite.EqualValues(0, mutation.rowCountAccChange)
+		flushedUnsorted := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Flushed.String()][getSortStatus(false)]
+		suite.EqualValues(-2, flushedUnsorted)
+
+		droppedUnsorted := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Dropped.String()][getSortStatus(false)]
+		suite.EqualValues(3, droppedUnsorted)
+	})
+
+	suite.Run("test complete compaction mutation", func() {
+		latestSegments := getLatestSegments()
+		compactToSeg := &datapb.CompactionSegment{
+			SegmentID:           3,
+			InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)},
+			Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)},
+			NumOfRows:           2,
+		}
+
+		result := &datapb.CompactionPlanResult{
+			Segments: []*datapb.CompactionSegment{compactToSeg},
+		}
+		task := &datapb.CompactionTask{
+			InputSegments: []UniqueID{1, 2},
+			Type:          datapb.CompactionType_MixCompaction,
+		}
+		m := &meta{
+			catalog:      &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
+			segments:     latestSegments,
+			chunkManager: mockChMgr,
+		}
+
+		infos, mutation, err := m.CompleteCompactionMutation(task, result)
+		assert.NoError(suite.T(), err)
+		suite.Equal(1, len(infos))
+		info := infos[0]
+		suite.NoError(err)
+		suite.NotNil(info)
+		suite.NotNil(mutation)
+
+		// check newSegment
+		suite.EqualValues(3, info.GetID())
+		suite.Equal(datapb.SegmentLevel_L1, info.GetLevel())
+		suite.Equal(commonpb.SegmentState_Flushed, info.GetState())
+
+		binlogs := info.GetBinlogs()
+		for _, fbinlog := range binlogs {
+			for _, blog := range fbinlog.GetBinlogs() {
+				suite.Empty(blog.GetLogPath())
+				suite.EqualValues(50000, blog.GetLogID())
+			}
+		}
+
+		statslogs := info.GetStatslogs()
+		for _, fbinlog := range statslogs {
+			for _, blog := range fbinlog.GetBinlogs() {
+				suite.Empty(blog.GetLogPath())
+				suite.EqualValues(50001, blog.GetLogID())
+			}
+		}
+
+		// check compactFrom segments
+		for _, segID := range []int64{1, 2} {
+			seg := m.GetSegment(segID)
+			suite.Equal(commonpb.SegmentState_Dropped, seg.GetState())
+			suite.NotEmpty(seg.GetDroppedAt())
+
+			suite.EqualValues(segID, seg.GetID())
+			suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs())
+			suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs())
+			suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs())
+		}
+
+		// check mutation metrics
+		suite.EqualValues(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()]))
+		suite.EqualValues(-2, mutation.rowCountChange)
+		suite.EqualValues(2, mutation.rowCountAccChange)
+		flushedCount := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Flushed.String()][getSortStatus(false)]
+		suite.EqualValues(-1, flushedCount)
+
+		droppedCount := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Dropped.String()][getSortStatus(false)]
+		suite.EqualValues(2, droppedCount)
+	})
 }
 
 func (suite *MetaBasicSuite) TestSetSegment() {
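The numbers asserted in the new zero-row subtest follow directly from the setup: both input segments are L1, Flushed, unsorted, with 2 rows each, and the compactTo segment carries 0 rows. Dropping the inputs removes their 4 rows (rowCountChange = -4) while the empty result contributes nothing (rowCountAccChange = 0). On the state-count side, Flushed loses the two inputs (-2) and Dropped gains three segments: the two compactFrom inputs plus the zero-row compactTo segment that the fix now marks and counts (+3). That final +3, rather than +2, is exactly the behavior the meta.go change introduces.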