Add dynamic schema check in upsert (#26644)

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
smellthemoon 2023-08-30 10:52:26 +08:00 committed by GitHub
parent 213db490bd
commit 87ecaac703
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 165 additions and 173 deletions

View File

@ -2,7 +2,6 @@ package proxy
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"strconv" "strconv"
@ -13,7 +12,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/mq/msgstream"
@ -149,7 +147,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
it.result.SuccIndex = sliceIndex it.result.SuccIndex = sliceIndex
if it.schema.EnableDynamicField { if it.schema.EnableDynamicField {
err = it.checkDynamicFieldData() err = checkDynamicFieldData(it.schema, it.insertMsg)
if err != nil { if err != nil {
return err return err
} }
@ -287,40 +285,3 @@ func (it *insertTask) Execute(ctx context.Context) error {
func (it *insertTask) PostExecute(ctx context.Context) error { func (it *insertTask) PostExecute(ctx context.Context) error {
return nil return nil
} }
func (it *insertTask) verifyDynamicFieldData() error {
for _, field := range it.insertMsg.FieldsData {
if field.GetFieldName() == common.MetaFieldName {
if !it.schema.EnableDynamicField {
return fmt.Errorf("without dynamic schema enabled, the field name cannot be set to %s", common.MetaFieldName)
}
for _, rowData := range field.GetScalars().GetJsonData().GetData() {
jsonData := make(map[string]interface{})
if err := json.Unmarshal(rowData, &jsonData); err != nil {
return err
}
if _, ok := jsonData[common.MetaFieldName]; ok {
return fmt.Errorf("cannot set json key to: %s", common.MetaFieldName)
}
}
}
}
return nil
}
func (it *insertTask) checkDynamicFieldData() error {
for _, data := range it.insertMsg.FieldsData {
if data.IsDynamic {
data.FieldName = common.MetaFieldName
return it.verifyDynamicFieldData()
}
}
defaultData := make([][]byte, it.insertMsg.NRows())
for i := range defaultData {
defaultData[i] = []byte("{}")
}
dynamicData := autoGenDynamicFieldData(defaultData)
it.insertMsg.FieldsData = append(it.insertMsg.FieldsData, dynamicData)
return nil
}

View File

@ -2,7 +2,6 @@ package proxy
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"testing" "testing"
@ -264,135 +263,3 @@ func TestInsertTask(t *testing.T) {
assert.ElementsMatch(t, channels, resChannels) assert.ElementsMatch(t, channels, resChannels)
}) })
} }
func TestInsertTask_checkDynamicFieldData(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
jsonData := make([][]byte, 0)
data := map[string]interface{}{
"bool": true,
"int": 100,
"float": 1.2,
"string": "abc",
"json": map[string]interface{}{
"int": 20,
"array": []int{1, 2, 3},
},
}
jsonBytes, err := json.MarshalIndent(data, "", " ")
assert.NoError(t, err)
jsonData = append(jsonData, jsonBytes)
jsonFieldData := autoGenDynamicFieldData(jsonData)
it := insertTask{
ctx: context.Background(),
insertMsg: &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{jsonFieldData},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
schema: newTestSchema(),
}
err = it.checkDynamicFieldData()
assert.NoError(t, err)
})
t.Run("key has $meta", func(t *testing.T) {
jsonData := make([][]byte, 0)
data := map[string]interface{}{
"bool": true,
"int": 100,
"float": 1.2,
"string": "abc",
"json": map[string]interface{}{
"int": 20,
"array": []int{1, 2, 3},
},
"$meta": "error key",
}
jsonBytes, err := json.MarshalIndent(data, "", " ")
assert.NoError(t, err)
jsonData = append(jsonData, jsonBytes)
jsonFieldData := autoGenDynamicFieldData(jsonData)
it := insertTask{
ctx: context.Background(),
insertMsg: &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{jsonFieldData},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
schema: newTestSchema(),
}
err = it.checkDynamicFieldData()
assert.Error(t, err)
})
t.Run("disable dynamic schema", func(t *testing.T) {
jsonData := make([][]byte, 0)
data := map[string]interface{}{
"bool": true,
"int": 100,
"float": 1.2,
"string": "abc",
"json": map[string]interface{}{
"int": 20,
"array": []int{1, 2, 3},
},
}
jsonBytes, err := json.MarshalIndent(data, "", " ")
assert.NoError(t, err)
jsonData = append(jsonData, jsonBytes)
jsonFieldData := autoGenDynamicFieldData(jsonData)
it := insertTask{
ctx: context.Background(),
insertMsg: &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{jsonFieldData},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
schema: newTestSchema(),
}
it.schema.EnableDynamicField = false
err = it.checkDynamicFieldData()
assert.Error(t, err)
})
t.Run("json data is string", func(t *testing.T) {
data := "abcdefg"
jsonFieldData := autoGenDynamicFieldData([][]byte{[]byte(data)})
it := insertTask{
ctx: context.Background(),
insertMsg: &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{jsonFieldData},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
schema: newTestSchema(),
}
err := it.checkDynamicFieldData()
assert.Error(t, err)
})
t.Run("no json data", func(t *testing.T) {
it := insertTask{
ctx: context.Background(),
insertMsg: &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
schema: newTestSchema(),
}
err := it.checkDynamicFieldData()
assert.NoError(t, err)
})
}

