milvus/internal/datanode/compaction/segment_writer.go
wayblink 380d3f4469
fix: Fix memory buffer error & some renaming (#33850)
#30633

---------

Signed-off-by: wayblink <anyang.wang@zilliz.com>
2024-06-21 17:30:01 +08:00

250 lines
6.2 KiB
Go

// SegmentInsertBuffer can be reused to buffer all insert data of one segment
// buffer.Serialize will serialize the InsertBuffer and clear it
// pkstats keeps tracking pkstats of the segment until Finish
package compaction
import (
"fmt"
"math"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/writebuffer"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func NewSegmentDeltaWriter(segmentID, partitionID, collectionID int64) *SegmentDeltaWriter {
return &SegmentDeltaWriter{
deleteData: &storage.DeleteData{},
segmentID: segmentID,
partitionID: partitionID,
collectionID: collectionID,
tsFrom: math.MaxUint64,
tsTo: 0,
}
}
type SegmentDeltaWriter struct {
deleteData *storage.DeleteData
segmentID int64
partitionID int64
collectionID int64
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
}
func (w *SegmentDeltaWriter) GetCollectionID() int64 {
return w.collectionID
}
func (w *SegmentDeltaWriter) GetPartitionID() int64 {
return w.partitionID
}
func (w *SegmentDeltaWriter) GetSegmentID() int64 {
return w.segmentID
}
func (w *SegmentDeltaWriter) GetRowNum() int64 {
return w.deleteData.RowCount
}
func (w *SegmentDeltaWriter) GetTimeRange() *writebuffer.TimeRange {
return writebuffer.NewTimeRange(w.tsFrom, w.tsTo)
}
func (w *SegmentDeltaWriter) updateRange(ts typeutil.Timestamp) {
if ts < w.tsFrom {
w.tsFrom = ts
}
if ts > w.tsTo {
w.tsTo = ts
}
}
func (w *SegmentDeltaWriter) Write(pk storage.PrimaryKey, ts typeutil.Timestamp) {
w.deleteData.Append(pk, ts)
w.updateRange(ts)
}
func (w *SegmentDeltaWriter) WriteBatch(pks []storage.PrimaryKey, tss []typeutil.Timestamp) {
w.deleteData.AppendBatch(pks, tss)
for _, ts := range tss {
w.updateRange(ts)
}
}
func (w *SegmentDeltaWriter) Finish() (*storage.Blob, *writebuffer.TimeRange, error) {
blob, err := storage.NewDeleteCodec().Serialize(w.collectionID, w.partitionID, w.segmentID, w.deleteData)
if err != nil {
return nil, nil, err
}
return blob, w.GetTimeRange(), nil
}
type SegmentWriter struct {
writer *storage.SerializeWriter[*storage.Value]
closers []func() (*storage.Blob, error)
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
pkstats *storage.PrimaryKeyStats
segmentID int64
partitionID int64
collectionID int64
sch *schemapb.CollectionSchema
rowCount *atomic.Int64
}
func (w *SegmentWriter) GetRowNum() int64 {
return w.rowCount.Load()
}
func (w *SegmentWriter) GetCollectionID() int64 {
return w.collectionID
}
func (w *SegmentWriter) GetPartitionID() int64 {
return w.partitionID
}
func (w *SegmentWriter) GetSegmentID() int64 {
return w.segmentID
}
func (w *SegmentWriter) GetPkID() int64 {
return w.pkstats.FieldID
}
func (w *SegmentWriter) WrittenMemorySize() uint64 {
return w.writer.WrittenMemorySize()
}
func (w *SegmentWriter) Write(v *storage.Value) error {
ts := typeutil.Timestamp(v.Timestamp)
if ts < w.tsFrom {
w.tsFrom = ts
}
if ts > w.tsTo {
w.tsTo = ts
}
w.pkstats.Update(v.PK)
w.rowCount.Inc()
return w.writer.Write(v)
}
func (w *SegmentWriter) Finish(actualRowCount int64) (*storage.Blob, error) {
w.writer.Flush()
codec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: w.collectionID, Schema: w.sch})
return codec.SerializePkStats(w.pkstats, actualRowCount)
}
func (w *SegmentWriter) IsFull() bool {
return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
}
func (w *SegmentWriter) FlushAndIsFull() bool {
w.writer.Flush()
return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
}
func (w *SegmentWriter) IsEmpty() bool {
return w.writer.WrittenMemorySize() == 0
}
func (w *SegmentWriter) FlushAndIsEmpty() bool {
w.writer.Flush()
return w.writer.WrittenMemorySize() == 0
}
func (w *SegmentWriter) GetTimeRange() *writebuffer.TimeRange {
return writebuffer.NewTimeRange(w.tsFrom, w.tsTo)
}
func (w *SegmentWriter) SerializeYield() ([]*storage.Blob, *writebuffer.TimeRange, error) {
w.writer.Flush()
w.writer.Close()
fieldData := make([]*storage.Blob, len(w.closers))
for i, f := range w.closers {
blob, err := f()
if err != nil {
return nil, nil, err
}
fieldData[i] = blob
}
tr := w.GetTimeRange()
w.clear()
return fieldData, tr, nil
}
func (w *SegmentWriter) clear() {
writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch)
w.writer = writer
w.closers = closers
w.tsFrom = math.MaxUint64
w.tsTo = 0
}
func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, partID, collID int64) (*SegmentWriter, error) {
writer, closers, err := newBinlogWriter(collID, partID, segID, sch)
if err != nil {
return nil, err
}
var pkField *schemapb.FieldSchema
for _, fs := range sch.GetFields() {
if fs.GetIsPrimaryKey() && fs.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(fs.GetDataType()) {
pkField = fs
}
}
if pkField == nil {
log.Warn("failed to get pk field from schema")
return nil, fmt.Errorf("no pk field in schema")
}
stats, err := storage.NewPrimaryKeyStats(pkField.GetFieldID(), int64(pkField.GetDataType()), maxCount)
if err != nil {
return nil, err
}
segWriter := SegmentWriter{
writer: writer,
closers: closers,
tsFrom: math.MaxUint64,
tsTo: 0,
pkstats: stats,
sch: sch,
segmentID: segID,
partitionID: partID,
collectionID: collID,
rowCount: atomic.NewInt64(0),
}
return &segWriter, nil
}
func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema,
) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*storage.Blob, error), err error) {
fieldWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema.Fields)
closers = make([]func() (*storage.Blob, error), 0, len(fieldWriters))
for _, w := range fieldWriters {
closers = append(closers, w.Finalize)
}
writer, err = storage.NewBinlogSerializeWriter(schema, partID, segID, fieldWriters, 1024)
return
}