From 12eb8c3da5578a12d474ff6039a79f9972b640c7 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Wed, 2 Mar 2022 14:49:55 +0800
Subject: [PATCH] Add prometheus metrics for QueryNode (#15649)

Signed-off-by: bigsheeper
---
 internal/metrics/metrics.go                 |   5 -
 internal/metrics/querycoord.go              |   3 -
 internal/metrics/querynode.go               | 266 ++++++++++++++++++++
 internal/msgstream/msg.go                   |  34 +++
 internal/querynode/collection.go            |   9 +
 internal/querynode/collection_replica.go    |  18 ++
 internal/querynode/data_sync_service.go     |   5 +
 internal/querynode/flow_graph_query_node.go |   8 +
 internal/querynode/historical.go            |   6 +
 internal/querynode/mock_test.go             |  12 +-
 internal/querynode/query_collection.go      |  37 ++-
 internal/querynode/segment.go               |   7 +
 internal/querynode/segment_loader.go        |   5 +
 internal/querynode/streaming.go             |   6 +
 internal/querynode/task.go                  |   2 +
 15 files changed, 409 insertions(+), 14 deletions(-)
 create mode 100644 internal/metrics/querynode.go

diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 28c84996b3..eb1b079b0e 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -586,11 +586,6 @@ func RegisterProxy() {
 	prometheus.MustRegister(ProxyDmlChannelTimeTick)
 }
 
-//RegisterQueryNode registers QueryNode metrics
-func RegisterQueryNode() {
-
-}
-
 var (
 	//DataCoordDataNodeList records the num of regsitered data nodes
 	DataCoordDataNodeList = prometheus.NewGaugeVec(
diff --git a/internal/metrics/querycoord.go b/internal/metrics/querycoord.go
index 7ac92e98ec..e60d3338b1 100644
--- a/internal/metrics/querycoord.go
+++ b/internal/metrics/querycoord.go
@@ -28,9 +28,6 @@ const (
 	QueryCoordMetricLabelSuccess = "success"
 	QueryCoordMetricLabelFail    = "fail"
 	QueryCoordMetricLabelTotal   = "total"
-
-	// TODO: move to metrics.go
-	collectionIDLabel = "collection_id"
 )
 
 // queryCoordLoadBuckets involves durations in milliseconds,
diff --git a/internal/metrics/querynode.go b/internal/metrics/querynode.go
new file mode 100644
index 0000000000..bbd7ac6afa
--- /dev/null
+++ b/internal/metrics/querynode.go
@@ -0,0 +1,266 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/milvus-io/milvus/internal/util/typeutil"
+)
+
+const (
+	// TODO: use the common status label
+	queryNodeStatusLabel        = "status"
+	QueryNodeMetricLabelSuccess = "success"
+	QueryNodeMetricLabelFail    = "fail"
+	QueryNodeMetricLabelTotal   = "total"
+
+	// TODO: use the common status label
+	nodeIDLabel       = "node_id"
+	collectionIDLabel = "collection_id"
+)
+
+const (
+	// query type
+	queryTypeLabel           = "query_type"
+	QueryNodeQueryTypeSearch = "search"
+	QueryNodeQueryTypeQuery  = "query"
+
+	// segment type
+	segmentTypeLabel        = "segment_type"
+	QueryNodeSegTypeSealed  = "sealed"
+	QueryNodeSegTypeGrowing = "growing"
+)
+
+// queryNodeDurationBuckets involves durations in milliseconds,
+// [10 20 40 80 160 320 640 1280 2560 5120 10240 20480 40960 81920 163840 327680 655360 1.31072e+06]
+var queryNodeDurationBuckets = prometheus.ExponentialBuckets(10, 2, 18)
+
+var (
+	QueryNodeNumCollections = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "num_collections",
+			Help:      "Number of collections in QueryNode.",
+		}, []string{
+			nodeIDLabel,
+		})
+
+	QueryNodeNumPartitions = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "num_partitions",
+			Help:      "Number of partitions per collection in QueryNode.",
+		}, []string{
+			collectionIDLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeNumSegments = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "num_segments",
+			Help:      "Number of segments per collection in QueryNode.",
+		}, []string{
+			collectionIDLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeNumDmlChannels = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "num_dml_channels",
+			Help:      "Number of dmlChannels per collection in QueryNode.",
+		}, []string{
+			collectionIDLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeNumDeltaChannels = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "num_delta_channels",
+			Help:      "Number of deltaChannels per collection in QueryNode.",
+		}, []string{
+			collectionIDLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeNumConsumers = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "num_consumers",
+			Help:      "Number of consumers per collection in QueryNode.",
+		}, []string{
+			collectionIDLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeNumReaders = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "num_readers",
+			Help:      "Number of readers per collection in QueryNode.",
+		}, []string{
+			collectionIDLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeSQCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "sq_count",
+			Help:      "Search and query requests statistic in QueryNode.",
+		}, []string{
+			queryNodeStatusLabel,
+			queryTypeLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeSQReqLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "sq_latency",
+			Help:      "Search and query requests latency in QueryNode.",
+			Buckets:   queryNodeDurationBuckets,
+		}, []string{
+			queryTypeLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeSQLatencyInQueue = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "sq_latency_in_queue",
+			Help:      "The search and query latency in queue(unsolved buffer) in QueryNode.",
+			Buckets:   queryNodeDurationBuckets,
+		}, []string{
+			queryTypeLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeSQSegmentLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "sq_latency_per_segment",
+			Help:      "The search and query on segments(sealed/growing segments).",
+			Buckets:   queryNodeDurationBuckets,
+		}, []string{
+			queryTypeLabel,
+			segmentTypeLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeSQSegmentLatencyInCore = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "sq_latency_in_core",
+			Help:      "The search and query latency in core.",
+			Buckets:   queryNodeDurationBuckets,
+		}, []string{
+			queryTypeLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeTranslateHitsLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "translate_hits_latency",
+			Help:      "The search and query latency in translate hits.",
+			Buckets:   queryNodeDurationBuckets,
+		}, []string{
+			nodeIDLabel,
+		})
+
+	QueryNodeReduceLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "reduce_latency",
+			Help:      "The search and query latency in reduce(local reduce) in QueryNode.",
+			Buckets:   queryNodeDurationBuckets,
+		}, []string{
+			segmentTypeLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeLoadSegmentLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "load_latency_per_segment",
+			Help:      "The load latency per segment in QueryNode.",
+			Buckets:   queryNodeDurationBuckets,
+		}, []string{
+			nodeIDLabel,
+		})
+
+	QueryNodeServiceTime = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "service_time",
+			Help:      "ServiceTimes of collections in QueryNode.",
+		}, []string{
+			collectionIDLabel,
+			nodeIDLabel,
+		})
+
+	QueryNodeNumFlowGraphs = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "num_flow_graphs",
+			Help:      "Number of flow graphs in QueryNode.",
+		}, []string{
+			nodeIDLabel,
+		})
+)
+
+//RegisterQueryNode registers QueryNode metrics
+func RegisterQueryNode() {
+	prometheus.MustRegister(QueryNodeNumCollections)
+	prometheus.MustRegister(QueryNodeNumPartitions)
+	prometheus.MustRegister(QueryNodeNumSegments)
+	prometheus.MustRegister(QueryNodeNumDmlChannels)
+	prometheus.MustRegister(QueryNodeNumDeltaChannels)
+	prometheus.MustRegister(QueryNodeNumConsumers)
+	prometheus.MustRegister(QueryNodeNumReaders)
+	prometheus.MustRegister(QueryNodeSQCount)
+	prometheus.MustRegister(QueryNodeSQReqLatency)
+	prometheus.MustRegister(QueryNodeSQLatencyInQueue)
+	prometheus.MustRegister(QueryNodeSQSegmentLatency)
+	prometheus.MustRegister(QueryNodeSQSegmentLatencyInCore)
+	prometheus.MustRegister(QueryNodeTranslateHitsLatency)
+	prometheus.MustRegister(QueryNodeReduceLatency)
+	prometheus.MustRegister(QueryNodeLoadSegmentLatency)
+	prometheus.MustRegister(QueryNodeServiceTime)
+	prometheus.MustRegister(QueryNodeNumFlowGraphs)
+}
diff --git
a/internal/msgstream/msg.go b/internal/msgstream/msg.go index ca6ac1b736..b86d29c869 100644 --- a/internal/msgstream/msg.go +++ b/internal/msgstream/msg.go @@ -19,6 +19,7 @@ package msgstream import ( "context" "errors" + "time" "github.com/golang/protobuf/proto" @@ -26,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/timerecord" ) // MsgType is an alias of commonpb.MsgType @@ -242,6 +244,7 @@ func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) { type SearchMsg struct { BaseMsg internalpb.SearchRequest + tr *timerecord.TimeRecorder } // interface implementation validation @@ -279,6 +282,21 @@ func (st *SearchMsg) TimeoutTs() Timestamp { return st.GetTimeoutTimestamp() } +// SetTimeRecorder sets the timeRecorder for RetrieveMsg +func (st *SearchMsg) SetTimeRecorder() { + st.tr = timerecord.NewTimeRecorder("searchMsg") +} + +// ElapseSpan returns the duration from the beginning +func (st *SearchMsg) ElapseSpan() time.Duration { + return st.tr.ElapseSpan() +} + +// RecordSpan returns the duration from last record +func (st *SearchMsg) RecordSpan() time.Duration { + return st.tr.RecordSpan() +} + // Marshal is used to serializing a message pack to byte array func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) { searchTask := input.(*SearchMsg) @@ -369,6 +387,7 @@ func (srt *SearchResultMsg) Unmarshal(input MarshalType) (TsMsg, error) { type RetrieveMsg struct { BaseMsg internalpb.RetrieveRequest + tr *timerecord.TimeRecorder } // interface implementation validation @@ -406,6 +425,21 @@ func (rm *RetrieveMsg) TimeoutTs() Timestamp { return rm.GetTimeoutTimestamp() } +// SetTimeRecorder sets the timeRecorder for RetrieveMsg +func (rm *RetrieveMsg) SetTimeRecorder() { + rm.tr = timerecord.NewTimeRecorder("retrieveMsg") +} + +// ElapseSpan returns the duration from the beginning +func (rm *RetrieveMsg) ElapseSpan() time.Duration { + return rm.tr.ElapseSpan() +} + +// RecordSpan returns the duration from last record +func (rm *RetrieveMsg) RecordSpan() time.Duration { + return rm.tr.RecordSpan() +} + // Marshal is used to serializing a message pack to byte array func (rm *RetrieveMsg) Marshal(input TsMsg) (MarshalType, error) { retrieveTask := input.(*RetrieveMsg) diff --git a/internal/querynode/collection.go b/internal/querynode/collection.go index c0d743542b..4ace4287a8 100644 --- a/internal/querynode/collection.go +++ b/internal/querynode/collection.go @@ -31,6 +31,7 @@ import "C" import ( "errors" "fmt" + "github.com/milvus-io/milvus/internal/metrics" "math" "sync" "unsafe" @@ -115,6 +116,8 @@ OUTER: ) c.vChannels = append(c.vChannels, dstChan) } + + metrics.QueryNodeNumDmlChannels.WithLabelValues(fmt.Sprint(c.ID()), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(c.vChannels))) } // getVChannels get virtual channels of collection @@ -141,6 +144,8 @@ func (c *Collection) removeVChannel(channel Channel) { zap.Any("collectionID", c.ID()), zap.Any("channel", channel), ) + + metrics.QueryNodeNumDmlChannels.WithLabelValues(fmt.Sprint(c.ID()), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(c.vChannels))) } // addPChannels add physical channels to physical channels of collection @@ -236,6 +241,8 @@ OUTER: ) c.vDeltaChannels = append(c.vDeltaChannels, dstChan) } + + metrics.QueryNodeNumDeltaChannels.WithLabelValues(fmt.Sprint(c.ID()), 
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(c.vDeltaChannels))) } func (c *Collection) removeVDeltaChannel(channel Channel) { @@ -252,6 +259,8 @@ func (c *Collection) removeVDeltaChannel(channel Channel) { zap.Any("collectionID", c.ID()), zap.Any("channel", channel), ) + + metrics.QueryNodeNumDeltaChannels.WithLabelValues(fmt.Sprint(c.ID()), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(c.vDeltaChannels))) } // setReleaseTime records when collection is released diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index dd077c71b0..28f064578b 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus/internal/common" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -210,6 +211,8 @@ func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema var newCollection = newCollection(collectionID, schema) colReplica.collections[collectionID] = newCollection log.Debug("Successfully add collection ", zap.Int64("collectionID", collectionID)) + + metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(colReplica.collections))) return newCollection } @@ -238,6 +241,15 @@ func (colReplica *collectionReplica) removeCollectionPrivate(collectionID Unique deleteCollection(collection) delete(colReplica.collections, collectionID) + metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(colReplica.collections))) + metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0) + metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0) + + metrics.QueryNodeNumDmlChannels.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0) + metrics.QueryNodeNumDeltaChannels.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0) + + metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0) + metrics.QueryNodeNumReaders.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0) return nil } @@ -396,6 +408,8 @@ func (colReplica *collectionReplica) addPartitionPrivate(collectionID UniqueID, var newPartition = newPartition(collectionID, partitionID) colReplica.partitions[partitionID] = newPartition } + + metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(colReplica.partitions))) return nil } @@ -429,6 +443,7 @@ func (colReplica *collectionReplica) removePartitionPrivate(partitionID UniqueID collection.removePartitionID(partitionID) delete(colReplica.partitions, partitionID) + metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(collection.ID()), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(colReplica.partitions))) return nil } @@ -536,6 +551,7 @@ func (colReplica *collectionReplica) addSegmentPrivate(segmentID UniqueID, parti partition.addSegmentID(segmentID) 
colReplica.segments[segmentID] = segment + metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(segment.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() return nil } @@ -572,6 +588,8 @@ func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) er partition.removeSegmentID(segmentID) delete(colReplica.segments, segmentID) deleteSegment(segment) + + metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(segment.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Dec() return nil } diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 2096f19e19..0fb31942a2 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/msgstream" ) @@ -67,6 +68,7 @@ func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID Uniqu zap.Any("collectionID", collectionID), zap.Any("channel", channel)) results[channel] = newFlowGraph + metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() } return results @@ -98,6 +100,7 @@ func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID Uni zap.Any("collectionID", collectionID), zap.Any("channel", channel)) results[channel] = newFlowGraph + metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() } return results @@ -170,6 +173,7 @@ func (dsService *dataSyncService) removeFlowGraphsByDMLChannels(channels []Chann if _, ok := dsService.dmlChannel2FlowGraph[channel]; ok { // close flow graph dsService.dmlChannel2FlowGraph[channel].close() + metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Dec() } delete(dsService.dmlChannel2FlowGraph, channel) } @@ -184,6 +188,7 @@ func (dsService *dataSyncService) removeFlowGraphsByDeltaChannels(channels []Cha if _, ok := dsService.deltaChannel2FlowGraph[channel]; ok { // close flow graph dsService.deltaChannel2FlowGraph[channel].close() + metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Dec() } delete(dsService.deltaChannel2FlowGraph, channel) } diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index 70d951bdfd..167ff80f1b 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -19,10 +19,12 @@ package querynode import ( "context" "errors" + "fmt" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/flowgraph" @@ -200,6 +202,8 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS zap.Any("channel", channel), zap.Any("subName", subName), ) + + metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(q.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() return nil } @@ -214,6 +218,8 @@ func (q *queryNodeFlowGraph) consumerFlowGraphLatest(channel Channel, subName Co zap.Any("channel", channel), zap.Any("subName", subName), ) + + metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(q.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() return nil } @@ -225,6 +231,8 @@ func 
(q *queryNodeFlowGraph) seekQueryNodeFlowGraph(position *internalpb.MsgPosi zap.Any("collectionID", q.collectionID), zap.Any("channel", position.ChannelName), ) + + metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(q.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() return err } diff --git a/internal/querynode/historical.go b/internal/querynode/historical.go index e416242ef3..c0ef38ab75 100644 --- a/internal/querynode/historical.go +++ b/internal/querynode/historical.go @@ -25,8 +25,10 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/timerecord" ) // historical is in charge of historical data in query node @@ -177,11 +179,15 @@ func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partID if !seg.getOnService() { return } + tr := timerecord.NewTimeRecorder("searchOnSealed") searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs}) if err != nil { err2 = err return } + metrics.QueryNodeSQSegmentLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch, + metrics.QueryNodeSegTypeSealed, + fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds())) segmentLock.Lock() searchResults = append(searchResults, searchResult) diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 041637adb7..7dbaa344a4 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -1161,10 +1161,12 @@ func genSimpleSearchMsg() (*msgstream.SearchMsg, error) { if err != nil { return nil, err } - return &msgstream.SearchMsg{ + msg := &msgstream.SearchMsg{ BaseMsg: genMsgStreamBaseMsg(), SearchRequest: *req, - }, nil + } + msg.SetTimeRecorder() + return msg, nil } func genSimpleRetrieveMsg() (*msgstream.RetrieveMsg, error) { @@ -1172,10 +1174,12 @@ func genSimpleRetrieveMsg() (*msgstream.RetrieveMsg, error) { if err != nil { return nil, err } - return &msgstream.RetrieveMsg{ + msg := &msgstream.RetrieveMsg{ BaseMsg: genMsgStreamBaseMsg(), RetrieveRequest: *req, - }, nil + } + msg.SetTimeRecorder() + return msg, nil } func genQueryChannel() Channel { diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 99e915a9ef..aa26143fc3 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "sync" + "time" "unsafe" "github.com/golang/protobuf/proto" @@ -30,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -50,6 +52,9 @@ type queryMsg interface { GuaranteeTs() Timestamp TravelTs() Timestamp TimeoutTs() Timestamp + SetTimeRecorder() + ElapseSpan() time.Duration + RecordSpan() time.Duration } // queryCollection manages and executes the retrieve and search tasks, it can be created @@ -334,6 +339,8 @@ func (q *queryCollection) setServiceableTime(t Timestamp) { return } q.serviceableTime = t + ps, _ := tsoutil.ParseHybridTs(t) + metrics.QueryNodeServiceTime.WithLabelValues(fmt.Sprint(q.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(ps)) } func (q *queryCollection) 
checkTimeout(msg queryMsg) bool { @@ -446,6 +453,8 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { var collectionID UniqueID var msgTypeStr string + msg.SetTimeRecorder() + switch msgType { case commonpb.MsgType_Retrieve: collectionID = msg.(*msgstream.RetrieveMsg).CollectionID @@ -532,6 +541,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { zap.Any("msgID", msg.ID()), zap.String("msgType", msgTypeStr), ) + msg.RecordSpan() q.addToUnsolvedMsg(msg) sp.LogFields( oplog.String("send to unsolved buffer", "send to unsolved buffer"), @@ -667,8 +677,12 @@ func (q *queryCollection) doUnsolvedQueryMsg() { ) switch msgType { case commonpb.MsgType_Retrieve: + metrics.QueryNodeSQLatencyInQueue.WithLabelValues(metrics.QueryNodeQueryTypeQuery, + fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(m.RecordSpan().Milliseconds())) err = q.retrieve(m) case commonpb.MsgType_Search: + metrics.QueryNodeSQLatencyInQueue.WithLabelValues(metrics.QueryNodeQueryTypeSearch, + fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(m.RecordSpan().Milliseconds())) err = q.search(m) default: err := fmt.Errorf("receive invalid msgType = %d", msgType) @@ -700,6 +714,7 @@ func (q *queryCollection) doUnsolvedQueryMsg() { } func translateHits(schema *typeutil.SchemaHelper, fieldIDs []int64, rawHits [][]byte) (*schemapb.SearchResultData, error) { + tr := timerecord.NewTimeRecorder("translateHitsDuration") log.Debug("translateHits:", zap.Any("lenOfFieldIDs", len(fieldIDs)), zap.Any("lenOfRawHits", len(rawHits))) if len(rawHits) == 0 { return nil, fmt.Errorf("empty results") @@ -974,6 +989,7 @@ func translateHits(schema *typeutil.SchemaHelper, fieldIDs []int64, rawHits [][] } } + metrics.QueryNodeTranslateHitsLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds())) return finalResult, nil } @@ -1119,6 +1135,9 @@ func (q *queryCollection) search(msg queryMsg) error { if err != nil { return err } + metrics.QueryNodeSQReqLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(msg.ElapseSpan().Milliseconds())) + metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelSuccess, metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() + tr.Record(fmt.Sprintf("publish empty search result done, msgID = %d", searchMsg.ID())) tr.Elapse(fmt.Sprintf("all done, msgID = %d", searchMsg.ID())) return nil @@ -1128,6 +1147,7 @@ func (q *queryCollection) search(msg queryMsg) error { numSegment := int64(len(searchResults)) var marshaledHits *MarshaledHits log.Debug("QueryNode reduce data", zap.Int64("msgID", searchMsg.ID()), zap.Int64("numSegment", numSegment)) + tr.RecordSpan() err = reduceSearchResultsAndFillData(plan, searchResults, numSegment) log.Debug("QueryNode reduce data finished", zap.Int64("msgID", searchMsg.ID())) sp.LogFields(oplog.String("statistical time", "reduceSearchResults end")) @@ -1147,7 +1167,7 @@ func (q *queryCollection) search(msg queryMsg) error { if err != nil { return err } - tr.Record(fmt.Sprintf("reduce result done, msgID = %d", searchMsg.ID())) + metrics.QueryNodeReduceLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.RecordSpan().Milliseconds())) var offset int64 for index := range searchRequests { @@ -1227,6 +1247,11 @@ func (q *queryCollection) search(msg queryMsg) error { if err != nil { return err } + 
metrics.QueryNodeSQReqLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch, + fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(msg.ElapseSpan().Milliseconds())) + metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelSuccess, + metrics.QueryNodeQueryTypeSearch, + fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() tr.Record(fmt.Sprintf("publish search result, msgID = %d", searchMsg.ID())) } sp.LogFields(oplog.String("statistical time", "stats done")) @@ -1309,7 +1334,8 @@ func (q *queryCollection) retrieve(msg queryMsg) error { if err != nil { return err } - tr.Record(fmt.Sprintf("merge result done, msgID = %d", retrieveMsg.ID())) + reduceDuration := tr.Record(fmt.Sprintf("merge result done, msgID = %d", retrieveMsg.ID())) + metrics.QueryNodeReduceLatency.WithLabelValues(metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(reduceDuration.Milliseconds())) resultChannelInt := 0 retrieveResultMsg := &msgstream.RetrieveResultMsg{ @@ -1334,6 +1360,9 @@ func (q *queryCollection) retrieve(msg queryMsg) error { if err != nil { return err } + metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelSuccess, metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() + metrics.QueryNodeSQReqLatency.WithLabelValues(metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(msg.ElapseSpan().Milliseconds())) + log.Debug("QueryNode publish RetrieveResultMsg", zap.Int64("msgID", retrieveMsg.ID()), zap.Any("vChannels", collection.getVChannels()), @@ -1403,6 +1432,7 @@ func (q *queryCollection) publishSearchResultWithCtx(ctx context.Context, result } func (q *queryCollection) publishSearchResult(result *internalpb.SearchResults, nodeID UniqueID) error { + metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelTotal, metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() return q.publishSearchResultWithCtx(q.releaseCtx, result, nodeID) } @@ -1411,6 +1441,7 @@ func (q *queryCollection) publishRetrieveResultWithCtx(ctx context.Context, resu } func (q *queryCollection) publishRetrieveResult(result *internalpb.RetrieveResults, nodeID UniqueID) error { + metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelTotal, metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() return q.publishRetrieveResultWithCtx(q.releaseCtx, result, nodeID) } @@ -1430,6 +1461,7 @@ func (q *queryCollection) publishFailedQueryResultWithCtx(ctx context.Context, m case commonpb.MsgType_Retrieve: retrieveMsg := msg.(*msgstream.RetrieveMsg) baseResult.MsgType = commonpb.MsgType_RetrieveResult + metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelFail, metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() return q.publishRetrieveResult(&internalpb.RetrieveResults{ Base: baseResult, Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: errMsg}, @@ -1440,6 +1472,7 @@ func (q *queryCollection) publishFailedQueryResultWithCtx(ctx context.Context, m case commonpb.MsgType_Search: searchMsg := msg.(*msgstream.SearchMsg) baseResult.MsgType = commonpb.MsgType_SearchResult + metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelFail, metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() return q.publishSearchResultWithCtx(ctx, &internalpb.SearchResults{ Base: baseResult, Status: &commonpb.Status{ErrorCode: 
commonpb.ErrorCode_UnexpectedError, Reason: errMsg}, diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index e60230d265..8cf62730ab 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -32,6 +32,8 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/milvus-io/milvus/internal/metrics" + "github.com/milvus-io/milvus/internal/util/timerecord" "strconv" "sync" "unsafe" @@ -308,7 +310,9 @@ func (s *Segment) search(plan *SearchPlan, cPlaceHolderGroup := cPlaceholderGroups[0] log.Debug("do search on segment", zap.Int64("segmentID", s.segmentID), zap.Int32("segmentType", int32(s.segmentType))) + tr := timerecord.NewTimeRecorder("cgoSearch") status := C.Search(s.segmentPtr, plan.cSearchPlan, cPlaceHolderGroup, ts, &searchResult.cSearchResult, C.int64_t(s.segmentID)) + metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds())) if err := HandleCStatus(&status, "Search failed"); err != nil { return nil, err } @@ -336,7 +340,10 @@ func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, erro var retrieveResult RetrieveResult ts := C.uint64_t(plan.Timestamp) + tr := timerecord.NewTimeRecorder("cgoRetrieve") status := C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult) + metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(metrics.QueryNodeQueryTypeQuery, + fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds())) if err := HandleCStatus(&status, "Retrieve failed"); err != nil { return nil, err } diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index a9c7d6f980..940ccad9c0 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -32,6 +32,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" minioKV "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -42,6 +43,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/timerecord" ) const timeoutForEachRead = 10 * time.Second @@ -131,6 +133,7 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segme partitionID := loadInfo.PartitionID segmentID := loadInfo.SegmentID segment := newSegments[segmentID] + tr := timerecord.NewTimeRecorder("loadDurationPerSegment") err = loader.loadSegmentInternal(segment, loadInfo) if err != nil { log.Error("load segment failed when load data into memory", @@ -142,6 +145,7 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segme segmentGC() return err } + metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds())) } // set segment to meta replica @@ -505,6 +509,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection pChannelName := rootcoord.ToPhysicalChannel(position.ChannelName) position.ChannelName = pChannelName stream.AsReader([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", 
Params.QueryNodeCfg.QueryNodeID, collectionID)) + metrics.QueryNodeNumReaders.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() err = stream.SeekReaders([]*internalpb.MsgPosition{position}) if err != nil { return err diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go index 0ef30d5a3d..fecdfaeab9 100644 --- a/internal/querynode/streaming.go +++ b/internal/querynode/streaming.go @@ -26,8 +26,10 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/segcorepb" + "github.com/milvus-io/milvus/internal/util/timerecord" ) // streaming is in charge of streaming data in query node @@ -206,11 +208,15 @@ func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs // continue //} + tr := timerecord.NewTimeRecorder("searchOnGrowing") searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs}) if err != nil { err2 = err return } + metrics.QueryNodeSQSegmentLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch, + metrics.QueryNodeSegTypeGrowing, + fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds())) segmentLock.Lock() searchResults = append(searchResults, searchResult) searchSegmentIDs = append(searchSegmentIDs, seg.segmentID) diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 0cd20e29a2..5d6c0482f1 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -27,6 +27,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -161,6 +162,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error { consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName) + metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() if r.req.SeekPosition == nil || len(r.req.SeekPosition.MsgID) == 0 { // as consumer log.Debug("QueryNode AsConsumer", zap.Strings("channels", consumeChannels), zap.String("sub name", consumeSubName))
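
Illustrative sketch, not part of the patch: every call site in this diff follows the same pattern — start a timerecord.TimeRecorder, run the operation, Observe the elapsed milliseconds on a node-labelled histogram, and Inc a status-labelled counter, with all collectors registered once in RegisterQueryNode(). The standalone Go program below reproduces that pattern using only the upstream prometheus/client_golang API; the demo_* metric names, label values, and the timedSearch helper are placeholders invented for this sketch, not identifiers from the Milvus code, and plain time.Since stands in for TimeRecorder/RecordSpan.

package main

import (
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
	// Same bucket layout as queryNodeDurationBuckets: 10 ms doubling, 18 buckets.
	demoBuckets = prometheus.ExponentialBuckets(10, 2, 18)

	// Hypothetical stand-ins for QueryNodeSQReqLatency and QueryNodeSQCount.
	demoSQLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "milvus",
		Subsystem: "demo_querynode",
		Name:      "sq_latency",
		Help:      "Search and query latency in milliseconds (demo).",
		Buckets:   demoBuckets,
	}, []string{"query_type", "node_id"})

	demoSQCount = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "milvus",
		Subsystem: "demo_querynode",
		Name:      "sq_count",
		Help:      "Search and query request count by status (demo).",
	}, []string{"status", "query_type", "node_id"})
)

// timedSearch shows the timer -> Observe -> Inc sequence the patch wraps around
// the search/retrieve paths.
func timedSearch(nodeID string, do func() error) error {
	start := time.Now()
	err := do()
	demoSQLatency.WithLabelValues("search", nodeID).
		Observe(float64(time.Since(start).Milliseconds()))
	status := "success"
	if err != nil {
		status = "fail"
	}
	demoSQCount.WithLabelValues(status, "search", nodeID).Inc()
	return err
}

func main() {
	// One-time registration, mirroring what RegisterQueryNode() does for the real metrics.
	prometheus.MustRegister(demoSQLatency, demoSQCount)

	_ = timedSearch("1", func() error {
		time.Sleep(25 * time.Millisecond) // pretend to search a segment
		return nil
	})

	// Expose /metrics so the histogram and counter can be scraped.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}

Because the queue, per-segment, in-core, reduce, and load latencies in the patch all share the queryNodeDurationBuckets layout, the same timer-and-Observe wrapper shape works for each of them; only the histogram and its labels change.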