milvus/internal/storage/insert_data.go
XuanYang-cn 2f16339aac
Enhance InsertData and FieldData (#27436)
1. Add NewInsertData
2. Add GetRowNum(), GetMemorySize(), and, Append() for InsertData
3. Add AppendRow() for FieldData for compaction

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
2023-10-17 17:36:11 +08:00

464 lines
13 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"encoding/binary"
"fmt"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// TODO: fill it
// info for each blob
type BlobInfo struct {
Length int
}
// InsertData example row_schema: {float_field, int_field, float_vector_field, string_field}
// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>}
//
// system filed id:
// 0: unique row id
// 1: timestamp
// 100: first user field id
// 101: second user field id
// 102: ...
type InsertData struct {
// TODO, data should be zero copy by passing data directly to event reader or change Data to map[FieldID]FieldDataArray
Data map[FieldID]FieldData // field id to field data
Infos []BlobInfo
}
func NewInsertData(schema *schemapb.CollectionSchema) (*InsertData, error) {
if schema == nil {
return nil, fmt.Errorf("Nil input schema")
}
idata := &InsertData{
Data: make(map[FieldID]FieldData),
}
for _, fSchema := range schema.Fields {
fieldData, err := NewFieldData(fSchema.DataType, fSchema)
if err != nil {
return nil, err
}
idata.Data[fSchema.FieldID] = fieldData
}
return idata, nil
}
func (iData *InsertData) IsEmpty() bool {
if iData == nil {
return true
}
timeFieldData, ok := iData.Data[common.TimeStampField]
return (!ok) || (timeFieldData.RowNum() <= 0)
}
func (i *InsertData) GetRowNum() int {
if i.Data == nil || len(i.Data) == 0 {
return 0
}
data, ok := i.Data[common.RowIDField]
if !ok {
return 0
}
return data.RowNum()
}
func (i *InsertData) GetMemorySize() int {
var size int
if i.Data == nil || len(i.Data) == 0 {
return size
}
for _, data := range i.Data {
size += data.GetMemorySize()
}
return size
}
func (i *InsertData) Append(row map[FieldID]interface{}) error {
for fID, v := range row {
field, ok := i.Data[fID]
if !ok {
return fmt.Errorf("Missing field when appending row, got %d", fID)
}
if err := field.AppendRow(v); err != nil {
return err
}
}
return nil
}
// FieldData defines field data interface
type FieldData interface {
GetMemorySize() int
RowNum() int
GetRow(i int) any
AppendRow(row interface{}) error
}
func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema) (FieldData, error) {
typeParams := fieldSchema.GetTypeParams()
switch dataType {
case schemapb.DataType_Float16Vector:
dim, err := GetDimFromParams(typeParams)
if err != nil {
return nil, err
}
return &Float16VectorFieldData{
Data: make([]byte, 0),
Dim: dim,
}, nil
case schemapb.DataType_FloatVector:
dim, err := GetDimFromParams(typeParams)
if err != nil {
return nil, err
}
return &FloatVectorFieldData{
Data: make([]float32, 0),
Dim: dim,
}, nil
case schemapb.DataType_BinaryVector:
dim, err := GetDimFromParams(typeParams)
if err != nil {
return nil, err
}
return &BinaryVectorFieldData{
Data: make([]byte, 0),
Dim: dim,
}, nil
case schemapb.DataType_Bool:
return &BoolFieldData{
Data: make([]bool, 0),
}, nil
case schemapb.DataType_Int8:
return &Int8FieldData{
Data: make([]int8, 0),
}, nil
case schemapb.DataType_Int16:
return &Int16FieldData{
Data: make([]int16, 0),
}, nil
case schemapb.DataType_Int32:
return &Int32FieldData{
Data: make([]int32, 0),
}, nil
case schemapb.DataType_Int64:
return &Int64FieldData{
Data: make([]int64, 0),
}, nil
case schemapb.DataType_Float:
return &FloatFieldData{
Data: make([]float32, 0),
}, nil
case schemapb.DataType_Double:
return &DoubleFieldData{
Data: make([]float64, 0),
}, nil
case schemapb.DataType_JSON:
return &JSONFieldData{
Data: make([][]byte, 0),
}, nil
case schemapb.DataType_Array:
return &ArrayFieldData{
Data: make([]*schemapb.ScalarField, 0),
ElementType: fieldSchema.GetElementType(),
}, nil
case schemapb.DataType_String, schemapb.DataType_VarChar:
return &StringFieldData{
Data: make([]string, 0),
}, nil
default:
return nil, fmt.Errorf("Unexpected schema data type: %d", dataType)
}
}
type BoolFieldData struct {
Data []bool
}
type Int8FieldData struct {
Data []int8
}
type Int16FieldData struct {
Data []int16
}
type Int32FieldData struct {
Data []int32
}
type Int64FieldData struct {
Data []int64
}
type FloatFieldData struct {
Data []float32
}
type DoubleFieldData struct {
Data []float64
}
type StringFieldData struct {
Data []string
}
type ArrayFieldData struct {
ElementType schemapb.DataType
Data []*schemapb.ScalarField
}
type JSONFieldData struct {
Data [][]byte
}
type BinaryVectorFieldData struct {
Data []byte
Dim int
}
type FloatVectorFieldData struct {
Data []float32
Dim int
}
type Float16VectorFieldData struct {
Data []byte
Dim int
}
// RowNum implements FieldData.RowNum
func (data *BoolFieldData) RowNum() int { return len(data.Data) }
func (data *Int8FieldData) RowNum() int { return len(data.Data) }
func (data *Int16FieldData) RowNum() int { return len(data.Data) }
func (data *Int32FieldData) RowNum() int { return len(data.Data) }
func (data *Int64FieldData) RowNum() int { return len(data.Data) }
func (data *FloatFieldData) RowNum() int { return len(data.Data) }
func (data *DoubleFieldData) RowNum() int { return len(data.Data) }
func (data *StringFieldData) RowNum() int { return len(data.Data) }
func (data *ArrayFieldData) RowNum() int { return len(data.Data) }
func (data *JSONFieldData) RowNum() int { return len(data.Data) }
func (data *BinaryVectorFieldData) RowNum() int { return len(data.Data) * 8 / data.Dim }
func (data *FloatVectorFieldData) RowNum() int { return len(data.Data) / data.Dim }
func (data *Float16VectorFieldData) RowNum() int { return len(data.Data) / 2 / data.Dim }
// GetRow implements FieldData.GetRow
func (data *BoolFieldData) GetRow(i int) any { return data.Data[i] }
func (data *Int8FieldData) GetRow(i int) any { return data.Data[i] }
func (data *Int16FieldData) GetRow(i int) any { return data.Data[i] }
func (data *Int32FieldData) GetRow(i int) any { return data.Data[i] }
func (data *Int64FieldData) GetRow(i int) any { return data.Data[i] }
func (data *FloatFieldData) GetRow(i int) any { return data.Data[i] }
func (data *DoubleFieldData) GetRow(i int) any { return data.Data[i] }
func (data *StringFieldData) GetRow(i int) any { return data.Data[i] }
func (data *ArrayFieldData) GetRow(i int) any { return data.Data[i] }
func (data *JSONFieldData) GetRow(i int) any { return data.Data[i] }
func (data *BinaryVectorFieldData) GetRow(i int) interface{} {
return data.Data[i*data.Dim/8 : (i+1)*data.Dim/8]
}
func (data *FloatVectorFieldData) GetRow(i int) interface{} {
return data.Data[i*data.Dim : (i+1)*data.Dim]
}
func (data *Float16VectorFieldData) GetRow(i int) interface{} {
return data.Data[i*data.Dim*2 : (i+1)*data.Dim*2]
}
// AppendRow implements FieldData.AppendRow
func (data *BoolFieldData) AppendRow(row interface{}) error {
v, ok := row.(bool)
if !ok {
return merr.WrapErrParameterInvalid("bool", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *Int8FieldData) AppendRow(row interface{}) error {
v, ok := row.(int8)
if !ok {
return merr.WrapErrParameterInvalid("int8", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *Int16FieldData) AppendRow(row interface{}) error {
v, ok := row.(int16)
if !ok {
return merr.WrapErrParameterInvalid("int16", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *Int32FieldData) AppendRow(row interface{}) error {
v, ok := row.(int32)
if !ok {
return merr.WrapErrParameterInvalid("int32", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *Int64FieldData) AppendRow(row interface{}) error {
v, ok := row.(int64)
if !ok {
return merr.WrapErrParameterInvalid("int64", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *FloatFieldData) AppendRow(row interface{}) error {
v, ok := row.(float32)
if !ok {
return merr.WrapErrParameterInvalid("float32", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *DoubleFieldData) AppendRow(row interface{}) error {
v, ok := row.(float64)
if !ok {
return merr.WrapErrParameterInvalid("float64", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *StringFieldData) AppendRow(row interface{}) error {
v, ok := row.(string)
if !ok {
return merr.WrapErrParameterInvalid("string", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *ArrayFieldData) AppendRow(row interface{}) error {
v, ok := row.(*schemapb.ScalarField)
if !ok {
return merr.WrapErrParameterInvalid("*schemapb.ScalarField", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *JSONFieldData) AppendRow(row interface{}) error {
v, ok := row.([]byte)
if !ok {
return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type")
}
data.Data = append(data.Data, v)
return nil
}
func (data *BinaryVectorFieldData) AppendRow(row interface{}) error {
v, ok := row.([]byte)
if !ok || len(v) != data.Dim/8 {
return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type")
}
data.Data = append(data.Data, v...)
return nil
}
func (data *FloatVectorFieldData) AppendRow(row interface{}) error {
v, ok := row.([]float32)
if !ok || len(v) != data.Dim {
return merr.WrapErrParameterInvalid("[]float32", row, "Wrong row type")
}
data.Data = append(data.Data, v...)
return nil
}
func (data *Float16VectorFieldData) AppendRow(row interface{}) error {
v, ok := row.([]byte)
if !ok || len(v) != data.Dim*2 {
return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type")
}
data.Data = append(data.Data, v...)
return nil
}
// GetMemorySize implements FieldData.GetMemorySize
func (data *BoolFieldData) GetMemorySize() int { return binary.Size(data.Data) }
func (data *Int8FieldData) GetMemorySize() int { return binary.Size(data.Data) }
func (data *Int16FieldData) GetMemorySize() int { return binary.Size(data.Data) }
func (data *Int32FieldData) GetMemorySize() int { return binary.Size(data.Data) }
func (data *Int64FieldData) GetMemorySize() int { return binary.Size(data.Data) }
func (data *FloatFieldData) GetMemorySize() int { return binary.Size(data.Data) }
func (data *DoubleFieldData) GetMemorySize() int { return binary.Size(data.Data) }
func (data *BinaryVectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 }
func (data *FloatVectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 }
func (data *Float16VectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 }
// why not binary.Size(data) directly? binary.Size(data) return -1
// binary.Size returns how many bytes Write would generate to encode the value v, which
// must be a fixed-size value or a slice of fixed-size values, or a pointer to such data.
// If v is neither of these, binary.Size returns -1.
func (data *StringFieldData) GetMemorySize() int {
var size int
for _, val := range data.Data {
size += len(val) + 16
}
return size
}
func (data *ArrayFieldData) GetMemorySize() int {
var size int
for _, val := range data.Data {
switch data.ElementType {
case schemapb.DataType_Bool:
size += binary.Size(val.GetBoolData().GetData())
case schemapb.DataType_Int8:
size += binary.Size(val.GetIntData().GetData()) / 4
case schemapb.DataType_Int16:
size += binary.Size(val.GetIntData().GetData()) / 2
case schemapb.DataType_Int32:
size += binary.Size(val.GetIntData().GetData())
case schemapb.DataType_Int64:
size += binary.Size(val.GetLongData().GetData())
case schemapb.DataType_Float:
size += binary.Size(val.GetFloatData().GetData())
case schemapb.DataType_Double:
size += binary.Size(val.GetDoubleData().GetData())
case schemapb.DataType_String, schemapb.DataType_VarChar:
size += (&StringFieldData{Data: val.GetStringData().GetData()}).GetMemorySize()
}
}
return size
}
func (data *JSONFieldData) GetMemorySize() int {
var size int
for _, val := range data.Data {
size += len(val) + 16
}
return size
}