mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
enhance: speed up minor functions calls in datacoord (#32389)
Related to https://github.com/milvus-io/milvus/issues/32165 1. nodeid based channel store access should use map access instead of iteration. 2. The join-ish functions calls are slow when # collections/segments increases (e.g. 10k). e.g. getNumRowsOfCollectionUnsafe is O(num_segments); GetAllCollectionNumRows is of O(num_collections*num_segments). Signed-off-by: yiwangdr <yiwangdr@gmail.com>
This commit is contained in:
parent
93f0c262c1
commit
037de8e4d3
@ -551,10 +551,8 @@ func (c *ChannelManagerImpl) Match(nodeID int64, channel string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, ch := range info.Channels {
|
||||
if ch.GetName() == channel {
|
||||
return true
|
||||
}
|
||||
if _, ok := info.Channels[channel]; ok {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -399,43 +399,35 @@ func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo {
|
||||
|
||||
// GetBufferChannelInfo returns all unassigned channels.
|
||||
func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
|
||||
for id, info := range c.channelsInfo {
|
||||
if id == bufferID {
|
||||
return info
|
||||
}
|
||||
if info, ok := c.channelsInfo[bufferID]; ok {
|
||||
return info
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetNode returns the channel info of a given node.
|
||||
func (c *ChannelStore) GetNode(nodeID int64) *NodeChannelInfo {
|
||||
for id, info := range c.channelsInfo {
|
||||
if id == nodeID {
|
||||
return info
|
||||
}
|
||||
if info, ok := c.channelsInfo[nodeID]; ok {
|
||||
return info
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int {
|
||||
for id, info := range c.channelsInfo {
|
||||
if id == nodeID {
|
||||
return len(info.Channels)
|
||||
}
|
||||
if info, ok := c.channelsInfo[nodeID]; ok {
|
||||
return len(info.Channels)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// Delete removes the given node from the channel store and returns its channels.
|
||||
func (c *ChannelStore) Delete(nodeID int64) ([]RWChannel, error) {
|
||||
for id, info := range c.channelsInfo {
|
||||
if id == nodeID {
|
||||
if err := c.remove(nodeID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
delete(c.channelsInfo, id)
|
||||
return lo.Values(info.Channels), nil
|
||||
if info, ok := c.channelsInfo[nodeID]; ok {
|
||||
if err := c.remove(nodeID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
delete(c.channelsInfo, nodeID)
|
||||
return lo.Values(info.Channels), nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -385,7 +385,7 @@ func (m *indexMeta) GetSegmentIndexState(collID, segmentID UniqueID, indexID Uni
|
||||
return state
|
||||
}
|
||||
|
||||
func (m *indexMeta) GetIndexedSegments(collectionID int64, fieldIDs []UniqueID) []int64 {
|
||||
func (m *indexMeta) GetIndexedSegments(collectionID int64, segmentIDs, fieldIDs []UniqueID) []int64 {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
@ -412,9 +412,11 @@ func (m *indexMeta) GetIndexedSegments(collectionID int64, fieldIDs []UniqueID)
|
||||
}
|
||||
|
||||
ret := make([]int64, 0)
|
||||
for sid, indexes := range m.segmentIndexes {
|
||||
if checkSegmentState(indexes) {
|
||||
ret = append(ret, sid)
|
||||
for _, sid := range segmentIDs {
|
||||
if indexes, ok := m.segmentIndexes[sid]; ok {
|
||||
if checkSegmentState(indexes) {
|
||||
ret = append(ret, sid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -614,17 +614,17 @@ func TestMeta_GetIndexedSegment(t *testing.T) {
|
||||
}
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
segments := m.GetIndexedSegments(collID, []int64{fieldID})
|
||||
segments := m.GetIndexedSegments(collID, []int64{segID}, []int64{fieldID})
|
||||
assert.Len(t, segments, 1)
|
||||
})
|
||||
|
||||
t.Run("no index on field", func(t *testing.T) {
|
||||
segments := m.GetIndexedSegments(collID, []int64{fieldID + 1})
|
||||
segments := m.GetIndexedSegments(collID, []int64{segID}, []int64{fieldID + 1})
|
||||
assert.Len(t, segments, 0)
|
||||
})
|
||||
|
||||
t.Run("no index", func(t *testing.T) {
|
||||
segments := m.GetIndexedSegments(collID+1, []int64{fieldID})
|
||||
segments := m.GetIndexedSegments(collID+1, []int64{segID}, []int64{fieldID})
|
||||
assert.Len(t, segments, 0)
|
||||
})
|
||||
}
|
||||
|
@ -321,8 +321,11 @@ func (m *meta) GetAllCollectionNumRows() map[int64]int64 {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
ret := make(map[int64]int64, len(m.collections))
|
||||
for collectionID := range m.collections {
|
||||
ret[collectionID] = m.getNumRowsOfCollectionUnsafe(collectionID)
|
||||
segments := m.segments.GetSegments()
|
||||
for _, segment := range segments {
|
||||
if isSegmentHealthy(segment) {
|
||||
ret[segment.GetCollectionID()] += segment.GetNumOfRows()
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
@ -1053,14 +1056,7 @@ func (m *meta) GetFlushingSegments() []*SegmentInfo {
|
||||
func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
var ret []*SegmentInfo
|
||||
segments := m.segments.GetSegments()
|
||||
for _, info := range segments {
|
||||
if selector(info) {
|
||||
ret = append(ret, info)
|
||||
}
|
||||
}
|
||||
return ret
|
||||
return m.segments.GetSegmentsBySelector(selector)
|
||||
}
|
||||
|
||||
// AddAllocation add allocation in segment
|
||||
|
@ -93,6 +93,16 @@ func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
|
||||
return segments
|
||||
}
|
||||
|
||||
func (s *SegmentsInfo) GetSegmentsBySelector(selector SegmentInfoSelector) []*SegmentInfo {
|
||||
var segments []*SegmentInfo
|
||||
for _, segment := range s.segments {
|
||||
if selector(segment) {
|
||||
segments = append(segments, segment)
|
||||
}
|
||||
}
|
||||
return segments
|
||||
}
|
||||
|
||||
// GetCompactionTo returns the segment that the provided segment is compacted to.
|
||||
// Return (nil, false) if given segmentID can not found in the meta.
|
||||
// Return (nil, true) if given segmentID can be found not no compaction to.
|
||||
|
@ -92,9 +92,12 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
|
||||
vecFieldIDs = append(vecFieldIDs, field.GetFieldID())
|
||||
}
|
||||
}
|
||||
segmentIDs := lo.Map(segmentList, func(seg *SegmentInfo, _ int) UniqueID {
|
||||
return seg.GetID()
|
||||
})
|
||||
|
||||
// get indexed segments which finish build index on all vector field
|
||||
indexed := mt.indexMeta.GetIndexedSegments(collection, vecFieldIDs)
|
||||
indexed := mt.indexMeta.GetIndexedSegments(collection, segmentIDs, vecFieldIDs)
|
||||
if len(indexed) > 0 {
|
||||
indexedSet := typeutil.NewUniqueSet(indexed...)
|
||||
for _, segment := range segmentList {
|
||||
|
Loading…
Reference in New Issue
Block a user