// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
	"fmt"
	"io"
	"sort"
	"strconv"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/cockroachdb/errors"
	"github.com/golang/protobuf/proto"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/util/metautil"
)
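
// Record is a columnar batch of rows: each field's data is exposed as an
// arrow.Array addressed by its FieldID.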
type Record interface {
	Schema() map[FieldID]schemapb.DataType
	Column(i FieldID) arrow.Array
	Len() int
	Release()
}
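
// RecordReader iterates over Records; Next returns io.EOF once all records
// have been consumed.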
type RecordReader interface {
	Next() error
	Record() Record
	Close()
}

// compositeRecord is a record composed of multiple underlying records, each
// of which holds a single column.
type compositeRecord struct {
	recs   map[FieldID]arrow.Record
	schema map[FieldID]schemapb.DataType
}

func (r *compositeRecord) Column(i FieldID) arrow.Array {
	return r.recs[i].Column(0)
}

func (r *compositeRecord) Len() int {
	for _, rec := range r.recs {
		return rec.Column(0).Len()
	}
	return 0
}

func (r *compositeRecord) Release() {
	for _, rec := range r.recs {
		rec.Release()
	}
}

func (r *compositeRecord) Schema() map[FieldID]schemapb.DataType {
	return r.schema
}
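
// compositeRecordReader merges per-field binlog blob streams into composite
// records: blobs are grouped by field (one group per column), and all groups
// are advanced in lockstep.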
type compositeRecordReader struct {
	RecordReader
	blobs   [][]*Blob
	blobPos int
	rrs     []array.RecordReader
	closers []func()
	fields  []FieldID
	r       compositeRecord
}
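
// iterateNextBatch closes the readers of the current blob position and opens
// binlog, event, and arrow readers for the next one; it returns io.EOF once
// every blob has been consumed.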
func (crr *compositeRecordReader) iterateNextBatch() error {
	if crr.closers != nil {
		for _, close := range crr.closers {
			if close != nil {
				close()
			}
		}
	}
	crr.blobPos++
	if crr.blobPos >= len(crr.blobs[0]) {
		return io.EOF
	}

	for i, b := range crr.blobs {
		reader, err := NewBinlogReader(b[crr.blobPos].Value)
		if err != nil {
			return err
		}

		crr.fields[i] = reader.FieldID
		// TODO: assert that the schema is the same across all blobs
		crr.r.schema[reader.FieldID] = reader.PayloadDataType
		er, err := reader.NextEventReader()
		if err != nil {
			return err
		}
		rr, err := er.GetArrowRecordReader()
		if err != nil {
			return err
		}
		crr.rrs[i] = rr
		crr.closers[i] = func() {
			rr.Release()
			er.Close()
			reader.Close()
		}
	}
	return nil
}
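
// Next advances to the next composite record, loading the next batch of
// blobs when the current arrow readers are exhausted; it returns io.EOF when
// there is nothing left to read.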
func (crr *compositeRecordReader) Next() error {
	if crr.rrs == nil {
		if len(crr.blobs) == 0 {
			return io.EOF
		}
		crr.rrs = make([]array.RecordReader, len(crr.blobs))
		crr.closers = make([]func(), len(crr.blobs))
		crr.blobPos = -1
		crr.fields = make([]FieldID, len(crr.rrs))
		crr.r = compositeRecord{
			recs:   make(map[FieldID]arrow.Record, len(crr.rrs)),
			schema: make(map[FieldID]schemapb.DataType, len(crr.rrs)),
		}
		if err := crr.iterateNextBatch(); err != nil {
			return err
		}
	}

	composeRecord := func() bool {
		for i, rr := range crr.rrs {
			if ok := rr.Next(); !ok {
				return false
			}
			// compose record
			crr.r.recs[crr.fields[i]] = rr.Record()
		}
		return true
	}

	// Try to compose a record.
	if ok := composeRecord(); !ok {
		// If that fails, try to iterate to the next batch (blob); the error may be io.EOF.
		if err := crr.iterateNextBatch(); err != nil {
			return err
		}
		// If iterating to the next batch succeeded, try to compose again.
		if ok := composeRecord(); !ok {
			// If the next blob is empty, return io.EOF (this is rare).
			return io.EOF
		}
	}
	return nil
}

func (crr *compositeRecordReader) Record() Record {
	return &crr.r
}

func (crr *compositeRecordReader) Close() {
	for _, close := range crr.closers {
		if close != nil {
			close()
		}
	}
}
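
// parseBlobKey extracts the column (field) ID and log ID from a blob key. It
// accepts both full insert-log paths and the bare field-ID keys generated by
// data_codec.go; keys that match neither form yield (-1, -1).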
func parseBlobKey(blobKey string) (colId FieldID, logId UniqueID) {
	if _, _, _, colId, logId, ok := metautil.ParseInsertLogPath(blobKey); ok {
		return colId, logId
	}
	if colId, err := strconv.ParseInt(blobKey, 10, 64); err == nil {
		// data_codec.go generates a single field id as the blob key.
		return colId, 0
	}
	return -1, -1
}
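
// newCompositeRecordReader sorts the blobs by field ID (and by log ID within
// a field) and groups them into one blob list per column for the reader to
// consume in lockstep.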
func newCompositeRecordReader(blobs []*Blob) (*compositeRecordReader, error) {
	sort.Slice(blobs, func(i, j int) bool {
		iCol, iLog := parseBlobKey(blobs[i].Key)
		jCol, jLog := parseBlobKey(blobs[j].Key)
		if iCol == jCol {
			return iLog < jLog
		}
		return iCol < jCol
	})

	blobm := make([][]*Blob, 0)
	var fieldId FieldID = -1
	var currentCol []*Blob

	for _, blob := range blobs {
		colId, _ := parseBlobKey(blob.Key)
		if colId != fieldId {
			if currentCol != nil {
				blobm = append(blobm, currentCol)
			}
			currentCol = make([]*Blob, 0)
			fieldId = colId
		}
		currentCol = append(currentCol, blob)
	}
	if currentCol != nil {
		blobm = append(blobm, currentCol)
	}

	return &compositeRecordReader{
		blobs: blobm,
	}, nil
}
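
// DeserializeReader adapts a RecordReader into a row-wise iterator: each
// fetched Record is deserialized once into a value slice, and Value serves
// the row at the current position.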
type DeserializeReader[T any] struct {
	rr           RecordReader
	deserializer func(Record, []T) error
	rec          Record
	values       []T
	pos          int
}

// Next iterates to the next value; it returns io.EOF when no more values remain.
func (deser *DeserializeReader[T]) Next() error {
	if deser.rec == nil || deser.pos >= deser.rec.Len()-1 {
		if err := deser.rr.Next(); err != nil {
			return err
		}
		deser.pos = 0
		deser.rec = deser.rr.Record()

		// Reallocate the value buffer if the new record does not fit into it.
		if deser.values == nil || len(deser.values) < deser.rec.Len() {
			deser.values = make([]T, deser.rec.Len())
		}
		if err := deser.deserializer(deser.rec, deser.values); err != nil {
			return err
		}
	} else {
		deser.pos++
	}
	return nil
}

func (deser *DeserializeReader[T]) Value() T {
	return deser.values[deser.pos]
}

func (deser *DeserializeReader[T]) Close() {
	if deser.rec != nil {
		deser.rec.Release()
	}
	if deser.rr != nil {
		deser.rr.Close()
	}
}

func NewDeserializeReader[T any](rr RecordReader, deserializer func(Record, []T) error) *DeserializeReader[T] {
	return &DeserializeReader[T]{
		rr:           rr,
		deserializer: deserializer,
	}
}
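
// deserializeCell converts the i-th element of an arrow column into its Go
// representation for the given Milvus data type; the boolean result reports
// whether the arrow array had the expected concrete type.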
func deserializeCell(col arrow.Array, dataType schemapb.DataType, i int) (interface{}, bool) {
	switch dataType {
	case schemapb.DataType_Bool:
		arr, ok := col.(*array.Boolean)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_Int8:
		arr, ok := col.(*array.Int8)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_Int16:
		arr, ok := col.(*array.Int16)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_Int32:
		arr, ok := col.(*array.Int32)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_Int64:
		arr, ok := col.(*array.Int64)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_Float:
		arr, ok := col.(*array.Float32)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_Double:
		arr, ok := col.(*array.Float64)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_String, schemapb.DataType_VarChar:
		arr, ok := col.(*array.String)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_Array:
		arr, ok := col.(*array.Binary)
		if !ok {
			return nil, false
		}
		v := &schemapb.ScalarField{}
		if err := proto.Unmarshal(arr.Value(i), v); err != nil {
			return nil, false
		}
		return v, true

	case schemapb.DataType_JSON:
		arr, ok := col.(*array.Binary)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
		arr, ok := col.(*array.FixedSizeBinary)
		if !ok {
			return nil, false
		}
		return arr.Value(i), true

	case schemapb.DataType_FloatVector:
		arr, ok := col.(*array.FixedSizeBinary)
		if !ok {
			return nil, false
		}
		return arrow.Float32Traits.CastFromBytes(arr.Value(i)), true

	default:
		panic(fmt.Sprintf("unsupported type %s", dataType))
	}
}
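
// NewBinlogDeserializeReader builds a DeserializeReader that decodes insert
// binlog blobs into *Value rows, resolving the row ID, the timestamp, and
// the primary key designated by PKfieldID.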
func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*DeserializeReader[*Value], error) {
	reader, err := newCompositeRecordReader(blobs)
	if err != nil {
		return nil, err
	}

	return NewDeserializeReader(reader, func(r Record, v []*Value) error {
		// Note: the return value `Value` is reused.
		for i := 0; i < r.Len(); i++ {
			value := v[i]
			if value == nil {
				value = &Value{}
				m := make(map[FieldID]interface{}, len(r.Schema()))
				value.Value = m
				v[i] = value
			}

			m := value.Value.(map[FieldID]interface{})
			for j, dt := range r.Schema() {
				d, ok := deserializeCell(r.Column(j), dt, i)
				if ok {
					m[j] = d // TODO: avoid the memory copy here.
				} else {
					return errors.Newf("unexpected type %s", dt)
				}
			}

			if _, ok := m[common.RowIDField]; !ok {
				panic("no row id column found")
			}
			value.ID = m[common.RowIDField].(int64)
			value.Timestamp = m[common.TimeStampField].(int64)

			pk, err := GenPrimaryKeyByRawData(m[PKfieldID], r.Schema()[PKfieldID])
			if err != nil {
				return err
			}

			value.PK = pk
			value.IsDeleted = false
			value.Value = m
		}
		return nil
	}), nil
}
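
// A minimal consumption sketch (assuming the blobs were fetched elsewhere
// and pkFieldID is the collection's primary-key field ID):
//
//	reader, err := NewBinlogDeserializeReader(blobs, pkFieldID)
//	if err != nil {
//		return err
//	}
//	defer reader.Close()
//	for {
//		err := reader.Next()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			return err
//		}
//		v := reader.Value() // v is reused across iterations; copy it if retained
//		_ = v
//	}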