enhance: [Cherry-Pick] support marking errors as user errors (#33498) (#34396)

relate: https://github.com/milvus-io/milvus/issues/33492
pr: https://github.com/milvus-io/milvus/pull/33498
---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2024-07-04 10:08:10 +08:00 committed by GitHub
parent 014cb7b071
commit 9087b6f42e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 165 additions and 42 deletions

View File

@ -181,24 +181,46 @@ func (i *GrpcAccessInfo) ErrorCode() string {
return fmt.Sprint(merr.Code(i.err))
}
// respStatus extracts the commonpb.Status carried by the recorded response.
// A response implementing BaseResponse is asked for its status; a response
// that is itself a *commonpb.Status is returned directly; anything else
// (including a missing response) yields nil.
func (i *GrpcAccessInfo) respStatus() *commonpb.Status {
	switch resp := i.resp.(type) {
	case BaseResponse:
		return resp.GetStatus()
	case *commonpb.Status:
		return resp
	default:
		return nil
	}
}
func (i *GrpcAccessInfo) ErrorMsg() string {
if i.err != nil {
return i.err.Error()
}
baseResp, ok := i.resp.(BaseResponse)
if ok {
status := baseResp.GetStatus()
if status := i.respStatus(); status != nil {
return status.GetReason()
}
status, ok := i.resp.(*commonpb.Status)
if ok {
return status.GetReason()
}
return Unknown
}
// ErrorType returns the error classification for the access record.
// When the RPC itself failed, the type is derived from that error. Otherwise
// the response status is inspected: a non-positive code yields an empty
// string, a status flagged with merr.InputErrorFlagKey yields the input-error
// name, and any other failing status yields the system-error name.
func (i *GrpcAccessInfo) ErrorType() string {
	if i.err != nil {
		return merr.GetErrorType(i.err).String()
	}

	status := i.respStatus()
	// GetCode is called through the getter so a nil status reads as code 0.
	if status.GetCode() <= 0 {
		return ""
	}

	if _, flagged := status.ExtraInfo[merr.InputErrorFlagKey]; flagged {
		return merr.InputError.String()
	}
	return merr.SystemError.String()
}
func (i *GrpcAccessInfo) DbName() string {
name, ok := requestutil.GetDbNameFromRequest(i.req)
if !ok {

View File

@ -103,6 +103,22 @@ func (s *GrpcAccessInfoSuite) TestErrorMsg() {
s.Equal("rpc error: code = Unavailable desc = mock", result[0])
}
// TestErrorType checks the "$error_type" metric in three steps that build on
// the same suite info: a response whose status comes from a nil error, a
// response status built from an input error, and finally an RPC-level error.
func (s *GrpcAccessInfoSuite) TestErrorType() {
// Status built from a nil error: the error type is expected to be empty.
s.info.resp = &milvuspb.QueryResults{
Status: merr.Status(nil),
}
result := Get(s.info, "$error_type")
s.Equal("", result[0])
// Bare status built from an error marked as input error.
s.info.resp = merr.Status(merr.WrapErrAsInputError(merr.ErrParameterInvalid))
result = Get(s.info, "$error_type")
s.Equal(merr.InputError.String(), result[0])
// An unmarked error set on the info is expected to be reported as a system
// error, even though the response from the previous step is still in place.
s.info.err = merr.ErrParameterInvalid
result = Get(s.info, "$error_type")
s.Equal(merr.SystemError.String(), result[0])
}
func (s *GrpcAccessInfoSuite) TestDbName() {
s.info.req = nil
result := Get(s.info, "$database_name")

View File

@ -34,6 +34,7 @@ var MetricFuncMap = map[string]getMetricFunc{
"$response_size": getResponseSize,
"$error_code": getErrorCode,
"$error_msg": getErrorMsg,
"$error_type": getErrorType,
"$database_name": getDbName,
"$collection_name": getCollectionName,
"$partition_name": getPartitionName,
@ -61,6 +62,7 @@ type AccessInfo interface {
ResponseSize() string
ErrorCode() string
ErrorMsg() string
ErrorType() string
DbName() string
CollectionName() string
PartitionName() string
@ -115,6 +117,10 @@ func getErrorMsg(i AccessInfo) string {
return i.ErrorMsg()
}
// getErrorType returns the error type reported by the given AccessInfo.
func getErrorType(i AccessInfo) string {
return i.ErrorType()
}
func getDbName(i AccessInfo) string {
return i.DbName()
}

View File

@ -131,6 +131,10 @@ func (i *RestfulInfo) ErrorMsg() string {
return fmt.Sprint(message)
}
// ErrorType always returns Unknown; no error classification is derived for
// RESTful access records here.
func (i *RestfulInfo) ErrorType() string {
return Unknown
}
func (i *RestfulInfo) SdkVersion() string {
return "Restful"
}

View File

@ -2551,7 +2551,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
log.Warn("Failed to enqueue insert task: " + err.Error())
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc()
return constructFailedResponse(err), nil
return constructFailedResponse(merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)), nil
}
log.Debug("Detail of insert request in Proxy")

View File

@ -622,6 +622,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
t.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
// nolint
t.result.Status.Reason = fmt.Sprintf("can't find collection[database=%s][collection=%s]", t.GetDbName(), t.GetCollectionName())
t.result.Status.ExtraInfo = map[string]string{merr.InputErrorFlagKey: "true"}
}
return nil
}
@ -1439,7 +1440,7 @@ func (t *flushTask) Execute(ctx context.Context) error {
for _, collName := range t.CollectionNames {
collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collName)
if err != nil {
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
flushReq := &datapb.FlushRequest{
Base: commonpbutil.UpdateMsgBase(

View File

@ -259,9 +259,10 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
if err := validateCollectionName(collName); err != nil {
return ErrWithLog(log, "Invalid collection name", err)
}
dr.collectionID, err = globalMetaCache.GetCollectionID(ctx, dr.req.GetDbName(), collName)
if err != nil {
return ErrWithLog(log, "Failed to get collection id", err)
return ErrWithLog(log, "Failed to get collection id", merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound))
}
dr.schema, err = globalMetaCache.GetCollectionSchema(ctx, dr.req.GetDbName(), collName)
@ -307,11 +308,11 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
func (dr *deleteRunner) Run(ctx context.Context) error {
plan, err := planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr())
if err != nil {
return merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err)
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err))
}
if planparserv2.IsAlwaysTruePlan(plan) {
return merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr())
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()))
}
isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan)

