enhance: add nullable in Field, check valid_data and fill data (#32086)

1. add nullable to model.Field
   this helps to read the nullable attribute accurately.
2. check valid_data
a. if the user passes default_value or the field is nullable, the length of
valid_data must be num_rows.
b. if valid_data is passed, the length of the passed field data must equal
the number of 'true' values in valid_data.
c. after filling default_value, only nullable fields will still have
valid_data.
3. fill data in two situations
    a. the field has no default_value: if it is nullable,
nullValue will be filled in when the passed num_rows is not equal to the expected num_rows.
    b. the field has default_value:
default_value will be filled in when the passed num_rows is not equal to the expected
num_rows.
c. after filling data, the length of every field will equal the passed
num_rows.
#31728

---------

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
smellthemoon 2024-05-16 11:57:35 +08:00 committed by GitHub
parent 875ad88d84
commit b45798107a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 2838 additions and 486 deletions

View File

@ -21,6 +21,7 @@ type Field struct {
IsClusteringKey bool
DefaultValue *schemapb.ValueField
ElementType schemapb.DataType
Nullable bool
}
func (f *Field) Available() bool {
@ -43,6 +44,7 @@ func (f *Field) Clone() *Field {
IsClusteringKey: f.IsClusteringKey,
DefaultValue: f.DefaultValue,
ElementType: f.ElementType,
Nullable: f.Nullable,
}
}
@ -72,7 +74,8 @@ func (f *Field) Equal(other Field) bool {
f.IsDynamic == other.IsDynamic &&
f.IsClusteringKey == other.IsClusteringKey &&
f.DefaultValue == other.DefaultValue &&
f.ElementType == other.ElementType
f.ElementType == other.ElementType &&
f.Nullable == other.Nullable
}
func CheckFieldsEqual(fieldsA, fieldsB []*Field) bool {
@ -107,6 +110,7 @@ func MarshalFieldModel(field *Field) *schemapb.FieldSchema {
IsClusteringKey: field.IsClusteringKey,
DefaultValue: field.DefaultValue,
ElementType: field.ElementType,
Nullable: field.Nullable,
}
}
@ -141,6 +145,7 @@ func UnmarshalFieldModel(fieldSchema *schemapb.FieldSchema) *Field {
IsClusteringKey: fieldSchema.IsClusteringKey,
DefaultValue: fieldSchema.DefaultValue,
ElementType: fieldSchema.ElementType,
Nullable: fieldSchema.Nullable,
}
}

View File

@ -661,6 +661,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
IsClusteringKey: field.IsClusteringKey,
DefaultValue: field.DefaultValue,
ElementType: field.ElementType,
Nullable: field.Nullable,
})
}
}

View File

