From bc1746f96cb3530f7b433cbc9b047d9643236070 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 4 Jul 2024 09:52:09 +0800 Subject: [PATCH] enhance: [cherry-pick] Optimize clustering compaction (#34313) (#34398) issue: #30633 master pr: #34313 Signed-off-by: Cai Zhang --- internal/datacoord/compaction.go | 11 +- .../compaction/clustering_compactor.go | 297 ++++++++++++------ internal/datanode/io/binlog_io.go | 7 + pkg/util/paramtable/component_param.go | 8 +- 4 files changed, 224 insertions(+), 99 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 4044e90bd2..fe730caf52 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -571,11 +571,12 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com } case datapb.CompactionType_ClusteringCompaction: task = &clusteringCompactionTask{ - CompactionTask: t, - meta: c.meta, - sessions: c.sessions, - handler: c.handler, - analyzeScheduler: c.analyzeScheduler, + CompactionTask: t, + meta: c.meta, + sessions: c.sessions, + handler: c.handler, + analyzeScheduler: c.analyzeScheduler, + lastUpdateStateTime: time.Now().UnixMilli(), } default: return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type") diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 70fdaa5ee3..2a1d7ae7fa 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -74,10 +74,12 @@ type clusteringCompactionTask struct { // flush flushMutex sync.Mutex flushCount *atomic.Int64 - flushChan chan SpillSignal + flushChan chan FlushSignal + doneChan chan struct{} - // metrics + // metrics, don't use writtenRowNum *atomic.Int64 + hasSignal *atomic.Bool // inner field collectionID int64 @@ -100,8 +102,10 @@ type clusteringCompactionTask struct { type ClusterBuffer struct { id int - writer *SegmentWriter - bufferRowNum atomic.Int64 + writer *SegmentWriter + flushLock lock.RWMutex + + bufferMemorySize atomic.Int64 flushedRowNum atomic.Int64 flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog @@ -112,8 +116,11 @@ type ClusterBuffer struct { clusteringKeyFieldStats *storage.FieldStats } -type SpillSignal struct { - buffer *ClusterBuffer +type FlushSignal struct { + writer *SegmentWriter + pack bool + id int + done bool } func NewClusteringCompactionTask( @@ -131,11 +138,13 @@ func NewClusteringCompactionTask( plan: plan, tr: timerecord.NewTimeRecorder("clustering_compaction"), done: make(chan struct{}, 1), - flushChan: make(chan SpillSignal, 100), + flushChan: make(chan FlushSignal, 100), + doneChan: make(chan struct{}), clusterBuffers: make([]*ClusterBuffer, 0), clusterBufferLocks: lock.NewKeyLock[int](), flushCount: atomic.NewInt64(0), writtenRowNum: atomic.NewInt64(0), + hasSignal: atomic.NewBool(false), } } @@ -262,6 +271,8 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro } func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) error { + ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getScalarAnalyzeResult-%d", t.GetPlanID())) + defer span.End() analyzeDict, err := t.scalarAnalyze(ctx) if err != nil { return err @@ -297,6 +308,8 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e } func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error { + ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getVectorAnalyzeResult-%d", t.GetPlanID())) + defer span.End() analyzeResultPath := t.plan.AnalyzeResultPath centroidFilePath := path.Join(analyzeResultPath, metautil.JoinIDPath(t.collectionID, t.partitionID, t.clusteringKeyField.FieldID), common.Centroids) offsetMappingFiles := make(map[int64]string, 0) @@ -345,6 +358,8 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e func (t *clusteringCompactionTask) mapping(ctx context.Context, deltaPk2Ts map[interface{}]typeutil.Timestamp, ) ([]*datapb.CompactionSegment, *storage.PartitionStatsSnapshot, error) { + ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mapping-%d", t.GetPlanID())) + defer span.End() inputSegments := t.plan.GetSegmentBinlogs() mapStart := time.Now() @@ -368,6 +383,13 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, return nil, nil, err } + t.flushChan <- FlushSignal{ + done: true, + } + + // block util all writer flushed. + <-t.doneChan + // force flush all buffers err := t.flushAll(ctx) if err != nil { @@ -408,7 +430,15 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, return resultSegments, resultPartitionStats, nil } -func (t *clusteringCompactionTask) getUsedMemoryBufferSize() int64 { +func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 { + var totalBufferSize int64 = 0 + for _, buffer := range t.clusterBuffers { + totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) + buffer.bufferMemorySize.Load() + } + return totalBufferSize +} + +func (t *clusteringCompactionTask) getCurrentBufferWrittenMemorySize() int64 { var totalBufferSize int64 = 0 for _, buffer := range t.clusterBuffers { totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) @@ -422,7 +452,7 @@ func (t *clusteringCompactionTask) mappingSegment( segment *datapb.CompactionSegmentBinlogs, delta map[interface{}]typeutil.Timestamp, ) error { - ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("Compact-Map-%d", t.GetPlanID())) + ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mappingSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID())) defer span.End() log := log.With(zap.Int64("planID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollection()), @@ -543,20 +573,39 @@ func (t *clusteringCompactionTask) mappingSegment( remained++ if (remained+1)%100 == 0 { - currentBufferSize := t.getUsedMemoryBufferSize() + currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize() + currentBufferWrittenMemorySize := t.getCurrentBufferWrittenMemorySize() + log.Debug("current buffer size", zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize), + zap.Int64("currentBufferWrittenMemorySize", currentBufferWrittenMemorySize)) + // trigger flushBinlog - if clusterBuffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() { + currentBufferNum := clusterBuffer.writer.GetRowNum() + if clusterBuffer.flushedRowNum.Load()+currentBufferNum > t.plan.GetMaxSegmentRows() || + clusterBuffer.writer.IsFull() { // reach segment/binlog max size - t.flushChan <- SpillSignal{ - buffer: clusterBuffer, + t.clusterBufferLocks.Lock(clusterBuffer.id) + writer := clusterBuffer.writer + pack, _ := t.refreshBufferWriter(clusterBuffer) + log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id), + zap.Bool("pack", pack), zap.Int64("buffer num", currentBufferNum)) + t.clusterBufferLocks.Unlock(clusterBuffer.id) + + t.flushChan <- FlushSignal{ + writer: writer, + pack: pack, + id: clusterBuffer.id, } - } else if currentBufferSize >= t.getMemoryBufferHighWatermark() { + } else if currentBufferTotalMemorySize > t.getMemoryBufferBlockFlushThreshold() && !t.hasSignal.Load() { // reach flushBinlog trigger threshold - t.flushChan <- SpillSignal{} + log.Debug("largest buffer need to flush", zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize)) + t.flushChan <- FlushSignal{} + t.hasSignal.Store(true) } // if the total buffer size is too large, block here, wait for memory release by flushBinlog - if currentBufferSize > t.getMemoryBufferBlockSpillThreshold() { + if currentBufferTotalMemorySize > t.getMemoryBufferBlockFlushThreshold() { + log.Debug("memory is already above the block watermark, pause writing", + zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize)) loop: for { select { @@ -567,8 +616,11 @@ func (t *clusteringCompactionTask) mappingSegment( log.Warn("stop waiting for memory buffer release as task chan done") return nil default: - currentSize := t.getUsedMemoryBufferSize() - if currentSize < t.getMemoryBufferLowWatermark() { + //currentSize := t.getCurrentBufferWrittenMemorySize() + currentSize := t.getBufferTotalUsedMemorySize() + if currentSize < t.getMemoryBufferBlockFlushThreshold() { + log.Debug("memory is already below the block watermark, continue writing", + zap.Int64("currentSize", currentSize)) break loop } time.Sleep(time.Millisecond * 200) @@ -593,17 +645,14 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf defer t.clusterBufferLocks.Unlock(clusterBuffer.id) // prepare if clusterBuffer.writer == nil { - err := t.refreshBufferWriter(clusterBuffer) - if err != nil { - return err - } + log.Warn("unexpected behavior, please check", zap.Int("buffer id", clusterBuffer.id)) + return fmt.Errorf("unexpected behavior, please check buffer id: %d", clusterBuffer.id) } err := clusterBuffer.writer.Write(value) if err != nil { return err } t.writtenRowNum.Inc() - clusterBuffer.bufferRowNum.Add(1) return nil } @@ -624,7 +673,7 @@ func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 { return int64(float64(t.memoryBufferSize) * 0.9) } -func (t *clusteringCompactionTask) getMemoryBufferBlockSpillThreshold() int64 { +func (t *clusteringCompactionTask) getMemoryBufferBlockFlushThreshold() int64 { return t.memoryBufferSize } @@ -639,14 +688,20 @@ func (t *clusteringCompactionTask) backgroundFlush(ctx context.Context) { return case signal := <-t.flushChan: var err error - if signal.buffer == nil { + if signal.done { + t.doneChan <- struct{}{} + } else if signal.writer == nil { err = t.flushLargestBuffers(ctx) + t.hasSignal.Store(false) } else { - err = func() error { - t.clusterBufferLocks.Lock(signal.buffer.id) - defer t.clusterBufferLocks.Unlock(signal.buffer.id) - return t.flushBinlog(ctx, signal.buffer) - }() + future := t.flushPool.Submit(func() (any, error) { + err := t.flushBinlog(ctx, t.clusterBuffers[signal.id], signal.writer, signal.pack) + if err != nil { + return nil, err + } + return struct{}{}, nil + }) + err = conc.AwaitAll(future) } if err != nil { log.Warn("fail to flushBinlog data", zap.Error(err)) @@ -663,28 +718,56 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro return nil } defer t.flushMutex.Unlock() + currentMemorySize := t.getBufferTotalUsedMemorySize() + if currentMemorySize <= t.getMemoryBufferLowWatermark() { + log.Info("memory low water mark", zap.Int64("memoryBufferSize", t.getBufferTotalUsedMemorySize())) + return nil + } + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "flushLargestBuffers") + defer span.End() bufferIDs := make([]int, 0) for _, buffer := range t.clusterBuffers { bufferIDs = append(bufferIDs, buffer.id) } sort.Slice(bufferIDs, func(i, j int) bool { - return t.clusterBuffers[i].bufferRowNum.Load() > t.clusterBuffers[j].bufferRowNum.Load() + return t.clusterBuffers[i].writer.GetRowNum() > + t.clusterBuffers[j].writer.GetRowNum() }) - log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs)) + log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs), zap.Int64("currentMemorySize", currentMemorySize)) + + futures := make([]*conc.Future[any], 0) for _, bufferId := range bufferIDs { - err := func() error { - t.clusterBufferLocks.Lock(bufferId) - defer t.clusterBufferLocks.Unlock(bufferId) - return t.flushBinlog(ctx, t.clusterBuffers[bufferId]) - }() - if err != nil { - return err - } - if t.getUsedMemoryBufferSize() <= t.getMemoryBufferLowWatermark() { - log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getUsedMemoryBufferSize())) + t.clusterBufferLocks.Lock(bufferId) + buffer := t.clusterBuffers[bufferId] + writer := buffer.writer + currentMemorySize -= int64(writer.WrittenMemorySize()) + pack, _ := t.refreshBufferWriter(buffer) + t.clusterBufferLocks.Unlock(bufferId) + + log.Info("currentMemorySize after flush buffer binlog", + zap.Int64("currentMemorySize", currentMemorySize), + zap.Int("bufferID", bufferId), + zap.Uint64("WrittenMemorySize()", writer.WrittenMemorySize()), + zap.Int64("RowNum", writer.GetRowNum())) + future := t.flushPool.Submit(func() (any, error) { + err := t.flushBinlog(ctx, buffer, writer, pack) + if err != nil { + return nil, err + } + return struct{}{}, nil + }) + futures = append(futures, future) + + if currentMemorySize <= t.getMemoryBufferLowWatermark() { + log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getBufferTotalUsedMemorySize())) break } } + if err := conc.AwaitAll(futures...); err != nil { + return err + } + + log.Info("flushLargestBuffers end", zap.Int64("currentMemorySize", currentMemorySize)) return nil } @@ -692,26 +775,26 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error { // only one flushLargestBuffers or flushAll should do at the same time t.flushMutex.Lock() defer t.flushMutex.Unlock() + futures := make([]*conc.Future[any], 0) for _, buffer := range t.clusterBuffers { - err := func() error { - t.clusterBufferLocks.Lock(buffer.id) - defer t.clusterBufferLocks.Unlock(buffer.id) - err := t.flushBinlog(ctx, buffer) + buffer := buffer + future := t.flushPool.Submit(func() (any, error) { + err := t.flushBinlog(ctx, buffer, buffer.writer, true) if err != nil { - log.Error("flushBinlog fail") - return err + return nil, err } - err = t.packBufferToSegment(ctx, buffer) - return err - }() - if err != nil { - return err - } + return struct{}{}, nil + }) + futures = append(futures, future) } + if err := conc.AwaitAll(futures...); err != nil { + return err + } + return nil } -func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer) error { +func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter) error { if len(buffer.flushedBinlogs) == 0 { return nil } @@ -719,7 +802,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff for _, fieldBinlog := range buffer.flushedBinlogs { insertLogs = append(insertLogs, fieldBinlog) } - statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum.Load()) + statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, writer, buffer.flushedRowNum.Load()) if err != nil { return err } @@ -727,7 +810,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff // pack current flushBinlog data into a segment seg := &datapb.CompactionSegment{ PlanID: t.plan.GetPlanID(), - SegmentID: buffer.writer.GetSegmentID(), + SegmentID: writer.GetSegmentID(), NumOfRows: buffer.flushedRowNum.Load(), InsertLogs: insertLogs, Field2StatslogPaths: []*datapb.FieldBinlog{statPaths}, @@ -738,37 +821,55 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()}, NumRows: int(buffer.flushedRowNum.Load()), } - buffer.uploadedSegmentStats[buffer.writer.GetSegmentID()] = segmentStats - // refresh - t.refreshBufferWriter(buffer) - buffer.flushedRowNum.Store(0) + buffer.uploadedSegmentStats[writer.GetSegmentID()] = segmentStats + buffer.flushedBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0) for _, binlog := range seg.InsertLogs { - log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("binlog", binlog.String())) + log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.String("binlog", binlog.String())) } - log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.Any("segStats", segmentStats)) + log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), + zap.Int64("segID", seg.GetSegmentID()), + zap.Int64("row num", seg.GetNumOfRows())) + + // set old writer nil + writer = nil return nil } -func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer) error { - log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()), zap.Int64("segmentID", buffer.writer.GetSegmentID())) - if buffer.writer.IsEmpty() { +func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter, pack bool) error { + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", writer.GetSegmentID())) + defer span.End() + if writer == nil { + log.Warn("buffer writer is nil, please check", zap.Int("buffer id", buffer.id)) + return fmt.Errorf("buffer: %d writer is nil, please check", buffer.id) + } + buffer.flushLock.Lock() + defer buffer.flushLock.Unlock() + writtenMemorySize := int64(writer.WrittenMemorySize()) + writtenRowNum := writer.GetRowNum() + log := log.With(zap.Int("bufferID", buffer.id), + zap.Int64("segmentID", writer.GetSegmentID()), + zap.Bool("pack", pack), + zap.Int64("writerRowNum", writtenRowNum), + zap.Int64("writtenMemorySize", writtenMemorySize), + zap.Int64("bufferMemorySize", buffer.bufferMemorySize.Load()), + ) + + log.Info("start flush binlog") + if writtenRowNum <= 0 { + log.Debug("writerRowNum is zero, skip flush") + if pack { + return t.packBufferToSegment(ctx, buffer, writer) + } return nil } - future := t.flushPool.Submit(func() (any, error) { - kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer) - if err != nil { - log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) - return typeutil.NewPair(kvs, partialBinlogs), err - } - return typeutil.NewPair(kvs, partialBinlogs), nil - }) - if err := conc.AwaitAll(future); err != nil { + start := time.Now() + kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, writer) + if err != nil { + log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) return err } - kvs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).A - partialBinlogs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).B if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) @@ -784,18 +885,19 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus } buffer.flushedBinlogs[fID] = tmpBinlog } - buffer.flushedRowNum.Add(buffer.bufferRowNum.Load()) + buffer.flushedRowNum.Add(writtenRowNum) - // clean buffer - buffer.bufferRowNum.Store(0) + // clean buffer with writer + buffer.bufferMemorySize.Sub(writtenMemorySize) t.flushCount.Inc() - log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load())) - if buffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() { - if err := t.packBufferToSegment(ctx, buffer); err != nil { + if pack { + if err := t.packBufferToSegment(ctx, buffer, writer); err != nil { return err } } + log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()), + zap.Int64("cost", time.Since(start).Milliseconds())) return nil } @@ -825,6 +927,8 @@ func (t *clusteringCompactionTask) cleanUp(ctx context.Context) { } func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[interface{}]int64, error) { + ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyze-%d", t.GetPlanID())) + defer span.End() inputSegments := t.plan.GetSegmentBinlogs() futures := make([]*conc.Future[any], 0, len(inputSegments)) analyzeStart := time.Now() @@ -871,6 +975,8 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( ctx context.Context, segment *datapb.CompactionSegmentBinlogs, ) (map[interface{}]int64, error) { + ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyzeSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID())) + defer span.End() log := log.With(zap.Int64("planID", t.GetPlanID()), zap.Int64("segmentID", segment.GetSegmentID())) // vars @@ -1015,16 +1121,27 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in return buckets } -func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) error { - segmentID, err := t.allocator.AllocOne() - if err != nil { - return err +func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (bool, error) { + var segmentID int64 + var err error + var pack bool + if buffer.writer != nil { + segmentID = buffer.writer.GetSegmentID() + buffer.bufferMemorySize.Add(int64(buffer.writer.WrittenMemorySize())) } + if buffer.writer == nil || buffer.flushedRowNum.Load()+buffer.writer.GetRowNum() > t.plan.GetMaxSegmentRows() { + pack = true + segmentID, err = t.allocator.AllocOne() + if err != nil { + return pack, err + } + } + writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID) if err != nil { - return err + return pack, err } + buffer.writer = writer - buffer.bufferRowNum.Store(0) - return nil + return pack, nil } diff --git a/internal/datanode/io/binlog_io.go b/internal/datanode/io/binlog_io.go index 61a2ccf976..55274e8327 100644 --- a/internal/datanode/io/binlog_io.go +++ b/internal/datanode/io/binlog_io.go @@ -18,6 +18,7 @@ package io import ( "context" + "time" "github.com/samber/lo" "go.opentelemetry.io/otel" @@ -55,6 +56,7 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, var val []byte var err error + start := time.Now() log.Debug("BinlogIO download", zap.String("path", path)) err = retry.Do(ctx, func() error { val, err = b.Read(ctx, path) @@ -64,6 +66,9 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, return err }) + log.Debug("BinlogIO download success", zap.String("path", path), zap.Int64("cost", time.Since(start).Milliseconds()), + zap.Error(err)) + return val, err }) futures = append(futures, future) @@ -88,6 +93,7 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error innerK, innerV := k, v future := b.pool.Submit(func() (any, error) { var err error + start := time.Now() log.Debug("BinlogIO upload", zap.String("paths", innerK)) err = retry.Do(ctx, func() error { err = b.Write(ctx, innerK, innerV) @@ -96,6 +102,7 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error } return err }) + log.Debug("BinlogIO upload success", zap.String("paths", innerK), zap.Int64("cost", time.Since(start).Milliseconds()), zap.Error(err)) return struct{}{}, err }) futures = append(futures, future) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 2188564d2d..8af5fb9357 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -4063,8 +4063,8 @@ if this parameter <= 0, will set it as 10`, p.SlotCap.Init(base.mgr) p.ClusteringCompactionMemoryBufferRatio = ParamItem{ - Key: "datanode.clusteringCompaction.memoryBufferRatio", - Version: "2.4.2", + Key: "dataNode.clusteringCompaction.memoryBufferRatio", + Version: "2.4.6", Doc: "The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.", DefaultValue: "0.1", PanicIfEmpty: false, @@ -4073,8 +4073,8 @@ if this parameter <= 0, will set it as 10`, p.ClusteringCompactionMemoryBufferRatio.Init(base.mgr) p.ClusteringCompactionWorkerPoolSize = ParamItem{ - Key: "datanode.clusteringCompaction.cpu", - Version: "2.4.2", + Key: "dataNode.clusteringCompaction.workPoolSize", + Version: "2.4.6", Doc: "worker pool size for one clustering compaction job.", DefaultValue: "1", PanicIfEmpty: false,