mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-05 05:18:52 +08:00
bee66631e3
Signed-off-by: yhmo <yihua.mo@zilliz.com> Signed-off-by: yhmo <yihua.mo@zilliz.com>
513 lines
17 KiB
Go
513 lines
17 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package importutil
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"path"
|
|
"runtime/debug"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
|
"github.com/milvus-io/milvus/internal/common"
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
"github.com/milvus-io/milvus/internal/storage"
|
|
)
|
|
|
|
func isCanceled(ctx context.Context) bool {
|
|
// canceled?
|
|
select {
|
|
case <-ctx.Done():
|
|
return true
|
|
default:
|
|
break
|
|
}
|
|
return false
|
|
}
|
|
|
|
func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.FieldID]storage.FieldData {
|
|
segmentData := make(map[storage.FieldID]storage.FieldData)
|
|
// rowID field is a hidden field with fieldID=0, it is always auto-generated by IDAllocator
|
|
// if primary key is int64 and autoID=true, primary key field is equal to rowID field
|
|
segmentData[common.RowIDField] = &storage.Int64FieldData{
|
|
Data: make([]int64, 0),
|
|
NumRows: []int64{0},
|
|
}
|
|
|
|
for i := 0; i < len(collectionSchema.Fields); i++ {
|
|
schema := collectionSchema.Fields[i]
|
|
switch schema.DataType {
|
|
case schemapb.DataType_Bool:
|
|
segmentData[schema.GetFieldID()] = &storage.BoolFieldData{
|
|
Data: make([]bool, 0),
|
|
NumRows: []int64{0},
|
|
}
|
|
case schemapb.DataType_Float:
|
|
segmentData[schema.GetFieldID()] = &storage.FloatFieldData{
|
|
Data: make([]float32, 0),
|
|
NumRows: []int64{0},
|
|
}
|
|
case schemapb.DataType_Double:
|
|
segmentData[schema.GetFieldID()] = &storage.DoubleFieldData{
|
|
Data: make([]float64, 0),
|
|
NumRows: []int64{0},
|
|
}
|
|
case schemapb.DataType_Int8:
|
|
segmentData[schema.GetFieldID()] = &storage.Int8FieldData{
|
|
Data: make([]int8, 0),
|
|
NumRows: []int64{0},
|
|
}
|
|
case schemapb.DataType_Int16:
|
|
segmentData[schema.GetFieldID()] = &storage.Int16FieldData{
|
|
Data: make([]int16, 0),
|
|
NumRows: []int64{0},
|
|
}
|
|
case schemapb.DataType_Int32:
|
|
segmentData[schema.GetFieldID()] = &storage.Int32FieldData{
|
|
Data: make([]int32, 0),
|
|
NumRows: []int64{0},
|
|
}
|
|
case schemapb.DataType_Int64:
|
|
segmentData[schema.GetFieldID()] = &storage.Int64FieldData{
|
|
Data: make([]int64, 0),
|
|
NumRows: []int64{0},
|
|
}
|
|
case schemapb.DataType_BinaryVector:
|
|
dim, _ := getFieldDimension(schema)
|
|
segmentData[schema.GetFieldID()] = &storage.BinaryVectorFieldData{
|
|
Data: make([]byte, 0),
|
|
NumRows: []int64{0},
|
|
Dim: dim,
|
|
}
|
|
case schemapb.DataType_FloatVector:
|
|
dim, _ := getFieldDimension(schema)
|
|
segmentData[schema.GetFieldID()] = &storage.FloatVectorFieldData{
|
|
Data: make([]float32, 0),
|
|
NumRows: []int64{0},
|
|
Dim: dim,
|
|
}
|
|
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
|
segmentData[schema.GetFieldID()] = &storage.StringFieldData{
|
|
Data: make([]string, 0),
|
|
NumRows: []int64{0},
|
|
}
|
|
default:
|
|
log.Error("Import util: unsupported data type", zap.Int("DataType", int(schema.DataType)))
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return segmentData
|
|
}
|
|
|
|
// initValidators constructs valiator methods and data conversion methods
|
|
func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[storage.FieldID]*Validator) error {
|
|
if collectionSchema == nil {
|
|
return errors.New("collection schema is nil")
|
|
}
|
|
|
|
// json decoder parse all the numeric value into float64
|
|
numericValidator := func(obj interface{}) error {
|
|
switch obj.(type) {
|
|
case float64:
|
|
return nil
|
|
default:
|
|
s := fmt.Sprintf("%v", obj)
|
|
msg := "illegal numeric value " + s
|
|
return errors.New(msg)
|
|
}
|
|
|
|
}
|
|
|
|
for i := 0; i < len(collectionSchema.Fields); i++ {
|
|
schema := collectionSchema.Fields[i]
|
|
|
|
validators[schema.GetFieldID()] = &Validator{}
|
|
validators[schema.GetFieldID()].primaryKey = schema.GetIsPrimaryKey()
|
|
validators[schema.GetFieldID()].autoID = schema.GetAutoID()
|
|
validators[schema.GetFieldID()].fieldName = schema.GetName()
|
|
validators[schema.GetFieldID()].isString = false
|
|
|
|
switch schema.DataType {
|
|
case schemapb.DataType_Bool:
|
|
validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
|
|
switch obj.(type) {
|
|
case bool:
|
|
return nil
|
|
default:
|
|
s := fmt.Sprintf("%v", obj)
|
|
msg := "illegal value " + s + " for bool type field " + schema.GetName()
|
|
return errors.New(msg)
|
|
}
|
|
|
|
}
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
value := obj.(bool)
|
|
field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value)
|
|
field.(*storage.BoolFieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
case schemapb.DataType_Float:
|
|
validators[schema.GetFieldID()].validateFunc = numericValidator
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
value := float32(obj.(float64))
|
|
field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, value)
|
|
field.(*storage.FloatFieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
case schemapb.DataType_Double:
|
|
validators[schema.GetFieldID()].validateFunc = numericValidator
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
value := obj.(float64)
|
|
field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value)
|
|
field.(*storage.DoubleFieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
case schemapb.DataType_Int8:
|
|
validators[schema.GetFieldID()].validateFunc = numericValidator
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
value := int8(obj.(float64))
|
|
field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, value)
|
|
field.(*storage.Int8FieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
case schemapb.DataType_Int16:
|
|
validators[schema.GetFieldID()].validateFunc = numericValidator
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
value := int16(obj.(float64))
|
|
field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, value)
|
|
field.(*storage.Int16FieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
case schemapb.DataType_Int32:
|
|
validators[schema.GetFieldID()].validateFunc = numericValidator
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
value := int32(obj.(float64))
|
|
field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, value)
|
|
field.(*storage.Int32FieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
case schemapb.DataType_Int64:
|
|
validators[schema.GetFieldID()].validateFunc = numericValidator
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
value := int64(obj.(float64))
|
|
field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value)
|
|
field.(*storage.Int64FieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
case schemapb.DataType_BinaryVector:
|
|
dim, err := getFieldDimension(schema)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
validators[schema.GetFieldID()].dimension = dim
|
|
|
|
validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
|
|
switch vt := obj.(type) {
|
|
case []interface{}:
|
|
if len(vt)*8 != dim {
|
|
msg := "bit size " + strconv.Itoa(len(vt)*8) + " doesn't equal to vector dimension " + strconv.Itoa(dim) + " of field " + schema.GetName()
|
|
return errors.New(msg)
|
|
}
|
|
for i := 0; i < len(vt); i++ {
|
|
if e := numericValidator(vt[i]); e != nil {
|
|
msg := e.Error() + " for binary vector field " + schema.GetName()
|
|
return errors.New(msg)
|
|
}
|
|
|
|
t := int(vt[i].(float64))
|
|
if t > 255 || t < 0 {
|
|
msg := "illegal value " + strconv.Itoa(t) + " for binary vector field " + schema.GetName()
|
|
return errors.New(msg)
|
|
}
|
|
}
|
|
return nil
|
|
default:
|
|
s := fmt.Sprintf("%v", obj)
|
|
msg := s + " is not an array for binary vector field " + schema.GetName()
|
|
return errors.New(msg)
|
|
}
|
|
}
|
|
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
arr := obj.([]interface{})
|
|
for i := 0; i < len(arr); i++ {
|
|
value := byte(arr[i].(float64))
|
|
field.(*storage.BinaryVectorFieldData).Data = append(field.(*storage.BinaryVectorFieldData).Data, value)
|
|
}
|
|
|
|
field.(*storage.BinaryVectorFieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
case schemapb.DataType_FloatVector:
|
|
dim, err := getFieldDimension(schema)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
validators[schema.GetFieldID()].dimension = dim
|
|
|
|
validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
|
|
switch vt := obj.(type) {
|
|
case []interface{}:
|
|
if len(vt) != dim {
|
|
msg := "array size " + strconv.Itoa(len(vt)) + " doesn't equal to vector dimension " + strconv.Itoa(dim) + " of field " + schema.GetName()
|
|
return errors.New(msg)
|
|
}
|
|
for i := 0; i < len(vt); i++ {
|
|
if e := numericValidator(vt[i]); e != nil {
|
|
msg := e.Error() + " for float vector field " + schema.GetName()
|
|
return errors.New(msg)
|
|
}
|
|
}
|
|
return nil
|
|
default:
|
|
s := fmt.Sprintf("%v", obj)
|
|
msg := s + " is not an array for float vector field " + schema.GetName()
|
|
return errors.New(msg)
|
|
}
|
|
}
|
|
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
arr := obj.([]interface{})
|
|
for i := 0; i < len(arr); i++ {
|
|
value := float32(arr[i].(float64))
|
|
field.(*storage.FloatVectorFieldData).Data = append(field.(*storage.FloatVectorFieldData).Data, value)
|
|
}
|
|
field.(*storage.FloatVectorFieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
|
validators[schema.GetFieldID()].isString = true
|
|
validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
|
|
switch obj.(type) {
|
|
case string:
|
|
return nil
|
|
default:
|
|
s := fmt.Sprintf("%v", obj)
|
|
msg := s + " is not a string for string type field " + schema.GetName()
|
|
return errors.New(msg)
|
|
}
|
|
}
|
|
|
|
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
|
value := obj.(string)
|
|
field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value)
|
|
field.(*storage.StringFieldData).NumRows[0]++
|
|
return nil
|
|
}
|
|
default:
|
|
return errors.New("unsupport data type: " + strconv.Itoa(int(collectionSchema.Fields[i].DataType)))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func printFieldsDataInfo(fieldsData map[storage.FieldID]storage.FieldData, msg string, files []string) {
|
|
stats := make([]zapcore.Field, 0)
|
|
for k, v := range fieldsData {
|
|
stats = append(stats, zap.Int(strconv.FormatInt(k, 10), v.RowNum()))
|
|
}
|
|
|
|
if len(files) > 0 {
|
|
stats = append(stats, zap.Any("files", files))
|
|
}
|
|
log.Info(msg, stats...)
|
|
}
|
|
|
|
// GetFileNameAndExt extracts file name and extension
|
|
// for example: "/a/b/c.ttt" returns "c" and ".ttt"
|
|
func GetFileNameAndExt(filePath string) (string, string) {
|
|
fileName := path.Base(filePath)
|
|
fileType := path.Ext(fileName)
|
|
fileNameWithoutExt := strings.TrimSuffix(fileName, fileType)
|
|
return fileNameWithoutExt, fileType
|
|
}
|
|
|
|
// getFieldDimension gets dimension of vecotor field
|
|
func getFieldDimension(schema *schemapb.FieldSchema) (int, error) {
|
|
for _, kvPair := range schema.GetTypeParams() {
|
|
key, value := kvPair.GetKey(), kvPair.GetValue()
|
|
if key == "dim" {
|
|
dim, err := strconv.Atoi(value)
|
|
if err != nil {
|
|
return 0, errors.New("vector dimension is invalid")
|
|
}
|
|
return dim, nil
|
|
}
|
|
}
|
|
|
|
return 0, errors.New("vector dimension is not defined")
|
|
}
|
|
|
|
// triggerGC triggers golang gc to return all free memory back to the underlying system at once,
|
|
// Note: this operation is expensive, and can lead to latency spikes as it holds the heap lock through the whole process
|
|
func triggerGC() {
|
|
debug.FreeOSMemory()
|
|
}
|
|
|
|
// tryFlushBlocks does the two things:
|
|
// 1. if accumulate data of a block exceed blockSize, call callFlushFunc to generate new binlog file
|
|
// 2. if total accumulate data exceed maxTotalSize, call callFlushFUnc to flush the biggest block
|
|
func tryFlushBlocks(ctx context.Context,
|
|
blocksData []map[storage.FieldID]storage.FieldData,
|
|
collectionSchema *schemapb.CollectionSchema,
|
|
callFlushFunc ImportFlushFunc,
|
|
blockSize int64,
|
|
maxTotalSize int64,
|
|
force bool) error {
|
|
|
|
totalSize := 0
|
|
biggestSize := 0
|
|
biggestItem := -1
|
|
|
|
// 1. if accumulate data of a block exceed blockSize, call callFlushFunc to generate new binlog file
|
|
for i := 0; i < len(blocksData); i++ {
|
|
// outside context might be canceled(service stop, or future enhancement for canceling import task)
|
|
if isCanceled(ctx) {
|
|
log.Error("Import util: import task was canceled")
|
|
return errors.New("import task was canceled")
|
|
}
|
|
|
|
blockData := blocksData[i]
|
|
// Note: even rowCount is 0, the size is still non-zero
|
|
size := 0
|
|
rowCount := 0
|
|
for _, fieldData := range blockData {
|
|
size += fieldData.GetMemorySize()
|
|
rowCount = fieldData.RowNum()
|
|
}
|
|
|
|
// force to flush, called at the end of Read()
|
|
if force && rowCount > 0 {
|
|
printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil)
|
|
err := callFlushFunc(blockData, i)
|
|
if err != nil {
|
|
log.Error("Import util: failed to force flush block data", zap.Int("shardID", i))
|
|
return err
|
|
}
|
|
log.Info("Import util: force flush", zap.Int("rowCount", rowCount), zap.Int("size", size), zap.Int("shardID", i))
|
|
|
|
blocksData[i] = initSegmentData(collectionSchema)
|
|
if blocksData[i] == nil {
|
|
log.Error("Import util: failed to initialize FieldData list")
|
|
return errors.New("failed to initialize FieldData list")
|
|
}
|
|
continue
|
|
}
|
|
|
|
// if segment size is larger than predefined blockSize, flush to create a new binlog file
|
|
// initialize a new FieldData list for next round batch read
|
|
if size > int(blockSize) && rowCount > 0 {
|
|
printFieldsDataInfo(blockData, "import util: prepare to flush block larger than maxBlockSize", nil)
|
|
err := callFlushFunc(blockData, i)
|
|
if err != nil {
|
|
log.Error("Import util: failed to flush block data", zap.Int("shardID", i))
|
|
return err
|
|
}
|
|
log.Info("Import util: block size exceed limit and flush", zap.Int("rowCount", rowCount), zap.Int("size", size), zap.Int("shardID", i))
|
|
|
|
blocksData[i] = initSegmentData(collectionSchema)
|
|
if blocksData[i] == nil {
|
|
log.Error("Import util: failed to initialize FieldData list")
|
|
return errors.New("failed to initialize FieldData list")
|
|
}
|
|
continue
|
|
}
|
|
|
|
// calculate the total size(ignore the flushed blocks)
|
|
// find out the biggest block for the step 2
|
|
totalSize += size
|
|
if size > biggestSize {
|
|
biggestSize = size
|
|
biggestItem = i
|
|
}
|
|
}
|
|
|
|
// 2. if total accumulate data exceed maxTotalSize, call callFlushFUnc to flush the biggest block
|
|
if totalSize > int(maxTotalSize) && biggestItem >= 0 {
|
|
// outside context might be canceled(service stop, or future enhancement for canceling import task)
|
|
if isCanceled(ctx) {
|
|
log.Error("Import util: import task was canceled")
|
|
return errors.New("import task was canceled")
|
|
}
|
|
|
|
blockData := blocksData[biggestItem]
|
|
// Note: even rowCount is 0, the size is still non-zero
|
|
size := 0
|
|
rowCount := 0
|
|
for _, fieldData := range blockData {
|
|
size += fieldData.GetMemorySize()
|
|
rowCount = fieldData.RowNum()
|
|
}
|
|
|
|
if rowCount > 0 {
|
|
printFieldsDataInfo(blockData, "import util: prepare to flush biggest block", nil)
|
|
err := callFlushFunc(blockData, biggestItem)
|
|
if err != nil {
|
|
log.Error("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem))
|
|
return err
|
|
}
|
|
log.Info("Import util: total size exceed limit and flush", zap.Int("rowCount", rowCount),
|
|
zap.Int("size", size), zap.Int("totalSize", totalSize), zap.Int("shardID", biggestItem))
|
|
|
|
blocksData[biggestItem] = initSegmentData(collectionSchema)
|
|
if blocksData[biggestItem] == nil {
|
|
log.Error("Import util: failed to initialize FieldData list")
|
|
return errors.New("failed to initialize FieldData list")
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getTypeName(dt schemapb.DataType) string {
|
|
switch dt {
|
|
case schemapb.DataType_Bool:
|
|
return "Bool"
|
|
case schemapb.DataType_Int8:
|
|
return "Int8"
|
|
case schemapb.DataType_Int16:
|
|
return "Int16"
|
|
case schemapb.DataType_Int32:
|
|
return "Int32"
|
|
case schemapb.DataType_Int64:
|
|
return "Int64"
|
|
case schemapb.DataType_Float:
|
|
return "Float"
|
|
case schemapb.DataType_Double:
|
|
return "Double"
|
|
case schemapb.DataType_VarChar:
|
|
return "Varchar"
|
|
case schemapb.DataType_String:
|
|
return "String"
|
|
case schemapb.DataType_BinaryVector:
|
|
return "BinaryVector"
|
|
case schemapb.DataType_FloatVector:
|
|
return "FloatVector"
|
|
default:
|
|
return "InvalidType"
|
|
}
|
|
}
|