enhance: Constrain dynamic field to key-value format (#31183)

issue: #31051

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2024-03-12 12:45:03 +08:00 committed by GitHub
parent 3298e64bd3
commit de2c95d00c
11 changed files with 323 additions and 17 deletions
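Every code path touched below enforces the same rule: when a JSON field is the collection's dynamic field ($meta), each row's value must be a top-level JSON object (key-value map); bare numbers, strings, and arrays are rejected at insert time in the proxy and at import time in the JSON, numpy, and parquet readers. A minimal, self-contained sketch of the check — isKeyValueJSON is a hypothetical helper name; the real code inlines the json.Unmarshal in each path:

package main

import (
	"encoding/json"
	"fmt"
)

// isKeyValueJSON reports whether raw parses as a top-level JSON object.
// Nested objects inside values still pass; bare scalars, strings, and
// arrays fail to unmarshal into a map and are rejected.
func isKeyValueJSON(raw []byte) bool {
	var m map[string]interface{}
	return json.Unmarshal(raw, &m) == nil
}

func main() {
	fmt.Println(isKeyValueJSON([]byte(`{"x": 123}`)))      // true
	fmt.Println(isKeyValueJSON([]byte(`{"a": {"b": 1}}`))) // true: nesting inside values is allowed
	fmt.Println(isKeyValueJSON([]byte(`123`)))             // false
	fmt.Println(isKeyValueJSON([]byte(`"hello"`)))         // false
	fmt.Println(isKeyValueJSON([]byte(`[1,2,3]`)))         // false
}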

View File

@@ -36,9 +36,7 @@ func (v *ParserVisitor) translateIdentifier(identifier string) (*ExprWithType, e
if identifier != field.Name {
nestedPath = append(nestedPath, identifier)
}
//if typeutil.IsJSONType(field.DataType) && len(nestedPath) == 0 {
// return nil, fmt.Errorf("can not comparisons jsonField directly")
//}
return &ExprWithType{
expr: &planpb.Expr{
Expr: &planpb.Expr_ColumnExpr{
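With the commented-out guard above removed for good, the planner now accepts a JSON (or dynamic) field referenced directly in a filter expression; the expanded Test_JSONExpr list below covers comparisons and arithmetic such as `JSONField > 0` and `JSONField % 10 > 5`, and Test_JSONContains gains `json_contains` over the whole field.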

View File

@@ -682,6 +682,16 @@ func Test_JSONExpr(t *testing.T) {
`A == [1,2,3]`,
`A + 1.2 == 3.3`,
`A + 1 == 2`,
`JSONField > 0`,
`JSONField == 0`,
`JSONField < 100`,
`0 < JSONField < 100`,
`20 > JSONField > 0`,
`JSONField + 5 > 0`,
`JSONField > 2 + 5`,
`JSONField * 2 > 5`,
`JSONField / 2 > 5`,
`JSONField % 10 > 5`,
}
for _, expr = range exprs {
_, err = CreateSearchPlan(schema, expr, "FloatVectorField", &planpb.QueryInfo{
@@ -703,13 +713,6 @@ func Test_InvalidExprOnJSONField(t *testing.T) {
`exists $meta`,
`exists JSONField`,
`exists ArrayField`,
//`$meta > 0`,
//`JSONField == 0`,
//`$meta < 100`,
//`0 < $meta < 100`,
//`20 > $meta > 0`,
//`$meta + 5 > 0`,
//`$meta > 2 + 5`,
`exists $meta["A"] > 10 `,
`exists Int64Field`,
`A[[""B""]] > 10`,
@@ -860,6 +863,8 @@ func Test_JSONContains(t *testing.T) {
`array_contains(A, [1,2,3])`,
`array_contains(ArrayField, [1,2,3])`,
`array_contains(ArrayField, 1)`,
`json_contains(JSONField, 5)`,
`json_contains($meta, 1)`,
}
for _, expr = range exprs {
_, err = CreateSearchPlan(schema, expr, "FloatVectorField", &planpb.QueryInfo{
@@ -882,7 +887,6 @@ func Test_InvalidJSONContains(t *testing.T) {
`json_contains([1,2,3], 1)`,
`json_contains([1,2,3], [1,2,3])`,
`json_contains([1,2,3], [1,2])`,
//`json_contains($meta, 1)`,
`json_contains(A, B)`,
`not json_contains(A, B)`,
`json_contains(A, B > 5)`,
@@ -890,9 +894,8 @@ func Test_InvalidJSONContains(t *testing.T) {
`json_contains(A, StringField > 5)`,
`json_contains(A)`,
`json_contains(A, 5, C)`,
//`json_contains(JSONField, 5)`,
//`json_Contains(JSONField, 5)`,
//`JSON_contains(JSONField, 5)`,
`json_Contains(JSONField, 5)`,
`JSON_contains(JSONField, 5)`,
}
for _, expr = range exprs {
_, err = CreateSearchPlan(schema, expr, "FloatVectorField", &planpb.QueryInfo{

View File

@@ -1,6 +1,7 @@
package proxy
import (
"encoding/json"
"fmt"
"math"
"reflect"
@@ -369,6 +370,19 @@ func (v *validateUtil) checkJSONFieldData(field *schemapb.FieldData, fieldSchema
}
}
if fieldSchema.GetIsDynamic() {
var jsonMap map[string]interface{}
for _, data := range jsonArray {
err := json.Unmarshal(data, &jsonMap)
if err != nil {
log.Warn("insert invalid JSON data, milvus only support json map without nesting",
zap.ByteString("data", data),
zap.Error(err),
)
return merr.WrapErrIoFailedReason(err.Error())
}
}
}
return nil
}
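With this check in place, an insert whose dynamic column carries a non-object payload (for example `123` or `"hello"`) is rejected in the proxy before it is written anywhere; the new `invalid_JSON_data` case in the next file exercises the bare-string path.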

View File

@@ -3422,4 +3422,28 @@ func Test_validateUtil_checkJSONData(t *testing.T) {
err := v.checkJSONFieldData(data, f)
assert.Error(t, err)
})
t.Run("invalid_JSON_data", func(t *testing.T) {
v := newValidateUtil(withOverflowCheck(), withMaxLenCheck())
jsonData := "hello"
f := &schemapb.FieldSchema{
DataType: schemapb.DataType_JSON,
IsDynamic: true,
}
data := &schemapb.FieldData{
FieldName: "json",
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_JsonData{
JsonData: &schemapb.JSONArray{
Data: [][]byte{[]byte(jsonData)},
},
},
},
},
}
err := v.checkJSONFieldData(data, f)
assert.Error(t, err)
})
}

View File

@@ -353,6 +353,12 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
if err != nil {
return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for JSON field '%s', error: %v", value, schema.GetName(), err))
}
if schema.GetIsDynamic() {
var dummy2 map[string]interface{}
if err := json.Unmarshal([]byte(value), &dummy2); err != nil {
return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for dynamic JSON field '%s', error: %v", value, schema.GetName(), err))
}
}
field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, []byte(value))
} else if mp, ok := obj.(map[string]interface{}); ok {
bs, err := json.Marshal(mp)
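The bulk-import JSON path applies the same per-row check, as do the numpy and parquet field readers later in this diff; Test_DynamicField below asserts that `{"x": 123}` converts cleanly while a bare `123` is refused.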

View File

@@ -1280,3 +1280,37 @@ func Test_UpdateKVInfo(t *testing.T) {
assert.Equal(t, 2, len(infos))
assert.Equal(t, "5", infos[1].Value)
}
func Test_DynamicField(t *testing.T) {
schema := &schemapb.CollectionSchema{
Name: "dynamic_field",
Description: "",
Fields: []*schemapb.FieldSchema{
{
FieldID: 999,
Name: "$meta",
DataType: schemapb.DataType_JSON,
IsDynamic: true,
},
},
EnableDynamicField: true,
}
validators := make(map[storage.FieldID]*Validator)
// success case
err := initValidators(schema, validators)
assert.NoError(t, err)
v, ok := validators[999]
assert.True(t, ok)
fields := initBlockData(schema)
assert.NotNil(t, fields)
fieldData := fields[999]
err = v.convertFunc("{\"x\": 123}", fieldData)
assert.NoError(t, err)
err = v.convertFunc("123", fieldData)
assert.Error(t, err)
}

View File

@@ -140,7 +140,19 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount
case schemapb.DataType_JSON:
jsonData := make([][]byte, 0)
for i := 0; i < rowCount; i++ {
jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i)))
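// Note: all four branches below emit valid JSON that is never a key-value
// object. The format string in the i%4 == 0 branch is marshaled verbatim
// (it is not passed through Sprintf), yielding a quoted JSON string; the
// other branches yield an int, a float32, and a quoted numeric string.
// Plain JSON fields still accept these rows; a field marked IsDynamic now
// rejects all of them, which the dynamic-field failure tests rely on.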
if i%4 == 0 {
v, _ := json.Marshal("{\"a\": \"%s\", \"b\": %d}")
jsonData = append(jsonData, v)
} else if i%4 == 1 {
v, _ := json.Marshal(i)
jsonData = append(jsonData, v)
} else if i%4 == 2 {
v, _ := json.Marshal(float32(i) * 0.1)
jsonData = append(jsonData, v)
} else if i%4 == 3 {
v, _ := json.Marshal(strconv.Itoa(i))
jsonData = append(jsonData, v)
}
}
insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData}
case schemapb.DataType_Array:

View File

@@ -178,6 +178,15 @@ func (c *FieldReader) Next(count int64) (any, error) {
return nil, merr.WrapErrImportFailed(
fmt.Sprintf("failed to parse value '%v' for JSON field '%s', error: %v", str, c.field.GetName(), err))
}
if c.field.GetIsDynamic() {
var dummy2 map[string]interface{}
err = json.Unmarshal([]byte(str), &dummy2)
if err != nil {
return nil, merr.WrapErrImportFailed(
fmt.Sprintf("failed to parse value '%v' for dynamic JSON field '%s', error: %v",
str, c.field.GetName(), err))
}
}
byteArr = append(byteArr, []byte(str))
}
data = byteArr

View File

@@ -20,6 +20,7 @@ import (
"bytes"
"context"
rand2 "crypto/rand"
"encoding/json"
"fmt"
"io"
"math"
@@ -145,7 +146,19 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount
case schemapb.DataType_JSON:
jsonData := make([][]byte, 0)
for i := 0; i < rowCount; i++ {
jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i)))
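// Same mixed-row generation as the other createInsertData above: valid
// JSON in every branch, but never a top-level key-value object.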
if i%4 == 0 {
v, _ := json.Marshal("{\"a\": \"%s\", \"b\": %d}")
jsonData = append(jsonData, v)
} else if i%4 == 1 {
v, _ := json.Marshal(i)
jsonData = append(jsonData, v)
} else if i%4 == 2 {
v, _ := json.Marshal(float32(i) * 0.1)
jsonData = append(jsonData, v)
} else if i%4 == 3 {
v, _ := json.Marshal(strconv.Itoa(i))
jsonData = append(jsonData, v)
}
}
insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData}
case schemapb.DataType_Array:
@@ -302,6 +315,115 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
checkFn(res, 0, suite.numRows)
}
func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) {
const dim = 8
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
DataType: suite.pkDataType,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_length",
Value: "256",
},
},
},
{
FieldID: 101,
Name: "vec",
DataType: suite.vecDataType,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: fmt.Sprintf("%d", dim),
},
},
},
{
FieldID: 102,
Name: dt.String(),
DataType: dt,
ElementType: schemapb.DataType_Int32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_length",
Value: "256",
},
},
IsDynamic: isDynamic,
},
},
}
insertData := createInsertData(suite.T(), schema, suite.numRows)
fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})
files := make(map[int64]string)
for _, field := range schema.GetFields() {
files[field.GetFieldID()] = fmt.Sprintf("%s.npy", field.GetName())
}
cm := mocks.NewChunkManager(suite.T())
type mockReader struct {
io.Reader
io.Closer
io.ReaderAt
io.Seeker
}
for fieldID, fieldData := range insertData.Data {
dataType := fieldIDToField[fieldID].GetDataType()
if dataType == schemapb.DataType_JSON {
jsonStrs := make([]string, 0, fieldData.RowNum())
for i := 0; i < fieldData.RowNum(); i++ {
row := fieldData.GetRow(i)
jsonStrs = append(jsonStrs, string(row.([]byte)))
}
reader, err := CreateReader(jsonStrs)
suite.NoError(err)
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
} else if dataType == schemapb.DataType_FloatVector {
chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim)
chunkedRows := make([][dim]float32, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
reader, err := CreateReader(chunkedRows)
suite.NoError(err)
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
} else if dataType == schemapb.DataType_BinaryVector {
chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim/8)
chunkedRows := make([][dim / 8]byte, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
reader, err := CreateReader(chunkedRows)
suite.NoError(err)
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
} else {
reader, err := CreateReader(insertData.Data[fieldID].GetRows())
suite.NoError(err)
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
}
}
reader, err := NewReader(context.Background(), schema, lo.Values(files), cm, math.MaxInt)
suite.NoError(err)
_, err = reader.Read()
suite.Error(err)
}
func (suite *ReaderSuite) TestReadScalarFields() {
suite.run(schemapb.DataType_Bool)
suite.run(schemapb.DataType_Int8)
@@ -312,6 +434,7 @@ func (suite *ReaderSuite) TestReadScalarFields() {
suite.run(schemapb.DataType_Double)
suite.run(schemapb.DataType_VarChar)
suite.run(schemapb.DataType_JSON)
suite.failRun(schemapb.DataType_JSON, true)
}
func (suite *ReaderSuite) TestStringPK() {

View File

@@ -111,6 +111,13 @@ func (c *FieldReader) Next(count int64) (any, error) {
if err != nil {
return nil, err
}
if c.field.GetIsDynamic() {
var dummy2 map[string]interface{}
err = json.Unmarshal([]byte(str), &dummy2)
if err != nil {
return nil, err
}
}
byteArr = append(byteArr, []byte(str))
}
return byteArr, nil

View File

@@ -18,6 +18,7 @@ package parquet
import (
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
@@ -237,7 +238,19 @@ func buildArrayData(dataType, elementType schemapb.DataType, dim, rows int, isBi
case schemapb.DataType_JSON:
builder := array.NewStringBuilder(mem)
for i := 0; i < rows; i++ {
builder.Append(fmt.Sprintf("{\"a\": \"%s\", \"b\": %d}", randomString(3), i))
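// As in the numpy tests, each branch appends valid JSON that is not a
// key-value object; here the i%4 == 0 branch marshals the formatted
// object text itself, producing a quoted JSON string. run() reads these
// rows fine for a plain JSON field, while failRun() marks the field
// dynamic and expects the read to fail.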
if i%4 == 0 {
v, _ := json.Marshal(fmt.Sprintf("{\"a\": \"%s\", \"b\": %d}", randomString(3), i))
builder.Append(string(v))
} else if i%4 == 1 {
v, _ := json.Marshal(i)
builder.Append(string(v))
} else if i%4 == 2 {
v, _ := json.Marshal(float32(i) * 0.1)
builder.Append(string(v))
} else if i%4 == 3 {
v, _ := json.Marshal(randomString(10))
builder.Append(string(v))
}
}
return builder.NewStringArray()
case schemapb.DataType_Array:
@@ -428,6 +441,68 @@ func (s *ReaderSuite) run(dt schemapb.DataType) {
checkFn(res, 0, s.numRows)
}
func (s *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) {
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
DataType: s.pkDataType,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_length",
Value: "256",
},
},
},
{
FieldID: 101,
Name: "vec",
DataType: s.vecDataType,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "8",
},
},
},
{
FieldID: 102,
Name: dt.String(),
DataType: dt,
ElementType: schemapb.DataType_Int32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_length",
Value: "256",
},
},
IsDynamic: isDynamic,
},
},
}
filePath := fmt.Sprintf("/tmp/test_%d_reader.parquet", rand.Int())
defer os.Remove(filePath)
wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
assert.NoError(s.T(), err)
err = writeParquet(wf, schema, s.numRows)
assert.NoError(s.T(), err)
ctx := context.Background()
f := storage.NewChunkManagerFactory("local", storage.RootPath("/tmp/milvus_test/test_parquet_reader/"))
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(s.T(), err)
cmReader, err := cm.Reader(ctx, filePath)
assert.NoError(s.T(), err)
reader, err := NewReader(ctx, schema, cmReader, 64*1024*1024)
s.NoError(err)
_, err = reader.Read()
s.Error(err)
}
func (s *ReaderSuite) TestReadScalarFields() {
s.run(schemapb.DataType_Bool)
s.run(schemapb.DataType_Int8)
@@ -439,6 +514,7 @@ func (s *ReaderSuite) TestReadScalarFields() {
s.run(schemapb.DataType_VarChar)
s.run(schemapb.DataType_Array)
s.run(schemapb.DataType_JSON)
s.failRun(schemapb.DataType_JSON, true)
}
func (s *ReaderSuite) TestStringPK() {