Fix compaction selection policy (#17690)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
parent 1722431699
commit 6fdf88f452
@@ -237,9 +237,6 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
}

plans := t.generatePlans(group.segments, signal.isForce, signal.compactTime)
if len(plans) != 0 {
log.Info("global generated plans", zap.Int64("collection", signal.collectionID), zap.Int("plan count", len(plans)))
}
for _, plan := range plans {
if !signal.isForce && t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID))
@@ -250,7 +247,11 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
log.Warn("failed to fill plan", zap.Error(err))
continue
}
t.compactionHandler.execCompactionPlan(signal, plan)
err := t.compactionHandler.execCompactionPlan(signal, plan)
if err != nil {
log.Warn("failed to execute compaction plan", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID), zap.Error(err))
continue
}

log.Info("time cost of generating global compaction", zap.Int64("planID", plan.PlanID), zap.Any("time cost", time.Since(start).Milliseconds()),
zap.Int64("collectionID", signal.collectionID), zap.String("channel", group.channelName), zap.Int64("partitionID", group.partitionID))
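The hunk above changes execCompactionPlan from a fire-and-forget call into an error-checked one that logs the failure and skips to the next plan. A minimal, hedged sketch of that pattern; the handler and plan types below are simplified stand-ins, not the actual datacoord interfaces:

```go
// Sketch only: simplified stand-in types, not the Milvus datacoord ones.
package main

import (
	"errors"
	"log"
)

type plan struct{ ID int64 }

type handler struct{ full bool }

// execCompactionPlan fails when the handler cannot accept more plans.
func (h *handler) execCompactionPlan(p plan) error {
	if h.full {
		return errors.New("compaction handler is full")
	}
	return nil
}

func main() {
	h := &handler{}
	for _, p := range []plan{{ID: 1}, {ID: 2}} {
		// Log the failure and continue with the next plan instead of dropping the error.
		if err := h.execCompactionPlan(p); err != nil {
			log.Printf("failed to execute compaction plan %d: %v", p.ID, err)
			continue
		}
		log.Printf("submitted compaction plan %d", p.ID)
	}
}
```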
@@ -278,7 +279,6 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
partitionID := segment.GetPartitionID()
segments := t.getCandidateSegments(channel, partitionID)
plans := t.generatePlans(segments, signal.isForce, signal.compactTime)
log.Info("single generated plans", zap.Int64("collection", signal.collectionID), zap.Int("plan count", len(plans)))
for _, plan := range plans {
if t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID))
@@ -316,15 +316,15 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
var plans []*datapb.CompactionPlan
// sort segment from large to small
sort.Slice(prioritizedCandidates, func(i, j int) bool {
if prioritizedCandidates[i].getSegmentSize() != prioritizedCandidates[i].getSegmentSize() {
return prioritizedCandidates[i].getSegmentSize() > prioritizedCandidates[i].getSegmentSize()
if prioritizedCandidates[i].GetNumOfRows() != prioritizedCandidates[j].GetNumOfRows() {
return prioritizedCandidates[i].GetNumOfRows() > prioritizedCandidates[j].GetNumOfRows()
}
return prioritizedCandidates[i].GetID() < prioritizedCandidates[j].GetID()
})

sort.Slice(smallCandidates, func(i, j int) bool {
if smallCandidates[i].getSegmentSize() != smallCandidates[i].getSegmentSize() {
return smallCandidates[i].getSegmentSize() > smallCandidates[i].getSegmentSize()
if smallCandidates[i].GetNumOfRows() != smallCandidates[j].GetNumOfRows() {
return smallCandidates[i].GetNumOfRows() > smallCandidates[j].GetNumOfRows()
}
return smallCandidates[i].GetID() < smallCandidates[j].GetID()
})
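The removed comparator compared element i against itself, so the size-based ordering never actually took effect; the replacement ranks candidates by row count, largest first, with segment ID as a deterministic tiebreaker. A self-contained sketch of the resulting ordering, using a simplified struct rather than the datacoord SegmentInfo:

```go
package main

import (
	"fmt"
	"sort"
)

// segmentInfo is a simplified stand-in for the datacoord segment type.
type segmentInfo struct {
	ID        int64
	NumOfRows int64
}

func main() {
	candidates := []segmentInfo{
		{ID: 3, NumOfRows: 100},
		{ID: 2, NumOfRows: 400},
		{ID: 1, NumOfRows: 400},
	}
	sort.Slice(candidates, func(i, j int) bool {
		if candidates[i].NumOfRows != candidates[j].NumOfRows {
			return candidates[i].NumOfRows > candidates[j].NumOfRows // more rows first
		}
		return candidates[i].ID < candidates[j].ID // deterministic tiebreak on ID
	})
	fmt.Println(candidates) // [{1 400} {2 400} {3 100}]
}
```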
@@ -341,9 +341,9 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
prioritizedCandidates = prioritizedCandidates[1:]

// only do single file compaction if segment is already large enough
if segment.getSegmentSize() < int64(Params.DataCoordCfg.SegmentMaxSize)*1024*1024 {
if segment.GetNumOfRows() < segment.GetMaxRowNum() {
var result []*SegmentInfo
free := int64(Params.DataCoordCfg.SegmentMaxSize)*1024*1024 - segment.getSegmentSize()
free := segment.GetMaxRowNum() - segment.GetNumOfRows()
maxNum := Params.DataCoordCfg.MaxSegmentToMerge - 1
prioritizedCandidates, result, free = greedySelect(prioritizedCandidates, free, maxNum)
bucket = append(bucket, result...)
@@ -354,7 +354,16 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
}
}
// since this is priority compaction, we will execute even if there is only segment
plans = append(plans, segmentsToPlan(bucket, compactTime))
plan := segmentsToPlan(bucket, compactTime)
var size int64
var row int64
for _, s := range bucket {
size += s.getSegmentSize()
row += s.GetNumOfRows()
}
log.Info("generate a plan for priority candidates", zap.Any("plan", plan),
zap.Int64("target segment row", row), zap.Int64("target segment size", size))
plans = append(plans, plan)
}

// check if there are small candidates left can be merged into large segments
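With this change the "free" budget of a compaction bucket is measured in rows (GetMaxRowNum() minus GetNumOfRows()) rather than in bytes. A hedged sketch of the greedy fill against such a row budget; greedyFill is a simplified stand-in for greedySelect and the numbers are illustrative:

```go
package main

import "fmt"

// seg is a simplified stand-in for the datacoord segment type.
type seg struct {
	ID        int64
	NumOfRows int64
	MaxRowNum int64
}

// greedyFill keeps taking candidates (assumed sorted, largest first) while
// they fit into the remaining row budget; it returns the picked segments and
// the rows still free.
func greedyFill(candidates []seg, free int64, maxSegment int) ([]seg, int64) {
	var picked []seg
	for _, c := range candidates {
		if len(picked) < maxSegment && c.NumOfRows < free {
			picked = append(picked, c)
			free -= c.NumOfRows
		}
	}
	return picked, free
}

func main() {
	target := seg{ID: 1, NumOfRows: 100, MaxRowNum: 300}
	free := target.MaxRowNum - target.NumOfRows // 200 rows of headroom
	picked, left := greedyFill([]seg{{ID: 2, NumOfRows: 150}, {ID: 3, NumOfRows: 90}}, free, 3)
	fmt.Println(len(picked), left) // 1 50: only segment 2 fits, 50 rows remain
}
```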
@@ -366,16 +375,25 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
smallCandidates = smallCandidates[1:]

var result []*SegmentInfo
free := int64(Params.DataCoordCfg.SegmentMaxSize*1024*1024) - segment.getSegmentSize()
free := segment.GetMaxRowNum() - segment.GetNumOfRows()
// for small segment merge, we pick one largest segment and merge as much as small segment together with it
// Why reverse? try to merge as many segments as expected.
// for instance, if a 255M and 255M is the largest small candidates, they will never be merged because of the MinSegmentToMerge limit.
smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge-1)
bucket = append(bucket, result...)

// only merge if candidate number is large than MinSegmentToMerge
if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge {
plans = append(plans, segmentsToPlan(bucket, compactTime))
var size int64
var targetRow int64
for _, s := range bucket {
size += s.getSegmentSize()
targetRow += s.GetNumOfRows()
}
// only merge if candidate number is large than MinSegmentToMerge or if target row is large enough
if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge || targetRow > int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentSmallProportion) {
plan := segmentsToPlan(bucket, compactTime)
log.Info("generate a plan for small candidates", zap.Any("plan", plan),
zap.Int64("target segment row", targetRow), zap.Int64("target segment size", size))
plans = append(plans, plan)
}
}

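The new condition emits a plan either when enough small segments are bucketed or when their combined row count alone already exceeds a proportion of the max row capacity, which covers the "two 255M candidates" case described in the comment. A hedged sketch of that decision; the threshold values are illustrative assumptions, not the configured Milvus defaults:

```go
package main

import "fmt"

// shouldMerge mirrors the relaxed condition: enough segments in the bucket,
// or enough combined rows relative to the segment's max row capacity.
func shouldMerge(bucketLen int, targetRow, maxRowNum int64, minSegmentToMerge int, smallProportion float64) bool {
	return bucketLen >= minSegmentToMerge ||
		targetRow > int64(float64(maxRowNum)*smallProportion)
}

func main() {
	// Two half-full segments: too few for the count rule, but their combined
	// 180 rows exceed 50% of a 300-row capacity, so a plan is still generated.
	fmt.Println(shouldMerge(2, 180, 300, 3, 0.5)) // true
	// Two tiny segments: neither rule fires, so no plan is generated.
	fmt.Println(shouldMerge(2, 40, 300, 3, 0.5)) // false
}
```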
@@ -407,9 +425,9 @@ func greedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*Seg

for i := 0; i < len(candidates); {
candidate := candidates[i]
if len(result) < maxSegment && candidate.getSegmentSize() < free {
if len(result) < maxSegment && candidate.GetNumOfRows() < free {
result = append(result, candidate)
free -= candidate.getSegmentSize()
free -= candidate.GetNumOfRows()
candidates = append(candidates[:i], candidates[i+1:]...)
} else {
i++
@@ -424,9 +442,9 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int)

for i := len(candidates) - 1; i >= 0; i-- {
candidate := candidates[i]
if (len(result) < maxSegment) && (candidate.getSegmentSize() < free) {
if (len(result) < maxSegment) && (candidate.GetNumOfRows() < free) {
result = append(result, candidate)
free -= candidate.getSegmentSize()
free -= candidate.GetNumOfRows()
candidates = append(candidates[:i], candidates[i+1:]...)
}
}
@@ -447,7 +465,7 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni
}

func (t *compactionTrigger) isSmallSegment(segment *SegmentInfo) bool {
return segment.getSegmentSize() < int64(Params.DataCoordCfg.SegmentMaxSize*Params.DataCoordCfg.SegmentSmallProportion*1024*1024)
return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentSmallProportion)
}

func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error {
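isSmallSegment now classifies a segment by how full it is in rows rather than by binlog size. A minimal sketch of the predicate; the 0.5 proportion is an illustrative assumption, the configured value may differ:

```go
package main

import "fmt"

// isSmallSegment reports whether a segment holds fewer rows than the given
// fraction of its maximum row capacity.
func isSmallSegment(numOfRows, maxRowNum int64, smallProportion float64) bool {
	return numOfRows < int64(float64(maxRowNum)*smallProportion)
}

func main() {
	fmt.Println(isSmallSegment(50, 300, 0.5))  // true: 50 < 150
	fmt.Println(isSmallSegment(200, 300, 0.5)) // false: 200 >= 150
}
```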
@@ -489,17 +507,20 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa

// if expire time is enabled, put segment into compaction candidate
totalExpiredSize := int64(0)
totalExpiredRows := 0
for _, binlogs := range segment.GetBinlogs() {
for _, l := range binlogs.GetBinlogs() {
// TODO, we should probably estimate expired log entries by total rows in binlog and the ralationship of timeTo, timeFrom and expire time
if l.TimestampTo < compactTime.expireTime {
totalExpiredRows += int(l.GetEntriesNum())
totalExpiredSize += l.GetLogSize()
}
}
}

if totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize {
log.Info("total expired entities is too much, trigger compation", zap.Int64("segment", segment.ID), zap.Int64("expired log size", totalExpiredSize))
if float32(totalExpiredRows)/float32(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold || totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize {
log.Info("total expired entities is too much, trigger compation", zap.Int64("segment", segment.ID),
zap.Int("expired rows", totalExpiredRows), zap.Int64("expired log size", totalExpiredSize))
return true
}

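The hunk above widens the expiry trigger: single compaction now fires when the ratio of expired rows crosses a threshold, not only when the expired log size does. A hedged sketch of the decision; the threshold values are illustrative, not the configured defaults:

```go
package main

import "fmt"

// shouldCompactForExpiry mirrors the combined condition: a high expired-row
// ratio or a large expired log size is enough to trigger single compaction.
func shouldCompactForExpiry(expiredRows, numOfRows, expiredLogSize int64, ratioThreshold float32, maxExpiredLogSize int64) bool {
	return float32(expiredRows)/float32(numOfRows) >= ratioThreshold ||
		expiredLogSize > maxExpiredLogSize
}

func main() {
	// 3000 of 10000 rows expired: ratio 0.3 >= 0.2 triggers compaction even
	// though the expired log size is well below the absolute limit.
	fmt.Println(shouldCompactForExpiry(3000, 10000, 1<<20, 0.2, 10<<30)) // true
}
```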
@@ -508,8 +529,6 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
// to ensure that all insert logs is beyond the timetravel.
// TODO: add meta in insert binlog
if segment.LastExpireTime >= compactTime.travelTime {
log.Debug("compaction is not triggered", zap.Int64("segment", segment.ID), zap.Int64("expired log size", totalExpiredSize),
zap.Uint64("Expire", segment.LastExpireTime), zap.Uint64("Travel", compactTime.travelTime))
return false
}

@@ -525,14 +544,12 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
}

// currently delta log size and delete ratio policy is applied
if float32(totalDeletedRows)/float32(segment.NumOfRows) >= Params.DataCoordCfg.SingleCompactionRatioThreshold || totalDeleteLogSize > Params.DataCoordCfg.SingleCompactionDeltaLogMaxSize {
if float32(totalDeletedRows)/float32(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold || totalDeleteLogSize > Params.DataCoordCfg.SingleCompactionDeltaLogMaxSize {
log.Info("total delete entities is too much, trigger compation", zap.Int64("segment", segment.ID),
zap.Int("deleted rows", totalDeletedRows), zap.Int64("delete log size", totalDeleteLogSize))
return true
}

log.Debug("compaction is not triggered", zap.Int64("segment", segment.ID), zap.Int64("expired log size", totalExpiredSize),
zap.Int("deleted rows", totalDeletedRows), zap.Int64("delete log size", totalDeleteLogSize))
return false
}

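The delete-ratio branch follows the same pattern as the expiry check. For example, assuming an illustrative ratio threshold of 0.2, a segment with 10000 rows and 2500 deleted rows reaches 0.25 and triggers single compaction even if its delta log size stays under the absolute limit; the threshold value here is an assumption, not the configured default.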
@@ -258,7 +258,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
MaxRowNum: 300000,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
@@ -417,7 +417,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
NumOfRows: 1,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
@@ -428,13 +428,6 @@ func Test_compactionTrigger_noplan(t *testing.T) {
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogPath: "deltalog1"},
},
},
},
},
},
2: {
@@ -443,7 +436,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
NumOfRows: 200,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
@@ -469,7 +462,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
NumOfRows: 2,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
@@ -480,13 +473,6 @@ func Test_compactionTrigger_noplan(t *testing.T) {
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogPath: "deltalog1"},
},
},
},
},
},
4: {
@@ -495,7 +481,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
NumOfRows: 3,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
@@ -506,13 +492,6 @@ func Test_compactionTrigger_noplan(t *testing.T) {
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogPath: "deltalog1"},
},
},
},
},
},
},
@@ -591,7 +570,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
NumOfRows: 50,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
@@ -617,7 +596,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
NumOfRows: 50,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
@@ -643,7 +622,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
NumOfRows: 50,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
@@ -669,7 +648,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
NumOfRows: 50,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
@@ -767,8 +746,8 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
NumOfRows: size[i],
MaxRowNum: 512,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
@@ -912,7 +891,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 600,
NumOfRows: 100,
NumOfRows: 10000,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
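In these test fixtures the updated row counts are what exercise the row-based policy: for instance, a segment with NumOfRows: 50 and MaxRowNum: 300 is classified as small whenever 50 is below 300 times the configured small-segment proportion (150 with an illustrative proportion of 0.5), while NumOfRows: 10000 gives the expiry and delete ratio checks a meaningful denominator.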
@@ -838,7 +838,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa

ct, err := getCompactTime(ctx, s.allocator)
if err != nil {
log.Warn("failed to get timetravel reverse time", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err))
log.Warn("failed to get compact time", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err))
resp.Status.Reason = err.Error()
return resp, nil
}
@@ -175,20 +175,8 @@ func (dn *deleteNode) showDelBuf() {
delDataBuf, _ := v.(*DelDataBuf)
log.Debug("delta buffer status",
zap.Int64("segment ID", segID),
zap.Int64("size", delDataBuf.GetEntriesNum()),
zap.Int64("entries", delDataBuf.GetEntriesNum()),
zap.String("vChannel", dn.channelName))
// TODO control the printed length
length := len(delDataBuf.delData.Pks)
for i := 0; i < length; i++ {
// this log will influence the data node performance as it may be too many,
// only use it when we focus on delete operations
//log.Debug("del data",
// zap.Any("pk", delDataBuf.delData.Pks[i]),
// zap.Uint64("ts", delDataBuf.delData.Tss[i]),
// zap.Int64("segment ID", segID),
// zap.String("vChannel", dn.channelName),
//)
}
} else {
log.Debug("segment not exist",
zap.Int64("segment ID", segID),