Replace all status with only error string (#27125)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
yah01 2023-09-18 14:05:28 +08:00 committed by GitHub
parent 0aa7503d54
commit a6b98740b7
9 changed files with 79 additions and 130 deletions
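
The mechanical change in every file below is the same: hand-built commonpb.Status literals (an ErrorCode plus a Reason string) are replaced by a single call to the shared merr.Status helper. The helper itself is not part of this diff; the following is a minimal sketch of the behavior the call sites rely on, an assumption based on usage — the real pkg/util/merr version also maps specific errors to richer error codes.

	package merr

	import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"

	// Status converts an error into a commonpb.Status. A nil error yields a
	// success status, so handlers can write `return merr.Status(err), nil`
	// without branching on err first.
	func Status(err error) *commonpb.Status {
		if err == nil {
			return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
		}
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
	}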

@@ -596,12 +596,9 @@ func (s *Server) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStat
if err != nil {
log.Error("failed to updated segment state in dataCoord meta",
zap.Int64("segmentID", req.SegmentId),
zap.String("to state", req.GetNewState().String()))
zap.String("newState", req.GetNewState().String()))
return &datapb.SetSegmentStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
}, nil
}
return &datapb.SetSegmentStateResponse{
@@ -1013,15 +1010,12 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
log.Warn("DataCoord.GetMetrics failed to parse metric type",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("req", req.Request),
zap.Error(err))
zap.Error(err),
)
return &milvuspb.GetMetricsResponse{
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
Status: merr.Status(err),
}, nil
}
@@ -1030,10 +1024,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
if err != nil {
log.Warn("DataCoord GetMetrics failed", zap.Int64("nodeID", paramtable.GetNodeID()), zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
}, nil
}
@@ -1598,10 +1589,7 @@ func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe
log.Error("failed to get DataNode client for SaveImportSegment",
zap.Int64("DataNode ID", nodeID),
zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
return merr.Status(err), nil
}
resp, err := cli.AddImportSegment(ctx,
&datapb.AddImportSegmentRequest{
@@ -1618,10 +1606,7 @@ func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe
})
if err := VerifyResponse(resp.GetStatus(), err); err != nil {
log.Error("failed to add segment", zap.Int64("DataNode ID", nodeID), zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
return merr.Status(err), nil
}
log.Info("succeed to add segment", zap.Int64("DataNode ID", nodeID), zap.Any("add segment req", req))
// Fill in start position message ID.
@@ -1631,10 +1616,7 @@ func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe
rsp, err := s.SaveBinlogPaths(context.Background(), req.GetSaveBinlogPathReq())
if err := VerifyResponse(rsp, err); err != nil {
log.Error("failed to SaveBinlogPaths", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
return merr.Status(err), nil
}
return merr.Status(nil), nil
}
@@ -1649,17 +1631,14 @@ func (s *Server) UnsetIsImportingState(ctx context.Context, req *datapb.UnsetIsI
for _, segID := range req.GetSegmentIds() {
if err := s.meta.UnsetIsImporting(segID); err != nil {
// Fail-open.
log.Error("failed to unset segment is importing state", zap.Int64("segmentID", segID))
log.Error("failed to unset segment is importing state",
zap.Int64("segmentID", segID),
)
reportErr = err
}
}
if reportErr != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: reportErr.Error(),
}, nil
}
return merr.Status(nil), nil
return merr.Status(reportErr), nil
}
// MarkSegmentsDropped marks the given segments as `Dropped`.
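
The UnsetIsImportingState hunk above shows the payoff of a nil-safe helper: because merr.Status(nil) produces a success status (per the sketch above, an assumption about the real helper), the separate success and failure returns collapse into a single line:

	// reportErr may or may not be nil here; merr.Status handles both cases.
	return merr.Status(reportErr), nil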

@@ -19,7 +19,6 @@ package datanode
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -83,10 +82,7 @@ func (node *DataNode) getSystemInfoMetrics(ctx context.Context, req *milvuspb.Ge
quotaMetrics, err := node.getQuotaMetrics()
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, paramtable.GetNodeID()),
}, nil
}
@@ -122,10 +118,7 @@ func (node *DataNode) getSystemInfoMetrics(ctx context.Context, req *milvuspb.Ge
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
Response: "",
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, paramtable.GetNodeID()),
}, nil

@@ -433,18 +433,15 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
// function to report import state to RootCoord.
// retry 10 times, if the rootcoord is down, the report function will cost 20+ seconds
reportFunc := reportImportFunc(node)
returnFailFunc := func(msg string, inputErr error) (*commonpb.Status, error) {
logFields = append(logFields, zap.Error(inputErr))
returnFailFunc := func(msg string, err error) (*commonpb.Status, error) {
logFields = append(logFields, zap.Error(err))
log.Warn(msg, logFields...)
importResult.State = commonpb.ImportState_ImportFailed
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: inputErr.Error()})
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: err.Error()})
reportFunc(importResult)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: inputErr.Error(),
}, nil
return merr.Status(err), nil
}
if !node.isHealthy() {

@@ -44,28 +44,29 @@ import (
func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) {
if !i.lifetime.Add(commonpbutil.IsHealthy) {
stateCode := i.lifetime.GetState()
log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()),
zap.String("ClusterID", req.GetClusterID()), zap.Int64("IndexBuildID", req.GetBuildID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "state code is not healthy",
}, nil
log.Ctx(ctx).Warn("index node not ready",
zap.String("state", stateCode.String()),
zap.String("clusterID", req.GetClusterID()),
zap.Int64("indexBuildID", req.GetBuildID()),
)
return merr.Status(merr.WrapErrServiceNotReady(stateCode.String())), nil
}
defer i.lifetime.Done()
log.Ctx(ctx).Info("IndexNode building index ...",
zap.String("ClusterID", req.GetClusterID()),
zap.Int64("IndexBuildID", req.GetBuildID()),
zap.Int64("IndexID", req.GetIndexID()),
zap.String("IndexName", req.GetIndexName()),
zap.String("IndexFilePrefix", req.GetIndexFilePrefix()),
zap.Int64("IndexVersion", req.GetIndexVersion()),
zap.Strings("DataPaths", req.GetDataPaths()),
zap.Any("TypeParams", req.GetTypeParams()),
zap.Any("IndexParams", req.GetIndexParams()),
zap.Int64("num_rows", req.GetNumRows()))
zap.String("clusterID", req.GetClusterID()),
zap.Int64("indexBuildID", req.GetBuildID()),
zap.Int64("indexID", req.GetIndexID()),
zap.String("indexName", req.GetIndexName()),
zap.String("indexFilePrefix", req.GetIndexFilePrefix()),
zap.Int64("indexVersion", req.GetIndexVersion()),
zap.Strings("dataPaths", req.GetDataPaths()),
zap.Any("typeParams", req.GetTypeParams()),
zap.Any("indexParams", req.GetIndexParams()),
zap.Int64("numRows", req.GetNumRows()),
)
ctx, sp := otel.Tracer(typeutil.IndexNodeRole).Start(ctx, "IndexNode-CreateIndex", trace.WithAttributes(
attribute.Int64("IndexBuildID", req.GetBuildID()),
attribute.String("ClusterID", req.GetClusterID()),
attribute.Int64("indexBuildID", req.GetBuildID()),
attribute.String("clusterID", req.GetClusterID()),
))
defer sp.End()
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.TotalLabel).Inc()
@@ -74,7 +75,7 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
if oldInfo := i.loadOrStoreTask(req.GetClusterID(), req.GetBuildID(), &taskInfo{
cancel: taskCancel,
state: commonpb.IndexState_InProgress}); oldInfo != nil {
log.Ctx(ctx).Warn("duplicated index build task", zap.String("ClusterID", req.GetClusterID()), zap.Int64("BuildID", req.GetBuildID()))
log.Ctx(ctx).Warn("duplicated index build task", zap.String("clusterID", req.GetClusterID()), zap.Int64("buildID", req.GetBuildID()))
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc()
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_BuildIndexError,
@@ -83,9 +84,9 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
}
cm, err := i.storageFactory.NewChunkManager(i.loopCtx, req.GetStorageConfig())
if err != nil {
log.Ctx(ctx).Error("create chunk manager failed", zap.String("Bucket", req.GetStorageConfig().GetBucketName()),
zap.String("AccessKey", req.GetStorageConfig().GetAccessKeyID()),
zap.String("ClusterID", req.GetClusterID()), zap.Int64("IndexBuildID", req.GetBuildID()),
log.Ctx(ctx).Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()),
zap.String("accessKey", req.GetStorageConfig().GetAccessKeyID()),
zap.String("clusterID", req.GetClusterID()), zap.Int64("indexBuildID", req.GetBuildID()),
zap.Error(err),
)
i.deleteTaskInfos(ctx, []taskKey{{ClusterID: req.GetClusterID(), BuildID: req.GetBuildID()}})
@@ -110,31 +111,28 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
}
ret := merr.Status(nil)
if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
log.Ctx(ctx).Warn("IndexNode failed to schedule", zap.Int64("IndexBuildID", req.GetBuildID()),
zap.String("ClusterID", req.GetClusterID()), zap.Error(err))
log.Ctx(ctx).Warn("IndexNode failed to schedule", zap.Int64("indexBuildID", req.GetBuildID()),
zap.String("clusterID", req.GetClusterID()), zap.Error(err))
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
ret.Reason = err.Error()
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc()
return ret, nil
}
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc()
log.Ctx(ctx).Info("IndexNode successfully scheduled", zap.Int64("IndexBuildID", req.GetBuildID()),
zap.String("ClusterID", req.GetClusterID()), zap.String("indexName", req.GetIndexName()))
log.Ctx(ctx).Info("IndexNode successfully scheduled", zap.Int64("indexBuildID", req.GetBuildID()),
zap.String("clusterID", req.GetClusterID()), zap.String("indexName", req.GetIndexName()))
return ret, nil
}
func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) {
log := log.Ctx(ctx).With(
zap.String("ClusterID", req.GetClusterID()),
zap.String("clusterID", req.GetClusterID()),
).WithRateGroup("in.queryJobs", 1, 60)
if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
stateCode := i.lifetime.GetState()
log.Warn("index node not ready", zap.String("state", stateCode.String()))
return &indexpb.QueryJobsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "state code is not healthy",
},
Status: merr.Status(merr.WrapErrServiceNotReady(stateCode.String())),
}, nil
}
defer i.lifetime.Done()
@@ -167,22 +165,24 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
ret.IndexInfos[i].SerializedSize = info.serializedSize
ret.IndexInfos[i].FailReason = info.failReason
log.RatedDebug(5, "querying index build task",
zap.Int64("IndexBuildID", buildID), zap.String("state", info.state.String()),
zap.String("fail reason", info.failReason))
zap.Int64("indexBuildID", buildID),
zap.String("state", info.state.String()),
zap.String("reason", info.failReason),
)
}
}
return ret, nil
}
func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest) (*commonpb.Status, error) {
log.Ctx(ctx).Info("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs))
log.Ctx(ctx).Info("drop index build jobs",
zap.String("clusterID", req.ClusterID),
zap.Int64s("indexBuildIDs", req.BuildIDs),
)
if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
stateCode := i.lifetime.GetState()
log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()), zap.String("ClusterID", req.ClusterID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "state code is not healthy",
}, nil
log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()), zap.String("clusterID", req.ClusterID))
return merr.Status(merr.WrapErrServiceNotReady(stateCode.String())), nil
}
defer i.lifetime.Done()
keys := make([]taskKey, 0, len(req.GetBuildIDs()))
@@ -195,8 +195,8 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest)
info.cancel()
}
}
log.Ctx(ctx).Info("drop index build jobs success", zap.String("ClusterID", req.GetClusterID()),
zap.Int64s("IndexBuildIDs", req.GetBuildIDs()))
log.Ctx(ctx).Info("drop index build jobs success", zap.String("clusterID", req.GetClusterID()),
zap.Int64s("indexBuildIDs", req.GetBuildIDs()))
return merr.Status(nil), nil
}
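
Throughout this file the hand-written `Reason: "state code is not healthy"` status is replaced by a typed error from merr.WrapErrServiceNotReady. Its implementation is not part of this diff; a plausible shape, assuming merr.ErrServiceNotReady is an errors.Is-compatible sentinel (the test file below matches against it):

	package merr

	import (
		"errors"
		"fmt"
	)

	// ErrServiceNotReady is the sentinel the updated tests match with assert.ErrorIs.
	var ErrServiceNotReady = errors.New("service not ready")

	// WrapErrServiceNotReady annotates the sentinel with the current state code;
	// wrapping with %w keeps errors.Is(err, ErrServiceNotReady) true.
	func WrapErrServiceNotReady(stage string) error {
		return fmt.Errorf("%w: stage=%s", ErrServiceNotReady, stage)
	}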
@@ -205,10 +205,7 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
stateCode := i.lifetime.GetState()
log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()))
return &indexpb.GetJobStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "state code is not healthy",
},
Status: merr.Status(merr.WrapErrServiceNotReady(stateCode.String())),
}, nil
}
defer i.lifetime.Done()
@@ -223,7 +220,11 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
if i.sched.buildParallel > unissued+active {
slots = i.sched.buildParallel - unissued - active
}
log.Ctx(ctx).Info("Get Index Job Stats", zap.Int("Unissued", unissued), zap.Int("Active", active), zap.Int("Slot", slots))
log.Ctx(ctx).Info("Get Index Job Stats",
zap.Int("unissued", unissued),
zap.Int("active", active),
zap.Int("slot", slots),
)
return &indexpb.GetJobStatsResponse{
Status: merr.Status(nil),
TotalJobNum: int64(active) + int64(unissued),
@@ -262,10 +263,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
Response: "",
}, nil
}
@@ -276,7 +274,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
log.Ctx(ctx).RatedDebug(60, "IndexNode.GetMetrics",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("req", req.GetRequest()),
zap.String("metric_type", metricType),
zap.String("metricType", metricType),
zap.Error(err))
return metrics, nil
@@ -285,7 +283,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
log.Ctx(ctx).RatedWarn(60, "IndexNode.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("req", req.GetRequest()),
zap.String("metric_type", metricType))
zap.String("metricType", metricType))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{

@@ -37,19 +37,19 @@ func TestAbnormalIndexNode(t *testing.T) {
ctx := context.TODO()
status, err := in.CreateJob(ctx, &indexpb.CreateJobRequest{})
assert.NoError(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.ErrorIs(t, merr.Error(status), merr.ErrServiceNotReady)
qresp, err := in.QueryJobs(ctx, &indexpb.QueryJobsRequest{})
assert.NoError(t, err)
assert.Equal(t, qresp.GetStatus().GetErrorCode(), commonpb.ErrorCode_UnexpectedError)
assert.ErrorIs(t, merr.Error(qresp.GetStatus()), merr.ErrServiceNotReady)
status, err = in.DropJobs(ctx, &indexpb.DropJobsRequest{})
assert.NoError(t, err)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.ErrorIs(t, merr.Error(status), merr.ErrServiceNotReady)
jobNumRsp, err := in.GetJobStats(ctx, &indexpb.GetJobStatsRequest{})
assert.NoError(t, err)
assert.Equal(t, jobNumRsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_UnexpectedError)
assert.ErrorIs(t, merr.Error(jobNumRsp.GetStatus()), merr.ErrServiceNotReady)
metricsResp, err := in.GetMetrics(ctx, &milvuspb.GetMetricsRequest{})
assert.NoError(t, err)
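
The updated assertions no longer compare raw error codes. merr.Error is used as the inverse of merr.Status: it turns a returned commonpb.Status back into a Go error, which lets the tests match the merr sentinel with assert.ErrorIs. For that to work the status must carry enough information to reconstruct the typed error, a detail this diff does not show. The resulting pattern, using DropJobs as the example:

	status, err := in.DropJobs(ctx, &indexpb.DropJobsRequest{})
	assert.NoError(t, err) // the RPC call itself does not fail
	// merr.Error turns the embedded status back into a typed error:
	assert.ErrorIs(t, merr.Error(status), merr.ErrServiceNotReady)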

@@ -19,7 +19,6 @@ package indexnode
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -64,10 +63,7 @@ func getSystemInfoMetrics(
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
Response: "",
ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, paramtable.GetNodeID()),
}, nil

@@ -3412,10 +3412,7 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
}, nil
}
@@ -4485,10 +4482,7 @@ func (node *Proxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCol
resp, err := node.rootCoord.RenameCollection(ctx, req)
if err != nil {
log.Warn("failed to rename collection", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, err
return merr.Status(err), err
}
return resp, nil
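
One detail worth noting in this file: unlike most call sites in this commit, which return the status together with a nil error, RenameCollection keeps propagating the underlying error alongside the status. Both conventions appear in the diff:

	return merr.Status(err), nil // typical: the failure travels inside the status, the RPC itself succeeds
	return merr.Status(err), err // RenameCollection: the caller also sees the raw error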

@@ -20,7 +20,6 @@ import (
"context"
"sync"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/util/hardware"
@@ -418,10 +417,7 @@ func getSystemInfoMetrics(
resp, err := metricsinfo.MarshalTopology(systemTopology)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
Response: "",
ComponentName: metricsinfo.ConstructComponentName(typeutil.ProxyRole, paramtable.GetNodeID()),
}, nil

@@ -21,7 +21,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/hardware"
@@ -69,10 +68,7 @@ func (c *Core) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetric
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
Response: "",
ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, c.session.ServerID),
}, nil