mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
fix: compacted segment status was flushing instead flushed and L0 segment trigger gc slowly (#29587)
relate: https://github.com/milvus-io/milvus/issues/29492 Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
parent
a903ae641b
commit
cb18f18c1d
@ -412,7 +412,7 @@ func (c *compactionPlanHandler) handleL0CompactionResult(plan *datapb.Compaction
|
||||
})
|
||||
|
||||
for _, seg := range levelZeroSegments {
|
||||
operators = append(operators, UpdateStatusOperator(seg.SegmentID, commonpb.SegmentState_Dropped))
|
||||
operators = append(operators, UpdateStatusOperator(seg.GetSegmentID(), commonpb.SegmentState_Dropped), UpdateCompactedOperator(seg.GetSegmentID()))
|
||||
}
|
||||
|
||||
log.Info("meta update: update segments info for level zero compaction",
|
||||
|
@ -128,9 +128,9 @@ func (s *CompactionPlanHandlerSuite) TestClean() {
|
||||
|
||||
func (s *CompactionPlanHandlerSuite) TestHandleL0CompactionResults() {
|
||||
channel := "Ch-1"
|
||||
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
Run(func(operators ...UpdateOperator) {
|
||||
s.Equal(5, len(operators))
|
||||
s.Equal(7, len(operators))
|
||||
}).Return(nil).Once()
|
||||
|
||||
deltalogs := []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}
|
||||
|
@ -521,6 +521,19 @@ func UpdateStatusOperator(segmentID int64, status commonpb.SegmentState) UpdateO
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateCompactedOperator(segmentID int64) UpdateOperator {
|
||||
return func(modPack *updateSegmentPack) bool {
|
||||
segment := modPack.Get(segmentID)
|
||||
if segment == nil {
|
||||
log.Warn("meta update: update binlog failed - segment not found",
|
||||
zap.Int64("segmentID", segmentID))
|
||||
return false
|
||||
}
|
||||
segment.Compacted = true
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// update binlogs in segmentInfo
|
||||
func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb.FieldBinlog) UpdateOperator {
|
||||
return func(modPack *updateSegmentPack) bool {
|
||||
@ -1032,7 +1045,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
|
||||
PartitionID: modSegments[0].PartitionID,
|
||||
InsertChannel: modSegments[0].InsertChannel,
|
||||
NumOfRows: compactToSegment.NumOfRows,
|
||||
State: commonpb.SegmentState_Flushing,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
MaxRowNum: modSegments[0].MaxRowNum,
|
||||
Binlogs: compactToSegment.GetInsertLogs(),
|
||||
Statslogs: compactToSegment.GetField2StatslogPaths(),
|
||||
|
@ -533,6 +533,30 @@ func TestUpdateSegmentsInfo(t *testing.T) {
|
||||
assert.Equal(t, updated.NumOfRows, expected.NumOfRows)
|
||||
})
|
||||
|
||||
t.Run("update compacted segment", func(t *testing.T) {
|
||||
meta, err := newMemoryMeta()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// segment not found
|
||||
err = meta.UpdateSegmentsInfo(
|
||||
UpdateCompactedOperator(1),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// normal
|
||||
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 1, State: commonpb.SegmentState_Flushed,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog0", 1))},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog0", 1))},
|
||||
}}
|
||||
err = meta.AddSegment(context.TODO(), segment1)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = meta.UpdateSegmentsInfo(
|
||||
UpdateCompactedOperator(1),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
t.Run("update non-existed segment", func(t *testing.T) {
|
||||
meta, err := newMemoryMeta()
|
||||
assert.NoError(t, err)
|
||||
@ -745,7 +769,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
|
||||
assert.Equal(t, UniqueID(100), newSegment.GetCollectionID())
|
||||
assert.Equal(t, UniqueID(10), newSegment.GetPartitionID())
|
||||
assert.Equal(t, inSegment.NumOfRows, newSegment.GetNumOfRows())
|
||||
assert.Equal(t, commonpb.SegmentState_Flushing, newSegment.GetState())
|
||||
assert.Equal(t, commonpb.SegmentState_Flushed, newSegment.GetState())
|
||||
|
||||
assert.EqualValues(t, inSegment.GetInsertLogs(), newSegment.GetBinlogs())
|
||||
assert.EqualValues(t, inSegment.GetField2StatslogPaths(), newSegment.GetStatslogs())
|
||||
|
Loading…
Reference in New Issue
Block a user