From 31ef0a1fe891d5a55b0661b38a6aec2e77ee15ec Mon Sep 17 00:00:00 2001
From: wei liu
Date: Thu, 20 Jun 2024 10:10:05 +0800
Subject: [PATCH] enhance: Add trace for bf cost in l0 compactor (#33860)

Signed-off-by: Wei Liu
---
 internal/datanode/compaction/l0_compactor.go | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go
index 0163d443d3..984d7b20e4 100644
--- a/internal/datanode/compaction/l0_compactor.go
+++ b/internal/datanode/compaction/l0_compactor.go
@@ -182,6 +182,8 @@ func getMaxBatchSize(totalSize int64) int {
 }
 
 func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
+	traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact serializeUpload")
+	defer span.End()
 	allBlobs := make(map[string][]byte)
 	results := make([]*datapb.CompactionSegment, 0)
 	for segID, writer := range segmentWriters {
@@ -221,7 +223,7 @@ func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr
 		return nil, nil
 	}
 
-	if err := t.Upload(ctx, allBlobs); err != nil {
+	if err := t.Upload(traceCtx, allBlobs); err != nil {
 		log.Warn("L0 compaction serializeUpload upload failed", zap.Error(err))
 		return nil, err
 	}
@@ -234,7 +236,7 @@ func (t *LevelZeroCompactionTask) splitDelta(
 	allDelta []*storage.DeleteData,
 	segmentBfs map[int64]*metacache.BloomFilterSet,
 ) map[int64]*SegmentDeltaWriter {
-	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
+	traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
 	defer span.End()
 
 	allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) {
@@ -243,7 +245,7 @@ func (t *LevelZeroCompactionTask) splitDelta(
 
 	// spilt all delete data to segments
 
-	retMap := t.applyBFInParallel(allDelta, io.GetBFApplyPool(), segmentBfs)
+	retMap := t.applyBFInParallel(traceCtx, allDelta, io.GetBFApplyPool(), segmentBfs)
 
 	targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
 	retMap.Range(func(key int, value *BatchApplyRet) bool {
@@ -278,7 +280,9 @@ type BatchApplyRet = struct {
 	Segment2Hits map[int64][]bool
 }
 
-func (t *LevelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
+func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
+	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel")
+	defer span.End()
 	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
 
 	batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {
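
For readers unfamiliar with the pattern this patch applies: OpenTelemetry's Start returns both a span and a derived context, and the derived context (traceCtx above) must be passed to downstream calls for their spans to nest under the new span, which is why the patch stops discarding the return value with `_` in splitDelta and threads a context parameter into applyBFInParallel. The sketch below is a minimal, self-contained illustration of that parent/child propagation, not Milvus code; the "datanode" tracer name and the stand-in functions splitDelta/applyBF are invented for the example.

// Minimal sketch of the span-propagation pattern used in the patch above.
// Not Milvus code: the tracer name and stand-in functions are hypothetical.
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// applyBF mirrors applyBFInParallel after the patch: it takes the caller's
// context and opens its own child span, so bloom-filter cost shows up as a
// nested span in the trace instead of being folded into the parent.
func applyBF(ctx context.Context) {
	_, span := otel.Tracer("datanode").Start(ctx, "L0Compact applyBFInParallel")
	defer span.End()
	// ... bloom filter work would go here ...
}

// splitDelta mirrors the patched splitDelta: the context returned by Start
// (traceCtx) is kept rather than discarded with `_`, and is what gets passed
// downstream. Passing the original ctx instead would attach applyBF's span
// to splitDelta's parent rather than to splitDelta itself.
func splitDelta(ctx context.Context) {
	traceCtx, span := otel.Tracer("datanode").Start(ctx, "L0Compact splitDelta")
	defer span.End()

	applyBF(traceCtx)
}

func main() {
	// An SDK TracerProvider with no exporter is enough to exercise the
	// pattern; a real deployment would register an OTLP or similar exporter.
	tp := sdktrace.NewTracerProvider()
	defer func() { _ = tp.Shutdown(context.Background()) }()
	otel.SetTracerProvider(tp)

	splitDelta(context.Background())
}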