enhance: add more metrics (#31271)
/kind improvement
fix: #31272

This PR adds more metrics:
- Slow query count, where the duration considered slow is configurable;
- Number of deleted entities;
- Number of entities imported;
- Number of entities per collection;
- Number of loaded entities per collection;
- Number of indexed entities;
- Number of indexed entities, per collection, per index, and whether it's a vector index;
- Quota states (LongTimeTickDelay, MemoryExhausted, DiskQuotaExhausted) per database;

---------

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in: parent b35ecebcc3, commit dc2cdbe387
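All of the metrics below follow the same prometheus/client_golang pattern: define a labeled vector once, register it with the role's registry at startup, and pick a child series by label values at each observation site. A minimal runnable sketch of that pattern (the namespace, subsystem, and label strings echo this diff, but the program itself is illustrative, not Milvus code):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// A labeled counter in the same shape as the ones this commit adds:
// one child series per (node_id, query_type) pair.
var slowQueryCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "milvus",
		Subsystem: "proxy",
		Name:      "slow_query_count",
		Help:      "count of slow query executed",
	},
	[]string{"node_id", "query_type"},
)

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(slowQueryCount) // done once, at startup

	// At each observation site, pick the child series and increment it.
	slowQueryCount.WithLabelValues("1", "search").Inc()

	fmt.Println(testutil.ToFloat64(slowQueryCount.WithLabelValues("1", "search"))) // 1
}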
@@ -251,6 +251,8 @@ proxy:
     serverMaxRecvSize: 67108864
     clientMaxSendSize: 268435456
     clientMaxRecvSize: 67108864
+  # query whose executed time exceeds the `slowQuerySpanInSeconds` can be considered slow, in seconds.
+  slowQuerySpanInSeconds: 5

 # Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.
 queryCoord:
@@ -18,6 +18,7 @@ package datacoord

 import (
 	"sort"
+	"strconv"
 	"sync"
 	"time"

@@ -28,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
 )

 const (
@@ -266,12 +268,16 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
 			if info.GetImportedRows() <= segment.GetNumOfRows() {
 				continue // rows not changed, no need to update
 			}
+			diff := info.GetImportedRows() - segment.GetNumOfRows()
 			op := UpdateImportedRows(info.GetSegmentID(), info.GetImportedRows())
 			err = s.meta.UpdateSegmentsInfo(op)
 			if err != nil {
 				log.Warn("update import segment rows failed", WrapTaskLog(task, zap.Error(err))...)
 				return
 			}
+			metrics.DataCoordBulkVectors.WithLabelValues(
+				strconv.FormatInt(task.GetCollectionID(), 10),
+			).Add(float64(diff))
 		}
 		if resp.GetState() == datapb.ImportTaskStateV2_Completed {
 			for _, info := range resp.GetImportSegmentsInfo() {
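Note why the hunk adds a delta rather than the absolute row count: DataCoordBulkVectors is a counter, which is monotonic, so reporting the new total on every poll would double-count, and counters have no Set. The guard on GetImportedRows() <= GetNumOfRows() guarantees the delta is strictly positive. The same pattern in isolation, with hypothetical names (not code from this commit):

package importmetrics

import "github.com/prometheus/client_golang/prometheus"

// trackImportedRows records import progress on a monotonic counter by adding
// the positive delta between two polls. prevRows is the baseline from the
// last poll, newRows is the latest progress report; the new baseline is
// returned so the caller can store it for the next round.
func trackImportedRows(c prometheus.Counter, prevRows, newRows int64) int64 {
	if newRows <= prevRows {
		return prevRows // no progress since the last poll, nothing to add
	}
	c.Add(float64(newRows - prevRows))
	return newRows
}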
@@ -237,10 +237,7 @@ func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegm
 	return result
 }

-// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection
-func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
-	m.RLock()
-	defer m.RUnlock()
+func (m *meta) getNumRowsOfCollectionUnsafe(collectionID UniqueID) int64 {
 	var ret int64
 	segments := m.segments.GetSegments()
 	for _, segment := range segments {
@@ -251,6 +248,13 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
 	return ret
 }

+// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection
+func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
+	m.RLock()
+	defer m.RUnlock()
+	return m.getNumRowsOfCollectionUnsafe(collectionID)
+}
+
 // GetCollectionBinlogSize returns the total binlog size and binlog size of collections.
 func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) {
 	m.RLock()
@@ -280,6 +284,16 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) {
 	return total, collectionBinlogSize
 }

+func (m *meta) GetAllCollectionNumRows() map[int64]int64 {
+	m.RLock()
+	defer m.RUnlock()
+	ret := make(map[int64]int64, len(m.collections))
+	for collectionID := range m.collections {
+		ret[collectionID] = m.getNumRowsOfCollectionUnsafe(collectionID)
+	}
+	return ret
+}
+
 // AddSegment records segment info, persisting info into kv store
 func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
 	log := log.Ctx(ctx)
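The refactor above is the classic lock-splitting pattern: the public GetNumRowsOfCollection keeps its locking behavior, while the new getNumRowsOfCollectionUnsafe assumes the caller already holds the lock. That lets GetAllCollectionNumRows take the read lock once for the whole loop; re-acquiring RLock per collection would be slower and can deadlock if a writer queues up between acquisitions, since Go's sync.RWMutex is not reentrant. A generic sketch of the pattern, with hypothetical names:

package store

import "sync"

type rowStore struct {
	mu   sync.RWMutex
	rows map[int64][]int64 // collectionID -> per-segment row counts
}

// numRowsUnsafe assumes the caller already holds mu (read or write).
func (s *rowStore) numRowsUnsafe(collectionID int64) int64 {
	var total int64
	for _, n := range s.rows[collectionID] {
		total += n
	}
	return total
}

// NumRows is the public, self-locking entry point.
func (s *rowStore) NumRows(collectionID int64) int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.numRowsUnsafe(collectionID)
}

// AllNumRows locks once and reuses the unsafe helper for every collection.
func (s *rowStore) AllNumRows() map[int64]int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[int64]int64, len(s.rows))
	for id := range s.rows {
		out[id] = s.numRowsUnsafe(id)
	}
	return out
}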
@@ -24,6 +24,7 @@ import (

 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
@@ -43,6 +44,42 @@ func (s *Server) getQuotaMetrics() *metricsinfo.DataCoordQuotaMetrics {
 	}
 }

+func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoordCollectionMetrics {
+	totalNumRows := s.meta.GetAllCollectionNumRows()
+	ret := &metricsinfo.DataCoordCollectionMetrics{
+		Collections: make(map[int64]*metricsinfo.DataCoordCollectionInfo, len(totalNumRows)),
+	}
+	for collectionID, total := range totalNumRows {
+		if _, ok := ret.Collections[collectionID]; !ok {
+			ret.Collections[collectionID] = &metricsinfo.DataCoordCollectionInfo{
+				NumEntitiesTotal: 0,
+				IndexInfo:        make([]*metricsinfo.DataCoordIndexInfo, 0),
+			}
+		}
+		ret.Collections[collectionID].NumEntitiesTotal = total
+		indexInfo, err := s.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
+			CollectionID: collectionID,
+			IndexName:    "",
+			Timestamp:    0,
+		})
+		if err := merr.CheckRPCCall(indexInfo, err); err != nil {
+			log.Ctx(ctx).Warn("failed to describe index, ignore to report index metrics",
+				zap.Int64("collection", collectionID),
+				zap.Error(err),
+			)
+			continue
+		}
+		for _, info := range indexInfo.GetIndexInfos() {
+			ret.Collections[collectionID].IndexInfo = append(ret.Collections[collectionID].IndexInfo, &metricsinfo.DataCoordIndexInfo{
+				NumEntitiesIndexed: info.GetIndexedRows(),
+				IndexName:          info.GetIndexName(),
+				FieldID:            info.GetIndexID(),
+			})
+		}
+	}
+	return ret
+}
+
 // getSystemInfoMetrics composes data cluster metrics
 func (s *Server) getSystemInfoMetrics(
 	ctx context.Context,
@@ -53,7 +90,7 @@ func (s *Server) getSystemInfoMetrics(
 	// get datacoord info
 	nodes := s.cluster.GetSessions()
 	clusterTopology := metricsinfo.DataClusterTopology{
-		Self:                s.getDataCoordMetrics(),
+		Self:                s.getDataCoordMetrics(ctx),
 		ConnectedDataNodes:  make([]metricsinfo.DataNodeInfos, 0, len(nodes)),
 		ConnectedIndexNodes: make([]metricsinfo.IndexNodeInfos, 0),
 	}
@@ -103,7 +140,7 @@ func (s *Server) getSystemInfoMetrics(
 }

 // getDataCoordMetrics composes datacoord infos
-func (s *Server) getDataCoordMetrics() metricsinfo.DataCoordInfos {
+func (s *Server) getDataCoordMetrics(ctx context.Context) metricsinfo.DataCoordInfos {
 	ret := metricsinfo.DataCoordInfos{
 		BaseComponentInfos: metricsinfo.BaseComponentInfos{
 			Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
@@ -125,7 +162,8 @@ func (s *Server) getDataCoordMetrics() metricsinfo.DataCoordInfos {
 		SystemConfigurations: metricsinfo.DataCoordConfiguration{
 			SegmentMaxSize: Params.DataCoordCfg.SegmentMaxSize.GetAsFloat(),
 		},
-		QuotaMetrics: s.getQuotaMetrics(),
+		QuotaMetrics:      s.getQuotaMetrics(),
+		CollectionMetrics: s.getCollectionMetrics(ctx),
 	}

 	metricsinfo.FillDeployMetricsWithEnv(&ret.BaseComponentInfos.SystemInfo)
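getCollectionMetrics treats DescribeIndex as best-effort: merr.CheckRPCCall collapses the two ways a Milvus RPC can fail, a transport-level error and a non-success status embedded in the response, into a single error, and on failure the collection simply reports no index metrics. A rough sketch of that two-level check (illustrative only; the real merr helper handles more cases):

package rpccheck

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
)

// checkRPC folds a transport error and an in-payload status into one error,
// so callers can branch on a single failure path.
func checkRPC(status *commonpb.Status, err error) error {
	if err != nil {
		return err // the call itself failed
	}
	if status.GetErrorCode() != commonpb.ErrorCode_Success {
		return fmt.Errorf("rpc failed: %s", status.GetReason()) // the server reported failure
	}
	return nil
}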
@@ -643,6 +643,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual

 	metrics.CleanupDataCoordNumStoredRows(collectionID)
 	metrics.DataCoordCheckpointUnixSeconds.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel)
+	metrics.CleanupDataCoordBulkInsertVectors(collectionID)

 	// no compaction triggered in Drop procedure
 	return resp, nil
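The cleanup call added above matters for metric hygiene: once a collection's channel is dropped, its child series would otherwise keep exporting the last value forever, and label cardinality would only ever grow. The deletion idiom, as a sketch with a hypothetical wrapper:

package metricscleanup

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// cleanupCollectionSeries removes the child series keyed by this collection ID.
// Delete only matches when every label of the vector is supplied; for vectors
// carrying additional labels, DeletePartialMatch covers the remaining series.
func cleanupCollectionSeries(vec *prometheus.CounterVec, collectionID int64) {
	vec.Delete(prometheus.Labels{
		"collection_id": fmt.Sprint(collectionID),
	})
}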
@@ -63,8 +63,6 @@ import (

 const moduleName = "Proxy"

-const SlowReadSpan = time.Second * 5
-
 // GetComponentStates gets the state of Proxy.
 func (node *Proxy) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
 	stats := &milvuspb.ComponentStates{
@@ -2469,6 +2467,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
 	receiveSize := proto.Size(dr.req)
 	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))

+	successCnt := dr.result.GetDeleteCnt()
+	metrics.ProxyDeleteVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))
+
 	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
 		metrics.SuccessLabel).Inc()
 	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
@@ -2675,8 +2676,12 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)

 	defer func() {
 		span := tr.ElapseSpan()
-		if span >= SlowReadSpan {
+		if span >= paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds.GetAsDuration(time.Second) {
 			log.Info(rpcSlow(method), zap.Int64("nq", qt.SearchRequest.GetNq()), zap.Duration("duration", span))
+			metrics.ProxySlowQueryCount.WithLabelValues(
+				strconv.FormatInt(paramtable.GetNodeID(), 10),
+				metrics.SearchLabel,
+			).Inc()
 		}
 	}()

@@ -2815,8 +2820,12 @@ func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSea

 	defer func() {
 		span := tr.ElapseSpan()
-		if span >= SlowReadSpan {
+		if span >= paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds.GetAsDuration(time.Second) {
 			log.Info(rpcSlow(method), zap.Duration("duration", span))
+			metrics.ProxySlowQueryCount.WithLabelValues(
+				strconv.FormatInt(paramtable.GetNodeID(), 10),
+				metrics.HybridSearchLabel,
+			).Inc()
 		}
 	}()

@@ -3070,7 +3079,7 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes

 	defer func() {
 		span := tr.ElapseSpan()
-		if span >= SlowReadSpan {
+		if span >= paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds.GetAsDuration(time.Second) {
 			log.Info(
 				rpcSlow(method),
 				zap.String("expr", request.Expr),
@@ -3078,6 +3087,10 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
 				zap.Uint64("travel_timestamp", request.TravelTimestamp),
 				zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp),
 				zap.Duration("duration", span))
+			metrics.ProxySlowQueryCount.WithLabelValues(
+				strconv.FormatInt(paramtable.GetNodeID(), 10),
+				metrics.QueryLabel,
+			).Inc()
 		}
 	}()

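Note the behavioral change in these three hunks: the old SlowReadSpan constant was fixed at compile time, while paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds is evaluated inside each deferred block, so changing the refreshable setting takes effect on the next request without restarting the proxy. A standalone sketch of that measure-then-compare shape, with an atomic standing in for the config source (illustrative names, not Milvus code):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// slowSpanSecs stands in for a refreshable config item: a config watcher
// stores updates here, and every request reads the current value.
var slowSpanSecs atomic.Int64

func slowSpan() time.Duration {
	return time.Duration(slowSpanSecs.Load()) * time.Second
}

// handle times one request and flags it when it crosses the threshold that
// is in effect at completion time, mirroring the deferred blocks above.
func handle(work func()) {
	start := time.Now()
	defer func() {
		if span := time.Since(start); span >= slowSpan() {
			fmt.Println("slow query:", span) // a real handler bumps a counter here
		}
	}()
	work()
}

func main() {
	slowSpanSecs.Store(0) // threshold 0 so the demo always logs
	handle(func() { time.Sleep(10 * time.Millisecond) })
}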
@@ -152,6 +152,18 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
 	}, nil
 }

+func getCollectionMetrics(node *QueryNode) (*metricsinfo.QueryNodeCollectionMetrics, error) {
+	allSegments := node.manager.Segment.GetBy()
+	ret := &metricsinfo.QueryNodeCollectionMetrics{
+		CollectionRows: make(map[int64]int64),
+	}
+	for _, segment := range allSegments {
+		collectionID := segment.Collection()
+		ret.CollectionRows[collectionID] += segment.RowNum()
+	}
+	return ret, nil
+}
+
 // getSystemInfoMetrics returns metrics info of QueryNode
 func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (*milvuspb.GetMetricsResponse, error) {
 	usedMem := hardware.GetUsedMemoryCount()
@@ -161,7 +173,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
 	if err != nil {
 		return &milvuspb.GetMetricsResponse{
 			Status:        merr.Status(err),
-			ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, node.GetNodeID()),
+			ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, node.GetNodeID()),
 		}, nil
 	}
 	hardwareInfos := metricsinfo.HardwareMetrics{
@@ -175,6 +187,14 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
 	}
 	quotaMetrics.Hms = hardwareInfos

+	collectionMetrics, err := getCollectionMetrics(node)
+	if err != nil {
+		return &milvuspb.GetMetricsResponse{
+			Status:        merr.Status(err),
+			ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, node.GetNodeID()),
+		}, nil
+	}
+
 	nodeInfos := metricsinfo.QueryNodeInfos{
 		BaseComponentInfos: metricsinfo.BaseComponentInfos{
 			Name: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, node.GetNodeID()),
@@ -188,7 +208,8 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
 		SystemConfigurations: metricsinfo.QueryNodeConfiguration{
 			SimdType: paramtable.Get().CommonCfg.SimdType.GetValue(),
 		},
-		QuotaMetrics: quotaMetrics,
+		QuotaMetrics:      quotaMetrics,
+		CollectionMetrics: collectionMetrics,
 	}
 	metricsinfo.FillDeployMetricsWithEnv(&nodeInfos.SystemInfo)
@@ -30,6 +30,7 @@ import (

 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/proxypb"
 	"github.com/milvus-io/milvus/internal/tso"
@@ -177,6 +178,59 @@ func (q *QuotaCenter) clearMetrics() {
 	q.proxyMetrics = make(map[UniqueID]*metricsinfo.ProxyQuotaMetrics, 0)
 }

+func updateNumEntitiesLoaded(current map[int64]int64, qn *metricsinfo.QueryNodeCollectionMetrics) map[int64]int64 {
+	for collectionID, rowNum := range qn.CollectionRows {
+		current[collectionID] += rowNum
+	}
+	return current
+}
+
+func (q *QuotaCenter) reportNumEntitiesLoaded(numEntitiesLoaded map[int64]int64) {
+	for collectionID, num := range numEntitiesLoaded {
+		info, err := q.meta.GetCollectionByID(context.Background(), "", collectionID, typeutil.MaxTimestamp, false)
+		if err != nil {
+			log.Warn("failed to get collection info by its id, ignore to report loaded num entities",
+				zap.Int64("collection", collectionID),
+				zap.Int64("num_entities_loaded", num),
+				zap.Error(err),
+			)
+			continue
+		}
+		metrics.RootCoordNumEntities.WithLabelValues(info.Name, metrics.LoadedLabel).Set(float64(num))
+	}
+}
+
+func (q *QuotaCenter) reportDataCoordCollectionMetrics(dc *metricsinfo.DataCoordCollectionMetrics) {
+	for collectionID, collection := range dc.Collections {
+		info, err := q.meta.GetCollectionByID(context.Background(), "", collectionID, typeutil.MaxTimestamp, false)
+		if err != nil {
+			log.Warn("failed to get collection info by its id, ignore to report total_num_entities/indexed_entities",
+				zap.Int64("collection", collectionID),
+				zap.Int64("num_entities_total", collection.NumEntitiesTotal),
+				zap.Int("lenOfIndexedInfo", len(collection.IndexInfo)),
+				zap.Error(err),
+			)
+			continue
+		}
+		metrics.RootCoordNumEntities.WithLabelValues(info.Name, metrics.TotalLabel).Set(float64(collection.NumEntitiesTotal))
+		fields := lo.KeyBy(info.Fields, func(v *model.Field) int64 { return v.FieldID })
+		for _, indexInfo := range collection.IndexInfo {
+			if _, ok := fields[indexInfo.FieldID]; !ok {
+				log.Warn("field id not found, ignore to report indexed num entities",
+					zap.Int64("collection", collectionID),
+					zap.Int64("field", indexInfo.FieldID),
+				)
+				continue
+			}
+			field := fields[indexInfo.FieldID]
+			metrics.RootCoordIndexedNumEntities.WithLabelValues(
+				info.Name,
+				indexInfo.IndexName,
+				strconv.FormatBool(typeutil.IsVectorType(field.DataType))).Set(float64(indexInfo.NumEntitiesIndexed))
+		}
+	}
+}
+
 // syncMetrics sends GetMetrics requests to DataCoord and QueryCoord to sync the metrics in DataNodes and QueryNodes.
 func (q *QuotaCenter) syncMetrics() error {
 	oldDataNodes := typeutil.NewSet(lo.Keys(q.dataNodeMetrics)...)
@@ -191,6 +245,8 @@ func (q *QuotaCenter) syncMetrics() error {
 		return err
 	}

+	numEntitiesLoaded := make(map[int64]int64)
+
 	// get Query cluster metrics
 	group.Go(func() error {
 		rsp, err := q.queryCoord.GetMetrics(ctx, req)
@@ -210,8 +266,12 @@ func (q *QuotaCenter) syncMetrics() error {
 				q.queryNodeMetrics[queryNodeMetric.ID] = queryNodeMetric.QuotaMetrics
 				collections.Insert(queryNodeMetric.QuotaMetrics.Effect.CollectionIDs...)
 			}
+			if queryNodeMetric.CollectionMetrics != nil {
+				numEntitiesLoaded = updateNumEntitiesLoaded(numEntitiesLoaded, queryNodeMetric.CollectionMetrics)
+			}
 		}
 		q.readableCollections = collections.Collect()
+		q.reportNumEntitiesLoaded(numEntitiesLoaded)
 		return nil
 	})
 	// get Data cluster metrics
@@ -226,6 +286,10 @@ func (q *QuotaCenter) syncMetrics() error {
 			return err
 		}

+		if dataCoordTopology.Cluster.Self.CollectionMetrics != nil {
+			q.reportDataCoordCollectionMetrics(dataCoordTopology.Cluster.Self.CollectionMetrics)
+		}
+
 		collections := typeutil.NewUniqueSet()
 		for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes {
 			if dataNodeMetric.QuotaMetrics != nil {
@@ -848,14 +912,31 @@ func (q *QuotaCenter) setRates() error {
 func (q *QuotaCenter) recordMetrics() {
 	record := func(errorCode commonpb.ErrorCode) {
 		var hasException float64 = 0
-		for _, states := range q.quotaStates {
+		for collectionID, states := range q.quotaStates {
+			info, err := q.meta.GetCollectionByID(context.Background(), "", collectionID, typeutil.MaxTimestamp, false)
+			if err != nil {
+				log.Warn("failed to get collection info by its id, ignore to report quota states",
+					zap.Int64("collection", collectionID),
+					zap.Error(err),
+				)
+				continue
+			}
+			dbm, err := q.meta.GetDatabaseByID(context.Background(), info.DBID, typeutil.MaxTimestamp)
+			if err != nil {
+				log.Warn("failed to get database name info by its id, ignore to report quota states",
+					zap.Int64("collection", collectionID),
+					zap.Error(err),
+				)
+				continue
+			}
+
 			for _, state := range states {
 				if state == errorCode {
 					hasException = 1
 				}
 			}
+			metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String(), dbm.Name).Set(hasException)
 		}
-		metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String()).Set(hasException)
 	}
 	record(commonpb.ErrorCode_MemoryQuotaExhausted)
 	record(commonpb.ErrorCode_DiskQuotaExhausted)
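recordMetrics now resolves every collection with a quota state to its owning database and emits one 0/1 gauge sample per (quota state, db_name) pair, which is what enables per-database quota-state dashboards. The reporting shape, reduced to a sketch with hypothetical names:

package quotametrics

import "github.com/prometheus/client_golang/prometheus"

// reportQuotaState publishes one 0/1 sample per database for a given quota
// state, matching the (quota_states, db_name) label pair added above.
// exceeded holds the databases currently hitting that limit.
func reportQuotaState(g *prometheus.GaugeVec, state string, dbNames []string, exceeded map[string]bool) {
	for _, db := range dbNames {
		v := 0.0
		if exceeded[db] {
			v = 1.0
		}
		g.WithLabelValues(state, db).Set(v)
	}
}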
@@ -93,6 +93,16 @@ var (
 			segmentStateLabelName,
 		})

+	DataCoordBulkVectors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataCoordRole,
+			Name:      "bulk_insert_vectors_count",
+			Help:      "counter of vectors successfully bulk inserted",
+		}, []string{
+			collectionIDLabelName,
+		})
+
 	DataCoordNumStoredRowsCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: milvusNamespace,
@@ -284,6 +294,7 @@ func RegisterDataCoord(registry *prometheus.Registry) {
 	registry.MustRegister(DataCoordNumSegments)
 	registry.MustRegister(DataCoordNumCollections)
 	registry.MustRegister(DataCoordNumStoredRows)
+	registry.MustRegister(DataCoordBulkVectors)
 	registry.MustRegister(DataCoordNumStoredRowsCounter)
 	registry.MustRegister(DataCoordConsumeDataNodeTimeTickLag)
 	registry.MustRegister(DataCoordCheckpointUnixSeconds)
@@ -322,3 +333,9 @@ func CleanupDataCoordNumStoredRows(collectionID int64) {
 		})
 	}
 }
+
+func CleanupDataCoordBulkInsertVectors(collectionID int64) {
+	DataCoordBulkVectors.Delete(prometheus.Labels{
+		collectionIDLabelName: fmt.Sprint(collectionID),
+	})
+}
@@ -81,6 +81,8 @@ const (
 	functionLabelName     = "function_name"
 	queryTypeLabelName    = "query_type"
 	collectionName        = "collection_name"
+	indexName             = "index_name"
+	isVectorIndex         = "is_vector_index"
 	segmentStateLabelName = "segment_state"
 	segmentIDLabelName    = "segment_id"
 	segmentLevelLabelName = "segment_level"
@@ -97,6 +99,10 @@ const (
 	lockType     = "lock_type"
 	lockOp       = "lock_op"
 	loadTypeName = "load_type"
+
+	// entities label
+	LoadedLabel         = "loaded"
+	NumEntitiesAllLabel = "all"
 )

 var (
@@ -60,6 +60,14 @@ var (
 			Help:      "counter of vectors successfully upserted",
 		}, []string{nodeIDLabelName})

+	ProxyDeleteVectors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.ProxyRole,
+			Name:      "delete_vectors_count",
+			Help:      "counter of vectors successfully deleted",
+		}, []string{nodeIDLabelName})
+
 	// ProxySQLatency record the latency of search successfully.
 	ProxySQLatency = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
@@ -306,6 +314,14 @@ var (
 			Name:      "rate_limit_req_count",
 			Help:      "count of operation executed",
 		}, []string{nodeIDLabelName, msgTypeLabelName, statusLabelName})
+
+	ProxySlowQueryCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.ProxyRole,
+			Name:      "slow_query_count",
+			Help:      "count of slow query executed",
+		}, []string{nodeIDLabelName, msgTypeLabelName})
 )

 // RegisterProxy registers Proxy metrics
@@ -314,6 +330,7 @@ func RegisterProxy(registry *prometheus.Registry) {
 	registry.MustRegister(ProxySearchVectors)
 	registry.MustRegister(ProxyInsertVectors)
 	registry.MustRegister(ProxyUpsertVectors)
+	registry.MustRegister(ProxyDeleteVectors)

 	registry.MustRegister(ProxySQLatency)
 	registry.MustRegister(ProxyCollectionSQLatency)
@@ -351,6 +368,8 @@ func RegisterProxy(registry *prometheus.Registry) {
 	registry.MustRegister(ProxyWorkLoadScore)
 	registry.MustRegister(ProxyExecutingTotalNq)
 	registry.MustRegister(ProxyRateLimitReqCount)
+
+	registry.MustRegister(ProxySlowQueryCount)
 }

 func CleanupCollectionMetrics(nodeID int64, collection string) {
@@ -167,6 +167,7 @@ var (
 			Help: "The quota states of cluster",
 		}, []string{
 			"quota_states",
+			"db_name",
 		})

 	// RootCoordRateLimitRatio reflects the ratio of rate limit.
@@ -185,6 +186,29 @@ var (
 			Name:      "ddl_req_latency_in_queue",
 			Help:      "latency of each DDL operations in queue",
 		}, []string{functionLabelName})
+
+	RootCoordNumEntities = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.RootCoordRole,
+			Name:      "entity_num",
+			Help:      "number of entities, clustered by collection and their status(loaded/total)",
+		}, []string{
+			collectionName,
+			statusLabelName,
+		})
+
+	RootCoordIndexedNumEntities = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.RootCoordRole,
+			Name:      "indexed_entity_num",
+			Help:      "indexed number of entities, clustered by collection, index name and whether it's a vector index",
+		}, []string{
+			collectionName,
+			indexName,
+			isVectorIndex,
+		})
 )

 // RegisterRootCoord registers RootCoord metrics
@@ -219,4 +243,7 @@ func RegisterRootCoord(registry *prometheus.Registry) {
 	registry.MustRegister(RootCoordQuotaStates)
 	registry.MustRegister(RootCoordRateLimitRatio)
 	registry.MustRegister(RootCoordDDLReqLatencyInQueue)
+
+	registry.MustRegister(RootCoordNumEntities)
+	registry.MustRegister(RootCoordIndexedNumEntities)
 }
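When adding gauges like these, a scratch registry plus Gather() is a quick way to confirm the registration and label wiring before wiring up dashboards. A small illustrative program (label values are made up, and the gauge is redeclared locally rather than imported from Milvus):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	registry := prometheus.NewRegistry()

	// Same shape as RootCoordNumEntities above.
	entityNum := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "rootcoord",
		Name:      "entity_num",
		Help:      "number of entities, clustered by collection and their status(loaded/total)",
	}, []string{"collection_name", "status"})
	registry.MustRegister(entityNum)

	entityNum.WithLabelValues("demo_collection", "loaded").Set(42)

	// Gather renders what a /metrics scrape would see.
	families, err := registry.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName(), "series:", len(mf.GetMetric()))
	}
}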
@@ -94,11 +94,16 @@ type QueryNodeConfiguration struct {
 	SimdType string `json:"simd_type"`
 }

+type QueryNodeCollectionMetrics struct {
+	CollectionRows map[int64]int64
+}
+
 // QueryNodeInfos implements ComponentInfos
 type QueryNodeInfos struct {
 	BaseComponentInfos
-	SystemConfigurations QueryNodeConfiguration `json:"system_configurations"`
-	QuotaMetrics         *QueryNodeQuotaMetrics `json:"quota_metrics"`
+	SystemConfigurations QueryNodeConfiguration      `json:"system_configurations"`
+	QuotaMetrics         *QueryNodeQuotaMetrics      `json:"quota_metrics"`
+	CollectionMetrics    *QueryNodeCollectionMetrics `json:"collection_metrics"`
 }

 // QueryCoordConfiguration records the configuration of QueryCoord.
@@ -167,11 +172,27 @@ type DataCoordConfiguration struct {
 	SegmentMaxSize float64 `json:"segment_max_size"`
 }

+type DataCoordIndexInfo struct {
+	NumEntitiesIndexed int64
+	IndexName          string
+	FieldID            int64
+}
+
+type DataCoordCollectionInfo struct {
+	NumEntitiesTotal int64
+	IndexInfo        []*DataCoordIndexInfo
+}
+
+type DataCoordCollectionMetrics struct {
+	Collections map[int64]*DataCoordCollectionInfo
+}
+
 // DataCoordInfos implements ComponentInfos
 type DataCoordInfos struct {
 	BaseComponentInfos
-	SystemConfigurations DataCoordConfiguration `json:"system_configurations"`
-	QuotaMetrics         *DataCoordQuotaMetrics `json:"quota_metrics"`
+	SystemConfigurations DataCoordConfiguration      `json:"system_configurations"`
+	QuotaMetrics         *DataCoordQuotaMetrics      `json:"quota_metrics"`
+	CollectionMetrics    *DataCoordCollectionMetrics `json:"collection_metrics"`
 }

 // RootCoordConfiguration records the configuration of RootCoord.
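These metricsinfo structs are serialized to JSON when they travel inside GetMetrics responses. Worth noticing: the new QueryNodeCollectionMetrics and DataCoord* types carry no json tags, so their fields serialize under their Go names, while the wrapping fields use the snake_case tags shown above. A stand-in sketch of the resulting payload (local mirror types, illustrative collection ID):

package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the new metricsinfo types (illustrative only).
type queryNodeCollectionMetrics struct {
	CollectionRows map[int64]int64
}

type queryNodeInfos struct {
	CollectionMetrics *queryNodeCollectionMetrics `json:"collection_metrics"`
}

func main() {
	payload := queryNodeInfos{
		CollectionMetrics: &queryNodeCollectionMetrics{
			CollectionRows: map[int64]int64{451201: 10000},
		},
	}
	b, _ := json.Marshal(payload)
	fmt.Println(string(b))
	// {"collection_metrics":{"CollectionRows":{"451201":10000}}}
}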
@@ -1056,6 +1056,8 @@ type proxyConfig struct {
 	MaxConnectionNum ParamItem `refreshable:"true"`

 	GracefulStopTimeout ParamItem `refreshable:"true"`
+
+	SlowQuerySpanInSeconds ParamItem `refreshable:"true"`
 }

 func (p *proxyConfig) init(base *BaseTable) {
@@ -1398,6 +1400,15 @@ please adjust in embedded Milvus: false`,
 		Export: true,
 	}
 	p.MaxConnectionNum.Init(base.mgr)
+
+	p.SlowQuerySpanInSeconds = ParamItem{
+		Key:          "proxy.slowQuerySpanInSeconds",
+		Version:      "2.3.11",
+		Doc:          "query whose executed time exceeds the `slowQuerySpanInSeconds` can be considered slow, in seconds.",
+		DefaultValue: "5",
+		Export:       true,
+	}
+	p.SlowQuerySpanInSeconds.Init(base.mgr)
 }

 // /////////////////////////////////////////////////////////////////////////////
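The ParamItem above stores "5" as a plain string; the call sites earlier in this diff convert it with GetAsDuration(time.Second), i.e. the raw number is scaled by the unit passed in. A rough sketch of that conversion, under the assumption that paramtable scales a numeric value by the given unit (the real implementation has more parsing rules):

package paramsketch

import (
	"strconv"
	"time"
)

// getAsDuration mimics how a numeric ParamItem such as
// proxy.slowQuerySpanInSeconds is read: the raw string is parsed as a number
// and multiplied by the supplied unit, so "5" with time.Second yields 5s.
func getAsDuration(raw string, unit time.Duration) (time.Duration, error) {
	n, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return 0, err
	}
	return time.Duration(n * float64(unit)), nil
}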