enhance: [cherry-pick] Alter compactTo segments before compactFrom to avoid data loss if crash (#34532)

issue: #34512 
pr: #34513

Signed-off-by: wayblink <anyang.wang@zilliz.com>
This commit is contained in:
wayblink 2024-07-12 00:47:37 +08:00 committed by GitHub
parent 1d2062a68a
commit 86b57b7827
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 74 additions and 82 deletions

View File

@ -388,18 +388,13 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
}
if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
log.RatedInfo(20, "collection auto compaction disabled",
zap.Int64("collectionID", group.collectionID),
)
log.RatedInfo(20, "collection auto compaction disabled")
return nil
}
ct, err := getCompactTime(tsoutil.ComposeTSByTime(time.Now(), 0), coll)
if err != nil {
log.Warn("get compact time failed, skip to handle compaction",
zap.Int64("collectionID", group.collectionID),
zap.Int64("partitionID", group.partitionID),
zap.String("channel", group.channelName))
log.Warn("get compact time failed, skip to handle compaction")
return err
}
@ -412,9 +407,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
totalRows := plan.A
segIDs := plan.B
if !signal.isForce && t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full",
zap.Int64("collectionID", signal.collectionID),
zap.Int64s("segmentIDs", segIDs))
log.Warn("compaction plan skipped due to handler full", zap.Int64s("segmentIDs", segIDs))
break
}
start := time.Now()
@ -429,7 +422,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction,
CollectionTtl: ct.collectionTTL.Nanoseconds(),
CollectionID: signal.collectionID,
CollectionID: group.collectionID,
PartitionID: group.partitionID,
Channel: group.channelName,
InputSegments: segIDs,
@ -439,19 +432,13 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
err := t.compactionHandler.enqueueCompaction(task)
if err != nil {
log.Warn("failed to execute compaction task",
zap.Int64("collectionID", signal.collectionID),
zap.Int64("planID", planID),
zap.Int64s("segmentIDs", segIDs),
zap.Error(err))
continue
}
log.Info("time cost of generating global compaction",
zap.Int64("planID", planID),
zap.Int64("time cost", time.Since(start).Milliseconds()),
zap.Int64("collectionID", signal.collectionID),
zap.String("channel", group.channelName),
zap.Int64("partitionID", group.partitionID),
zap.Int64s("segmentIDs", segIDs))
}
}

View File

