milvus/internal/datanode/compaction/segment_writer.go
XuanYang-cn 22bddde5ff
enhance: Tidy compactor and remove dup codes (#32198)
See also: #32451

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
2024-05-23 09:53:40 +08:00

166 lines
4.2 KiB
Go

// SegmentWriter can be reused to buffer all insert data of one segment.
// SerializeYield serializes the buffered data into blobs and resets the writer.
// pkstats keeps tracking the primary-key stats of the segment until Finish.
package compaction
import (
"fmt"
"math"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/writebuffer"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// SegmentWriter accumulates rows for a single segment and turns them into
// binlog blobs on demand, while tracking the timestamp range, primary-key
// stats and row count of everything written through it.
type SegmentWriter struct {
	// writer receives every row handed to Write.
	writer *storage.SerializeWriter[*storage.Value]
	// closers holds one function per field writer; each call yields that
	// writer's blob.
	closers []func() (*storage.Blob, error)

	// tsFrom and tsTo are the smallest and largest timestamps seen by Write
	// since the writer was created or last cleared.
	tsFrom typeutil.Timestamp
	tsTo   typeutil.Timestamp

	// pkstats is updated with the primary key of every written row and is
	// serialized by Finish.
	pkstats *storage.PrimaryKeyStats

	// Identity of the segment this writer produces data for.
	segmentID    int64
	partitionID  int64
	collectionID int64
	// sch is the collection schema used to build binlog writers and the
	// pk-stats codec.
	sch *schemapb.CollectionSchema

	// rowCount is incremented once per Write call; clear does not reset it.
	rowCount *atomic.Int64
}
// GetRowNum reports how many rows have been passed to Write so far.
func (w *SegmentWriter) GetRowNum() int64 {
	rows := w.rowCount.Load()
	return rows
}
// GetCollectionID returns the collection ID this writer was created with.
func (w *SegmentWriter) GetCollectionID() int64 {
	return w.collectionID
}
// GetPartitionID returns the partition ID this writer was created with.
func (w *SegmentWriter) GetPartitionID() int64 {
	return w.partitionID
}
// GetSegmentID returns the segment ID this writer was created with.
func (w *SegmentWriter) GetSegmentID() int64 {
	return w.segmentID
}
// GetPkID returns the field ID recorded in the writer's primary-key stats.
func (w *SegmentWriter) GetPkID() int64 {
	stats := w.pkstats
	return stats.FieldID
}
// Write records one row: it widens the tracked timestamp range to include the
// row's timestamp, feeds the row's primary key into the pk stats, bumps the
// row counter, and finally hands the row to the underlying writer.
//
// The bookkeeping happens before the underlying write, so it is applied even
// when that write returns an error.
func (w *SegmentWriter) Write(v *storage.Value) error {
	rowTs := typeutil.Timestamp(v.Timestamp)

	// Stretch the lower and upper bounds independently; on the first row after
	// a reset both bounds move.
	if w.tsFrom > rowTs {
		w.tsFrom = rowTs
	}
	if w.tsTo < rowTs {
		w.tsTo = rowTs
	}

	w.pkstats.Update(v.PK)
	w.rowCount.Inc()

	return w.writer.Write(v)
}
// Finish flushes the underlying writer and serializes the accumulated
// primary-key stats into a blob.
//
// actualRowCount is forwarded unchanged to SerializePkStats as the row count
// to record alongside the stats.
//
// An error is returned if the flush fails or if the stats cannot be
// serialized; in both cases the returned blob is nil.
func (w *SegmentWriter) Finish(actualRowCount int64) (*storage.Blob, error) {
	// Do not serialize stats on top of a failed flush: surface the error
	// instead of silently dropping it.
	if err := w.writer.Flush(); err != nil {
		return nil, err
	}

	codec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: w.collectionID, Schema: w.sch})
	return codec.SerializePkStats(w.pkstats, actualRowCount)
}
// IsFull flushes the underlying writer and then reports whether its written
// memory size is strictly greater than the configured
// DataNodeCfg.BinLogMaxSize.
//
// NOTE(review): whatever Flush returns is not inspected here.
func (w *SegmentWriter) IsFull() bool {
	w.writer.Flush()

	written := w.writer.WrittenMemorySize()
	limit := paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
	return written > limit
}
// IsEmpty flushes the underlying writer and then reports whether its written
// memory size is zero.
//
// NOTE(review): whatever Flush returns is not inspected here.
func (w *SegmentWriter) IsEmpty() bool {
	w.writer.Flush()

	written := w.writer.WrittenMemorySize()
	return written == 0
}
// GetTimeRange returns a new TimeRange built from the currently tracked
// minimum and maximum row timestamps.
func (w *SegmentWriter) GetTimeRange() *writebuffer.TimeRange {
	from, to := w.tsFrom, w.tsTo
	return writebuffer.NewTimeRange(from, to)
}
// SerializeYield flushes and closes the current binlog writer, collects one
// blob from every closer, and then re-arms the SegmentWriter with a fresh
// binlog writer so it can keep accepting rows.
//
// It returns the blobs (in the same order as w.closers) together with the
// timestamp range tracked up to this call. On any failure (flush, a closer,
// or re-creating the binlog writer) it returns nil blobs, a nil time range
// and the error.
func (w *SegmentWriter) SerializeYield() ([]*storage.Blob, *writebuffer.TimeRange, error) {
	// Close is still invoked when the flush fails so the writer is not left
	// open; the flush error is reported right after.
	flushErr := w.writer.Flush()
	w.writer.Close()
	if flushErr != nil {
		return nil, nil, flushErr
	}

	fieldData := make([]*storage.Blob, len(w.closers))
	for i, f := range w.closers {
		blob, err := f()
		if err != nil {
			return nil, nil, err
		}
		fieldData[i] = blob
	}

	// Capture the range before clear resets it.
	tr := w.GetTimeRange()
	if err := w.clear(); err != nil {
		return nil, nil, err
	}

	return fieldData, tr, nil
}