View File

@ -170,6 +170,13 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
} }
it.result.SuccIndex = sliceIndex it.result.SuccIndex = sliceIndex
if it.schema.EnableDynamicField {
err := checkDynamicFieldData(it.schema, it.upsertMsg.InsertMsg)
if err != nil {
return err
}
}
// check primaryFieldData whether autoID is true or not // check primaryFieldData whether autoID is true or not
// only allow support autoID == false // only allow support autoID == false
var err error var err error

View File

@ -18,6 +18,7 @@ package proxy
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
@ -1391,3 +1392,40 @@ func memsetLoop[T any](v T, numRows int) []T {
return ret return ret
} }
func verifyDynamicFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
for _, field := range insertMsg.FieldsData {
if field.GetFieldName() == common.MetaFieldName {
if !schema.EnableDynamicField {
return fmt.Errorf("without dynamic schema enabled, the field name cannot be set to %s", common.MetaFieldName)
}
for _, rowData := range field.GetScalars().GetJsonData().GetData() {
jsonData := make(map[string]interface{})
if err := json.Unmarshal(rowData, &jsonData); err != nil {
return err
}
if _, ok := jsonData[common.MetaFieldName]; ok {
return fmt.Errorf("cannot set json key to: %s", common.MetaFieldName)
}
}
}
}
return nil
}
func checkDynamicFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
for _, data := range insertMsg.FieldsData {
if data.IsDynamic {
data.FieldName = common.MetaFieldName
return verifyDynamicFieldData(schema, insertMsg)
}
}
defaultData := make([][]byte, insertMsg.NRows())
for i := range defaultData {
defaultData[i] = []byte("{}")
}
dynamicData := autoGenDynamicFieldData(defaultData)
insertMsg.FieldsData = append(insertMsg.FieldsData, dynamicData)
return nil
}

View File

@ -18,6 +18,7 @@ package proxy
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
@ -38,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/crypto" "github.com/milvus-io/milvus/pkg/util/crypto"
"github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/merr"
@ -1811,3 +1813,120 @@ func Test_GetPartitionProgressFailed(t *testing.T) {
_, _, err := getPartitionProgress(context.TODO(), qc, &commonpb.MsgBase{}, []string{}, "", 1, "") _, _, err := getPartitionProgress(context.TODO(), qc, &commonpb.MsgBase{}, []string{}, "", 1, "")
assert.Error(t, err) assert.Error(t, err)
} }
func Test_CheckDynamicFieldData(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
jsonData := make([][]byte, 0)
data := map[string]interface{}{
"bool": true,
"int": 100,
"float": 1.2,
"string": "abc",
"json": map[string]interface{}{
"int": 20,
"array": []int{1, 2, 3},
},
}
jsonBytes, err := json.MarshalIndent(data, "", " ")
assert.NoError(t, err)
jsonData = append(jsonData, jsonBytes)
jsonFieldData := autoGenDynamicFieldData(jsonData)
schema := newTestSchema()
insertMsg := &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{jsonFieldData},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
}
err = checkDynamicFieldData(schema, insertMsg)
assert.NoError(t, err)
})
t.Run("key has $meta", func(t *testing.T) {
jsonData := make([][]byte, 0)
data := map[string]interface{}{
"bool": true,
"int": 100,
"float": 1.2,
"string": "abc",
"json": map[string]interface{}{
"int": 20,
"array": []int{1, 2, 3},
},
"$meta": "error key",
}
jsonBytes, err := json.MarshalIndent(data, "", " ")
assert.NoError(t, err)
jsonData = append(jsonData, jsonBytes)
jsonFieldData := autoGenDynamicFieldData(jsonData)
schema := newTestSchema()
insertMsg := &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{jsonFieldData},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
}
err = checkDynamicFieldData(schema, insertMsg)
assert.Error(t, err)
})
t.Run("disable dynamic schema", func(t *testing.T) {
jsonData := make([][]byte, 0)
data := map[string]interface{}{
"bool": true,
"int": 100,
"float": 1.2,
"string": "abc",
"json": map[string]interface{}{
"int": 20,
"array": []int{1, 2, 3},
},
}
jsonBytes, err := json.MarshalIndent(data, "", " ")
assert.NoError(t, err)
jsonData = append(jsonData, jsonBytes)
jsonFieldData := autoGenDynamicFieldData(jsonData)
schema := newTestSchema()
insertMsg := &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{jsonFieldData},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
}
schema.EnableDynamicField = false
err = checkDynamicFieldData(schema, insertMsg)
assert.Error(t, err)
})
t.Run("json data is string", func(t *testing.T) {
data := "abcdefg"
jsonFieldData := autoGenDynamicFieldData([][]byte{[]byte(data)})
schema := newTestSchema()
insertMsg := &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{jsonFieldData},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
}
err := checkDynamicFieldData(schema, insertMsg)
assert.Error(t, err)
})
t.Run("no json data", func(t *testing.T) {
schema := newTestSchema()
insertMsg := &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
CollectionName: "collectionName",
FieldsData: []*schemapb.FieldData{},
NumRows: 1,
Version: msgpb.InsertDataVersion_ColumnBased,
},
}
err := checkDynamicFieldData(schema, insertMsg)
assert.NoError(t, err)
})
}