Rectify wrong exception messages associated with Array datatype (#27769)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
zhenshan.cao 2023-10-19 17:24:07 +08:00 committed by GitHub
parent aa1fba79b1
commit 020ad9a6bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 102 additions and 67 deletions

View File

@ -811,23 +811,24 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col
return nil
}
// do not retry unless got NoReplicaAvailable from querycoord
err2 := merr.Error(resp.GetStatus())
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_NoReplicaAvailable {
return retry.Unrecoverable(fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.GetStatus().GetReason()))
return retry.Unrecoverable(err2)
}
return fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.GetStatus().GetReason())
return err2
})
if err != nil {
return nil, err
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.GetStatus().GetReason())
return nil, merr.Error(resp.GetStatus())
}
shards := parseShardLeaderList2QueryNode(resp.GetShards())
info, err = m.getFullCollectionInfo(ctx, database, collectionName, collectionID)
if err != nil {
return nil, fmt.Errorf("failed to get shards, collectionName %s, colectionID %d not found", collectionName, collectionID)
return nil, err
}
// lock leader
info.leaderMutex.Lock()

View File

@ -258,19 +258,19 @@ func validateFieldName(fieldName string) error {
fieldName = strings.TrimSpace(fieldName)
if fieldName == "" {
return errors.New("field name should not be empty")
return merr.WrapErrFieldNameInvalid(fieldName, "field name should not be empty")
}
invalidMsg := "Invalid field name: " + fieldName + ". "
if len(fieldName) > Params.ProxyCfg.MaxNameLength.GetAsInt() {
msg := invalidMsg + "The length of a field name must be less than " + Params.ProxyCfg.MaxNameLength.GetValue() + " characters."
return errors.New(msg)
return merr.WrapErrFieldNameInvalid(fieldName, msg)
}
firstChar := fieldName[0]
if firstChar != '_' && !isAlpha(firstChar) {
msg := invalidMsg + "The first character of a field name must be an underscore or letter."
return errors.New(msg)
return merr.WrapErrFieldNameInvalid(fieldName, msg)
}
fieldNameSize := len(fieldName)
@ -278,7 +278,7 @@ func validateFieldName(fieldName string) error {
c := fieldName[i]
if c != '_' && !isAlpha(c) && !isNumber(c) {
msg := invalidMsg + "Field name cannot only contain numbers, letters, and underscores."
return errors.New(msg)
return merr.WrapErrFieldNameInvalid(fieldName, msg)
}
}
return nil
@ -323,7 +323,7 @@ func validateMaxLengthPerRow(collectionName string, field *schemapb.FieldSchema)
return err
}
if maxLengthPerRow > defaultMaxVarCharLength || maxLengthPerRow <= 0 {
return fmt.Errorf("the maximum length specified for a VarChar shoule be in (0, 65535]")
return merr.WrapErrParameterInvalidMsg("the maximum length specified for a VarChar should be in (0, 65535]")
}
exist = true
}
@ -347,7 +347,7 @@ func validateMaxCapacityPerRow(collectionName string, field *schemapb.FieldSchem
return fmt.Errorf("the value of %s must be an integer", common.MaxCapacityKey)
}
if maxCapacityPerRow > defaultMaxArrayCapacity || maxCapacityPerRow <= 0 {
return fmt.Errorf("the maximum capacity specified for a Array shoule be in (0, 4096]")
return fmt.Errorf("the maximum capacity specified for a Array should be in (0, 4096]")
}
exist = true
}
@ -532,7 +532,7 @@ func validateSchema(coll *schemapb.CollectionSchema) error {
return fmt.Errorf("there are more than one primary key, field name = %s, %s", coll.Fields[primaryIdx].Name, field.Name)
}
if field.DataType != schemapb.DataType_Int64 {
return fmt.Errorf("type of primary key shoule be int64")
return fmt.Errorf("type of primary key should be int64")
}
primaryIdx = idx
}

View File

