diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index e2f1b9b5b0..0870198980 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -104,12 +104,15 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error minTsafeChannel, minTsafe := node.tSafeManager.Min() + collections := node.manager.Collection.List() + var totalGrowingSize int64 growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing)) growingGroupByCollection := lo.GroupBy(growingSegments, func(seg segments.Segment) int64 { return seg.Collection() }) - for collection, segs := range growingGroupByCollection { + for _, collection := range collections { + segs := growingGroupByCollection[collection] size := lo.SumBy(segs, func(seg segments.Segment) int64 { return seg.MemSize() }) @@ -122,7 +125,8 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error sealedGroupByCollection := lo.GroupBy(sealedSegments, func(seg segments.Segment) int64 { return seg.Collection() }) - for collection, segs := range sealedGroupByCollection { + for _, collection := range collections { + segs := sealedGroupByCollection[collection] size := lo.SumBy(segs, func(seg segments.Segment) int64 { return seg.MemSize() }) @@ -130,12 +134,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error fmt.Sprint(collection), segments.SegmentTypeSealed.String()).Set(float64(size)) } - allSegments := node.manager.Segment.GetBy() - collections := typeutil.NewUniqueSet() - for _, segment := range allSegments { - collections.Insert(segment.Collection()) - } - return &metricsinfo.QueryNodeQuotaMetrics{ Hms: metricsinfo.HardwareMetrics{}, Rms: rms, @@ -149,7 +147,7 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error GrowingSegmentsSize: totalGrowingSize, Effect: metricsinfo.NodeEffect{ NodeID: node.GetNodeID(), - CollectionIDs: collections.Collect(), + CollectionIDs: collections, }, }, nil } diff --git a/internal/querynodev2/segments/collection.go b/internal/querynodev2/segments/collection.go index 89f12b463b..a181734187 100644 --- a/internal/querynodev2/segments/collection.go +++ b/internal/querynodev2/segments/collection.go @@ -25,6 +25,7 @@ package segments import "C" import ( + "fmt" "sync" "unsafe" @@ -39,11 +40,14 @@ import ( "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) type CollectionManager interface { + List() []int64 Get(collectionID int64) *Collection PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) Ref(collectionID int64, count uint32) bool @@ -64,6 +68,13 @@ func NewCollectionManager() *collectionManager { } } +func (m *collectionManager) List() []int64 { + m.mut.RLock() + defer m.mut.RUnlock() + + return lo.Keys(m.collections) +} + func (m *collectionManager) Get(collectionID int64) *Collection { m.mut.RLock() defer m.mut.RUnlock() @@ -109,6 +120,16 @@ func (m *collectionManager) Unref(collectionID int64, count uint32) bool { log.Info("release collection due to ref count to 0", zap.Int64("collectionID", collectionID)) delete(m.collections, collectionID) DeleteCollection(collection) + metrics.QueryNodeEntitiesSize.DeleteLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(collectionID), + SegmentTypeSealed.String(), + ) + metrics.QueryNodeEntitiesSize.DeleteLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(collectionID), + SegmentTypeGrowing.String(), + ) return true } return false diff --git a/internal/querynodev2/segments/mock_collection_manager.go b/internal/querynodev2/segments/mock_collection_manager.go index 5e11244a49..049b326c71 100644 --- a/internal/querynodev2/segments/mock_collection_manager.go +++ b/internal/querynodev2/segments/mock_collection_manager.go @@ -67,6 +67,49 @@ func (_c *MockCollectionManager_Get_Call) RunAndReturn(run func(int64) *Collecti return _c } +// List provides a mock function with given fields: +func (_m *MockCollectionManager) List() []int64 { + ret := _m.Called() + + var r0 []int64 + if rf, ok := ret.Get(0).(func() []int64); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + return r0 +} + +// MockCollectionManager_List_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'List' +type MockCollectionManager_List_Call struct { + *mock.Call +} + +// List is a helper method to define mock.On call +func (_e *MockCollectionManager_Expecter) List() *MockCollectionManager_List_Call { + return &MockCollectionManager_List_Call{Call: _e.mock.On("List")} +} + +func (_c *MockCollectionManager_List_Call) Run(run func()) *MockCollectionManager_List_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCollectionManager_List_Call) Return(_a0 []int64) *MockCollectionManager_List_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCollectionManager_List_Call) RunAndReturn(run func() []int64) *MockCollectionManager_List_Call { + _c.Call.Return(run) + return _c +} + // PutOrRef provides a mock function with given fields: collectionID, schema, meta, loadMeta func (_m *MockCollectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) { _m.Called(collectionID, schema, meta, loadMeta)