diff --git a/internal/util/importutilv2/numpy/reader.go b/internal/util/importutilv2/numpy/reader.go index b1af6ef338..96a0398950 100644 --- a/internal/util/importutilv2/numpy/reader.go +++ b/internal/util/importutilv2/numpy/reader.go @@ -45,6 +45,11 @@ type reader struct { } func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, paths []string, bufferSize int) (*reader, error) { + for _, fieldSchema := range schema.Fields { + if fieldSchema.GetNullable() { + return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support bulk insert numpy files in field(%s) which set nullable == true", fieldSchema.GetName())) + } + } fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() }) diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index b90c05bb67..80fae1722e 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -319,6 +319,58 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { suite.Error(err) } +func (suite *ReaderSuite) failRunNullable(dt schemapb.DataType, nullable bool) { + const dim = 8 + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + DataType: suite.pkDataType, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "max_length", + Value: "256", + }, + }, + }, + { + FieldID: 101, + Name: "vec", + DataType: suite.vecDataType, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: fmt.Sprintf("%d", dim), + }, + }, + }, + { + FieldID: 102, + Name: dt.String(), + DataType: dt, + ElementType: schemapb.DataType_Int32, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "max_length", + Value: "256", + }, + }, + Nullable: nullable, + }, + }, + } + files := make(map[int64]string) + for _, field := range schema.GetFields() { + files[field.GetFieldID()] = fmt.Sprintf("%s.npy", field.GetName()) + } + + cm := mocks.NewChunkManager(suite.T()) + _, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt) + suite.Error(err) +} + func (suite *ReaderSuite) TestReadScalarFields() { suite.run(schemapb.DataType_Bool) suite.run(schemapb.DataType_Int8) @@ -330,6 +382,15 @@ func (suite *ReaderSuite) TestReadScalarFields() { suite.run(schemapb.DataType_VarChar) suite.run(schemapb.DataType_JSON) suite.failRun(schemapb.DataType_JSON, true) + suite.failRunNullable(schemapb.DataType_Bool, true) + suite.failRunNullable(schemapb.DataType_Int8, true) + suite.failRunNullable(schemapb.DataType_Int16, true) + suite.failRunNullable(schemapb.DataType_Int32, true) + suite.failRunNullable(schemapb.DataType_Int64, true) + suite.failRunNullable(schemapb.DataType_Float, true) + suite.failRunNullable(schemapb.DataType_Double, true) + suite.failRunNullable(schemapb.DataType_VarChar, true) + suite.failRunNullable(schemapb.DataType_JSON, true) } func (suite *ReaderSuite) TestStringPK() {