From 0eb71f7ee5fafde3d51a25cb7331e272ff0a18e6 Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Tue, 9 Nov 2021 11:01:02 +0800 Subject: [PATCH] Logging traceID in process of processing delete request (#11374) Signed-off-by: Xiangyu Wang --- internal/datanode/flow_graph_delete_node.go | 5 ++++- internal/proxy/impl.go | 11 ++++++----- internal/querynode/flow_graph_delete_node.go | 5 ++++- internal/querynode/flow_graph_filter_dm_node.go | 4 +++- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 5e17b33b27..a297eb79a6 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -175,7 +175,10 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { msg.SetTraceCtx(ctx) } - for _, msg := range fgMsg.deleteMessages { + for i, msg := range fgMsg.deleteMessages { + traceID, _, _ := trace.InfoFromSpan(spans[i]) + log.Info("Buffer delete request in DataNode", zap.String("traceID", traceID)) + if err := dn.bufferDeleteMsg(msg, fgMsg.timeRange); err != nil { log.Error("buffer delete msg failed", zap.Error(err)) } diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 5bcfc496e3..651acfc74b 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1367,8 +1367,8 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete") defer sp.Finish() traceID, _, _ := trace.InfoFromSpan(sp) - log.Info("Delete request begin", zap.String("traceID", traceID)) - defer log.Info("Delete request end", zap.String("traceID", traceID)) + log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID)) + defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID)) if !node.checkHealthy() { return &milvuspb.MutationResult{ @@ -1405,7 +1405,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) chTicker: node.chTicker, } - log.Debug("Delete request enqueue", + log.Debug("Enqueue delete request in Proxy", zap.String("role", Params.RoleName), zap.String("db", request.DbName), zap.String("collection", request.CollectionName), @@ -1423,14 +1423,15 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) }, nil } - log.Debug("Delete request detail", + log.Debug("Detail of delete request in Proxy", zap.String("role", Params.RoleName), zap.Int64("msgID", dt.Base.MsgID), zap.Uint64("timestamp", dt.Base.Timestamp), zap.String("db", request.DbName), zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName), - zap.String("expr", request.Expr)) + zap.String("expr", request.Expr), + zap.String("traceID", traceID)) if err := dt.WaitToFinish(); err != nil { log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID)) diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index 21e8551ddc..d349a3ae8e 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -49,7 +49,10 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // 1. filter segment by bloom filter - for _, delMsg := range dMsg.deleteMessages { + for i, delMsg := range dMsg.deleteMessages { + traceID, _, _ := trace.InfoFromSpan(spans[i]) + log.Info("Process delete request in QueryNode", zap.String("traceID", traceID)) + if dNode.replica.getSegmentNum() != 0 { log.Debug("delete in historical replica", zap.Any("collectionID", delMsg.CollectionID), diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 9b59241485..176d52cb23 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -70,7 +70,9 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { }, } - for _, msg := range msgStreamMsg.TsMessages() { + for i, msg := range msgStreamMsg.TsMessages() { + traceID, _, _ := trace.InfoFromSpan(spans[i]) + log.Info("Filter invalid message in QueryNode", zap.String("traceID", traceID)) switch msg.Type() { case commonpb.MsgType_Insert: resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))