@ -33,7 +33,7 @@ func TestInsertTask_CheckAligned(t *testing.T) {
err = case1.insertMsg.CheckAligned()
assert.NoError(t, err)
// fillFieldsDataBySchema was already checked by TestInsertTask_fillFieldsDataBySchema
// checkFieldsDataBySchema was already checked by TestInsertTask_checkFieldsDataBySchema
boolFieldSchema := &schemapb.FieldSchema{DataType: schemapb.DataType_Bool}
int8FieldSchema := &schemapb.FieldSchema{DataType: schemapb.DataType_Int8}

View File

@ -59,7 +59,7 @@ func TestUpsertTask_CheckAligned(t *testing.T) {
err = case1.upsertMsg.InsertMsg.CheckAligned()
assert.NoError(t, err)
// fillFieldsDataBySchema was already checked by TestUpsertTask_fillFieldsDataBySchema
// checkFieldsDataBySchema was already checked by TestUpsertTask_checkFieldsDataBySchema
boolFieldSchema := &schemapb.FieldSchema{DataType: schemapb.DataType_Bool}
int8FieldSchema := &schemapb.FieldSchema{DataType: schemapb.DataType_Int8}

View File

@ -1123,9 +1123,10 @@ func isPartitionLoaded(ctx context.Context, qc types.QueryCoordClient, collID in
return false, nil
}
func fillFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
requiredFieldsNum := 0
func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
log := log.With(zap.String("collection", schema.GetName()))
primaryKeyNum := 0
autoGenFieldNum := 0
dataNameSet := typeutil.NewSet[string]()
for _, data := range insertMsg.FieldsData {
@ -1141,24 +1142,24 @@ func fillFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstr
log.Warn("not primary key field, but set autoID true", zap.String("field", fieldSchema.GetName()))
return merr.WrapErrParameterInvalidMsg("only primary key could be with AutoID enabled")
}
if fieldSchema.IsPrimaryKey {
primaryKeyNum++
}
if fieldSchema.GetDefaultValue() != nil && fieldSchema.IsPrimaryKey {
return merr.WrapErrParameterInvalidMsg("primary key can't be with default value")
}
if !fieldSchema.AutoID {
requiredFieldsNum++
}
if fieldSchema.AutoID && Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() {
requiredFieldsNum++
}
// if has no field pass in, consider use default value
// so complete it with field schema
if _, ok := dataNameSet[fieldSchema.GetName()]; !ok {
// primary key can not use default value
if fieldSchema.IsPrimaryKey {
primaryKeyNum++
if fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() {
// no need to pass when pk is autoid and SkipAutoIDCheck is false
autoGenFieldNum++
continue
}
if fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {
log.Warn("no corresponding fieldData pass in", zap.String("fieldSchema", fieldSchema.GetName()))
return merr.WrapErrParameterInvalidMsg("fieldSchema(%s) has no corresponding fieldData pass in", fieldSchema.GetName())
}
// when use default_value or has set Nullable
// it's ok that no corresponding fieldData found
dataToAppend := &schemapb.FieldData{
Type: fieldSchema.GetDataType(),
FieldName: fieldSchema.GetName(),
@ -1169,17 +1170,15 @@ func fillFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstr
if primaryKeyNum > 1 {
log.Warn("more than 1 primary keys not supported",
zap.Int64("primaryKeyNum", int64(primaryKeyNum)),
zap.String("collection", schema.GetName()))
zap.Int64("primaryKeyNum", int64(primaryKeyNum)))
return merr.WrapErrParameterInvalidMsg("more than 1 primary keys not supported, got %d", primaryKeyNum)
}
if len(insertMsg.FieldsData) != requiredFieldsNum {
log.Warn("the number of fields is not the same as needed",
zap.Int("fieldNum", len(insertMsg.FieldsData)),
zap.Int("requiredFieldNum", requiredFieldsNum),
zap.String("collection", schema.GetName()))
return merr.WrapErrParameterInvalid(requiredFieldsNum, len(insertMsg.FieldsData), "the number of fields is less than needed")
expectedNum := len(schema.Fields)
actualNum := autoGenFieldNum + len(insertMsg.FieldsData)
if expectedNum != actualNum {
log.Warn("the number of fields is not the same as needed", zap.Int("expected", expectedNum), zap.Int("actual", actualNum))
return merr.WrapErrParameterInvalid(expectedNum, actualNum, "more fieldData has pass in")
}
return nil
@ -1192,7 +1191,7 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.M
return nil, merr.WrapErrParameterInvalid("invalid num_rows", fmt.Sprint(rowNums), "num_rows should be greater than 0")
}
if err := fillFieldsDataBySchema(schema, insertMsg); err != nil {
if err := checkFieldsDataBySchema(schema, insertMsg); err != nil {
return nil, err
}
@ -1201,6 +1200,9 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.M
log.Error("get primary field schema failed", zap.String("collectionName", insertMsg.CollectionName), zap.Any("schema", schema), zap.Error(err))
return nil, err
}
if primaryFieldSchema.GetNullable() {
return nil, merr.WrapErrParameterInvalidMsg("primary field not support null")
}
// get primaryFieldData whether autoID is true or not
var primaryFieldData *schemapb.FieldData
if inInsert {
@ -1472,15 +1474,6 @@ func assignPartitionKeys(ctx context.Context, dbName string, collName string, ke
return hashedPartitionNames, err
}
// memsetLoop returns a slice of length and capacity numRows in which every
// element is a copy of v.
func memsetLoop[T any](v T, numRows int) []T {
	// Allocate the full capacity up front, then expose all of it so each
	// slot can be written by index.
	filled := make([]T, 0, numRows)
	filled = filled[:numRows]
	for idx := range filled {
		filled[idx] = v
	}
	return filled
}
func ErrWithLog(logger *log.MLogger, msg string, err error) error {
wrapErr := errors.Wrap(err, msg)
if logger != nil {

View File

@ -1122,373 +1122,426 @@ func Test_isPartitionIsLoaded(t *testing.T) {
})
}
func Test_InsertTaskfillFieldsDataBySchema(t *testing.T) {
func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
paramtable.Init()
log.Info("Test_InsertTaskfillFieldsDataBySchema", zap.Bool("enable", Params.ProxyCfg.SkipAutoIDCheck.GetAsBool()))
log.Info("InsertTaskcheckFieldsDataBySchema", zap.Bool("enable", Params.ProxyCfg.SkipAutoIDCheck.GetAsBool()))
var err error
// schema is empty, though won't happen in system
case1 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
DbName: "TestInsertTask_fillFieldsDataBySchema",
CollectionName: "TestInsertTask_fillFieldsDataBySchema",
PartitionName: "TestInsertTask_fillFieldsDataBySchema",
t.Run("schema is empty, though won't happen in system", func(t *testing.T) {
// won't happen in system
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{},
},
},
}
err = fillFieldsDataBySchema(case1.schema, case1.insertMsg)
assert.Equal(t, nil, err)
// schema has two fields, msg has no field. fields will be filled in
case2 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
DataType: schemapb.DataType_Int64,
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
DbName: "TestInsertTask_checkFieldsDataBySchema",
CollectionName: "TestInsertTask_checkFieldsDataBySchema",
PartitionName: "TestInsertTask_checkFieldsDataBySchema",
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
}
err = fillFieldsDataBySchema(case2.schema, case2.insertMsg)
assert.Equal(t, nil, err)
assert.Equal(t, len(case2.insertMsg.FieldsData), 2)
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.Equal(t, nil, err)
assert.Equal(t, len(task.insertMsg.FieldsData), 0)
})
// schema has a pk can't fill in, and another can.
case3 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: true,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
err = fillFieldsDataBySchema(case3.schema, case3.insertMsg)
assert.Equal(t, nil, err)
assert.Equal(t, len(case3.insertMsg.FieldsData), 1)
// schema has a pk can't fill in, and another can, but pk autoid == false
// means that data pass less
case4 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
err = fillFieldsDataBySchema(case4.schema, case4.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
assert.Equal(t, len(case4.insertMsg.FieldsData), 1)
// pass more data field
case5 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
IsPrimaryKey: false,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
FieldsData: []*schemapb.FieldData{
t.Run("miss field", func(t *testing.T) {
// schema has field, msg has no field.
// schema is not Nullable or has set default_value
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldName: "c",
Type: schemapb.DataType_Int64,
Name: "a",
AutoID: false,
DataType: schemapb.DataType_Int64,
},
},
},
},
}
err = fillFieldsDataBySchema(case5.schema, case5.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
assert.Equal(t, len(case5.insertMsg.FieldsData), 3)
// duplicate field datas
case5.insertMsg.FieldsData = []*schemapb.FieldData{
{
FieldName: "a",
Type: schemapb.DataType_Int64,
},
{
FieldName: "a",
Type: schemapb.DataType_Int64,
},
}
err = fillFieldsDataBySchema(case5.schema, case5.insertMsg)
assert.Error(t, err)
// not pk, but autoid == true
case6 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: true,
IsPrimaryKey: false,
DataType: schemapb.DataType_Int64,
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
}
err = fillFieldsDataBySchema(case6.schema, case6.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
assert.Equal(t, len(case6.insertMsg.FieldsData), 0)
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
})
// more than one pk
case7 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
t.Run("miss field is nullable or set default_value", func(t *testing.T) {
// schema has fields, msg has no field.
// schema is Nullable or set default_value
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
err = fillFieldsDataBySchema(case7.schema, case7.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
assert.Equal(t, len(case7.insertMsg.FieldsData), 0)
// pk can not set default value
case8 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
DefaultValue: &schemapb.ValueField{
Data: &schemapb.ValueField_LongData{
LongData: 1,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
DataType: schemapb.DataType_Int64,
Nullable: true,
},
{
Name: "b",
AutoID: false,
DataType: schemapb.DataType_Int64,
DefaultValue: &schemapb.ValueField{
Data: &schemapb.ValueField_LongData{
LongData: 1,
},
},
},
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
err = fillFieldsDataBySchema(case8.schema, case8.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
assert.Equal(t, len(case8.insertMsg.FieldsData), 0)
// skip the auto id
case9 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: true,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
FieldsData: []*schemapb.FieldData{
{
FieldName: "a",
Type: schemapb.DataType_Int64,
},
{
FieldName: "b",
Type: schemapb.DataType_Int64,
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
},
}
}
err = fillFieldsDataBySchema(case9.schema, case9.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
assert.Equal(t, len(case9.insertMsg.FieldsData), 2)
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.Equal(t, nil, err)
assert.Equal(t, len(task.insertMsg.FieldsData), 2)
})
paramtable.Get().Save(Params.ProxyCfg.SkipAutoIDCheck.Key, "true")
case10 := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: true,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
FieldsData: []*schemapb.FieldData{
t.Run("schema has autoid pk", func(t *testing.T) {
// schema has autoid pk
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldName: "a",
Type: schemapb.DataType_Int64,
},
{
FieldName: "b",
Type: schemapb.DataType_Int64,
Name: "a",
AutoID: true,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
},
},
}
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
err = fillFieldsDataBySchema(case10.schema, case10.insertMsg)
assert.NoError(t, err)
assert.Equal(t, len(case10.insertMsg.FieldsData), 2)
paramtable.Get().Reset(Params.ProxyCfg.SkipAutoIDCheck.Key)
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.Equal(t, nil, err)
assert.Equal(t, len(task.insertMsg.FieldsData), 0)
})
t.Run("schema pk is not autoid, but not pass pk", func(t *testing.T) {
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
})
t.Run("pass more data field", func(t *testing.T) {
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: true,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
FieldsData: []*schemapb.FieldData{
{
FieldName: "c",
Type: schemapb.DataType_Int64,
},
},
},
},
}
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
})
t.Run("duplicate field datas", func(t *testing.T) {
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: true,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
FieldsData: []*schemapb.FieldData{
{
FieldName: "a",
Type: schemapb.DataType_Int64,
},
{
FieldName: "a",
Type: schemapb.DataType_Int64,
},
},
},
},
}
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
})
t.Run("not pk field, but autoid == true", func(t *testing.T) {
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: true,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: true,
IsPrimaryKey: false,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
})
t.Run("has more than one pk", func(t *testing.T) {
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
})
t.Run("pk can not set default value", func(t *testing.T) {
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_checkFieldsDataBySchema",
Description: "TestInsertTask_checkFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: false,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
DefaultValue: &schemapb.ValueField{
Data: &schemapb.ValueField_LongData{
LongData: 1,
},
},
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
},
},
}
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
})
t.Run("skip the auto id", func(t *testing.T) {
task := insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: true,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
FieldsData: []*schemapb.FieldData{
{
FieldName: "a",
Type: schemapb.DataType_Int64,
},
{
FieldName: "b",
Type: schemapb.DataType_Int64,
},
},
},
},
}
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
assert.Equal(t, len(task.insertMsg.FieldsData), 2)
paramtable.Get().Save(Params.ProxyCfg.SkipAutoIDCheck.Key, "true")
task = insertTask{
schema: &schemapb.CollectionSchema{
Name: "TestInsertTask_fillFieldsDataBySchema",
Description: "TestInsertTask_fillFieldsDataBySchema",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
Name: "a",
AutoID: true,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
Name: "b",
AutoID: false,
DataType: schemapb.DataType_Int64,
},
},
},
insertMsg: &BaseInsertTask{
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
},
FieldsData: []*schemapb.FieldData{
{
FieldName: "a",
Type: schemapb.DataType_Int64,
},
{
FieldName: "b",
Type: schemapb.DataType_Int64,
},
},
},
},
}
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
assert.NoError(t, err)
assert.Equal(t, len(task.insertMsg.FieldsData), 2)
paramtable.Get().Reset(Params.ProxyCfg.SkipAutoIDCheck.Key)
})
}
func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {

View File

@ -122,7 +122,7 @@ func (v *validateUtil) Validate(data []*schemapb.FieldData, schema *schemapb.Col
}
}
err = v.fillWithDefaultValue(data, helper, numRows)
err = v.fillWithValue(data, helper, int(numRows))
if err != nil {
return err
}
@ -255,6 +255,9 @@ func (v *validateUtil) checkAligned(data []*schemapb.FieldData, schema *typeutil
}
if n != numRows {
log.Warn("the num_rows of field is not equal to passed num_rows", zap.String("fieldName", field.GetFieldName()),
zap.Int64("fieldNumRows", int64(n)), zap.Int64("passedNumRows", int64(numRows)),
zap.Bools("ValidData", field.GetValidData()))
return errNumRowsMismatch(field.GetFieldName(), n)
}
}
@ -263,82 +266,313 @@ func (v *validateUtil) checkAligned(data []*schemapb.FieldData, schema *typeutil
return nil
}
func (v *validateUtil) fillWithDefaultValue(data []*schemapb.FieldData, schema *typeutil.SchemaHelper, numRows uint64) error {
// fill data in two situations
// 1. has no default_value: if nullable,
// will fill nullValue when the passed num_rows is not equal to the expected num_rows
// 2. has default_value:
// will fill default_value when the passed num_rows is not equal to the expected num_rows
//
// after fillWithValue, only nullable fields will have valid_data, and the length of all data will equal the passed num_rows
func (v *validateUtil) fillWithValue(data []*schemapb.FieldData, schema *typeutil.SchemaHelper, numRows int) error {
for _, field := range data {
fieldSchema, err := schema.GetFieldFromName(field.GetFieldName())
if err != nil {
return err
}
// if default value is not set, continue
// compatible with 2.2.x
if fieldSchema.GetDefaultValue() == nil {
continue
}
switch field.Field.(type) {
case *schemapb.FieldData_Scalars:
switch sd := field.GetScalars().GetData().(type) {
case *schemapb.ScalarField_BoolData:
if len(sd.BoolData.Data) == 0 {
defaultValue := fieldSchema.GetDefaultValue().GetBoolData()
sd.BoolData.Data = memsetLoop(defaultValue, int(numRows))
}
case *schemapb.ScalarField_IntData:
if len(sd.IntData.Data) == 0 {
defaultValue := fieldSchema.GetDefaultValue().GetIntData()
sd.IntData.Data = memsetLoop(defaultValue, int(numRows))
}
case *schemapb.ScalarField_LongData:
if len(sd.LongData.Data) == 0 {
defaultValue := fieldSchema.GetDefaultValue().GetLongData()
sd.LongData.Data = memsetLoop(defaultValue, int(numRows))
}
case *schemapb.ScalarField_FloatData:
if len(sd.FloatData.Data) == 0 {
defaultValue := fieldSchema.GetDefaultValue().GetFloatData()
sd.FloatData.Data = memsetLoop(defaultValue, int(numRows))
}
case *schemapb.ScalarField_DoubleData:
if len(sd.DoubleData.Data) == 0 {
defaultValue := fieldSchema.GetDefaultValue().GetDoubleData()
sd.DoubleData.Data = memsetLoop(defaultValue, int(numRows))
}
case *schemapb.ScalarField_StringData:
if len(sd.StringData.Data) == 0 {
defaultValue := fieldSchema.GetDefaultValue().GetStringData()
sd.StringData.Data = memsetLoop(defaultValue, int(numRows))
}
case *schemapb.ScalarField_ArrayData:
log.Error("array type not support default value", zap.String("fieldSchemaName", field.GetFieldName()))
return merr.WrapErrParameterInvalid("not set default value", "", "array type not support default value")
case *schemapb.ScalarField_JsonData:
log.Error("json type not support default value", zap.String("fieldSchemaName", field.GetFieldName()))
return merr.WrapErrParameterInvalid("not set default value", "", "json type not support default value")
default:
panic("undefined data type " + field.Type.String())
err = v.fillWithNullValue(field, fieldSchema, numRows)
if err != nil {
return err
}
} else {
err = v.fillWithDefaultValue(field, fieldSchema, numRows)
if err != nil {
return err
}
case *schemapb.FieldData_Vectors:
log.Error("vector not support default value", zap.String("fieldSchemaName", field.GetFieldName()))
return merr.WrapErrParameterInvalid("not set default value", "", "vector type not support default value")
default:
panic("undefined data type " + field.Type.String())
}
}
return nil
}
// fillWithNullValue first runs checkValidData on the field and returns its
// error, if any. For a scalar field whose schema is nullable, it then replaces
// the field's data slice with the result of fillWithNullValueImpl, passing the
// current data and the field's valid_data.
// NOTE(review): presumably fillWithNullValueImpl pads the data so it lines up
// with valid_data; confirm against its definition.
//
// Array scalars and vector fields are left untouched. Any other field kind or
// scalar kind produces a parameter-invalid error.
func (v *validateUtil) fillWithNullValue(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema, numRows int) error {
	if err := checkValidData(field, fieldSchema, numRows); err != nil {
		return err
	}

	// Resolve the outer field kind first: only scalars need further work.
	switch field.Field.(type) {
	case *schemapb.FieldData_Scalars:
		// handled by the scalar switch below
	case *schemapb.FieldData_Vectors:
		return nil
	default:
		return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("undefined data type:%s", field.Type.String()))
	}

	nullable := fieldSchema.GetNullable()

	// fillErr carries the result of whichever fill ran; it stays nil when the
	// field is not nullable or the scalar kind is skipped.
	var fillErr error
	switch sd := field.GetScalars().GetData().(type) {
	case *schemapb.ScalarField_BoolData:
		if nullable {
			sd.BoolData.Data, fillErr = fillWithNullValueImpl(sd.BoolData.Data, field.GetValidData())
		}
	case *schemapb.ScalarField_IntData:
		if nullable {
			sd.IntData.Data, fillErr = fillWithNullValueImpl(sd.IntData.Data, field.GetValidData())
		}
	case *schemapb.ScalarField_LongData:
		if nullable {
			sd.LongData.Data, fillErr = fillWithNullValueImpl(sd.LongData.Data, field.GetValidData())
		}
	case *schemapb.ScalarField_FloatData:
		if nullable {
			sd.FloatData.Data, fillErr = fillWithNullValueImpl(sd.FloatData.Data, field.GetValidData())
		}
	case *schemapb.ScalarField_DoubleData:
		if nullable {
			sd.DoubleData.Data, fillErr = fillWithNullValueImpl(sd.DoubleData.Data, field.GetValidData())
		}
	case *schemapb.ScalarField_StringData:
		if nullable {
			sd.StringData.Data, fillErr = fillWithNullValueImpl(sd.StringData.Data, field.GetValidData())
		}
	case *schemapb.ScalarField_ArrayData:
		// Todo: support it
	case *schemapb.ScalarField_JsonData:
		if nullable {
			sd.JsonData.Data, fillErr = fillWithNullValueImpl(sd.JsonData.Data, field.GetValidData())
		}
	default:
		return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("undefined data type:%s", field.Type.String()))
	}
	return fillErr
}
// fillWithDefaultValue expands the data of a scalar field using the default
// value declared in fieldSchema for every row marked invalid in
// field.ValidData.
//
// For each supported scalar kind, valid_data must have exactly numRows
// entries; the typed data slice is then replaced by the result of
// fillWithDefaultValueImpl. When the schema is not nullable, field.ValidData
// is reset to an empty slice after filling. Array and vector fields are
// rejected with an error (and an error log entry). The final valid_data
// length is verified with checkValidData before returning.
func (v *validateUtil) fillWithDefaultValue(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema, numRows int) error {
	// Reject everything that is not scalar data up front.
	switch field.Field.(type) {
	case *schemapb.FieldData_Scalars:
		// handled below
	case *schemapb.FieldData_Vectors:
		log.Error("vector not support default value", zap.String("fieldSchemaName", field.GetFieldName()))
		return merr.WrapErrParameterInvalidMsg("vector type not support default value")
	default:
		return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("undefined data type:%s", field.Type.String()))
	}

	// requireFullValidData reports an error unless valid_data has one entry
	// per row.
	requireFullValidData := func() error {
		if got := len(field.GetValidData()); got != numRows {
			msg := fmt.Sprintf("the length of valid_data of field(%s) is wrong", field.GetFieldName())
			return merr.WrapErrParameterInvalid(numRows, got, msg)
		}
		return nil
	}

	var err error
	switch scalar := field.GetScalars().GetData().(type) {
	case *schemapb.ScalarField_BoolData:
		if err = requireFullValidData(); err != nil {
			return err
		}
		fallback := fieldSchema.GetDefaultValue().GetBoolData()
		scalar.BoolData.Data, err = fillWithDefaultValueImpl(scalar.BoolData.Data, fallback, field.GetValidData())
	case *schemapb.ScalarField_IntData:
		if err = requireFullValidData(); err != nil {
			return err
		}
		fallback := fieldSchema.GetDefaultValue().GetIntData()
		scalar.IntData.Data, err = fillWithDefaultValueImpl(scalar.IntData.Data, fallback, field.GetValidData())
	case *schemapb.ScalarField_LongData:
		if err = requireFullValidData(); err != nil {
			return err
		}
		fallback := fieldSchema.GetDefaultValue().GetLongData()
		scalar.LongData.Data, err = fillWithDefaultValueImpl(scalar.LongData.Data, fallback, field.GetValidData())
	case *schemapb.ScalarField_FloatData:
		if err = requireFullValidData(); err != nil {
			return err
		}
		fallback := fieldSchema.GetDefaultValue().GetFloatData()
		scalar.FloatData.Data, err = fillWithDefaultValueImpl(scalar.FloatData.Data, fallback, field.GetValidData())
	case *schemapb.ScalarField_DoubleData:
		if err = requireFullValidData(); err != nil {
			return err
		}
		fallback := fieldSchema.GetDefaultValue().GetDoubleData()
		scalar.DoubleData.Data, err = fillWithDefaultValueImpl(scalar.DoubleData.Data, fallback, field.GetValidData())
	case *schemapb.ScalarField_StringData:
		if err = requireFullValidData(); err != nil {
			return err
		}
		fallback := fieldSchema.GetDefaultValue().GetStringData()
		scalar.StringData.Data, err = fillWithDefaultValueImpl(scalar.StringData.Data, fallback, field.GetValidData())
	case *schemapb.ScalarField_ArrayData:
		// Todo: support it
		log.Error("array type not support default value", zap.String("fieldSchemaName", field.GetFieldName()))
		return merr.WrapErrParameterInvalid("not set default value", "", "array type not support default value")
	case *schemapb.ScalarField_JsonData:
		if err = requireFullValidData(); err != nil {
			return err
		}
		fallback := fieldSchema.GetDefaultValue().GetBytesData()
		scalar.JsonData.Data, err = fillWithDefaultValueImpl(scalar.JsonData.Data, fallback, field.GetValidData())
	default:
		return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("undefined data type:%s", field.Type.String()))
	}

	// Only the cases that performed a fill reach this point.
	if err != nil {
		return err
	}
	// A non-nullable field no longer carries valid_data once it is filled.
	if !fieldSchema.GetNullable() {
		field.ValidData = []bool{}
	}

	return checkValidData(field, fieldSchema, numRows)
}
// checkValidData verifies the length of data.ValidData against the schema:
// a nullable field must carry exactly numRows entries, any other field must
// carry none. A mismatch is reported as a parameter-invalid error.
func checkValidData(data *schemapb.FieldData, schema *schemapb.FieldSchema, numRows int) error {
	want := 0
	if schema.GetNullable() {
		// if nullable, the length of ValidData is numRows
		want = numRows
	}

	got := len(data.GetValidData())
	if got == want {
		return nil
	}

	msg := fmt.Sprintf("the length of valid_data of field(%s) is wrong", data.GetFieldName())
	return merr.WrapErrParameterInvalid(want, got, msg)
}
// fillWithNullValueImpl spreads the compact values in array over a slice
// with one slot per entry of validData.
//
// array must contain exactly as many elements as validData has true entries,
// otherwise a parameter-invalid error is returned. If every entry is valid
// the input slice is returned as is. Otherwise a new slice of
// len(validData) is returned in which valid positions receive the next value
// from array, in order, and invalid positions keep the zero value of T.
func fillWithNullValueImpl[T any](array []T, validData []bool) ([]T, error) {
	validCount := getValidNumber(validData)
	if validCount != len(array) {
		return nil, merr.WrapErrParameterInvalid(validCount, len(array), "the length of field is wrong")
	}

	total := len(validData)
	if validCount == total {
		return array, nil
	}

	expanded := make([]T, total)
	next := 0
	for pos := range validData {
		if !validData[pos] {
			continue
		}
		expanded[pos] = array[next]
		next++
	}
	return expanded, nil
}
// fillWithDefaultValueImpl spreads the compact values in array over a slice
// with one slot per entry of validData, substituting value for every invalid
// row.
//
// array must contain exactly as many elements as validData has true entries,
// otherwise a parameter-invalid error is returned. If every entry is valid
// the input slice is returned as is. Otherwise a new slice of
// len(validData) is returned in which valid positions receive the next value
// from array, in order, and invalid positions receive value.
func fillWithDefaultValueImpl[T any](array []T, value T, validData []bool) ([]T, error) {
	validCount := getValidNumber(validData)
	if validCount != len(array) {
		return nil, merr.WrapErrParameterInvalid(validCount, len(array), "the length of field is wrong")
	}

	total := len(validData)
	if validCount == total {
		return array, nil
	}

	expanded := make([]T, 0, total)
	next := 0
	for _, isValid := range validData {
		if !isValid {
			expanded = append(expanded, value)
			continue
		}
		expanded = append(expanded, array[next])
		next++
	}
	return expanded, nil
}
// getValidNumber returns how many entries of validData are true.
func getValidNumber(validData []bool) int {
	count := 0
	for idx := 0; idx < len(validData); idx++ {
		if !validData[idx] {
			continue
		}
		count++
	}
	return count
}
func (v *validateUtil) checkFloatVectorFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
floatArray := field.GetVectors().GetFloatVector().GetData()
if floatArray == nil {
@ -401,7 +635,7 @@ func (v *validateUtil) checkSparseFloatFieldData(field *schemapb.FieldData, fiel
func (v *validateUtil) checkVarCharFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
strArr := field.GetScalars().GetStringData().GetData()
if strArr == nil && fieldSchema.GetDefaultValue() == nil {
if strArr == nil && fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {
msg := fmt.Sprintf("varchar field '%v' is illegal, array type mismatch", field.GetFieldName())
return merr.WrapErrParameterInvalid("need string array", "got nil", msg)
}
@ -466,7 +700,7 @@ func (v *validateUtil) checkJSONFieldData(field *schemapb.FieldData, fieldSchema
func (v *validateUtil) checkIntegerFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
data := field.GetScalars().GetIntData().GetData()
if data == nil && fieldSchema.GetDefaultValue() == nil {
if data == nil && fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {
msg := fmt.Sprintf("field '%v' is illegal, array type mismatch", field.GetFieldName())
return merr.WrapErrParameterInvalid("need int array", "got nil", msg)
}
@ -485,7 +719,7 @@ func (v *validateUtil) checkIntegerFieldData(field *schemapb.FieldData, fieldSch
func (v *validateUtil) checkLongFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
data := field.GetScalars().GetLongData().GetData()
if data == nil && fieldSchema.GetDefaultValue() == nil {
if data == nil && fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {
msg := fmt.Sprintf("field '%v' is illegal, array type mismatch", field.GetFieldName())
return merr.WrapErrParameterInvalid("need long int array", "got nil", msg)
}
@ -495,7 +729,7 @@ func (v *validateUtil) checkLongFieldData(field *schemapb.FieldData, fieldSchema
func (v *validateUtil) checkFloatFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
data := field.GetScalars().GetFloatData().GetData()
if data == nil && fieldSchema.GetDefaultValue() == nil {
if data == nil && fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {
msg := fmt.Sprintf("field '%v' is illegal, array type mismatch", field.GetFieldName())
return merr.WrapErrParameterInvalid("need float32 array", "got nil", msg)
}
@ -509,7 +743,7 @@ func (v *validateUtil) checkFloatFieldData(field *schemapb.FieldData, fieldSchem
func (v *validateUtil) checkDoubleFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
data := field.GetScalars().GetDoubleData().GetData()
if data == nil && fieldSchema.GetDefaultValue() == nil {
if data == nil && fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {
msg := fmt.Sprintf("field '%v' is illegal, array type mismatch", field.GetFieldName())
return merr.WrapErrParameterInvalid("need float64(double) array", "got nil", msg)
}

File diff suppressed because it is too large Load Diff