diff --git a/internal/metrics/proxy_metrics.go b/internal/metrics/proxy_metrics.go
index 82bc0460ae..08325f4a1d 100644
--- a/internal/metrics/proxy_metrics.go
+++ b/internal/metrics/proxy_metrics.go
@@ -223,6 +223,24 @@ var (
 			Help:      "latency of each DQL request excluding search and query",
 			Buckets:   buckets, // unit: ms
 		}, []string{nodeIDLabelName, functionLabelName})
+
+	// ProxyMutationReceiveBytes records the received bytes of Insert/Delete requests in Proxy
+	ProxyMutationReceiveBytes = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.ProxyRole,
+			Name:      "receive_bytes_count",
+			Help:      "count of bytes received from sdk",
+		}, []string{nodeIDLabelName})
+
+	// ProxyReadReqSendBytes records the bytes sent back to the client by Proxy
+	ProxyReadReqSendBytes = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.ProxyRole,
+			Name:      "send_bytes_count",
+			Help:      "count of bytes sent back to sdk",
+		}, []string{nodeIDLabelName})
 )
 
 //RegisterProxy registers Proxy metrics
@@ -254,5 +272,6 @@ func RegisterProxy(registry *prometheus.Registry) {
 	registry.MustRegister(ProxyDDLReqLatency)
 	registry.MustRegister(ProxyDMLReqLatency)
 	registry.MustRegister(ProxyDQLReqLatency)
-
+	registry.MustRegister(ProxyMutationReceiveBytes)
+	registry.MustRegister(ProxyReadReqSendBytes)
 }
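
The two counters above follow the usual client_golang CounterVec pattern: declare once with a node-ID label, register on a registry, then Add() per request. A minimal, self-contained sketch of that pattern; the names (demo namespace, node_id label, port 2112) are illustrative and not part of this patch:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var receivedBytes = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "demo",
		Subsystem: "proxy",
		Name:      "receive_bytes_count",
		Help:      "count of bytes received from clients",
	}, []string{"node_id"})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(receivedBytes)

	// Account for one incoming request on node "1".
	receivedBytes.WithLabelValues("1").Add(1024)

	// Expose only this registry; curl localhost:2112/metrics to inspect.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":2112", nil))
}
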
diff --git a/internal/metrics/querynode_metrics.go b/internal/metrics/querynode_metrics.go
index 1b16c50730..f2d3f8305f 100644
--- a/internal/metrics/querynode_metrics.go
+++ b/internal/metrics/querynode_metrics.go
@@ -17,9 +17,8 @@
 package metrics
 
 import (
-	"github.com/prometheus/client_golang/prometheus"
-
 	"github.com/milvus-io/milvus/internal/util/typeutil"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 var (
@@ -167,6 +166,111 @@ var (
 			nodeIDLabelName,
 		})
 
+	QueryNodeReadTaskUnsolveLen = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "read_task_unsolved_len",
+			Help:      "number of unsolved read tasks in unsolvedQueue",
+		}, []string{
+			nodeIDLabelName,
+		})
+
+	QueryNodeReadTaskReadyLen = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "read_task_ready_len",
+			Help:      "number of ready read tasks in readyQueue",
+		}, []string{
+			nodeIDLabelName,
+		})
+
+	QueryNodeReadTaskConcurrency = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "read_task_concurrency",
+			Help:      "number of concurrently executing read tasks in QueryNode",
+		}, []string{
+			nodeIDLabelName,
+		})
+
+	QueryNodeEstimateCPUUsage = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "estimate_cpu_usage",
+			Help:      "estimated cpu usage by the scheduler in QueryNode",
+		}, []string{
+			nodeIDLabelName,
+		})
+
+	QueryNodeSearchGroupNQ = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "search_group_nq",
+			Help:      "the number of queries of each grouped search task",
+			Buckets:   buckets,
+		}, []string{
+			nodeIDLabelName,
+		})
+
+	QueryNodeSearchNQ = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "search_nq",
+			Help:      "the number of queries of each search task",
+			Buckets:   buckets,
+		}, []string{
+			nodeIDLabelName,
+		})
+
+	QueryNodeSearchGroupTopK = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "search_group_topk",
+			Help:      "the topK of each grouped search task",
+			Buckets:   buckets,
+		}, []string{
+			nodeIDLabelName,
+		})
+
+	QueryNodeSearchTopK = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "search_topk",
+			Help:      "the topK of each search task",
+			Buckets:   buckets,
+		}, []string{
+			nodeIDLabelName,
+		})
+
+	QueryNodeSearchGroupSize = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "search_group_size",
+			Help:      "the number of tasks of each grouped search task",
+			Buckets:   buckets,
+		}, []string{
+			nodeIDLabelName,
+		})
+
+	QueryNodeEvictedReadReqCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "read_evicted_count",
+			Help:      "count of evicted search/query requests",
+		}, []string{
+			nodeIDLabelName,
+		})
+
 	QueryNodeNumFlowGraphs = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: milvusNamespace,
@@ -176,6 +280,16 @@ var (
 		}, []string{
 			nodeIDLabelName,
 		})
+
+	QueryNodeNumEntities = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "entities_num",
+			Help:      "number of entities which can be searched/queried",
+		}, []string{
+			nodeIDLabelName,
+		})
 )
 
 //RegisterQueryNode registers QueryNode metrics
@@ -193,5 +307,16 @@ func RegisterQueryNode(registry *prometheus.Registry) {
 	registry.MustRegister(QueryNodeSQSegmentLatencyInCore)
 	registry.MustRegister(QueryNodeReduceLatency)
 	registry.MustRegister(QueryNodeLoadSegmentLatency)
+	registry.MustRegister(QueryNodeReadTaskUnsolveLen)
+	registry.MustRegister(QueryNodeReadTaskReadyLen)
+	registry.MustRegister(QueryNodeReadTaskConcurrency)
+	registry.MustRegister(QueryNodeEstimateCPUUsage)
+	registry.MustRegister(QueryNodeSearchGroupNQ)
+	registry.MustRegister(QueryNodeSearchNQ)
+	registry.MustRegister(QueryNodeSearchGroupSize)
+	registry.MustRegister(QueryNodeEvictedReadReqCount)
+	registry.MustRegister(QueryNodeSearchGroupTopK)
+	registry.MustRegister(QueryNodeSearchTopK)
 	registry.MustRegister(QueryNodeNumFlowGraphs)
+	registry.MustRegister(QueryNodeNumEntities)
 }
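
QueryNodeSearchNQ, QueryNodeSearchTopK, and the group histograms are HistogramVecs: each Observe() drops one task's value into a bucket, from which percentiles can later be derived. A sketch of the same pattern, assuming exponential buckets for illustration (the patch reuses a shared `buckets` slice defined elsewhere in this package):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	searchNQ := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "demo",
			Subsystem: "querynode",
			Name:      "search_nq",
			Help:      "the number of queries of each search task",
			Buckets:   prometheus.ExponentialBuckets(1, 2, 10), // 1, 2, 4, ..., 512
		}, []string{"node_id"})

	registry := prometheus.NewRegistry()
	registry.MustRegister(searchNQ)

	// Each Observe records one task's nq into the matching bucket.
	for _, nq := range []float64{1, 16, 300} {
		searchNQ.WithLabelValues("1").Observe(nq)
	}

	families, err := registry.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println(families[0].GetName()) // demo_querynode_search_nq
}
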
diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go
index 9665191dc1..75acbb2bcf 100644
--- a/internal/proxy/impl.go
+++ b/internal/proxy/impl.go
@@ -33,6 +33,7 @@ import (
 	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/mq/msgstream"
+	"github.com/golang/protobuf/proto"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
@@ -2179,6 +2180,8 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 	}
 	method := "Insert"
 	tr := timerecord.NewTimeRecorder(method)
+	receiveSize := proto.Size(request)
+	metrics.ProxyMutationReceiveBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(receiveSize))
 	defer func() {
 		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
@@ -2286,7 +2289,8 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
 		metrics.SuccessLabel).Inc()
-	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(it.result.InsertCnt))
+	successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
+	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(successCnt))
 	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
 	return it.result, nil
 }
@@ -2299,6 +2303,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
 	log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
 	defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
 
+	receiveSize := proto.Size(request)
+	metrics.ProxyMutationReceiveBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(receiveSize))
+
 	if !node.checkHealthy() {
 		return &milvuspb.MutationResult{
 			Status: unhealthyStatus(),
@@ -2529,6 +2536,11 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
 	searchDur := tr.ElapseSpan().Milliseconds()
 	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
 		metrics.SearchLabel).Observe(float64(searchDur))
+
+	if qt.result != nil {
+		sentSize := proto.Size(qt.result)
+		metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize))
+	}
 	return qt.result, nil
 }
@@ -2743,10 +2755,14 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
 	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
 		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
-	return &milvuspb.QueryResults{
+
+	ret := &milvuspb.QueryResults{
 		Status:     qt.result.Status,
 		FieldsData: qt.result.FieldsData,
-	}, nil
+	}
+	sentSize := proto.Size(qt.result)
+	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize))
+	return ret, nil
 }
 
 // CreateAlias create alias for collection, then you can search the collection with alias.
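
The byte accounting above relies on proto.Size, which computes the wire-encoded size of any protobuf message without retaining the serialized form. A small sketch; wrappers.StringValue merely stands in for the Milvus request/result types, which are also proto messages:

package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
	"github.com/golang/protobuf/ptypes/wrappers"
)

func main() {
	// Any proto message works here; Milvus passes its
	// InsertRequest / SearchResults values instead.
	req := &wrappers.StringValue{Value: "some request payload"}

	// proto.Size returns the encoded size in bytes.
	receiveSize := proto.Size(req)
	fmt.Printf("would Add(%d) to the receive-bytes counter\n", receiveSize)
}
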
diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go
index 23cdeec569..bc57c4288a 100644
--- a/internal/proxy/meta_cache.go
+++ b/internal/proxy/meta_cache.go
@@ -674,7 +674,6 @@ func parseShardLeaderList2QueryNode(shardsLeaders []*querypb.ShardLeadersList) m
 func (m *MetaCache) ClearShards(collectionName string) {
 	log.Info("clearing shard cache for collection", zap.String("collectionName", collectionName))
 	m.mu.Lock()
-	//var ret map[string][]nodeInfo
 	info, ok := m.collInfo[collectionName]
 	if ok {
 		m.collInfo[collectionName].shardLeaders = nil
diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go
index 63985f74fe..473b219e7d 100644
--- a/internal/querynode/impl.go
+++ b/internal/querynode/impl.go
@@ -579,6 +579,8 @@ func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (
 	latency := tr.ElapseSpan()
 	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(latency.Milliseconds()))
 	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
+	metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetNq()))
+	metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetTopk()))
 	return ret, nil
 }
diff --git a/internal/querynode/meta_replica.go b/internal/querynode/meta_replica.go
index 2068e84d3b..92b0d2543b 100644
--- a/internal/querynode/meta_replica.go
+++ b/internal/querynode/meta_replica.go
@@ -553,6 +553,10 @@ func (replica *metaReplica) addSegmentPrivate(segmentID UniqueID, partitionID Un
 	}
 
 	metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
+	rowCount := segment.getRowCount()
+	if rowCount > 0 {
+		metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Add(float64(rowCount))
+	}
 	return nil
 }
 
@@ -582,13 +586,14 @@ func (replica *metaReplica) removeSegment(segmentID UniqueID, segType segmentTyp
 
 // removeSegmentPrivate is private function in collectionReplica, to remove a segment from collectionReplica
 func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType segmentType) {
+	var rowCount int64
 	switch segType {
 	case segmentTypeGrowing:
 		if segment, ok := replica.growingSegments[segmentID]; ok {
 			if partition, ok := replica.partitions[segment.partitionID]; ok {
 				partition.removeSegmentID(segmentID, segType)
 			}
-
+			rowCount = segment.getRowCount()
 			delete(replica.growingSegments, segmentID)
 			deleteSegment(segment)
 		}
@@ -598,6 +603,7 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg
 				partition.removeSegmentID(segmentID, segType)
 			}
 
+			rowCount = segment.getRowCount()
 			delete(replica.sealedSegments, segmentID)
 			deleteSegment(segment)
 		}
@@ -606,6 +612,9 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg
 	}
 
 	metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Dec()
+	if rowCount > 0 {
+		metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(rowCount))
+	}
 }
 
 // getSegmentByID returns the segment which id is segmentID
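
QueryNodeNumEntities is a gauge kept in sync with the segment lifecycle: the row count is added when a segment is registered and subtracted when it is removed, and non-positive counts are skipped by the guard. A sketch of that bookkeeping; addSegment/removeSegment and the rowCount parameter are illustrative stand-ins for the metaReplica methods and segment.getRowCount():

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var numEntities = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "demo",
		Subsystem: "querynode",
		Name:      "entities_num",
		Help:      "number of entities which can be searched/queried",
	}, []string{"node_id"})

// addSegment mirrors addSegmentPrivate: only positive row counts move the gauge.
func addSegment(nodeID string, rowCount int64) {
	if rowCount > 0 {
		numEntities.WithLabelValues(nodeID).Add(float64(rowCount))
	}
}

// removeSegment mirrors removeSegmentPrivate with the opposite sign.
func removeSegment(nodeID string, rowCount int64) {
	if rowCount > 0 {
		numEntities.WithLabelValues(nodeID).Sub(float64(rowCount))
	}
}

func main() {
	prometheus.MustRegister(numEntities)
	addSegment("1", 10000)
	removeSegment("1", 10000)
	fmt.Println("gauge is back to zero")
}
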
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index 16b18f4f73..43cd51d131 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -603,7 +603,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps [
 	if err := HandleCStatus(&status, "Insert failed"); err != nil {
 		return err
 	}
-
+	metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Add(float64(numOfRow))
 	s.setRecentlyModified(true)
 	return nil
 }
diff --git a/internal/querynode/task_scheduler.go b/internal/querynode/task_scheduler.go
index e2ec81517c..a3cbf7ac31 100644
--- a/internal/querynode/task_scheduler.go
+++ b/internal/querynode/task_scheduler.go
@@ -27,6 +27,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/metrics"
 )
 
 const (
@@ -52,7 +53,8 @@ type taskScheduler struct {
 	schedule scheduleReadTaskPolicy
 	// for search and query end
 
-	cpuUsage int32 // 1200 means 1200% 12 cores
+	cpuUsage        int32 // 1200 means 1200% 12 cores
+	readConcurrency int32 // number of read tasks currently executing
 
 	// for other tasks
 	queue taskQueue
@@ -153,6 +155,7 @@ func (s *taskScheduler) tryEvictUnsolvedReadTask(headCount int) {
 	if diff <= 0 {
 		return
 	}
+	metrics.QueryNodeEvictedReadReqCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Add(float64(diff))
 	busyErr := fmt.Errorf("server is busy")
 	for e := s.unsolvedReadTasks.Front(); e != nil && diff > 0; e = e.Next() {
 		diff--
@@ -224,6 +227,7 @@ func (s *taskScheduler) popAndAddToExecute() {
 	if curUsage < 0 {
 		curUsage = 0
 	}
+	metrics.QueryNodeEstimateCPUUsage.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(curUsage))
 	targetUsage := s.maxCPUUsage - curUsage
 	if targetUsage <= 0 {
 		return
@@ -246,11 +250,14 @@ func (s *taskScheduler) executeReadTasks() {
 			return
 		case t, ok := <-s.executeReadTaskChan:
 			if ok {
+				pendingTaskLen := len(s.executeReadTaskChan)
 				taskWg.Add(1)
+				atomic.AddInt32(&s.readConcurrency, int32(pendingTaskLen+1))
 				go func(t readTask) {
 					defer taskWg.Done()
 					s.processReadTask(t)
 					cpu := t.CPUUsage()
+					atomic.AddInt32(&s.readConcurrency, -1)
 					atomic.AddInt32(&s.cpuUsage, -cpu)
 					select {
 					case s.notifyChan <- struct{}{}:
@@ -258,7 +265,6 @@ func (s *taskScheduler) executeReadTasks() {
 					}
 				}(t)
 
-				pendingTaskLen := len(s.executeReadTaskChan)
 				for i := 0; i < pendingTaskLen; i++ {
 					taskWg.Add(1)
 					t := <-s.executeReadTaskChan
@@ -266,6 +272,7 @@ func (s *taskScheduler) executeReadTasks() {
 						defer taskWg.Done()
 						s.processReadTask(t)
 						cpu := t.CPUUsage()
+						atomic.AddInt32(&s.readConcurrency, -1)
 						atomic.AddInt32(&s.cpuUsage, -cpu)
 						select {
 						case s.notifyChan <- struct{}{}:
@@ -344,4 +351,8 @@ func (s *taskScheduler) tryMergeReadTasks() {
 			s.unsolvedReadTasks.Remove(e)
 		}
 	}
+	metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.unsolvedReadTasks.Len()))
+	metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.readyReadTasks.Len()))
+	readConcurrency := atomic.LoadInt32(&s.readConcurrency)
+	metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(readConcurrency))
 }
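
readConcurrency is plain atomic bookkeeping: incremented once per task handed to a worker goroutine, decremented when the task completes, and mirrored into a gauge on each scheduling pass. A condensed sketch of the same pattern; the names are illustrative, not the scheduler's API:

package main

import (
	"sync"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

var readTaskConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "demo_read_task_concurrency",
	Help: "number of concurrently executing read tasks",
})

func main() {
	prometheus.MustRegister(readTaskConcurrency)

	var concurrency int32
	var wg sync.WaitGroup

	for i := 0; i < 8; i++ {
		wg.Add(1)
		atomic.AddInt32(&concurrency, 1) // task admitted for execution
		go func() {
			defer wg.Done()
			defer atomic.AddInt32(&concurrency, -1) // task finished
			// processReadTask would run here.
		}()
	}

	// A scheduler pass mirrors the atomic value into the gauge,
	// as tryMergeReadTasks does in the patch.
	readTaskConcurrency.Set(float64(atomic.LoadInt32(&concurrency)))
	wg.Wait()
}
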
diff --git a/internal/querynode/task_search.go b/internal/querynode/task_search.go
index e06671b612..918cf5e82e 100644
--- a/internal/querynode/task_search.go
+++ b/internal/querynode/task_search.go
@@ -30,6 +30,7 @@ import (
 	"github.com/golang/protobuf/proto"
 
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
@@ -161,6 +162,11 @@ func (s *searchTask) Execute(ctx context.Context) error {
 }
 
 func (s *searchTask) Notify(err error) {
+	if len(s.otherTasks) > 0 {
+		metrics.QueryNodeSearchGroupSize.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(len(s.otherTasks) + 1))
+		metrics.QueryNodeSearchGroupNQ.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(s.NQ))
+		metrics.QueryNodeSearchGroupTopK.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(s.TopK))
+	}
 	s.done <- err
 	for i := 0; i < len(s.otherTasks); i++ {
 		s.otherTasks[i].Notify(err)
diff --git a/scripts/core_build.sh b/scripts/core_build.sh
index 8567113428..a4e06028b2 100755
--- a/scripts/core_build.sh
+++ b/scripts/core_build.sh
@@ -260,7 +260,6 @@ fi
 if command -v ccache &> /dev/null
 then
 	ccache -s
-	exit
 fi
 
 popd
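
If one wanted to unit-test counters like QueryNodeEvictedReadReqCount, client_golang ships a testutil package for reading a metric's current value back. This is an assumption about test tooling, not part of the patch:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	evicted := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "demo_read_evicted_count",
		Help: "count of evicted search/query requests",
	}, []string{"node_id"})
	prometheus.MustRegister(evicted)

	evicted.WithLabelValues("1").Add(3)

	// testutil.ToFloat64 reads back the current value of a single metric.
	fmt.Println(testutil.ToFloat64(evicted.WithLabelValues("1"))) // 3
}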