View File

@ -116,13 +116,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
if maxInsertSize != -1 && it.insertMsg.Size() > maxInsertSize {
log.Warn("insert request size exceeds maxInsertSize",
zap.Int("request size", it.insertMsg.Size()), zap.Int("maxInsertSize", maxInsertSize))
return merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize")
return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize"))
}
schema, err := globalMetaCache.GetCollectionSchema(ctx, it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
it.schema = schema.CollectionSchema
@ -208,7 +208,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
if err := newValidateUtil(withNANCheck(), withOverflowCheck(), withMaxLenCheck(), withMaxCapCheck()).
Validate(it.insertMsg.GetFieldsData(), schema.CollectionSchema, it.insertMsg.NRows()); err != nil {
return err
return merr.WrapErrAsInputError(err)
}
log.Debug("Proxy Insert PreExecute done")

View File

@ -200,7 +200,7 @@ func createCntPlan(expr string, schemaHelper *typeutil.SchemaHelper) (*planpb.Pl
plan, err := planparserv2.CreateRetrievePlan(schemaHelper, expr)
if err != nil {
return nil, merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)
return nil, merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err))
}
plan.Node.(*planpb.PlanNode_Query).Query.IsCount = true
@ -223,7 +223,7 @@ func (t *queryTask) createPlan(ctx context.Context) error {
if t.plan == nil {
t.plan, err = planparserv2.CreateRetrievePlan(schema.schemaHelper, t.request.Expr)
if err != nil {
return merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err))
}
}
@ -291,7 +291,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
collID, err := globalMetaCache.GetCollectionID(ctx, t.request.GetDbName(), collectionName)
if err != nil {
log.Warn("Failed to get collection id.", zap.String("collectionName", collectionName), zap.Error(err))
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
t.CollectionID = collID
log.Debug("Get collection ID by name", zap.Int64("collectionID", t.CollectionID))
@ -302,11 +302,11 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
return err
}
if t.partitionKeyMode && len(t.request.GetPartitionNames()) != 0 {
return errors.New("not support manually specifying the partition names if partition key mode is used")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("not support manually specifying the partition names if partition key mode is used"))
}
if t.mustUsePartitionKey && !t.partitionKeyMode {
return merr.WrapErrParameterInvalidMsg("must use partition key in the query request " +
"because the mustUsePartitionKey config is true")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("must use partition key in the query request " +
"because the mustUsePartitionKey config is true"))
}
for _, tag := range t.request.PartitionNames {
@ -363,7 +363,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
t.plan.Node.(*planpb.PlanNode_Query).Query.Limit = t.RetrieveRequest.Limit
if planparserv2.IsAlwaysTruePlan(t.plan) && t.RetrieveRequest.Limit == typeutil.Unlimited {
return fmt.Errorf("empty expression should be used with limit")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("empty expression should be used with limit"))
}
// convert partition names only when requery is false
@ -390,7 +390,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
// count with pagination
if t.plan.GetQuery().GetIsCount() && t.queryParams.limit != typeutil.Unlimited {
return fmt.Errorf("count entities with pagination is not allowed")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("count entities with pagination is not allowed"))
}
t.RetrieveRequest.IsCount = t.plan.GetQuery().GetIsCount()

