mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
enhance: Make compaction log has traceID (#30338)
See also #30167 After support open telemetry tracing, we want to have traceID as well, this PR adds util functions to set traceID with span & propagate traceID between different context. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
743bdf1434
commit
0c7a96b48d
@ -31,6 +31,7 @@ import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/tracer"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
@ -344,10 +345,11 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
|
||||
innerTask := task
|
||||
c.RefreshPlan(innerTask)
|
||||
getOrCreateIOPool().Submit(func() (any, error) {
|
||||
ctx := tracer.SetupSpan(context.Background(), innerTask.span)
|
||||
plan := innerTask.plan
|
||||
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", innerTask.dataNodeID))
|
||||
log := log.Ctx(ctx).With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", innerTask.dataNodeID))
|
||||
log.Info("Notify compaction task to DataNode")
|
||||
ts, err := c.allocator.allocTimestamp(context.TODO())
|
||||
ts, err := c.allocator.allocTimestamp(ctx)
|
||||
if err != nil {
|
||||
log.Warn("Alloc start time for CompactionPlan failed", zap.Error(err))
|
||||
// update plan ts to TIMEOUT ts
|
||||
@ -356,8 +358,6 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
|
||||
}
|
||||
c.updateTask(plan.PlanID, setStartTime(ts))
|
||||
|
||||
ctx := trace.ContextWithSpan(context.Background(), task.span)
|
||||
|
||||
err = c.sessions.Compaction(ctx, innerTask.dataNodeID, plan)
|
||||
|
||||
c.updateTask(plan.PlanID, setState(executing))
|
||||
|
@ -432,7 +432,7 @@ func (t *compactionTask) merge(
|
||||
func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("Compact-%d", t.getPlanID()))
|
||||
defer span.End()
|
||||
log := log.With(zap.Int64("planID", t.plan.GetPlanID()))
|
||||
log := log.Ctx(ctx).With(zap.Int64("planID", t.plan.GetPlanID()))
|
||||
compactStart := time.Now()
|
||||
if ok := funcutil.CheckCtxValid(ctx); !ok {
|
||||
log.Warn("compact wrong, task context done or timeout")
|
||||
|
@ -112,7 +112,7 @@ func (t *levelZeroCompactionTask) injectDone() {}
|
||||
func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error) {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, "L0Compact")
|
||||
defer span.End()
|
||||
log := log.With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String()))
|
||||
log := log.Ctx(t.ctx).With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String()))
|
||||
log.Info("L0 compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
|
||||
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
|
@ -27,7 +27,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
@ -46,6 +45,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/tracer"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
@ -265,9 +265,11 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
|
||||
}
|
||||
}
|
||||
|
||||
spanCtx := trace.SpanContextFromContext(ctx)
|
||||
/*
|
||||
spanCtx := trace.SpanContextFromContext(ctx)
|
||||
|
||||
taskCtx := trace.ContextWithSpanContext(node.ctx, spanCtx)
|
||||
taskCtx := trace.ContextWithSpanContext(node.ctx, spanCtx)*/
|
||||
taskCtx := tracer.Propagate(ctx, node.ctx)
|
||||
|
||||
var task compactor
|
||||
switch req.GetType() {
|
||||
|
27
pkg/tracer/util.go
Normal file
27
pkg/tracer/util.go
Normal file
@ -0,0 +1,27 @@
|
||||
package tracer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
// SetupSpan add span into ctx values.
|
||||
// Also setup logger in context with tracerID field.
|
||||
func SetupSpan(ctx context.Context, span trace.Span) context.Context {
|
||||
ctx = trace.ContextWithSpan(ctx, span)
|
||||
ctx = log.WithFields(ctx, zap.Stringer("traceID", span.SpanContext().TraceID()))
|
||||
return ctx
|
||||
}
|
||||
|
||||
// Propagate passes span context into a new ctx with different lifetime.
|
||||
// Also setup logger in new context with traceID field.
|
||||
func Propagate(ctx, newRoot context.Context) context.Context {
|
||||
spanCtx := trace.SpanContextFromContext(ctx)
|
||||
|
||||
newCtx := trace.ContextWithSpanContext(newRoot, spanCtx)
|
||||
return log.WithFields(newCtx, zap.Stringer("traceID", spanCtx.TraceID()))
|
||||
}
|
Loading…
Reference in New Issue
Block a user