enhance: Add trace for bf cost in l0 compactor (#33860)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-06-20 10:10:05 +08:00 committed by GitHub
parent f3d902cf16
commit 31ef0a1fe8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -182,6 +182,8 @@ func getMaxBatchSize(totalSize int64) int {
} }
func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) { func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact serializeUpload")
defer span.End()
allBlobs := make(map[string][]byte) allBlobs := make(map[string][]byte)
results := make([]*datapb.CompactionSegment, 0) results := make([]*datapb.CompactionSegment, 0)
for segID, writer := range segmentWriters { for segID, writer := range segmentWriters {
@ -221,7 +223,7 @@ func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr
return nil, nil return nil, nil
} }
if err := t.Upload(ctx, allBlobs); err != nil { if err := t.Upload(traceCtx, allBlobs); err != nil {
log.Warn("L0 compaction serializeUpload upload failed", zap.Error(err)) log.Warn("L0 compaction serializeUpload upload failed", zap.Error(err))
return nil, err return nil, err
} }
@ -234,7 +236,7 @@ func (t *LevelZeroCompactionTask) splitDelta(
allDelta []*storage.DeleteData, allDelta []*storage.DeleteData,
segmentBfs map[int64]*metacache.BloomFilterSet, segmentBfs map[int64]*metacache.BloomFilterSet,
) map[int64]*SegmentDeltaWriter { ) map[int64]*SegmentDeltaWriter {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta") traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End() defer span.End()
allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) { allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) {
@ -243,7 +245,7 @@ func (t *LevelZeroCompactionTask) splitDelta(
// spilt all delete data to segments // spilt all delete data to segments
retMap := t.applyBFInParallel(allDelta, io.GetBFApplyPool(), segmentBfs) retMap := t.applyBFInParallel(traceCtx, allDelta, io.GetBFApplyPool(), segmentBfs)
targetSegBuffer := make(map[int64]*SegmentDeltaWriter) targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
retMap.Range(func(key int, value *BatchApplyRet) bool { retMap.Range(func(key int, value *BatchApplyRet) bool {
@ -278,7 +280,9 @@ type BatchApplyRet = struct {
Segment2Hits map[int64][]bool Segment2Hits map[int64][]bool
} }
func (t *LevelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] { func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel")
defer span.End()
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool { batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {