diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 582a741b6b..3597a2a632 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -344,10 +345,11 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) { innerTask := task c.RefreshPlan(innerTask) getOrCreateIOPool().Submit(func() (any, error) { + ctx := tracer.SetupSpan(context.Background(), innerTask.span) plan := innerTask.plan - log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", innerTask.dataNodeID)) + log := log.Ctx(ctx).With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", innerTask.dataNodeID)) log.Info("Notify compaction task to DataNode") - ts, err := c.allocator.allocTimestamp(context.TODO()) + ts, err := c.allocator.allocTimestamp(ctx) if err != nil { log.Warn("Alloc start time for CompactionPlan failed", zap.Error(err)) // update plan ts to TIMEOUT ts @@ -356,8 +358,6 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) { } c.updateTask(plan.PlanID, setStartTime(ts)) - ctx := trace.ContextWithSpan(context.Background(), task.span) - err = c.sessions.Compaction(ctx, innerTask.dataNodeID, plan) c.updateTask(plan.PlanID, setState(executing)) diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 34809db0cc..263841273b 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -432,7 +432,7 @@ func (t *compactionTask) merge( func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("Compact-%d", t.getPlanID())) defer span.End() - log := log.With(zap.Int64("planID", t.plan.GetPlanID())) + log := log.Ctx(ctx).With(zap.Int64("planID", t.plan.GetPlanID())) compactStart := time.Now() if ok := funcutil.CheckCtxValid(ctx); !ok { log.Warn("compact wrong, task context done or timeout") diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go index 77dee3a39c..f717a41dfa 100644 --- a/internal/datanode/l0_compactor.go +++ b/internal/datanode/l0_compactor.go @@ -112,7 +112,7 @@ func (t *levelZeroCompactionTask) injectDone() {} func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, "L0Compact") defer span.End() - log := log.With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String())) + log := log.Ctx(t.ctx).With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String())) log.Info("L0 compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan())) if !funcutil.CheckCtxValid(ctx) { diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 53be89c264..5499aa594c 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -27,7 +27,6 @@ import ( "time" "github.com/cockroachdb/errors" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -46,6 +45,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" @@ -265,9 +265,11 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan } } - spanCtx := trace.SpanContextFromContext(ctx) + /* + spanCtx := trace.SpanContextFromContext(ctx) - taskCtx := trace.ContextWithSpanContext(node.ctx, spanCtx) + taskCtx := trace.ContextWithSpanContext(node.ctx, spanCtx)*/ + taskCtx := tracer.Propagate(ctx, node.ctx) var task compactor switch req.GetType() { diff --git a/pkg/tracer/util.go b/pkg/tracer/util.go new file mode 100644 index 0000000000..54427d94bb --- /dev/null +++ b/pkg/tracer/util.go @@ -0,0 +1,27 @@ +package tracer + +import ( + "context" + + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" +) + +// SetupSpan add span into ctx values. +// Also setup logger in context with tracerID field. +func SetupSpan(ctx context.Context, span trace.Span) context.Context { + ctx = trace.ContextWithSpan(ctx, span) + ctx = log.WithFields(ctx, zap.Stringer("traceID", span.SpanContext().TraceID())) + return ctx +} + +// Propagate passes span context into a new ctx with different lifetime. +// Also setup logger in new context with traceID field. +func Propagate(ctx, newRoot context.Context) context.Context { + spanCtx := trace.SpanContextFromContext(ctx) + + newCtx := trace.ContextWithSpanContext(newRoot, spanCtx) + return log.WithFields(newCtx, zap.Stringer("traceID", spanCtx.TraceID())) +}