milvus/internal/querynodev2/segments/utils.go
yah01 081572d31c
Refactor QueryNode (#21625)
Signed-off-by: yah01 <yang.cen@zilliz.com>
Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
Co-authored-by: aoiasd <zhicheng.yue@zilliz.com>
2023-03-27 00:42:00 +08:00

284 lines
9.3 KiB
Go

package segments
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"strconv"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
func GetPkField(schema *schemapb.CollectionSchema) *schemapb.FieldSchema {
for _, field := range schema.GetFields() {
if field.GetIsPrimaryKey() {
return field
}
}
return nil
}
// TODO: remove this function to proper file
// GetPrimaryKeys would get primary keys by insert messages
func GetPrimaryKeys(msg *msgstream.InsertMsg, schema *schemapb.CollectionSchema) ([]storage.PrimaryKey, error) {
if msg.IsRowBased() {
return getPKsFromRowBasedInsertMsg(msg, schema)
}
return getPKsFromColumnBasedInsertMsg(msg, schema)
}
func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.CollectionSchema) ([]storage.PrimaryKey, error) {
offset := 0
for _, field := range schema.Fields {
if field.IsPrimaryKey {
break
}
switch field.DataType {
case schemapb.DataType_Bool:
offset++
case schemapb.DataType_Int8:
offset++
case schemapb.DataType_Int16:
offset += 2
case schemapb.DataType_Int32:
offset += 4
case schemapb.DataType_Int64:
offset += 8
case schemapb.DataType_Float:
offset += 4
case schemapb.DataType_Double:
offset += 8
case schemapb.DataType_FloatVector:
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err := strconv.Atoi(t.Value)
if err != nil {
return nil, fmt.Errorf("strconv wrong on get dim, err = %s", err)
}
offset += dim * 4
break
}
}
case schemapb.DataType_BinaryVector:
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err := strconv.Atoi(t.Value)
if err != nil {
return nil, fmt.Errorf("strconv wrong on get dim, err = %s", err)
}
offset += dim / 8
break
}
}
}
}
log.Info(strconv.FormatInt(int64(offset), 10))
blobReaders := make([]io.Reader, len(msg.RowData))
for i, blob := range msg.RowData {
blobReaders[i] = bytes.NewReader(blob.GetValue()[offset : offset+8])
}
pks := make([]storage.PrimaryKey, len(blobReaders))
for i, reader := range blobReaders {
var int64PkValue int64
err := binary.Read(reader, common.Endian, &int64PkValue)
if err != nil {
log.Warn("binary read blob value failed", zap.Error(err))
return nil, err
}
pks[i] = storage.NewInt64PrimaryKey(int64PkValue)
}
return pks, nil
}
func getPKsFromColumnBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.CollectionSchema) ([]storage.PrimaryKey, error) {
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
return nil, err
}
primaryFieldData, err := typeutil.GetPrimaryFieldData(msg.GetFieldsData(), primaryFieldSchema)
if err != nil {
return nil, err
}
pks, err := storage.ParseFieldData2PrimaryKeys(primaryFieldData)
if err != nil {
return nil, err
}
return pks, nil
}
func fillBinVecFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
dim := fieldData.GetVectors().GetDim()
rowBytes := dim / 8
content, err := vcm.ReadAt(ctx, dataPath, offset*rowBytes, rowBytes)
if err != nil {
return err
}
x := fieldData.GetVectors().GetData().(*schemapb.VectorField_BinaryVector)
resultLen := dim / 8
copy(x.BinaryVector[i*int(resultLen):(i+1)*int(resultLen)], content)
return nil
}
func fillFloatVecFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
dim := fieldData.GetVectors().GetDim()
rowBytes := dim * 4
content, err := vcm.ReadAt(ctx, dataPath, offset*rowBytes, rowBytes)
if err != nil {
return err
}
x := fieldData.GetVectors().GetData().(*schemapb.VectorField_FloatVector)
floatResult := make([]float32, dim)
buf := bytes.NewReader(content)
if err = binary.Read(buf, endian, &floatResult); err != nil {
return err
}
resultLen := dim
copy(x.FloatVector.Data[i*int(resultLen):(i+1)*int(resultLen)], floatResult)
return nil
}
func fillBoolFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
// read whole file.
// TODO: optimize here.
content, err := vcm.Read(ctx, dataPath)
if err != nil {
return err
}
var arr schemapb.BoolArray
err = proto.Unmarshal(content, &arr)
if err != nil {
return err
}
fieldData.GetScalars().GetBoolData().GetData()[i] = arr.Data[offset]
return nil
}
func fillStringFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
// read whole file.
// TODO: optimize here.
content, err := vcm.Read(ctx, dataPath)
if err != nil {
return err
}
var arr schemapb.StringArray
err = proto.Unmarshal(content, &arr)
if err != nil {
return err
}
fieldData.GetScalars().GetStringData().GetData()[i] = arr.Data[offset]
return nil
}
func fillInt8FieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
// read by offset.
rowBytes := int64(1)
content, err := vcm.ReadAt(ctx, dataPath, offset*rowBytes, rowBytes)
if err != nil {
return err
}
var i8 int8
if err := funcutil.ReadBinary(endian, content, &i8); err != nil {
return err
}
fieldData.GetScalars().GetIntData().GetData()[i] = int32(i8)
return nil
}
func fillInt16FieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
// read by offset.
rowBytes := int64(2)
content, err := vcm.ReadAt(ctx, dataPath, offset*rowBytes, rowBytes)
if err != nil {
return err
}
var i16 int16
if err := funcutil.ReadBinary(endian, content, &i16); err != nil {
return err
}
fieldData.GetScalars().GetIntData().GetData()[i] = int32(i16)
return nil
}
func fillInt32FieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
// read by offset.
rowBytes := int64(4)
content, err := vcm.ReadAt(ctx, dataPath, offset*rowBytes, rowBytes)
if err != nil {
return err
}
return funcutil.ReadBinary(endian, content, &(fieldData.GetScalars().GetIntData().GetData()[i]))
}
func fillInt64FieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
// read by offset.
rowBytes := int64(8)
content, err := vcm.ReadAt(ctx, dataPath, offset*rowBytes, rowBytes)
if err != nil {
return err
}
return funcutil.ReadBinary(endian, content, &(fieldData.GetScalars().GetLongData().GetData()[i]))
}
func fillFloatFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
// read by offset.
rowBytes := int64(4)
content, err := vcm.ReadAt(ctx, dataPath, offset*rowBytes, rowBytes)
if err != nil {
return err
}
return funcutil.ReadBinary(endian, content, &(fieldData.GetScalars().GetFloatData().GetData()[i]))
}
func fillDoubleFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
// read by offset.
rowBytes := int64(8)
content, err := vcm.ReadAt(ctx, dataPath, offset*rowBytes, rowBytes)
if err != nil {
return err
}
return funcutil.ReadBinary(endian, content, &(fieldData.GetScalars().GetDoubleData().GetData()[i]))
}
func fillFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error {
switch fieldData.Type {
case schemapb.DataType_BinaryVector:
return fillBinVecFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
case schemapb.DataType_FloatVector:
return fillFloatVecFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
case schemapb.DataType_Bool:
return fillBoolFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
case schemapb.DataType_String, schemapb.DataType_VarChar:
return fillStringFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
case schemapb.DataType_Int8:
return fillInt8FieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
case schemapb.DataType_Int16:
return fillInt16FieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
case schemapb.DataType_Int32:
return fillInt32FieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
case schemapb.DataType_Int64:
return fillInt64FieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
case schemapb.DataType_Float:
return fillFloatFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
case schemapb.DataType_Double:
return fillDoubleFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian)
default:
return fmt.Errorf("invalid data type: %s", fieldData.Type.String())
}
}