View File

@ -114,7 +114,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
t.collectionName = collectionName
collID, err := globalMetaCache.GetCollectionID(ctx, t.request.GetDbName(), collectionName)
if err != nil { // err is not nil if collection not exists
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
t.SearchRequest.DbID = 0 // todo
@ -135,8 +135,8 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
return errors.New("not support manually specifying the partition names if partition key mode is used")
}
if t.mustUsePartitionKey && !t.partitionKeyMode {
return merr.WrapErrParameterInvalidMsg("must use partition key in the search request " +
"because the mustUsePartitionKey config is true")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("must use partition key in the search request " +
"because the mustUsePartitionKey config is true"))
}
if !t.partitionKeyMode && len(t.request.GetPartitionNames()) > 0 {

View File

@ -187,14 +187,14 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
if err != nil {
log.Warn("check primary field data and hash primary key failed when upsert",
zap.Error(err))
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid)
}
// set field ID to insert field data
err = fillFieldIDBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema)
if err != nil {
log.Warn("insert set fieldID to fieldData failed when upsert",
zap.Error(err))
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid)
}
if it.partitionKeyMode {

View File

@ -646,10 +646,10 @@ func parsePrimaryFieldData2IDs(fieldData *schemapb.FieldData) (*schemapb.IDs, er
StrId: scalarField.GetStringData(),
}
default:
return nil, errors.New("currently only support DataType Int64 or VarChar as PrimaryField")
return nil, merr.WrapErrParameterInvalidMsg("currently only support DataType Int64 or VarChar as PrimaryField")
}
default:
return nil, errors.New("currently not support vector field as PrimaryField")
return nil, merr.WrapErrParameterInvalidMsg("currently not support vector field as PrimaryField")
}
return primaryData, nil

View File

@ -26,6 +26,22 @@ const (
TimeoutCode int32 = 10001
)
// ErrorType classifies an error by who is responsible for it.
type ErrorType int32

const (
	// SystemError is the zero value of ErrorType.
	SystemError ErrorType = iota
	// InputError marks an error as a user (input) error.
	InputError
)

// ErrorTypeName maps every known ErrorType to its display name.
var ErrorTypeName = map[ErrorType]string{
	InputError:  "input_error",
	SystemError: "system_error",
}

// String returns the display name registered in ErrorTypeName, or the empty
// string when the value has no entry.
func (t ErrorType) String() string {
	if name, known := ErrorTypeName[t]; known {
		return name
	}
	return ""
}
// Define leaf errors here,
// WARN: take care to add new error,
// check whether you can use the errors below before adding a new one.
@ -194,29 +210,40 @@ var (
ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false)
)
// errorOption adjusts a milvusError in place; options are applied by the
// constructor after the base fields are filled in.
type errorOption func(*milvusError)

// WithDetail returns an option that replaces the error's detail text.
func WithDetail(detail string) errorOption {
	return func(target *milvusError) { target.detail = detail }
}

// WithErrorType returns an option that sets the error's classification.
func WithErrorType(etype ErrorType) errorOption {
	return func(target *milvusError) { target.errType = etype }
}

