Refine log of Insert (#12380)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
This commit is contained in:
dragondriver 2021-11-30 20:03:35 +08:00 committed by GitHub
parent bd6b4807cc
commit 36ef91f412
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1314,8 +1314,21 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
Status: unhealthyStatus(),
}, nil
}
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
defer sp.Finish()
traceID, _, _ := trace.InfoFromSpan(sp)
log.Debug("Insert received",
zap.String("traceID", traceID),
zap.String("role", Params.RoleName),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
zap.Int("len(FieldsData)", len(request.FieldsData)),
zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows))
it := &insertTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -1339,31 +1352,9 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
chMgr: node.chMgr,
chTicker: node.chTicker,
}
var err error
log.Debug("Insert",
zap.String("role", Params.RoleName),
zap.Int64("msgID", it.BaseInsertTask.InsertRequest.Base.MsgID),
zap.Uint64("timestamp", it.BaseInsertTask.InsertRequest.Base.Timestamp),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
zap.Any("len(RowData)", len(it.RowData)),
zap.Any("len(RowIDs)", len(it.RowIDs)))
defer func() {
log.Debug("Insert Done",
zap.Error(err),
zap.String("role", Params.RoleName),
zap.Int64("msgID", it.BaseInsertTask.InsertRequest.Base.MsgID),
zap.Uint64("timestamp", it.BaseInsertTask.InsertRequest.Base.Timestamp),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
zap.Any("len(RowData)", len(it.RowData)),
zap.Any("len(RowIDs)", len(it.RowIDs)))
}()
if len(it.PartitionName) <= 0 {
it.PartitionName = Params.DefaultPartitionName
}
@ -1373,30 +1364,61 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
ErrorCode: commonpb.ErrorCode_Success,
},
}
err = node.sched.dmQueue.Enqueue(it)
log.Debug("Insert Task Enqueue",
zap.Int64("msgID", it.BaseInsertTask.InsertRequest.Base.MsgID),
zap.Uint64("timestamp", it.BaseInsertTask.InsertRequest.Base.Timestamp),
if err != nil {
log.Debug("Insert failed to enqueue",
zap.Error(err),
zap.String("traceID", traceID),
zap.String("role", Params.RoleName),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
zap.Int("len(FieldsData)", len(request.FieldsData)),
zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows))
result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
result.Status.Reason = err.Error()
numRows := it.req.NumRows
errIndex := make([]uint32, numRows)
for i := uint32(0); i < numRows; i++ {
errIndex[i] = i
}
result.ErrIndex = errIndex
return result, nil
}
log.Debug("Insert enqueued",
zap.String("traceID", traceID),
zap.String("role", Params.RoleName),
zap.Int64("MsgID", it.ID()),
zap.Uint64("BeginTS", it.BeginTs()),
zap.Uint64("EndTS", it.EndTs()),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
)
if err != nil {
result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
result.Status.Reason = err.Error()
numRows := it.req.NumRows
errIndex := make([]uint32, numRows)
for i := uint32(0); i < numRows; i++ {
errIndex[i] = i
}
result.ErrIndex = errIndex
return result, nil
}
zap.Int("len(FieldsData)", len(request.FieldsData)),
zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows))
err = it.WaitToFinish()
if err != nil {
log.Debug("Insert failed to WaitToFinish",
zap.Error(err),
zap.String("traceID", traceID),
zap.String("role", Params.RoleName),
zap.Int64("MsgID", it.ID()),
zap.Uint64("BeginTS", it.BeginTs()),
zap.Uint64("EndTS", it.EndTs()),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
zap.Int("len(FieldsData)", len(request.FieldsData)),
zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows))
result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
result.Status.Reason = err.Error()
numRows := it.req.NumRows
@ -1407,6 +1429,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
result.ErrIndex = errIndex
return result, nil
}
if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
numRows := it.req.NumRows
errIndex := make([]uint32, numRows)
@ -1416,6 +1439,20 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
it.result.ErrIndex = errIndex
}
it.result.InsertCnt = int64(it.req.NumRows)
log.Debug("Insert done",
zap.String("traceID", traceID),
zap.String("role", Params.RoleName),
zap.Int64("MsgID", it.ID()),
zap.Uint64("BeginTS", it.BeginTs()),
zap.Uint64("EndTS", it.EndTs()),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
zap.Int("len(FieldsData)", len(request.FieldsData)),
zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows))
return it.result, nil
}