From 1b5554c8cbc4efa43d849ed0227c8787a13885b6 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 10 Apr 2024 17:27:17 +0800 Subject: [PATCH] enhance: Support $meta key for json import (#32013) During JSON import: 1. Allow the specification of the $meta key 2. Prohibit duplicated keys within the $meta field, for instance, `{"id": 1, "vector": [], "x": 6, "$meta": {"x": 8}}` issue: https://github.com/milvus-io/milvus/issues/31835 --------- Signed-off-by: bigsheeper --- internal/util/importutilv2/json/row_parser.go | 62 +++++-- .../util/importutilv2/json/row_parser_test.go | 154 ++++++++++++++++++ 2 files changed, 199 insertions(+), 17 deletions(-) create mode 100644 internal/util/importutilv2/json/row_parser_test.go diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index 2c1822dd10..38f5651c82 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -116,10 +116,6 @@ func (r *rowParser) Parse(raw any) (Row, error) { } row[fieldID] = data } else if r.dynamicField != nil { - if key == r.dynamicField.GetName() { - return nil, merr.WrapErrImportFailed( - fmt.Sprintf("dynamic field is enabled, explicit specification of '%s' is not allowed", key)) - } // has dynamic field, put redundant pair to dynamicValues dynamicValues[key] = value } else { @@ -144,23 +140,55 @@ func (r *rowParser) Parse(raw any) (Row, error) { func (r *rowParser) combineDynamicRow(dynamicValues map[string]any, row Row) error { // Combine the dynamic field value - // invalid inputs: - // case 1: {"id": 1, "vector": [], "$meta": {"x": 8}} ==>> "$meta" is not allowed // valid inputs: - // case 2: {"id": 1, "vector": [], "x": 8} ==>> {"id": 1, "vector": [], "$meta": "{\"x\": 8}"} - // case 3: {"id": 1, "vector": []} + // case 1: {"id": 1, "vector": [], "x": 8, "$meta": "{\"y\": 8}"} ==>> {"id": 1, "vector": [], "$meta": "{\"y\": 8, \"x\": 8}"} + // case 2: {"id": 1, "vector": [], "x": 8, "$meta": {}} ==>> {"id": 1, "vector": [], "$meta": {\"x\": 8}} + // case 3: {"id": 1, "vector": [], "$meta": "{\"x\": 8}"} + // case 4: {"id": 1, "vector": [], "$meta": {"x": 8}} + // case 5: {"id": 1, "vector": [], "$meta": {}} + // case 6: {"id": 1, "vector": [], "x": 8} ==>> {"id": 1, "vector": [], "$meta": "{\"x\": 8}"} + // case 7: {"id": 1, "vector": []} + // invalid inputs: + // case 8: {"id": 1, "vector": [], "x": 6, "$meta": {"x": 8}} ==>> duplicated key is not allowed + // case 9: {"id": 1, "vector": [], "x": 6, "$meta": "{\"x\": 8}"} ==>> duplicated key is not allowed dynamicFieldID := r.dynamicField.GetFieldID() - if len(dynamicValues) > 0 { - // case 2 - data, err := r.parseEntity(dynamicFieldID, dynamicValues) - if err != nil { - return err - } - row[dynamicFieldID] = data - } else { - // case 3 + if len(dynamicValues) == 0 { + // case 7 row[dynamicFieldID] = []byte("{}") + return nil } + + if obj, ok := dynamicValues[r.dynamicField.GetName()]; ok { + var mp map[string]interface{} + switch value := obj.(type) { + case string: + // case 1, 3 + err := json.Unmarshal([]byte(value), &mp) + if err != nil { + return merr.WrapErrImportFailed("illegal value for dynamic field, not a JSON format string") + } + case map[string]interface{}: + // case 2, 4, 5 + mp = value + default: + // invalid input + return merr.WrapErrImportFailed("illegal value for dynamic field, not a JSON object") + } + delete(dynamicValues, r.dynamicField.GetName()) + for k, v := range mp { + if _, ok = dynamicValues[k]; ok { + // case 8, 9 + return merr.WrapErrImportFailed(fmt.Sprintf("duplicated key is not allowed, key=%s", k)) + } + dynamicValues[k] = v + } + } + data, err := r.parseEntity(dynamicFieldID, dynamicValues) + if err != nil { + return err + } + row[dynamicFieldID] = data + return nil } diff --git a/internal/util/importutilv2/json/row_parser_test.go b/internal/util/importutilv2/json/row_parser_test.go new file mode 100644 index 0000000000..153a7777b2 --- /dev/null +++ b/internal/util/importutilv2/json/row_parser_test.go @@ -0,0 +1,154 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package json + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common" +) + +func TestRowParser_Parse_Valid(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 1, + Name: "id", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 2, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "0"}}, + }, + { + FieldID: 3, + Name: "$meta", + IsDynamic: true, + DataType: schemapb.DataType_JSON, + }, + }, + } + r, err := NewRowParser(schema) + assert.NoError(t, err) + + type testCase struct { + name string // input json + dyFields []string // expect dynamic fields + } + + cases := []testCase{ + {name: `{"id": 1, "vector": [], "x": 8, "$meta": "{\"y\": 8}"}`, dyFields: []string{"x", "y"}}, + {name: `{"id": 1, "vector": [], "x": 8, "$meta": {}}`, dyFields: []string{"x"}}, + {name: `{"id": 1, "vector": [], "$meta": "{\"x\": 8}"}`, dyFields: []string{"x"}}, + {name: `{"id": 1, "vector": [], "$meta": {"x": 8}}`, dyFields: []string{"x"}}, + {name: `{"id": 1, "vector": [], "$meta": {}}`, dyFields: nil}, + {name: `{"id": 1, "vector": [], "x": 8}`, dyFields: []string{"x"}}, + {name: `{"id": 1, "vector": []}`, dyFields: nil}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var mp map[string]interface{} + + desc := json.NewDecoder(strings.NewReader(c.name)) + desc.UseNumber() + err = desc.Decode(&mp) + assert.NoError(t, err) + + row, err := r.Parse(mp) + assert.NoError(t, err) + + // validate contains fields + for _, field := range schema.GetFields() { + _, ok := row[field.GetFieldID()] + assert.True(t, ok) + } + + // validate dynamic fields + var dynamicFields map[string]interface{} + err = json.Unmarshal(row[r.(*rowParser).dynamicField.GetFieldID()].([]byte), &dynamicFields) + assert.NoError(t, err) + for _, k := range c.dyFields { + _, ok := dynamicFields[k] + assert.True(t, ok) + } + }) + } +} + +func TestRowParser_Parse_Invalid(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 1, + Name: "id", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 2, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "0"}}, + }, + { + FieldID: 3, + Name: "$meta", + IsDynamic: true, + DataType: schemapb.DataType_JSON, + }, + }, + } + r, err := NewRowParser(schema) + assert.NoError(t, err) + + type testCase struct { + name string // input json + expectErr string + } + + cases := []testCase{ + {name: `{"id": 1, "vector": [], "x": 6, "$meta": {"x": 8}}`, expectErr: "duplicated key is not allowed"}, + {name: `{"id": 1, "vector": [], "x": 6, "$meta": "{\"x\": 8}"}`, expectErr: "duplicated key is not allowed"}, + {name: `{"id": 1, "vector": [], "x": 6, "$meta": "{*&%%&$*(&"}`, expectErr: "not a JSON format string"}, + {name: `{"id": 1, "vector": [], "x": 6, "$meta": []}`, expectErr: "not a JSON object"}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var mp map[string]interface{} + + desc := json.NewDecoder(strings.NewReader(c.name)) + desc.UseNumber() + err = desc.Decode(&mp) + assert.NoError(t, err) + + _, err = r.Parse(mp) + assert.Error(t, err) + assert.True(t, strings.Contains(err.Error(), c.expectErr)) + }) + } +}