@ -790,6 +790,7 @@ func UpdateSegmentPartitionStatsVersionOperator(segmentID int64, version int64)
}
segment.LastPartitionStatsVersion = segment.PartitionStatsVersion
segment.PartitionStatsVersion = version
log.Debug("update segment version", zap.Int64("segmentID", segmentID), zap.Int64("PartitionStatsVersion", version), zap.Int64("LastPartitionStatsVersion", segment.LastPartitionStatsVersion))
return true
}
}
@ -803,6 +804,7 @@ func RevertSegmentLevelOperator(segmentID int64) UpdateOperator {
return false
}
segment.Level = segment.LastLevel
log.Debug("revert segment level", zap.Int64("segmentID", segmentID), zap.String("LastLevel", segment.LastLevel.String()))
return true
}
}
@ -816,6 +818,7 @@ func RevertSegmentPartitionStatsVersionOperator(segmentID int64) UpdateOperator
return false
}
segment.PartitionStatsVersion = segment.LastPartitionStatsVersion
log.Debug("revert segment partition stats version", zap.Int64("segmentID", segmentID), zap.Int64("LastPartitionStatsVersion", segment.LastPartitionStatsVersion))
return true
}
}
@ -1351,12 +1354,30 @@ func (m *meta) SetSegmentLevel(segmentID UniqueID, level datapb.SegmentLevel) {
m.segments.SetLevel(segmentID, level)
}
func getMinPosition(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
var minPos *msgpb.MsgPosition
for _, pos := range positions {
if minPos == nil ||
pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
minPos = pos
}
}
return minPos
}
func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log := log.With(zap.Int64("planID", t.GetPlanID()),
zap.String("type", t.GetType().String()),
zap.Int64("collectionID", t.CollectionID),
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
var compactFromSegIDs []int64
var latestCompactFromSegments []*SegmentInfo
compactFromSegIDs := make([]int64, 0)
compactToSegIDs := make([]int64, 0)
compactFromSegInfos := make([]*SegmentInfo, 0)
compactToSegInfos := make([]*SegmentInfo, 0)
for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentID)
if segment == nil {
@ -1367,92 +1388,85 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true
latestCompactFromSegments = append(latestCompactFromSegments, cloned)
compactFromSegInfos = append(compactFromSegInfos, cloned)
compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
}
getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
var minPos *msgpb.MsgPosition
for _, pos := range positions {
if minPos == nil ||
pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
minPos = pos
}
}
return minPos
}
newSegments := make([]*SegmentInfo, 0)
for _, seg := range result.GetSegments() {
segmentInfo := &datapb.SegmentInfo{
ID: seg.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID,
CollectionID: compactFromSegInfos[0].CollectionID,
PartitionID: compactFromSegInfos[0].PartitionID,
InsertChannel: t.GetChannel(),
NumOfRows: seg.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
MaxRowNum: compactFromSegInfos[0].MaxRowNum,
Binlogs: seg.GetInsertLogs(),
Statslogs: seg.GetField2StatslogPaths(),
CreatedByCompaction: true,
CompactionFrom: compactFromSegIDs,
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L2,
StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
}
segment := NewSegmentInfo(segmentInfo)
newSegments = append(newSegments, segment)
compactToSegInfos = append(compactToSegInfos, segment)
compactToSegIDs = append(compactToSegIDs, segment.GetID())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
}
compactionTo := make([]UniqueID, 0, len(newSegments))
for _, s := range newSegments {
compactionTo = append(compactionTo, s.GetID())
}
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collectionID", latestCompactFromSegments[0].CollectionID),
zap.Int64("partitionID", latestCompactFromSegments[0].PartitionID),
zap.Any("compacted from", compactFromSegIDs),
zap.Any("compacted to", compactionTo))
log = log.With(zap.Int64s("compact from", compactFromSegIDs), zap.Int64s("compact to", compactToSegIDs))
log.Debug("meta update: prepare for meta mutation - complete")
compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
compactFromInfos := lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
newSegmentInfos := lo.Map(newSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
compactToInfos := lo.Map(compactToSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
binlogs := make([]metastore.BinlogsIncrement, 0)
for _, seg := range newSegmentInfos {
for _, seg := range compactToInfos {
binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg})
}
if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, newSegmentInfos...), binlogs...); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
// alter compactTo before compactFrom segments to avoid data lost if service crash during AlterSegments
if err := m.catalog.AlterSegments(m.ctx, compactToInfos, binlogs...); err != nil {
log.Warn("fail to alter compactTo segments", zap.Error(err))
return nil, nil, err
}
lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
if err := m.catalog.AlterSegments(m.ctx, compactFromInfos); err != nil {
log.Warn("fail to alter compactFrom segments", zap.Error(err))
return nil, nil, err
}
lo.ForEach(compactFromSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
lo.ForEach(newSegments, func(info *SegmentInfo, _ int) {
lo.ForEach(compactToSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
return newSegments, metricMutation, nil
log.Info("meta update: alter in memory meta after compaction - complete")
return compactToSegInfos, metricMutation, nil
}
func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log := log.With(zap.Int64("planID", t.GetPlanID()),
zap.String("type", t.GetType().String()),
zap.Int64("collectionID", t.CollectionID),
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
var compactFromSegIDs []int64
var latestCompactFromSegments []*SegmentInfo
var compactFromSegInfos []*SegmentInfo
for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentID)
if segment == nil {
@ -1463,36 +1477,25 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true
latestCompactFromSegments = append(latestCompactFromSegments, cloned)
compactFromSegInfos = append(compactFromSegInfos, cloned)
compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
}
getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
var minPos *msgpb.MsgPosition
for _, pos := range positions {
if minPos == nil ||
pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
minPos = pos
}
}
return minPos
}
// MixCompaction / MergeCompaction will generates one and only one segment
compactToSegment := result.GetSegments()[0]
compactToSegmentInfo := NewSegmentInfo(
&datapb.SegmentInfo{
ID: compactToSegment.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID,
CollectionID: compactFromSegInfos[0].CollectionID,
PartitionID: compactFromSegInfos[0].PartitionID,
InsertChannel: t.GetChannel(),
NumOfRows: compactToSegment.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
MaxRowNum: compactFromSegInfos[0].MaxRowNum,
Binlogs: compactToSegment.GetInsertLogs(),
Statslogs: compactToSegment.GetField2StatslogPaths(),
Deltalogs: compactToSegment.GetDeltalogs(),
@ -1502,10 +1505,10 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L1,
StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
})
@ -1519,15 +1522,13 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
}
log = log.With(
zap.String("channel", t.GetChannel()),
zap.Int64("partitionID", compactToSegmentInfo.GetPartitionID()),
zap.Int64("compactTo segmentID", compactToSegmentInfo.GetID()),
zap.Int64s("compactFrom", compactFromSegIDs),
zap.Int64("compactTo", compactToSegmentInfo.GetID()),
zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()),
zap.Any("compactFrom segments(to be updated as dropped)", compactFromSegIDs),
)
log.Debug("meta update: prepare for meta mutation - complete")
compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
compactFromInfos := lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
@ -1536,14 +1537,18 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())),
zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())),
)
if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, compactToSegmentInfo.SegmentInfo),
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{compactToSegmentInfo.SegmentInfo},
metastore.BinlogsIncrement{Segment: compactToSegmentInfo.SegmentInfo},
); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
log.Warn("fail to alter compactTo segments", zap.Error(err))
return nil, nil, err
}
if err := m.catalog.AlterSegments(m.ctx, compactFromInfos); err != nil {
log.Warn("fail to alter compactFrom segments", zap.Error(err))
return nil, nil, err
}
lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
lo.ForEach(compactFromSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
m.segments.SetSegment(compactToSegmentInfo.GetID(), compactToSegmentInfo)