fix: Remove QueryNodeEntitiesSize after segment/collection released (#31290)

See also #31289

This PR:
- Set collection level `QueryNodeEntitiesSize` to zero if all segment
released
- Delete `QueryNodeEntitiesSize` metrics value after collection ref is
zero

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-03-15 15:43:04 +08:00 committed by GitHub
parent 465fd474de
commit 08aba2e05f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 71 additions and 9 deletions

View File

@ -104,12 +104,15 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
minTsafeChannel, minTsafe := node.tSafeManager.Min()
collections := node.manager.Collection.List()
var totalGrowingSize int64
growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
growingGroupByCollection := lo.GroupBy(growingSegments, func(seg segments.Segment) int64 {
return seg.Collection()
})
for collection, segs := range growingGroupByCollection {
for _, collection := range collections {
segs := growingGroupByCollection[collection]
size := lo.SumBy(segs, func(seg segments.Segment) int64 {
return seg.MemSize()
})
@ -122,7 +125,8 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
sealedGroupByCollection := lo.GroupBy(sealedSegments, func(seg segments.Segment) int64 {
return seg.Collection()
})
for collection, segs := range sealedGroupByCollection {
for _, collection := range collections {
segs := sealedGroupByCollection[collection]
size := lo.SumBy(segs, func(seg segments.Segment) int64 {
return seg.MemSize()
})
@ -130,12 +134,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
fmt.Sprint(collection), segments.SegmentTypeSealed.String()).Set(float64(size))
}
allSegments := node.manager.Segment.GetBy()
collections := typeutil.NewUniqueSet()
for _, segment := range allSegments {
collections.Insert(segment.Collection())
}
return &metricsinfo.QueryNodeQuotaMetrics{
Hms: metricsinfo.HardwareMetrics{},
Rms: rms,
@ -149,7 +147,7 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
GrowingSegmentsSize: totalGrowingSize,
Effect: metricsinfo.NodeEffect{
NodeID: node.GetNodeID(),
CollectionIDs: collections.Collect(),
CollectionIDs: collections,
},
}, nil
}

View File

@ -25,6 +25,7 @@ package segments
import "C"
import (
"fmt"
"sync"
"unsafe"
@ -39,11 +40,14 @@ import (
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type CollectionManager interface {
List() []int64
Get(collectionID int64) *Collection
PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo)
Ref(collectionID int64, count uint32) bool
@ -64,6 +68,13 @@ func NewCollectionManager() *collectionManager {
}
}
func (m *collectionManager) List() []int64 {
m.mut.RLock()
defer m.mut.RUnlock()
return lo.Keys(m.collections)
}
func (m *collectionManager) Get(collectionID int64) *Collection {
m.mut.RLock()
defer m.mut.RUnlock()
@ -109,6 +120,16 @@ func (m *collectionManager) Unref(collectionID int64, count uint32) bool {
log.Info("release collection due to ref count to 0", zap.Int64("collectionID", collectionID))
delete(m.collections, collectionID)
DeleteCollection(collection)
metrics.QueryNodeEntitiesSize.DeleteLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(collectionID),
SegmentTypeSealed.String(),
)
metrics.QueryNodeEntitiesSize.DeleteLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(collectionID),
SegmentTypeGrowing.String(),
)
return true
}
return false

View File

@ -67,6 +67,49 @@ func (_c *MockCollectionManager_Get_Call) RunAndReturn(run func(int64) *Collecti
return _c
}
// List provides a mock function with given fields:
func (_m *MockCollectionManager) List() []int64 {
ret := _m.Called()
var r0 []int64
if rf, ok := ret.Get(0).(func() []int64); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
return r0
}
// MockCollectionManager_List_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'List'
type MockCollectionManager_List_Call struct {
*mock.Call
}
// List is a helper method to define mock.On call
func (_e *MockCollectionManager_Expecter) List() *MockCollectionManager_List_Call {
return &MockCollectionManager_List_Call{Call: _e.mock.On("List")}
}
func (_c *MockCollectionManager_List_Call) Run(run func()) *MockCollectionManager_List_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockCollectionManager_List_Call) Return(_a0 []int64) *MockCollectionManager_List_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCollectionManager_List_Call) RunAndReturn(run func() []int64) *MockCollectionManager_List_Call {
_c.Call.Return(run)
return _c
}
// PutOrRef provides a mock function with given fields: collectionID, schema, meta, loadMeta
func (_m *MockCollectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) {
_m.Called(collectionID, schema, meta, loadMeta)