From 80f25d497f57f6a5d461216fd168afa137fa7e6c Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sat, 28 Sep 2024 17:31:15 +0800 Subject: [PATCH] enhance: Add metrics to monitor import throughput and imported rows (#36519) issue: https://github.com/milvus-io/milvus/issues/36518 Signed-off-by: bigsheeper --- internal/datanode/importv2/util.go | 8 ++++++++ internal/flushcommon/syncmgr/options.go | 5 +++++ internal/flushcommon/syncmgr/serializer.go | 6 ++++++ internal/flushcommon/syncmgr/storage_serializer.go | 1 + internal/flushcommon/syncmgr/task.go | 4 +++- internal/flushcommon/writebuffer/write_buffer.go | 1 + pkg/metrics/datanode_metrics.go | 14 +++++++++++++- pkg/metrics/metrics.go | 4 ++++ 8 files changed, 41 insertions(+), 2 deletions(-) diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index 7dd3c3eda1..c725bb1b1a 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -77,6 +78,11 @@ func NewSyncTask(ctx context.Context, return nil, err } + segmentLevel := datapb.SegmentLevel_L1 + if insertData == nil && deleteData != nil { + segmentLevel = datapb.SegmentLevel_L0 + } + syncPack := &syncmgr.SyncPack{} syncPack.WithInsertData([]*storage.InsertData{insertData}). WithDeleteData(deleteData). @@ -85,6 +91,8 @@ func NewSyncTask(ctx context.Context, WithChannelName(vchannel). WithSegmentID(segmentID). WithTimeRange(ts, ts). + WithLevel(segmentLevel). + WithDataSource(metrics.BulkinsertDataSourceLabel). WithBatchSize(int64(insertData.GetRowNum())) return serializer.EncodeBuffer(ctx, syncPack) diff --git a/internal/flushcommon/syncmgr/options.go b/internal/flushcommon/syncmgr/options.go index ec1eff91c9..3696d9cda2 100644 --- a/internal/flushcommon/syncmgr/options.go +++ b/internal/flushcommon/syncmgr/options.go @@ -117,3 +117,8 @@ func (t *SyncTask) WithLevel(level datapb.SegmentLevel) *SyncTask { t.level = level return t } + +func (t *SyncTask) WithDataSource(source string) *SyncTask { + t.dataSource = source + return t +} diff --git a/internal/flushcommon/syncmgr/serializer.go b/internal/flushcommon/syncmgr/serializer.go index 7d8d64ad5f..90b621e21b 100644 --- a/internal/flushcommon/syncmgr/serializer.go +++ b/internal/flushcommon/syncmgr/serializer.go @@ -49,6 +49,7 @@ type SyncPack struct { startPosition *msgpb.MsgPosition checkpoint *msgpb.MsgPosition batchSize int64 // batchSize is the row number of this sync task,not the total num of rows of segemnt + dataSource string isFlush bool isDrop bool // metadata @@ -137,3 +138,8 @@ func (p *SyncPack) WithErrorHandler(handler func(err error)) *SyncPack { p.errHandler = handler return p } + +func (p *SyncPack) WithDataSource(source string) *SyncPack { + p.dataSource = source + return p +} diff --git a/internal/flushcommon/syncmgr/storage_serializer.go b/internal/flushcommon/syncmgr/storage_serializer.go index e22cdc2064..7bdb9be8d5 100644 --- a/internal/flushcommon/syncmgr/storage_serializer.go +++ b/internal/flushcommon/syncmgr/storage_serializer.go @@ -174,6 +174,7 @@ func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) { WithStartPosition(pack.startPosition). WithCheckpoint(pack.checkpoint). WithLevel(pack.level). + WithDataSource(pack.dataSource). WithTimeRange(pack.tsFrom, pack.tsTo). WithMetaCache(s.metacache). WithMetaWriter(s.metaWriter). diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index 9e910b9cbc..94c596827a 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -55,6 +55,7 @@ type SyncTask struct { pkField *schemapb.FieldSchema startPosition *msgpb.MsgPosition checkpoint *msgpb.MsgPosition + dataSource string // batchSize is the row number of this sync task, // not the total num of rows of segemnt batchSize int64 @@ -169,7 +170,8 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { totalSize += float64(len(t.deltaBlob.Value)) } - metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, t.level.String()).Add(totalSize) + metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(totalSize) + metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchSize)) metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds())) diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index 82afb38932..5276cba880 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -603,6 +603,7 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy WithStartPosition(startPos). WithTimeRange(tsFrom, tsTo). WithLevel(segmentInfo.Level()). + WithDataSource(metrics.StreamingDataSourceLabel). WithCheckpoint(wb.checkpoint). WithBatchSize(batchSize). WithErrorHandler(wb.errHandler) diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index 10807708d8..ff982a4bbf 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -54,10 +54,21 @@ var ( Help: "byte size of data flushed to storage", }, []string{ nodeIDLabelName, - msgTypeLabelName, + dataSourceLabelName, segmentLevelLabelName, }) + DataNodeFlushedRows = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataNodeRole, + Name: "flushed_data_rows", + Help: "num of rows flushed to storage", + }, []string{ + nodeIDLabelName, + dataSourceLabelName, + }) + DataNodeNumProducers = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: milvusNamespace, @@ -246,6 +257,7 @@ func RegisterDataNode(registry *prometheus.Registry) { registry.MustRegister(DataNodeFlushBufferCount) registry.MustRegister(DataNodeFlushReqCounter) registry.MustRegister(DataNodeFlushedSize) + registry.MustRegister(DataNodeFlushedRows) // compaction related registry.MustRegister(DataNodeCompactionLatency) registry.MustRegister(DataNodeCompactionLatencyInQueue) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index f274e55682..50d5c23778 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -57,6 +57,9 @@ const ( FlushingSegmentLabel = "Flushing" DroppedSegmentLabel = "Dropped" + StreamingDataSourceLabel = "streaming" + BulkinsertDataSourceLabel = "bulkinsert" + Leader = "OnLeader" FromLeader = "FromLeader" @@ -101,6 +104,7 @@ const ( cacheNameLabelName = "cache_name" cacheStateLabelName = "cache_state" indexCountLabelName = "indexed_field_count" + dataSourceLabelName = "data_source" requestScope = "scope" fullMethodLabelName = "full_method" reduceLevelName = "reduce_level"