// milvusError is the concrete error value used by this package.
type milvusError struct {
	msg       string    // short message
	detail    string    // detail text; settable through WithDetail
	retriable bool      // retriable flag passed to the constructor
	errCode   int32     // numeric error code
	errType   ErrorType // classification; the zero value is SystemError
}
func newMilvusError(msg string, code int32, retriable bool) milvusError {
return milvusError{
func newMilvusError(msg string, code int32, retriable bool, options ...errorOption) milvusError {
err := milvusError{
msg: msg,
detail: msg,
retriable: retriable,
errCode: code,
}
}
func newMilvusErrorWithDetail(msg string, detail string, code int32, retriable bool) milvusError {
return milvusError{
msg: msg,
detail: detail,
retriable: retriable,
errCode: code,
for _, option := range options {
option(&err)
}
return err
}
func (e milvusError) code() int32 {

View File

@ -22,12 +22,16 @@ import (
"strings"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
const InputErrorFlagKey string = "is_input_error"
// Code returns the error code of the given error,
// WARN: DO NOT use this for now
func Code(err error) int32 {
@ -71,7 +75,8 @@ func Status(err error) *commonpb.Status {
}
code := Code(err)
return &commonpb.Status{
status := &commonpb.Status{
Code: code,
Reason: previousLastError(err).Error(),
// Deprecated, for compatibility
@ -79,6 +84,11 @@ func Status(err error) *commonpb.Status {
Retriable: IsRetryableErr(err),
Detail: err.Error(),
}
if GetErrorType(err) == InputError {
status.ExtraInfo = map[string]string{InputErrorFlagKey: "true"}
}
return status
}
func previousLastError(err error) error {
@ -233,12 +243,18 @@ func Error(status *commonpb.Status) error {
return nil
}
var eType ErrorType
_, ok := status.GetExtraInfo()[InputErrorFlagKey]
if ok {
eType = InputError
}
// use code first
code := status.GetCode()
if code == 0 {
return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), Code(OldCodeToMerr(status.GetErrorCode())), false)
return newMilvusError(status.GetReason(), Code(OldCodeToMerr(status.GetErrorCode())), false, WithDetail(status.GetDetail()), WithErrorType(eType))
}
return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), code, status.GetRetriable())
return newMilvusError(status.GetReason(), code, status.GetRetriable(), WithDetail(status.GetDetail()), WithErrorType(eType))
}
// SegcoreError returns a merr according to the given segcore error code and message
@ -293,6 +309,36 @@ func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) er
return nil
}
// WrapErrAsInputError returns a copy of err classified as InputError when err
// is a milvusError; any other error (including nil) is returned unchanged.
//
// NOTE(review): the check is a plain type assertion, so an error that merely
// wraps a milvusError is passed through unmarked — confirm that the helpers
// feeding this function return a bare milvusError.
func WrapErrAsInputError(err error) error {
	typed, isMilvusErr := err.(milvusError)
	if !isMilvusErr {
		return err
	}

	// typed is a copy, so the original value is left untouched.
	typed.errType = InputError
	return typed
}
// WrapErrAsInputErrorWhen returns a copy of err classified as InputError when
// err is a milvusError whose code equals the code of one of targets. In every
// other case (no match, not a milvusError, nil) err is returned unchanged.
//
// NOTE(review): the check is a plain type assertion, so an error that merely
// wraps a milvusError is passed through unmarked — confirm against callers.
func WrapErrAsInputErrorWhen(err error, targets ...milvusError) error {
	typed, ok := err.(milvusError)
	if !ok {
		return err
	}

	for _, target := range targets {
		if target.errCode == typed.errCode {
			log.Info("mark error as input error", zap.Error(err))
			// typed is a copy, so the original value is left untouched.
			WithErrorType(InputError)(&typed)
			return typed
		}
	}
	return err
}
// GetErrorType reports the classification stored on err when it is a
// milvusError. Every other error, including nil, is reported as SystemError.
func GetErrorType(err error) ErrorType {
	typed, isMilvusErr := err.(milvusError)
	if !isMilvusErr {
		return SystemError
	}
	return typed.errType
}
// Service related
func WrapErrServiceNotReady(role string, sessionID int64, state string, msg ...string) error {
err := wrapFieldsWithDesc(ErrServiceNotReady,