@ -323,7 +323,7 @@ func (v *validateUtil) checkJSONFieldData(field *schemapb.FieldData, fieldSchema
jsonArray := field.GetScalars().GetJsonData().GetData()
if jsonArray == nil {
msg := fmt.Sprintf("json field '%v' is illegal, array type mismatch", field.GetFieldName())
return merr.WrapErrParameterInvalid("need string array", "got nil", msg)
return merr.WrapErrParameterInvalid("need json array", "got nil", msg)
}
if v.checkMaxLen {
@ -434,8 +434,10 @@ func (v *validateUtil) checkArrayElement(array *schemapb.ArrayArray, field *sche
func (v *validateUtil) checkArrayFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
data := field.GetScalars().GetArrayData()
if data == nil {
elementTypeStr := fieldSchema.GetElementType().String()
msg := fmt.Sprintf("array field '%v' is illegal, array type mismatch", field.GetFieldName())
return merr.WrapErrParameterInvalid("need string array", "got nil", msg)
expectStr := fmt.Sprintf("need %s array", elementTypeStr)
return merr.WrapErrParameterInvalid(expectStr, "got nil", msg)
}
if v.checkMaxCap {
maxCapacity, err := parameterutil.GetMaxCapacity(fieldSchema)
@ -473,36 +475,29 @@ func verifyLengthPerRow[E interface{ ~string | ~[]byte }](strArr []E, maxLength
func verifyCapacityPerRow(arrayArray []*schemapb.ScalarField, maxCapacity int64, elementType schemapb.DataType) error {
for i, array := range arrayArray {
arrayLen := 0
switch elementType {
case schemapb.DataType_Bool:
if int64(len(array.GetBoolData().GetData())) <= maxCapacity {
continue
}
arrayLen = len(array.GetBoolData().GetData())
case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
if int64(len(array.GetIntData().GetData())) <= maxCapacity {
continue
}
arrayLen = len(array.GetIntData().GetData())
case schemapb.DataType_Int64:
if int64(len(array.GetLongData().GetData())) <= maxCapacity {
continue
}
arrayLen = len(array.GetLongData().GetData())
case schemapb.DataType_String, schemapb.DataType_VarChar:
if int64(len(array.GetStringData().GetData())) <= maxCapacity {
continue
}
arrayLen = len(array.GetStringData().GetData())
case schemapb.DataType_Float:
if int64(len(array.GetFloatData().GetData())) <= maxCapacity {
continue
}
arrayLen = len(array.GetFloatData().GetData())
case schemapb.DataType_Double:
if int64(len(array.GetDoubleData().GetData())) <= maxCapacity {
continue
}
arrayLen = len(array.GetDoubleData().GetData())
default:
msg := fmt.Sprintf("array element type: %s is not supported", elementType.String())
return merr.WrapErrParameterInvalid("valid array element type", "array element type is not supported", msg)
}
msg := fmt.Sprintf("the length (%d) of %dth array exceeds max capacity (%d)", len(arrayArray), i, maxCapacity)
if int64(arrayLen) <= maxCapacity {
continue
}
msg := fmt.Sprintf("the length (%d) of %dth array exceeds max capacity (%d)", arrayLen, i, maxCapacity)
return merr.WrapErrParameterInvalid("valid length array", "array length exceeds max capacity", msg)
}

View File

@ -22,7 +22,6 @@ import (
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
@ -332,9 +331,9 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
leaderInfo = s.nodeMgr.Get(leader)
}
if leaderInfo == nil {
msg := fmt.Sprintf("failed to get shard leader for shard %s, the collection not loaded or leader is offline", channel)
msg := fmt.Sprintf("failed to get shard leader for shard %s", channel)
log.Warn(msg)
return nil, errors.Wrap(merr.WrapErrNodeNotFound(leader), msg)
return nil, merr.WrapErrNodeNotFound(leader, msg)
}
shard := &milvuspb.ShardReplica{

View File

@ -97,14 +97,14 @@ func (suite *QueryNodeSuite) TestBasic() {
err = suite.node.Init()
suite.NoError(err)
// node shoule be unhealthy before node start
// node should be unhealthy before node start
suite.False(suite.node.lifetime.GetState() == commonpb.StateCode_Healthy)
// start node
err = suite.node.Start()
suite.NoError(err)
// node shoule be healthy after node start
// node should be healthy after node start
suite.True(suite.node.lifetime.GetState() == commonpb.StateCode_Healthy)
// register node to etcd

View File

@ -657,7 +657,7 @@ func (mt *MetaTable) listCollectionFromCache(dbName string, onlyAvail bool) ([]*
db, ok := mt.dbName2Meta[dbName]
if !ok {
return nil, fmt.Errorf("database:%s not found", dbName)
return nil, merr.WrapErrDatabaseNotFound(dbName)
}
collectionFromCache := make([]*model.Collection, 0, len(mt.collID2Meta))
@ -889,7 +889,7 @@ func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias strin
// Since cache always keep the latest version, and the ts should always be the latest.
if !mt.names.exist(dbName) {
return fmt.Errorf("database %s not found", dbName)
return merr.WrapErrDatabaseNotFound(dbName)
}
if collID, ok := mt.names.get(dbName, alias); ok {
@ -899,14 +899,14 @@ func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias strin
}
// allow alias with dropping&dropped
if coll.State != pb.CollectionState_CollectionDropping && coll.State != pb.CollectionState_CollectionDropped {
return fmt.Errorf("cannot alter alias, collection already exists with same name: %s", alias)
return merr.WrapErrAliasCollectionNameConflict(dbName, alias)
}
}
collectionID, ok := mt.names.get(dbName, collectionName)
if !ok {
// you cannot alias to a non-existent collection.
return fmt.Errorf("collection not exists: %s", collectionName)
return merr.WrapErrCollectionNotFoundWithDB(dbName, collectionName)
}
// check if alias exists.
@ -917,14 +917,15 @@ func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias strin
} else if ok {
// TODO: better to check if aliasedCollectionID exist or is available, though not very possible.
aliasedColl := mt.collID2Meta[aliasedCollectionID]
return fmt.Errorf("alias exists and already aliased to another collection, alias: %s, collection: %s, other collection: %s", alias, collectionName, aliasedColl.Name)
msg := fmt.Sprintf("%s is alias to another collection: %s", alias, aliasedColl.Name)
return merr.WrapErrAliasAlreadyExist(dbName, alias, msg)
}
// alias didn't exist.
coll, ok := mt.collID2Meta[collectionID]
if !ok || !coll.Available() {
// you cannot alias to a non-existent collection.
return fmt.Errorf("collection not exists: %s", collectionName)
return merr.WrapErrCollectionNotFoundWithDB(dbName, collectionName)
}
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
@ -993,34 +994,34 @@ func (mt *MetaTable) AlterAlias(ctx context.Context, dbName string, alias string
// Since cache always keep the latest version, and the ts should always be the latest.
if !mt.names.exist(dbName) {
return fmt.Errorf("database not found: %s", dbName)
return merr.WrapErrDatabaseNotFound(dbName)
}
if collID, ok := mt.names.get(dbName, alias); ok {
coll := mt.collID2Meta[collID]
// allow alias with dropping&dropped
if coll.State != pb.CollectionState_CollectionDropping && coll.State != pb.CollectionState_CollectionDropped {
return fmt.Errorf("cannot alter alias, collection already exists with same name: %s", alias)
return merr.WrapErrAliasCollectionNameConflict(dbName, alias)
}
}
collectionID, ok := mt.names.get(dbName, collectionName)
if !ok {
// you cannot alias to a non-existent collection.
return fmt.Errorf("collection not exists: %s", collectionName)
return merr.WrapErrCollectionNotFound(collectionName)
}
coll, ok := mt.collID2Meta[collectionID]
if !ok || !coll.Available() {
// you cannot alias to a non-existent collection.
return fmt.Errorf("collection not exists: %s", collectionName)
return merr.WrapErrCollectionNotFound(collectionName)
}
// check if alias exists.
_, ok = mt.aliases.get(dbName, alias)
if !ok {
//
return fmt.Errorf("failed to alter alias, alias does not exist: %s", alias)
return merr.WrapErrAliasNotFound(dbName, alias)
}
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
@ -1102,7 +1103,7 @@ func (mt *MetaTable) GetPartitionNameByID(collID UniqueID, partitionID UniqueID,
return partition.PartitionName, nil
}
}
return "", fmt.Errorf("partition not exist: %d", partitionID)
return "", merr.WrapErrPartitionNotFound(partitionID)
}
// GetPartitionByName serve for bulk insert.
@ -1126,7 +1127,7 @@ func (mt *MetaTable) GetPartitionByName(collID UniqueID, partitionName string, t
return common.InvalidPartitionID, err
}
if !coll.Available() {
return common.InvalidPartitionID, fmt.Errorf("collection not exist: %d", collID)
return common.InvalidPartitionID, merr.WrapErrCollectionNotFoundWithDB(coll.DBID, collID)
}
for _, partition := range coll.Partitions {
// no need to check time travel logic again, since catalog already did.

View File

@ -112,8 +112,14 @@ var (
// this operation is denied because the user has no permission to do this, user need higher privilege
ErrPrivilegeNotPermitted = newMilvusError("privilege not permitted", 1401, false)
// Alias related
ErrAliasNotFound = newMilvusError("alias not found", 1600, false)
ErrAliasCollectionNameConfilct = newMilvusError("alias and collection name conflict", 1601, false)
ErrAliasAlreadyExist = newMilvusError("alias already exist", 1602, false)
// field related
ErrFieldNotFound = newMilvusError("field not found", 1700, false)
ErrFieldNotFound = newMilvusError("field not found", 1700, false)
ErrFieldInvalidName = newMilvusError("field name invalid", 1701, false)
// high-level restful api related
ErrNeedAuthenticate = newMilvusError("user hasn't authenticated", 1800, false)

View File

@ -213,7 +213,6 @@ func Error(status *commonpb.Status) error {
if code == 0 {
return newMilvusError(status.GetReason(), Code(OldCodeToMerr(status.GetErrorCode())), false)
}
return newMilvusError(status.GetReason(), code, code&retryableFlag != 0)
}
@ -409,6 +408,30 @@ func WrapErrCollectionNotFullyLoaded(collection any, msg ...string) error {
return err
}
func WrapErrAliasNotFound(db any, alias any, msg ...string) error {
err := errors.Wrapf(ErrAliasNotFound, "alias %v:%v", db, alias)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrAliasCollectionNameConflict(db any, alias any, msg ...string) error {
err := errors.Wrapf(ErrAliasCollectionNameConfilct, "alias %v:%v", db, alias)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrAliasAlreadyExist(db any, alias any, msg ...string) error {
err := errors.Wrapf(ErrAliasAlreadyExist, "alias %v:%v already exist", db, alias)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Partition related
func WrapErrPartitionNotFound(partition any, msg ...string) error {
err := wrapWithField(ErrPartitionNotFound, "partition", partition)
@ -725,6 +748,14 @@ func WrapErrFieldNotFound[T any](field T, msg ...string) error {
return err
}
func WrapErrFieldNameInvalid(field any, msg ...string) error {
err := wrapWithField(ErrFieldInvalidName, "field", field)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func wrapWithField(err error, name string, value any) error {
return errors.Wrapf(err, "%s=%v", name, value)
}

View File

@ -12,7 +12,7 @@ allure-pytest==2.7.0
pytest-print==0.2.1
pytest-level==0.1.1
pytest-xdist==2.5.0
pymilvus==2.3.1.post1.dev7
pymilvus==2.3.1.post1.dev8
pytest-rerunfailures==9.1.1
git+https://github.com/Projectplace/pytest-tags
ndg-httpsclient

View File

@ -43,7 +43,7 @@ class TestAliasParamsInvalid(TestcaseBase):
collection_w = self.init_collection_wrap(name=c_name, schema=default_schema,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_schema: default_schema})
error = {ct.err_code: 1, ct.err_msg: "Invalid collection alias"}
error = {ct.err_code: 1100, ct.err_msg: "Invalid collection alias"}
self.utility_wrap.create_alias(collection_w.name, alias_name,
check_task=CheckTasks.err_res,
check_items=error)
@ -424,7 +424,7 @@ class TestAliasOperationInvalid(TestcaseBase):
collection_2 = self.init_collection_wrap(name=c_2_name, schema=default_schema,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_2_name, exp_schema: default_schema})
error = {ct.err_code: 65535,
error = {ct.err_code: 1602,
ct.err_msg: f"alias exists and already aliased to another collection, alias: {alias_a_name}, "
f"collection: {c_1_name}, other collection: {c_2_name}"}
self.utility_wrap.create_alias(collection_2.name, alias_a_name,
@ -454,8 +454,8 @@ class TestAliasOperationInvalid(TestcaseBase):
# collection_w.create_alias(alias_name)
alias_not_exist_name = cf.gen_unique_str(prefix)
error = {ct.err_code: 65535,
ct.err_msg: f"failed to alter alias, alias does not exist: {alias_not_exist_name}"}
error = {ct.err_code: 1600,
ct.err_msg: "Alter alias failed: alias does not exist"}
self.utility_wrap.alter_alias(collection_w.name, alias_not_exist_name,
check_task=CheckTasks.err_res,
check_items=error)

View File

@ -348,7 +348,7 @@ class TestCollectionParams(TestcaseBase):
field, _ = self.field_schema_wrap.init_field_schema(name=name, dtype=5, is_primary=True)
vec_field = cf.gen_float_vec_field()
schema = cf.gen_collection_schema(fields=[field, vec_field])
error = {ct.err_code: 1, ct.err_msg: f"bad argument type for built-in"}
error = {ct.err_code: 1701, ct.err_msg: f"bad argument type for built-in"}
self.collection_wrap.init_collection(c_name, schema=schema, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@ -378,7 +378,7 @@ class TestCollectionParams(TestcaseBase):
c_name = cf.gen_unique_str(prefix)
field, _ = self.field_schema_wrap.init_field_schema(name=None, dtype=DataType.INT64, is_primary=True)
schema = cf.gen_collection_schema(fields=[field, cf.gen_float_vec_field()])
error = {ct.err_code: 65535, ct.err_msg: "the partition key field must not be primary field"}
error = {ct.err_code: 1701, ct.err_msg: "field name should not be empty"}
self.collection_wrap.init_collection(c_name, schema=schema, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@ -1092,7 +1092,6 @@ class TestCollectionOperation(TestcaseBase):
assert self.utility_wrap.has_collection(c_name)[0]
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("already has same senario")
def test_collection_all_datatype_fields(self):
"""
target: test create collection with all dataType fields
@ -1102,7 +1101,9 @@ class TestCollectionOperation(TestcaseBase):
self._connect()
fields = []
for k, v in DataType.__members__.items():
if v and v != DataType.UNKNOWN and v != DataType.STRING and v != DataType.VARCHAR and v != DataType.FLOAT_VECTOR and v != DataType.BINARY_VECTOR:
if v and v != DataType.UNKNOWN and v != DataType.STRING\
and v != DataType.VARCHAR and v != DataType.FLOAT_VECTOR\
and v != DataType.BINARY_VECTOR and v != DataType.ARRAY:
field, _ = self.field_schema_wrap.init_field_schema(name=k.lower(), dtype=v)
fields.append(field)
fields.append(cf.gen_float_vec_field())

View File

@ -118,8 +118,8 @@ class TestInsertParams(TestcaseBase):
columns = [ct.default_int64_field_name,
ct.default_float_vec_field_name]
df = pd.DataFrame(columns=columns)
error = {ct.err_code: 0,
ct.err_msg: "Cannot infer schema from empty dataframe"}
error = {ct.err_code: 1,
ct.err_msg: "The data don't match with schema fields, expect 5 list, got 0"}
collection_w.insert(
data=df, check_task=CheckTasks.err_res, check_items=error)
@ -289,6 +289,7 @@ class TestInsertParams(TestcaseBase):
data=df, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="Currently not check in pymilvus")
def test_insert_field_value_not_match(self):
"""
target: test insert data value not match
@ -414,7 +415,7 @@ class TestInsertParams(TestcaseBase):
ct.default_float_vec_field_name: float_vec_values,
ct.default_int64_field_name: int_values
})
error = {ct.err_code: 5, ct.err_msg: 'Missing param in entities'}
error = {ct.err_code: 1, ct.err_msg: "The fields don't match with schema fields"}
collection_w.insert(
data=df, check_task=CheckTasks.err_res, check_items=error)
@ -1136,7 +1137,7 @@ class TestInsertAsync(TestcaseBase):
ct.default_float_vec_field_name]
df = pd.DataFrame(columns=columns)
error = {ct.err_code: 0,
ct.err_msg: "Cannot infer schema from empty dataframe"}
ct.err_msg: "The fields don't match with schema fields"}
collection_w.insert(data=df, _async=True,
check_task=CheckTasks.err_res, check_items=error)
@ -1313,7 +1314,7 @@ class TestInsertInvalid(TestcaseBase):
prefix, is_all_data_type=True)[0]
data = cf.gen_dataframe_all_data_type(nb=1)
data[ct.default_int8_field_name] = [invalid_int8]
error = {ct.err_code: 1, 'err_msg': "The data type of field int8 doesn't match, "
error = {ct.err_code: 1100, 'err_msg': "The data type of field int8 doesn't match, "
"expected: INT8, got INT64"}
collection_w.insert(
data, check_task=CheckTasks.err_res, check_items=error)
@ -1330,7 +1331,7 @@ class TestInsertInvalid(TestcaseBase):
prefix, is_all_data_type=True)[0]
data = cf.gen_dataframe_all_data_type(nb=1)
data[ct.default_int16_field_name] = [invalid_int16]
error = {ct.err_code: 1, 'err_msg': "The data type of field int16 doesn't match, "
error = {ct.err_code: 1100, 'err_msg': "The data type of field int16 doesn't match, "
"expected: INT16, got INT64"}
collection_w.insert(
data, check_task=CheckTasks.err_res, check_items=error)