mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
enhance: Maintain collection-patitions mapping in qc meta (#32227)
Related to #32165 Add collection to partitionIDs mapping to avoid interation on all partitions loaded when trying to get all partitions with collection id --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
14977b84d1
commit
dc11cbd123
@ -103,14 +103,17 @@ type CollectionManager struct {
|
||||
|
||||
collections map[typeutil.UniqueID]*Collection
|
||||
partitions map[typeutil.UniqueID]*Partition
|
||||
catalog metastore.QueryCoordCatalog
|
||||
|
||||
collectionPartitions map[typeutil.UniqueID]typeutil.Set[typeutil.UniqueID]
|
||||
catalog metastore.QueryCoordCatalog
|
||||
}
|
||||
|
||||
func NewCollectionManager(catalog metastore.QueryCoordCatalog) *CollectionManager {
|
||||
return &CollectionManager{
|
||||
collections: make(map[int64]*Collection),
|
||||
partitions: make(map[int64]*Partition),
|
||||
catalog: catalog,
|
||||
collections: make(map[int64]*Collection),
|
||||
partitions: make(map[int64]*Partition),
|
||||
collectionPartitions: make(map[int64]typeutil.Set[typeutil.UniqueID]),
|
||||
catalog: catalog,
|
||||
}
|
||||
}
|
||||
|
||||
@ -175,9 +178,11 @@ func (m *CollectionManager) Recover(broker Broker) error {
|
||||
continue
|
||||
}
|
||||
|
||||
m.partitions[partition.PartitionID] = &Partition{
|
||||
PartitionLoadInfo: partition,
|
||||
}
|
||||
m.putPartition([]*Partition{
|
||||
{
|
||||
PartitionLoadInfo: partition,
|
||||
},
|
||||
}, false)
|
||||
}
|
||||
}
|
||||
|
||||
@ -393,13 +398,7 @@ func (m *CollectionManager) GetPartitionsByCollection(collectionID typeutil.Uniq
|
||||
}
|
||||
|
||||
func (m *CollectionManager) getPartitionsByCollection(collectionID typeutil.UniqueID) []*Partition {
|
||||
partitions := make([]*Partition, 0)
|
||||
for _, partition := range m.partitions {
|
||||
if partition.CollectionID == collectionID {
|
||||
partitions = append(partitions, partition)
|
||||
}
|
||||
}
|
||||
return partitions
|
||||
return lo.Map(m.collectionPartitions[collectionID].Collect(), func(partitionID int64, _ int) *Partition { return m.partitions[partitionID] })
|
||||
}
|
||||
|
||||
func (m *CollectionManager) PutCollection(collection *Collection, partitions ...*Partition) error {
|
||||
@ -429,6 +428,13 @@ func (m *CollectionManager) putCollection(withSave bool, collection *Collection,
|
||||
for _, partition := range partitions {
|
||||
partition.UpdatedAt = time.Now()
|
||||
m.partitions[partition.GetPartitionID()] = partition
|
||||
|
||||
partitions := m.collectionPartitions[collection.CollectionID]
|
||||
if partitions == nil {
|
||||
partitions = make(typeutil.Set[int64])
|
||||
m.collectionPartitions[collection.CollectionID] = partitions
|
||||
}
|
||||
partitions.Insert(partition.GetPartitionID())
|
||||
}
|
||||
collection.UpdatedAt = time.Now()
|
||||
m.collections[collection.CollectionID] = collection
|
||||
@ -463,6 +469,14 @@ func (m *CollectionManager) putPartition(partitions []*Partition, withSave bool)
|
||||
for _, partition := range partitions {
|
||||
partition.UpdatedAt = time.Now()
|
||||
m.partitions[partition.GetPartitionID()] = partition
|
||||
collID := partition.GetCollectionID()
|
||||
|
||||
partitions := m.collectionPartitions[collID]
|
||||
if partitions == nil {
|
||||
partitions = make(typeutil.Set[int64])
|
||||
m.collectionPartitions[collID] = partitions
|
||||
}
|
||||
partitions.Insert(partition.GetPartitionID())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -536,11 +550,10 @@ func (m *CollectionManager) RemoveCollection(collectionID typeutil.UniqueID) err
|
||||
return err
|
||||
}
|
||||
delete(m.collections, collectionID)
|
||||
for partID, partition := range m.partitions {
|
||||
if partition.CollectionID == collectionID {
|
||||
delete(m.partitions, partID)
|
||||
}
|
||||
for _, partition := range m.collectionPartitions[collectionID].Collect() {
|
||||
delete(m.partitions, partition)
|
||||
}
|
||||
delete(m.collectionPartitions, collectionID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -561,8 +574,10 @@ func (m *CollectionManager) removePartition(collectionID typeutil.UniqueID, part
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
partitions := m.collectionPartitions[collectionID]
|
||||
for _, id := range partitionIDs {
|
||||
delete(m.partitions, id)
|
||||
delete(partitions, id)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
Loading…
Reference in New Issue
Block a user