Add performance test case for import (#16296)

Signed-off-by: groot <yihua.mo@zilliz.com>
groot 2022-04-01 10:07:28 +08:00 committed by GitHub
parent b03da87df5
commit bd241cd28b
4 changed files with 253 additions and 29 deletions


@@ -82,8 +82,9 @@ func (p *ImportWrapper) printFieldsDataInfo(fieldsData map[string]storage.FieldD
for k, v := range fieldsData {
stats = append(stats, zap.Int(k, v.RowNum()))
}
for i := 0; i < len(files); i++ {
stats = append(stats, zap.String("file", files[i]))
if len(files) > 0 {
stats = append(stats, zap.Any("files", files))
}
log.Debug(msg, stats...)
}
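
The hunk above swaps one zap.String("file", ...) field per element for a single zap.Any("files", ...) field, so the log entry carries the whole list under one key instead of repeating "file". A minimal stand-alone sketch of the two styles (not part of the patch):

package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	files := []string{"ids.json", "vectors.json"}

	// Old style: one field per file, all under the repeated key "file".
	stats := make([]zap.Field, 0)
	for i := 0; i < len(files); i++ {
		stats = append(stats, zap.String("file", files[i]))
	}
	logger.Debug("import wrapper: file list (per-element)", stats...)

	// New style: the whole slice as a single "files" field, and no field at all when the list is empty.
	if len(files) > 0 {
		logger.Debug("import wrapper: file list (single field)", zap.Any("files", files))
	}
}
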
@@ -150,24 +151,31 @@ func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate b
return nil
}
p.printFieldsDataInfo(fields, "import wrapper: combine field data", nil)
fieldNames := make([]string, 0)
for k, v := range fields {
// ignore 0 row field
if v.RowNum() == 0 {
continue
}
// each column should be only combined once
data, ok := fieldsData[k]
if ok && data.RowNum() > 0 {
return errors.New("imprort error: the field " + k + " is duplicated")
return errors.New("the field " + k + " is duplicated")
}
// check the row count. only count non-zero row fields
if rowCount > 0 && rowCount != v.RowNum() {
return errors.New("the field " + k + " row count " + strconv.Itoa(v.RowNum()) + " doesn't equal " + strconv.Itoa(rowCount))
}
rowCount = v.RowNum()
// assign column data to fieldsData
fieldsData[k] = v
fieldNames = append(fieldNames, k)
if rowCount == 0 {
rowCount = v.RowNum()
} else if rowCount != v.RowNum() {
return errors.New("imprort error: the field " + k + " row count " + strconv.Itoa(v.RowNum()) + " doesn't equal " + strconv.Itoa(rowCount))
}
}
log.Debug("imprort wrapper: ", zap.Any("fieldNames", fieldNames), zap.Int("rowCount", rowCount))
return nil
}
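
Read in isolation, the combine rule above is: skip zero-row columns, refuse to merge the same column twice, and require every non-empty column to report the same row count. A self-contained sketch of that rule, using a tiny stand-in interface in place of storage.FieldData (the helper names here are hypothetical, not the wrapper's API):

package main

import (
	"errors"
	"fmt"
	"strconv"
)

// rowCounter is a stand-in for the RowNum() method of storage.FieldData.
type rowCounter interface {
	RowNum() int
}

// combine merges src into dst following the rules from the hunk above:
// zero-row fields are ignored, a field may be combined only once, and all
// non-empty fields must agree on the row count.
func combine(dst, src map[string]rowCounter) (int, error) {
	rowCount := 0
	for k, v := range src {
		// ignore 0 row field
		if v.RowNum() == 0 {
			continue
		}
		// each column should be combined only once
		if old, ok := dst[k]; ok && old.RowNum() > 0 {
			return 0, errors.New("the field " + k + " is duplicated")
		}
		// only non-zero row fields take part in the row count check
		if rowCount > 0 && rowCount != v.RowNum() {
			return 0, errors.New("the field " + k + " row count " +
				strconv.Itoa(v.RowNum()) + " doesn't equal " + strconv.Itoa(rowCount))
		}
		rowCount = v.RowNum()
		dst[k] = v
	}
	return rowCount, nil
}

// column is a toy FieldData whose row count is just its length.
type column []int64

func (c column) RowNum() int { return len(c) }

func main() {
	dst := map[string]rowCounter{}
	_, err := combine(dst, map[string]rowCounter{
		"ID":  column{1, 2, 3},
		"Age": column{10, 20},
	})
	fmt.Println(err) // 3 rows vs 2 rows -> row count error
}
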


@@ -2,12 +2,16 @@ package importutil
import (
"context"
"encoding/json"
"os"
"strconv"
"testing"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/stretchr/testify/assert"
)
@@ -203,3 +207,212 @@ func Test_ImportColumnBased(t *testing.T) {
err = wrapper.Import(files, false, false)
assert.NotNil(t, err)
}
func perfSchema(dim int) *schemapb.CollectionSchema {
schema := &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 101,
Name: "ID",
IsPrimaryKey: true,
AutoID: false,
Description: "int64",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 102,
Name: "Vector",
IsPrimaryKey: false,
Description: "float_vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: strconv.Itoa(dim)},
},
},
},
}
return schema
}
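
The only knob the perf tests turn on this schema is the vector dimension, which travels as the string value of the "dim" type param. A trivial sketch of writing it and reading it back, with a plain map standing in for the commonpb.KeyValuePair list:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	dim := 128
	// Write: the schema stores the dimension as the string value of the "dim" type param.
	typeParams := map[string]string{"dim": strconv.Itoa(dim)}

	// Read: consumers parse it back to an int before allocating vectors.
	parsed, err := strconv.Atoi(typeParams["dim"])
	fmt.Println(parsed, err) // 128 <nil>
}
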
func Test_ImportRowBased_perf(t *testing.T) {
ctx := context.Background()
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
defer os.RemoveAll(TempFilesPath)
idAllocator := newIDAllocator(ctx, t)
tr := timerecord.NewTimeRecorder("row-based parse performance")
type Entity struct {
ID int64
Vector []float32
}
type Entities struct {
Rows []*Entity
}
// change these parameters to test different cases
dim := 128
rowCount := 10000
shardNum := 2
segmentSize := 512 // unit: MB
// generate rows data
entities := &Entities{
Rows: make([]*Entity, 0),
}
for i := 0; i < rowCount; i++ {
entity := &Entity{
ID: int64(i),
Vector: make([]float32, 0, dim),
}
for k := 0; k < dim; k++ {
entity.Vector = append(entity.Vector, float32(i)+3.1415926)
}
entities.Rows = append(entities.Rows, entity)
}
tr.Record("generate " + strconv.Itoa(rowCount) + " rows")
// generate a json file
filePath := TempFilesPath + "row_perf.json"
func() {
fp, err := os.Create(filePath)
assert.Nil(t, err)
defer fp.Close()
encoder := json.NewEncoder(fp)
err = encoder.Encode(entities)
assert.Nil(t, err)
}()
tr.Record("generate large json file " + filePath)
// parse the json file
parseCount := 0
flushFunc := func(fields map[string]storage.FieldData) error {
count := 0
for _, data := range fields {
assert.Less(t, 0, data.RowNum())
if count == 0 {
count = data.RowNum()
} else {
assert.Equal(t, count, data.RowNum())
}
}
parseCount += count
return nil
}
schema := perfSchema(dim)
wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int32(segmentSize), idAllocator, flushFunc)
files := make([]string, 0)
files = append(files, filePath)
err = wrapper.Import(files, true, false)
assert.Nil(t, err)
assert.Equal(t, rowCount, parseCount)
tr.Record("parse large json file " + filePath)
}
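
The Entity/Entities structs carry no json tags, so the generated file keys are the Go field names, and the parser (see the jsonparser.go hunk at the end of this diff) lower-cases the root key before comparing it with RowRootNode. A small sketch of the resulting row-based layout, trimmed to two rows and dim 4:

package main

import (
	"encoding/json"
	"fmt"
)

type Entity struct {
	ID     int64
	Vector []float32
}

type Entities struct {
	Rows []*Entity
}

func main() {
	doc := &Entities{Rows: []*Entity{
		{ID: 0, Vector: []float32{3.14, 3.14, 3.14, 3.14}},
		{ID: 1, Vector: []float32{4.14, 4.14, 4.14, 4.14}},
	}}
	out, _ := json.Marshal(doc)
	fmt.Println(string(out))
	// {"Rows":[{"ID":0,"Vector":[3.14,3.14,3.14,3.14]},{"ID":1,"Vector":[4.14,4.14,4.14,4.14]}]}
}
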
func Test_ImportColumnBased_perf(t *testing.T) {
ctx := context.Background()
err := os.MkdirAll(TempFilesPath, os.ModePerm)
assert.Nil(t, err)
defer os.RemoveAll(TempFilesPath)
idAllocator := newIDAllocator(ctx, t)
tr := timerecord.NewTimeRecorder("column-based parse performance")
type IDCol struct {
ID []int64
}
type VectorCol struct {
Vector [][]float32
}
// change these parameters to test different cases
dim := 128
rowCount := 10000
shardNum := 2
segmentSize := 512 // unit: MB
// generate rows data
ids := &IDCol{
ID: make([]int64, 0, rowCount),
}
vectors := &VectorCol{
Vector: make([][]float32, 0, rowCount),
}
for i := 0; i < rowCount; i++ {
ids.ID = append(ids.ID, int64(i))
vector := make([]float32, 0, dim)
for k := 0; k < dim; k++ {
vector = append(vector, float32(i)+3.1415926)
}
vectors.Vector = append(vectors.Vector, vector)
}
tr.Record("generate " + strconv.Itoa(rowCount) + " rows")
// generate json files
saveFileFunc := func(filePath string, data interface{}) error {
fp, err := os.Create(filePath)
if err != nil {
return err
}
defer fp.Close()
encoder := json.NewEncoder(fp)
err = encoder.Encode(data)
return err
}
filePath1 := TempFilesPath + "ids.json"
err = saveFileFunc(filePath1, ids)
assert.Nil(t, err)
tr.Record("generate large json file " + filePath1)
filePath2 := TempFilesPath + "vectors.json"
err = saveFileFunc(filePath2, vectors)
assert.Nil(t, err)
tr.Record("generate large json file " + filePath2)
// parse the json file
parseCount := 0
flushFunc := func(fields map[string]storage.FieldData) error {
count := 0
for _, data := range fields {
assert.Less(t, 0, data.RowNum())
if count == 0 {
count = data.RowNum()
} else {
assert.Equal(t, count, data.RowNum())
}
}
parseCount += count
return nil
}
schema := perfSchema(dim)
wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int32(segmentSize), idAllocator, flushFunc)
files := make([]string, 0)
files = append(files, filePath1)
files = append(files, filePath2)
err = wrapper.Import(files, false, false)
assert.Nil(t, err)
assert.Equal(t, rowCount, parseCount)
tr.Record("parse large json files: " + filePath1 + "," + filePath2)
}
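
The column-based variant spreads the same data over two files, each a single JSON object keyed by the column name; both files must describe the same number of rows, which is what the validator changes in the next file enforce. A small sketch of the two layouts (three rows, dim 2):

package main

import (
	"encoding/json"
	"fmt"
)

type IDCol struct {
	ID []int64
}

type VectorCol struct {
	Vector [][]float32
}

func main() {
	ids, _ := json.Marshal(&IDCol{ID: []int64{0, 1, 2}})
	vectors, _ := json.Marshal(&VectorCol{Vector: [][]float32{{3.14, 3.14}, {4.14, 4.14}, {5.14, 5.14}}})
	fmt.Println(string(ids))     // {"ID":[0,1,2]}
	fmt.Println(string(vectors)) // {"Vector":[[3.14,3.14],[4.14,4.14],[5.14,5.14]]}
}
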


@@ -326,10 +326,6 @@ func NewJSONColumnValidator(schema *schemapb.CollectionSchema, downstream JSONCo
}
initValidators(schema, v.validators)
for k := range v.validators {
v.rowCounter[k] = 0
}
return v
}
@@ -344,20 +340,17 @@ func (v *JSONColumnValidator) Handle(columns map[string][]interface{}) error {
// parse completed
if columns == nil {
// all columns are parsed?
maxCount := int64(0)
for _, counter := range v.rowCounter {
if counter > maxCount {
maxCount = counter
}
}
for k := range v.validators {
counter, ok := v.rowCounter[k]
if !ok || counter != maxCount {
return errors.New("JSON column validator: the field " + k + " row count is not equal to other fields")
// compare the row count of columns, should be equal
rowCount := int64(-1)
for k, counter := range v.rowCounter {
if rowCount == -1 {
rowCount = counter
} else if rowCount != counter {
return errors.New("JSON column validator: the field " + k + " row count " + strconv.Itoa(int(counter)) + " is not equal to other fields " + strconv.Itoa(int(rowCount)))
}
}
// let the downstream know parse is completed
log.Debug("JSON column validation finished")
if v.downstream != nil {
return v.downstream.Handle(nil)
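
Because the constructor no longer pre-fills rowCounter with zeros, only columns that actually appeared have a counter, so a data file that carries just a subset of the columns no longer fails the completion check; the check now only requires the counters that do exist to agree. A standalone sketch of that comparison (hypothetical helper mirroring the loop above):

package main

import (
	"errors"
	"fmt"
	"strconv"
)

// checkRowCounts mirrors the completion check above: only columns that actually
// appeared have a counter, and all existing counters must be equal.
func checkRowCounts(rowCounter map[string]int64) error {
	rowCount := int64(-1)
	for k, counter := range rowCounter {
		if rowCount == -1 {
			rowCount = counter
		} else if rowCount != counter {
			return errors.New("the field " + k + " row count " + strconv.Itoa(int(counter)) +
				" is not equal to other fields " + strconv.Itoa(int(rowCount)))
		}
	}
	return nil
}

func main() {
	fmt.Println(checkRowCounts(map[string]int64{"ID": 3, "Vector": 3})) // <nil>
	fmt.Println(checkRowCounts(map[string]int64{"ID": 3, "Vector": 2})) // row count mismatch -> error
}
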
@@ -368,7 +361,7 @@ func (v *JSONColumnValidator) Handle(columns map[string][]interface{}) error {
for name, values := range columns {
validator, ok := v.validators[name]
if !ok {
// not a valid field name
// not a valid field name, skip without parsing
break
}
@@ -658,10 +651,17 @@ func (v *JSONColumnConsumer) flush() error {
// check row count, should be equal
rowCount := 0
for name, field := range v.fieldsData {
// skip the autoid field
if name == v.primaryKey && v.validators[v.primaryKey].autoID {
continue
}
cnt := field.RowNum()
// skip 0-row fields since a data file may contain only one column (several data files may be imported together)
if cnt == 0 {
continue
}
// only check non-zero row fields
if rowCount == 0 {
rowCount = cnt
} else if rowCount != cnt {
@@ -690,6 +690,7 @@ func (v *JSONColumnConsumer) Handle(columns map[string][]interface{}) error {
return err
}
// consume columns data
for name, values := range columns {
validator, ok := v.validators[name]
if !ok {


@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"io"
"strings"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/schemapb"
@@ -68,10 +69,11 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
return p.logError("JSON parse: " + err.Error())
}
key := t.(string)
keyLower := strings.ToLower(key)
// the root key should be RowRootNode
if key != RowRootNode {
return p.logError("JSON parse: invalid row-based JSON format, the key " + RowRootNode + " is not found")
if keyLower != RowRootNode {
return p.logError("JSON parse: invalid row-based JSON format, the key " + key + " is not found")
}
// started by '['
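
The ParseRows change lower-cases the incoming key so the root-node match is case-insensitive, and the error now names the offending key rather than the expected one. A minimal sketch of that check using encoding/json's streaming decoder, assuming RowRootNode is the lower-case literal "rows" (an assumption; the constant's value is not shown in this diff):

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// rowRootNode stands in for RowRootNode; assumed to be the lower-case "rows".
const rowRootNode = "rows"

// checkRootKey consumes the opening '{' and the first key of a row-based JSON
// stream and verifies the key matches rowRootNode regardless of case.
func checkRootKey(dec *json.Decoder) error {
	if _, err := dec.Token(); err != nil { // '{'
		return err
	}
	t, err := dec.Token()
	if err != nil {
		return err
	}
	key, ok := t.(string)
	if !ok || strings.ToLower(key) != rowRootNode {
		return errors.New("invalid row-based JSON format, the key " + fmt.Sprint(t) + " is not found")
	}
	return nil
}

func main() {
	dec := json.NewDecoder(strings.NewReader(`{"Rows": [{"ID": 1}]}`))
	fmt.Println(checkRootKey(dec)) // <nil>: "Rows" matches despite the upper-case R
}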