Simplify monitoring metrics (#16687)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
zhenshan.cao 2022-04-27 23:03:47 +08:00 committed by GitHub
parent 3a6db2faeb
commit 0a953948af
23 changed files with 406 additions and 1148 deletions

View File

@ -518,7 +518,7 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
}
utcT, _ := tsoutil.ParseHybridTs(ts)
metrics.DataCoordSyncUTC.WithLabelValues().Set(float64(utcT))
metrics.DataCoordSyncEpoch.WithLabelValues(ch).Set(float64(utcT))
s.updateSegmentStatistics(ttMsg.GetSegmentsStats())
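The renamed gauge keys the synchronized epoch by physical channel instead of a single unlabeled value. A minimal, self-contained sketch of the same pattern, assuming the Milvus TSO convention that the physical part of a hybrid timestamp is milliseconds shifted left by 18 logical bits; metric and channel names here are illustrative, not taken from the commit.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// logicalBits is the assumed TSO layout: physical milliseconds in the high bits.
const logicalBits = 18

var syncEpoch = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "datacoord",
		Name:      "sync_epoch_time",
		Help:      "synchronized unix epoch (ms) per physical channel",
	}, []string{"channel_name"})

// parseHybridTs extracts the physical milliseconds from a hybrid timestamp.
func parseHybridTs(ts uint64) int64 {
	return int64(ts >> logicalBits)
}

func main() {
	prometheus.MustRegister(syncEpoch)
	ts := uint64(1651071827000) << logicalBits // fabricated hybrid timestamp for the demo
	// One gauge child per channel, so sync lag can be compared across channels.
	syncEpoch.WithLabelValues("by-dev-rootcoord-dml_0").Set(float64(parseHybridTs(ts)))
	fmt.Println("epoch ms:", parseHybridTs(ts))
}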

View File

@ -543,7 +543,7 @@ func (node *DataNode) ReadyToFlush() error {
//
// One precondition: The segmentID in req is in ascending order.
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
metrics.DataNodeFlushSegmentsReqCounter.WithLabelValues(
metrics.DataNodeFlushReqCounter.WithLabelValues(
fmt.Sprint(Params.DataNodeCfg.GetNodeID()),
MetricRequestsTotal).Inc()
@ -610,7 +610,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
}
status.ErrorCode = commonpb.ErrorCode_Success
metrics.DataNodeFlushSegmentsReqCounter.WithLabelValues(
metrics.DataNodeFlushReqCounter.WithLabelValues(
fmt.Sprint(Params.DataNodeCfg.GetNodeID()),
MetricRequestsSuccess).Inc()

View File

@ -247,6 +247,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
segmentID UniqueID
flushed bool
dropped bool
auto bool
}
var (
@ -296,9 +297,9 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
segmentID: segToFlush,
flushed: false,
dropped: false,
auto: true,
})
metrics.DataNodeAutoFlushSegmentCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc()
}
}
@ -343,13 +344,22 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, task.dropped, endPositions[0])
if err != nil {
log.Warn("failed to invoke flushBufferData", zap.Error(err))
metrics.DataNodeFlushSegmentCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
if task.auto {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
}
} else {
segmentsToFlush = append(segmentsToFlush, task.segmentID)
ibNode.insertBuffer.Delete(task.segmentID)
metrics.DataNodeFlushSegmentCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.SuccessLabel).Inc()
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.SuccessLabel).Inc()
if task.auto {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.SuccessLabel).Inc()
}
}
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
if task.auto {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
}
metrics.DataNodeFlushSegmentCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
}
if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax, seg2Upload); err != nil {
@ -521,7 +531,6 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl
continue
}
stats = append(stats, stat)
metrics.DataNodeSegmentRowsCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Add(float64(stat.NumRows))
}
msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.DataNodeTtMsg{
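The flush bookkeeping above counts every buffer flush under the total label, records the outcome as success or fail, and mirrors each increment on the auto-flush counter when the node triggered the flush itself. A condensed sketch of that pattern; the metric options are fabricated and an error is injected for demonstration.

package main

import (
	"errors"

	"github.com/prometheus/client_golang/prometheus"
)

const (
	totalLabel   = "total"
	successLabel = "success"
	failLabel    = "fail"
)

var flushBufferCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "flush_buffer_op_count"},
	[]string{"node_id", "status"})

var autoFlushBufferCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "autoflush_buffer_op_count"},
	[]string{"node_id", "status"})

// observeFlush records one flush attempt; auto marks flushes the node
// initiated itself rather than on an explicit FlushSegments request.
func observeFlush(nodeID string, auto bool, err error) {
	status := successLabel
	if err != nil {
		status = failLabel
	}
	flushBufferCount.WithLabelValues(nodeID, status).Inc()
	flushBufferCount.WithLabelValues(nodeID, totalLabel).Inc()
	if auto {
		autoFlushBufferCount.WithLabelValues(nodeID, status).Inc()
		autoFlushBufferCount.WithLabelValues(nodeID, totalLabel).Inc()
	}
}

func main() {
	prometheus.MustRegister(flushBufferCount, autoFlushBufferCount)
	observeFlush("1", true, nil)
	observeFlush("1", false, errors.New("disk full"))
}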

View File

@ -412,7 +412,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
data: kvs,
}, field2Insert, field2Stats, flushed, dropped, pos)
metrics.DataNodeFlushSegmentLatency.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil
}
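The encode-latency observation above follows the usual stopwatch-and-histogram idiom. The same shape with the standard library; the bucket boundaries are placeholders rather than the ones defined in this commit.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var encodeLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "encode_buffer_latency",
		Help:    "latency of encoding buffer data (ms)",
		Buckets: []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000},
	}, []string{"node_id"})

func encodeBuffer(nodeID string) {
	start := time.Now()
	defer func() {
		// Observe in milliseconds to match the bucket units above.
		encodeLatency.WithLabelValues(nodeID).Observe(float64(time.Since(start).Milliseconds()))
	}()
	time.Sleep(3 * time.Millisecond) // stand-in for the real encode work
}

func main() {
	prometheus.MustRegister(encodeLatency)
	encodeBuffer("1")
}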

View File

@ -378,16 +378,14 @@ func (it *IndexBuildTask) loadFieldData(ctx context.Context) (storage.FieldID, s
loadVectorDuration := it.tr.RecordSpan()
log.Debug("IndexNode load data success", zap.Int64("buildId", it.req.IndexBuildID))
it.tr.Record("load field data done")
metrics.IndexNodeLoadFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(loadVectorDuration))
var insertCodec storage.InsertCodec
collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(blobs)
if err2 != nil {
return storage.InvalidUniqueID, nil, err2
}
// TODO: @xiaocai2333 metrics.IndexNodeLoadBinlogLatency should be added above, put here to get segmentID.
metrics.IndexNodeLoadBinlogLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(loadVectorDuration))
metrics.IndexNodeDecodeBinlogLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(it.tr.RecordSpan()))
metrics.IndexNodeDecodeFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(it.tr.RecordSpan()))
if len(insertData.Data) != 1 {
return storage.InvalidUniqueID, nil, errors.New("we expect only one field in deserialized insert data")
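loadFieldData times its stages with RecordSpan, which returns the elapsed time since the previous mark and then resets it, so the load and decode observations never overlap. A small stand-in for such a recorder; the real timerecord package is not shown in this diff, and this type is purely illustrative.

package main

import (
	"fmt"
	"time"
)

// spanRecorder mimics a RecordSpan-style stopwatch: each call returns the
// time elapsed since the previous call and restarts the span.
type spanRecorder struct{ last time.Time }

func newSpanRecorder() *spanRecorder { return &spanRecorder{last: time.Now()} }

func (r *spanRecorder) RecordSpan() time.Duration {
	now := time.Now()
	d := now.Sub(r.last)
	r.last = now
	return d
}

func main() {
	tr := newSpanRecorder()

	time.Sleep(5 * time.Millisecond) // stand-in for loading field data
	loadLatency := tr.RecordSpan()

	time.Sleep(2 * time.Millisecond) // stand-in for decoding the blobs
	decodeLatency := tr.RecordSpan()

	// Each histogram observes only its own stage.
	fmt.Println("load:", loadLatency, "decode:", decodeLatency)
}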

View File

@ -37,18 +37,18 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "num_datanodes",
Help: "Number of data nodes managed by DataCoord",
Name: "datanode_num",
Help: "number of data nodes",
}, []string{})
DataCoordNumSegments = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "num_segments",
Name: "segment_num",
Help: "number of segments",
}, []string{
segmentTypeLabelName,
segmentStateLabelName,
})
//DataCoordCollectionNum records the num of collections managed by DataCoord.
@ -56,25 +56,25 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "num_collections",
Help: "Number of collections",
Name: "collection_num",
Help: "number of collections",
}, []string{})
DataCoordNumStoredRows = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "num_stored_rows",
Name: "stored_rows_num",
Help: "number of stored rows",
}, []string{})
DataCoordSyncUTC = prometheus.NewGaugeVec(
DataCoordSyncEpoch = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "sync_utc_time",
Help: "sync timestamp",
}, []string{})
Name: "sync_epoch_time",
Help: "synchronized unix epoch per physical channel",
}, []string{channelNameLabelName})
/* hard to implement, commented now
DataCoordSegmentSizeRatio = prometheus.NewHistogramVec(
@ -129,5 +129,5 @@ func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(DataCoordNumSegments)
registry.MustRegister(DataCoordNumCollections)
registry.MustRegister(DataCoordNumStoredRows)
registry.MustRegister(DataCoordSyncUTC)
registry.MustRegister(DataCoordSyncEpoch)
}
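Each role exposes a Register* function that installs its collectors into a caller-supplied registry, which keeps roles isolated when several run inside one process. A runnable sketch of that wiring with a private registry served over HTTP; names are abbreviated from the file above and the port is arbitrary.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var dataCoordNumSegments = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "datacoord",
		Name:      "segment_num",
		Help:      "number of segments",
	}, []string{"segment_state"})

// registerDataCoord mirrors the per-role Register* functions: the caller
// owns the registry, the role only contributes its collectors.
func registerDataCoord(registry *prometheus.Registry) {
	registry.MustRegister(dataCoordNumSegments)
}

func main() {
	registry := prometheus.NewRegistry()
	registerDataCoord(registry)
	dataCoordNumSegments.WithLabelValues("Flushed").Set(42)

	// Serve only this registry's metrics, not the global default one.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	http.ListenAndServe(":9091", nil)
}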

View File

@ -26,8 +26,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_flow_graphs",
Help: "Number of flow graphs in DataNode.",
Name: "flowgraph_num",
Help: "number of flowgraphs",
}, []string{
nodeIDLabelName,
})
@ -36,8 +36,8 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "message_rows_count",
Help: "Messages rows size count consumed from msgStream in DataNode.",
Name: "msg_rows_count",
Help: "count of rows consumed from msgStream",
}, []string{
nodeIDLabelName,
msgTypeLabelName,
@ -47,41 +47,19 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "flushed_size",
Help: "Data size flushed to storage in DataNode.",
Name: "flushed_data_size",
Help: "byte size of data flushed to storage",
}, []string{
nodeIDLabelName,
msgTypeLabelName,
})
//DataNodeNumDmlChannels = prometheus.NewGaugeVec(
// prometheus.GaugeOpts{
// Namespace: milvusNamespace,
// Subsystem: typeutil.DataNodeRole,
// Name: "num_dml_channels",
// Help: "Number of dmlChannels per collection in DataNode.",
// }, []string{
// collectionIDLabelName,
// nodeIDLabelName,
// })
//
//DataNodeNumDeltaChannels = prometheus.NewGaugeVec(
// prometheus.GaugeOpts{
// Namespace: milvusNamespace,
// Subsystem: typeutil.DataNodeRole,
// Name: "num_delta_channels",
// Help: "Number of deltaChannels per collection in DataNode.",
// }, []string{
// collectionIDLabelName,
// nodeIDLabelName,
// })
DataNodeNumConsumers = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_consumers",
Help: "Number of consumers per collection in DataNode.",
Name: "consumer_num",
Help: "number of consumers",
}, []string{
nodeIDLabelName,
})
@ -90,8 +68,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_producers",
Help: "Number of producers per collection in DataNode.",
Name: "producer_num",
Help: "number of producers",
}, []string{
nodeIDLabelName,
})
@ -100,39 +78,29 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "time_sync",
Help: "Synchronized timestamps per channel in DataNode.",
Name: "sync_epoch_time",
Help: "synchronized unix epoch per physical channel",
}, []string{
nodeIDLabelName,
channelNameLabelName,
})
DataNodeSegmentRowsCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "seg_rows_count",
Help: "Rows count of segments which sent to DataCoord from DataNode.",
}, []string{
nodeIDLabelName,
})
DataNodeNumUnflushedSegments = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_unflushed_segments",
Help: "Number of unflushed segments in DataNode.",
Name: "unflushed_segment_num",
Help: "number of unflushed segments",
}, []string{
nodeIDLabelName,
})
DataNodeFlushSegmentLatency = prometheus.NewHistogramVec( // TODO: arguably
DataNodeEncodeBufferLatency = prometheus.NewHistogramVec( // TODO: arguably
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "flush_segment_latency",
Help: "The flush segment latency in DataNode.",
Name: "encode_buffer_latency",
Help: "latency of encode buffer data",
Buckets: buckets,
}, []string{
nodeIDLabelName,
@ -143,32 +111,33 @@ var (
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "save_latency",
Help: "The latency saving flush data to storage in DataNode.",
Help: "latency of saving flush data to storage",
Buckets: []float64{0, 10, 100, 200, 400, 1000, 10000},
}, []string{
nodeIDLabelName,
msgTypeLabelName,
})
DataNodeFlushSegmentCount = prometheus.NewCounterVec( // TODO: arguably
DataNodeFlushBufferCount = prometheus.NewCounterVec( // TODO: arguably
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "flush_segment_count",
Help: "Flush segment statistics in DataNode.",
Name: "flush_buffer_op_count",
Help: "count of flush buffer operations",
}, []string{
nodeIDLabelName,
statusLabelName,
})
DataNodeAutoFlushSegmentCount = prometheus.NewCounterVec( // TODO: arguably
DataNodeAutoFlushBufferCount = prometheus.NewCounterVec( // TODO: arguably
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "auto_flush_segment_count",
Help: "Auto flush segment statistics in DataNode.",
Name: "autoflush_buffer_op_count",
Help: "count of auto flush buffer operations",
}, []string{
nodeIDLabelName,
statusLabelName,
})
DataNodeCompactionLatency = prometheus.NewHistogramVec(
@ -176,19 +145,19 @@ var (
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "compaction_latency",
Help: "Compaction latency in DataNode.",
Help: "latency of compaction operation",
Buckets: buckets,
}, []string{
nodeIDLabelName,
})
// DataNodeFlushSegmentsReqCounter counts the num of calls of FlushSegments
DataNodeFlushSegmentsReqCounter = prometheus.NewCounterVec(
// DataNodeFlushReqCounter counts the number of calls of FlushSegments
DataNodeFlushReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "flush_segments_total",
Help: "Counter of flush segments",
Name: "flush_req_count",
Help: "count of flush request",
}, []string{
nodeIDLabelName,
statusLabelName,
@ -203,12 +172,11 @@ func RegisterDataNode(registry *prometheus.Registry) {
registry.MustRegister(DataNodeNumConsumers)
registry.MustRegister(DataNodeNumProducers)
registry.MustRegister(DataNodeTimeSync)
registry.MustRegister(DataNodeSegmentRowsCount)
registry.MustRegister(DataNodeNumUnflushedSegments)
registry.MustRegister(DataNodeFlushSegmentLatency)
registry.MustRegister(DataNodeEncodeBufferLatency)
registry.MustRegister(DataNodeSave2StorageLatency)
registry.MustRegister(DataNodeFlushSegmentCount)
registry.MustRegister(DataNodeAutoFlushSegmentCount)
registry.MustRegister(DataNodeFlushBufferCount)
registry.MustRegister(DataNodeAutoFlushBufferCount)
registry.MustRegister(DataNodeCompactionLatency)
registry.MustRegister(DataNodeFlushSegmentsReqCounter)
registry.MustRegister(DataNodeFlushReqCounter)
}

View File

@ -27,8 +27,8 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.IndexCoordRole,
Name: "index_req_counter",
Help: "The number of requests to build index",
Name: "indexreq_count",
Help: "number of building index requests ",
}, []string{statusLabelName})
// IndexCoordIndexTaskCounter records the number of index tasks of each type.
@ -36,8 +36,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.IndexCoordRole,
Name: "index_task_counter",
Help: "The number of index tasks of each type",
Name: "indextask_count",
Help: "number of index tasks of each type",
}, []string{indexTaskStatusLabelName})
// IndexCoordIndexNodeNum records the number of IndexNodes managed by IndexCoord.
@ -45,8 +45,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.IndexCoordRole,
Name: "index_node_num",
Help: "The number of IndexNodes managed by IndexCoord",
Name: "indexnode_num",
Help: "number of IndexNodes managed by IndexCoord",
}, []string{})
)
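IndexCoordIndexTaskCounter is a gauge labeled by task state, so moving a task between states means decrementing the old state and incrementing the new one; otherwise the per-state census drifts. A sketch of that discipline, with state names that are assumptions rather than values from the diff.

package main

import "github.com/prometheus/client_golang/prometheus"

var indexTaskCount = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "indexcoord",
		Name:      "indextask_count",
		Help:      "number of index tasks of each type",
	}, []string{"index_task_status"})

// moveTask keeps the per-state census consistent: one task leaves oldState
// and enters newState.
func moveTask(oldState, newState string) {
	indexTaskCount.WithLabelValues(oldState).Dec()
	indexTaskCount.WithLabelValues(newState).Inc()
}

func main() {
	prometheus.MustRegister(indexTaskCount)
	indexTaskCount.WithLabelValues("unissued").Inc() // a task is enqueued
	moveTask("unissued", "in-progress")
	moveTask("in-progress", "finished")
}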

View File

@ -26,25 +26,25 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.IndexNodeRole,
Name: "index_task_counter",
Help: "The number of tasks that index node received",
Name: "index_task_count",
Help: "number of tasks that index node received",
}, []string{nodeIDLabelName, statusLabelName})
IndexNodeLoadBinlogLatency = prometheus.NewHistogramVec(
IndexNodeLoadFieldLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.IndexNodeRole,
Name: "load_segment_latency",
Help: "The latency of loading the segment",
Name: "load_field_latency",
Help: "latency of loading the field data",
Buckets: buckets,
}, []string{nodeIDLabelName})
IndexNodeDecodeBinlogLatency = prometheus.NewHistogramVec(
IndexNodeDecodeFieldLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.IndexNodeRole,
Name: "decode_binlog_latency",
Help: "The latency of decode the binlog",
Name: "decode_field_latency",
Help: "latency of decode field data",
Buckets: buckets,
}, []string{nodeIDLabelName})
@ -52,8 +52,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.IndexNodeRole,
Name: "knowhere_build_index_latency",
Help: "The latency of knowhere building the index",
Name: "build_index_latency",
Help: "latency of building the index by knowhere",
Buckets: buckets,
}, []string{nodeIDLabelName})
@ -61,8 +61,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.IndexNodeRole,
Name: "encode_index_file_latency",
Help: "The latency of encoding the index file",
Name: "encode_index_latency",
Help: "latency of encoding the index file",
Buckets: buckets,
}, []string{nodeIDLabelName})
@ -70,8 +70,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.IndexNodeRole,
Name: "save_index_file_latency",
Help: "The latency of saving the index file",
Name: "save_index_latency",
Help: "latency of saving the index file",
Buckets: buckets,
}, []string{nodeIDLabelName})
)
@ -79,8 +79,8 @@ var (
//RegisterIndexNode registers IndexNode metrics
func RegisterIndexNode(registry *prometheus.Registry) {
registry.MustRegister(IndexNodeBuildIndexTaskCounter)
registry.MustRegister(IndexNodeLoadBinlogLatency)
registry.MustRegister(IndexNodeDecodeBinlogLatency)
registry.MustRegister(IndexNodeLoadFieldLatency)
registry.MustRegister(IndexNodeDecodeFieldLatency)
registry.MustRegister(IndexNodeKnowhereBuildIndexLatency)
registry.MustRegister(IndexNodeEncodeIndexFileLatency)
registry.MustRegister(IndexNodeSaveIndexFileLatency)

View File

@ -70,8 +70,10 @@ const (
channelNameLabelName = "channel_name"
functionLabelName = "function_name"
queryTypeLabelName = "query_type"
segmentTypeLabelName = "segment_type"
segmentStateLabelName = "segment_state"
usernameLabelName = "username"
cacheNameLabelName = "cache_name"
cacheStateLabelName = "cache_state"
)
var (

View File

@ -22,69 +22,41 @@ import (
)
var (
// ProxySearchCount record the number of times search succeeded or failed.
ProxySearchCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "search_counter",
Help: "The number of times search succeeded or failed",
}, []string{nodeIDLabelName, queryTypeLabelName, statusLabelName})
// ProxyInsertCount record the number of times insert succeeded or failed.
ProxyInsertCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "insert_counter",
Help: "The number of times insert succeeded or failed",
}, []string{nodeIDLabelName, statusLabelName})
// ProxySearchVectors record the number of vectors search successfully.
ProxySearchVectors = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
ProxySearchVectors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "search_vectors",
Help: "The number of vectors search successfully",
}, []string{nodeIDLabelName, queryTypeLabelName})
// ProxyInsertVectors record the number of vectors insert successfully.
ProxyInsertVectors = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "insert_vectors",
Help: "The number of vectors insert successfully",
Name: "search_vectors_count",
Help: "counter of vectors successfully searched",
}, []string{nodeIDLabelName})
// ProxyLinkedSDKs record The number of SDK linked proxy.
// TODO: how to know when sdk disconnect?
//ProxyLinkedSDKs = prometheus.NewGaugeVec(
// prometheus.GaugeOpts{
// Namespace: milvusNamespace,
// Subsystem: typeutil.ProxyRole,
// Name: "linked_sdk_numbers",
// Help: "The number of SDK linked proxy",
// }, []string{nodeIDLabelName})
// ProxyInsertVectors records the number of vectors successfully inserted.
ProxyInsertVectors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "insert_vectors_count",
Help: "counter of vectors successfully inserted",
}, []string{nodeIDLabelName})
// ProxySearchLatency records the latency of successful search requests.
ProxySearchLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "search_latency",
Help: "The latency of search successfully",
Name: "sq_lantency",
Help: "latency of search",
Buckets: buckets,
}, []string{nodeIDLabelName, queryTypeLabelName})
// ProxySendMessageLatency record the latency that the proxy sent the search request to the message stream.
ProxySendMessageLatency = prometheus.NewHistogramVec(
// ProxySendSQReqLatency records the latency of the proxy sending the search request to the message stream.
ProxySendSQReqLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "send_search_msg_time",
Help: "The latency that the proxy sent the search request to the message stream",
Name: "sq_send_latency",
Help: "latency that proxy sent the search request to the message stream",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, queryTypeLabelName})
@ -93,8 +65,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "wait_for_search_result_time",
Help: "The time that the proxy waits for the search result",
Name: "sq_wait_result_latency",
Help: "latency that proxy waits for the result",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, queryTypeLabelName})
@ -103,8 +75,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "reduce_search_result_time",
Help: "The time that the proxy reduces search result",
Name: "sq_reduce_result_latency",
Help: "latency that proxy reduces search result",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, queryTypeLabelName})
@ -113,8 +85,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "decode_search_result_time",
Help: "The time that the proxy decodes the search result",
Name: "sq_decode_result_latency",
Help: "latency that proxy decodes the search result",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, queryTypeLabelName})
@ -123,55 +95,46 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "msg_stream_obj_for_PChan",
Help: "The number of MsgStream objects per PChannel on each collection on Proxy",
Name: "msgstream_obj_num",
Help: "number of MsgStream objects per physical channel",
}, []string{nodeIDLabelName, channelNameLabelName})
// ProxyMsgStreamObjectsForSearch record the number of MsgStream objects for search per collection_id.
ProxyMsgStreamObjectsForSearch = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "msg_stream_obj_for_search",
Help: "The number of MsgStream objects for search per collection",
}, []string{nodeIDLabelName, queryTypeLabelName})
// ProxyInsertLatency record the latency that insert successfully.
ProxyInsertLatency = prometheus.NewHistogramVec(
// ProxyMutationLatency records the latency of successful insert or delete requests.
ProxyMutationLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "insert_latency",
Help: "The latency that insert successfully.",
Name: "mutation_latency",
Help: "latency of insert or delete successfully",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName})
}, []string{nodeIDLabelName, msgTypeLabelName})
// ProxySendInsertReqLatency record the latency that Proxy send insert request to MsgStream.
ProxySendInsertReqLatency = prometheus.NewHistogramVec(
// ProxySendMutationReqLatency records the latency of the Proxy sending insert or delete requests to the MsgStream.
ProxySendMutationReqLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "send_insert_req_latency",
Help: "The latency that Proxy send insert request to MsgStream",
Name: "mutation_send_latency",
Help: "latency that proxy send insert request to MsgStream",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName})
}, []string{nodeIDLabelName, msgTypeLabelName})
// ProxyCacheHitCounter record the number of Proxy cache hits or miss.
ProxyCacheHitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "cache_hits",
Help: "Proxy cache hits",
}, []string{nodeIDLabelName, "cache_type", "hit_type"})
Name: "cache_hit_count",
Help: "count of cache hits",
}, []string{nodeIDLabelName, cacheNameLabelName, cacheStateLabelName})
// ProxyUpdateCacheLatency record the time that proxy update cache when cache miss.
ProxyUpdateCacheLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "update_cache_latency",
Help: "The time that proxy update cache when cache miss",
Name: "cache_update_latency",
Help: "latency that proxy update cache when cache miss",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName})
@ -180,8 +143,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "sync_time_tick",
Help: "Proxy synchronization timestamp statistics, differentiated by Channel",
Name: "sync_epoch_time",
Help: "synchronized unix epoch per physical channel and default channel",
}, []string{nodeIDLabelName, channelNameLabelName})
// ProxyApplyPrimaryKeyLatency record the latency that apply primary key.
@ -190,7 +153,7 @@ var (
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "apply_pk_latency",
Help: "The latency that apply primary key",
Help: "latency that apply primary key",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName})
@ -200,7 +163,7 @@ var (
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "apply_timestamp_latency",
Help: "The latency that proxy apply timestamp",
Help: "latency that proxy apply timestamp",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName})
@ -209,8 +172,8 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "DDL_call_counter",
Help: "the number of times the function of the DDL operation was executed",
Name: "ddl_req_count",
Help: "count of DDL operation executed",
}, []string{nodeIDLabelName, functionLabelName, statusLabelName})
// ProxyDQLFunctionCall records the number of times the function of the DQL operation was executed, like `HasCollection`.
@ -218,8 +181,8 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "DQL_call_counter",
Help: "",
Name: "dql_req_count",
Help: "count of DQL operation executed",
}, []string{nodeIDLabelName, functionLabelName, statusLabelName})
// ProxyDMLFunctionCall records the number of times the function of the DML operation was executed, like `LoadCollection`.
@ -227,8 +190,8 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "DML_call_counter",
Help: "",
Name: "dml_req_count",
Help: "count of DML operation executed",
}, []string{nodeIDLabelName, functionLabelName, statusLabelName})
// ProxyDDLReqLatency records the latency that for DML request, like "CreateCollection".
@ -236,8 +199,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "DDL_call_latency",
Help: "The latency that for DDL request",
Name: "ddl_req_latency",
Help: "latency of each DDL request",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, functionLabelName})
@ -246,8 +209,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "DML_call_latency",
Help: "The latency that for DML request",
Name: "dml_req_latency",
Help: "latency of each DML request excluding insert and delete",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, functionLabelName})
@ -256,51 +219,27 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "DQL_call_latency",
Help: "The latency that for DQL request",
Name: "dql_req_latency",
Help: "latency of each DQL request excluding search and query",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, functionLabelName})
// ProxySearchLatencyPerNQ records the latency for searching.
ProxySearchLatencyPerNQ = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "proxy_search_latency_count",
Help: "The latency for searching",
Buckets: buckets,
}, []string{nodeIDLabelName})
// ProxyCredentialReqLatency record the latency that for credential request, like "CreateCredential".
ProxyCredentialReqLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "credential_call_latency",
Help: "The latency that for credential request",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, functionLabelName, usernameLabelName})
)
//RegisterProxy registers Proxy metrics
func RegisterProxy(registry *prometheus.Registry) {
registry.MustRegister(ProxySearchCount)
registry.MustRegister(ProxyInsertCount)
registry.MustRegister(ProxySearchVectors)
registry.MustRegister(ProxyInsertVectors)
registry.MustRegister(ProxySearchLatency)
registry.MustRegister(ProxySearchLatencyPerNQ)
registry.MustRegister(ProxySendMessageLatency)
registry.MustRegister(ProxySendSQReqLatency)
registry.MustRegister(ProxyWaitForSearchResultLatency)
registry.MustRegister(ProxyReduceSearchResultLatency)
registry.MustRegister(ProxyDecodeSearchResultLatency)
registry.MustRegister(ProxyMsgStreamObjectsForPChan)
registry.MustRegister(ProxyMsgStreamObjectsForSearch)
registry.MustRegister(ProxyInsertLatency)
registry.MustRegister(ProxySendInsertReqLatency)
registry.MustRegister(ProxyMutationLatency)
registry.MustRegister(ProxySendMutationReqLatency)
registry.MustRegister(ProxyCacheHitCounter)
registry.MustRegister(ProxyUpdateCacheLatency)
@ -316,6 +255,4 @@ func RegisterProxy(registry *prometheus.Registry) {
registry.MustRegister(ProxyDMLReqLatency)
registry.MustRegister(ProxyDQLReqLatency)
// for credential
registry.MustRegister(ProxyCredentialReqLatency)
}
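ProxyCacheHitCounter now takes the shared cacheNameLabelName and cacheStateLabelName constants instead of the ad-hoc "cache_type"/"hit_type" strings. A sketch of how a lookup path might feed it; the cache name and the hit/miss values are assumptions, since the diff does not show the call sites.

package main

import "github.com/prometheus/client_golang/prometheus"

var cacheHitCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "milvus",
		Subsystem: "proxy",
		Name:      "cache_hit_count",
		Help:      "count of cache hits",
	}, []string{"node_id", "cache_name", "cache_state"})

var collectionIDs = map[string]int64{"demo": 1} // toy cache contents

// getCollectionID is a hypothetical cached lookup that records hit or miss.
func getCollectionID(nodeID, name string) (int64, bool) {
	id, ok := collectionIDs[name]
	state := "hit"
	if !ok {
		state = "miss"
	}
	cacheHitCounter.WithLabelValues(nodeID, "GetCollectionID", state).Inc()
	return id, ok
}

func main() {
	prometheus.MustRegister(cacheHitCounter)
	getCollectionID("1", "demo")   // hit
	getCollectionID("1", "absent") // miss
}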

View File

@ -27,24 +27,24 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_collections",
Help: "Number of collections in QueryCoord.",
Name: "collection_num",
Help: "number of collections",
}, []string{})
QueryCoordNumEntities = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_entities",
Help: "Number of entities in collection.",
Name: "entitiy_num",
Help: "number of entities",
}, []string{})
QueryCoordLoadCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "load_count",
Help: "Load request statistic in QueryCoord.",
Name: "load_req_count",
Help: "count of load request",
}, []string{
statusLabelName,
})
@ -53,8 +53,8 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "release_count",
Help: "Release request statistic in QueryCoord.",
Name: "release_req_count",
Help: "count of release request",
}, []string{
statusLabelName,
})
@ -64,7 +64,7 @@ var (
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "load_latency",
Help: "Load request latency in QueryCoord",
Help: "latency of load request",
Buckets: buckets,
}, []string{})
@ -73,7 +73,7 @@ var (
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "release_latency",
Help: "Release request latency in QueryCoord",
Help: "latency of release request",
Buckets: []float64{0, 5, 10, 20, 40, 100, 200, 400, 1000, 10000},
}, []string{})
@ -81,16 +81,16 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_child_tasks",
Help: "Number of child tasks in QueryCoord.",
Name: "child_task_num",
Help: "number of child tasks in QueryCoord's queue",
}, []string{})
QueryCoordNumParentTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_parent_tasks",
Help: "Number of parent tasks in QueryCoord.",
Name: "parent_task_num",
Help: "number of parent tasks in QueryCoord's queue",
}, []string{})
QueryCoordChildTaskLatency = prometheus.NewHistogramVec(
@ -98,7 +98,7 @@ var (
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "child_task_latency",
Help: "Child tasks latency in QueryCoord.",
Help: "latency of child tasks",
Buckets: buckets,
}, []string{})
@ -106,8 +106,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_querynodes",
Help: "Number of QueryNodes in QueryCoord.",
Name: "querynode_num",
Help: "number of QueryNodes managered by QueryCoord",
}, []string{})
)

View File

@ -27,8 +27,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_collections",
Help: "Number of collections in QueryNode.",
Name: "collection_num",
Help: "number of collections loaded",
}, []string{
nodeIDLabelName,
})
@ -37,8 +37,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_partitions",
Help: "Number of partitions per collection in QueryNode.",
Name: "partition_num",
Help: "number of partitions loaded",
}, []string{
nodeIDLabelName,
})
@ -47,8 +47,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_segments",
Help: "Number of segments per collection in QueryNode.",
Name: "segment_num",
Help: "number of segments loaded",
}, []string{
nodeIDLabelName,
})
@ -57,8 +57,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_dml_channels",
Help: "Number of dmlChannels per collection in QueryNode.",
Name: "dml_vchannel_num",
Help: "number of dmlChannels watched",
}, []string{
nodeIDLabelName,
})
@ -67,8 +67,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_delta_channels",
Help: "Number of deltaChannels per collection in QueryNode.",
Name: "delta_vchannel_num",
Help: "number of deltaChannels watched",
}, []string{
nodeIDLabelName,
})
@ -77,8 +77,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_consumers",
Help: "Number of consumers per collection in QueryNode.",
Name: "consumer_num",
Help: "number of consumers",
}, []string{
nodeIDLabelName,
})
@ -87,8 +87,8 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_count",
Help: "Search and query requests statistic in QueryNode.",
Name: "sq_req_count",
Help: "count of search / query request",
}, []string{
nodeIDLabelName,
queryTypeLabelName,
@ -99,8 +99,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_latency",
Help: "Search and query requests latency in QueryNode.",
Name: "sq_req_latency",
Help: "latency of Search or query requests",
Buckets: buckets,
}, []string{
nodeIDLabelName,
@ -111,8 +111,8 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_latency_in_queue",
Help: "The search and query latency in queue(unsolved buffer) in QueryNode.",
Name: "sq_queue_lantency",
Help: "latency of search or query in queue",
Buckets: buckets,
}, []string{
nodeIDLabelName,
@ -123,77 +123,68 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_latency_per_segment",
Help: "The search and query on segments(sealed/growing segments).",
Name: "sq_segment_latency",
Help: "latency of search or query per segment",
Buckets: buckets,
}, []string{
nodeIDLabelName,
queryTypeLabelName,
segmentTypeLabelName,
segmentStateLabelName,
})
QueryNodeSQSegmentLatencyInCore = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "sq_latency_in_core",
Help: "The search and query latency in core.",
Name: "sq_core_latency",
Help: "latency of search or query latency in segcore",
Buckets: buckets,
}, []string{
nodeIDLabelName,
queryTypeLabelName,
})
QueryNodeTranslateHitsLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "translate_hits_latency",
Help: "The search and query latency in translate hits.",
Buckets: buckets,
}, []string{
nodeIDLabelName,
})
QueryNodeReduceLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "reduce_latency",
Help: "The search and query latency in reduce(local reduce) in QueryNode.",
Name: "sq_reduce_latency",
Help: "latency of reduce search or query result",
Buckets: buckets,
}, []string{
nodeIDLabelName,
segmentTypeLabelName,
queryTypeLabelName,
})
QueryNodeLoadSegmentLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "load_latency_per_segment",
Help: "The load latency per segment in QueryNode.",
Name: "load_segment_latency",
Help: "latency of load per segment",
Buckets: buckets,
}, []string{
nodeIDLabelName,
})
/* Todo reimplement in query_shard.go
QueryNodeServiceTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "service_time",
Name: "sync_utc_time",
Help: "ServiceTimes of collections in QueryNode.",
}, []string{
nodeIDLabelName,
})
*/
QueryNodeNumFlowGraphs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "num_flow_graphs",
Help: "Number of flow graphs in QueryNode.",
Name: "flowgraph_num",
Help: "number of flowgraphs",
}, []string{
nodeIDLabelName,
})
@ -212,9 +203,8 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeSQLatencyInQueue)
registry.MustRegister(QueryNodeSQSegmentLatency)
registry.MustRegister(QueryNodeSQSegmentLatencyInCore)
registry.MustRegister(QueryNodeTranslateHitsLatency)
registry.MustRegister(QueryNodeReduceLatency)
registry.MustRegister(QueryNodeLoadSegmentLatency)
registry.MustRegister(QueryNodeServiceTime)
// registry.MustRegister(QueryNodeServiceTime)
registry.MustRegister(QueryNodeNumFlowGraphs)
}
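QueryNodeSQLatencyInQueue separates the time a request waits in the unsolved queue from its execution time. The usual way to obtain that number is to stamp the request at enqueue and observe at dequeue, as in this sketch; the queue shape and field names are illustrative.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var sqQueueLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "milvus",
		Subsystem: "querynode",
		Name:      "sq_queue_latency",
		Help:      "latency of search or query in queue (ms)",
		Buckets:   []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000},
	}, []string{"node_id", "query_type"})

type sqTask struct {
	queryType string
	enqueued  time.Time
}

func main() {
	prometheus.MustRegister(sqQueueLatency)

	queue := make(chan sqTask, 8)
	queue <- sqTask{queryType: "search", enqueued: time.Now()}

	task := <-queue // dequeue: queue time ends, execution would begin here
	sqQueueLatency.WithLabelValues("1", task.queryType).
		Observe(float64(time.Since(task.enqueued).Milliseconds()))
}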

View File

@ -6,197 +6,16 @@ import (
)
var (
// RootCoordProxyLister counts the num of registered proxy nodes
RootCoordProxyLister = prometheus.NewGaugeVec(
// RootCoordProxyCounter counts the number of registered proxy nodes
RootCoordProxyCounter = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "list_of_proxy",
Help: "List of proxy nodes which have registered with etcd",
}, []string{nodeIDLabelName})
Name: "proxy_num",
Help: "number of proxy nodes managered by rootcoord",
}, []string{})
////////////////////////////////////////////////////////////////////////////
// for grpc
// RootCoordCreateCollectionCounter counts the num of calls of CreateCollection
RootCoordCreateCollectionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "create_collection_total",
Help: "Counter of create collection",
}, []string{statusLabelName})
// RootCoordDropCollectionCounter counts the num of calls of DropCollection
RootCoordDropCollectionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "drop_collection_total",
Help: "Counter of drop collection",
}, []string{statusLabelName})
// RootCoordHasCollectionCounter counts the num of calls of HasCollection
RootCoordHasCollectionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "has_collection_total",
Help: "Counter of has collection",
}, []string{statusLabelName})
// RootCoordDescribeCollectionCounter counts the num of calls of DescribeCollection
RootCoordDescribeCollectionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "describe_collection_total",
Help: "Counter of describe collection",
}, []string{statusLabelName})
// RootCoordShowCollectionsCounter counts the num of calls of ShowCollections
RootCoordShowCollectionsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "show_collections_total",
Help: "Counter of show collections",
}, []string{statusLabelName})
// RootCoordCreatePartitionCounter counts the num of calls of CreatePartition
RootCoordCreatePartitionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "create_partition_total",
Help: "Counter of create partition",
}, []string{statusLabelName})
// RootCoordDropPartitionCounter counts the num of calls of DropPartition
RootCoordDropPartitionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "drop_partition_total",
Help: "Counter of drop partition",
}, []string{statusLabelName})
// RootCoordHasPartitionCounter counts the num of calls of HasPartition
RootCoordHasPartitionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "has_partition_total",
Help: "Counter of has partition",
}, []string{statusLabelName})
// RootCoordShowPartitionsCounter counts the num of calls of ShowPartitions
RootCoordShowPartitionsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "show_partitions_total",
Help: "Counter of show partitions",
}, []string{statusLabelName})
// RootCoordCreateIndexCounter counts the num of calls of CreateIndex
RootCoordCreateIndexCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "create_index_total",
Help: "Counter of create index",
}, []string{statusLabelName})
// RootCoordDropIndexCounter counts the num of calls of DropIndex
RootCoordDropIndexCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "drop_index_total",
Help: "Counter of drop index",
}, []string{statusLabelName})
// RootCoordDescribeIndexCounter counts the num of calls of DescribeIndex
RootCoordDescribeIndexCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "describe_index_total",
Help: "Counter of describe index",
}, []string{statusLabelName})
// RootCoordDescribeSegmentCounter counts the num of calls of DescribeSegment
RootCoordDescribeSegmentCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "describe_segment_total",
Help: "Counter of describe segment",
}, []string{statusLabelName})
// RootCoordShowSegmentsCounter counts the num of calls of ShowSegments
RootCoordShowSegmentsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "show_segments_total",
Help: "Counter of show segments",
}, []string{statusLabelName})
// RootCoordDescribeSegmentsCounter counts the num of calls of DescribeSegments
RootCoordDescribeSegmentsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "describe_segments_total",
Help: "Counter of describe segments",
}, []string{statusLabelName})
// RootCoordCreateCredentialCounter counts the num of calls of CreateCredential
RootCoordCreateCredentialCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "create_credential_total",
Help: "Counter of create credential",
}, []string{statusLabelName})
// RootCoordGetCredentialCounter counts the num of calls of GetCredential
RootCoordGetCredentialCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "get_credential_total",
Help: "Counter of get credential",
}, []string{statusLabelName})
// RootCoordDeleteCredentialCounter counts the num of calls of DeleteCredential
RootCoordDeleteCredentialCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "delete_credential_total",
Help: "Counter of delete credential",
}, []string{statusLabelName})
// RootCoordUpdateCredentialCounter counts the num of calls of UpdateCredential
RootCoordUpdateCredentialCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "update_credential_total",
Help: "Counter of update credential",
}, []string{statusLabelName})
// RootCoordListCredUsersCounter counts the num of calls of ListCredUsers
RootCoordListCredUsersCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "list_cred_users_total",
Help: "Counter of list cred users",
}, []string{statusLabelName})
////////////////////////////////////////////////////////////////////////////
// for time tick
@ -206,26 +25,25 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "insert_channel_time_tick",
Help: "Time tick of insert Channel in 24H",
Name: "sync_epoch_time",
Help: "synchronized unix epoch per physical channel",
}, []string{"PChannel"})
// RootCoordDDLReadTypeLatency records the latency for read type of DDL operations.
RootCoordDDLReadTypeLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
RootCoordDDLReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "ddl_read_type_latency",
Help: "The latency for read type of DDL operations",
}, []string{functionLabelName})
Name: "ddl_req_count",
Help: "count of DDL operations",
}, []string{functionLabelName, statusLabelName})
// RootCoordDDLWriteTypeLatency records the latency for write type of DDL operations.
RootCoordDDLWriteTypeLatency = prometheus.NewHistogramVec(
//RootCoordDDLReqLatency records the latency of each DDL operation.
RootCoordDDLReqLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "ddl_write_type_latency",
Help: "The latency for write type of DDL operations",
Name: "ddl_req_latency",
Help: "latency of each DDL operations",
}, []string{functionLabelName})
// RootCoordSyncTimeTickLatency records the latency of sync time tick.
@ -233,53 +51,35 @@ var (
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "sync_time_tick_latency",
Help: "The latency of sync time tick",
Name: "sync_timetick_latency",
Help: "latency of synchronizing timetick message",
})
// RootCoordCredentialWriteTypeLatency records the latency for write type of credential operations.
RootCoordCredentialWriteTypeLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "credential_write_type_latency",
Help: "The latency for write type of credential operations",
}, []string{functionLabelName})
// RootCoordCredentialReadTypeLatency records the latency for read type of credential operations.
RootCoordCredentialReadTypeLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "credential_read_type_latency",
Help: "The latency for read type of credential operations",
}, []string{functionLabelName, usernameLabelName})
// RootCoordIDAllocCounter records the number of global ID allocations.
RootCoordIDAllocCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "id_alloc_count",
Help: "The number of global ID allocations",
Help: "count of ID allocated",
})
// RootCoordLocalTimestampAllocCounter records the number of timestamp allocations in RootCoord.
RootCoordTimestampAllocCounter = prometheus.NewGauge(
//RootCoordTimestamp records the latest timestamp allocated in memory.
RootCoordTimestamp = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "timestamp_alloc_count",
Help: "The number of timestamp allocations in RootCoord",
Name: "timestamp",
Help: "lateste timestamp allocated in memory",
})
// RootCoordETCDTimestampAllocCounter records the number of timestamp allocations in ETCD.
RootCoordETCDTimestampAllocCounter = prometheus.NewGauge(
// RootCoordTimestampSaved records the latest timestamp saved to meta storage.
RootCoordTimestampSaved = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "etcd_timestamp_alloc_count",
Help: "The number of timestamp allocations in ETCD",
Name: "timestamp_saved",
Help: "timestamp saved in meta storage",
})
// RootCoordNumOfCollections counts the number of collections.
@ -287,8 +87,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "num_of_collections",
Help: "The number of collections",
Name: "collection_num",
Help: "number of collections",
})
// RootCoordNumOfPartitions counts the number of partitions per collection.
@ -296,26 +96,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "num_of_partitions",
Help: "The number of partitions",
}, []string{})
// RootCoordNumOfSegments counts the number of segments per collections.
RootCoordNumOfSegments = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "num_of_segments",
Help: "The number of segments",
}, []string{})
// RootCoordNumOfIndexedSegments counts the number of indexed segments per collection.
RootCoordNumOfIndexedSegments = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "num_of_indexed_segments",
Help: "The number of indexed segments",
Name: "partition_num",
Help: "number of partitions",
}, []string{})
// RootCoordNumOfDMLChannel counts the number of DML channels.
@ -323,8 +105,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "num_of_dml_channel",
Help: "The number of DML channels",
Name: "dml_channel_num",
Help: "number of DML channels",
})
// RootCoordNumOfMsgStream counts the number of message streams.
@ -332,8 +114,8 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "num_of_msg_stream",
Help: "The number of message streams",
Name: "msgstream_num",
Help: "number of message streams",
})
// RootCoordNumOfCredentials counts the number of credentials.
@ -341,62 +123,34 @@ var (
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "num_of_credentials",
Help: "The number of credentials",
Name: "credential_num",
Help: "number of credentials",
})
)
//RegisterRootCoord registers RootCoord metrics
func RegisterRootCoord(registry *prometheus.Registry) {
registry.Register(RootCoordProxyLister)
// for grpc
registry.MustRegister(RootCoordCreateCollectionCounter)
registry.MustRegister(RootCoordDropCollectionCounter)
registry.MustRegister(RootCoordHasCollectionCounter)
registry.MustRegister(RootCoordDescribeCollectionCounter)
registry.MustRegister(RootCoordShowCollectionsCounter)
registry.MustRegister(RootCoordCreatePartitionCounter)
registry.MustRegister(RootCoordDropPartitionCounter)
registry.MustRegister(RootCoordHasPartitionCounter)
registry.MustRegister(RootCoordShowPartitionsCounter)
registry.MustRegister(RootCoordCreateIndexCounter)
registry.MustRegister(RootCoordDropIndexCounter)
registry.MustRegister(RootCoordDescribeIndexCounter)
registry.MustRegister(RootCoordDescribeSegmentCounter)
registry.MustRegister(RootCoordShowSegmentsCounter)
registry.MustRegister(RootCoordDescribeSegmentsCounter)
registry.Register(RootCoordProxyCounter)
// for time tick
registry.MustRegister(RootCoordInsertChannelTimeTick)
//prometheus.MustRegister(PanicCounter)
registry.MustRegister(RootCoordSyncTimeTickLatency)
// for DDL latency
registry.MustRegister(RootCoordDDLReadTypeLatency)
registry.MustRegister(RootCoordDDLWriteTypeLatency)
registry.MustRegister(RootCoordDDLReqLatency)
// for allocator
registry.MustRegister(RootCoordIDAllocCounter)
registry.MustRegister(RootCoordTimestampAllocCounter)
registry.MustRegister(RootCoordETCDTimestampAllocCounter)
registry.MustRegister(RootCoordTimestamp)
registry.MustRegister(RootCoordTimestampSaved)
// for collection
registry.MustRegister(RootCoordNumOfCollections)
registry.MustRegister(RootCoordNumOfPartitions)
// registry.MustRegister(RootCoordNumOfSegments)
// registry.MustRegister(RootCoordNumOfIndexedSegments)
registry.MustRegister(RootCoordNumOfDMLChannel)
registry.MustRegister(RootCoordNumOfMsgStream)
// for credential
registry.MustRegister(RootCoordCreateCredentialCounter)
registry.MustRegister(RootCoordGetCredentialCounter)
registry.MustRegister(RootCoordDeleteCredentialCounter)
registry.MustRegister(RootCoordUpdateCredentialCounter)
registry.MustRegister(RootCoordListCredUsersCounter)
registry.MustRegister(RootCoordCredentialWriteTypeLatency)
registry.MustRegister(RootCoordCredentialReadTypeLatency)
registry.MustRegister(RootCoordNumOfCredentials)
}
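The commit's central simplification is visible in this file: eighteen per-API counters collapse into RootCoordDDLReqCounter, with the API name carried in functionLabelName and the outcome in statusLabelName, so dashboards filter by label instead of by metric name. A sketch of a handler instrumented under that scheme; the wrapper function and label values are illustrative.

package main

import (
	"errors"

	"github.com/prometheus/client_golang/prometheus"
)

var ddlReqCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "milvus",
		Subsystem: "rootcoord",
		Name:      "ddl_req_count",
		Help:      "count of DDL operations",
	}, []string{"function_name", "status"})

// countDDL wraps any DDL handler with the shared counter; one labeled metric
// replaces a dedicated counter per API.
func countDDL(function string, handler func() error) error {
	err := handler()
	status := "success"
	if err != nil {
		status = "fail"
	}
	ddlReqCounter.WithLabelValues(function, status).Inc()
	return err
}

func main() {
	prometheus.MustRegister(ddlReqCounter)
	countDDL("CreateCollection", func() error { return nil })
	countDDL("DropCollection", func() error { return errors.New("collection not found") })
}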

View File

@ -486,7 +486,6 @@ func (mgr *channelsMgrImpl) getVChannels(collectionID UniqueID) ([]vChan, error)
}
func (mgr *channelsMgrImpl) createDQLStream(collectionID UniqueID) error {
metrics.ProxyMsgStreamObjectsForSearch.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "query").Inc()
return mgr.dqlChannelsMgr.createMsgStream(collectionID)
}
@ -495,7 +494,6 @@ func (mgr *channelsMgrImpl) getDQLStream(collectionID UniqueID) (msgstream.MsgSt
}
func (mgr *channelsMgrImpl) removeDQLStream(collectionID UniqueID) error {
metrics.ProxyMsgStreamObjectsForSearch.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "query").Dec()
return mgr.dqlChannelsMgr.removeStream(collectionID)
}

View File

@ -2176,6 +2176,11 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
method := "Insert"
tr := timerecord.NewTimeRecorder(method)
defer func() {
metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.TotalLabel).Inc()
}()
it := &insertTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -2236,7 +2241,8 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
if err := node.sched.dmQueue.Enqueue(it); err != nil {
log.Debug("Failed to enqueue insert task: " + err.Error())
metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.AbandonLabel).Inc()
metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.AbandonLabel).Inc()
return constructFailedResponse(err), nil
}
@ -2253,9 +2259,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
if err := it.WaitToFinish(); err != nil {
log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.TotalLabel).Inc()
metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.FailLabel).Inc()
return constructFailedResponse(err), nil
}
@ -2276,10 +2280,10 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
// InsertCnt always equals to the number of entities in the request
it.result.InsertCnt = int64(request.NumRows)
metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.SuccessLabel).Inc()
metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(it.result.InsertCnt))
metrics.ProxyInsertLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
return it.result, nil
}
@ -2300,6 +2304,8 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
method := "Delete"
tr := timerecord.NewTimeRecorder(method)
metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.TotalLabel).Inc()
dt := &deleteTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
@ -2368,11 +2374,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
}, nil
}
metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.TotalLabel).Inc()
metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.SuccessLabel).Inc()
metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
return dt.result, nil
}
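
Both mutation paths now observe into one histogram, distinguished by an operation label (metrics.InsertLabel or metrics.DeleteLabel). A hedged sketch of how such a vector could be declared; the bucket layout and label names here are assumptions, not the actual Milvus definition.

package metrics

import "github.com/prometheus/client_golang/prometheus"

// ProxyMutationLatency as sketched here serves Insert and Delete alike;
// the msg_type label carries the operation kind.
var ProxyMutationLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "milvus",
		Subsystem: "proxy",
		Name:      "mutation_latency",
		Help:      "latency of mutation requests in milliseconds",
		Buckets:   prometheus.ExponentialBuckets(1, 2, 16), // 1 ms .. ~32 s
	},
	[]string{"node_id", "msg_type"},
)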
@ -2385,6 +2389,8 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
}
method := "Search"
tr := timerecord.NewTimeRecorder(method)
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.TotalLabel).Inc()
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
defer sp.Finish()
@ -2439,8 +2445,8 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
zap.Uint64("travel_timestamp", travelTs),
zap.Uint64("guarantee_timestamp", guaranteeTs))
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.SearchLabel, metrics.AbandonLabel).Inc()
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.AbandonLabel).Inc()
return &milvuspb.SearchResults{
Status: &commonpb.Status{
@ -2482,11 +2488,9 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
zap.Any("search_params", request.SearchParams),
zap.Uint64("travel_timestamp", travelTs),
zap.Uint64("guarantee_timestamp", guaranteeTs))
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.SearchLabel, metrics.TotalLabel).Inc()
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.SearchLabel, metrics.FailLabel).Inc()
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.FailLabel).Inc()
return &milvuspb.SearchResults{
Status: &commonpb.Status{
@ -2511,16 +2515,13 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
zap.Uint64("travel_timestamp", travelTs),
zap.Uint64("guarantee_timestamp", guaranteeTs))
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.SearchLabel, metrics.TotalLabel).Inc()
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.SearchLabel, metrics.SuccessLabel).Inc()
metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.SearchLabel).Set(float64(qt.result.GetResults().GetNumQueries()))
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.SuccessLabel).Inc()
metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
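// Note: ProxySearchVectors above switches from Set to Add and drops the
// per-query-type label, so it now accumulates the number of queried vectors
// (counter-style) rather than recording a point-in-time value.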
searchDur := tr.ElapseSpan().Milliseconds()
metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.SearchLabel).Observe(float64(searchDur))
metrics.ProxySearchLatencyPerNQ.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(searchDur) / float64(qt.result.GetResults().GetNumQueries()))
return qt.result, nil
}
@ -2649,6 +2650,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
method := "Query"
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.TotalLabel).Inc()
log.Debug(
rpcReceived(method),
zap.String("traceID", traceID),
@ -2667,8 +2671,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
zap.String("collection", request.CollectionName),
zap.Any("partitions", request.PartitionNames))
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.QueryLabel, metrics.FailLabel).Inc()
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.FailLabel).Inc()
return &milvuspb.QueryResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -2701,10 +2706,8 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
zap.String("collection", request.CollectionName),
zap.Any("partitions", request.PartitionNames))
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.QueryLabel, metrics.TotalLabel).Inc()
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.QueryLabel, metrics.FailLabel).Inc()
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.FailLabel).Inc()
return &milvuspb.QueryResults{
Status: &commonpb.Status{
@ -2725,13 +2728,10 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
zap.String("collection", request.CollectionName),
zap.Any("partitions", request.PartitionNames))
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.QueryLabel, metrics.TotalLabel).Inc()
metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.QueryLabel, metrics.SuccessLabel).Inc()
metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.QueryLabel).Set(float64(len(qt.result.FieldsData)))
metrics.ProxySendMessageLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
metrics.SuccessLabel).Inc()
metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
return &milvuspb.QueryResults{
Status: qt.result.Status,

View File

@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -283,7 +284,8 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
maxTs := ts
for channel, ts := range stats {
metrics.ProxySyncTimeTick.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), channel).Set(float64(ts))
physicalTs, _ := tsoutil.ParseHybridTs(ts)
metrics.ProxySyncTimeTick.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), channel).Set(float64(physicalTs))
channels = append(channels, channel)
tss = append(tss, ts)
if ts > maxTs {
@ -302,9 +304,8 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
Timestamps: tss,
DefaultTimestamp: maxTs,
}
metrics.ProxySyncTimeTick.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "DefaultTimestamp").Set(float64(maxTs))
maxPhysicalTs, _ := tsoutil.ParseHybridTs(maxTs)
metrics.ProxySyncTimeTick.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "default").Set(float64(maxPhysicalTs))
status, err := node.rootCoord.UpdateChannelTimeTick(node.ctx, req)
if err != nil {
log.Warn("sendChannelsTimeTickLoop.UpdateChannelTimeTick", zap.Error(err))

View File

@ -481,7 +481,7 @@ func (it *insertTask) Execute(ctx context.Context) error {
defer sp.Finish()
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute insert %d", it.ID()))
defer tr.Elapse("done")
defer tr.Elapse("insert execute done")
collectionName := it.CollectionName
collID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
@ -539,16 +539,14 @@ func (it *insertTask) Execute(ctx context.Context) error {
}
log.Debug("assign segmentID for insert data success", zap.Int64("msgID", it.Base.MsgID), zap.Int64("collectionID", collID), zap.String("collection name", it.CollectionName))
tr.Record("assign segment id")
tr.Record("sendInsertMsg")
err = stream.Produce(msgPack)
if err != nil {
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
it.result.Status.Reason = err.Error()
return err
}
sendMsgDur := tr.Record("send insert request to message stream")
metrics.ProxySendInsertReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(sendMsgDur.Milliseconds()))
sendMsgDur := tr.Record("send insert request to dml channel")
metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
log.Debug("Proxy Insert Execute done", zap.Int64("msgID", it.Base.MsgID), zap.String("collection name", collectionName))
@ -3193,6 +3191,8 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
sp, ctx := trace.StartSpanFromContextWithOperationName(dt.ctx, "Proxy-Delete-Execute")
defer sp.Finish()
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))
collID := dt.DeleteRequest.CollectionID
stream, err := dt.chMgr.getDMLStream(collID)
if err != nil {
@ -3220,6 +3220,7 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
}
dt.HashValues = typeutil.HashPK2Channels(dt.result.IDs, channelNames)
tr.Record("get vchannels")
// repack delete messages by dmChannel
result := make(map[uint32]msgstream.TsMsg)
collectionName := dt.CollectionName
@ -3270,12 +3271,16 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
}
}
tr.Record("pack messages")
err = stream.Produce(msgPack)
if err != nil {
dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
dt.result.Status.Reason = err.Error()
return err
}
sendMsgDur := tr.Record("send delete request to dml channels")
metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(sendMsgDur.Milliseconds()))
return nil
}
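
tr.Record both marks a checkpoint and returns the span since the previous one, which is what feeds ProxySendMutationReqLatency above. Here is a minimal analogue of the recorder, assuming time.Since semantics; the real timerecord.TimeRecorder also logs each span name.

package timerecord

import "time"

// TimeRecorder sketch: Record returns the elapsed time since the last
// checkpoint and resets it; msg names the span for the caller's logs.
type TimeRecorder struct{ last time.Time }

func NewTimeRecorder(name string) *TimeRecorder {
	return &TimeRecorder{last: time.Now()}
}

func (t *TimeRecorder) Record(msg string) time.Duration {
	d := time.Since(t.last)
	t.last = time.Now()
	return d
}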

View File

@ -23,13 +23,10 @@ import (
"math"
"sync"
"time"
"unsafe"
"github.com/golang/protobuf/proto"
oplog "github.com/opentracing/opentracing-go/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
@ -37,7 +34,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/storage"
@ -333,8 +329,8 @@ func (q *queryCollection) setServiceableTime(t Timestamp) {
return
}
q.serviceableTime = t
ps, _ := tsoutil.ParseHybridTs(t)
metrics.QueryNodeServiceTime.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(ps))
// ps, _ := tsoutil.ParseHybridTs(t)
// metrics.QueryNodeServiceTime.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(ps))
}
func (q *queryCollection) checkTimeout(msg queryMsg) bool {
@ -704,286 +700,6 @@ func (q *queryCollection) doUnsolvedQueryMsg() {
}
}
func translateHits(schema *typeutil.SchemaHelper, fieldIDs []int64, rawHits [][]byte) (*schemapb.SearchResultData, error) {
tr := timerecord.NewTimeRecorder("translateHitsDuration")
log.Debug("translateHits:", zap.Any("lenOfFieldIDs", len(fieldIDs)), zap.Any("lenOfRawHits", len(rawHits)))
if len(rawHits) == 0 {
return nil, fmt.Errorf("empty results")
}
var hits []*milvuspb.Hits
for _, rawHit := range rawHits {
var hit milvuspb.Hits
err := proto.Unmarshal(rawHit, &hit)
if err != nil {
return nil, err
}
hits = append(hits, &hit)
}
blobOffset := 0
// skip id
numQueries := len(rawHits)
pbHits := &milvuspb.Hits{}
err := proto.Unmarshal(rawHits[0], pbHits)
if err != nil {
return nil, err
}
topK := len(pbHits.IDs)
blobOffset += 8
var ids []int64
var scores []float32
for _, hit := range hits {
ids = append(ids, hit.IDs...)
scores = append(scores, hit.Scores...)
}
finalResult := &schemapb.SearchResultData{
Ids: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: ids,
},
},
},
Scores: scores,
TopK: int64(topK),
NumQueries: int64(numQueries),
}
for _, fieldID := range fieldIDs {
fieldMeta, err := schema.GetFieldFromID(fieldID)
if err != nil {
return nil, err
}
switch fieldMeta.DataType {
case schemapb.DataType_Bool:
blobLen := 1
var colData []bool
for _, hit := range hits {
for _, row := range hit.RowData {
dataBlob := row[blobOffset : blobOffset+blobLen]
data := dataBlob[0]
colData = append(colData, data != 0)
}
}
newCol := &schemapb.FieldData{
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{
Data: colData,
},
},
},
},
}
finalResult.FieldsData = append(finalResult.FieldsData, newCol)
blobOffset += blobLen
case schemapb.DataType_Int8:
blobLen := 1
var colData []int32
for _, hit := range hits {
for _, row := range hit.RowData {
dataBlob := row[blobOffset : blobOffset+blobLen]
data := int32(dataBlob[0])
colData = append(colData, data)
}
}
newCol := &schemapb.FieldData{
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: colData,
},
},
},
},
}
finalResult.FieldsData = append(finalResult.FieldsData, newCol)
blobOffset += blobLen
case schemapb.DataType_Int16:
blobLen := 2
var colData []int32
for _, hit := range hits {
for _, row := range hit.RowData {
dataBlob := row[blobOffset : blobOffset+blobLen]
data := int32(int16(common.Endian.Uint16(dataBlob)))
colData = append(colData, data)
}
}
newCol := &schemapb.FieldData{
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: colData,
},
},
},
},
}
finalResult.FieldsData = append(finalResult.FieldsData, newCol)
blobOffset += blobLen
case schemapb.DataType_Int32:
blobLen := 4
var colData []int32
for _, hit := range hits {
for _, row := range hit.RowData {
dataBlob := row[blobOffset : blobOffset+blobLen]
data := int32(common.Endian.Uint32(dataBlob))
colData = append(colData, data)
}
}
newCol := &schemapb.FieldData{
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: colData,
},
},
},
},
}
finalResult.FieldsData = append(finalResult.FieldsData, newCol)
blobOffset += blobLen
case schemapb.DataType_Int64:
blobLen := 8
var colData []int64
for _, hit := range hits {
for _, row := range hit.RowData {
dataBlob := row[blobOffset : blobOffset+blobLen]
data := int64(common.Endian.Uint64(dataBlob))
colData = append(colData, data)
}
}
newCol := &schemapb.FieldData{
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: colData,
},
},
},
},
}
finalResult.FieldsData = append(finalResult.FieldsData, newCol)
blobOffset += blobLen
case schemapb.DataType_Float:
blobLen := 4
var colData []float32
for _, hit := range hits {
for _, row := range hit.RowData {
dataBlob := row[blobOffset : blobOffset+blobLen]
data := math.Float32frombits(common.Endian.Uint32(dataBlob))
colData = append(colData, data)
}
}
newCol := &schemapb.FieldData{
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Data: colData,
},
},
},
},
}
finalResult.FieldsData = append(finalResult.FieldsData, newCol)
blobOffset += blobLen
case schemapb.DataType_Double:
blobLen := 8
var colData []float64
for _, hit := range hits {
for _, row := range hit.RowData {
dataBlob := row[blobOffset : blobOffset+blobLen]
data := math.Float64frombits(common.Endian.Uint64(dataBlob))
colData = append(colData, data)
}
}
newCol := &schemapb.FieldData{
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{
Data: colData,
},
},
},
},
}
finalResult.FieldsData = append(finalResult.FieldsData, newCol)
blobOffset += blobLen
case schemapb.DataType_FloatVector:
dim, err := schema.GetVectorDimFromID(fieldID)
if err != nil {
return nil, err
}
blobLen := dim * 4
var colData []float32
for _, hit := range hits {
for _, row := range hit.RowData {
dataBlob := row[blobOffset : blobOffset+blobLen]
//ref https://github.com/golang/go/wiki/cgo#turning-c-arrays-into-go-slices
/* #nosec G103 */
ptr := unsafe.Pointer(&dataBlob[0])
farray := (*[1 << 28]float32)(ptr)
colData = append(colData, farray[:dim:dim]...)
}
}
newCol := &schemapb.FieldData{
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: colData,
},
},
},
},
}
finalResult.FieldsData = append(finalResult.FieldsData, newCol)
blobOffset += blobLen
case schemapb.DataType_BinaryVector:
dim, err := schema.GetVectorDimFromID(fieldID)
if err != nil {
return nil, err
}
blobLen := dim / 8
var colData []byte
for _, hit := range hits {
for _, row := range hit.RowData {
dataBlob := row[blobOffset : blobOffset+blobLen]
colData = append(colData, dataBlob...)
}
}
newCol := &schemapb.FieldData{
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_BinaryVector{
BinaryVector: colData,
},
},
},
}
finalResult.FieldsData = append(finalResult.FieldsData, newCol)
blobOffset += blobLen
default:
return nil, fmt.Errorf("unsupported data type %s", schemapb.DataType_name[int32(fieldMeta.DataType)])
}
}
metrics.QueryNodeTranslateHitsLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
return finalResult, nil
}
// TODO: cache map[dsl]plan
// TODO: re-batch search requests
func (q *queryCollection) search(msg queryMsg) error {

View File

@ -17,9 +17,7 @@
package querynode
import (
"bytes"
"context"
"encoding/binary"
"math"
"math/rand"
"sync"
@ -43,7 +41,6 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
func genSimpleQueryCollection(ctx context.Context, cancel context.CancelFunc) (*queryCollection, error) {
@ -325,153 +322,6 @@ func TestQueryCollection_consumeQuery(t *testing.T) {
})
}
func TestQueryCollection_TranslateHits(t *testing.T) {
fieldID := FieldID(0)
fieldIDs := []FieldID{fieldID}
genRawHits := func(dataType schemapb.DataType) [][]byte {
// ids
ids := make([]int64, 0)
for i := 0; i < defaultMsgLength; i++ {
ids = append(ids, int64(i))
}
// raw data
rawData := make([][]byte, 0)
switch dataType {
case schemapb.DataType_Bool:
var buf bytes.Buffer
for i := 0; i < defaultMsgLength; i++ {
err := binary.Write(&buf, common.Endian, true)
assert.NoError(t, err)
}
rawData = append(rawData, buf.Bytes())
case schemapb.DataType_Int8:
var buf bytes.Buffer
for i := 0; i < defaultMsgLength; i++ {
err := binary.Write(&buf, common.Endian, int8(i))
assert.NoError(t, err)
}
rawData = append(rawData, buf.Bytes())
case schemapb.DataType_Int16:
var buf bytes.Buffer
for i := 0; i < defaultMsgLength; i++ {
err := binary.Write(&buf, common.Endian, int16(i))
assert.NoError(t, err)
}
rawData = append(rawData, buf.Bytes())
case schemapb.DataType_Int32:
var buf bytes.Buffer
for i := 0; i < defaultMsgLength; i++ {
err := binary.Write(&buf, common.Endian, int32(i))
assert.NoError(t, err)
}
rawData = append(rawData, buf.Bytes())
case schemapb.DataType_Int64:
var buf bytes.Buffer
for i := 0; i < defaultMsgLength; i++ {
err := binary.Write(&buf, common.Endian, int64(i))
assert.NoError(t, err)
}
rawData = append(rawData, buf.Bytes())
case schemapb.DataType_Float:
var buf bytes.Buffer
for i := 0; i < defaultMsgLength; i++ {
err := binary.Write(&buf, common.Endian, float32(i))
assert.NoError(t, err)
}
rawData = append(rawData, buf.Bytes())
case schemapb.DataType_Double:
var buf bytes.Buffer
for i := 0; i < defaultMsgLength; i++ {
err := binary.Write(&buf, common.Endian, float64(i))
assert.NoError(t, err)
}
rawData = append(rawData, buf.Bytes())
}
hit := &milvuspb.Hits{
IDs: ids,
RowData: rawData,
}
hits := []*milvuspb.Hits{hit}
rawHits := make([][]byte, 0)
for _, h := range hits {
rawHit, err := proto.Marshal(h)
assert.NoError(t, err)
rawHits = append(rawHits, rawHit)
}
return rawHits
}
genSchema := func(dataType schemapb.DataType) *typeutil.SchemaHelper {
schema := &schemapb.CollectionSchema{
Name: defaultCollectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
genConstantField(constFieldParam{
id: fieldID,
dataType: dataType,
}),
},
}
schemaHelper, err := typeutil.CreateSchemaHelper(schema)
assert.NoError(t, err)
return schemaHelper
}
t.Run("test bool field", func(t *testing.T) {
dataType := schemapb.DataType_Bool
_, err := translateHits(genSchema(dataType), fieldIDs, genRawHits(dataType))
assert.NoError(t, err)
})
t.Run("test int8 field", func(t *testing.T) {
dataType := schemapb.DataType_Int8
_, err := translateHits(genSchema(dataType), fieldIDs, genRawHits(dataType))
assert.NoError(t, err)
})
t.Run("test int16 field", func(t *testing.T) {
dataType := schemapb.DataType_Int16
_, err := translateHits(genSchema(dataType), fieldIDs, genRawHits(dataType))
assert.NoError(t, err)
})
t.Run("test int32 field", func(t *testing.T) {
dataType := schemapb.DataType_Int32
_, err := translateHits(genSchema(dataType), fieldIDs, genRawHits(dataType))
assert.NoError(t, err)
})
t.Run("test int64 field", func(t *testing.T) {
dataType := schemapb.DataType_Int64
_, err := translateHits(genSchema(dataType), fieldIDs, genRawHits(dataType))
assert.NoError(t, err)
})
t.Run("test float field", func(t *testing.T) {
dataType := schemapb.DataType_Float
_, err := translateHits(genSchema(dataType), fieldIDs, genRawHits(dataType))
assert.NoError(t, err)
})
t.Run("test double field", func(t *testing.T) {
dataType := schemapb.DataType_Double
_, err := translateHits(genSchema(dataType), fieldIDs, genRawHits(dataType))
assert.NoError(t, err)
})
t.Run("test field with error type", func(t *testing.T) {
dataType := schemapb.DataType_FloatVector
_, err := translateHits(genSchema(dataType), fieldIDs, genRawHits(dataType))
assert.Error(t, err)
dataType = schemapb.DataType_BinaryVector
_, err = translateHits(genSchema(dataType), fieldIDs, genRawHits(dataType))
assert.Error(t, err)
})
}
func TestQueryCollection_serviceableTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

View File

@ -139,7 +139,7 @@ func (p *proxyManager) handlePutEvent(e *clientv3.Event) error {
for _, f := range p.addSessionsFunc {
f(session)
}
metrics.RootCoordProxyLister.WithLabelValues(metricProxy(session.ServerID)).Inc()
metrics.RootCoordProxyCounter.WithLabelValues().Inc()
return nil
}
@ -152,7 +152,7 @@ func (p *proxyManager) handleDeleteEvent(e *clientv3.Event) error {
for _, f := range p.delSessionsFunc {
f(session)
}
metrics.RootCoordProxyLister.WithLabelValues(metricProxy(session.ServerID)).Dec()
metrics.RootCoordProxyCounter.WithLabelValues().Dec()
return nil
}
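
With the per-proxy label gone, the metric reduces to a plain up/down count of live proxy sessions. A sketch of a declaration compatible with the zero-argument WithLabelValues() calls above; namespace and metric name are assumptions.

import "github.com/prometheus/client_golang/prometheus"

// Sketch only: an empty label set keeps the WithLabelValues().Inc()/.Dec()
// call sites above valid while dropping per-proxy cardinality.
var RootCoordProxyCounter = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "rootcoord",
		Name:      "proxy_num",
		Help:      "number of proxy nodes currently connected",
	},
	[]string{},
)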

View File

@ -334,7 +334,7 @@ func (c *Core) tsLoop() {
continue
}
ts := c.TSOGetLastSavedTime()
metrics.RootCoordETCDTimestampAllocCounter.Set(float64(ts.Unix()))
metrics.RootCoordTimestampSaved.Set(float64(ts.Unix()))
if err := c.IDAllocatorUpdate(); err != nil {
log.Warn("failed to update id: ", zap.Error(err))
continue
@ -1383,7 +1383,7 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon
// CreateCollection create collection
func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
metrics.RootCoordCreateCollectionCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@ -1403,20 +1403,21 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
if err != nil {
log.Error("CreateCollection failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, "CreateCollection failed: "+err.Error()), nil
}
log.Debug("CreateCollection success", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordCreateCollectionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("CreateCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("CreateCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfCollections.Inc()
return succStatus(), nil
}
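
Every DDL entry point in this file now follows the same four-step shape: count TotalLabel on entry, FailLabel on error, and SuccessLabel plus one latency observation on the happy path. The same pattern repeats for DropCollection, HasCollection, partitions, indexes, segments, aliases, and (via a method variable) the credential RPCs below. A hypothetical wrapper showing the pattern in one place; the commit inlines it per handler, and only the metric names are taken from the code above.

// Hypothetical helper; the diff inlines this pattern in each handler.
func withDDLMetrics(method string, handle func() error) error {
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	if err := handle(); err != nil {
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return err
	}
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return nil
}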
// DropCollection drop collection
func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
metrics.RootCoordDropCollectionCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@ -1434,20 +1435,21 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
if err != nil {
log.Error("DropCollection failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, "DropCollection failed: "+err.Error()), nil
}
log.Debug("DropCollection success", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordDropCollectionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DropCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DropCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfCollections.Dec()
return succStatus(), nil
}
// HasCollection check collection existence
func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
metrics.RootCoordHasCollectionCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.BoolResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
@ -1470,6 +1472,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
if err != nil {
log.Error("HasCollection failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc()
return &milvuspb.BoolResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasCollection failed: "+err.Error()),
Value: false,
@ -1478,8 +1481,8 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
log.Debug("HasCollection success", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Bool("hasCollection", t.HasCollection))
metrics.RootCoordHasCollectionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReadTypeLatency.WithLabelValues("HasCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("HasCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
return &milvuspb.BoolResponse{
Status: succStatus(),
Value: t.HasCollection,
@ -1488,7 +1491,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
// DescribeCollection return collection info
func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.DescribeCollectionResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode"+internalpb.StateCode_name[int32(code)]),
@ -1510,6 +1513,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
if err != nil {
log.Error("DescribeCollection failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
return &milvuspb.DescribeCollectionResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeCollection failed: "+err.Error()),
}, nil
@ -1517,15 +1521,15 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
log.Debug("DescribeCollection success", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReadTypeLatency.WithLabelValues("DescribeCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
t.Rsp.Status = succStatus()
return t.Rsp, nil
}
// ShowCollections list all collection names
func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
metrics.RootCoordShowCollectionsCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.ShowCollectionsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
@ -1547,6 +1551,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
if err != nil {
log.Error("ShowCollections failed", zap.String("role", typeutil.RootCoordRole),
zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
return &milvuspb.ShowCollectionsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowCollections failed: "+err.Error()),
}, nil
@ -1555,15 +1560,15 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
zap.String("dbname", in.DbName), zap.Int("num of collections", len(t.Rsp.CollectionNames)),
zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordShowCollectionsCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.SuccessLabel).Inc()
t.Rsp.Status = succStatus()
metrics.RootCoordDDLReadTypeLatency.WithLabelValues("ShowCollections").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqLatency.WithLabelValues("ShowCollections").Observe(float64(tr.ElapseSpan().Milliseconds()))
return t.Rsp, nil
}
// CreatePartition create partition
func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
metrics.RootCoordCreatePartitionCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@ -1583,21 +1588,22 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
log.Error("CreatePartition failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, "CreatePartition failed: "+err.Error()), nil
}
log.Debug("CreatePartition success", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordCreatePartitionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("CreatePartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("CreatePartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfPartitions.WithLabelValues().Inc()
return succStatus(), nil
}
// DropPartition drop partition
func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
metrics.RootCoordDropPartitionCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@ -1617,21 +1623,22 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
log.Error("DropPartition failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, "DropPartition failed: "+err.Error()), nil
}
log.Debug("DropPartition success", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordDropPartitionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DropPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DropPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfPartitions.WithLabelValues().Dec()
return succStatus(), nil
}
// HasPartition check partition existence
func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
metrics.RootCoordHasPartitionCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.BoolResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
@ -1656,6 +1663,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
log.Error("HasPartition failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
return &milvuspb.BoolResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasPartition failed: "+err.Error()),
Value: false,
@ -1665,8 +1673,8 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordHasPartitionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReadTypeLatency.WithLabelValues("HasPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("HasPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
return &milvuspb.BoolResponse{
Status: succStatus(),
Value: t.HasPartition,
@ -1675,7 +1683,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
// ShowPartitions list all partition names
func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
metrics.RootCoordShowPartitionsCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.ShowPartitionsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
@ -1697,6 +1705,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
if err != nil {
log.Error("ShowPartitions failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc()
return &milvuspb.ShowPartitionsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowPartitions failed: "+err.Error()),
}, nil
@ -1705,15 +1714,15 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
zap.String("collection name", in.CollectionName), zap.Int("num of partitions", len(t.Rsp.PartitionNames)),
zap.Int64("msgID", t.Req.Base.MsgID))
metrics.RootCoordShowPartitionsCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.SuccessLabel).Inc()
t.Rsp.Status = succStatus()
metrics.RootCoordDDLReadTypeLatency.WithLabelValues("ShowPartitions").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqLatency.WithLabelValues("ShowPartitions").Observe(float64(tr.ElapseSpan().Milliseconds()))
return t.Rsp, nil
}
// CreateIndex create index
func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
metrics.RootCoordCreateIndexCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateIndex", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@ -1733,20 +1742,21 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest)
log.Error("CreateIndex failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateIndex", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, "CreateIndex failed: "+err.Error()), nil
}
log.Debug("CreateIndex success", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordCreateIndexCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("CreateIndex").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateIndex", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("CreateIndex").Observe(float64(tr.ElapseSpan().Milliseconds()))
return succStatus(), nil
}
// DescribeIndex return index info
func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
metrics.RootCoordDescribeIndexCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeIndex", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.DescribeIndexResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
@ -1769,6 +1779,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
log.Error("DescribeIndex failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeIndex", metrics.FailLabel).Inc()
return &milvuspb.DescribeIndexResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeIndex failed: "+err.Error()),
}, nil
@ -1781,19 +1792,19 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
zap.Strings("index names", idxNames), zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordDescribeIndexCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeIndex", metrics.SuccessLabel).Inc()
if len(t.Rsp.IndexDescriptions) == 0 {
t.Rsp.Status = failStatus(commonpb.ErrorCode_IndexNotExist, "index not exist")
} else {
t.Rsp.Status = succStatus()
}
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DescribeIndex").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeIndex").Observe(float64(tr.ElapseSpan().Milliseconds()))
return t.Rsp, nil
}
// DropIndex drop index
func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
metrics.RootCoordDropIndexCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("DropIndex", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@ -1813,20 +1824,21 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c
log.Error("DropIndex failed", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropIndex", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, "DropIndex failed: "+err.Error()), nil
}
log.Debug("DropIndex success", zap.String("role", typeutil.RootCoordRole),
zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordDropIndexCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DropIndex").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropIndex", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DropIndex").Observe(float64(tr.ElapseSpan().Milliseconds()))
return succStatus(), nil
}
// DescribeSegment return segment info
func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeSegment", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.DescribeSegmentResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
@ -1849,6 +1861,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
log.Error("DescribeSegment failed", zap.String("role", typeutil.RootCoordRole),
zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeSegment", metrics.FailLabel).Inc()
return &milvuspb.DescribeSegmentResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeSegment failed: "+err.Error()),
}, nil
@ -1857,15 +1870,14 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID),
zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReadTypeLatency.WithLabelValues("DescribeSegment").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeSegment", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeSegment").Observe(float64(tr.ElapseSpan().Milliseconds()))
t.Rsp.Status = succStatus()
return t.Rsp, nil
}
func (c *Core) DescribeSegments(ctx context.Context, in *rootcoordpb.DescribeSegmentsRequest) (*rootcoordpb.DescribeSegmentsResponse, error) {
metrics.RootCoordDescribeSegmentsCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeSegments", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
log.Error("failed to describe segments, rootcoord not healthy",
zap.String("role", typeutil.RootCoordRole),
@ -1903,6 +1915,7 @@ func (c *Core) DescribeSegments(ctx context.Context, in *rootcoordpb.DescribeSeg
zap.Int64("collection", in.GetCollectionID()),
zap.Int64s("segments", in.GetSegmentIDs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeSegments", metrics.FailLabel).Inc()
return &rootcoordpb.DescribeSegmentsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeSegments failed: "+err.Error()),
}, nil
@ -1914,8 +1927,8 @@ func (c *Core) DescribeSegments(ctx context.Context, in *rootcoordpb.DescribeSeg
zap.Int64("collection", in.GetCollectionID()),
zap.Int64s("segments", in.GetSegmentIDs()))
metrics.RootCoordDescribeSegmentsCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReadTypeLatency.WithLabelValues("DescribeSegments").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeSegments", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeSegments").Observe(float64(tr.ElapseSpan().Milliseconds()))
t.Rsp.Status = succStatus()
return t.Rsp, nil
@ -1923,7 +1936,7 @@ func (c *Core) DescribeSegments(ctx context.Context, in *rootcoordpb.DescribeSeg
// ShowSegments list all segments
func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
metrics.RootCoordShowSegmentsCounter.WithLabelValues(metrics.TotalLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowSegments", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.ShowSegmentsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
@ -1947,6 +1960,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
log.Debug("ShowSegments failed", zap.String("role", typeutil.RootCoordRole),
zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowSegments", metrics.FailLabel).Inc()
return &milvuspb.ShowSegmentsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowSegments failed: "+err.Error()),
}, nil
@ -1956,8 +1970,8 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
zap.Int64s("segments ids", t.Rsp.SegmentIDs),
zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordShowSegmentsCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReadTypeLatency.WithLabelValues("ShowSegments").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowSegments", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("ShowSegments").Observe(float64(tr.ElapseSpan().Milliseconds()))
t.Rsp.Status = succStatus()
return t.Rsp, nil
}
@ -1980,7 +1994,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam
// return the first available timestamp
ts = ts - uint64(in.Count) + 1
metrics.RootCoordTimestampAllocCounter.Set(float64(ts))
metrics.RootCoordTimestamp.Set(float64(ts))
return &rootcoordpb.AllocTimestampResponse{
Status: succStatus(),
Timestamp: ts,
@ -2168,6 +2182,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
// CreateAlias create collection alias
func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@ -2187,18 +2202,21 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest)
log.Error("CreateAlias failed", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, "CreateAlias failed: "+err.Error()), nil
}
log.Debug("CreateAlias success", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
return succStatus(), nil
}
// DropAlias drop collection alias
func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@ -2216,17 +2234,20 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c
if err != nil {
log.Error("DropAlias failed", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.Alias), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, "DropAlias failed: "+err.Error()), nil
}
log.Debug("DropAlias success", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.Alias), zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
return succStatus(), nil
}
// AlterAlias alter collection alias
func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
@ -2246,13 +2267,15 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (
log.Error("AlterAlias failed", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, "AlterAlias failed: "+err.Error()), nil
}
log.Debug("AlterAlias success", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
zap.Int64("msgID", in.Base.MsgID))
metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
return succStatus(), nil
}
@ -2561,8 +2584,9 @@ func (c *Core) ClearCredUsersCache(ctx context.Context) error {
// 2. encrypt raw password
// 3. save in to etcd
func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
metrics.RootCoordCreateCredentialCounter.WithLabelValues(metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("CreateCredential")
method := "CreateCredential"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
log.Debug("CreateCredential", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username))
@ -2574,7 +2598,7 @@ func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.Creden
if err != nil {
log.Error("CreateCredential clear credential username list cache failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username), zap.Error(err))
metrics.RootCoordCreateCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "CreateCredential failed: "+err.Error()), nil
}
// insert to db
@ -2582,22 +2606,23 @@ func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.Creden
if err != nil {
log.Error("CreateCredential save credential failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username), zap.Error(err))
metrics.RootCoordCreateCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "CreateCredential failed: "+err.Error()), nil
}
log.Debug("CreateCredential success", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username))
metrics.RootCoordCreateCredentialCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordCredentialWriteTypeLatency.WithLabelValues("CreateCredential").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfCredentials.Inc()
return succStatus(), nil
}
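
For orientation, a hedged sketch of how the consolidated metric pair could be declared with prometheus/client_golang; only the variable names and label sets are inferred from the call sites, while the namespace, metric names, help strings, and buckets are assumptions (the real definitions live in the metrics package and may differ):

package metrics

import "github.com/prometheus/client_golang/prometheus"

var (
	// CounterVec partitioned by DDL function name and total/success/fail status.
	RootCoordDDLReqCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "milvus",
			Subsystem: "rootcoord",
			Name:      "ddl_req_count",
			Help:      "count of DDL requests by function name and status",
		}, []string{"function_name", "status"})

	// HistogramVec of per-request latency in milliseconds, keyed by function name.
	RootCoordDDLReqLatency = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "milvus",
			Subsystem: "rootcoord",
			Name:      "ddl_req_latency",
			Help:      "latency of DDL requests in milliseconds",
			Buckets:   prometheus.DefBuckets,
		}, []string{"function_name"})
)
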
// GetCredential gets a credential by username
func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) {
metrics.RootCoordGetCredentialCounter.WithLabelValues(metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("GetCredential")
method := "GetCredential"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
log.Debug("GetCredential", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username))
@ -2605,7 +2630,7 @@ func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialR
if err != nil {
log.Error("GetCredential query credential failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username), zap.Error(err))
metrics.RootCoordGetCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return &rootcoordpb.GetCredentialResponse{
Status: failStatus(commonpb.ErrorCode_GetCredentialFailure, "GetCredential failed: "+err.Error()),
}, err
@ -2613,8 +2638,8 @@ func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialR
log.Debug("GetCredential success", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username))
metrics.RootCoordGetCredentialCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordCredentialReadTypeLatency.WithLabelValues("GetCredential", in.Username).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return &rootcoordpb.GetCredentialResponse{
Status: succStatus(),
Username: credInfo.Username,
@ -2624,8 +2649,9 @@ func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialR
// UpdateCredential updates the password for a user
func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
metrics.RootCoordUpdateCredentialCounter.WithLabelValues(metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("UpdateCredential")
method := "UpdateCredential"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
log.Debug("UpdateCredential", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username))
// update proxy's local cache
@ -2633,7 +2659,7 @@ func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.Creden
if err != nil {
log.Error("UpdateCredential update credential cache failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username), zap.Error(err))
metrics.RootCoordUpdateCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UpdateCredentialFailure, "UpdateCredential failed: "+err.Error()), nil
}
// update data on storage
@ -2641,21 +2667,22 @@ func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.Creden
if err != nil {
log.Error("UpdateCredential save credential failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username), zap.Error(err))
metrics.RootCoordUpdateCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UpdateCredentialFailure, "UpdateCredential failed: "+err.Error()), nil
}
log.Debug("UpdateCredential success", zap.String("role", typeutil.RootCoordRole),
zap.String("username", credInfo.Username))
metrics.RootCoordUpdateCredentialCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordCredentialWriteTypeLatency.WithLabelValues("UpdateCredential").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return succStatus(), nil
}
// DeleteCredential deletes a user
func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
metrics.RootCoordDeleteCredentialCounter.WithLabelValues(metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("DeleteCredential")
method := "DeleteCredential"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
log.Debug("DeleteCredential", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username))
@ -2664,7 +2691,7 @@ func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredenti
if err != nil {
log.Error("DeleteCredential expire credential cache failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username), zap.Error(err))
metrics.RootCoordDeleteCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_DeleteCredentialFailure, "DeleteCredential failed: "+err.Error()), nil
}
// delete data on storage
@ -2672,35 +2699,37 @@ func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredenti
if err != nil {
log.Error("DeleteCredential remove credential failed", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username), zap.Error(err))
metrics.RootCoordDeleteCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_DeleteCredentialFailure, "DeleteCredential failed: "+err.Error()), err
}
log.Debug("DeleteCredential success", zap.String("role", typeutil.RootCoordRole),
zap.String("username", in.Username))
metrics.RootCoordDeleteCredentialCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordCredentialWriteTypeLatency.WithLabelValues("DeleteCredential").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfCredentials.Dec()
return succStatus(), nil
}
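
On the normal path each handler increments TotalLabel once and then exactly one of SuccessLabel/FailLabel (the early not-healthy return is the one exception), so total == success + fail per method. A hypothetical unit check with client_golang's testutil, not part of this commit:

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestDDLReqCounterInvariant is a sketch; "DeleteCredential" is just one method label.
func TestDDLReqCounterInvariant(t *testing.T) {
	method := "DeleteCredential"
	total := testutil.ToFloat64(metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel))
	success := testutil.ToFloat64(metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel))
	fail := testutil.ToFloat64(metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel))
	if total != success+fail {
		t.Errorf("counter invariant broken: total=%v success=%v fail=%v", total, success, fail)
	}
}
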
// ListCredUsers lists all usernames
func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
metrics.RootCoordListCredUsersCounter.WithLabelValues(metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("ListCredUsers")
method := "ListCredUsers"
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder(method)
credInfo, err := c.MetaTable.ListCredentialUsernames()
if err != nil {
log.Error("ListCredUsers query usernames failed", zap.String("role", typeutil.RootCoordRole),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return &milvuspb.ListCredUsersResponse{
Status: failStatus(commonpb.ErrorCode_ListCredUsersFailure, "ListCredUsers failed: "+err.Error()),
}, err
}
log.Debug("ListCredUsers success", zap.String("role", typeutil.RootCoordRole))
metrics.RootCoordListCredUsersCounter.WithLabelValues(metrics.SuccessLabel).Inc()
metrics.RootCoordCredentialReadTypeLatency.WithLabelValues("ListCredUsers", "ALL.API").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return &milvuspb.ListCredUsersResponse{
Status: succStatus(),
Usernames: credInfo.Usernames,

View File

@ -367,8 +367,9 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim
return err
}
physicalTs, _ := tsoutil.ParseHybridTs(ts)
for _, chanName := range chanNames {
metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts)))
metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(physicalTs))
}
return nil
}
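
With this change the insert-channel timetick gauge exports the physical component of the hybrid timestamp — a UNIX epoch in milliseconds — instead of a 24-hour-modulo value, so dashboards can compare it directly against wall-clock time. A sketch of the split ParseHybridTs is expected to perform, assuming the usual TSO layout of 18 logical bits:

const logicalBits = 18 // assumed layout: high bits = physical ms, low 18 bits = logical counter

// parseHybridTs mirrors what tsoutil.ParseHybridTs is expected to return.
func parseHybridTs(ts uint64) (physical int64, logical int64) {
	physical = int64(ts >> logicalBits)            // milliseconds since the UNIX epoch
	logical = int64(ts & ((1 << logicalBits) - 1)) // intra-millisecond sequence number
	return physical, logical
}
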