mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
Cherry-pick from master pr: #36107 Related to #36102 This PR use newly added `grpcSizeStatsHandler` to reduce calling `proto.Size` since the request & response size info is recorded by grpc framework. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
780cd801eb
commit
1cd8d1bd80
@ -323,6 +323,17 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
|
||||
grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()),
|
||||
unaryServerOption,
|
||||
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
|
||||
grpc.StatsHandler(metrics.NewGRPCSizeStatsHandler().
|
||||
// both inbound and outbound
|
||||
WithTargetMethods(
|
||||
milvuspb.MilvusService_Search_FullMethodName,
|
||||
milvuspb.MilvusService_HybridSearch_FullMethodName,
|
||||
milvuspb.MilvusService_Query_FullMethodName,
|
||||
).
|
||||
// inbound only
|
||||
WithInboundRecord(milvuspb.MilvusService_Insert_FullMethodName,
|
||||
milvuspb.MilvusService_Delete_FullMethodName,
|
||||
milvuspb.MilvusService_Upsert_FullMethodName)),
|
||||
}
|
||||
|
||||
if Params.TLSMode.GetAsInt() == 1 {
|
||||
|
@ -2504,9 +2504,10 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
|
||||
)
|
||||
method := "Insert"
|
||||
tr := timerecord.NewTimeRecorder(method)
|
||||
metrics.ProxyReceiveBytes.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.InsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request)))
|
||||
metrics.GetStats(ctx).
|
||||
SetNodeID(paramtable.GetNodeID()).
|
||||
SetInboundLabel(method).
|
||||
SetCollectionName(request.GetCollectionName())
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()
|
||||
|
||||
it := &insertTask{
|
||||
@ -2630,10 +2631,12 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
||||
)
|
||||
log.Debug("Start processing delete request in Proxy")
|
||||
defer log.Debug("Finish processing delete request in Proxy")
|
||||
method := "Delete"
|
||||
|
||||
metrics.ProxyReceiveBytes.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.DeleteLabel, request.GetCollectionName()).Add(float64(proto.Size(request)))
|
||||
metrics.GetStats(ctx).
|
||||
SetNodeID(paramtable.GetNodeID()).
|
||||
SetInboundLabel(method).
|
||||
SetCollectionName(request.GetCollectionName())
|
||||
|
||||
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
|
||||
return &milvuspb.MutationResult{
|
||||
@ -2641,7 +2644,6 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
||||
}, nil
|
||||
}
|
||||
|
||||
method := "Delete"
|
||||
tr := timerecord.NewTimeRecorder(method)
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
@ -2740,9 +2742,11 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
|
||||
method := "Upsert"
|
||||
tr := timerecord.NewTimeRecorder(method)
|
||||
|
||||
metrics.ProxyReceiveBytes.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.UpsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request)))
|
||||
metrics.GetStats(ctx).
|
||||
SetNodeID(paramtable.GetNodeID()).
|
||||
SetInboundLabel(method).
|
||||
SetCollectionName(request.GetCollectionName())
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()
|
||||
|
||||
request.Base = commonpbutil.NewMsgBase(
|
||||
@ -2895,12 +2899,10 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
|
||||
}
|
||||
|
||||
func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
|
||||
receiveSize := proto.Size(request)
|
||||
metrics.ProxyReceiveBytes.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.SearchLabel,
|
||||
request.GetCollectionName(),
|
||||
).Add(float64(receiveSize))
|
||||
metrics.GetStats(ctx).
|
||||
SetNodeID(paramtable.GetNodeID()).
|
||||
SetInboundLabel(metrics.SearchLabel).
|
||||
SetCollectionName(request.GetCollectionName())
|
||||
|
||||
metrics.ProxyReceivedNQ.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
@ -3082,8 +3084,6 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
|
||||
if merr.Ok(qt.result.GetStatus()) {
|
||||
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeSearch, dbName, username).Add(float64(v))
|
||||
}
|
||||
|
||||
metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
|
||||
}
|
||||
return qt.result, nil
|
||||
}
|
||||
@ -3107,12 +3107,10 @@ func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSea
|
||||
}
|
||||
|
||||
func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) {
|
||||
receiveSize := proto.Size(request)
|
||||
metrics.ProxyReceiveBytes.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.HybridSearchLabel,
|
||||
request.GetCollectionName(),
|
||||
).Add(float64(receiveSize))
|
||||
metrics.GetStats(ctx).
|
||||
SetNodeID(paramtable.GetNodeID()).
|
||||
SetInboundLabel(metrics.HybridSearchLabel).
|
||||
SetCollectionName(request.GetCollectionName())
|
||||
|
||||
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
|
||||
return &milvuspb.SearchResults{
|
||||
@ -3271,8 +3269,6 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
|
||||
if merr.Ok(qt.result.GetStatus()) {
|
||||
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeHybridSearch, dbName, username).Add(float64(v))
|
||||
}
|
||||
|
||||
metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
|
||||
}
|
||||
return qt.result, nil
|
||||
}
|
||||
@ -3532,12 +3528,10 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
|
||||
}
|
||||
|
||||
subLabel := GetCollectionRateSubLabel(request)
|
||||
receiveSize := proto.Size(request)
|
||||
metrics.ProxyReceiveBytes.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.QueryLabel,
|
||||
request.GetCollectionName(),
|
||||
).Add(float64(receiveSize))
|
||||
metrics.GetStats(ctx).
|
||||
SetNodeID(paramtable.GetNodeID()).
|
||||
SetInboundLabel(metrics.QueryLabel).
|
||||
SetCollectionName(request.GetCollectionName())
|
||||
metrics.ProxyReceivedNQ.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.SearchLabel,
|
||||
@ -3580,9 +3574,6 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
|
||||
request.GetCollectionName(),
|
||||
).Inc()
|
||||
|
||||
sentSize := proto.Size(qt.result)
|
||||
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
|
||||
|
||||
username := GetCurUserFromContextOrDefault(ctx)
|
||||
nodeID := paramtable.GetStringNodeID()
|
||||
v := hookutil.GetExtension().Report(map[string]any{
|
||||
|
153
pkg/metrics/grpc_stats_handler.go
Normal file
153
pkg/metrics/grpc_stats_handler.go
Normal file
@ -0,0 +1,153 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"google.golang.org/grpc/stats"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
// milvusGrpcKey is context key type.
|
||||
type milvusGrpcKey struct{}
|
||||
|
||||
// GrpcStats stores the meta and payload size info
|
||||
// it should be attached to context so that request sizing could be avoided
|
||||
type GrpcStats struct {
|
||||
fullMethodName string
|
||||
collectionName string
|
||||
inboundPayloadSize int
|
||||
inboundLabel string
|
||||
nodeID int64
|
||||
}
|
||||
|
||||
func (s *GrpcStats) SetCollectionName(collName string) *GrpcStats {
|
||||
if s == nil {
|
||||
return s
|
||||
}
|
||||
s.collectionName = collName
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *GrpcStats) SetInboundLabel(label string) *GrpcStats {
|
||||
if s == nil {
|
||||
return s
|
||||
}
|
||||
s.inboundLabel = label
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *GrpcStats) SetNodeID(nodeID int64) *GrpcStats {
|
||||
if s == nil {
|
||||
return s
|
||||
}
|
||||
s.nodeID = nodeID
|
||||
return s
|
||||
}
|
||||
|
||||
func attachStats(ctx context.Context, stats *GrpcStats) context.Context {
|
||||
return context.WithValue(ctx, milvusGrpcKey{}, stats)
|
||||
}
|
||||
|
||||
func GetStats(ctx context.Context) *GrpcStats {
|
||||
stats, ok := ctx.Value(milvusGrpcKey{}).(*GrpcStats)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// grpcSizeStatsHandler implementing stats.Handler
|
||||
// this handler process grpc request & response related metrics logic
|
||||
type grpcSizeStatsHandler struct {
|
||||
outboundMethods typeutil.Set[string]
|
||||
targetMethods typeutil.Set[string]
|
||||
}
|
||||
|
||||
func NewGRPCSizeStatsHandler() *grpcSizeStatsHandler {
|
||||
return &grpcSizeStatsHandler{
|
||||
targetMethods: make(typeutil.Set[string]),
|
||||
outboundMethods: make(typeutil.Set[string]),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *grpcSizeStatsHandler) isTarget(method string) bool {
|
||||
return h.targetMethods.Contain(method)
|
||||
}
|
||||
|
||||
func (h *grpcSizeStatsHandler) shouldRecordOutbound(method string) bool {
|
||||
return h.outboundMethods.Contain(method)
|
||||
}
|
||||
|
||||
func (h *grpcSizeStatsHandler) WithTargetMethods(methods ...string) *grpcSizeStatsHandler {
|
||||
h.targetMethods.Insert(methods...)
|
||||
h.outboundMethods.Insert(methods...)
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *grpcSizeStatsHandler) WithInboundRecord(methods ...string) *grpcSizeStatsHandler {
|
||||
h.targetMethods.Insert(methods...)
|
||||
return h
|
||||
}
|
||||
|
||||
// TagConn exists to satisfy gRPC stats.Handler interface.
|
||||
func (h *grpcSizeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
|
||||
return ctx
|
||||
}
|
||||
|
||||
// HandleConn exists to satisfy gRPC stats.Handler interface.
|
||||
func (h *grpcSizeStatsHandler) HandleConn(_ context.Context, _ stats.ConnStats) {}
|
||||
|
||||
func (h *grpcSizeStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
|
||||
// if method is not target, just return origin ctx
|
||||
if !h.isTarget(info.FullMethodName) {
|
||||
return ctx
|
||||
}
|
||||
// attach stats
|
||||
return attachStats(ctx, &GrpcStats{fullMethodName: info.FullMethodName})
|
||||
}
|
||||
|
||||
// HandleRPC implements per-RPC stats instrumentation.
|
||||
func (h *grpcSizeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
|
||||
mstats := GetStats(ctx)
|
||||
// if no stats found, do nothing
|
||||
if mstats == nil {
|
||||
return
|
||||
}
|
||||
|
||||
switch rs := rs.(type) {
|
||||
case *stats.InPayload:
|
||||
// store inbound payload size in stats, collection name could be fetch in service after
|
||||
mstats.inboundPayloadSize = rs.Length
|
||||
case *stats.OutPayload:
|
||||
// all info set
|
||||
// set metrics with inbound size and related meta
|
||||
nodeIDValue := strconv.FormatInt(mstats.nodeID, 10)
|
||||
ProxyReceiveBytes.WithLabelValues(
|
||||
nodeIDValue,
|
||||
mstats.inboundLabel, mstats.collectionName).Add(float64(mstats.inboundPayloadSize))
|
||||
// set outbound payload size metrics for marked methods
|
||||
if h.shouldRecordOutbound(mstats.fullMethodName) {
|
||||
ProxyReadReqSendBytes.WithLabelValues(nodeIDValue).Add(float64(rs.Length))
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user