From 36ef91f412c7d3ba1cc701e85316a3754d48d559 Mon Sep 17 00:00:00 2001 From: dragondriver Date: Tue, 30 Nov 2021 20:03:35 +0800 Subject: [PATCH] Refine log of Insert (#12380) Signed-off-by: dragondriver --- internal/proxy/impl.go | 115 +++++++++++++++++++++++++++-------------- 1 file changed, 76 insertions(+), 39 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index b96c4007b1..27787f336e 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1314,8 +1314,21 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) Status: unhealthyStatus(), }, nil } + sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert") defer sp.Finish() + traceID, _, _ := trace.InfoFromSpan(sp) + + log.Debug("Insert received", + zap.String("traceID", traceID), + zap.String("role", Params.RoleName), + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.String("partition", request.PartitionName), + zap.Int("len(FieldsData)", len(request.FieldsData)), + zap.Int("len(HashKeys)", len(request.HashKeys)), + zap.Uint32("NumRows", request.NumRows)) + it := &insertTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -1339,31 +1352,9 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) chMgr: node.chMgr, chTicker: node.chTicker, } + var err error - log.Debug("Insert", - zap.String("role", Params.RoleName), - zap.Int64("msgID", it.BaseInsertTask.InsertRequest.Base.MsgID), - zap.Uint64("timestamp", it.BaseInsertTask.InsertRequest.Base.Timestamp), - zap.String("db", request.DbName), - zap.String("collection", request.CollectionName), - zap.String("partition", request.PartitionName), - zap.Any("len(RowData)", len(it.RowData)), - zap.Any("len(RowIDs)", len(it.RowIDs))) - - defer func() { - log.Debug("Insert Done", - zap.Error(err), - zap.String("role", Params.RoleName), - zap.Int64("msgID", it.BaseInsertTask.InsertRequest.Base.MsgID), - zap.Uint64("timestamp", it.BaseInsertTask.InsertRequest.Base.Timestamp), - zap.String("db", request.DbName), - zap.String("collection", request.CollectionName), - zap.String("partition", request.PartitionName), - zap.Any("len(RowData)", len(it.RowData)), - zap.Any("len(RowIDs)", len(it.RowIDs))) - }() - if len(it.PartitionName) <= 0 { it.PartitionName = Params.DefaultPartitionName } @@ -1373,30 +1364,61 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) ErrorCode: commonpb.ErrorCode_Success, }, } + err = node.sched.dmQueue.Enqueue(it) - log.Debug("Insert Task Enqueue", - zap.Int64("msgID", it.BaseInsertTask.InsertRequest.Base.MsgID), - zap.Uint64("timestamp", it.BaseInsertTask.InsertRequest.Base.Timestamp), + if err != nil { + log.Debug("Insert failed to enqueue", + zap.Error(err), + zap.String("traceID", traceID), + zap.String("role", Params.RoleName), + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.String("partition", request.PartitionName), + zap.Int("len(FieldsData)", len(request.FieldsData)), + zap.Int("len(HashKeys)", len(request.HashKeys)), + zap.Uint32("NumRows", request.NumRows)) + + result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + result.Status.Reason = err.Error() + numRows := it.req.NumRows + errIndex := make([]uint32, numRows) + for i := uint32(0); i < numRows; i++ { + errIndex[i] = i + } + result.ErrIndex = errIndex + return result, nil + } + + log.Debug("Insert enqueued", + zap.String("traceID", traceID), + zap.String("role", Params.RoleName), + zap.Int64("MsgID", it.ID()), + zap.Uint64("BeginTS", it.BeginTs()), + zap.Uint64("EndTS", it.EndTs()), zap.String("db", request.DbName), zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName), - ) - - if err != nil { - result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - result.Status.Reason = err.Error() - numRows := it.req.NumRows - errIndex := make([]uint32, numRows) - for i := uint32(0); i < numRows; i++ { - errIndex[i] = i - } - result.ErrIndex = errIndex - return result, nil - } + zap.Int("len(FieldsData)", len(request.FieldsData)), + zap.Int("len(HashKeys)", len(request.HashKeys)), + zap.Uint32("NumRows", request.NumRows)) err = it.WaitToFinish() if err != nil { + log.Debug("Insert failed to WaitToFinish", + zap.Error(err), + zap.String("traceID", traceID), + zap.String("role", Params.RoleName), + zap.Int64("MsgID", it.ID()), + zap.Uint64("BeginTS", it.BeginTs()), + zap.Uint64("EndTS", it.EndTs()), + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.String("partition", request.PartitionName), + zap.Int("len(FieldsData)", len(request.FieldsData)), + zap.Int("len(HashKeys)", len(request.HashKeys)), + zap.Uint32("NumRows", request.NumRows)) + result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError result.Status.Reason = err.Error() numRows := it.req.NumRows @@ -1407,6 +1429,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) result.ErrIndex = errIndex return result, nil } + if it.result.Status.ErrorCode != commonpb.ErrorCode_Success { numRows := it.req.NumRows errIndex := make([]uint32, numRows) @@ -1416,6 +1439,20 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) it.result.ErrIndex = errIndex } it.result.InsertCnt = int64(it.req.NumRows) + + log.Debug("Insert done", + zap.String("traceID", traceID), + zap.String("role", Params.RoleName), + zap.Int64("MsgID", it.ID()), + zap.Uint64("BeginTS", it.BeginTs()), + zap.Uint64("EndTS", it.EndTs()), + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.String("partition", request.PartitionName), + zap.Int("len(FieldsData)", len(request.FieldsData)), + zap.Int("len(HashKeys)", len(request.HashKeys)), + zap.Uint32("NumRows", request.NumRows)) + return it.result, nil }