milvus/internal/flushcommon/writebuffer/write_buffer.go

package writebuffer
import (
"context"
"fmt"
"sync"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
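// nonFlushTS indicates that no flush timestamp has been set.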
nonFlushTS uint64 = 0
)
// WriteBuffer is the interface for channel write buffer.
// It abstracts the channel write buffer together with the PK bloom filter and L0 delta logic.
type WriteBuffer interface {
// HasSegment checks whether a certain segment exists in this buffer.
HasSegment(segmentID int64) bool
// BufferData buffers the DML insert and delete messages between the given start and end positions.
BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error
// SetFlushTimestamp sets the flush timestamp for the write buffer.
SetFlushTimestamp(flushTs uint64)
// GetFlushTimestamp returns the current flush timestamp.
GetFlushTimestamp() uint64
// SealSegments seals the provided segments, marking growing segments as Sealed.
SealSegments(ctx context.Context, segmentIDs []int64) error
// DropPartitions marks the segments belonging to the given partitions as Dropped.
DropPartitions(partitionIDs []int64)
// GetCheckpoint returns current channel checkpoint.
// If any segment buffer is non-empty, returns the earliest buffer start position.
// Otherwise, returns the latest buffered checkpoint.
GetCheckpoint() *msgpb.MsgPosition
// MemorySize returns the size in bytes currently used by this write buffer.
MemorySize() int64
// EvictBuffer evicts the buffers matching the provided sync policies to the sync manager.
EvictBuffer(policies ...SyncPolicy)
// Close closes the write buffer; when drop is true, all remaining buffered data is synced and the channel is dropped.
Close(ctx context.Context, drop bool)
}
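// checkpointCandidate is a checkpoint candidate derived from a segment buffer start position or a syncing task.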
type checkpointCandidate struct {
segmentID int64
position *msgpb.MsgPosition
source string
}
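// checkpointCandidates tracks the checkpoint candidates of in-progress sync tasks, keyed by segmentID and timestamp.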
type checkpointCandidates struct {
candidates map[string]*checkpointCandidate
mu sync.RWMutex
}
func newCheckpointCandidates() *checkpointCandidates {
return &checkpointCandidates{
candidates: make(map[string]*checkpointCandidate),
}
}
func (c *checkpointCandidates) Remove(segmentID int64, timestamp uint64) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.candidates, fmt.Sprintf("%d-%d", segmentID, timestamp))
}
func (c *checkpointCandidates) Add(segmentID int64, position *msgpb.MsgPosition, source string) {
c.mu.Lock()
defer c.mu.Unlock()
c.candidates[fmt.Sprintf("%d-%d", segmentID, position.GetTimestamp())] = &checkpointCandidate{segmentID, position, source}
}
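// GetEarliestWithDefault returns the candidate with the smallest position timestamp, including def;
// it returns def when there is no candidate.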
func (c *checkpointCandidates) GetEarliestWithDefault(def *checkpointCandidate) *checkpointCandidate {
c.mu.RLock()
defer c.mu.RUnlock()
result := def
for _, candidate := range c.candidates {
if result == nil || candidate.position.GetTimestamp() < result.position.GetTimestamp() {
result = candidate
}
}
return result
}
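// NewWriteBuffer creates a WriteBuffer for the given channel according to the configured delete policy.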
func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
option := defaultWBOption(metacache)
for _, opt := range opts {
opt(option)
}
switch option.deletePolicy {
case DeletePolicyBFPkOracle:
return NewBFWriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
case DeletePolicyL0Delta:
return NewL0WriteBuffer(channel, metacache, storageV2Cache, syncMgr, option)
default:
return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy)
}
}
// writeBufferBase is the common component for buffering data
type writeBufferBase struct {
mut sync.RWMutex
collectionID int64
channelName string
metaWriter syncmgr.MetaWriter
collSchema *schemapb.CollectionSchema
helper *typeutil.SchemaHelper
pkField *schemapb.FieldSchema
estSizePerRecord int
metaCache metacache.MetaCache
buffers map[int64]*segmentBuffer // segmentID => segmentBuffer
syncPolicies []SyncPolicy
syncCheckpoint *checkpointCandidates
syncMgr syncmgr.SyncManager
serializer syncmgr.Serializer
checkpoint *msgpb.MsgPosition
flushTimestamp *atomic.Uint64
storagev2Cache *metacache.StorageV2Cache
// pre-built loggers
logger *log.MLogger
cpRatedLogger *log.MLogger
}
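// newWriteBufferBase constructs the common writeBufferBase: it picks the serializer by storage version,
// appends the flush-ts sync policy, and prepares the schema helper and loggers.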
func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (*writeBufferBase, error) {
flushTs := atomic.NewUint64(nonFlushTS)
flushTsPolicy := GetFlushTsPolicy(flushTs, metacache)
option.syncPolicies = append(option.syncPolicies, flushTsPolicy)
var serializer syncmgr.Serializer
var err error
if params.Params.CommonCfg.EnableStorageV2.GetAsBool() {
serializer, err = syncmgr.NewStorageV2Serializer(
storageV2Cache,
option.idAllocator,
metacache,
option.metaWriter,
)
} else {
serializer, err = syncmgr.NewStorageSerializer(
option.idAllocator,
metacache,
option.metaWriter,
)
}
if err != nil {
return nil, err
}
schema := metacache.Schema()
estSize, err := typeutil.EstimateSizePerRecord(schema)
if err != nil {
return nil, err
}
helper, err := typeutil.CreateSchemaHelper(schema)
if err != nil {
return nil, err
}
pkField, err := helper.GetPrimaryKeyField()
if err != nil {
return nil, err
}
wb := &writeBufferBase{
channelName: channel,
collectionID: metacache.Collection(),
collSchema: schema,
helper: helper,
pkField: pkField,
estSizePerRecord: estSize,
syncMgr: syncMgr,
metaWriter: option.metaWriter,
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
serializer: serializer,
syncCheckpoint: newCheckpointCandidates(),
syncPolicies: option.syncPolicies,
flushTimestamp: flushTs,
storagev2Cache: storageV2Cache,
}
wb.logger = log.With(zap.Int64("collectionID", wb.collectionID),
zap.String("channel", wb.channelName))
wb.cpRatedLogger = wb.logger.WithRateGroup(fmt.Sprintf("writebuffer_cp_%s", wb.channelName), 1, 60)
return wb, nil
}
func (wb *writeBufferBase) HasSegment(segmentID int64) bool {
wb.mut.RLock()
defer wb.mut.RUnlock()
_, ok := wb.buffers[segmentID]
return ok
}
func (wb *writeBufferBase) SealSegments(ctx context.Context, segmentIDs []int64) error {
wb.mut.RLock()
defer wb.mut.RUnlock()
return wb.sealSegments(ctx, segmentIDs)
}
func (wb *writeBufferBase) DropPartitions(partitionIDs []int64) {
wb.mut.RLock()
defer wb.mut.RUnlock()
wb.dropPartitions(partitionIDs)
}
func (wb *writeBufferBase) SetFlushTimestamp(flushTs uint64) {
wb.flushTimestamp.Store(flushTs)
}
func (wb *writeBufferBase) GetFlushTimestamp() uint64 {
return wb.flushTimestamp.Load()
}
func (wb *writeBufferBase) MemorySize() int64 {
wb.mut.RLock()
defer wb.mut.RUnlock()
var size int64
for _, segBuf := range wb.buffers {
size += segBuf.MemorySize()
}
return size
}
func (wb *writeBufferBase) EvictBuffer(policies ...SyncPolicy) {
log := wb.logger
wb.mut.Lock()
defer wb.mut.Unlock()
// need valid checkpoint before triggering syncing
if wb.checkpoint == nil {
log.Warn("evict buffer before buffering data")
return
}
ts := wb.checkpoint.GetTimestamp()
segmentIDs := wb.getSegmentsToSync(ts, policies...)
if len(segmentIDs) > 0 {
log.Info("evict buffer find segments to sync", zap.Int64s("segmentIDs", segmentIDs))
conc.AwaitAll(wb.syncSegments(context.Background(), segmentIDs)...)
}
}
func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
log := wb.cpRatedLogger
wb.mut.RLock()
defer wb.mut.RUnlock()
candidates := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *checkpointCandidate {
return &checkpointCandidate{buf.segmentID, buf.EarliestPosition(), "segment buffer"}
})
candidates = lo.Filter(candidates, func(candidate *checkpointCandidate, _ int) bool {
return candidate.position != nil
})
checkpoint := wb.syncCheckpoint.GetEarliestWithDefault(lo.MinBy(candidates, func(a, b *checkpointCandidate) bool {
return a.position.GetTimestamp() < b.position.GetTimestamp()
}))
if checkpoint == nil {
// all buffers are empty
log.RatedDebug(60, "checkpoint from latest consumed msg", zap.Uint64("cpTimestamp", wb.checkpoint.GetTimestamp()))
return wb.checkpoint
}
log.RatedDebug(20, "checkpoint evaluated",
zap.String("cpSource", checkpoint.source),
zap.Int64("segmentID", checkpoint.segmentID),
zap.Uint64("cpTimestamp", checkpoint.position.GetTimestamp()))
return checkpoint.position
}
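// triggerSync applies the registered sync policies against the current checkpoint and submits
// the selected segments to the sync manager. Shall be invoked within mutex protection.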
func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp(), wb.syncPolicies...)
if len(segmentsToSync) > 0 {
log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync))
// ignore the returned futures here; errors are handled by the callback
wb.syncSegments(context.Background(), segmentsToSync)
}
return segmentsToSync
}
func (wb *writeBufferBase) sealSegments(_ context.Context, segmentIDs []int64) error {
for _, segmentID := range segmentIDs {
_, ok := wb.metaCache.GetSegmentByID(segmentID)
if !ok {
log.Warn("cannot find segment when sealSegments", zap.Int64("segmentID", segmentID), zap.String("channel", wb.channelName))
return merr.WrapErrSegmentNotFound(segmentID)
}
}
// mark segments as Sealed if they were Growing
wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Sealed),
metacache.WithSegmentIDs(segmentIDs...),
metacache.WithSegmentState(commonpb.SegmentState_Growing))
return nil
}
func (wb *writeBufferBase) dropPartitions(partitionIDs []int64) {
// mark segments as Dropped if their partition was dropped
segIDs := wb.metaCache.GetSegmentIDsBy(metacache.WithPartitionIDs(partitionIDs))
wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Dropped),
metacache.WithSegmentIDs(segIDs...),
)
}
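// syncSegments builds a sync task for each segment and submits it to the sync manager, returning the
// futures of the submitted tasks; once a task succeeds, its checkpoint candidate is removed.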
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[struct{}] {
log := log.Ctx(ctx)
result := make([]*conc.Future[struct{}], 0, len(segmentIDs))
for _, segmentID := range segmentIDs {
syncTask, err := wb.getSyncTask(ctx, segmentID)
if err != nil {
if errors.Is(err, merr.ErrSegmentNotFound) {
log.Warn("segment not found in meta", zap.Int64("segmentID", segmentID))
continue
} else {
log.Fatal("failed to get sync task", zap.Int64("segmentID", segmentID), zap.Error(err))
}
}
result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if err != nil {
return err
}
if syncTask.StartPosition() != nil {
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
}
return nil
}))
}
return result
}
// getSegmentsToSync applies all policies to get segments list to sync.
// **NOTE** shall be invoked within mutex protection
func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp, policies ...SyncPolicy) []int64 {
buffers := lo.Values(wb.buffers)
segments := typeutil.NewSet[int64]()
for _, policy := range policies {
result := policy.SelectSegments(buffers, ts)
if len(result) > 0 {
log.Info("SyncPolicy selects segments", zap.Int64s("segmentIDs", result), zap.String("reason", policy.Reason()))
segments.Insert(result...)
}
}
return segments.Collect()
}
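// getOrCreateBuffer returns the buffer of the given segment, lazily creating it when absent.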
func (wb *writeBufferBase) getOrCreateBuffer(segmentID int64) *segmentBuffer {
buffer, ok := wb.buffers[segmentID]
if !ok {
var err error
buffer, err = newSegmentBuffer(segmentID, wb.collSchema)
if err != nil {
// TODO avoid panic here
panic(err)
}
wb.buffers[segmentID] = buffer
}
return buffer
}
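// yieldBuffer detaches the segment buffer from the write buffer and returns its insert data,
// delete data, time range and earliest position.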
func (wb *writeBufferBase) yieldBuffer(segmentID int64) ([]*storage.InsertData, *storage.DeleteData, *TimeRange, *msgpb.MsgPosition) {
buffer, ok := wb.buffers[segmentID]
if !ok {
return nil, nil, nil, nil
}
// remove buffer and move it to sync manager
delete(wb.buffers, segmentID)
start := buffer.EarliestPosition()
timeRange := buffer.GetTimeRange()
insert, delta := buffer.Yield()
return insert, delta, timeRange, start
}
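// inData is the insert data of a single segment organized for buffering: the insert batches with their
// pk and timestamp columns, plus the minimal timestamp observed for each primary key.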
type inData struct {
segmentID int64
partitionID int64
data []*storage.InsertData
pkField []storage.FieldData
tsField []*storage.Int64FieldData
rowNum int64
intPKTs map[int64]int64
strPKTs map[string]int64
}
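// pkExists reports whether pk appears in this batch with a timestamp earlier than ts.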
func (id *inData) pkExists(pk storage.PrimaryKey, ts uint64) bool {
var ok bool
var minTs int64
switch pk.Type() {
case schemapb.DataType_Int64:
minTs, ok = id.intPKTs[pk.GetValue().(int64)]
case schemapb.DataType_VarChar:
minTs, ok = id.strPKTs[pk.GetValue().(string)]
}
return ok && ts > uint64(minTs)
}
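// batchPkExists is the batched variant of pkExists; it only evaluates entries whose hits[i] is still
// false and writes the results back into hits.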
func (id *inData) batchPkExists(pks []storage.PrimaryKey, tss []uint64, hits []bool) []bool {
if len(pks) == 0 {
return nil
}
pkType := pks[0].Type()
switch pkType {
case schemapb.DataType_Int64:
for i := range pks {
if !hits[i] {
minTs, ok := id.intPKTs[pks[i].GetValue().(int64)]
hits[i] = ok && tss[i] > uint64(minTs)
}
}
case schemapb.DataType_VarChar:
for i := range pks {
if !hits[i] {
minTs, ok := id.strPKTs[pks[i].GetValue().(string)]
hits[i] = ok && tss[i] > uint64(minTs)
}
}
}
return hits
}
// prepareInsert transforms InsertMsgs into InsertData organized by segmentID,
// along with the primary key and timestamp field data of each batch.
func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*inData, error) {
groups := lo.GroupBy(insertMsgs, func(msg *msgstream.InsertMsg) int64 { return msg.SegmentID })
segmentPartition := lo.SliceToMap(insertMsgs, func(msg *msgstream.InsertMsg) (int64, int64) { return msg.GetSegmentID(), msg.GetPartitionID() })
result := make([]*inData, 0, len(groups))
for segment, msgs := range groups {
inData := &inData{
segmentID: segment,
partitionID: segmentPartition[segment],
data: make([]*storage.InsertData, 0, len(msgs)),
pkField: make([]storage.FieldData, 0, len(msgs)),
}
switch wb.pkField.GetDataType() {
case schemapb.DataType_Int64:
inData.intPKTs = make(map[int64]int64)
case schemapb.DataType_VarChar:
inData.strPKTs = make(map[string]int64)
}
for _, msg := range msgs {
data, err := storage.InsertMsgToInsertData(msg, wb.collSchema)
if err != nil {
log.Warn("failed to transfer insert msg to insert data", zap.Error(err))
return nil, err
}
pkFieldData, err := storage.GetPkFromInsertData(wb.collSchema, data)
if err != nil {
return nil, err
}
if pkFieldData.RowNum() != data.GetRowNum() {
return nil, merr.WrapErrServiceInternal("pk column row num not match")
}
tsFieldData, err := storage.GetTimestampFromInsertData(data)
if err != nil {
return nil, err
}
if tsFieldData.RowNum() != data.GetRowNum() {
return nil, merr.WrapErrServiceInternal("timestamp column row num not match")
}
timestamps := tsFieldData.GetRows().([]int64)
switch wb.pkField.GetDataType() {
case schemapb.DataType_Int64:
pks := pkFieldData.GetRows().([]int64)
for idx, pk := range pks {
ts, ok := inData.intPKTs[pk]
if !ok || timestamps[idx] < ts {
inData.intPKTs[pk] = timestamps[idx]
}
}
case schemapb.DataType_VarChar:
pks := pkFieldData.GetRows().([]string)
for idx, pk := range pks {
ts, ok := inData.strPKTs[pk]
if !ok || timestamps[idx] < ts {
inData.strPKTs[pk] = timestamps[idx]
}
}
}
inData.data = append(inData.data, data)
inData.pkField = append(inData.pkField, pkFieldData)
inData.tsField = append(inData.tsField, tsFieldData)
inData.rowNum += int64(data.GetRowNum())
}
result = append(result, inData)
}
return result, nil
}
// bufferInsert buffers the prepared inData into the corresponding segment buffer,
// registering the segment in the meta cache as Growing if it is not tracked yet.
func (wb *writeBufferBase) bufferInsert(inData *inData, startPos, endPos *msgpb.MsgPosition) error {
_, ok := wb.metaCache.GetSegmentByID(inData.segmentID)
// new segment
if !ok {
wb.metaCache.AddSegment(&datapb.SegmentInfo{
ID: inData.segmentID,
PartitionID: inData.partitionID,
CollectionID: wb.collectionID,
InsertChannel: wb.channelName,
StartPosition: startPos,
State: commonpb.SegmentState_Growing,
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
}, metacache.SetStartPosRecorded(false))
log.Info("add growing segment", zap.Int64("segmentID", inData.segmentID), zap.String("channel", wb.channelName))
}
segBuf := wb.getOrCreateBuffer(inData.segmentID)
totalMemSize := segBuf.insertBuffer.Buffer(inData, startPos, endPos)
wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
metacache.WithSegmentIDs(inData.segmentID))
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(totalMemSize))
return nil
}
// bufferDelete buffers DeleteMsg into DeleteData.
func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) {
segBuf := wb.getOrCreateBuffer(segmentID)
bufSize := segBuf.deltaBuffer.Buffer(pks, tss, startPos, endPos)
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(bufSize))
}
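// getSyncTask yields the segment's buffered data, registers its start position as a checkpoint candidate,
// marks the segment as syncing in the meta cache, and encodes the data into a sync task.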
func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) {
log := log.Ctx(ctx).With(
zap.Int64("segmentID", segmentID),
)
segmentInfo, ok := wb.metaCache.GetSegmentByID(segmentID)
if !ok {
log.Warn("segment info not found in meta cache", zap.Int64("segmentID", segmentID))
return nil, merr.WrapErrSegmentNotFound(segmentID)
}
var batchSize int64
var totalMemSize float64
var tsFrom, tsTo uint64
insert, delta, timeRange, startPos := wb.yieldBuffer(segmentID)
if timeRange != nil {
tsFrom, tsTo = timeRange.timestampMin, timeRange.timestampMax
}
if startPos != nil {
wb.syncCheckpoint.Add(segmentID, startPos, "syncing task")
}
actions := []metacache.SegmentAction{}
for _, chunk := range insert {
batchSize += int64(chunk.GetRowNum())
totalMemSize += float64(chunk.GetMemorySize())
}
if delta != nil {
totalMemSize += float64(delta.Size())
}
actions = append(actions, metacache.StartSyncing(batchSize))
wb.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(segmentID))
pack := &syncmgr.SyncPack{}
pack.WithInsertData(insert).
WithDeleteData(delta).
WithCollectionID(wb.collectionID).
WithPartitionID(segmentInfo.PartitionID()).
WithChannelName(wb.channelName).
WithSegmentID(segmentID).
WithStartPosition(startPos).
WithTimeRange(tsFrom, tsTo).
WithLevel(segmentInfo.Level()).
WithCheckpoint(wb.checkpoint).
WithBatchSize(batchSize)
if segmentInfo.State() == commonpb.SegmentState_Flushing ||
segmentInfo.Level() == datapb.SegmentLevel_L0 { // L0 segments are always synced as flushed
pack.WithFlush()
}
if segmentInfo.State() == commonpb.SegmentState_Dropped {
pack.WithDrop()
}
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize)
return wb.serializer.EncodeBuffer(ctx, pack)
}
// getEstBatchSize returns the batch size based on estimated size per record and FlushBufferSize configuration value.
func (wb *writeBufferBase) getEstBatchSize() uint {
sizeLimit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
return uint(sizeLimit / int64(wb.estSizePerRecord))
}
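// Close syncs all remaining buffered data with the drop flag and drops the channel via the meta writer
// when drop is true; otherwise it returns without syncing.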
func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
log := wb.logger
// sink all data and call Drop for meta writer
wb.mut.Lock()
defer wb.mut.Unlock()
if !drop {
return
}
var futures []*conc.Future[struct{}]
for id := range wb.buffers {
syncTask, err := wb.getSyncTask(ctx, id)
if err != nil {
// TODO
continue
}
switch t := syncTask.(type) {
case *syncmgr.SyncTask:
t.WithDrop()
case *syncmgr.SyncTaskV2:
t.WithDrop()
}
f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if err != nil {
return err
}
if syncTask.StartPosition() != nil {
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
}
return nil
})
futures = append(futures, f)
}
err := conc.AwaitAll(futures...)
if err != nil {
log.Error("failed to sink write buffer data", zap.Error(err))
// TODO change to remove channel in the future
panic(err)
}
err = wb.metaWriter.DropChannel(ctx, wb.channelName)
if err != nil {
log.Error("failed to drop channel", zap.Error(err))
// TODO change to remove channel in the future
panic(err)
}
}