Add prometheus metrics for QueryNode (#15649)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
bigsheeper 2022-03-02 14:49:55 +08:00 committed by GitHub
parent 827389c704
commit 12eb8c3da5
15 changed files with 409 additions and 14 deletions
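
Registering the collectors is only half of the story: something must still serve the Prometheus registry over HTTP. A minimal sketch of that wiring, assuming the standard client_golang promhttp handler; the port is illustrative and Milvus's actual metrics endpoint is wired elsewhere in the repo:

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/milvus-io/milvus/internal/metrics"
)

func main() {
	// Register the QueryNode collectors added in this commit.
	metrics.RegisterQueryNode()
	// Serve the default registry; the port here is illustrative only.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":9091", nil)
}
```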

View File

@ -586,11 +586,6 @@ func RegisterProxy() {
prometheus.MustRegister(ProxyDmlChannelTimeTick)
}
//RegisterQueryNode registers QueryNode metrics
func RegisterQueryNode() {
}
var (
//DataCoordDataNodeList records the num of registered data nodes
DataCoordDataNodeList = prometheus.NewGaugeVec(

View File

@ -28,9 +28,6 @@ const (
QueryCoordMetricLabelSuccess = "success"
QueryCoordMetricLabelFail = "fail"
QueryCoordMetricLabelTotal = "total"
// TODO: move to metrics.go
collectionIDLabel = "collection_id"
)
// queryCoordLoadBuckets involves durations in milliseconds,

View File

@ -0,0 +1,266 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
// TODO: use the common status label
queryNodeStatusLabel = "status"
QueryNodeMetricLabelSuccess = "success"
QueryNodeMetricLabelFail = "fail"
QueryNodeMetricLabelTotal = "total"
// TODO: use the common ID labels
nodeIDLabel = "node_id"
collectionIDLabel = "collection_id"
)
const (
// query type
queryTypeLabel = "query_type"
QueryNodeQueryTypeSearch = "search"
QueryNodeQueryTypeQuery = "query"
// segment type
segmentTypeLabel = "segment_type"
QueryNodeSegTypeSealed = "sealed"
QueryNodeSegTypeGrowing = "growing"
)
// queryNodeDurationBuckets involves durations in milliseconds,
// [10 20 40 80 160 320 640 1280 2560 5120 10240 20480 40960 81920 163840 327680 655360 1.31072e+06]
var queryNodeDurationBuckets = prometheus.ExponentialBuckets(10, 2, 18)
var (
QueryNodeNumCollections = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_collections",
Help: "Number of collections in QueryNode.",
}, []string{
nodeIDLabel,
})
QueryNodeNumPartitions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_partitions",
Help: "Number of partitions per collection in QueryNode.",
}, []string{
collectionIDLabel,
nodeIDLabel,
})
QueryNodeNumSegments = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_segments",
Help: "Number of segments per collection in QueryNode.",
}, []string{
collectionIDLabel,
nodeIDLabel,
})
QueryNodeNumDmlChannels = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_dml_channels",
Help: "Number of dmlChannels per collection in QueryNode.",
}, []string{
collectionIDLabel,
nodeIDLabel,
})
QueryNodeNumDeltaChannels = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_delta_channels",
Help: "Number of deltaChannels per collection in QueryNode.",
}, []string{
collectionIDLabel,
nodeIDLabel,
})
QueryNodeNumConsumers = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_consumers",
Help: "Number of consumers per collection in QueryNode.",
}, []string{
collectionIDLabel,
nodeIDLabel,
})
QueryNodeNumReaders = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_readers",
Help: "Number of readers per collection in QueryNode.",
}, []string{
collectionIDLabel,
nodeIDLabel,
})
QueryNodeSQCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_count",
Help: "Search and query requests statistic in QueryNode.",
}, []string{
queryNodeStatusLabel,
queryTypeLabel,
nodeIDLabel,
})
QueryNodeSQReqLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_latency",
Help: "Search and query requests latency in QueryNode.",
Buckets: queryNodeDurationBuckets,
}, []string{
queryTypeLabel,
nodeIDLabel,
})
QueryNodeSQLatencyInQueue = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_latency_in_queue",
Help: "The search and query latency in queue(unsolved buffer) in QueryNode.",
Buckets: queryNodeDurationBuckets,
}, []string{
queryTypeLabel,
nodeIDLabel,
})
QueryNodeSQSegmentLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_latency_per_segment",
Help: "The search and query on segments(sealed/growing segments).",
Buckets: queryNodeDurationBuckets,
}, []string{
queryTypeLabel,
segmentTypeLabel,
nodeIDLabel,
})
QueryNodeSQSegmentLatencyInCore = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_latency_in_core",
Help: "The search and query latency in core.",
Buckets: queryNodeDurationBuckets,
}, []string{
queryTypeLabel,
nodeIDLabel,
})
QueryNodeTranslateHitsLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "translate_hits_latency",
Help: "The search and query latency in translate hits.",
Buckets: queryNodeDurationBuckets,
}, []string{
nodeIDLabel,
})
QueryNodeReduceLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "reduce_latency",
Help: "The search and query latency in reduce(local reduce) in QueryNode.",
Buckets: queryNodeDurationBuckets,
}, []string{
segmentTypeLabel,
nodeIDLabel,
})
QueryNodeLoadSegmentLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "load_latency_per_segment",
Help: "The load latency per segment in QueryNode.",
Buckets: queryNodeDurationBuckets,
}, []string{
nodeIDLabel,
})
QueryNodeServiceTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "service_time",
Help: "ServiceTimes of collections in QueryNode.",
}, []string{
collectionIDLabel,
nodeIDLabel,
})
QueryNodeNumFlowGraphs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_flow_graphs",
Help: "Number of flow graphs in QueryNode.",
}, []string{
nodeIDLabel,
})
)
// RegisterQueryNode registers QueryNode metrics
func RegisterQueryNode() {
prometheus.MustRegister(QueryNodeNumCollections)
prometheus.MustRegister(QueryNodeNumPartitions)
prometheus.MustRegister(QueryNodeNumSegments)
prometheus.MustRegister(QueryNodeNumDmlChannels)
prometheus.MustRegister(QueryNodeNumDeltaChannels)
prometheus.MustRegister(QueryNodeNumConsumers)
prometheus.MustRegister(QueryNodeNumReaders)
prometheus.MustRegister(QueryNodeSQCount)
prometheus.MustRegister(QueryNodeSQReqLatency)
prometheus.MustRegister(QueryNodeSQLatencyInQueue)
prometheus.MustRegister(QueryNodeSQSegmentLatency)
prometheus.MustRegister(QueryNodeSQSegmentLatencyInCore)
prometheus.MustRegister(QueryNodeTranslateHitsLatency)
prometheus.MustRegister(QueryNodeReduceLatency)
prometheus.MustRegister(QueryNodeLoadSegmentLatency)
prometheus.MustRegister(QueryNodeServiceTime)
prometheus.MustRegister(QueryNodeNumFlowGraphs)
}
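
The call-site pattern used throughout the rest of this commit, consolidated here as a sketch; `nodeID` stands in for `Params.QueryNodeCfg.QueryNodeID`, and the recorder comes from `internal/util/timerecord`:

```go
// Time an operation, then observe latency and bump the outcome counter.
tr := timerecord.NewTimeRecorder("search")
// ... execute the search ...
nodeID := fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)
metrics.QueryNodeSQReqLatency.
	WithLabelValues(metrics.QueryNodeQueryTypeSearch, nodeID).
	Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.QueryNodeSQCount.
	WithLabelValues(metrics.QueryNodeMetricLabelSuccess, metrics.QueryNodeQueryTypeSearch, nodeID).
	Inc()
```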

View File

@ -19,6 +19,7 @@ package msgstream
import (
"context"
"errors"
"time"
"github.com/golang/protobuf/proto"
@ -26,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/timerecord"
)
// MsgType is an alias of commonpb.MsgType
@ -242,6 +244,7 @@ func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) {
type SearchMsg struct {
BaseMsg
internalpb.SearchRequest
tr *timerecord.TimeRecorder
}
// interface implementation validation
@ -279,6 +282,21 @@ func (st *SearchMsg) TimeoutTs() Timestamp {
return st.GetTimeoutTimestamp()
}
// SetTimeRecorder sets the timeRecorder for SearchMsg
func (st *SearchMsg) SetTimeRecorder() {
st.tr = timerecord.NewTimeRecorder("searchMsg")
}
// ElapseSpan returns the duration from the beginning
func (st *SearchMsg) ElapseSpan() time.Duration {
return st.tr.ElapseSpan()
}
// RecordSpan returns the duration from last record
func (st *SearchMsg) RecordSpan() time.Duration {
return st.tr.RecordSpan()
}
// Marshal is used to serialize a message pack to a byte array
func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) {
searchTask := input.(*SearchMsg)
@ -369,6 +387,7 @@ func (srt *SearchResultMsg) Unmarshal(input MarshalType) (TsMsg, error) {
type RetrieveMsg struct {
BaseMsg
internalpb.RetrieveRequest
tr *timerecord.TimeRecorder
}
// interface implementation validation
@ -406,6 +425,21 @@ func (rm *RetrieveMsg) TimeoutTs() Timestamp {
return rm.GetTimeoutTimestamp()
}
// SetTimeRecorder sets the timeRecorder for RetrieveMsg
func (rm *RetrieveMsg) SetTimeRecorder() {
rm.tr = timerecord.NewTimeRecorder("retrieveMsg")
}
// ElapseSpan returns the duration from the beginning
func (rm *RetrieveMsg) ElapseSpan() time.Duration {
return rm.tr.ElapseSpan()
}
// RecordSpan returns the duration from last record
func (rm *RetrieveMsg) RecordSpan() time.Duration {
return rm.tr.RecordSpan()
}
// Marshal is used to serialize a message pack to a byte array
func (rm *RetrieveMsg) Marshal(input TsMsg) (MarshalType, error) {
retrieveTask := input.(*RetrieveMsg)
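
The recorder added to both message types distinguishes two spans, per the method docs above. A sketch of the semantics (the t0/t1/t2 timestamps are illustrative):

```go
msg := &msgstream.SearchMsg{ /* ... */ }
msg.SetTimeRecorder()        // starts the recorder at t0
// ... the message waits in the unsolved buffer ...
inQueue := msg.RecordSpan()  // t1 - t0, and moves the checkpoint to t1
// ... the search executes ...
total := msg.ElapseSpan()    // t2 - t0, unaffected by RecordSpan
_, _ = inQueue, total
```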

View File

@ -31,6 +31,7 @@ import "C"
import (
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/metrics"
"math"
"sync"
"unsafe"
@ -115,6 +116,8 @@ OUTER:
)
c.vChannels = append(c.vChannels, dstChan)
}
metrics.QueryNodeNumDmlChannels.WithLabelValues(fmt.Sprint(c.ID()), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(c.vChannels)))
}
// getVChannels get virtual channels of collection
@ -141,6 +144,8 @@ func (c *Collection) removeVChannel(channel Channel) {
zap.Any("collectionID", c.ID()),
zap.Any("channel", channel),
)
metrics.QueryNodeNumDmlChannels.WithLabelValues(fmt.Sprint(c.ID()), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(c.vChannels)))
}
// addPChannels add physical channels to physical channels of collection
@ -236,6 +241,8 @@ OUTER:
)
c.vDeltaChannels = append(c.vDeltaChannels, dstChan)
}
metrics.QueryNodeNumDeltaChannels.WithLabelValues(fmt.Sprint(c.ID()), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(c.vDeltaChannels)))
}
func (c *Collection) removeVDeltaChannel(channel Channel) {
@ -252,6 +259,8 @@ func (c *Collection) removeVDeltaChannel(channel Channel) {
zap.Any("collectionID", c.ID()),
zap.Any("channel", channel),
)
metrics.QueryNodeNumDeltaChannels.WithLabelValues(fmt.Sprint(c.ID()), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(c.vDeltaChannels)))
}
// setReleaseTime records when collection is released

View File

@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -210,6 +211,8 @@ func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema
var newCollection = newCollection(collectionID, schema)
colReplica.collections[collectionID] = newCollection
log.Debug("Successfully add collection ", zap.Int64("collectionID", collectionID))
metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(colReplica.collections)))
return newCollection
}
@ -238,6 +241,15 @@ func (colReplica *collectionReplica) removeCollectionPrivate(collectionID Unique
deleteCollection(collection)
delete(colReplica.collections, collectionID)
metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(colReplica.collections)))
metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0)
metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0)
metrics.QueryNodeNumDmlChannels.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0)
metrics.QueryNodeNumDeltaChannels.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0)
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0)
metrics.QueryNodeNumReaders.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(0)
return nil
}
@ -396,6 +408,8 @@ func (colReplica *collectionReplica) addPartitionPrivate(collectionID UniqueID,
var newPartition = newPartition(collectionID, partitionID)
colReplica.partitions[partitionID] = newPartition
}
metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(colReplica.partitions)))
return nil
}
@ -429,6 +443,7 @@ func (colReplica *collectionReplica) removePartitionPrivate(partitionID UniqueID
collection.removePartitionID(partitionID)
delete(colReplica.partitions, partitionID)
metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(collection.ID()), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(len(colReplica.partitions)))
return nil
}
@ -536,6 +551,7 @@ func (colReplica *collectionReplica) addSegmentPrivate(segmentID UniqueID, parti
partition.addSegmentID(segmentID)
colReplica.segments[segmentID] = segment
metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(segment.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
return nil
}
@ -572,6 +588,8 @@ func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) er
partition.removeSegmentID(segmentID)
delete(colReplica.segments, segmentID)
deleteSegment(segment)
metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(segment.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Dec()
return nil
}
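
Note the two bookkeeping styles in this file: channel and partition gauges are Set from the authoritative container length, while the segment gauge is Inc/Dec'd per event. On collection removal the per-collection gauges are zeroed with Set(0) rather than deleted; a hypothetical alternative, not what this commit does, would drop the stale label sets so the series stop being exported:

```go
// Hypothetical alternative to Set(0): remove the label set entirely.
metrics.QueryNodeNumPartitions.DeleteLabelValues(
	fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID))
```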

View File

@ -24,6 +24,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
)
@ -67,6 +68,7 @@ func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID Uniqu
zap.Any("collectionID", collectionID),
zap.Any("channel", channel))
results[channel] = newFlowGraph
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
}
return results
@ -98,6 +100,7 @@ func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID Uni
zap.Any("collectionID", collectionID),
zap.Any("channel", channel))
results[channel] = newFlowGraph
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
}
return results
@ -170,6 +173,7 @@ func (dsService *dataSyncService) removeFlowGraphsByDMLChannels(channels []Chann
if _, ok := dsService.dmlChannel2FlowGraph[channel]; ok {
// close flow graph
dsService.dmlChannel2FlowGraph[channel].close()
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Dec()
}
delete(dsService.dmlChannel2FlowGraph, channel)
}
@ -184,6 +188,7 @@ func (dsService *dataSyncService) removeFlowGraphsByDeltaChannels(channels []Cha
if _, ok := dsService.deltaChannel2FlowGraph[channel]; ok {
// close flow graph
dsService.deltaChannel2FlowGraph[channel].close()
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Dec()
}
delete(dsService.deltaChannel2FlowGraph, channel)
}

View File

@ -19,10 +19,12 @@ package querynode
import (
"context"
"errors"
"fmt"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
@ -200,6 +202,8 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS
zap.Any("channel", channel),
zap.Any("subName", subName),
)
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(q.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
return nil
}
@ -214,6 +218,8 @@ func (q *queryNodeFlowGraph) consumerFlowGraphLatest(channel Channel, subName Co
zap.Any("channel", channel),
zap.Any("subName", subName),
)
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(q.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
return nil
}
@ -225,6 +231,8 @@ func (q *queryNodeFlowGraph) seekQueryNodeFlowGraph(position *internalpb.MsgPosi
zap.Any("collectionID", q.collectionID),
zap.Any("channel", position.ChannelName),
)
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(q.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
return err
}

View File

@ -25,8 +25,10 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/timerecord"
)
// historical is in charge of historical data in query node
@ -177,11 +179,15 @@ func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partID
if !seg.getOnService() {
return
}
tr := timerecord.NewTimeRecorder("searchOnSealed")
searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs})
if err != nil {
err2 = err
return
}
metrics.QueryNodeSQSegmentLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch,
metrics.QueryNodeSegTypeSealed,
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
segmentLock.Lock()
searchResults = append(searchResults, searchResult)

View File

@ -1161,10 +1161,12 @@ func genSimpleSearchMsg() (*msgstream.SearchMsg, error) {
if err != nil {
return nil, err
}
return &msgstream.SearchMsg{
msg := &msgstream.SearchMsg{
BaseMsg: genMsgStreamBaseMsg(),
SearchRequest: *req,
}, nil
}
msg.SetTimeRecorder()
return msg, nil
}
func genSimpleRetrieveMsg() (*msgstream.RetrieveMsg, error) {
@ -1172,10 +1174,12 @@ func genSimpleRetrieveMsg() (*msgstream.RetrieveMsg, error) {
if err != nil {
return nil, err
}
return &msgstream.RetrieveMsg{
msg := &msgstream.RetrieveMsg{
BaseMsg: genMsgStreamBaseMsg(),
RetrieveRequest: *req,
}, nil
}
msg.SetTimeRecorder()
return msg, nil
}
func genQueryChannel() Channel {

View File

@ -22,6 +22,7 @@ import (
"fmt"
"math"
"sync"
"time"
"unsafe"
"github.com/golang/protobuf/proto"
@ -30,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -50,6 +52,9 @@ type queryMsg interface {
GuaranteeTs() Timestamp
TravelTs() Timestamp
TimeoutTs() Timestamp
SetTimeRecorder()
ElapseSpan() time.Duration
RecordSpan() time.Duration
}
// queryCollection manages and executes the retrieve and search tasks, it can be created
@ -334,6 +339,8 @@ func (q *queryCollection) setServiceableTime(t Timestamp) {
return
}
q.serviceableTime = t
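// ParseHybridTs splits the hybrid timestamp into its physical part
// (milliseconds) and its logical counter; only the physical part is
// exported to the service-time gauge.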
ps, _ := tsoutil.ParseHybridTs(t)
metrics.QueryNodeServiceTime.WithLabelValues(fmt.Sprint(q.collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Set(float64(ps))
}
func (q *queryCollection) checkTimeout(msg queryMsg) bool {
@ -446,6 +453,8 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {
var collectionID UniqueID
var msgTypeStr string
msg.SetTimeRecorder()
switch msgType {
case commonpb.MsgType_Retrieve:
collectionID = msg.(*msgstream.RetrieveMsg).CollectionID
@ -532,6 +541,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {
zap.Any("msgID", msg.ID()),
zap.String("msgType", msgTypeStr),
)
msg.RecordSpan()
q.addToUnsolvedMsg(msg)
sp.LogFields(
oplog.String("send to unsolved buffer", "send to unsolved buffer"),
@ -667,8 +677,12 @@ func (q *queryCollection) doUnsolvedQueryMsg() {
)
switch msgType {
case commonpb.MsgType_Retrieve:
metrics.QueryNodeSQLatencyInQueue.WithLabelValues(metrics.QueryNodeQueryTypeQuery,
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(m.RecordSpan().Milliseconds()))
err = q.retrieve(m)
case commonpb.MsgType_Search:
metrics.QueryNodeSQLatencyInQueue.WithLabelValues(metrics.QueryNodeQueryTypeSearch,
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(m.RecordSpan().Milliseconds()))
err = q.search(m)
default:
err := fmt.Errorf("receive invalid msgType = %d", msgType)
@ -700,6 +714,7 @@ func (q *queryCollection) doUnsolvedQueryMsg() {
}
func translateHits(schema *typeutil.SchemaHelper, fieldIDs []int64, rawHits [][]byte) (*schemapb.SearchResultData, error) {
tr := timerecord.NewTimeRecorder("translateHitsDuration")
log.Debug("translateHits:", zap.Any("lenOfFieldIDs", len(fieldIDs)), zap.Any("lenOfRawHits", len(rawHits)))
if len(rawHits) == 0 {
return nil, fmt.Errorf("empty results")
@ -974,6 +989,7 @@ func translateHits(schema *typeutil.SchemaHelper, fieldIDs []int64, rawHits [][]
}
}
metrics.QueryNodeTranslateHitsLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
return finalResult, nil
}
@ -1119,6 +1135,9 @@ func (q *queryCollection) search(msg queryMsg) error {
if err != nil {
return err
}
metrics.QueryNodeSQReqLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(msg.ElapseSpan().Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelSuccess, metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
tr.Record(fmt.Sprintf("publish empty search result done, msgID = %d", searchMsg.ID()))
tr.Elapse(fmt.Sprintf("all done, msgID = %d", searchMsg.ID()))
return nil
@ -1128,6 +1147,7 @@ func (q *queryCollection) search(msg queryMsg) error {
numSegment := int64(len(searchResults))
var marshaledHits *MarshaledHits
log.Debug("QueryNode reduce data", zap.Int64("msgID", searchMsg.ID()), zap.Int64("numSegment", numSegment))
tr.RecordSpan()
err = reduceSearchResultsAndFillData(plan, searchResults, numSegment)
log.Debug("QueryNode reduce data finished", zap.Int64("msgID", searchMsg.ID()))
sp.LogFields(oplog.String("statistical time", "reduceSearchResults end"))
@ -1147,7 +1167,7 @@ func (q *queryCollection) search(msg queryMsg) error {
if err != nil {
return err
}
tr.Record(fmt.Sprintf("reduce result done, msgID = %d", searchMsg.ID()))
metrics.QueryNodeReduceLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.RecordSpan().Milliseconds()))
var offset int64
for index := range searchRequests {
@ -1227,6 +1247,11 @@ func (q *queryCollection) search(msg queryMsg) error {
if err != nil {
return err
}
metrics.QueryNodeSQReqLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch,
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(msg.ElapseSpan().Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelSuccess,
metrics.QueryNodeQueryTypeSearch,
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
tr.Record(fmt.Sprintf("publish search result, msgID = %d", searchMsg.ID()))
}
sp.LogFields(oplog.String("statistical time", "stats done"))
@ -1309,7 +1334,8 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
if err != nil {
return err
}
tr.Record(fmt.Sprintf("merge result done, msgID = %d", retrieveMsg.ID()))
reduceDuration := tr.Record(fmt.Sprintf("merge result done, msgID = %d", retrieveMsg.ID()))
metrics.QueryNodeReduceLatency.WithLabelValues(metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(reduceDuration.Milliseconds()))
resultChannelInt := 0
retrieveResultMsg := &msgstream.RetrieveResultMsg{
@ -1334,6 +1360,9 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
if err != nil {
return err
}
metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelSuccess, metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
metrics.QueryNodeSQReqLatency.WithLabelValues(metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(msg.ElapseSpan().Milliseconds()))
log.Debug("QueryNode publish RetrieveResultMsg",
zap.Int64("msgID", retrieveMsg.ID()),
zap.Any("vChannels", collection.getVChannels()),
@ -1403,6 +1432,7 @@ func (q *queryCollection) publishSearchResultWithCtx(ctx context.Context, result
}
func (q *queryCollection) publishSearchResult(result *internalpb.SearchResults, nodeID UniqueID) error {
metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelTotal, metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
return q.publishSearchResultWithCtx(q.releaseCtx, result, nodeID)
}
@ -1411,6 +1441,7 @@ func (q *queryCollection) publishRetrieveResultWithCtx(ctx context.Context, resu
}
func (q *queryCollection) publishRetrieveResult(result *internalpb.RetrieveResults, nodeID UniqueID) error {
metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelTotal, metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
return q.publishRetrieveResultWithCtx(q.releaseCtx, result, nodeID)
}
@ -1430,6 +1461,7 @@ func (q *queryCollection) publishFailedQueryResultWithCtx(ctx context.Context, m
case commonpb.MsgType_Retrieve:
retrieveMsg := msg.(*msgstream.RetrieveMsg)
baseResult.MsgType = commonpb.MsgType_RetrieveResult
metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelFail, metrics.QueryNodeQueryTypeQuery, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
return q.publishRetrieveResult(&internalpb.RetrieveResults{
Base: baseResult,
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: errMsg},
@ -1440,6 +1472,7 @@ func (q *queryCollection) publishFailedQueryResultWithCtx(ctx context.Context, m
case commonpb.MsgType_Search:
searchMsg := msg.(*msgstream.SearchMsg)
baseResult.MsgType = commonpb.MsgType_SearchResult
metrics.QueryNodeSQCount.WithLabelValues(metrics.QueryNodeMetricLabelFail, metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
return q.publishSearchResultWithCtx(ctx, &internalpb.SearchResults{
Base: baseResult,
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: errMsg},

View File

@ -32,6 +32,8 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/util/timerecord"
"strconv"
"sync"
"unsafe"
@ -308,7 +310,9 @@ func (s *Segment) search(plan *SearchPlan,
cPlaceHolderGroup := cPlaceholderGroups[0]
log.Debug("do search on segment", zap.Int64("segmentID", s.segmentID), zap.Int32("segmentType", int32(s.segmentType)))
tr := timerecord.NewTimeRecorder("cgoSearch")
status := C.Search(s.segmentPtr, plan.cSearchPlan, cPlaceHolderGroup, ts, &searchResult.cSearchResult, C.int64_t(s.segmentID))
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(metrics.QueryNodeQueryTypeSearch, fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
if err := HandleCStatus(&status, "Search failed"); err != nil {
return nil, err
}
@ -336,7 +340,10 @@ func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, erro
var retrieveResult RetrieveResult
ts := C.uint64_t(plan.Timestamp)
tr := timerecord.NewTimeRecorder("cgoRetrieve")
status := C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult)
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(metrics.QueryNodeQueryTypeQuery,
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
if err := HandleCStatus(&status, "Retrieve failed"); err != nil {
return nil, err
}

View File

@ -32,6 +32,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -42,6 +43,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/timerecord"
)
const timeoutForEachRead = 10 * time.Second
@ -131,6 +133,7 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segme
partitionID := loadInfo.PartitionID
segmentID := loadInfo.SegmentID
segment := newSegments[segmentID]
tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
err = loader.loadSegmentInternal(segment, loadInfo)
if err != nil {
log.Error("load segment failed when load data into memory",
@ -142,6 +145,7 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segme
segmentGC()
return err
}
metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
}
// set segment to meta replica
@ -505,6 +509,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
pChannelName := rootcoord.ToPhysicalChannel(position.ChannelName)
position.ChannelName = pChannelName
stream.AsReader([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", Params.QueryNodeCfg.QueryNodeID, collectionID))
metrics.QueryNodeNumReaders.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
err = stream.SeekReaders([]*internalpb.MsgPosition{position})
if err != nil {
return err
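
Two things worth noting in the load hunk above: the recorder brackets only loadSegmentInternal, and a failed load returns before Observe, so the histogram records successful loads only. The pattern, condensed:

```go
tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
if err := loader.loadSegmentInternal(segment, loadInfo); err != nil {
	return err // failed loads are never observed
}
metrics.QueryNodeLoadSegmentLatency.
	WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).
	Observe(float64(tr.ElapseSpan().Milliseconds()))
```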

View File

@ -26,8 +26,10 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/util/timerecord"
)
// streaming is in charge of streaming data in query node
@ -206,11 +208,15 @@ func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs
// continue
//}
tr := timerecord.NewTimeRecorder("searchOnGrowing")
searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs})
if err != nil {
err2 = err
return
}
metrics.QueryNodeSQSegmentLatency.WithLabelValues(metrics.QueryNodeQueryTypeSearch,
metrics.QueryNodeSegTypeGrowing,
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
segmentLock.Lock()
searchResults = append(searchResults, searchResult)
searchSegmentIDs = append(searchSegmentIDs, seg.segmentID)
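
This mirrors the historical path earlier in the commit: both observe the same QueryNodeSQSegmentLatency histogram and differ only in the segment_type label, so dashboards can compare the two paths per node:

```go
// Same histogram on both search paths; only the segment_type label differs.
metrics.QueryNodeSQSegmentLatency.WithLabelValues(
	metrics.QueryNodeQueryTypeSearch,
	segType, // QueryNodeSegTypeGrowing here, QueryNodeSegTypeSealed in historical
	fmt.Sprint(Params.QueryNodeCfg.QueryNodeID),
).Observe(float64(tr.ElapseSpan().Milliseconds()))
```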

View File

@ -27,6 +27,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -161,6 +162,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
if r.req.SeekPosition == nil || len(r.req.SeekPosition.MsgID) == 0 {
// as consumer
log.Debug("QueryNode AsConsumer", zap.Strings("channels", consumeChannels), zap.String("sub name", consumeSubName))