mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Fix bug for get index state with compaction segment (#26822)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
7056e9c0f7
commit
d7cd1f2a6d
@ -252,7 +252,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
|
||||
|
||||
func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error {
|
||||
// Also prepare metric updates.
|
||||
_, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
|
||||
_, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan, result)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -393,7 +393,7 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
|
||||
segIdx, ok := seg.segmentIndexes[index.IndexID]
|
||||
|
||||
if !ok {
|
||||
if seg.GetStartPosition().GetTimestamp() <= ts && !seg.GetCreatedByCompaction() {
|
||||
if seg.GetLastExpireTime() <= ts {
|
||||
cntUnissued++
|
||||
}
|
||||
pendingIndexRows += seg.GetNumOfRows()
|
||||
@ -405,7 +405,7 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
|
||||
|
||||
// if realTime, calculate current statistics
|
||||
// if not realTime, skip segments created after index create
|
||||
if !realTime && (seg.GetCreatedByCompaction() || seg.GetStartPosition().GetTimestamp() > ts) {
|
||||
if !realTime && seg.GetLastExpireTime() > ts {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -980,9 +980,10 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
|
||||
// - the segment info of compactedFrom segments after compaction to alter
|
||||
// - the segment info of compactedTo segment after compaction to add
|
||||
// The compactedTo segment could contain 0 numRows
|
||||
func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs,
|
||||
func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
|
||||
result *datapb.CompactionResult) ([]*SegmentInfo, []*SegmentInfo, *SegmentInfo, *segMetricMutation, error) {
|
||||
log.Info("meta update: prepare for complete compaction mutation")
|
||||
compactionLogs := plan.GetSegmentBinlogs()
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
@ -1057,6 +1058,7 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac
|
||||
DmlPosition: dmlPosition,
|
||||
CreatedByCompaction: true,
|
||||
CompactionFrom: compactionFrom,
|
||||
LastExpireTime: plan.GetStartTime(),
|
||||
}
|
||||
segment := NewSegmentInfo(segmentInfo)
|
||||
metricMutation.addNewSeg(segment.GetState(), segment.GetNumOfRows())
|
||||
|
@ -651,19 +651,22 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
|
||||
segments: prepareSegments,
|
||||
}
|
||||
|
||||
inCompactionLogs := []*datapb.CompactionSegmentBinlogs{
|
||||
{
|
||||
SegmentID: 1,
|
||||
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
|
||||
},
|
||||
{
|
||||
SegmentID: 2,
|
||||
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")},
|
||||
plan := &datapb.CompactionPlan{
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{
|
||||
SegmentID: 1,
|
||||
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
|
||||
},
|
||||
{
|
||||
SegmentID: 2,
|
||||
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")},
|
||||
},
|
||||
},
|
||||
StartTime: 15,
|
||||
}
|
||||
|
||||
inCompactionResult := &datapb.CompactionResult{
|
||||
@ -673,7 +676,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")},
|
||||
NumOfRows: 2,
|
||||
}
|
||||
beforeCompact, afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(inCompactionLogs, inCompactionResult)
|
||||
beforeCompact, afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(plan, inCompactionResult)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, beforeCompact)
|
||||
assert.NotNil(t, afterCompact)
|
||||
@ -704,6 +707,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
|
||||
assert.EqualValues(t, inCompactionResult.GetField2StatslogPaths(), newSegment.GetStatslogs())
|
||||
assert.EqualValues(t, inCompactionResult.GetDeltalogs(), newSegment.GetDeltalogs())
|
||||
assert.NotZero(t, newSegment.lastFlushTime)
|
||||
assert.Equal(t, uint64(15), newSegment.GetLastExpireTime())
|
||||
}
|
||||
|
||||
func Test_meta_SetSegmentCompacting(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user