Support JSON for bulkinsert (#24130)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
Parent: e6eb477074
Commit: b89f06dd19
@@ -726,6 +726,16 @@ func (p *BinlogAdapter) readInsertlog(fieldID storage.FieldID, logPath string,
 		if err != nil {
 			return err
 		}
+	case schemapb.DataType_JSON:
+		data, err := binlogFile.ReadJSON()
+		if err != nil {
+			return err
+		}
+
+		err = p.dispatchBytesToShards(data, memoryData, shardList, fieldID)
+		if err != nil {
+			return err
+		}
 	case schemapb.DataType_BinaryVector:
 		data, dim, err := binlogFile.ReadBinaryVector()
 		if err != nil {
@@ -938,6 +948,29 @@ func (p *BinlogAdapter) dispatchVarcharToShards(data []string, memoryData []map[
 	return nil
 }
 
+func (p *BinlogAdapter) dispatchBytesToShards(data [][]byte, memoryData []map[storage.FieldID]storage.FieldData,
+	shardList []int32, fieldID storage.FieldID) error {
+	// verify row count
+	if len(data) != len(shardList) {
+		log.Error("Binlog adapter: JSON field row count is not equal to shard list row count", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
+		return fmt.Errorf("JSON field row count %d is not equal to shard list row count %d", len(data), len(shardList))
+	}
+
+	// dispatch entities according to shard list
+	for i, val := range data {
+		shardID := shardList[i]
+		if shardID < 0 {
+			continue // this entity has been deleted or excluded by timestamp
+		}
+
+		fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
+		field := fields[fieldID]      // initSegmentData() can ensure the existence, no need to check existence here
+		field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, val)
+	}
+
+	return nil
+}
+
 func (p *BinlogAdapter) dispatchBinaryVecToShards(data []byte, dim int, memoryData []map[storage.FieldID]storage.FieldData,
 	shardList []int32, fieldID storage.FieldID) error {
 	// verify row count
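dispatchBytesToShards follows the same shape as the other dispatchers: the caller precomputes shardList with one entry per row, where -1 marks a row deleted or excluded by timestamp, and each surviving row is appended to its shard's in-memory buffer. A minimal standalone sketch of that pattern, with plain Go slices standing in for Milvus' storage.FieldData (names here are illustrative, not from the repo):

package main

import "fmt"

// dispatchToShards appends each row of data to its target shard.
// shardList[i] holds the shard index for row i; -1 marks a row that
// was deleted or excluded by timestamp and must be skipped.
func dispatchToShards(data [][]byte, shards [][][]byte, shardList []int32) error {
	// verify row count, mirroring the check in dispatchBytesToShards
	if len(data) != len(shardList) {
		return fmt.Errorf("row count %d is not equal to shard list row count %d", len(data), len(shardList))
	}
	for i, val := range data {
		shardID := shardList[i]
		if shardID < 0 {
			continue // deleted or excluded row
		}
		shards[shardID] = append(shards[shardID], val)
	}
	return nil
}

func main() {
	shards := make([][][]byte, 3)
	data := [][]byte{[]byte(`{"x": 1}`), []byte(`{"y": 2}`), []byte(`{}`)}
	shardList := []int32{0, 1, -1} // the third row is skipped
	if err := dispatchToShards(data, shards, shardList); err != nil {
		panic(err)
	}
	fmt.Println(len(shards[0]), len(shards[1]), len(shards[2])) // 1 1 0
}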
@@ -18,6 +18,7 @@ package importutil
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"math"
 	"strconv"
 	"testing"
@@ -83,6 +84,7 @@ func createFieldsData(rowCount int) map[storage.FieldID]interface{} {
 	varcharData := make([]string, 0)
 	binVecData := make([][]byte, 0)
 	floatVecData := make([][]float32, 0)
+	jsonData := make([][]byte, 0)
 
 	boolFunc := func(i int) bool {
 		return i%3 != 0
@@ -101,6 +103,7 @@ func createFieldsData(rowCount int) map[storage.FieldID]interface{} {
 		varcharData = append(varcharData, "no."+strconv.Itoa(i))
 		binVecData = append(binVecData, []byte{byte(i % 256), byte(i % 256)}) // dim = 16
 		floatVecData = append(floatVecData, []float32{float32(i / 2), float32(i / 4), float32(i / 5), float32(i / 8)}) // dim = 4
+		jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i)))
 	}
 
 	fieldsData[0] = rowIDData
@@ -115,6 +118,7 @@ func createFieldsData(rowCount int) map[storage.FieldID]interface{} {
 	fieldsData[109] = varcharData
 	fieldsData[110] = binVecData
 	fieldsData[111] = floatVecData
+	fieldsData[112] = jsonData
 
 	return fieldsData
 }
@@ -151,6 +155,9 @@ func createSegmentsData(fieldsData map[storage.FieldID]interface{}, shardNum int
 		for _, vec := range floatVectors {
 			segData[fieldID].(*storage.FloatVectorFieldData).Data = append(segData[fieldID].(*storage.FloatVectorFieldData).Data, vec...)
 		}
+		fieldID = int64(112)
+		segData[fieldID].(*storage.JSONFieldData).Data = append(segData[fieldID].(*storage.JSONFieldData).Data, fieldsData[fieldID].([][]byte)...)
+
 		segmentsData = append(segmentsData, segData)
 	}
 	return segmentsData
@@ -228,7 +235,7 @@ func Test_BinlogAdapterVerify(t *testing.T) {
 
 	// row id field missed
 	holder.fieldFiles = make(map[int64][]string)
-	for i := int64(102); i <= 111; i++ {
+	for i := int64(102); i <= 112; i++ {
 		holder.fieldFiles[i] = make([]string, 0)
 	}
 	err = adapter.verify(holder)
@@ -250,7 +257,7 @@ func Test_BinlogAdapterVerify(t *testing.T) {
 	assert.NotNil(t, err)
 
 	// succeed
-	for i := int64(102); i <= 111; i++ {
+	for i := int64(102); i <= 112; i++ {
 		holder.fieldFiles[i] = []string{
 			"a",
 		}
@@ -730,6 +737,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
 		int64(109): {"109_insertlog"},
 		int64(110): {"110_insertlog"},
 		int64(111): {"111_insertlog"},
+		int64(112): {"112_insertlog"},
 	}
 	holder.deltaFiles = []string{"deltalog"}
 	err = adapter.Read(holder)
@@ -751,6 +759,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
 		"109_insertlog": createBinlogBuf(t, schemapb.DataType_VarChar, fieldsData[109].([]string)),
 		"110_insertlog": createBinlogBuf(t, schemapb.DataType_BinaryVector, fieldsData[110].([][]byte)),
 		"111_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, fieldsData[111].([][]float32)),
+		"112_insertlog": createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte)),
 		"deltalog": createDeltalogBuf(t, deletedItems, false),
 	}
 
@@ -983,6 +992,20 @@ func Test_BinlogAdapterDispatch(t *testing.T) {
 	assert.Equal(t, 1, segmentsData[1][fieldID].RowNum())
 	assert.Equal(t, 0, segmentsData[2][fieldID].RowNum())
 
+	// dispatch JSON data
+	fieldID = int64(112)
+	data := [][]byte{[]byte("{\"x\": 3, \"y\": 10.5}"), []byte("{\"y\": true}"), []byte("{\"z\": \"hello\"}"), []byte("{}")}
+	err = adapter.dispatchBytesToShards(data, segmentsData, shardList, fieldID) // row count mismatch
+	assert.NotNil(t, err)
+	for _, segment := range segmentsData {
+		assert.Equal(t, 0, segment[fieldID].RowNum())
+	}
+	err = adapter.dispatchBytesToShards([][]byte{[]byte("{}"), []byte("{}"), []byte("{}")}, segmentsData, shardList, fieldID) // succeed
+	assert.Nil(t, err)
+	assert.Equal(t, 1, segmentsData[0][fieldID].RowNum())
+	assert.Equal(t, 1, segmentsData[1][fieldID].RowNum())
+	assert.Equal(t, 0, segmentsData[2][fieldID].RowNum())
+
 	// dispatch binary vector data
 	fieldID = int64(110)
 	err = adapter.dispatchBinaryVecToShards([]byte{1, 2, 3, 4}, 16, segmentsData, shardList, fieldID) // row count mismatch
@@ -1078,6 +1101,11 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) {
 	err = adapter.readInsertlog(1, "varchar", segmentsData, []int32{1})
 	assert.NotNil(t, err)
 
+	// failed to dispatch JSON data
+	chunkManager.readBuf["JSON"] = createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte))
+	err = adapter.readInsertlog(1, "JSON", segmentsData, []int32{1})
+	assert.NotNil(t, err)
+
 	// failed to dispatch binvector data
 	chunkManager.readBuf["binvector"] = createBinlogBuf(t, schemapb.DataType_BinaryVector, fieldsData[110].([][]byte))
 	err = adapter.readInsertlog(1, "binvector", segmentsData, []int32{1})
@@ -435,6 +435,49 @@ func (p *BinlogFile) ReadVarchar() ([]string, error) {
 	return result, nil
 }
 
+// ReadJSON method reads all the blocks of a binlog by a data type.
+// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
+func (p *BinlogFile) ReadJSON() ([][]byte, error) {
+	if p.reader == nil {
+		log.Error("Binlog file: binlog reader not yet initialized")
+		return nil, errors.New("binlog reader not yet initialized")
+	}
+
+	result := make([][]byte, 0)
+	for {
+		event, err := p.reader.NextEventReader()
+		if err != nil {
+			log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
+			return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
+		}
+
+		// end of the file
+		if event == nil {
+			break
+		}
+
+		if event.TypeCode != storage.InsertEventType {
+			log.Error("Binlog file: binlog file is not insert log")
+			return nil, errors.New("binlog file is not insert log")
+		}
+
+		if p.DataType() != schemapb.DataType_JSON {
+			log.Error("Binlog file: binlog data type is not JSON")
+			return nil, errors.New("binlog data type is not JSON")
+		}
+
+		data, err := event.PayloadReaderInterface.GetJSONFromPayload()
+		if err != nil {
+			log.Error("Binlog file: failed to read JSON data", zap.Error(err))
+			return nil, fmt.Errorf("failed to read JSON data, error: %w", err)
+		}
+
+		result = append(result, data...)
+	}
+
+	return result, nil
+}
+
 // ReadBinaryVector method reads all the blocks of a binlog by a data type.
 // A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
 // return vectors data and the dimension
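ReadJSON keeps the same event-iteration shape as ReadVarchar and ReadBinaryVector: call NextEventReader() until it returns nil, reject non-insert events and mismatched data types, and concatenate each block's payload into one result. A minimal sketch of that drain-all-blocks loop against a hypothetical block-reader interface (not the Milvus API):

package main

import (
	"errors"
	"fmt"
)

// blockReader is a stand-in for the binlog event reader: Next returns
// one block of rows per call and nil when the file is exhausted.
type blockReader interface {
	Next() ([][]byte, error)
}

// readAllBlocks drains the reader and concatenates every block,
// mirroring the loop structure of BinlogFile.ReadJSON.
func readAllBlocks(r blockReader) ([][]byte, error) {
	if r == nil {
		return nil, errors.New("reader not yet initialized")
	}
	result := make([][]byte, 0)
	for {
		block, err := r.Next()
		if err != nil {
			return nil, fmt.Errorf("failed to iterate blocks: %w", err)
		}
		if block == nil { // end of the file
			break
		}
		result = append(result, block...)
	}
	return result, nil
}

// sliceReader serves pre-split blocks, for demonstration only.
type sliceReader struct{ blocks [][][]byte }

func (s *sliceReader) Next() ([][]byte, error) {
	if len(s.blocks) == 0 {
		return nil, nil
	}
	b := s.blocks[0]
	s.blocks = s.blocks[1:]
	return b, nil
}

func main() {
	r := &sliceReader{blocks: [][][]byte{
		{[]byte(`{"x": 1}`)},
		{[]byte(`{"y": 2}`), []byte(`{}`)},
	}}
	rows, err := readAllBlocks(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(rows)) // 3
}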
@@ -113,6 +113,17 @@ func createBinlogBuf(t *testing.T, dataType schemapb.DataType, data interface{})
 		// without the two lines, the case will crash here.
 		// the "original_size" comes from storage.originalSizeKey
 		w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
+	case schemapb.DataType_JSON:
+		rows := data.([][]byte)
+		sizeTotal := 0
+		for i := 0; i < len(rows); i++ {
+			err = evt.AddOneJSONToPayload(rows[i])
+			assert.Nil(t, err)
+			sizeTotal += binary.Size(rows[i])
+		}
+		// without the two lines, the case will crash here.
+		// the "original_size" comes from storage.originalSizeKey
+		w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
 	case schemapb.DataType_BinaryVector:
 		vectors := data.([][]byte)
 		for i := 0; i < len(vectors); i++ {
@@ -232,6 +243,10 @@ func Test_BinlogFileOpen(t *testing.T) {
 	assert.Nil(t, dataVarchar)
 	assert.NotNil(t, err)
 
+	dataJSON, err := binlogFile.ReadJSON()
+	assert.Nil(t, dataJSON)
+	assert.NotNil(t, err)
+
 	dataBinaryVector, dim, err := binlogFile.ReadBinaryVector()
 	assert.Nil(t, dataBinaryVector)
 	assert.Equal(t, 0, dim)
@@ -632,12 +647,63 @@ func Test_BinlogFileVarchar(t *testing.T) {
 	err = binlogFile.Open("dummy")
 	assert.Nil(t, err)
 
+	d, err := binlogFile.ReadJSON()
+	assert.Zero(t, len(d))
+	assert.NotNil(t, err)
+
 	binlogFile.Close()
 }
 
+func Test_BinlogFileJSON(t *testing.T) {
+	source := [][]byte{[]byte("{\"x\": 3, \"y\": 10.5}"), []byte("{\"y\": true}"), []byte("{\"z\": \"hello\"}"), []byte("{}")}
+	chunkManager := &MockChunkManager{
+		readBuf: map[string][]byte{
+			"dummy": createBinlogBuf(t, schemapb.DataType_JSON, source),
+		},
+	}
+
+	binlogFile, err := NewBinlogFile(chunkManager)
+	assert.Nil(t, err)
+	assert.NotNil(t, binlogFile)
+
+	// correct reading
+	err = binlogFile.Open("dummy")
+	assert.Nil(t, err)
+	assert.Equal(t, schemapb.DataType_JSON, binlogFile.DataType())
+
+	data, err := binlogFile.ReadJSON()
+	assert.Nil(t, err)
+	assert.NotNil(t, data)
+	assert.Equal(t, len(source), len(data))
+	for i := 0; i < len(source); i++ {
+		assert.Equal(t, string(source[i]), string(data[i]))
+	}
+
+	binlogFile.Close()
+
+	// wrong data type reading
+	binlogFile, err = NewBinlogFile(chunkManager)
+	assert.Nil(t, err)
+	err = binlogFile.Open("dummy")
+	assert.Nil(t, err)
+
+	d, dim, err := binlogFile.ReadBinaryVector()
+	assert.Zero(t, len(d))
+	assert.Zero(t, dim)
+	assert.NotNil(t, err)
+
+	binlogFile.Close()
+
+	// wrong log type
+	chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
+	err = binlogFile.Open("dummy")
+	assert.Nil(t, err)
+
+	data, err = binlogFile.ReadJSON()
+	assert.Zero(t, len(data))
+	assert.NotNil(t, err)
+
+	binlogFile.Close()
+}
+
 func Test_BinlogFileBinaryVector(t *testing.T) {
@@ -103,6 +103,10 @@ func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.Fi
 		segmentData[schema.GetFieldID()] = &storage.StringFieldData{
 			Data: make([]string, 0),
 		}
+	case schemapb.DataType_JSON:
+		segmentData[schema.GetFieldID()] = &storage.JSONFieldData{
+			Data: make([][]byte, 0),
+		}
 	default:
 		log.Error("Import util: unsupported data type", zap.String("DataType", getTypeName(schema.DataType)))
 		return nil
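initSegmentData allocates one typed, growable buffer per schema field, and the new case gives JSON fields a [][]byte buffer, matching what ReadJSON and dispatchBytesToShards produce. A tiny sketch of that allocate-by-declared-type step, with strings standing in for the schema enum (illustrative only, not the Milvus types):

package main

import "fmt"

// newFieldBuffer allocates an empty, growable buffer for a field,
// keyed by its declared type, as initSegmentData does per schema field.
func newFieldBuffer(dataType string) interface{} {
	switch dataType {
	case "varchar":
		return make([]string, 0)
	case "json":
		return make([][]byte, 0) // JSON rows are stored as raw bytes
	default:
		return nil // unsupported type; the caller logs and aborts
	}
}

func main() {
	buf := newFieldBuffer("json")
	rows := buf.([][]byte)
	rows = append(rows, []byte(`{"x": 1}`))
	fmt.Println(len(rows)) // 1
}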
@@ -303,6 +307,20 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
 			}
 			return nil
 		}
+	case schemapb.DataType_JSON:
+		validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
+			if value, ok := obj.(string); ok {
+				var dummy interface{}
+				err := json.Unmarshal([]byte(value), &dummy)
+				if err != nil {
+					return fmt.Errorf("failed to parse value '%v' for JSON field '%s', error: %w", value, schema.GetName(), err)
+				}
+				field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, []byte(value))
+			} else {
+				return fmt.Errorf("illegal value '%v' for JSON type field '%s'", obj, schema.GetName())
+			}
+			return nil
+		}
 	default:
 		return fmt.Errorf("unsupported data type: %s", getTypeName(collectionSchema.Fields[i].DataType))
 	}
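The JSON convertFunc accepts a value only as a string and parses it once purely as a validity gate; the unmarshalled result is discarded and the raw bytes are stored. A self-contained sketch of that validate-then-store-raw idea (plain Go, not the Milvus Validator type):

package main

import (
	"encoding/json"
	"fmt"
)

// convertJSON checks that value is well-formed JSON and, if so, returns
// the raw bytes to be stored; the unmarshalled result is discarded and
// only serves as a syntax check.
func convertJSON(value string) ([]byte, error) {
	var dummy interface{}
	if err := json.Unmarshal([]byte(value), &dummy); err != nil {
		return nil, fmt.Errorf("failed to parse value '%v': %w", value, err)
	}
	return []byte(value), nil
}

func main() {
	if b, err := convertJSON(`{"x": 3, "y": 10.5}`); err == nil {
		fmt.Printf("stored %d bytes\n", len(b)) // stored 19 bytes
	}
	if _, err := convertJSON(""); err != nil {
		fmt.Println("rejected:", err) // the empty string is not valid JSON
	}
}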
@@ -495,6 +513,8 @@ func getTypeName(dt schemapb.DataType) string {
 		return "BinaryVector"
 	case schemapb.DataType_FloatVector:
 		return "FloatVector"
+	case schemapb.DataType_JSON:
+		return "JSON"
 	default:
 		return "InvalidType"
 	}
@@ -118,6 +118,13 @@ func sampleSchema() *schemapb.CollectionSchema {
 				{Key: common.DimKey, Value: "4"},
 			},
 		},
+		{
+			FieldID:      112,
+			Name:         "FieldJSON",
+			IsPrimaryKey: false,
+			Description:  "json",
+			DataType:     schemapb.DataType_JSON,
+		},
 	},
 }
 return schema
@@ -133,6 +140,7 @@ type sampleRow struct {
 	FieldFloat        float32
 	FieldDouble       float64
 	FieldString       string
+	FieldJSON         string
 	FieldBinaryVector []int
 	FieldFloatVector  []float32
 }
@@ -456,6 +464,44 @@ func Test_InitValidators(t *testing.T) {
 		err = initValidators(schema, validators)
 		assert.NotNil(t, err)
 	})
+
+	t.Run("json field", func(t *testing.T) {
+		schema = &schemapb.CollectionSchema{
+			Name:        "schema",
+			Description: "schema",
+			AutoID:      true,
+			Fields: []*schemapb.FieldSchema{
+				{
+					FieldID:  102,
+					Name:     "FieldJSON",
+					DataType: schemapb.DataType_JSON,
+				},
+			},
+		}
+
+		validators = make(map[storage.FieldID]*Validator)
+		err = initValidators(schema, validators)
+		assert.Nil(t, err)
+
+		v, ok := validators[102]
+		assert.True(t, ok)
+
+		fields := initSegmentData(schema)
+		assert.NotNil(t, fields)
+		fieldData := fields[102]
+
+		err = v.convertFunc("{\"x\": 1, \"y\": 5}", fieldData)
+		assert.Nil(t, err)
+		assert.Equal(t, 1, fieldData.RowNum())
+
+		err = v.convertFunc("{}", fieldData)
+		assert.Nil(t, err)
+		assert.Equal(t, 2, fieldData.RowNum())
+
+		err = v.convertFunc("", fieldData)
+		assert.Error(t, err)
+		assert.Equal(t, 2, fieldData.RowNum())
+	})
 }
 
 func Test_GetFileNameAndExt(t *testing.T) {
@@ -619,6 +665,8 @@ func Test_GetTypeName(t *testing.T) {
 	assert.NotEmpty(t, str)
 	str = getTypeName(schemapb.DataType_FloatVector)
 	assert.NotEmpty(t, str)
+	str = getTypeName(schemapb.DataType_JSON)
+	assert.NotEmpty(t, str)
 	str = getTypeName(schemapb.DataType_None)
 	assert.Equal(t, "InvalidType", str)
 }
@@ -240,11 +240,11 @@ func Test_ImportWrapperRowBased(t *testing.T) {
 
 	content := []byte(`{
 		"rows":[
-		{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
-		{"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]},
-		{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
-		{"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]},
-		{"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]}
+		{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": "{\"x\": 2}", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
+		{"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldJSON": "{\"k\": 2.5}", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]},
+		{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": "{\"y\": \"hello\"}", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
+		{"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldJSON": "{}", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]},
+		{"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldJSON": "{\"x\": true}", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]}
 		]
 	}`)
 
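Note how JSON field values travel in the row-based payload: each FieldJSON value is itself a JSON document encoded as a string inside the enclosing row object. When building such payloads programmatically, marshalling the inner object first keeps the escaping correct; a small sketch (buildRow is a hypothetical helper, not part of the repo):

package main

import (
	"encoding/json"
	"fmt"
)

// buildRow embeds a JSON object as a string-valued field, the shape
// the row-based importer expects for JSON fields.
func buildRow(jsonField map[string]interface{}) (string, error) {
	inner, err := json.Marshal(jsonField) // the JSON document itself
	if err != nil {
		return "", err
	}
	row := map[string]interface{}{
		"FieldString": "hello world",
		"FieldJSON":   string(inner), // embedded as a string; the outer Marshal escapes it
	}
	outer, err := json.Marshal(row)
	return string(outer), err
}

func main() {
	row, _ := buildRow(map[string]interface{}{"x": 2})
	fmt.Println(row) // {"FieldJSON":"{\"x\":2}","FieldString":"hello world"}
}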
@@ -287,7 +287,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
 	// parse error
 	content = []byte(`{
 		"rows":[
-		{"FieldBool": true, "FieldInt8": false, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
+		{"FieldBool": true, "FieldInt8": false, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": "{\"x\": 2}", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
 		]
 	}`)
 
@@ -371,6 +371,13 @@ func createSampleNumpyFiles(t *testing.T, cm storage.ChunkManager) []string {
 	assert.NoError(t, err)
 	files = append(files, filePath)
 
+	filePath = path.Join(cm.RootPath(), "FieldJSON.npy")
+	content, err = CreateNumpyData([]string{"{\"x\": 10, \"y\": 5}", "{\"z\": 5}", "{}", "{}", "{\"x\": 3}"})
+	assert.Nil(t, err)
+	err = cm.Write(ctx, filePath, content)
+	assert.NoError(t, err)
+	files = append(files, filePath)
+
 	filePath = path.Join(cm.RootPath(), "FieldBinaryVector.npy")
 	content, err = CreateNumpyData([][2]uint8{{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}})
 	assert.Nil(t, err)
@@ -700,11 +707,11 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
 
 	content := []byte(`{
 		"rows":[
-		{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
-		{"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]},
-		{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
-		{"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]},
-		{"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]}
+		{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": "{\"x\": \"aaa\"}", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
+		{"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldJSON": "{}", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]},
+		{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": "{\"x\": 2, \"y\": 5}", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
+		{"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldJSON": "{\"x\": true}", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]},
+		{"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldJSON": "{}", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]}
 		]
 	}`)
 
@@ -19,6 +19,7 @@ package importutil
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"math"
 	"strconv"
 	"strings"
@@ -103,6 +104,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
 			FieldFloat:  3 + float32(i)/11,
 			FieldDouble: 1 + float64(i)/7,
 			FieldString: "No." + strconv.FormatInt(int64(i), 10),
+			FieldJSON:   fmt.Sprintf("{\"x\": %d}", i),
 			FieldBinaryVector: []int{(200 + i) % math.MaxUint8, 0},
 			FieldFloatVector:  []float32{float32(i) + 0.1, float32(i) + 0.2, float32(i) + 0.3, float32(i) + 0.4},
 		}
@@ -136,6 +136,7 @@ func convertNumpyType(typeStr string) (schemapb.DataType, error) {
 		return schemapb.DataType_Double, nil
 	default:
 		if isStringType(typeStr) {
+			// Note: JSON fields and VARCHAR fields both use numpy string type
 			return schemapb.DataType_VarChar, nil
 		}
 		log.Error("Numpy adapter: the numpy file data type is not supported", zap.String("dtype", typeStr))
@@ -18,6 +18,7 @@ package importutil
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 
 	"github.com/cockroachdb/errors"
@@ -314,11 +315,15 @@ func (p *NumpyParser) validateHeader(columnReader *NumpyColumnReader) error {
 				shape[1]*8, columnReader.fieldName, columnReader.dimension)
 		}
 	} else {
-		if elementType != columnReader.dataType {
-			log.Error("Numpy parser: illegal data type of numpy file for scalar field", zap.Any("numpyDataType", elementType),
-				zap.String("fieldName", columnReader.fieldName), zap.Any("fieldDataType", columnReader.dataType))
-			return fmt.Errorf("illegal data type %s of numpy file for scalar field '%s' with type %s",
-				getTypeName(elementType), columnReader.fieldName, getTypeName(columnReader.dataType))
-		}
+		// JSON fields and VARCHAR fields both use numpy string type;
+		// the input is legal if columnReader.dataType is JSON and elementType is VARCHAR
+		if elementType != schemapb.DataType_VarChar && columnReader.dataType != schemapb.DataType_JSON {
+			if elementType != columnReader.dataType {
+				log.Error("Numpy parser: illegal data type of numpy file for scalar field", zap.Any("numpyDataType", elementType),
+					zap.String("fieldName", columnReader.fieldName), zap.Any("fieldDataType", columnReader.dataType))
+				return fmt.Errorf("illegal data type %s of numpy file for scalar field '%s' with type %s",
+					getTypeName(elementType), columnReader.fieldName, getTypeName(columnReader.dataType))
+			}
+		}
 
 		// scalar field, the shape should be 1
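Since numpy has no JSON dtype, JSON fields arrive as numpy string arrays, so the header check is relaxed to accept a VarChar element type for a JSON field. A minimal sketch of the narrower rule the comment describes (simplified types, not the parser's real signature); note that the committed condition above is actually broader, bypassing the strict comparison whenever the element type is VarChar or the field type is JSON:

package main

import "fmt"

type dataType int

const (
	typeVarChar dataType = iota
	typeJSON
	typeInt64
)

// headerTypeOK reports whether a numpy element type is acceptable for
// a schema field: an exact match, or VarChar data feeding a JSON field.
func headerTypeOK(elementType, fieldType dataType) bool {
	if elementType == typeVarChar && fieldType == typeJSON {
		return true // JSON fields are stored as numpy string arrays
	}
	return elementType == fieldType
}

func main() {
	fmt.Println(headerTypeOK(typeVarChar, typeJSON))  // true
	fmt.Println(headerTypeOK(typeInt64, typeJSON))    // false
	fmt.Println(headerTypeOK(typeVarChar, typeInt64)) // false
}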
@@ -529,6 +534,30 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
 		return &storage.StringFieldData{
 			Data: data,
 		}, nil
+	case schemapb.DataType_JSON:
+		// JSON fields read their data from a numpy string array
+		data, err := columnReader.reader.ReadString(rowCount)
+		if err != nil {
+			log.Error("Numpy parser: failed to read json string array", zap.Error(err))
+			return nil, fmt.Errorf("failed to read json string array: %s", err.Error())
+		}
+
+		byteArr := make([][]byte, 0)
+		for _, str := range data {
+			var dummy interface{}
+			err := json.Unmarshal([]byte(str), &dummy)
+			if err != nil {
+				log.Error("Numpy parser: illegal string value for JSON field",
+					zap.String("value", str), zap.String("FieldName", columnReader.fieldName), zap.Error(err))
+				return nil, fmt.Errorf("failed to parse value '%v' for JSON field '%s', error: %w",
+					str, columnReader.fieldName, err)
+			}
+			byteArr = append(byteArr, []byte(str))
+		}
+
+		return &storage.JSONFieldData{
+			Data: byteArr,
+		}, nil
 	case schemapb.DataType_BinaryVector:
 		data, err := columnReader.reader.ReadUint8(rowCount * (columnReader.dimension / 8))
 		if err != nil {
@@ -655,6 +684,12 @@ func (p *NumpyParser) appendFunc(schema *schemapb.FieldSchema) func(src storage.
 			arr.Data = append(arr.Data, src.GetRow(n).(string))
 			return nil
 		}
+	case schemapb.DataType_JSON:
+		return func(src storage.FieldData, n int, target storage.FieldData) error {
+			arr := target.(*storage.JSONFieldData)
+			arr.Data = append(arr.Data, src.GetRow(n).([]byte))
+			return nil
+		}
 	default:
 		return nil
 	}
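appendFunc returns a per-type closure so the row-copy loop stays free of type switches; for JSON the source rows are already []byte, so the closure is a plain append. A standalone sketch of the closure-per-type pattern (illustrative names, not the parser's real types):

package main

import "fmt"

// appendFor returns a closure that copies row n from src into dst,
// chosen once per field type so the hot loop avoids a type switch.
func appendFor(kind string) func(src []interface{}, n int, dst *[]interface{}) error {
	switch kind {
	case "varchar":
		return func(src []interface{}, n int, dst *[]interface{}) error {
			*dst = append(*dst, src[n].(string))
			return nil
		}
	case "json":
		return func(src []interface{}, n int, dst *[]interface{}) error {
			*dst = append(*dst, src[n].([]byte)) // JSON rows are raw bytes
			return nil
		}
	default:
		return nil
	}
}

func main() {
	src := []interface{}{[]byte(`{"x": 1}`), []byte(`{}`)}
	dst := make([]interface{}, 0)
	appendJSON := appendFor("json")
	for n := range src {
		if err := appendJSON(src, n, &dst); err != nil {
			panic(err)
		}
	}
	fmt.Println(len(dst)) // 2
}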
@@ -119,6 +119,7 @@ func Test_NumpyParserValidateFileNames(t *testing.T) {
 		"FieldFloat.npy",
 		"FieldDouble.npy",
 		"FieldString.npy",
+		"FieldJSON.npy",
 		"FieldBinaryVector.npy",
 	}
 	err = parser.validateFileNames(fileNames)
@@ -497,6 +498,10 @@ func Test_NumpyParserReadData(t *testing.T) {
 		specialReadEmptyFunc("FieldString", []string{"aaa"})
 	})
 
+	t.Run("read JSON", func(t *testing.T) {
+		specialReadEmptyFunc("FieldJSON", []string{"{\"x\": 1}"})
+	})
+
 	t.Run("read binary vector", func(t *testing.T) {
 		specialReadEmptyFunc("FieldBinaryVector", [][2]uint8{{1, 2}, {3, 4}})
 	})