milvus/pkg/metrics/streaming_service_metrics.go
Zhen Ye 2ec6e602d6
enhance: add streaming client metrics (#36523)
issue: #33285

---------

Signed-off-by: chyezh <chyezh@outlook.com>
2024-10-08 21:25:19 +08:00

414 lines
15 KiB
Go

package metrics
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
subsystemStreamingServiceClient = "streaming"
subsystemWAL = "wal"
StreamingServiceClientStatusAvailable = "available"
StreamingServiceClientStatusUnavailable = "unavailable"
StreamingServiceClientStatusOK = "ok"
StreamingServiceClientStatusCancel = "cancel"
StreamignServiceClientStatusError = "error"
TimeTickSyncTypeLabelName = "type"
TimeTickAckTypeLabelName = "type"
WALTxnStateLabelName = "state"
WALChannelLabelName = channelNameLabelName
WALSegmentSealPolicyNameLabelName = "policy"
WALSegmentAllocStateLabelName = "state"
WALMessageTypeLabelName = "message_type"
WALChannelTermLabelName = "term"
WALNameLabelName = "wal_name"
WALTxnTypeLabelName = "txn_type"
StatusLabelName = statusLabelName
StreamingNodeLabelName = "streaming_node"
NodeIDLabelName = nodeIDLabelName
)
var (
StreamingServiceClientRegisterOnce sync.Once
// from 64 bytes to 8MB
messageBytesBuckets = prometheus.ExponentialBucketsRange(64, 8388608, 10)
// from 1ms to 5s
secondsBuckets = prometheus.ExponentialBucketsRange(0.001, 5, 10)
// Streaming Service Client Producer Metrics.
StreamingServiceClientProducerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "producer_total",
Help: "Total of producers",
}, WALChannelLabelName, StatusLabelName)
StreamingServiceClientProduceInflightTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "produce_inflight_total",
Help: "Total of inflight produce request",
}, WALChannelLabelName)
StreamingServiceClientProduceBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
Name: "produce_bytes",
Help: "Bytes of produced message",
Buckets: messageBytesBuckets,
}, WALChannelLabelName, StatusLabelName)
StreamingServiceClientProduceDurationSeconds = newStreamingServiceClientHistogramVec(
prometheus.HistogramOpts{
Name: "produce_duration_seconds",
Help: "Duration of client produce",
Buckets: secondsBuckets,
}, WALChannelLabelName, StatusLabelName)
// Streaming Service Client Consumer Metrics.
StreamingServiceClientConsumerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "consumer_total",
Help: "Total of consumers",
}, WALChannelLabelName, StatusLabelName)
StreamingServiceClientConsumeBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
Name: "consume_bytes",
Help: "Bytes of consumed message",
Buckets: messageBytesBuckets,
}, WALChannelLabelName)
StreamingServiceClientConsumeInflightTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "consume_inflight_total",
Help: "Total of inflight consume body",
}, WALChannelLabelName)
// StreamingCoord metrics
StreamingCoordPChannelInfo = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "pchannel_info",
Help: "Term of pchannels",
}, WALChannelLabelName, WALChannelTermLabelName, StreamingNodeLabelName)
StreamingCoordAssignmentVersion = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "assignment_info",
Help: "Info of assignment",
})
StreamingCoordAssignmentListenerTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "assignment_listener_total",
Help: "Total of assignment listener",
})
// StreamingNode Producer Server Metrics.
StreamingNodeProducerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "producer_total",
Help: "Total of producers on current streaming node",
}, WALChannelLabelName)
StreamingNodeProduceInflightTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "produce_inflight_total",
Help: "Total of inflight produce request",
}, WALChannelLabelName)
// StreamingNode Consumer Server Metrics.
StreamingNodeConsumerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "consumer_total",
Help: "Total of consumers on current streaming node",
}, WALChannelLabelName)
StreamingNodeConsumeInflightTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "consume_inflight_total",
Help: "Total of inflight consume body",
}, WALChannelLabelName)
StreamingNodeConsumeBytes = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "consume_bytes",
Help: "Bytes of consumed message",
Buckets: messageBytesBuckets,
}, WALChannelLabelName)
// WAL WAL metrics
WALInfo = newWALGaugeVec(prometheus.GaugeOpts{
Name: "info",
Help: "current info of wal on current streaming node",
}, WALChannelLabelName, WALChannelTermLabelName, WALNameLabelName)
// TimeTick related metrics
WALLastAllocatedTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
Name: "last_allocated_time_tick",
Help: "Current max allocated time tick of wal",
}, WALChannelLabelName)
WALAllocateTimeTickTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "allocate_time_tick_total",
Help: "Total of allocated time tick on wal",
}, WALChannelLabelName)
WALLastConfirmedTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
Name: "last_confirmed_time_tick",
Help: "Current max confirmed time tick of wal",
}, WALChannelLabelName)
WALAcknowledgeTimeTickTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "acknowledge_time_tick_total",
Help: "Total of acknowledge time tick on wal",
}, WALChannelLabelName, TimeTickAckTypeLabelName)
WALSyncTimeTickTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "sync_time_tick_total",
Help: "Total of sync time tick on wal",
}, WALChannelLabelName, TimeTickAckTypeLabelName)
WALTimeTickSyncTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "sync_total",
Help: "Total of time tick sync sent",
}, WALChannelLabelName, TimeTickSyncTypeLabelName)
WALTimeTickSyncTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
Name: "sync_time_tick",
Help: "Max time tick of time tick sync sent",
}, WALChannelLabelName, TimeTickSyncTypeLabelName)
// Txn Related Metrics
WALInflightTxn = newWALGaugeVec(prometheus.GaugeOpts{
Name: "inflight_txn",
Help: "Total of inflight txn on wal",
}, WALChannelLabelName)
WALFinishTxn = newWALCounterVec(prometheus.CounterOpts{
Name: "finish_txn",
Help: "Total of finish txn on wal",
}, WALChannelLabelName, WALTxnStateLabelName)
// Segment related metrics
WALSegmentAllocTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "segment_assign_segment_alloc_total",
Help: "Total of segment alloc on wal",
}, WALChannelLabelName, WALSegmentAllocStateLabelName)
WALSegmentFlushedTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "segment_assign_flushed_segment_total",
Help: "Total of segment sealed on wal",
}, WALChannelLabelName, WALSegmentSealPolicyNameLabelName)
WALSegmentBytes = newWALHistogramVec(prometheus.HistogramOpts{
Name: "segment_assign_segment_bytes",
Help: "Bytes of segment alloc on wal",
Buckets: prometheus.ExponentialBucketsRange(5242880, 1073741824, 10), // 5MB -> 1024MB
}, WALChannelLabelName)
WALPartitionTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "segment_assign_partition_total",
Help: "Total of partition on wal",
}, WALChannelLabelName)
WALCollectionTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "segment_assign_collection_total",
Help: "Total of collection on wal",
}, WALChannelLabelName)
// Append Related Metrics
WALAppendMessageBytes = newWALHistogramVec(prometheus.HistogramOpts{
Name: "append_message_bytes",
Help: "Bytes of append message to wal",
Buckets: messageBytesBuckets,
}, WALChannelLabelName, StatusLabelName)
WALAppendMessageTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "append_message_total",
Help: "Total of append message to wal",
}, WALChannelLabelName, WALMessageTypeLabelName, StatusLabelName)
WALAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "append_message_duration_seconds",
Help: "Duration of wal append message",
Buckets: secondsBuckets,
}, WALChannelLabelName, StatusLabelName)
WALImplsAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "impls_append_message_duration_seconds",
Help: "Duration of wal impls append message",
Buckets: secondsBuckets,
}, WALChannelLabelName, StatusLabelName)
// Scanner Related Metrics
WALScannerTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "scanner_total",
Help: "Total of wal scanner on current streaming node",
}, WALChannelLabelName)
WALScanMessageBytes = newWALHistogramVec(prometheus.HistogramOpts{
Name: "scan_message_bytes",
Help: "Bytes of scanned message from wal",
Buckets: messageBytesBuckets,
}, WALChannelLabelName)
WALScanMessageTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "scan_message_total",
Help: "Total of scanned message from wal",
}, WALChannelLabelName, WALMessageTypeLabelName)
WALScanPassMessageBytes = newWALHistogramVec(prometheus.HistogramOpts{
Name: "scan_pass_message_bytes",
Help: "Bytes of pass (not filtered) scanned message from wal",
Buckets: messageBytesBuckets,
}, WALChannelLabelName)
WALScanPassMessageTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "scan_pass_message_total",
Help: "Total of pass (not filtered) scanned message from wal",
}, WALChannelLabelName, WALMessageTypeLabelName)
WALScanTimeTickViolationMessageTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "scan_time_tick_violation_message_total",
Help: "Total of time tick violation message (dropped) from wal",
}, WALChannelLabelName, WALMessageTypeLabelName)
WALScanTxnTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "scan_txn_total",
Help: "Total of scanned txn from wal",
}, WALChannelLabelName, WALTxnStateLabelName)
WALScannerPendingQueueBytes = newWALGaugeVec(prometheus.GaugeOpts{
Name: "scanner_pending_queue_bytes",
Help: "Size of pending queue in wal scanner",
}, WALChannelLabelName)
WALScannerTimeTickBufBytes = newWALGaugeVec(prometheus.GaugeOpts{
Name: "scanner_time_tick_buf_bytes",
Help: "Size of time tick buffer in wal scanner",
}, WALChannelLabelName)
WALScannerTxnBufBytes = newWALGaugeVec(prometheus.GaugeOpts{
Name: "scanner_txn_buf_bytes",
Help: "Size of txn buffer in wal scanner",
}, WALChannelLabelName)
)
// RegisterStreamingServiceClient registers streaming service client metrics
func RegisterStreamingServiceClient(registry *prometheus.Registry) {
StreamingServiceClientRegisterOnce.Do(func() {
registry.MustRegister(StreamingServiceClientProducerTotal)
registry.MustRegister(StreamingServiceClientProduceInflightTotal)
registry.MustRegister(StreamingServiceClientProduceBytes)
registry.MustRegister(StreamingServiceClientProduceDurationSeconds)
registry.MustRegister(StreamingServiceClientConsumerTotal)
registry.MustRegister(StreamingServiceClientConsumeBytes)
registry.MustRegister(StreamingServiceClientConsumeInflightTotal)
})
}
// registerStreamingCoord registers streaming coord metrics
func registerStreamingCoord(registry *prometheus.Registry) {
registry.MustRegister(StreamingCoordPChannelInfo)
registry.MustRegister(StreamingCoordAssignmentVersion)
registry.MustRegister(StreamingCoordAssignmentListenerTotal)
}
// RegisterStreamingNode registers streaming node metrics
func RegisterStreamingNode(registry *prometheus.Registry) {
registry.MustRegister(StreamingNodeProducerTotal)
registry.MustRegister(StreamingNodeProduceInflightTotal)
registry.MustRegister(StreamingNodeConsumerTotal)
registry.MustRegister(StreamingNodeConsumeInflightTotal)
registry.MustRegister(StreamingNodeConsumeBytes)
registerWAL(registry)
}
// registerWAL registers wal metrics
func registerWAL(registry *prometheus.Registry) {
registry.MustRegister(WALInfo)
registry.MustRegister(WALLastAllocatedTimeTick)
registry.MustRegister(WALAllocateTimeTickTotal)
registry.MustRegister(WALLastConfirmedTimeTick)
registry.MustRegister(WALAcknowledgeTimeTickTotal)
registry.MustRegister(WALSyncTimeTickTotal)
registry.MustRegister(WALTimeTickSyncTotal)
registry.MustRegister(WALTimeTickSyncTimeTick)
registry.MustRegister(WALInflightTxn)
registry.MustRegister(WALFinishTxn)
registry.MustRegister(WALSegmentAllocTotal)
registry.MustRegister(WALSegmentFlushedTotal)
registry.MustRegister(WALSegmentBytes)
registry.MustRegister(WALPartitionTotal)
registry.MustRegister(WALCollectionTotal)
registry.MustRegister(WALAppendMessageBytes)
registry.MustRegister(WALAppendMessageTotal)
registry.MustRegister(WALAppendMessageDurationSeconds)
registry.MustRegister(WALImplsAppendMessageDurationSeconds)
registry.MustRegister(WALScannerTotal)
registry.MustRegister(WALScanMessageBytes)
registry.MustRegister(WALScanMessageTotal)
registry.MustRegister(WALScanPassMessageBytes)
registry.MustRegister(WALScanPassMessageTotal)
registry.MustRegister(WALScanTimeTickViolationMessageTotal)
registry.MustRegister(WALScanTxnTotal)
registry.MustRegister(WALScannerPendingQueueBytes)
registry.MustRegister(WALScannerTimeTickBufBytes)
registry.MustRegister(WALScannerTxnBufBytes)
}
func newStreamingCoordGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
opts.Namespace = milvusNamespace
opts.Subsystem = typeutil.StreamingCoordRole
labels := mergeLabel(extra...)
return prometheus.NewGaugeVec(opts, labels)
}
func newStreamingServiceClientGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemStreamingServiceClient
labels := mergeLabel(extra...)
return prometheus.NewGaugeVec(opts, labels)
}
func newStreamingServiceClientHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemStreamingServiceClient
labels := mergeLabel(extra...)
return prometheus.NewHistogramVec(opts, labels)
}
func newStreamingNodeGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
opts.Namespace = milvusNamespace
opts.Subsystem = typeutil.StreamingNodeRole
labels := mergeLabel(extra...)
return prometheus.NewGaugeVec(opts, labels)
}
func newStreamingNodeHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
opts.Namespace = milvusNamespace
opts.Subsystem = typeutil.StreamingNodeRole
labels := mergeLabel(extra...)
return prometheus.NewHistogramVec(opts, labels)
}
func newWALGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemWAL
labels := mergeLabel(extra...)
return prometheus.NewGaugeVec(opts, labels)
}
func newWALCounterVec(opts prometheus.CounterOpts, extra ...string) *prometheus.CounterVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemWAL
labels := mergeLabel(extra...)
return prometheus.NewCounterVec(opts, labels)
}
func newWALHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemWAL
labels := mergeLabel(extra...)
return prometheus.NewHistogramVec(opts, labels)
}
func mergeLabel(extra ...string) []string {
labels := make([]string, 0, 1+len(extra))
labels = append(labels, NodeIDLabelName)
labels = append(labels, extra...)
return labels
}