Mirror of https://gitee.com/milvus-io/milvus.git (synced 2024-11-29 18:38:44 +08:00)
enhance: Remove pre-marking segments as L2 during clustering compaction (#36799)
issue: #36686

This PR removes pre-marking segments as L2 during clustering compaction in version 2.5, and keeps compatibility with version 2.4. The core of this change is to **ensure that the many-to-many lineage derivation logic is correct, so that a parent and its child can never exist in the target segment view at the same time.**

feature:
- Clustering compaction no longer marks the input segments as L2.
- Add a new field `is_invisible` to `segmentInfo`, and mark segments that have completed clustering but have not yet built indexes as `is_invisible`, to prevent them from being loaded prematurely.
- Do not mark the input segments as `Dropped` before the clustering compaction is completed.
- After a compaction fails, only the result segments need to be marked as `Dropped`.

compatibility:
- If the upgraded task has not failed, there are no compatibility issues.
- If the status after the upgrade is `MetaSaved`, decide whether to skip the stats task based on whether `TmpSegments` is empty.
- If the failure occurs before `MetaSaved`:
  - there are no ResultSegments, and the InputSegments have not been marked as dropped yet;
  - the level of the input segments needs to be reverted to `LastLevel`.
- If the failure occurs after `MetaSaved`:
  - ResultSegments have already been generated and InputSegments have been marked as Dropped; at this point, simply make the ResultSegments visible;
  - the level of the ResultSegments needs to be set to L1 (so that they can participate in mixCompaction).

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in: parent f0b3942a08, commit ac8c5fcd5d.
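The compatibility handling described in the message above reduces to one branch in the failure path. The sketch below is a simplified illustration only — the helper name `cleanupOnFailureSketch` is invented for this summary, and the real logic lives in `processFailedOrTimeout` in the diff further down — but it uses only operators that actually appear in this change:

```go
// Simplified sketch of the failure/timeout cleanup decision introduced by this PR.
// If any input segment is already unhealthy, the task must have been created by v2.4,
// which pre-marked inputs as L2 and dropped them early: revert input levels and turn
// result/tmp segments into plain L1 segments without partition stats.
// Otherwise (v2.5 behaviour) the inputs were never modified, so only the still-invisible
// result/tmp segments have to be marked Dropped.
func (t *clusteringCompactionTask) cleanupOnFailureSketch() error {
	isInputDropped := false
	for _, segID := range t.GetInputSegments() {
		if t.meta.GetHealthySegment(segID) == nil {
			isInputDropped = true
			break
		}
	}

	var operators []UpdateOperator
	if isInputDropped { // task upgraded from v2.4
		for _, segID := range t.GetInputSegments() {
			operators = append(operators, RevertSegmentLevelOperator(segID)) // L2 -> LastLevel
		}
		for _, ids := range [][]int64{t.GetResultSegments(), t.GetTmpSegments()} {
			for _, segID := range ids {
				operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
				operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
			}
		}
	} else { // task created on v2.5: inputs untouched, results are still invisible
		for _, ids := range [][]int64{t.GetResultSegments(), t.GetTmpSegments()} {
			for _, segID := range ids {
				operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
			}
		}
	}
	return t.meta.UpdateSegmentsInfo(operators...)
}
```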
@@ -126,7 +126,8 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
         isFlush(segment) &&
             !segment.isCompacting && // not compacting now
             !segment.GetIsImporting() && // not importing now
-            segment.GetLevel() != datapb.SegmentLevel_L0 // ignore level zero segments
+            segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
+            !segment.GetIsInvisible()
     })

     views := make([]CompactionView, 0)
@@ -94,7 +94,8 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
         isFlush(segment) &&
             !segment.isCompacting && // not compacting now
             !segment.GetIsImporting() && // not importing now
-            segment.GetLevel() == datapb.SegmentLevel_L2 // only support L2 for now
+            segment.GetLevel() == datapb.SegmentLevel_L2 && // only support L2 for now
+            !segment.GetIsInvisible()
     })

     views := make([]CompactionView, 0)
@@ -213,15 +213,7 @@ func (t *clusteringCompactionTask) processPipelining() error {
         log.Debug("wait for the node to be assigned before proceeding with the subsequent steps")
         return nil
     }
-    var operators []UpdateOperator
-    for _, segID := range t.InputSegments {
-        operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L2))
-    }
-    err := t.meta.UpdateSegmentsInfo(operators...)
-    if err != nil {
-        log.Warn("fail to set segment level to L2", zap.Error(err))
-        return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo before compaction executing", err)
-    }
+    // don't mark segment level to L2 before clustering compaction after v2.5.0

     if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) {
         err := t.doAnalyze()
@@ -298,6 +290,12 @@ func (t *clusteringCompactionTask) processMetaSaved() error {
     }); err != nil {
         log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
     }
+    // to ensure compatibility, if a task upgraded from version 2.4 has a status of MetaSaved,
+    // its TmpSegments will be empty, so skip the stats task and go straight to building the index.
+    if len(t.GetTmpSegments()) == 0 {
+        log.Info("tmp segments is nil, skip stats task", zap.Int64("planID", t.GetPlanID()))
+        return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
+    }
     return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic))
 }
@@ -418,34 +416,63 @@ func (t *clusteringCompactionTask) processIndexing() error {
     return nil
 }

+func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
+    var operators []UpdateOperator
+    for _, segID := range t.GetResultSegments() {
+        operators = append(operators, UpdateSegmentVisible(segID))
+        operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID()))
+    }
+
+    err := t.meta.UpdateSegmentsInfo(operators...)
+    if err != nil {
+        log.Warn("markResultSegmentVisible UpdateSegmentsInfo fail", zap.Error(err))
+        return merr.WrapErrClusteringCompactionMetaError("markResultSegmentVisible UpdateSegmentsInfo", err)
+    }
+    return nil
+}
+
+func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
+    var operators []UpdateOperator
+    // mark
+    for _, segID := range t.GetInputSegments() {
+        operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
+    }
+    err := t.meta.UpdateSegmentsInfo(operators...)
+    if err != nil {
+        log.Warn("markInputSegmentsDropped UpdateSegmentsInfo fail", zap.Error(err))
+        return merr.WrapErrClusteringCompactionMetaError("markInputSegmentsDropped UpdateSegmentsInfo", err)
+    }
+    return nil
+}
+
 // indexed is the final state of a clustering compaction task
 // one task should only run this once
 func (t *clusteringCompactionTask) completeTask() error {
-    err := t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
+    var err error
+    // first mark result segments visible
+    if err = t.markResultSegmentsVisible(); err != nil {
+        return err
+    }
+
+    // update current partition stats version
+    // at this point, the segment view includes both the input segments and the result segments.
+    if err = t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
         CollectionID: t.GetCollectionID(),
         PartitionID:  t.GetPartitionID(),
         VChannel:     t.GetChannel(),
         Version:      t.GetPlanID(),
         SegmentIDs:   t.GetResultSegments(),
         CommitTime:   time.Now().Unix(),
-    })
-    if err != nil {
+    }); err != nil {
         return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
     }

-    var operators []UpdateOperator
-    for _, segID := range t.GetResultSegments() {
-        operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID()))
-    }
-    err = t.meta.UpdateSegmentsInfo(operators...)
-    if err != nil {
-        return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentPartitionStatsVersion", err)
+    // mark input segments as dropped
+    // now, the segment view only includes the result segments.
+    if err = t.markInputSegmentsDropped(); err != nil {
+        return err
     }

     err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID())
     if err != nil {
         return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
     }
     return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
 }
@@ -479,31 +506,65 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {

 func (t *clusteringCompactionTask) processFailedOrTimeout() error {
     log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))

+    if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
+        PlanID: t.GetPlanID(),
+    }); err != nil {
+        log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
+    }
+    isInputDropped := false
+    for _, segID := range t.GetInputSegments() {
+        if t.meta.GetHealthySegment(segID) == nil {
+            isInputDropped = true
+            break
+        }
+    }
+    if isInputDropped {
+        log.Info("input segments dropped, doing for compatibility",
+            zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
+        // this task must be generated by v2.4, just for compatibility
+        // revert segments meta
+        var operators []UpdateOperator
+        // revert level of input segments
+        // L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
+        // L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
+        for _, segID := range t.GetInputSegments() {
+            operators = append(operators, RevertSegmentLevelOperator(segID))
+        }
+        // if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats
+        for _, segID := range t.GetResultSegments() {
+            operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
+            operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
+        }
+        for _, segID := range t.GetTmpSegments() {
+            // maybe no necessary, there will be no `TmpSegments` that task was generated by v2.4
+            operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
+            operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
+        }
+        err := t.meta.UpdateSegmentsInfo(operators...)
+        if err != nil {
+            log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
+            return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
+        }
+    } else {
+        // after v2.5.0, mark the results segment as dropped
+        var operators []UpdateOperator
+        for _, segID := range t.GetResultSegments() {
+            // Don't worry about them being loaded; they are all invisible.
+            operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
+        }
+        for _, segID := range t.GetTmpSegments() {
+            // Don't worry about them being loaded; they are all invisible.
+            // tmpSegment is always invisible
+            operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
+        }

-    // revert segments meta
-    var operators []UpdateOperator
-    // revert level of input segments
-    // L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
-    // L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
-    for _, segID := range t.InputSegments {
-        operators = append(operators, RevertSegmentLevelOperator(segID))
-    }
-    // if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats
-    for _, segID := range t.ResultSegments {
-        operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
-        operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
-    }
-    err := t.meta.UpdateSegmentsInfo(operators...)
-    if err != nil {
-        log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
-        return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
+        err := t.meta.UpdateSegmentsInfo(operators...)
+        if err != nil {
+            log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
+            return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
+        }
     }

     t.resetSegmentCompacting()

     // drop partition stats if uploaded
@@ -514,7 +575,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
         Version:      t.GetPlanID(),
         SegmentIDs:   t.GetResultSegments(),
     }
-    err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
+    err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
     if err != nil {
         log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
     }
@@ -107,53 +107,181 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
         },
     })
     s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
     s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)

     task := s.generateBasicTask(false)

     task.processPipelining()

     seg11 := s.meta.GetSegment(101)
-    s.Equal(datapb.SegmentLevel_L2, seg11.Level)
+    s.Equal(datapb.SegmentLevel_L1, seg11.Level)
     seg21 := s.meta.GetSegment(102)
     s.Equal(datapb.SegmentLevel_L2, seg21.Level)
     s.Equal(int64(10000), seg21.PartitionStatsVersion)

-    task.ResultSegments = []int64{103, 104}
-    // fake some compaction result segment
-    s.meta.AddSegment(context.TODO(), &SegmentInfo{
-        SegmentInfo: &datapb.SegmentInfo{
-            ID:                    103,
-            State:                 commonpb.SegmentState_Flushed,
-            Level:                 datapb.SegmentLevel_L2,
-            CreatedByCompaction:   true,
-            PartitionStatsVersion: 10001,
-        },
-    })
-    s.meta.AddSegment(context.TODO(), &SegmentInfo{
-        SegmentInfo: &datapb.SegmentInfo{
-            ID:                    104,
-            State:                 commonpb.SegmentState_Flushed,
-            Level:                 datapb.SegmentLevel_L2,
-            CreatedByCompaction:   true,
-            PartitionStatsVersion: 10001,
-        },
-    })
-
-    task.processFailedOrTimeout()
-
-    seg12 := s.meta.GetSegment(101)
-    s.Equal(datapb.SegmentLevel_L1, seg12.Level)
-    seg22 := s.meta.GetSegment(102)
-    s.Equal(datapb.SegmentLevel_L2, seg22.Level)
-    s.Equal(int64(10000), seg22.PartitionStatsVersion)
-
-    seg32 := s.meta.GetSegment(103)
-    s.Equal(datapb.SegmentLevel_L1, seg32.Level)
-    s.Equal(int64(0), seg32.PartitionStatsVersion)
-    seg42 := s.meta.GetSegment(104)
-    s.Equal(datapb.SegmentLevel_L1, seg42.Level)
-    s.Equal(int64(0), seg42.PartitionStatsVersion)
+    s.Run("v2.4.x", func() {
+        // fake some compaction result segment
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:        101,
+                State:     commonpb.SegmentState_Dropped,
+                LastLevel: datapb.SegmentLevel_L1,
+                Level:     datapb.SegmentLevel_L2,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Dropped,
+                LastLevel:             datapb.SegmentLevel_L2,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+
+        // fake some compaction result segment
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    103,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                CreatedByCompaction:   true,
+                PartitionStatsVersion: 10001,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    104,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                CreatedByCompaction:   true,
+                PartitionStatsVersion: 10001,
+            },
+        })
+
+        task := s.generateBasicTask(false)
+        task.sessions = s.mockSessionMgr
+        s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+        task.InputSegments = []int64{101, 102}
+        task.ResultSegments = []int64{103, 104}
+
+        task.processFailedOrTimeout()
+
+        seg12 := s.meta.GetSegment(101)
+        s.Equal(datapb.SegmentLevel_L1, seg12.Level)
+        s.Equal(commonpb.SegmentState_Dropped, seg12.State)
+
+        seg22 := s.meta.GetSegment(102)
+        s.Equal(datapb.SegmentLevel_L2, seg22.Level)
+        s.Equal(int64(10000), seg22.PartitionStatsVersion)
+        s.Equal(commonpb.SegmentState_Dropped, seg22.State)
+
+        seg32 := s.meta.GetSegment(103)
+        s.Equal(datapb.SegmentLevel_L1, seg32.Level)
+        s.Equal(int64(0), seg32.PartitionStatsVersion)
+        s.Equal(commonpb.SegmentState_Flushed, seg32.State)
+
+        seg42 := s.meta.GetSegment(104)
+        s.Equal(datapb.SegmentLevel_L1, seg42.Level)
+        s.Equal(int64(0), seg42.PartitionStatsVersion)
+        s.Equal(commonpb.SegmentState_Flushed, seg42.State)
+    })
+
+    s.Run("v2.5.0", func() {
+        // fake some compaction result segment
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:    101,
+                State: commonpb.SegmentState_Flushed,
+                Level: datapb.SegmentLevel_L1,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    102,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                PartitionStatsVersion: 10000,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    103,
+                State:                 commonpb.SegmentState_Dropped,
+                Level:                 datapb.SegmentLevel_L2,
+                CreatedByCompaction:   true,
+                PartitionStatsVersion: 10001,
+                IsInvisible:           true,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    104,
+                State:                 commonpb.SegmentState_Dropped,
+                Level:                 datapb.SegmentLevel_L2,
+                CreatedByCompaction:   true,
+                PartitionStatsVersion: 10001,
+                IsInvisible:           true,
+            },
+        })
+
+        // fake some compaction result segment
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    105,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                CreatedByCompaction:   true,
+                PartitionStatsVersion: 10001,
+                IsInvisible:           true,
+            },
+        })
+        s.meta.AddSegment(context.TODO(), &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:                    106,
+                State:                 commonpb.SegmentState_Flushed,
+                Level:                 datapb.SegmentLevel_L2,
+                CreatedByCompaction:   true,
+                PartitionStatsVersion: 10001,
+                IsInvisible:           true,
+            },
+        })
+
+        task := s.generateBasicTask(false)
+        task.sessions = s.mockSessionMgr
+        s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+        task.InputSegments = []int64{101, 102}
+        task.TmpSegments = []int64{103, 104}
+        task.ResultSegments = []int64{105, 106}
+
+        task.processFailedOrTimeout()
+
+        seg12 := s.meta.GetSegment(101)
+        s.Equal(datapb.SegmentLevel_L1, seg12.Level)
+        seg22 := s.meta.GetSegment(102)
+        s.Equal(datapb.SegmentLevel_L2, seg22.Level)
+        s.Equal(int64(10000), seg22.PartitionStatsVersion)
+
+        seg32 := s.meta.GetSegment(103)
+        s.Equal(datapb.SegmentLevel_L2, seg32.Level)
+        s.Equal(commonpb.SegmentState_Dropped, seg32.State)
+        s.True(seg32.IsInvisible)
+
+        seg42 := s.meta.GetSegment(104)
+        s.Equal(datapb.SegmentLevel_L2, seg42.Level)
+        s.Equal(commonpb.SegmentState_Dropped, seg42.State)
+        s.True(seg42.IsInvisible)
+
+        seg52 := s.meta.GetSegment(105)
+        s.Equal(datapb.SegmentLevel_L2, seg52.Level)
+        s.Equal(int64(10001), seg52.PartitionStatsVersion)
+        s.Equal(commonpb.SegmentState_Dropped, seg52.State)
+        s.True(seg52.IsInvisible)
+
+        seg62 := s.meta.GetSegment(106)
+        s.Equal(datapb.SegmentLevel_L2, seg62.Level)
+        s.Equal(int64(10001), seg62.PartitionStatsVersion)
+        s.Equal(commonpb.SegmentState_Dropped, seg62.State)
+        s.True(seg62.IsInvisible)
+    })
 }

 func (s *ClusteringCompactionTaskSuite) generateBasicTask(vectorClusteringKey bool) *clusteringCompactionTask {
@@ -317,7 +317,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
             !segment.isCompacting && // not compacting now
             !segment.GetIsImporting() && // not importing now
             segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
-            segment.GetLevel() != datapb.SegmentLevel_L2 // ignore l2 segment
+            segment.GetLevel() != datapb.SegmentLevel_L2 && // ignore l2 segment
+            !segment.GetIsInvisible()
     }) // partSegments is list of chanPartSegments, which is channel-partition organized segments

     if len(partSegments) == 0 {
@@ -76,6 +76,10 @@ func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID Uni
             // Skip bulk insert segments.
             continue
         }
+        if s.GetIsInvisible() {
+            // skip invisible segments
+            continue
+        }

         if s.GetState() == commonpb.SegmentState_Dropped {
             droppedIDs.Insert(s.GetID())
@@ -119,10 +123,11 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
     }

     var (
-        flushedIDs   = make(typeutil.UniqueSet)
-        droppedIDs   = make(typeutil.UniqueSet)
-        growingIDs   = make(typeutil.UniqueSet)
-        levelZeroIDs = make(typeutil.UniqueSet)
+        flushedIDs    = make(typeutil.UniqueSet)
+        droppedIDs    = make(typeutil.UniqueSet)
+        growingIDs    = make(typeutil.UniqueSet)
+        levelZeroIDs  = make(typeutil.UniqueSet)
+        newFlushedIDs = make(typeutil.UniqueSet)
     )

     // cannot use GetSegmentsByChannel since dropped segments are needed here
@@ -132,7 +137,6 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
     indexedSegments := FilterInIndexedSegments(h, h.s.meta, false, segments...)
     indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)

-    unIndexedIDs := make(typeutil.UniqueSet)
     for _, s := range segments {
         if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
             continue
@@ -141,37 +145,22 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
             // Skip bulk insert segments.
             continue
         }

-        currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), s.GetPartitionID(), channel.GetName())
-        if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() != currentPartitionStatsVersion {
-            // in the process of L2 compaction, newly generated segment may be visible before the whole L2 compaction Plan
-            // is finished, we have to skip these fast-finished segment because all segments in one L2 Batch must be
-            // seen atomically, otherwise users will see intermediate result
+        if s.GetIsInvisible() {
+            // skip invisible segments
             continue
         }

         validSegmentInfos[s.GetID()] = s
         switch {
         case s.GetState() == commonpb.SegmentState_Dropped:
-            if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion {
-                // if segment.partStatsVersion is equal to currentPartitionStatsVersion,
-                // it must have been indexed, this is guaranteed by clustering compaction process
-                // this is to ensure that the current valid L2 compaction produce is available to search/query
-                // to avoid insufficient data
-                flushedIDs.Insert(s.GetID())
-                continue
-            }
             droppedIDs.Insert(s.GetID())
         case !isFlushState(s.GetState()):
             growingIDs.Insert(s.GetID())
         case s.GetLevel() == datapb.SegmentLevel_L0:
             levelZeroIDs.Insert(s.GetID())
-        case indexed.Contain(s.GetID()) || s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64():
-            // fill in indexed segments into flushed directly
-            flushedIDs.Insert(s.GetID())
-
         default:
-            // unIndexed segments will be checked if it's parents are all indexed
-            unIndexedIDs.Insert(s.GetID())
+            flushedIDs.Insert(s.GetID())
         }
     }
@@ -203,36 +192,55 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
         return true
     }

-    retrieveUnIndexed := func() bool {
+    var compactionFromExist func(segID UniqueID) bool
+
+    compactionFromExist = func(segID UniqueID) bool {
+        compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
+        if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
+            return false
+        }
+        for _, fromID := range compactionFrom {
+            if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) {
+                return true
+            }
+            if compactionFromExist(fromID) {
+                return true
+            }
+        }
+        return false
+    }
+
+    segmentIndexed := func(segID UniqueID) bool {
+        return indexed.Contain(segID) || validSegmentInfos[segID].GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64()
+    }
+
+    retrieve := func() bool {
         continueRetrieve := false
-        for id := range unIndexedIDs {
+        for id := range flushedIDs {
             compactionFrom := validSegmentInfos[id].GetCompactionFrom()
-            compactTos := []UniqueID{} // neighbors and itself
-            if len(compactionFrom) > 0 && isValid(compactionFrom...) {
+            if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
+                newFlushedIDs.Insert(id)
+                continue
+            }
+            if segmentIndexed(id) && !compactionFromExist(id) {
+                newFlushedIDs.Insert(id)
+            } else {
                 for _, fromID := range compactionFrom {
-                    if len(compactTos) == 0 {
-                        compactToInfo, _ := h.s.meta.GetCompactionTo(fromID)
-                        compactTos = lo.Map(compactToInfo, func(s *SegmentInfo, _ int) UniqueID { return s.GetID() })
-                    }
-                    if indexed.Contain(fromID) {
-                        flushedIDs.Insert(fromID)
-                    } else {
-                        unIndexedIDs.Insert(fromID)
-                        continueRetrieve = true
-                    }
+                    newFlushedIDs.Insert(fromID)
+                    continueRetrieve = true
+                    droppedIDs.Remove(fromID)
                 }
-                unIndexedIDs.Remove(compactTos...)
-                flushedIDs.Remove(compactTos...)
-                droppedIDs.Remove(compactionFrom...)
             }
         }
         return continueRetrieve
     }
-    for retrieveUnIndexed() {
+
+    for retrieve() {
+        flushedIDs = newFlushedIDs
+        newFlushedIDs = make(typeutil.UniqueSet)
     }

-    // unindexed is flushed segments as well
-    flushedIDs.Insert(unIndexedIDs.Collect()...)
+    flushedIDs = newFlushedIDs

     log.Info("GetQueryVChanPositions",
         zap.Int64("collectionID", channel.GetCollectionID()),
@@ -560,6 +560,312 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
         // assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
         // assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e
     })

+    t.Run("complex derivation", func(t *testing.T) {
+        // numbers indicate segmentID, letters indicate segment information
+        // i: indexed, u: unindexed, g: gced
+        // 1i,  2i,  3g        4i,  5i,  6i
+        //  |    |    |         |    |    |
+        //  \    |    /         \    |    /
+        //   \   |   /           \   |   /
+        //  7u, [8i, 9i, 10i]   [11u, 12i]
+        //  |    |   |    |       |     |
+        //  \    |   /    \       /     |
+        //   \   |  /      \     /      |
+        //    [13u]      [14i, 15u]    12i
+        //      |          |     |      |
+        //      \          /     \      /
+        //       \        /       \    /
+        //        [16u]            [17u]
+        // all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6]
+        // should be returned: flushed: [7, 8, 9, 10, 4, 5, 6]
+        svr := newTestServer(t)
+        defer closeTestServer(t, svr)
+        schema := newTestSchema()
+        svr.meta.AddCollection(&collectionInfo{
+            ID:     0,
+            Schema: schema,
+        })
+        err := svr.meta.indexMeta.CreateIndex(&model.Index{
+            TenantID:     "",
+            CollectionID: 0,
+            FieldID:      2,
+            IndexID:      1,
+        })
+        assert.NoError(t, err)
+        seg1 := &datapb.SegmentInfo{
+            ID:            1,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows: 100,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1))
+        assert.NoError(t, err)
+        seg2 := &datapb.SegmentInfo{
+            ID:            2,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows: 100,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
+        assert.NoError(t, err)
+        // seg3 was GCed
+        seg4 := &datapb.SegmentInfo{
+            ID:            4,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows: 100,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg4))
+        assert.NoError(t, err)
+        seg5 := &datapb.SegmentInfo{
+            ID:            5,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows: 100,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
+        assert.NoError(t, err)
+        seg6 := &datapb.SegmentInfo{
+            ID:            6,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows: 100,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg6))
+        assert.NoError(t, err)
+        seg7 := &datapb.SegmentInfo{
+            ID:            7,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows: 2048,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg7))
+        assert.NoError(t, err)
+        seg8 := &datapb.SegmentInfo{
+            ID:            8,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      100,
+            CompactionFrom: []int64{1, 2, 3},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8))
+        assert.NoError(t, err)
+        seg9 := &datapb.SegmentInfo{
+            ID:            9,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      100,
+            CompactionFrom: []int64{1, 2, 3},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9))
+        assert.NoError(t, err)
+        seg10 := &datapb.SegmentInfo{
+            ID:            10,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      100,
+            CompactionFrom: []int64{1, 2, 3},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10))
+        assert.NoError(t, err)
+        seg11 := &datapb.SegmentInfo{
+            ID:            11,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      2048,
+            CompactionFrom: []int64{4, 5, 6},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg11))
+        assert.NoError(t, err)
+        seg12 := &datapb.SegmentInfo{
+            ID:            12,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      100,
+            CompactionFrom: []int64{4, 5, 6},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg12))
+        assert.NoError(t, err)
+        seg13 := &datapb.SegmentInfo{
+            ID:            13,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      2047,
+            CompactionFrom: []int64{7, 8, 9},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg13))
+        assert.NoError(t, err)
+        seg14 := &datapb.SegmentInfo{
+            ID:            14,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      100,
+            CompactionFrom: []int64{10, 11},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg14))
+        assert.NoError(t, err)
+        seg15 := &datapb.SegmentInfo{
+            ID:            15,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      2048,
+            CompactionFrom: []int64{10, 11},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg15))
+        assert.NoError(t, err)
+        seg16 := &datapb.SegmentInfo{
+            ID:            16,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Flushed,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      2048,
+            CompactionFrom: []int64{13, 14},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg16))
+        assert.NoError(t, err)
+        seg17 := &datapb.SegmentInfo{
+            ID:            17,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Flushed,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:      2048,
+            CompactionFrom: []int64{12, 15},
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17))
+        assert.NoError(t, err)
+
+        vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
+        assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds)
+        assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
+    })

 }

 func TestShouldDropChannel(t *testing.T) {
@@ -704,7 +704,7 @@ func (p *updateSegmentPack) Get(segmentID int64) *SegmentInfo {
     }

     segment := p.meta.segments.GetSegment(segmentID)
-    if segment == nil || !isSegmentHealthy(segment) {
+    if segment == nil {
         log.Warn("meta update: get segment failed - segment not found",
             zap.Int64("segmentID", segmentID),
             zap.Bool("segment nil", segment == nil),
@@ -791,6 +791,19 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator {
     }
 }

+func UpdateSegmentVisible(segmentID int64) UpdateOperator {
+    return func(modPack *updateSegmentPack) bool {
+        segment := modPack.Get(segmentID)
+        if segment == nil {
+            log.Warn("meta update: update segment visible fail - segment not found",
+                zap.Int64("segmentID", segmentID))
+            return false
+        }
+        segment.IsInvisible = false
+        return true
+    }
+}
+
 func UpdateSegmentLevelOperator(segmentID int64, level datapb.SegmentLevel) UpdateOperator {
     return func(modPack *updateSegmentPack) bool {
         segment := modPack.Get(segmentID)
@@ -832,9 +845,13 @@ func RevertSegmentLevelOperator(segmentID int64) UpdateOperator {
                 zap.Int64("segmentID", segmentID))
             return false
         }
-        segment.Level = segment.LastLevel
-        log.Debug("revert segment level", zap.Int64("segmentID", segmentID), zap.String("LastLevel", segment.LastLevel.String()))
-        return true
+        // just for compatibility,
+        if segment.GetLevel() != segment.GetLastLevel() && segment.GetLastLevel() != datapb.SegmentLevel_Legacy {
+            segment.Level = segment.LastLevel
+            log.Debug("revert segment level", zap.Int64("segmentID", segmentID), zap.String("LastLevel", segment.LastLevel.String()))
+            return true
+        }
+        return false
     }
 }
@@ -1417,14 +1434,9 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
         }

         cloned := segment.Clone()
-        cloned.DroppedAt = uint64(time.Now().UnixNano())
-        cloned.Compacted = true
-
         compactFromSegInfos = append(compactFromSegInfos, cloned)
         compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())
-
-        // metrics mutation for compaction from segments
-        updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
     }

     for _, seg := range result.GetSegments() {
@@ -1448,6 +1460,8 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
             DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
                 return info.GetDmlPosition()
             })),
+            // visible after stats and index
+            IsInvisible: true,
         }
         segment := NewSegmentInfo(segmentInfo)
         compactToSegInfos = append(compactToSegInfos, segment)
@@ -1458,10 +1472,6 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
     log = log.With(zap.Int64s("compact from", compactFromSegIDs), zap.Int64s("compact to", compactToSegIDs))
     log.Debug("meta update: prepare for meta mutation - complete")

-    compactFromInfos := lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
-        return info.SegmentInfo
-    })
-
     compactToInfos := lo.Map(compactToSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
         return info.SegmentInfo
     })
@@ -1470,18 +1480,11 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
     for _, seg := range compactToInfos {
         binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg})
     }
-    // alter compactTo before compactFrom segments to avoid data lost if service crash during AlterSegments
+    // only add new segments
     if err := m.catalog.AlterSegments(m.ctx, compactToInfos, binlogs...); err != nil {
         log.Warn("fail to alter compactTo segments", zap.Error(err))
         return nil, nil, err
     }
-    if err := m.catalog.AlterSegments(m.ctx, compactFromInfos); err != nil {
-        log.Warn("fail to alter compactFrom segments", zap.Error(err))
-        return nil, nil, err
-    }
-    lo.ForEach(compactFromSegInfos, func(info *SegmentInfo, _ int) {
-        m.segments.SetSegment(info.GetID(), info)
-    })
     lo.ForEach(compactToSegInfos, func(info *SegmentInfo, _ int) {
         m.segments.SetSegment(info.GetID(), info)
     })
@@ -1977,35 +1980,41 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
     updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)

     segmentInfo := &datapb.SegmentInfo{
-        ID:                  result.GetSegmentID(),
-        CollectionID:        result.GetCollectionID(),
-        PartitionID:         result.GetPartitionID(),
-        InsertChannel:       result.GetChannel(),
-        NumOfRows:           result.GetNumRows(),
-        State:               commonpb.SegmentState_Flushed,
-        MaxRowNum:           cloned.GetMaxRowNum(),
-        Binlogs:             result.GetInsertLogs(),
-        Statslogs:           result.GetStatsLogs(),
-        Bm25Statslogs:       result.GetBm25Logs(),
-        TextStatsLogs:       result.GetTextStatsLogs(),
-        CreatedByCompaction: true,
-        CompactionFrom:      []int64{oldSegmentID},
-        LastExpireTime:      cloned.GetLastExpireTime(),
-        Level:               datapb.SegmentLevel_L1,
-        StartPosition:       cloned.GetStartPosition(),
-        DmlPosition:         cloned.GetDmlPosition(),
-        IsSorted:            true,
-        IsImporting:         cloned.GetIsImporting(),
+        CollectionID:              oldSegment.GetCollectionID(),
+        PartitionID:               oldSegment.GetPartitionID(),
+        InsertChannel:             oldSegment.GetInsertChannel(),
+        MaxRowNum:                 oldSegment.GetMaxRowNum(),
+        LastExpireTime:            oldSegment.GetLastExpireTime(),
+        StartPosition:             oldSegment.GetStartPosition(),
+        DmlPosition:               oldSegment.GetDmlPosition(),
+        IsImporting:               oldSegment.GetIsImporting(),
+        StorageVersion:            oldSegment.GetStorageVersion(),
+        State:                     oldSegment.GetState(),
+        Level:                     oldSegment.GetLevel(),
+        LastLevel:                 oldSegment.GetLastLevel(),
+        PartitionStatsVersion:     oldSegment.GetPartitionStatsVersion(),
+        LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(),
+        IsInvisible:               oldSegment.GetIsInvisible(),
+        ID:                        result.GetSegmentID(),
+        NumOfRows:                 result.GetNumRows(),
+        Binlogs:                   result.GetInsertLogs(),
+        Statslogs:                 result.GetStatsLogs(),
+        TextStatsLogs:             result.GetTextStatsLogs(),
+        Bm25Statslogs:             result.GetBm25Logs(),
+        Deltalogs:                 nil,
+        CreatedByCompaction:       true,
+        CompactionFrom:            []int64{oldSegmentID},
+        IsSorted:                  true,
     }
     segment := NewSegmentInfo(segmentInfo)
     if segment.GetNumOfRows() > 0 {
         metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows())
     } else {
         segment.State = commonpb.SegmentState_Dropped
         segment.DroppedAt = uint64(time.Now().UnixNano())
     }

     log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", result.GetNumRows()))

     if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo}, metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil {
         log.Warn("fail to alter segments and new segment", zap.Error(err))
         return nil, err
@@ -121,6 +121,8 @@ func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStat
     }

     psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos[info.GetVersion()] = info
+    // after v2.5.0, the current version will be updated when updating the partition stats info, so there's no need to split it into two steps
+    psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].currentVersion = info.Version
     return nil
 }
@@ -75,7 +75,7 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() {
     s.NotNil(ps)

     currentVersion := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1")
-    s.Equal(emptyPartitionStatsVersion, currentVersion)
+    s.Equal(int64(100), currentVersion)

     currentVersion2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-2")
     s.Equal(emptyPartitionStatsVersion, currentVersion2)
@@ -67,6 +67,7 @@ func (s *statsTaskSuite) SetupSuite() {
                     NumOfRows: 65535,
                     State:     commonpb.SegmentState_Flushed,
                     MaxRowNum: 65535,
+                    Level:     datapb.SegmentLevel_L2,
                 },
             },
         },
@@ -82,6 +83,7 @@ func (s *statsTaskSuite) SetupSuite() {
                     NumOfRows: 65535,
                     State:     commonpb.SegmentState_Flushed,
                     MaxRowNum: 65535,
+                    Level:     datapb.SegmentLevel_L2,
                 },
             },
         },
@@ -97,6 +99,7 @@ func (s *statsTaskSuite) SetupSuite() {
                     NumOfRows: 65535,
                     State:     commonpb.SegmentState_Flushed,
                     MaxRowNum: 65535,
+                    Level:     datapb.SegmentLevel_L2,
                 },
             },
         },
@@ -545,12 +548,14 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
             catalog := catalogmocks.NewDataCoordCatalog(s.T())
             s.mt.catalog = catalog
             s.mt.statsTaskMeta.catalog = catalog
+            s.mt.segments.SetState(s.segID, commonpb.SegmentState_Flushed)
             catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
             catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil)

             s.NoError(st.SetJobInfo(s.mt))
-            s.NotNil(s.mt.GetHealthySegment(s.segID + 1))
+            s.NotNil(s.mt.GetHealthySegment(s.targetID))
             s.Equal(indexpb.JobState_JobStateFinished, s.mt.statsTaskMeta.tasks[s.taskID].GetState())
+            s.Equal(datapb.SegmentLevel_L2, s.mt.GetHealthySegment(s.targetID).GetLevel())
         })
     })
 }
@@ -357,6 +357,10 @@ message SegmentInfo {
   // textStatsLogs is used to record tokenization index for fields.
   map<int64, TextIndexStats> textStatsLogs = 26;
   repeated FieldBinlog bm25statslogs = 27;
+
+  // This field is used to indicate that some intermediate state segments should not be loaded.
+  // For example, segments that have been clustered but haven't undergone stats yet.
+  bool is_invisible = 28;
 }

 message SegmentStartPosition {