Support dynamic field for bulkinsert (#24265) (#24304)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 2023-05-23 14:17:25 +08:00 committed by GitHub
parent 411374abde
commit 97cd0409e4
6 changed files with 427 additions and 15 deletions
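For orientation: this change lets row-based JSON imports carry keys that are not defined in the collection schema; when the dynamic field is enabled, those keys are folded into it. Below is an illustrative sketch only (not part of the diff) of the kind of input that becomes acceptable; the field names mirror the combineDynamicRow examples further down, and keys such as "x" or "$meta" are routed into the dynamic JSON field.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Keys "x" and "$meta" are not schema fields; with the dynamic field enabled
	// they are merged into the dynamic (JSON) field during import.
	content := []byte(`{
		"rows": [
			{"id": 1, "vector": [0.1, 0.2], "x": 8},
			{"id": 2, "vector": [0.3, 0.4], "$meta": "{\"y\": 8}"},
			{"id": 3, "vector": [0.5, 0.6]}
		]
	}`)

	var doc map[string]interface{}
	if err := json.Unmarshal(content, &doc); err != nil {
		panic(err)
	}
	fmt.Printf("parsed %d rows\n", len(doc["rows"].([]interface{})))
}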

View File

@ -309,6 +309,8 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
}
case schemapb.DataType_JSON:
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
// for JSON data, we accept two kinds of input: a string or a map[string]interface{}
// user can write JSON content as {"FieldJSON": "{\"x\": 8}"} or {"FieldJSON": {"x": 8}}
if value, ok := obj.(string); ok {
var dummy interface{}
err := json.Unmarshal([]byte(value), &dummy)
@ -316,6 +318,12 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
return fmt.Errorf("failed to parse value '%v' for JSON field '%s', error: %w", value, schema.GetName(), err)
}
field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, []byte(value))
} else if mp, ok := obj.(map[string]interface{}); ok {
bs, err := json.Marshal(mp)
if err != nil {
return fmt.Errorf("failed to parse value for JSON field '%s', error: %w", schema.GetName(), err)
}
field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, bs)
} else {
return fmt.Errorf("illegal value '%v' for JSON type field '%s'", obj, schema.GetName())
}
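For reference, here is a self-contained sketch of the two accepted JSON input forms handled by the convertFunc above. The helper name toJSONBytes is hypothetical and not part of the diff.

package main

import (
	"encoding/json"
	"fmt"
)

// toJSONBytes is a hypothetical stand-in for the convertFunc logic above:
// a string must itself be valid JSON and is stored as-is, a map is re-marshaled,
// and anything else is rejected.
func toJSONBytes(obj interface{}) ([]byte, error) {
	switch v := obj.(type) {
	case string:
		var dummy interface{}
		if err := json.Unmarshal([]byte(v), &dummy); err != nil {
			return nil, fmt.Errorf("invalid JSON string '%s': %w", v, err)
		}
		return []byte(v), nil
	case map[string]interface{}:
		return json.Marshal(v)
	default:
		return nil, fmt.Errorf("illegal value '%v' for JSON field", obj)
	}
}

func main() {
	b1, _ := toJSONBytes(`{"x": 8}`)                     // string form
	b2, _ := toJSONBytes(map[string]interface{}{"x": 8}) // map form
	fmt.Println(string(b1), string(b2))
}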
@ -372,6 +380,48 @@ func triggerGC() {
debug.FreeOSMemory()
}
// if the user didn't provide dynamic data, fill the dynamic field with "{}"
func fillDynamicData(blockData map[storage.FieldID]storage.FieldData, collectionSchema *schemapb.CollectionSchema) error {
if !collectionSchema.GetEnableDynamicField() {
return nil
}
dynamicFieldID := int64(-1)
for i := 0; i < len(collectionSchema.Fields); i++ {
schema := collectionSchema.Fields[i]
if schema.GetIsDynamic() {
dynamicFieldID = schema.GetFieldID()
break
}
}
if dynamicFieldID < 0 {
return fmt.Errorf("the collection schema is dynamic but dynamic field is not found")
}
rowCount := 0
if len(blockData) > 0 {
for _, v := range blockData {
rowCount = v.RowNum()
}
}
_, ok := blockData[dynamicFieldID]
if !ok {
data := &storage.JSONFieldData{
Data: make([][]byte, 0),
}
bs := []byte("{}")
for i := 0; i < rowCount; i++ {
data.Data = append(data.Data, bs)
}
blockData[dynamicFieldID] = data
}
return nil
}
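The padding behaviour can be shown with plain Go maps rather than the real storage types. This is a simplified sketch; padDynamic is a hypothetical stand-in for fillDynamicData.

package main

import "fmt"

// padDynamic mimics fillDynamicData in miniature: if the dynamic column is
// absent from a block, back-fill it with "{}" so its row count matches the
// other columns.
func padDynamic(block map[int64][][]byte, dynamicFieldID int64, rowCount int) {
	if _, ok := block[dynamicFieldID]; ok {
		return
	}
	col := make([][]byte, 0, rowCount)
	for i := 0; i < rowCount; i++ {
		col = append(col, []byte("{}"))
	}
	block[dynamicFieldID] = col
}

func main() {
	block := map[int64][][]byte{101: {[]byte(`{"a":1}`), []byte(`{"a":2}`), []byte(`{"a":3}`)}}
	padDynamic(block, 113, 3)
	fmt.Println(len(block[113]), string(block[113][0])) // 3 {}
}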
// tryFlushBlocks does two things:
// 1. if the accumulated data of a block exceeds blockSize, call callFlushFunc to generate a new binlog file
// 2. if the total accumulated data exceeds maxTotalSize, call callFlushFunc to flush the biggest block
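A rough sketch of these two flush conditions, using hypothetical names and a much simpler signature than the real tryFlushBlocks:

package main

import "fmt"

// flushOversized sketches the two conditions described above: flush any shard
// whose block exceeds blockSize, then, if the remaining total still exceeds
// maxTotalSize, flush the biggest remaining block.
func flushOversized(sizes map[int]int, blockSize, maxTotalSize int, flush func(shard int)) {
	total, biggest, biggestSize := 0, -1, 0
	for shard, size := range sizes {
		if size > blockSize {
			flush(shard)
			continue
		}
		total += size
		if size > biggestSize {
			biggest, biggestSize = shard, size
		}
	}
	if total > maxTotalSize && biggest >= 0 {
		flush(biggest)
	}
}

func main() {
	sizes := map[int]int{0: 10, 1: 200, 2: 50}
	flushOversized(sizes, 100, 40, func(shard int) { fmt.Println("flush shard", shard) })
}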
@ -407,7 +457,12 @@ func tryFlushBlocks(ctx context.Context,
// force to flush, called at the end of Read()
if force && rowCount > 0 {
printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil)
err := callFlushFunc(blockData, i)
err := fillDynamicData(blockData, collectionSchema)
if err != nil {
log.Error("Import util: failed to fill dynamic field", zap.Error(err))
return fmt.Errorf("failed to fill dynamic field, error: %w", err)
}
err = callFlushFunc(blockData, i)
if err != nil {
log.Error("Import util: failed to force flush block data", zap.Int("shardID", i), zap.Error(err))
return fmt.Errorf("failed to force flush block data for shard id %d, error: %w", i, err)
@ -426,7 +481,12 @@ func tryFlushBlocks(ctx context.Context,
// initialize a new FieldData list for next round batch read
if size > int(blockSize) && rowCount > 0 {
printFieldsDataInfo(blockData, "import util: prepare to flush block larger than blockSize", nil)
err := callFlushFunc(blockData, i)
err := fillDynamicData(blockData, collectionSchema)
if err != nil {
log.Error("Import util: failed to fill dynamic field", zap.Error(err))
return fmt.Errorf("failed to fill dynamic field, error: %w", err)
}
err = callFlushFunc(blockData, i)
if err != nil {
log.Error("Import util: failed to flush block data", zap.Int("shardID", i), zap.Error(err))
return fmt.Errorf("failed to flush block data for shard id %d, error: %w", i, err)
@ -470,7 +530,12 @@ func tryFlushBlocks(ctx context.Context,
if rowCount > 0 {
printFieldsDataInfo(blockData, "import util: prepare to flush biggest block", nil)
err := callFlushFunc(blockData, biggestItem)
err := fillDynamicData(blockData, collectionSchema)
if err != nil {
log.Error("Import util: failed to fill dynamic field", zap.Error(err))
return fmt.Errorf("failed to fill dynamic field, error: %w", err)
}
err = callFlushFunc(blockData, biggestItem)
if err != nil {
log.Error("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem))
return fmt.Errorf("failed to flush biggest block data for shard id %d, error: %w", biggestItem, err)

View File

@ -393,6 +393,10 @@ func Test_InitValidators(t *testing.T) {
validVal = "aa"
checkConvertFunc("FieldString", validVal, nil)
validVal = map[string]interface{}{"x": 5, "y": true, "z": "hello"}
checkConvertFunc("FieldJSON", validVal, nil)
checkConvertFunc("FieldJSON", "{\"x\": 8}", "{")
// the binary vector dimension is 16, so the input should be two uint8 values, each between 0 and 255
validVal = []interface{}{jsonNumber("100"), jsonNumber("101")}
invalidVal = []interface{}{jsonNumber("100"), jsonNumber("1256")}
@ -540,6 +544,97 @@ func Test_GetFieldDimension(t *testing.T) {
assert.Equal(t, 0, dim)
}
func Test_FillDynamicData(t *testing.T) {
ctx := context.Background()
schema := &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
EnableDynamicField: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 106,
Name: "FieldID",
IsPrimaryKey: true,
AutoID: false,
Description: "int64",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 113,
Name: "FieldDynamic",
IsPrimaryKey: false,
IsDynamic: true,
Description: "dynamic field",
DataType: schemapb.DataType_JSON,
},
},
}
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
rowCount := 1000
idData := &storage.Int64FieldData{
Data: make([]int64, 0),
}
for i := 0; i < rowCount; i++ {
idData.Data = append(idData.Data, int64(i)) // this is primary key
}
t.Run("dynamic field is filled", func(t *testing.T) {
blockData := map[storage.FieldID]storage.FieldData{
106: idData,
}
segmentsData := []map[storage.FieldID]storage.FieldData{
blockData,
}
err := fillDynamicData(blockData, schema)
assert.NoError(t, err)
assert.Equal(t, 2, len(blockData))
assert.Contains(t, blockData, int64(113))
assert.Equal(t, rowCount, blockData[113].RowNum())
assert.Equal(t, []byte("{}"), blockData[113].GetRow(0).([]byte))
err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1, 1, false)
assert.NoError(t, err)
})
t.Run("collection is dynamic by no dynamic field", func(t *testing.T) {
blockData := map[storage.FieldID]storage.FieldData{
106: idData,
}
schema.Fields[1].IsDynamic = false
err := fillDynamicData(blockData, schema)
assert.Error(t, err)
segmentsData := []map[storage.FieldID]storage.FieldData{
blockData,
}
err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1024*1024, 1, true)
assert.Error(t, err)
err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1024, 1, false)
assert.Error(t, err)
err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1024*1024, 1, false)
assert.Error(t, err)
})
t.Run("collection is not dynamic", func(t *testing.T) {
blockData := map[storage.FieldID]storage.FieldData{
106: idData,
}
schema.EnableDynamicField = false
err := fillDynamicData(blockData, schema)
assert.NoError(t, err)
})
}
func Test_TryFlushBlocks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

View File

@ -242,9 +242,9 @@ func Test_ImportWrapperRowBased(t *testing.T) {
content := []byte(`{
"rows":[
{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": "{\"x\": 2}", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": {"x": 2}, "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
{"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldJSON": "{\"k\": 2.5}", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]},
{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": "{\"y\": \"hello\"}", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": {"y": "hello"}, "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
{"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldJSON": "{}", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]},
{"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldJSON": "{\"x\": true}", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]}
]

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
const (
@ -44,17 +45,17 @@ type IOReader struct {
}
type JSONParser struct {
ctx context.Context // for canceling parse process
bufRowCount int // max rows in a buffer
fields map[string]int64 // fields need to be parsed
ctx context.Context // for canceling parse process
bufRowCount int // max rows in a buffer
name2FieldID map[string]storage.FieldID
updateProgressFunc func(percent int64) // update working progress percent value
dynamicFieldID storage.FieldID // dynamic field id, set to -1 if no dynamic field
}
// NewJSONParser helper function to create a JSONParser
func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSchema, updateProgressFunc func(percent int64)) *JSONParser {
fields := make(map[string]int64)
name2FieldID := make(map[string]storage.FieldID)
dynamicFieldID := int64(-1)
for i := 0; i < len(collectionSchema.Fields); i++ {
schema := collectionSchema.Fields[i]
// RowIDField and TimeStampField are internal fields, no need to parse
@ -66,16 +67,18 @@ func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSch
continue
}
fields[schema.GetName()] = 0
name2FieldID[schema.GetName()] = schema.GetFieldID()
if schema.GetIsDynamic() && collectionSchema.GetEnableDynamicField() {
dynamicFieldID = schema.GetFieldID()
}
}
parser := &JSONParser{
ctx: ctx,
bufRowCount: 1024,
fields: fields,
name2FieldID: name2FieldID,
updateProgressFunc: updateProgressFunc,
dynamicFieldID: dynamicFieldID,
}
adjustBufSize(parser, collectionSchema)
@ -108,6 +111,50 @@ func adjustBufSize(parser *JSONParser, collectionSchema *schemapb.CollectionSche
parser.bufRowCount = bufRowCount
}
func (p *JSONParser) combineDynamicRow(dynamicValues map[string]interface{}, row map[storage.FieldID]interface{}) error {
if p.dynamicFieldID < 0 {
return nil
}
// combine the dynamic field value
// valid input:
// case 1: {"id": 1, "vector": [], "x": 8, "$meta": "{\"y\": 8}"}
// case 2: {"id": 1, "vector": [], "x": 8, "$meta": {}}
// case 3: {"id": 1, "vector": [], "$meta": "{\"x\": 8}"}
// case 4: {"id": 1, "vector": [], "$meta": {"x": 8}}
// case 5: {"id": 1, "vector": [], "$meta": {}}
// case 6: {"id": 1, "vector": [], "x": 8}
// case 7: {"id": 1, "vector": []}
obj, ok := row[p.dynamicFieldID]
if ok {
if len(dynamicValues) > 0 {
if value, is := obj.(string); is {
// case 1
mp := make(map[string]interface{})
json.Unmarshal([]byte(value), &mp)
maps.Copy(dynamicValues, mp)
} else if mp, is := obj.(map[string]interface{}); is {
// case 2
maps.Copy(dynamicValues, mp)
} else {
// invalid input
return errors.New("illegal value for dynamic field")
}
row[p.dynamicFieldID] = dynamicValues
}
// else case 3/4/5
} else {
if len(dynamicValues) > 0 {
// case 6
row[p.dynamicFieldID] = dynamicValues
} else {
// case 7
row[p.dynamicFieldID] = "{}"
}
}
return nil
}
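A quick worked example of case 1 above, using plain maps; the real code uses maps.Copy from golang.org/x/exp/maps, which behaves like the loop below.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Case 1: the row already carries "$meta" as a JSON string, and "x" arrived
	// as an extra (non-schema) key; the two sources are merged into one map.
	dynamicValues := map[string]interface{}{"x": 8}
	meta := `{"y": 8}`

	mp := make(map[string]interface{})
	if err := json.Unmarshal([]byte(meta), &mp); err != nil {
		panic(err)
	}
	for k, v := range mp {
		dynamicValues[k] = v
	}
	fmt.Println(dynamicValues) // map[x:8 y:8]
}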
func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}, error) {
stringMap, ok := raw.(map[string]interface{})
if !ok {
@ -115,20 +162,29 @@ func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}
return nil, errors.New("invalid JSON format, each row should be a key-value map")
}
dynamicValues := make(map[string]interface{})
row := make(map[storage.FieldID]interface{})
for k, v := range stringMap {
// if user provided redundant field, return error
fieldID, ok := p.name2FieldID[k]
if !ok {
if ok {
row[fieldID] = v
} else if p.dynamicFieldID >= 0 {
// a dynamic field exists, put the redundant pair into dynamicValues
dynamicValues[k] = v
} else {
// no dynamic field. if user provided redundant field, return error
log.Error("JSON parser: the field is not defined in collection schema", zap.String("fieldName", k))
return nil, fmt.Errorf("the field '%s' is not defined in collection schema", k)
}
row[fieldID] = v
}
// some fields not provided?
if len(row) != len(p.name2FieldID) {
for k, v := range p.name2FieldID {
if v == p.dynamicFieldID {
// dynamic field, allow the user to omit this field
continue
}
_, ok := row[v]
if !ok {
log.Error("JSON parser: a field value is missed", zap.String("fieldName", k))
@ -137,7 +193,9 @@ func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}
}
}
return row, nil
// combine the redundant pairs into the dynamic field (if one exists)
err := p.combineDynamicRow(dynamicValues, row)
return row, err
}
func (p *JSONParser) ParseRows(reader *IOReader, handler JSONRowHandler) error {

View File

@ -385,3 +385,189 @@ func Test_JSONParserParseRows_StrPK(t *testing.T) {
}
}
}
func Test_JSONParserCombineDynamicRow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schema := &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
EnableDynamicField: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 106,
Name: "FieldID",
IsPrimaryKey: true,
AutoID: false,
Description: "int64",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 113,
Name: "FieldDynamic",
IsPrimaryKey: false,
IsDynamic: true,
Description: "dynamic field",
DataType: schemapb.DataType_JSON,
},
},
}
parser := NewJSONParser(ctx, schema, nil)
assert.NotNil(t, parser)
// valid input:
// case 1: {"id": 1, "vector": [], "x": 8, "$meta": "{\"y\": 8}"}
// case 2: {"id": 1, "vector": [], "x": 8, "$meta": {}}
// case 3: {"id": 1, "vector": [], "$meta": "{\"x\": 8}"}
// case 4: {"id": 1, "vector": [], "$meta": {"x": 8}}
// case 5: {"id": 1, "vector": [], "$meta": {}}
// case 6: {"id": 1, "vector": [], "x": 8}
// case 7: {"id": 1, "vector": []}
// case 1
dynamicValues := map[string]interface{}{
"x": 8,
}
row := map[storage.FieldID]interface{}{
106: 1,
113: "{\"y\": 8}",
}
err := parser.combineDynamicRow(dynamicValues, row)
assert.NoError(t, err)
assert.Contains(t, row, int64(113))
assert.Contains(t, row[113], "x")
assert.Contains(t, row[113], "y")
// case 2
dynamicValues = map[string]interface{}{
"x": 8,
}
row = map[storage.FieldID]interface{}{
106: 1,
113: map[string]interface{}{},
}
err = parser.combineDynamicRow(dynamicValues, row)
assert.NoError(t, err)
assert.Contains(t, row, int64(113))
assert.Contains(t, row[113], "x")
// case 3/4/5
dynamicValues = map[string]interface{}{}
row = map[storage.FieldID]interface{}{
106: 1,
113: "{\"x\": 8}",
}
err = parser.combineDynamicRow(dynamicValues, row)
assert.NoError(t, err)
assert.Contains(t, row, int64(113))
// case 6
dynamicValues = map[string]interface{}{
"x": 8,
}
row = map[storage.FieldID]interface{}{
106: 1,
}
err = parser.combineDynamicRow(dynamicValues, row)
assert.NoError(t, err)
assert.Contains(t, row, int64(113))
assert.Contains(t, row[113], "x")
// case 7
dynamicValues = map[string]interface{}{}
row = map[storage.FieldID]interface{}{
106: 1,
}
err = parser.combineDynamicRow(dynamicValues, row)
assert.NoError(t, err)
assert.Contains(t, row, int64(113))
assert.Equal(t, "{}", row[113])
// invalid input
dynamicValues = map[string]interface{}{
"x": 8,
}
row = map[storage.FieldID]interface{}{
106: 1,
113: 5,
}
err = parser.combineDynamicRow(dynamicValues, row)
assert.Error(t, err)
// no dynamic field
parser.dynamicFieldID = -1
dynamicValues = map[string]interface{}{
"x": 8,
}
row = map[storage.FieldID]interface{}{
106: 1,
}
err = parser.combineDynamicRow(dynamicValues, row)
assert.NoError(t, err)
assert.NotContains(t, row, int64(113))
}
func Test_JSONParserVerifyRow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schema := &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
EnableDynamicField: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 106,
Name: "FieldID",
IsPrimaryKey: true,
AutoID: false,
Description: "int64",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 113,
Name: "FieldDynamic",
IsPrimaryKey: false,
IsDynamic: true,
Description: "dynamic field",
DataType: schemapb.DataType_JSON,
},
},
}
parser := NewJSONParser(ctx, schema, nil)
assert.NotNil(t, parser)
assert.Equal(t, int64(113), parser.dynamicFieldID)
// dynamic field provided
raw := map[string]interface{}{
"FieldID": 100,
"FieldDynamic": "{\"x\": 8}",
"y": true,
}
row, err := parser.verifyRow(raw)
assert.NoError(t, err)
assert.Contains(t, row, int64(106))
assert.Contains(t, row, int64(113))
assert.Contains(t, row[113], "x")
assert.Contains(t, row[113], "y")
// dynamic field not provided
raw = map[string]interface{}{
"FieldID": 100,
}
row, err = parser.verifyRow(raw)
assert.NoError(t, err)
assert.Contains(t, row, int64(106))
assert.Contains(t, row, int64(113))
assert.Equal(t, "{}", row[113])
// invalid input dynamic field
raw = map[string]interface{}{
"FieldID": 100,
"FieldDynamic": true,
"y": true,
}
_, err = parser.verifyRow(raw)
assert.Error(t, err)
}

View File

@ -143,8 +143,12 @@ func (p *NumpyParser) Parse(filePaths []string) error {
// validateFileNames checks for redundant files and missing files
func (p *NumpyParser) validateFileNames(filePaths []string) error {
dynamicFieldName := ""
requiredFieldNames := make(map[string]interface{})
for _, schema := range p.collectionSchema.Fields {
if schema.GetIsDynamic() && p.collectionSchema.GetEnableDynamicField() {
dynamicFieldName = schema.GetName()
}
if schema.GetIsPrimaryKey() {
if !schema.GetAutoID() {
requiredFieldNames[schema.GetName()] = nil
@ -168,6 +172,10 @@ func (p *NumpyParser) validateFileNames(filePaths []string) error {
// check missed file
for name := range requiredFieldNames {
if name == dynamicFieldName {
// a file for the dynamic field is not required
continue
}
_, ok := fileNames[name]
if !ok {
log.Error("Numpy parser: there is no file corresponding to field", zap.String("fieldName", name))