From 87ecaac70384c69f57ec59e0d42e259149f66496 Mon Sep 17 00:00:00 2001 From: smellthemoon <64083300+smellthemoon@users.noreply.github.com> Date: Wed, 30 Aug 2023 10:52:26 +0800 Subject: [PATCH] Add dynamic schema check in upsert (#26644) Signed-off-by: lixinguo Co-authored-by: lixinguo --- internal/proxy/task_insert.go | 41 +-------- internal/proxy/task_insert_test.go | 133 ----------------------------- internal/proxy/task_upsert.go | 7 ++ internal/proxy/util.go | 38 +++++++++ internal/proxy/util_test.go | 119 ++++++++++++++++++++++++++ 5 files changed, 165 insertions(+), 173 deletions(-) diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 3e519b3fc1..455518d382 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "encoding/json" "fmt" "strconv" @@ -13,7 +12,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -149,7 +147,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error { it.result.SuccIndex = sliceIndex if it.schema.EnableDynamicField { - err = it.checkDynamicFieldData() + err = checkDynamicFieldData(it.schema, it.insertMsg) if err != nil { return err } @@ -287,40 +285,3 @@ func (it *insertTask) Execute(ctx context.Context) error { func (it *insertTask) PostExecute(ctx context.Context) error { return nil } - -func (it *insertTask) verifyDynamicFieldData() error { - for _, field := range it.insertMsg.FieldsData { - if field.GetFieldName() == common.MetaFieldName { - if !it.schema.EnableDynamicField { - return fmt.Errorf("without dynamic schema enabled, the field name cannot be set to %s", common.MetaFieldName) - } - for _, rowData := range field.GetScalars().GetJsonData().GetData() { - jsonData := make(map[string]interface{}) - if err := json.Unmarshal(rowData, &jsonData); err != nil { - return err - } - if _, ok := jsonData[common.MetaFieldName]; ok { - return fmt.Errorf("cannot set json key to: %s", common.MetaFieldName) - } - } - } - } - - return nil -} - -func (it *insertTask) checkDynamicFieldData() error { - for _, data := range it.insertMsg.FieldsData { - if data.IsDynamic { - data.FieldName = common.MetaFieldName - return it.verifyDynamicFieldData() - } - } - defaultData := make([][]byte, it.insertMsg.NRows()) - for i := range defaultData { - defaultData[i] = []byte("{}") - } - dynamicData := autoGenDynamicFieldData(defaultData) - it.insertMsg.FieldsData = append(it.insertMsg.FieldsData, dynamicData) - return nil -} diff --git a/internal/proxy/task_insert_test.go b/internal/proxy/task_insert_test.go index f39a07c5fc..a5e95a71b4 100644 --- a/internal/proxy/task_insert_test.go +++ b/internal/proxy/task_insert_test.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "encoding/json" "fmt" "testing" @@ -264,135 +263,3 @@ func TestInsertTask(t *testing.T) { assert.ElementsMatch(t, channels, resChannels) }) } - -func TestInsertTask_checkDynamicFieldData(t *testing.T) { - t.Run("normal case", func(t *testing.T) { - jsonData := make([][]byte, 0) - data := map[string]interface{}{ - "bool": true, - "int": 100, - "float": 1.2, - "string": "abc", - "json": map[string]interface{}{ - "int": 20, - "array": []int{1, 2, 3}, - }, - } - jsonBytes, err := json.MarshalIndent(data, "", " ") - assert.NoError(t, err) - jsonData = append(jsonData, jsonBytes) - jsonFieldData := autoGenDynamicFieldData(jsonData) - it := insertTask{ - ctx: context.Background(), - insertMsg: &msgstream.InsertMsg{ - InsertRequest: msgpb.InsertRequest{ - CollectionName: "collectionName", - FieldsData: []*schemapb.FieldData{jsonFieldData}, - NumRows: 1, - Version: msgpb.InsertDataVersion_ColumnBased, - }, - }, - schema: newTestSchema(), - } - err = it.checkDynamicFieldData() - assert.NoError(t, err) - }) - t.Run("key has $meta", func(t *testing.T) { - jsonData := make([][]byte, 0) - data := map[string]interface{}{ - "bool": true, - "int": 100, - "float": 1.2, - "string": "abc", - "json": map[string]interface{}{ - "int": 20, - "array": []int{1, 2, 3}, - }, - "$meta": "error key", - } - jsonBytes, err := json.MarshalIndent(data, "", " ") - assert.NoError(t, err) - jsonData = append(jsonData, jsonBytes) - jsonFieldData := autoGenDynamicFieldData(jsonData) - it := insertTask{ - ctx: context.Background(), - insertMsg: &msgstream.InsertMsg{ - InsertRequest: msgpb.InsertRequest{ - CollectionName: "collectionName", - FieldsData: []*schemapb.FieldData{jsonFieldData}, - NumRows: 1, - Version: msgpb.InsertDataVersion_ColumnBased, - }, - }, - schema: newTestSchema(), - } - err = it.checkDynamicFieldData() - assert.Error(t, err) - }) - t.Run("disable dynamic schema", func(t *testing.T) { - jsonData := make([][]byte, 0) - data := map[string]interface{}{ - "bool": true, - "int": 100, - "float": 1.2, - "string": "abc", - "json": map[string]interface{}{ - "int": 20, - "array": []int{1, 2, 3}, - }, - } - jsonBytes, err := json.MarshalIndent(data, "", " ") - assert.NoError(t, err) - jsonData = append(jsonData, jsonBytes) - jsonFieldData := autoGenDynamicFieldData(jsonData) - it := insertTask{ - ctx: context.Background(), - insertMsg: &msgstream.InsertMsg{ - InsertRequest: msgpb.InsertRequest{ - CollectionName: "collectionName", - FieldsData: []*schemapb.FieldData{jsonFieldData}, - NumRows: 1, - Version: msgpb.InsertDataVersion_ColumnBased, - }, - }, - schema: newTestSchema(), - } - it.schema.EnableDynamicField = false - err = it.checkDynamicFieldData() - assert.Error(t, err) - }) - t.Run("json data is string", func(t *testing.T) { - data := "abcdefg" - jsonFieldData := autoGenDynamicFieldData([][]byte{[]byte(data)}) - it := insertTask{ - ctx: context.Background(), - insertMsg: &msgstream.InsertMsg{ - InsertRequest: msgpb.InsertRequest{ - CollectionName: "collectionName", - FieldsData: []*schemapb.FieldData{jsonFieldData}, - NumRows: 1, - Version: msgpb.InsertDataVersion_ColumnBased, - }, - }, - schema: newTestSchema(), - } - err := it.checkDynamicFieldData() - assert.Error(t, err) - }) - t.Run("no json data", func(t *testing.T) { - it := insertTask{ - ctx: context.Background(), - insertMsg: &msgstream.InsertMsg{ - InsertRequest: msgpb.InsertRequest{ - CollectionName: "collectionName", - FieldsData: []*schemapb.FieldData{}, - NumRows: 1, - Version: msgpb.InsertDataVersion_ColumnBased, - }, - }, - schema: newTestSchema(), - } - err := it.checkDynamicFieldData() - assert.NoError(t, err) - }) -} diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index 29d6ef6b4a..26ac27afe0 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -170,6 +170,13 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { } it.result.SuccIndex = sliceIndex + if it.schema.EnableDynamicField { + err := checkDynamicFieldData(it.schema, it.upsertMsg.InsertMsg) + if err != nil { + return err + } + } + // check primaryFieldData whether autoID is true or not // only allow support autoID == false var err error diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 723db2f990..ab4c436e50 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -18,6 +18,7 @@ package proxy import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -1391,3 +1392,40 @@ func memsetLoop[T any](v T, numRows int) []T { return ret } + +func verifyDynamicFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error { + for _, field := range insertMsg.FieldsData { + if field.GetFieldName() == common.MetaFieldName { + if !schema.EnableDynamicField { + return fmt.Errorf("without dynamic schema enabled, the field name cannot be set to %s", common.MetaFieldName) + } + for _, rowData := range field.GetScalars().GetJsonData().GetData() { + jsonData := make(map[string]interface{}) + if err := json.Unmarshal(rowData, &jsonData); err != nil { + return err + } + if _, ok := jsonData[common.MetaFieldName]; ok { + return fmt.Errorf("cannot set json key to: %s", common.MetaFieldName) + } + } + } + } + + return nil +} + +func checkDynamicFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error { + for _, data := range insertMsg.FieldsData { + if data.IsDynamic { + data.FieldName = common.MetaFieldName + return verifyDynamicFieldData(schema, insertMsg) + } + } + defaultData := make([][]byte, insertMsg.NRows()) + for i := range defaultData { + defaultData[i] = []byte("{}") + } + dynamicData := autoGenDynamicFieldData(defaultData) + insertMsg.FieldsData = append(insertMsg.FieldsData, dynamicData) + return nil +} diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index cdeeea247d..d1432667b8 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -18,6 +18,7 @@ package proxy import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -38,6 +39,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/crypto" "github.com/milvus-io/milvus/pkg/util/merr" @@ -1811,3 +1813,120 @@ func Test_GetPartitionProgressFailed(t *testing.T) { _, _, err := getPartitionProgress(context.TODO(), qc, &commonpb.MsgBase{}, []string{}, "", 1, "") assert.Error(t, err) } + +func Test_CheckDynamicFieldData(t *testing.T) { + t.Run("normal case", func(t *testing.T) { + jsonData := make([][]byte, 0) + data := map[string]interface{}{ + "bool": true, + "int": 100, + "float": 1.2, + "string": "abc", + "json": map[string]interface{}{ + "int": 20, + "array": []int{1, 2, 3}, + }, + } + jsonBytes, err := json.MarshalIndent(data, "", " ") + assert.NoError(t, err) + jsonData = append(jsonData, jsonBytes) + jsonFieldData := autoGenDynamicFieldData(jsonData) + schema := newTestSchema() + insertMsg := &msgstream.InsertMsg{ + InsertRequest: msgpb.InsertRequest{ + CollectionName: "collectionName", + FieldsData: []*schemapb.FieldData{jsonFieldData}, + NumRows: 1, + Version: msgpb.InsertDataVersion_ColumnBased, + }, + } + err = checkDynamicFieldData(schema, insertMsg) + assert.NoError(t, err) + }) + t.Run("key has $meta", func(t *testing.T) { + jsonData := make([][]byte, 0) + data := map[string]interface{}{ + "bool": true, + "int": 100, + "float": 1.2, + "string": "abc", + "json": map[string]interface{}{ + "int": 20, + "array": []int{1, 2, 3}, + }, + "$meta": "error key", + } + jsonBytes, err := json.MarshalIndent(data, "", " ") + assert.NoError(t, err) + jsonData = append(jsonData, jsonBytes) + jsonFieldData := autoGenDynamicFieldData(jsonData) + schema := newTestSchema() + insertMsg := &msgstream.InsertMsg{ + InsertRequest: msgpb.InsertRequest{ + CollectionName: "collectionName", + FieldsData: []*schemapb.FieldData{jsonFieldData}, + NumRows: 1, + Version: msgpb.InsertDataVersion_ColumnBased, + }, + } + err = checkDynamicFieldData(schema, insertMsg) + assert.Error(t, err) + }) + t.Run("disable dynamic schema", func(t *testing.T) { + jsonData := make([][]byte, 0) + data := map[string]interface{}{ + "bool": true, + "int": 100, + "float": 1.2, + "string": "abc", + "json": map[string]interface{}{ + "int": 20, + "array": []int{1, 2, 3}, + }, + } + jsonBytes, err := json.MarshalIndent(data, "", " ") + assert.NoError(t, err) + jsonData = append(jsonData, jsonBytes) + jsonFieldData := autoGenDynamicFieldData(jsonData) + schema := newTestSchema() + insertMsg := &msgstream.InsertMsg{ + InsertRequest: msgpb.InsertRequest{ + CollectionName: "collectionName", + FieldsData: []*schemapb.FieldData{jsonFieldData}, + NumRows: 1, + Version: msgpb.InsertDataVersion_ColumnBased, + }, + } + schema.EnableDynamicField = false + err = checkDynamicFieldData(schema, insertMsg) + assert.Error(t, err) + }) + t.Run("json data is string", func(t *testing.T) { + data := "abcdefg" + jsonFieldData := autoGenDynamicFieldData([][]byte{[]byte(data)}) + schema := newTestSchema() + insertMsg := &msgstream.InsertMsg{ + InsertRequest: msgpb.InsertRequest{ + CollectionName: "collectionName", + FieldsData: []*schemapb.FieldData{jsonFieldData}, + NumRows: 1, + Version: msgpb.InsertDataVersion_ColumnBased, + }, + } + err := checkDynamicFieldData(schema, insertMsg) + assert.Error(t, err) + }) + t.Run("no json data", func(t *testing.T) { + schema := newTestSchema() + insertMsg := &msgstream.InsertMsg{ + InsertRequest: msgpb.InsertRequest{ + CollectionName: "collectionName", + FieldsData: []*schemapb.FieldData{}, + NumRows: 1, + Version: msgpb.InsertDataVersion_ColumnBased, + }, + } + err := checkDynamicFieldData(schema, insertMsg) + assert.NoError(t, err) + }) +}