mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
issue: #30633 master pr: #34313 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
3e0034bea2
commit
bc1746f96c
@ -571,11 +571,12 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
|
||||
}
|
||||
case datapb.CompactionType_ClusteringCompaction:
|
||||
task = &clusteringCompactionTask{
|
||||
CompactionTask: t,
|
||||
meta: c.meta,
|
||||
sessions: c.sessions,
|
||||
handler: c.handler,
|
||||
analyzeScheduler: c.analyzeScheduler,
|
||||
CompactionTask: t,
|
||||
meta: c.meta,
|
||||
sessions: c.sessions,
|
||||
handler: c.handler,
|
||||
analyzeScheduler: c.analyzeScheduler,
|
||||
lastUpdateStateTime: time.Now().UnixMilli(),
|
||||
}
|
||||
default:
|
||||
return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
|
||||
|
@ -74,10 +74,12 @@ type clusteringCompactionTask struct {
|
||||
// flush
|
||||
flushMutex sync.Mutex
|
||||
flushCount *atomic.Int64
|
||||
flushChan chan SpillSignal
|
||||
flushChan chan FlushSignal
|
||||
doneChan chan struct{}
|
||||
|
||||
// metrics
|
||||
// metrics, don't use
|
||||
writtenRowNum *atomic.Int64
|
||||
hasSignal *atomic.Bool
|
||||
|
||||
// inner field
|
||||
collectionID int64
|
||||
@ -100,8 +102,10 @@ type clusteringCompactionTask struct {
|
||||
type ClusterBuffer struct {
|
||||
id int
|
||||
|
||||
writer *SegmentWriter
|
||||
bufferRowNum atomic.Int64
|
||||
writer *SegmentWriter
|
||||
flushLock lock.RWMutex
|
||||
|
||||
bufferMemorySize atomic.Int64
|
||||
|
||||
flushedRowNum atomic.Int64
|
||||
flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog
|
||||
@ -112,8 +116,11 @@ type ClusterBuffer struct {
|
||||
clusteringKeyFieldStats *storage.FieldStats
|
||||
}
|
||||
|
||||
type SpillSignal struct {
|
||||
buffer *ClusterBuffer
|
||||
type FlushSignal struct {
|
||||
writer *SegmentWriter
|
||||
pack bool
|
||||
id int
|
||||
done bool
|
||||
}
|
||||
|
||||
func NewClusteringCompactionTask(
|
||||
@ -131,11 +138,13 @@ func NewClusteringCompactionTask(
|
||||
plan: plan,
|
||||
tr: timerecord.NewTimeRecorder("clustering_compaction"),
|
||||
done: make(chan struct{}, 1),
|
||||
flushChan: make(chan SpillSignal, 100),
|
||||
flushChan: make(chan FlushSignal, 100),
|
||||
doneChan: make(chan struct{}),
|
||||
clusterBuffers: make([]*ClusterBuffer, 0),
|
||||
clusterBufferLocks: lock.NewKeyLock[int](),
|
||||
flushCount: atomic.NewInt64(0),
|
||||
writtenRowNum: atomic.NewInt64(0),
|
||||
hasSignal: atomic.NewBool(false),
|
||||
}
|
||||
}
|
||||
|
||||
@ -262,6 +271,8 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) error {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getScalarAnalyzeResult-%d", t.GetPlanID()))
|
||||
defer span.End()
|
||||
analyzeDict, err := t.scalarAnalyze(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -297,6 +308,8 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getVectorAnalyzeResult-%d", t.GetPlanID()))
|
||||
defer span.End()
|
||||
analyzeResultPath := t.plan.AnalyzeResultPath
|
||||
centroidFilePath := path.Join(analyzeResultPath, metautil.JoinIDPath(t.collectionID, t.partitionID, t.clusteringKeyField.FieldID), common.Centroids)
|
||||
offsetMappingFiles := make(map[int64]string, 0)
|
||||
@ -345,6 +358,8 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
|
||||
func (t *clusteringCompactionTask) mapping(ctx context.Context,
|
||||
deltaPk2Ts map[interface{}]typeutil.Timestamp,
|
||||
) ([]*datapb.CompactionSegment, *storage.PartitionStatsSnapshot, error) {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mapping-%d", t.GetPlanID()))
|
||||
defer span.End()
|
||||
inputSegments := t.plan.GetSegmentBinlogs()
|
||||
mapStart := time.Now()
|
||||
|
||||
@ -368,6 +383,13 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
t.flushChan <- FlushSignal{
|
||||
done: true,
|
||||
}
|
||||
|
||||
// block util all writer flushed.
|
||||
<-t.doneChan
|
||||
|
||||
// force flush all buffers
|
||||
err := t.flushAll(ctx)
|
||||
if err != nil {
|
||||
@ -408,7 +430,15 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
|
||||
return resultSegments, resultPartitionStats, nil
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) getUsedMemoryBufferSize() int64 {
|
||||
func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 {
|
||||
var totalBufferSize int64 = 0
|
||||
for _, buffer := range t.clusterBuffers {
|
||||
totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) + buffer.bufferMemorySize.Load()
|
||||
}
|
||||
return totalBufferSize
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) getCurrentBufferWrittenMemorySize() int64 {
|
||||
var totalBufferSize int64 = 0
|
||||
for _, buffer := range t.clusterBuffers {
|
||||
totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize())
|
||||
@ -422,7 +452,7 @@ func (t *clusteringCompactionTask) mappingSegment(
|
||||
segment *datapb.CompactionSegmentBinlogs,
|
||||
delta map[interface{}]typeutil.Timestamp,
|
||||
) error {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("Compact-Map-%d", t.GetPlanID()))
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mappingSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID()))
|
||||
defer span.End()
|
||||
log := log.With(zap.Int64("planID", t.GetPlanID()),
|
||||
zap.Int64("collectionID", t.GetCollection()),
|
||||
@ -543,20 +573,39 @@ func (t *clusteringCompactionTask) mappingSegment(
|
||||
remained++
|
||||
|
||||
if (remained+1)%100 == 0 {
|
||||
currentBufferSize := t.getUsedMemoryBufferSize()
|
||||
currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
|
||||
currentBufferWrittenMemorySize := t.getCurrentBufferWrittenMemorySize()
|
||||
log.Debug("current buffer size", zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize),
|
||||
zap.Int64("currentBufferWrittenMemorySize", currentBufferWrittenMemorySize))
|
||||
|
||||
// trigger flushBinlog
|
||||
if clusterBuffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
|
||||
currentBufferNum := clusterBuffer.writer.GetRowNum()
|
||||
if clusterBuffer.flushedRowNum.Load()+currentBufferNum > t.plan.GetMaxSegmentRows() ||
|
||||
clusterBuffer.writer.IsFull() {
|
||||
// reach segment/binlog max size
|
||||
t.flushChan <- SpillSignal{
|
||||
buffer: clusterBuffer,
|
||||
t.clusterBufferLocks.Lock(clusterBuffer.id)
|
||||
writer := clusterBuffer.writer
|
||||
pack, _ := t.refreshBufferWriter(clusterBuffer)
|
||||
log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id),
|
||||
zap.Bool("pack", pack), zap.Int64("buffer num", currentBufferNum))
|
||||
t.clusterBufferLocks.Unlock(clusterBuffer.id)
|
||||
|
||||
t.flushChan <- FlushSignal{
|
||||
writer: writer,
|
||||
pack: pack,
|
||||
id: clusterBuffer.id,
|
||||
}
|
||||
} else if currentBufferSize >= t.getMemoryBufferHighWatermark() {
|
||||
} else if currentBufferTotalMemorySize > t.getMemoryBufferBlockFlushThreshold() && !t.hasSignal.Load() {
|
||||
// reach flushBinlog trigger threshold
|
||||
t.flushChan <- SpillSignal{}
|
||||
log.Debug("largest buffer need to flush", zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize))
|
||||
t.flushChan <- FlushSignal{}
|
||||
t.hasSignal.Store(true)
|
||||
}
|
||||
|
||||
// if the total buffer size is too large, block here, wait for memory release by flushBinlog
|
||||
if currentBufferSize > t.getMemoryBufferBlockSpillThreshold() {
|
||||
if currentBufferTotalMemorySize > t.getMemoryBufferBlockFlushThreshold() {
|
||||
log.Debug("memory is already above the block watermark, pause writing",
|
||||
zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize))
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
@ -567,8 +616,11 @@ func (t *clusteringCompactionTask) mappingSegment(
|
||||
log.Warn("stop waiting for memory buffer release as task chan done")
|
||||
return nil
|
||||
default:
|
||||
currentSize := t.getUsedMemoryBufferSize()
|
||||
if currentSize < t.getMemoryBufferLowWatermark() {
|
||||
//currentSize := t.getCurrentBufferWrittenMemorySize()
|
||||
currentSize := t.getBufferTotalUsedMemorySize()
|
||||
if currentSize < t.getMemoryBufferBlockFlushThreshold() {
|
||||
log.Debug("memory is already below the block watermark, continue writing",
|
||||
zap.Int64("currentSize", currentSize))
|
||||
break loop
|
||||
}
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
@ -593,17 +645,14 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf
|
||||
defer t.clusterBufferLocks.Unlock(clusterBuffer.id)
|
||||
// prepare
|
||||
if clusterBuffer.writer == nil {
|
||||
err := t.refreshBufferWriter(clusterBuffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Warn("unexpected behavior, please check", zap.Int("buffer id", clusterBuffer.id))
|
||||
return fmt.Errorf("unexpected behavior, please check buffer id: %d", clusterBuffer.id)
|
||||
}
|
||||
err := clusterBuffer.writer.Write(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.writtenRowNum.Inc()
|
||||
clusterBuffer.bufferRowNum.Add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -624,7 +673,7 @@ func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
|
||||
return int64(float64(t.memoryBufferSize) * 0.9)
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) getMemoryBufferBlockSpillThreshold() int64 {
|
||||
func (t *clusteringCompactionTask) getMemoryBufferBlockFlushThreshold() int64 {
|
||||
return t.memoryBufferSize
|
||||
}
|
||||
|
||||
@ -639,14 +688,20 @@ func (t *clusteringCompactionTask) backgroundFlush(ctx context.Context) {
|
||||
return
|
||||
case signal := <-t.flushChan:
|
||||
var err error
|
||||
if signal.buffer == nil {
|
||||
if signal.done {
|
||||
t.doneChan <- struct{}{}
|
||||
} else if signal.writer == nil {
|
||||
err = t.flushLargestBuffers(ctx)
|
||||
t.hasSignal.Store(false)
|
||||
} else {
|
||||
err = func() error {
|
||||
t.clusterBufferLocks.Lock(signal.buffer.id)
|
||||
defer t.clusterBufferLocks.Unlock(signal.buffer.id)
|
||||
return t.flushBinlog(ctx, signal.buffer)
|
||||
}()
|
||||
future := t.flushPool.Submit(func() (any, error) {
|
||||
err := t.flushBinlog(ctx, t.clusterBuffers[signal.id], signal.writer, signal.pack)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return struct{}{}, nil
|
||||
})
|
||||
err = conc.AwaitAll(future)
|
||||
}
|
||||
if err != nil {
|
||||
log.Warn("fail to flushBinlog data", zap.Error(err))
|
||||
@ -663,28 +718,56 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro
|
||||
return nil
|
||||
}
|
||||
defer t.flushMutex.Unlock()
|
||||
currentMemorySize := t.getBufferTotalUsedMemorySize()
|
||||
if currentMemorySize <= t.getMemoryBufferLowWatermark() {
|
||||
log.Info("memory low water mark", zap.Int64("memoryBufferSize", t.getBufferTotalUsedMemorySize()))
|
||||
return nil
|
||||
}
|
||||
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "flushLargestBuffers")
|
||||
defer span.End()
|
||||
bufferIDs := make([]int, 0)
|
||||
for _, buffer := range t.clusterBuffers {
|
||||
bufferIDs = append(bufferIDs, buffer.id)
|
||||
}
|
||||
sort.Slice(bufferIDs, func(i, j int) bool {
|
||||
return t.clusterBuffers[i].bufferRowNum.Load() > t.clusterBuffers[j].bufferRowNum.Load()
|
||||
return t.clusterBuffers[i].writer.GetRowNum() >
|
||||
t.clusterBuffers[j].writer.GetRowNum()
|
||||
})
|
||||
log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs))
|
||||
log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs), zap.Int64("currentMemorySize", currentMemorySize))
|
||||
|
||||
futures := make([]*conc.Future[any], 0)
|
||||
for _, bufferId := range bufferIDs {
|
||||
err := func() error {
|
||||
t.clusterBufferLocks.Lock(bufferId)
|
||||
defer t.clusterBufferLocks.Unlock(bufferId)
|
||||
return t.flushBinlog(ctx, t.clusterBuffers[bufferId])
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if t.getUsedMemoryBufferSize() <= t.getMemoryBufferLowWatermark() {
|
||||
log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getUsedMemoryBufferSize()))
|
||||
t.clusterBufferLocks.Lock(bufferId)
|
||||
buffer := t.clusterBuffers[bufferId]
|
||||
writer := buffer.writer
|
||||
currentMemorySize -= int64(writer.WrittenMemorySize())
|
||||
pack, _ := t.refreshBufferWriter(buffer)
|
||||
t.clusterBufferLocks.Unlock(bufferId)
|
||||
|
||||
log.Info("currentMemorySize after flush buffer binlog",
|
||||
zap.Int64("currentMemorySize", currentMemorySize),
|
||||
zap.Int("bufferID", bufferId),
|
||||
zap.Uint64("WrittenMemorySize()", writer.WrittenMemorySize()),
|
||||
zap.Int64("RowNum", writer.GetRowNum()))
|
||||
future := t.flushPool.Submit(func() (any, error) {
|
||||
err := t.flushBinlog(ctx, buffer, writer, pack)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return struct{}{}, nil
|
||||
})
|
||||
futures = append(futures, future)
|
||||
|
||||
if currentMemorySize <= t.getMemoryBufferLowWatermark() {
|
||||
log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getBufferTotalUsedMemorySize()))
|
||||
break
|
||||
}
|
||||
}
|
||||
if err := conc.AwaitAll(futures...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("flushLargestBuffers end", zap.Int64("currentMemorySize", currentMemorySize))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -692,26 +775,26 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error {
|
||||
// only one flushLargestBuffers or flushAll should do at the same time
|
||||
t.flushMutex.Lock()
|
||||
defer t.flushMutex.Unlock()
|
||||
futures := make([]*conc.Future[any], 0)
|
||||
for _, buffer := range t.clusterBuffers {
|
||||
err := func() error {
|
||||
t.clusterBufferLocks.Lock(buffer.id)
|
||||
defer t.clusterBufferLocks.Unlock(buffer.id)
|
||||
err := t.flushBinlog(ctx, buffer)
|
||||
buffer := buffer
|
||||
future := t.flushPool.Submit(func() (any, error) {
|
||||
err := t.flushBinlog(ctx, buffer, buffer.writer, true)
|
||||
if err != nil {
|
||||
log.Error("flushBinlog fail")
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
err = t.packBufferToSegment(ctx, buffer)
|
||||
return err
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return struct{}{}, nil
|
||||
})
|
||||
futures = append(futures, future)
|
||||
}
|
||||
if err := conc.AwaitAll(futures...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer) error {
|
||||
func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter) error {
|
||||
if len(buffer.flushedBinlogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -719,7 +802,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
|
||||
for _, fieldBinlog := range buffer.flushedBinlogs {
|
||||
insertLogs = append(insertLogs, fieldBinlog)
|
||||
}
|
||||
statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum.Load())
|
||||
statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, writer, buffer.flushedRowNum.Load())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -727,7 +810,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
|
||||
// pack current flushBinlog data into a segment
|
||||
seg := &datapb.CompactionSegment{
|
||||
PlanID: t.plan.GetPlanID(),
|
||||
SegmentID: buffer.writer.GetSegmentID(),
|
||||
SegmentID: writer.GetSegmentID(),
|
||||
NumOfRows: buffer.flushedRowNum.Load(),
|
||||
InsertLogs: insertLogs,
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
|
||||
@ -738,37 +821,55 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
|
||||
FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()},
|
||||
NumRows: int(buffer.flushedRowNum.Load()),
|
||||
}
|
||||
buffer.uploadedSegmentStats[buffer.writer.GetSegmentID()] = segmentStats
|
||||
// refresh
|
||||
t.refreshBufferWriter(buffer)
|
||||
buffer.flushedRowNum.Store(0)
|
||||
buffer.uploadedSegmentStats[writer.GetSegmentID()] = segmentStats
|
||||
|
||||
buffer.flushedBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0)
|
||||
for _, binlog := range seg.InsertLogs {
|
||||
log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("binlog", binlog.String()))
|
||||
log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.String("binlog", binlog.String()))
|
||||
}
|
||||
log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.Any("segStats", segmentStats))
|
||||
log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID),
|
||||
zap.Int64("segID", seg.GetSegmentID()),
|
||||
zap.Int64("row num", seg.GetNumOfRows()))
|
||||
|
||||
// set old writer nil
|
||||
writer = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer) error {
|
||||
log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()), zap.Int64("segmentID", buffer.writer.GetSegmentID()))
|
||||
if buffer.writer.IsEmpty() {
|
||||
func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter, pack bool) error {
|
||||
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", writer.GetSegmentID()))
|
||||
defer span.End()
|
||||
if writer == nil {
|
||||
log.Warn("buffer writer is nil, please check", zap.Int("buffer id", buffer.id))
|
||||
return fmt.Errorf("buffer: %d writer is nil, please check", buffer.id)
|
||||
}
|
||||
buffer.flushLock.Lock()
|
||||
defer buffer.flushLock.Unlock()
|
||||
writtenMemorySize := int64(writer.WrittenMemorySize())
|
||||
writtenRowNum := writer.GetRowNum()
|
||||
log := log.With(zap.Int("bufferID", buffer.id),
|
||||
zap.Int64("segmentID", writer.GetSegmentID()),
|
||||
zap.Bool("pack", pack),
|
||||
zap.Int64("writerRowNum", writtenRowNum),
|
||||
zap.Int64("writtenMemorySize", writtenMemorySize),
|
||||
zap.Int64("bufferMemorySize", buffer.bufferMemorySize.Load()),
|
||||
)
|
||||
|
||||
log.Info("start flush binlog")
|
||||
if writtenRowNum <= 0 {
|
||||
log.Debug("writerRowNum is zero, skip flush")
|
||||
if pack {
|
||||
return t.packBufferToSegment(ctx, buffer, writer)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
future := t.flushPool.Submit(func() (any, error) {
|
||||
kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer)
|
||||
if err != nil {
|
||||
log.Warn("compact wrong, failed to serialize writer", zap.Error(err))
|
||||
return typeutil.NewPair(kvs, partialBinlogs), err
|
||||
}
|
||||
return typeutil.NewPair(kvs, partialBinlogs), nil
|
||||
})
|
||||
if err := conc.AwaitAll(future); err != nil {
|
||||
start := time.Now()
|
||||
kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, writer)
|
||||
if err != nil {
|
||||
log.Warn("compact wrong, failed to serialize writer", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
kvs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).A
|
||||
partialBinlogs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).B
|
||||
|
||||
if err := t.binlogIO.Upload(ctx, kvs); err != nil {
|
||||
log.Warn("compact wrong, failed to upload kvs", zap.Error(err))
|
||||
@ -784,18 +885,19 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus
|
||||
}
|
||||
buffer.flushedBinlogs[fID] = tmpBinlog
|
||||
}
|
||||
buffer.flushedRowNum.Add(buffer.bufferRowNum.Load())
|
||||
buffer.flushedRowNum.Add(writtenRowNum)
|
||||
|
||||
// clean buffer
|
||||
buffer.bufferRowNum.Store(0)
|
||||
// clean buffer with writer
|
||||
buffer.bufferMemorySize.Sub(writtenMemorySize)
|
||||
|
||||
t.flushCount.Inc()
|
||||
log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()))
|
||||
if buffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() {
|
||||
if err := t.packBufferToSegment(ctx, buffer); err != nil {
|
||||
if pack {
|
||||
if err := t.packBufferToSegment(ctx, buffer, writer); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()),
|
||||
zap.Int64("cost", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -825,6 +927,8 @@ func (t *clusteringCompactionTask) cleanUp(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[interface{}]int64, error) {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyze-%d", t.GetPlanID()))
|
||||
defer span.End()
|
||||
inputSegments := t.plan.GetSegmentBinlogs()
|
||||
futures := make([]*conc.Future[any], 0, len(inputSegments))
|
||||
analyzeStart := time.Now()
|
||||
@ -871,6 +975,8 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
|
||||
ctx context.Context,
|
||||
segment *datapb.CompactionSegmentBinlogs,
|
||||
) (map[interface{}]int64, error) {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyzeSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID()))
|
||||
defer span.End()
|
||||
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.Int64("segmentID", segment.GetSegmentID()))
|
||||
|
||||
// vars
|
||||
@ -1015,16 +1121,27 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in
|
||||
return buckets
|
||||
}
|
||||
|
||||
func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) error {
|
||||
segmentID, err := t.allocator.AllocOne()
|
||||
if err != nil {
|
||||
return err
|
||||
func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (bool, error) {
|
||||
var segmentID int64
|
||||
var err error
|
||||
var pack bool
|
||||
if buffer.writer != nil {
|
||||
segmentID = buffer.writer.GetSegmentID()
|
||||
buffer.bufferMemorySize.Add(int64(buffer.writer.WrittenMemorySize()))
|
||||
}
|
||||
if buffer.writer == nil || buffer.flushedRowNum.Load()+buffer.writer.GetRowNum() > t.plan.GetMaxSegmentRows() {
|
||||
pack = true
|
||||
segmentID, err = t.allocator.AllocOne()
|
||||
if err != nil {
|
||||
return pack, err
|
||||
}
|
||||
}
|
||||
|
||||
writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
return pack, err
|
||||
}
|
||||
|
||||
buffer.writer = writer
|
||||
buffer.bufferRowNum.Store(0)
|
||||
return nil
|
||||
return pack, nil
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package io
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"go.opentelemetry.io/otel"
|
||||
@ -55,6 +56,7 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte,
|
||||
var val []byte
|
||||
var err error
|
||||
|
||||
start := time.Now()
|
||||
log.Debug("BinlogIO download", zap.String("path", path))
|
||||
err = retry.Do(ctx, func() error {
|
||||
val, err = b.Read(ctx, path)
|
||||
@ -64,6 +66,9 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte,
|
||||
return err
|
||||
})
|
||||
|
||||
log.Debug("BinlogIO download success", zap.String("path", path), zap.Int64("cost", time.Since(start).Milliseconds()),
|
||||
zap.Error(err))
|
||||
|
||||
return val, err
|
||||
})
|
||||
futures = append(futures, future)
|
||||
@ -88,6 +93,7 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error
|
||||
innerK, innerV := k, v
|
||||
future := b.pool.Submit(func() (any, error) {
|
||||
var err error
|
||||
start := time.Now()
|
||||
log.Debug("BinlogIO upload", zap.String("paths", innerK))
|
||||
err = retry.Do(ctx, func() error {
|
||||
err = b.Write(ctx, innerK, innerV)
|
||||
@ -96,6 +102,7 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error
|
||||
}
|
||||
return err
|
||||
})
|
||||
log.Debug("BinlogIO upload success", zap.String("paths", innerK), zap.Int64("cost", time.Since(start).Milliseconds()), zap.Error(err))
|
||||
return struct{}{}, err
|
||||
})
|
||||
futures = append(futures, future)
|
||||
|
@ -4063,8 +4063,8 @@ if this parameter <= 0, will set it as 10`,
|
||||
p.SlotCap.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionMemoryBufferRatio = ParamItem{
|
||||
Key: "datanode.clusteringCompaction.memoryBufferRatio",
|
||||
Version: "2.4.2",
|
||||
Key: "dataNode.clusteringCompaction.memoryBufferRatio",
|
||||
Version: "2.4.6",
|
||||
Doc: "The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.",
|
||||
DefaultValue: "0.1",
|
||||
PanicIfEmpty: false,
|
||||
@ -4073,8 +4073,8 @@ if this parameter <= 0, will set it as 10`,
|
||||
p.ClusteringCompactionMemoryBufferRatio.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionWorkerPoolSize = ParamItem{
|
||||
Key: "datanode.clusteringCompaction.cpu",
|
||||
Version: "2.4.2",
|
||||
Key: "dataNode.clusteringCompaction.workPoolSize",
|
||||
Version: "2.4.6",
|
||||
Doc: "worker pool size for one clustering compaction job.",
|
||||
DefaultValue: "1",
|
||||
PanicIfEmpty: false,
|
||||
|
Loading…
Reference in New Issue
Block a user