// clear replaces the binlog writer and its closers with freshly created ones
// and resets the tracked timestamp range to its empty state (tsFrom at the
// maximum uint64, tsTo at zero). pkstats and rowCount are left untouched.
//
// If the new binlog writer cannot be created the error is returned and the
// SegmentWriter is left unmodified, rather than installing a possibly nil
// writer that would fail later on the next Write.
func (w *SegmentWriter) clear() error {
	writer, closers, err := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch)
	if err != nil {
		return err
	}

	w.writer = writer
	w.closers = closers
	w.tsFrom = math.MaxUint64
	w.tsTo = 0
	return nil
}
// NewSegmentWriter builds a SegmentWriter for the segment identified by
// segID, partID and collID.
//
// It first creates the binlog writer, then looks up the primary-key field in
// sch and creates primary-key stats for it, passing maxCount through to
// storage.NewPrimaryKeyStats. An error is returned when the binlog writer
// cannot be created, when sch contains no usable primary-key field, or when
// the stats cannot be created.
func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, partID, collID int64) (*SegmentWriter, error) {
	binlogWriter, blobClosers, err := newBinlogWriter(collID, partID, segID, sch)
	if err != nil {
		return nil, err
	}

	// A field qualifies only if it is flagged as primary key, has a field ID
	// of at least 100, and has a data type accepted by IsPrimaryFieldType.
	// There is no early exit: if several fields qualify, the last one wins.
	// NOTE(review): IDs below 100 are presumably reserved for system fields —
	// confirm.
	var pk *schemapb.FieldSchema
	for _, field := range sch.GetFields() {
		if !field.GetIsPrimaryKey() {
			continue
		}
		if field.GetFieldID() < 100 {
			continue
		}
		if !typeutil.IsPrimaryFieldType(field.GetDataType()) {
			continue
		}
		pk = field
	}
	if pk == nil {
		log.Warn("failed to get pk field from schema")
		return nil, fmt.Errorf("no pk field in schema")
	}

	pkStats, err := storage.NewPrimaryKeyStats(pk.GetFieldID(), int64(pk.GetDataType()), maxCount)
	if err != nil {
		return nil, err
	}

	// The timestamp range starts out empty: tsFrom at the maximum value and
	// tsTo at zero, so the first Write sets both bounds.
	return &SegmentWriter{
		writer:  binlogWriter,
		closers: blobClosers,
		tsFrom:  math.MaxUint64,
		tsTo:    0,

		pkstats:      pkStats,
		sch:          sch,
		segmentID:    segID,
		partitionID:  partID,
		collectionID: collID,
		rowCount:     atomic.NewInt64(0),
	}, nil
}
// newBinlogWriter creates the per-field binlog stream writers for the given
// segment and wraps them in a single serialize writer.
//
// It returns the serialize writer, one closer per stream writer (each
// closer is that writer's Finalize method), and any error reported while
// creating the serialize writer. The closers are returned even when err is
// non-nil.
func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema,
) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*storage.Blob, error), err error) {
	streamWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema.Fields)

	closers = make([]func() (*storage.Blob, error), 0, len(streamWriters))
	for _, sw := range streamWriters {
		closers = append(closers, sw.Finalize)
	}

	// NOTE(review): 1024 is presumably the writer's batch size — confirm
	// against storage.NewBinlogSerializeWriter.
	writer, err = storage.NewBinlogSerializeWriter(schema, partID, segID, streamWriters, 1024)
	return writer, closers, err
}