milvus/internal/storage/serde_events.go
shaoting-huang 88b373b024
enhance: binlog primary key turn off dict encoding (#34358)
issue: #34357 

Go Parquet uses dictionary encoding by default, and it will fall back to
plain encoding if the dictionary size exceeds the dictionary page size
limit. Users can specify a custom fallback encoding by using
`parquet.WithEncoding(ENCODING_METHOD)` in writer properties. However,
Go Parquet [falls back to plain
encoding](e65c1e295d/go/parquet/file/column_writer_types.gen.go.tmpl (L238))
rather than the custom encoding method that users provide. Therefore, this
patch only turns off dictionary encoding for the primary key.

With a 5 million auto ID primary key benchmark, the parquet file size
improves from 13.93 MB to 8.36 MB when dictionary encoding is turned
off, reducing primary key storage space by 40%.

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
2024-07-17 17:47:44 +08:00

864 lines
22 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"sort"
"strconv"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var _ RecordReader = (*compositeBinlogRecordReader)(nil)
// compositeBinlogRecordReader reads several per-field blob lists in lockstep
// and exposes the current step through a single compositeRecord.
type compositeBinlogRecordReader struct {
// blobs holds one blob list per field slot; blobs[i][blobPos] is the blob
// currently opened for slot i.
blobs [][]*Blob
// blobPos is the index of the blob currently being read in every slot; Next
// initializes it to -1 before the first batch is opened.
blobPos int
// rrs holds the arrow record reader opened for each slot.
rrs []array.RecordReader
// closers holds, per slot, the function that releases the readers opened for
// the current blob.
closers []func()
// fields maps a slot index to the FieldID reported by that slot's binlog reader.
fields []FieldID
// r is the record handed out by Record; it is refilled on every Next.
r compositeRecord
}
// iterateNextBatch releases the readers opened for the current blob position
// and opens the readers for the next position, one per field slot.
//
// It returns io.EOF once the first slot has no blob left, an internal error if
// another slot has fewer blobs than the first one, and otherwise any error
// reported while opening a reader.
func (crr *compositeBinlogRecordReader) iterateNextBatch() error {
	// Run each closer once and forget it, so that a later call to this method
	// or to Close never releases the same readers a second time.
	for i, closeFn := range crr.closers {
		if closeFn != nil {
			closeFn()
			crr.closers[i] = nil
		}
	}

	crr.blobPos++
	if crr.blobPos >= len(crr.blobs[0]) {
		return io.EOF
	}

	for i, b := range crr.blobs {
		// Every slot is indexed with the same position; report a short slot
		// as an error instead of panicking on the index below.
		if crr.blobPos >= len(b) {
			return merr.WrapErrServiceInternal(fmt.Sprintf(
				"blob count mismatch: slot %d has %d blobs, need index %d", i, len(b), crr.blobPos))
		}

		reader, err := NewBinlogReader(b[crr.blobPos].Value)
		if err != nil {
			return err
		}

		crr.fields[i] = reader.FieldID
		// TODO: assert schema being the same in every blobs
		crr.r.schema[reader.FieldID] = reader.PayloadDataType

		er, err := reader.NextEventReader()
		if err != nil {
			// Nothing else owns the binlog reader yet; close it here.
			reader.Close()
			return err
		}

		rr, err := er.GetArrowRecordReader()
		if err != nil {
			er.Close()
			reader.Close()
			return err
		}

		crr.rrs[i] = rr
		crr.closers[i] = func() {
			rr.Release()
			er.Close()
			reader.Close()
		}
	}
	return nil
}
// Next advances every per-field reader by one record and stores the results in
// the composite record returned by Record.
//
// The first call lazily allocates the internal state and opens the first blob
// of every field. When the current blob is exhausted the next blob is opened
// and the read is retried once. io.EOF is returned when there are no blobs,
// when no further blob exists, or when the freshly opened blob yields nothing.
func (crr *compositeBinlogRecordReader) Next() error {
	// Lazy initialization on the first call.
	if crr.rrs == nil {
		if len(crr.blobs) == 0 {
			return io.EOF
		}
		slots := len(crr.blobs)
		crr.rrs = make([]array.RecordReader, slots)
		crr.closers = make([]func(), slots)
		crr.blobPos = -1
		crr.fields = make([]FieldID, slots)
		crr.r = compositeRecord{
			recs:   make(map[FieldID]arrow.Record, slots),
			schema: make(map[FieldID]schemapb.DataType, slots),
		}
		if err := crr.iterateNextBatch(); err != nil {
			return err
		}
	}

	// advance steps each reader once; it stops at the first reader that has
	// nothing more to give.
	advance := func() bool {
		for slot := range crr.rrs {
			rr := crr.rrs[slot]
			if !rr.Next() {
				return false
			}
			crr.r.recs[crr.fields[slot]] = rr.Record()
		}
		return true
	}

	if advance() {
		return nil
	}

	// Current blob exhausted: move to the next one (this may return io.EOF).
	if err := crr.iterateNextBatch(); err != nil {
		return err
	}

	// A freshly opened blob that yields nothing is treated as the end (rare).
	if !advance() {
		return io.EOF
	}
	return nil
}
// Record returns a pointer to the reader's internal composite record. The same
// record object is handed out on every call and is refilled by Next.
func (crr *compositeBinlogRecordReader) Record() Record {
	current := &crr.r
	return current
}
// Close invokes every registered closer, skipping slots that have none.
func (crr *compositeBinlogRecordReader) Close() {
	for slot := range crr.closers {
		closeFn := crr.closers[slot]
		if closeFn == nil {
			continue
		}
		closeFn()
	}
}
// parseBlobKey extracts the field ID and log ID encoded in a blob key.
//
// Two key shapes are understood: a full insert log path, from which both IDs
// are taken, and a bare decimal field ID (as generated by data_codec.go), for
// which the log ID is reported as 0. Any other key yields
// (InvalidUniqueID, InvalidUniqueID).
func parseBlobKey(blobKey string) (colId FieldID, logId UniqueID) {
	_, _, _, pathField, pathLog, matched := metautil.ParseInsertLogPath(blobKey)
	if matched {
		return pathField, pathLog
	}

	bareField, err := strconv.ParseInt(blobKey, 10, 64)
	if err != nil {
		return InvalidUniqueID, InvalidUniqueID
	}
	// data_codec.go generate single field id as blob key.
	return bareField, 0
}
// newCompositeBinlogRecordReader groups blobs by the field ID parsed from their
// keys, sorts each group by log ID, and wraps the groups in a reader. The
// order of the groups themselves follows map iteration and is not fixed. The
// returned error is always nil.
func newCompositeBinlogRecordReader(blobs []*Blob) (*compositeBinlogRecordReader, error) {
	byField := make(map[FieldID][]*Blob)
	for _, blob := range blobs {
		fieldID, _ := parseBlobKey(blob.Key)
		// append on a missing key starts from a nil slice, which is fine.
		byField[fieldID] = append(byField[fieldID], blob)
	}

	grouped := make([][]*Blob, 0, len(byField))
	for _, fieldBlobs := range byField {
		sort.Slice(fieldBlobs, func(a, b int) bool {
			_, logA := parseBlobKey(fieldBlobs[a].Key)
			_, logB := parseBlobKey(fieldBlobs[b].Key)
			return logA < logB
		})
		grouped = append(grouped, fieldBlobs)
	}

	reader := &compositeBinlogRecordReader{blobs: grouped}
	return reader, nil
}
// NewBinlogDeserializeReader builds a DeserializeReader that turns insert
// binlog blobs into *Value rows.
//
// blobs are grouped per field by newCompositeBinlogRecordReader. PKfieldID
// selects the column used to build each row's primary key.
//
// The deserialize callback returns an error when a column has a data type that
// serdeMap does not know or cannot decode, when the row ID or timestamp column
// is missing or not an int64, or when the primary key cannot be built.
func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*DeserializeReader[*Value], error) {
	reader, err := newCompositeBinlogRecordReader(blobs)
	if err != nil {
		return nil, err
	}

	return NewDeserializeReader(reader, func(r Record, v []*Value) error {
		schema := r.Schema()
		// Note: the return value `Value` is reused.
		for i := 0; i < r.Len(); i++ {
			value := v[i]
			if value == nil {
				value = &Value{}
				v[i] = value
			}

			// Reuse the row map when the slot already carries one; otherwise
			// allocate it instead of panicking on the type assertion.
			m, ok := value.Value.(map[FieldID]interface{})
			if !ok || m == nil {
				m = make(map[FieldID]interface{}, len(schema))
			}

			for j, dt := range schema {
				if r.Column(j).IsNull(i) {
					m[j] = nil
					continue
				}
				entry, known := serdeMap[dt]
				if !known {
					return merr.WrapErrServiceInternal(fmt.Sprintf("unexpected type %s", dt))
				}
				d, ok := entry.deserialize(r.Column(j), i)
				if !ok {
					return merr.WrapErrServiceInternal(fmt.Sprintf("unexpected type %s", dt))
				}
				m[j] = d // TODO: avoid memory copy here.
			}

			rowID, ok := m[common.RowIDField].(int64)
			if !ok {
				return merr.WrapErrIoKeyNotFound("no row id column found")
			}
			value.ID = rowID

			// Checked assertion: a missing or null timestamp column must
			// surface as an error, not as a panic.
			ts, ok := m[common.TimeStampField].(int64)
			if !ok {
				return merr.WrapErrIoKeyNotFound("no timestamp column found")
			}
			value.Timestamp = ts

			pk, err := GenPrimaryKeyByRawData(m[PKfieldID], schema[PKfieldID])
			if err != nil {
				return err
			}

			value.PK = pk
			value.IsDeleted = false
			value.Value = m
		}
		return nil
	}), nil
}
// NewDeltalogOneFieldReader builds a DeserializeReader for delta logs stored
// as a single string column, where every row is parsed by DeleteLog.Parse.
//
// The deserialize callback returns an error when the column is not a string
// array or when a row fails to parse.
func NewDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
	reader, err := newCompositeBinlogRecordReader(blobs)
	if err != nil {
		return nil, err
	}
	return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
		rows := r.Len()
		if rows == 0 {
			// Nothing to decode; also avoids touching a column that may not exist.
			return nil
		}

		var fid FieldID // The only fid from delete file
		for k := range r.Schema() {
			fid = k
			break
		}

		// The column is the same for every row: fetch and type-check it once.
		col, ok := r.Column(fid).(*array.String)
		if !ok {
			return merr.WrapErrServiceInternal(fmt.Sprintf("unexpected delta log column type for field %d", fid))
		}

		for i := 0; i < rows; i++ {
			if v[i] == nil {
				v[i] = &DeleteLog{}
			}
			if err := v[i].Parse(col.Value(i)); err != nil {
				return err
			}
		}
		return nil
	}), nil
}
// BinlogStreamWriter accumulates the serialized payload of one field and, on
// Finalize, wraps it with binlog headers into a Blob.
type BinlogStreamWriter struct {
// collectionID, partitionID and segmentID are passed to the descriptor event
// written in front of the payload.
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
// fieldSchema describes the field being written; its FieldID and DataType are
// used for the arrow field, the descriptor event and the blob key.
fieldSchema *schemapb.FieldSchema
// memorySize is recorded in the descriptor extras and in Blob.MemorySize.
memorySize int // To be updated on the fly
// buf receives the output of the record writer created by GetRecordWriter.
buf bytes.Buffer
// rw is the lazily created record writer; nil until GetRecordWriter is called.
rw *singleFieldRecordWriter
}
// GetRecordWriter returns the record writer for this field, creating it on the
// first call and returning the cached instance afterwards. The writer outputs
// into the stream writer's internal buffer.
//
// An error is returned when the field's data type has no serdeMap entry or
// when the underlying writer cannot be created.
func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error) {
	if bsw.rw != nil {
		return bsw.rw, nil
	}

	// Look the type up explicitly: indexing serdeMap with an unknown type
	// yields a zero entry whose arrowType is nil, and calling it would panic.
	entry, ok := serdeMap[bsw.fieldSchema.DataType]
	if !ok {
		return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unexpected type %s", bsw.fieldSchema.DataType))
	}

	fid := bsw.fieldSchema.FieldID
	// The GetDim error is ignored on purpose as in the original code;
	// presumably only vector fields carry a dimension — TODO confirm.
	dim, _ := typeutil.GetDim(bsw.fieldSchema)
	rw, err := newSingleFieldRecordWriter(fid, arrow.Field{
		Name:     strconv.Itoa(int(fid)),
		Type:     entry.arrowType(int(dim)),
		Nullable: true, // No nullable check here.
	}, &bsw.buf, WithRecordWriterProps(getFieldWriterProps(bsw.fieldSchema)))
	if err != nil {
		return nil, err
	}
	bsw.rw = rw
	return rw, nil
}
// Finalize closes the record writer and returns a Blob made of the binlog
// headers followed by the buffered payload. The blob key is the decimal field
// ID. It returns io.ErrUnexpectedEOF when no record writer was ever created.
func (bsw *BinlogStreamWriter) Finalize() (*Blob, error) {
	if bsw.rw == nil {
		return nil, io.ErrUnexpectedEOF
	}
	bsw.rw.Close()

	// Headers are written after Close because they embed the payload length.
	var out bytes.Buffer
	if err := bsw.writeBinlogHeaders(&out); err != nil {
		return nil, err
	}
	if _, err := out.Write(bsw.buf.Bytes()); err != nil {
		return nil, err
	}

	blob := &Blob{
		Key:        strconv.Itoa(int(bsw.fieldSchema.FieldID)),
		Value:      out.Bytes(),
		RowNum:     int64(bsw.rw.numRows),
		MemorySize: int64(bsw.memorySize),
	}
	return blob, nil
}
// writeBinlogHeaders writes everything that precedes the payload of an insert
// binlog to w, in order: the magic number, the descriptor event, the event
// header and the insert event data. The event length stored in the header
// covers the buffered payload, the header itself and the event data.
func (bsw *BinlogStreamWriter) writeBinlogHeaders(w io.Writer) error {
	// Magic number.
	if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
		return err
	}

	// Descriptor event, carrying the field identity and the original size.
	descriptor := NewBaseDescriptorEvent(bsw.collectionID, bsw.partitionID, bsw.segmentID)
	descriptor.PayloadDataType = bsw.fieldSchema.DataType
	descriptor.FieldID = bsw.fieldSchema.FieldID
	descriptor.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(bsw.memorySize))
	if err := descriptor.Write(w); err != nil {
		return err
	}

	// Event header and event data; both timestamps are fixed to 1.
	header := newEventHeader(InsertEventType)
	data := newInsertEventData()
	data.StartTimestamp = 1
	data.EndTimestamp = 1

	payloadLen := int32(bsw.buf.Len())
	header.EventLength = payloadLen + header.GetMemoryUsageInBytes() + int32(binary.Size(data))
	// header.NextPosition = header.EventLength + w.Offset()
	if err := header.Write(w); err != nil {
		return err
	}
	if err := data.WriteEventData(w); err != nil {
		return err
	}
	return nil
}
// NewBinlogStreamWriters creates one BinlogStreamWriter per field schema and
// returns them keyed by field ID. All writers share the given collection,
// partition and segment IDs.
func NewBinlogStreamWriters(collectionID, partitionID, segmentID UniqueID,
	schema []*schemapb.FieldSchema,
) map[FieldID]*BinlogStreamWriter {
	writers := make(map[FieldID]*BinlogStreamWriter, len(schema))
	for idx := range schema {
		field := schema[idx]
		writer := &BinlogStreamWriter{
			collectionID: collectionID,
			partitionID:  partitionID,
			segmentID:    segmentID,
			fieldSchema:  field,
		}
		writers[field.FieldID] = writer
	}
	return writers
}
// NewBinlogSerializeWriter builds a SerializeWriter that converts batches of
// *Value rows into an arrow record with one column per schema field and feeds
// it to the record writers obtained from eventWriters.
//
// partitionID and segmentID are accepted for interface compatibility and are
// not used in the body. batchSize is forwarded to NewSerializeRecordWriter.
//
// The serialize callback also adds each value's size to the memorySize of the
// matching event writer. It returns an error (rather than panicking) when a
// schema field or row field has a type unknown to serdeMap, when a row is not
// a map[FieldID]any, when a row field has no event writer, or when a value
// cannot be serialized.
func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, segmentID UniqueID,
	eventWriters map[FieldID]*BinlogStreamWriter, batchSize int,
) (*SerializeWriter[*Value], error) {
	rws := make(map[FieldID]RecordWriter, len(eventWriters))
	for fid, w := range eventWriters {
		rw, err := w.GetRecordWriter()
		if err != nil {
			return nil, err
		}
		rws[fid] = rw
	}
	compositeRecordWriter := newCompositeRecordWriter(rws)
	return NewSerializeRecordWriter[*Value](compositeRecordWriter, func(v []*Value) (Record, uint64, error) {
		builders := make(map[FieldID]array.Builder, len(schema.Fields))
		types := make(map[FieldID]schemapb.DataType, len(schema.Fields))
		// Release every builder on all return paths, including errors.
		defer func() {
			for _, b := range builders {
				b.Release()
			}
		}()

		for _, f := range schema.Fields {
			entry, ok := serdeMap[f.DataType]
			if !ok {
				return nil, 0, merr.WrapErrServiceInternal(fmt.Sprintf("unknown type %s", f.DataType))
			}
			dim, _ := typeutil.GetDim(f)
			builders[f.FieldID] = array.NewBuilder(memory.DefaultAllocator, entry.arrowType(int(dim)))
			types[f.FieldID] = f.DataType
		}

		var memorySize uint64
		for _, vv := range v {
			m, ok := vv.Value.(map[FieldID]any)
			if !ok {
				return nil, 0, merr.WrapErrServiceInternal(fmt.Sprintf("unexpected row value type %T", vv.Value))
			}
			for fid, e := range m {
				typeEntry, ok := serdeMap[types[fid]]
				if !ok {
					return nil, 0, merr.WrapErrServiceInternal(fmt.Sprintf("unknown type %s", types[fid]))
				}
				writer, ok := eventWriters[fid]
				if !ok || writer == nil {
					return nil, 0, merr.WrapErrServiceInternal(fmt.Sprintf("no event writer for field %d", fid))
				}
				if ok := typeEntry.serialize(builders[fid], e); !ok {
					return nil, 0, merr.WrapErrServiceInternal(fmt.Sprintf("serialize error on type %s", types[fid]))
				}
				// Compute the size once and account for it in both counters.
				size := typeEntry.sizeof(e)
				writer.memorySize += int(size)
				memorySize += size
			}
		}

		arrays := make([]arrow.Array, len(types))
		fields := make([]arrow.Field, len(types))
		field2Col := make(map[FieldID]int, len(types))
		i := 0
		// Column order follows map iteration; field2Col records where each
		// field ended up.
		for fid, builder := range builders {
			arrays[i] = builder.NewArray()
			fields[i] = arrow.Field{
				Name:     strconv.Itoa(int(fid)),
				Type:     arrays[i].DataType(),
				Nullable: true, // No nullable check here.
			}
			field2Col[fid] = i
			i++
		}
		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), types, field2Col), memorySize, nil
	}, batchSize), nil
}
// DeltalogStreamWriter accumulates the serialized payload of a single-column
// delta log and, on Finalize, wraps it with delete-event headers into a Blob.
type DeltalogStreamWriter struct {
// collectionID, partitionID and segmentID are passed to the descriptor event
// written in front of the payload.
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
// fieldSchema describes the single column; its name, FieldID and DataType are
// used for the arrow field and the descriptor event.
fieldSchema *schemapb.FieldSchema
// memorySize is recorded in the descriptor extras and in Blob.MemorySize.
memorySize int // To be updated on the fly
// buf receives the output of the record writer created by GetRecordWriter.
buf bytes.Buffer
// rw is the lazily created record writer; nil until GetRecordWriter is called.
rw *singleFieldRecordWriter
}
// GetRecordWriter returns the record writer for the delta column, creating it
// on first use and returning the cached instance afterwards. The arrow field
// is named after the field schema and declared non-nullable.
func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
	if existing := dsw.rw; existing != nil {
		return existing, nil
	}

	fs := dsw.fieldSchema
	dim, _ := typeutil.GetDim(fs)
	column := arrow.Field{
		Name:     fs.Name,
		Type:     serdeMap[fs.DataType].arrowType(int(dim)),
		Nullable: false,
	}
	props := WithRecordWriterProps(getFieldWriterProps(fs))

	created, err := newSingleFieldRecordWriter(fs.FieldID, column, &dsw.buf, props)
	if err != nil {
		return nil, err
	}
	dsw.rw = created
	return created, nil
}
// Finalize closes the record writer and returns a Blob made of the delta log
// headers followed by the buffered payload. The blob carries no key. It
// returns io.ErrUnexpectedEOF when no record writer was ever created.
func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
	if dsw.rw == nil {
		return nil, io.ErrUnexpectedEOF
	}
	dsw.rw.Close()

	// Headers are written after Close because they embed the payload length.
	var out bytes.Buffer
	if err := dsw.writeDeltalogHeaders(&out); err != nil {
		return nil, err
	}
	if _, err := out.Write(dsw.buf.Bytes()); err != nil {
		return nil, err
	}

	blob := &Blob{
		Value:      out.Bytes(),
		RowNum:     int64(dsw.rw.numRows),
		MemorySize: int64(dsw.memorySize),
	}
	return blob, nil
}
// writeDeltalogHeaders writes everything that precedes the payload of a
// single-column delta log to w, in order: the magic number, the descriptor
// event, the event header and the delete event data. The event length stored
// in the header covers the buffered payload, the header and the event data.
func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
	// Magic number.
	if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
		return err
	}

	// Descriptor event, carrying the payload type and the original size.
	descriptor := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
	descriptor.PayloadDataType = dsw.fieldSchema.DataType
	descriptor.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
	if err := descriptor.Write(w); err != nil {
		return err
	}

	// Event header and event data; both timestamps are fixed to 1.
	header := newEventHeader(DeleteEventType)
	data := newDeleteEventData()
	data.StartTimestamp = 1
	data.EndTimestamp = 1

	payloadLen := int32(dsw.buf.Len())
	header.EventLength = payloadLen + header.GetMemoryUsageInBytes() + int32(binary.Size(data))
	// header.NextPosition = header.EventLength + w.Offset()
	if err := header.Write(w); err != nil {
		return err
	}
	if err := data.WriteEventData(w); err != nil {
		return err
	}
	return nil
}
// NewDeltalogStreamWriter creates a stream writer for the single-column delta
// log format. The column is a string field named "delta" that uses the row ID
// field ID.
func NewDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *DeltalogStreamWriter {
	deltaField := &schemapb.FieldSchema{
		FieldID:  common.RowIDField,
		Name:     "delta",
		DataType: schemapb.DataType_String,
	}
	writer := &DeltalogStreamWriter{
		collectionID: collectionID,
		partitionID:  partitionID,
		segmentID:    segmentID,
		fieldSchema:  deltaField,
	}
	return writer
}
// NewDeltalogSerializeWriter builds a SerializeWriter that encodes each
// *DeleteLog as a JSON string and writes the strings as a single
// non-nullable "delta" column through eventWriter's record writer.
//
// partitionID and segmentID are accepted for interface compatibility and are
// not used in the body. batchSize is forwarded to NewSerializeRecordWriter.
//
// The serialize callback reports the total length of the JSON strings as the
// batch memory size and returns an error when a delete log cannot be
// marshalled or appended.
func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *DeltalogStreamWriter, batchSize int,
) (*SerializeWriter[*DeleteLog], error) {
	rws := make(map[FieldID]RecordWriter, 1)
	rw, err := eventWriter.GetRecordWriter()
	if err != nil {
		return nil, err
	}
	rws[0] = rw
	compositeRecordWriter := newCompositeRecordWriter(rws)
	return NewSerializeRecordWriter[*DeleteLog](compositeRecordWriter, func(v []*DeleteLog) (Record, uint64, error) {
		builder := array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String)
		// The builder is only needed until NewArray; release it on every path.
		defer builder.Release()

		var memorySize uint64
		for _, vv := range v {
			strVal, err := json.Marshal(vv)
			if err != nil {
				return nil, memorySize, err
			}

			// Do not drop the append error: a silently skipped row would
			// leave the column shorter than the declared row count.
			if err := builder.AppendValueFromString(string(strVal)); err != nil {
				return nil, memorySize, err
			}
			memorySize += uint64(len(strVal))
		}
		arr := []arrow.Array{builder.NewArray()}
		field := []arrow.Field{{
			Name:     "delta",
			Type:     arrow.BinaryTypes.String,
			Nullable: false,
		}}
		field2Col := map[FieldID]int{
			0: 0,
		}
		schema := map[FieldID]schemapb.DataType{
			0: schemapb.DataType_String,
		}
		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), schema, field2Col), memorySize, nil
	}, batchSize), nil
}
var _ RecordReader = (*simpleArrowRecordReader)(nil)
// simpleArrowRecordReader reads a list of blobs one after another and exposes
// each arrow record it finds through a simpleArrowRecord.
type simpleArrowRecordReader struct {
// blobs is the list of blobs to read, in order.
blobs []*Blob
// blobPos is the index of the blob currently being read; Next initializes it
// to -1 before the first blob is opened.
blobPos int
// rr is the arrow record reader of the current blob; nil before the first Next.
rr array.RecordReader
// closer releases the readers opened for the current blob.
closer func()
// r is the record handed out by Record; it is refilled on every Next.
r simpleArrowRecord
}
// iterateNextBatch releases the readers of the current blob and opens the
// readers for the next one.
//
// It returns io.EOF once every blob has been consumed, and otherwise any error
// reported while opening a reader.
func (crr *simpleArrowRecordReader) iterateNextBatch() error {
	// Run the closer once and forget it, so that a later call to this method
	// or to Close never releases the same readers a second time.
	if crr.closer != nil {
		crr.closer()
		crr.closer = nil
	}

	crr.blobPos++
	if crr.blobPos >= len(crr.blobs) {
		return io.EOF
	}

	reader, err := NewBinlogReader(crr.blobs[crr.blobPos].Value)
	if err != nil {
		return err
	}

	er, err := reader.NextEventReader()
	if err != nil {
		// Nothing else owns the binlog reader yet; close it here.
		reader.Close()
		return err
	}
	rr, err := er.GetArrowRecordReader()
	if err != nil {
		er.Close()
		reader.Close()
		return err
	}
	crr.rr = rr
	crr.closer = func() {
		rr.Release()
		er.Close()
		reader.Close()
	}

	return nil
}
// Next advances to the next arrow record and stores it in the record returned
// by Record, mapping column index i to FieldID(i).
//
// The first call lazily allocates the internal state and opens the first
// blob. When the current blob is exhausted the next blob is opened and the
// read is retried once. io.EOF is returned when there are no blobs, when no
// further blob exists, or when the freshly opened blob yields nothing.
func (crr *simpleArrowRecordReader) Next() error {
	// Lazy initialization on the first call.
	if crr.rr == nil {
		if len(crr.blobs) == 0 {
			return io.EOF
		}
		crr.blobPos = -1
		crr.r = simpleArrowRecord{
			schema:    make(map[FieldID]schemapb.DataType),
			field2Col: make(map[FieldID]int),
		}
		if err := crr.iterateNextBatch(); err != nil {
			return err
		}
	}

	// load pulls one record from the current reader into crr.r.
	load := func() bool {
		if !crr.rr.Next() {
			return false
		}
		rec := crr.rr.Record()
		columns := rec.Schema().Fields()
		for col := 0; col < len(columns); col++ {
			crr.r.field2Col[FieldID(col)] = col
		}
		crr.r.r = rec
		return true
	}

	if load() {
		return nil
	}

	// Current blob exhausted: move to the next one (this may return io.EOF).
	if err := crr.iterateNextBatch(); err != nil {
		return err
	}
	if !load() {
		return io.EOF
	}
	return nil
}
// Record returns a pointer to the reader's internal record. The same record
// object is handed out on every call and is refilled by Next.
func (crr *simpleArrowRecordReader) Record() Record {
	current := &crr.r
	return current
}
// Close invokes the closer of the current blob, if one is registered.
func (crr *simpleArrowRecordReader) Close() {
	if crr.closer == nil {
		return
	}
	crr.closer()
}
// newSimpleArrowRecordReader wraps blobs in a reader. No blob is opened until
// the first call to Next. The returned error is always nil.
func newSimpleArrowRecordReader(blobs []*Blob) (*simpleArrowRecordReader, error) {
	reader := new(simpleArrowRecordReader)
	reader.blobs = blobs
	return reader, nil
}
// NewMultiFieldDeltalogStreamWriter creates a stream writer for the
// multi-field delta log format using the given identifiers and field schemas.
func NewMultiFieldDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID, schema []*schemapb.FieldSchema) *MultiFieldDeltalogStreamWriter {
	writer := new(MultiFieldDeltalogStreamWriter)
	writer.collectionID = collectionID
	writer.partitionID = partitionID
	writer.segmentID = segmentID
	writer.fieldSchemas = schema
	return writer
}
// MultiFieldDeltalogStreamWriter accumulates the serialized payload of a
// multi-column delta log and, on Finalize, wraps it with delete-event headers
// into a Blob.
type MultiFieldDeltalogStreamWriter struct {
// collectionID, partitionID and segmentID are passed to the descriptor event
// written in front of the payload.
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
// fieldSchemas describes the columns; each one becomes an arrow field of the
// record writer created by GetRecordWriter.
fieldSchemas []*schemapb.FieldSchema
// memorySize is recorded in the descriptor extras and in Blob.MemorySize.
memorySize int // To be updated on the fly
// buf receives the output of the record writer created by GetRecordWriter.
buf bytes.Buffer
// rw is the lazily created record writer; nil until GetRecordWriter is called.
rw *multiFieldRecordWriter
}
// GetRecordWriter returns the multi-field record writer, creating it on first
// use and returning the cached instance afterwards. One non-nullable arrow
// field is declared per field schema, named after the schema.
func (dsw *MultiFieldDeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
	if existing := dsw.rw; existing != nil {
		return existing, nil
	}

	count := len(dsw.fieldSchemas)
	fieldIds := make([]FieldID, 0, count)
	fields := make([]arrow.Field, 0, count)
	for _, fs := range dsw.fieldSchemas {
		fieldIds = append(fieldIds, fs.FieldID)
		dim, _ := typeutil.GetDim(fs)
		fields = append(fields, arrow.Field{
			Name:     fs.Name,
			Type:     serdeMap[fs.DataType].arrowType(int(dim)),
			Nullable: false, // No nullable check here.
		})
	}

	created, err := newMultiFieldRecordWriter(fieldIds, fields, &dsw.buf)
	if err != nil {
		return nil, err
	}
	dsw.rw = created
	return created, nil
}
// Finalize closes the record writer and returns a Blob made of the delta log
// headers followed by the buffered payload. The blob carries no key. It
// returns io.ErrUnexpectedEOF when no record writer was ever created.
func (dsw *MultiFieldDeltalogStreamWriter) Finalize() (*Blob, error) {
	if dsw.rw == nil {
		return nil, io.ErrUnexpectedEOF
	}
	dsw.rw.Close()

	// Headers are written after Close because they embed the payload length.
	var out bytes.Buffer
	if err := dsw.writeDeltalogHeaders(&out); err != nil {
		return nil, err
	}
	if _, err := out.Write(dsw.buf.Bytes()); err != nil {
		return nil, err
	}

	blob := &Blob{
		Value:      out.Bytes(),
		RowNum:     int64(dsw.rw.numRows),
		MemorySize: int64(dsw.memorySize),
	}
	return blob, nil
}
// writeDeltalogHeaders writes everything that precedes the payload of a
// multi-field delta log to w, in order: the magic number, the descriptor
// event, the event header and the delete event data. The descriptor declares
// an Int64 payload type and tags the file with the MultiField version extra.
func (dsw *MultiFieldDeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
	// Magic number.
	if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
		return err
	}

	// Descriptor event, carrying the original size and the format version.
	descriptor := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
	descriptor.PayloadDataType = schemapb.DataType_Int64
	descriptor.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
	descriptor.descriptorEventData.AddExtra(version, MultiField)
	if err := descriptor.Write(w); err != nil {
		return err
	}

	// Event header and event data; both timestamps are fixed to 1.
	header := newEventHeader(DeleteEventType)
	data := newDeleteEventData()
	data.StartTimestamp = 1
	data.EndTimestamp = 1

	payloadLen := int32(dsw.buf.Len())
	header.EventLength = payloadLen + header.GetMemoryUsageInBytes() + int32(binary.Size(data))
	// header.NextPosition = header.EventLength + w.Offset()
	if err := header.Write(w); err != nil {
		return err
	}
	if err := data.WriteEventData(w); err != nil {
		return err
	}
	return nil
}
// NewDeltalogMultiFieldWriter builds a SerializeWriter that writes delete logs
// as two non-nullable columns: "pk" (int64 or string, chosen from the first
// delete log's PkType) and "ts" (int64).
//
// partitionID and segmentID are accepted for interface compatibility and are
// not used in the body. batchSize is forwarded to NewSerializeRecordWriter.
//
// The serialize callback reports the batch memory size as 8 bytes per int64
// value and the byte length of each string primary key. It returns an error
// for an empty batch, for a primary key type other than Int64 or VarChar, and
// for a delete log whose primary key value does not match the batch's type.
func NewDeltalogMultiFieldWriter(partitionID, segmentID UniqueID, eventWriter *MultiFieldDeltalogStreamWriter, batchSize int,
) (*SerializeWriter[*DeleteLog], error) {
	rw, err := eventWriter.GetRecordWriter()
	if err != nil {
		return nil, err
	}
	return NewSerializeRecordWriter[*DeleteLog](rw, func(v []*DeleteLog) (Record, uint64, error) {
		// The pk type is taken from the first element, so there must be one.
		if len(v) == 0 {
			return nil, 0, fmt.Errorf("no delete log to serialize")
		}

		// Validate the pk type before using it to index serdeMap: an unknown
		// type would yield a zero entry with a nil arrowType.
		pkType := schemapb.DataType(v[0].PkType)
		if pkType != schemapb.DataType_Int64 && pkType != schemapb.DataType_VarChar {
			return nil, 0, fmt.Errorf("unexpected pk type %v", v[0].PkType)
		}

		fields := []arrow.Field{
			{
				Name:     "pk",
				Type:     serdeMap[pkType].arrowType(0),
				Nullable: false,
			},
			{
				Name:     "ts",
				Type:     arrow.PrimitiveTypes.Int64,
				Nullable: false,
			},
		}
		arrowSchema := arrow.NewSchema(fields, nil)
		builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
		defer builder.Release()

		// Size in bytes of one int64 value (pk or timestamp).
		const int64Size = 8

		var memorySize uint64
		switch pkType {
		case schemapb.DataType_Int64:
			pb := builder.Field(0).(*array.Int64Builder)
			for _, vv := range v {
				pk, ok := vv.Pk.GetValue().(int64)
				if !ok {
					return nil, 0, fmt.Errorf("unexpected pk value type %T, expected int64", vv.Pk.GetValue())
				}
				pb.Append(pk)
				// Account for the size of the value, not the value itself.
				memorySize += int64Size
			}
		case schemapb.DataType_VarChar:
			pb := builder.Field(0).(*array.StringBuilder)
			for _, vv := range v {
				pk, ok := vv.Pk.GetValue().(string)
				if !ok {
					return nil, 0, fmt.Errorf("unexpected pk value type %T, expected string", vv.Pk.GetValue())
				}
				pb.Append(pk)
				// binary.Size reports -1 for strings; use the byte length.
				memorySize += uint64(len(pk))
			}
		}

		tsBuilder := builder.Field(1).(*array.Int64Builder)
		for _, vv := range v {
			tsBuilder.Append(int64(vv.Ts))
			memorySize += int64Size
		}

		arr := []arrow.Array{builder.Field(0).NewArray(), builder.Field(1).NewArray()}

		field2Col := map[FieldID]int{
			common.RowIDField:     0,
			common.TimeStampField: 1,
		}
		schema := map[FieldID]schemapb.DataType{
			common.RowIDField:     pkType,
			common.TimeStampField: schemapb.DataType_Int64,
		}
		return newSimpleArrowRecord(array.NewRecord(arrowSchema, arr, int64(len(v))), schema, field2Col), memorySize, nil
	}, batchSize), nil
}
// NewDeltalogMultiFieldReader builds a DeserializeReader for delta logs stored
// in the multi-field format: column 0 holds the primary key (int64 or string)
// and column 1 holds the timestamp (int64).
//
// The deserialize callback returns an error when the record is not a
// simpleArrowRecord, when it has fewer than two columns, or when a column has
// an unexpected arrow type.
func NewDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
	reader, err := newSimpleArrowRecordReader(blobs)
	if err != nil {
		return nil, err
	}
	return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
		rec, ok := r.(*simpleArrowRecord)
		if !ok {
			return fmt.Errorf("can not cast to simple arrow record")
		}
		fields := rec.r.Schema().Fields()
		// Both column 0 and column 1 are read below; reject short records
		// instead of panicking on the index.
		if len(fields) < 2 {
			return fmt.Errorf("unexpected delta log column count %d, expected at least 2", len(fields))
		}

		rows := r.Len()
		switch fields[0].Type.ID() {
		case arrow.INT64:
			arr, ok := r.Column(0).(*array.Int64)
			if !ok {
				return fmt.Errorf("unexpected delta log pk column %T", r.Column(0))
			}
			for j := 0; j < rows; j++ {
				if v[j] == nil {
					v[j] = &DeleteLog{}
				}
				v[j].Pk = NewInt64PrimaryKey(arr.Value(j))
			}
		case arrow.STRING:
			arr, ok := r.Column(0).(*array.String)
			if !ok {
				return fmt.Errorf("unexpected delta log pk column %T", r.Column(0))
			}
			for j := 0; j < rows; j++ {
				if v[j] == nil {
					v[j] = &DeleteLog{}
				}
				v[j].Pk = NewVarCharPrimaryKey(arr.Value(j))
			}
		default:
			return fmt.Errorf("unexpected delta log pkType %v", fields[0].Type.Name())
		}

		tsArr, ok := r.Column(1).(*array.Int64)
		if !ok {
			return fmt.Errorf("unexpected delta log ts column %T", r.Column(1))
		}
		for j := 0; j < rows; j++ {
			v[j].Ts = uint64(tsArr.Value(j))
		}
		return nil
	}), nil
}
// NewDeltalogDeserializeReader is the entry point for reading delta logs.
// It inspects the blobs with supportMultiFieldFormat and dispatches to
// NewDeltalogMultiFieldReader (new format, several columns per log file) or
// to NewDeltalogOneFieldReader (existing format, one column per log file).
func NewDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
	if !supportMultiFieldFormat(blobs) {
		return NewDeltalogOneFieldReader(blobs)
	}
	return NewDeltalogMultiFieldReader(blobs)
}
// supportMultiFieldFormat reports whether the first blob is a delta log in the
// multi-field format, i.e. one that stores the pk and ts columns separately.
// It reads the descriptor extras of blobs[0] and checks that the version extra
// equals MultiField. It returns false for an empty blob list, for a blob that
// cannot be opened, and for a missing or non-string version extra.
func supportMultiFieldFormat(blobs []*Blob) bool {
	if len(blobs) == 0 {
		return false
	}

	reader, err := NewBinlogReader(blobs[0].Value)
	if err != nil {
		return false
	}
	// Defer only after the error check, once the reader is known to be usable.
	defer reader.Close()

	// A checked assertion covers both a missing extra and a non-string value.
	format, ok := reader.descriptorEventData.Extras[version].(string)
	return ok && format == MultiField
}