mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
fix: correct error of metrics stats (#33305)
issue: #32980 cherry pick from master pr: #33075 #33255 --------- Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
parent
d4a146ef1a
commit
8990b8b051
@ -176,21 +176,10 @@ func (m *meta) AddCollection(collection *collectionInfo) {
|
||||
// DropCollection drop a collection from meta
|
||||
func (m *meta) DropCollection(collectionID int64) {
|
||||
log.Info("meta update: drop collection", zap.Int64("collectionID", collectionID))
|
||||
segments := m.SelectSegments(WithCollection(collectionID))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
coll, ok := m.collections[collectionID]
|
||||
if ok {
|
||||
metrics.CleanupDataCoordNumStoredRows(coll.DatabaseName, collectionID)
|
||||
metrics.CleanupDataCoordBulkInsertVectors(coll.DatabaseName, collectionID)
|
||||
for _, seg := range segments {
|
||||
metrics.CleanupDataCoordSegmentMetrics(coll.DatabaseName, collectionID, seg.ID)
|
||||
}
|
||||
} else {
|
||||
log.Warn("not found database name", zap.Int64("collectionID", collectionID))
|
||||
}
|
||||
|
||||
delete(m.collections, collectionID)
|
||||
metrics.CleanupDataCoordWithCollectionID(collectionID)
|
||||
metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections)))
|
||||
log.Info("meta update: drop collection - complete", zap.Int64("collectionID", collectionID))
|
||||
}
|
||||
@ -318,13 +307,13 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64, map[UniqueI
|
||||
collectionRowsNum[segment.GetCollectionID()][segment.GetState()] += segment.GetNumOfRows()
|
||||
}
|
||||
}
|
||||
|
||||
metrics.DataCoordNumStoredRows.Reset()
|
||||
for collectionID, statesRows := range collectionRowsNum {
|
||||
for state, rows := range statesRows {
|
||||
coll, ok := m.collections[collectionID]
|
||||
if ok {
|
||||
metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID), state.String()).Set(float64(rows))
|
||||
} else {
|
||||
log.Warn("not found database name", zap.Int64("collectionID", collectionID))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -555,6 +555,7 @@ func (m *CollectionManager) RemoveCollection(collectionID typeutil.UniqueID) err
|
||||
}
|
||||
delete(m.collectionPartitions, collectionID)
|
||||
}
|
||||
metrics.CleanQueryCoordMetricsWithCollectionID(collectionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -798,7 +798,11 @@ func (scheduler *taskScheduler) remove(task Task) {
|
||||
|
||||
scheduler.updateTaskMetrics()
|
||||
log.Info("task removed")
|
||||
metrics.QueryCoordTaskLatency.WithLabelValues(scheduler.getTaskMetricsLabel(task), task.Shard()).Observe(float64(task.GetTaskLatency()))
|
||||
|
||||
if scheduler.meta.Exist(task.CollectionID()) {
|
||||
metrics.QueryCoordTaskLatency.WithLabelValues(fmt.Sprint(task.CollectionID()),
|
||||
scheduler.getTaskMetricsLabel(task), task.Shard()).Observe(float64(task.GetTaskLatency()))
|
||||
}
|
||||
}
|
||||
|
||||
func (scheduler *taskScheduler) getTaskMetricsLabel(task Task) string {
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"unsafe"
|
||||
|
||||
@ -440,15 +439,6 @@ func (s *LocalSegment) initializeSegment() error {
|
||||
|
||||
// Update the insert count when initialize the segment and update the metrics.
|
||||
s.insertCount.Store(loadInfo.GetNumOfRows())
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
s.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(s.Collection()),
|
||||
fmt.Sprint(s.Partition()),
|
||||
s.Type().String(),
|
||||
strconv.FormatInt(int64(len(s.Indexes())), 10),
|
||||
).Add(float64(loadInfo.GetNumOfRows()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -808,15 +798,6 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
|
||||
}
|
||||
|
||||
s.insertCount.Add(int64(numOfRow))
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
s.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(s.Collection()),
|
||||
fmt.Sprint(s.Partition()),
|
||||
s.Type().String(),
|
||||
strconv.FormatInt(int64(len(s.Indexes())), 10),
|
||||
).Add(float64(numOfRow))
|
||||
|
||||
s.rowNum.Store(-1)
|
||||
s.memSize.Store(-1)
|
||||
return nil
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
@ -341,19 +340,26 @@ func CleanupDataCoordSegmentMetrics(dbName string, collectionID int64, segmentID
|
||||
})
|
||||
}
|
||||
|
||||
func CleanupDataCoordNumStoredRows(dbName string, collectionID int64) {
|
||||
for _, state := range commonpb.SegmentState_name {
|
||||
DataCoordNumStoredRows.Delete(prometheus.Labels{
|
||||
databaseLabelName: dbName,
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
segmentStateLabelName: fmt.Sprint(state),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func CleanupDataCoordBulkInsertVectors(dbName string, collectionID int64) {
|
||||
DataCoordBulkVectors.Delete(prometheus.Labels{
|
||||
databaseLabelName: dbName,
|
||||
func CleanupDataCoordWithCollectionID(collectionID int64) {
|
||||
IndexTaskNum.DeletePartialMatch(prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
DataCoordNumStoredRows.DeletePartialMatch(prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
DataCoordBulkVectors.DeletePartialMatch(prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
DataCoordSegmentBinLogFileCount.DeletePartialMatch(prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
DataCoordStoredBinlogSize.DeletePartialMatch(prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
DataCoordStoredIndexFilesSize.DeletePartialMatch(prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
DataCoordSizeStoredL0Segment.Delete(prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
}
|
||||
|
@ -17,6 +17,8 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
@ -129,7 +131,7 @@ var (
|
||||
Name: "task_latency",
|
||||
Help: "latency of all kind of task in query coord scheduler scheduler",
|
||||
Buckets: longTaskBuckets,
|
||||
}, []string{taskTypeLabel, channelNameLabelName})
|
||||
}, []string{collectionIDLabelName, taskTypeLabel, channelNameLabelName})
|
||||
)
|
||||
|
||||
// RegisterQueryCoord registers QueryCoord metrics
|
||||
@ -145,3 +147,9 @@ func RegisterQueryCoord(registry *prometheus.Registry) {
|
||||
registry.MustRegister(QueryCoordCurrentTargetCheckpointUnixSeconds)
|
||||
registry.MustRegister(QueryCoordTaskLatency)
|
||||
}
|
||||
|
||||
func CleanQueryCoordMetricsWithCollectionID(collectionID int64) {
|
||||
QueryCoordTaskLatency.DeletePartialMatch(prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user