mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
fix: failed to release collection with more than 128 partitions (#28446)
#28343 Signed-off-by: wayblink <anyang.wang@zilliz.com>
This commit is contained in:
parent
a1c505dbd5
commit
7f46f4c628
@ -21,6 +21,8 @@ const (
|
||||
CollectionMetaPrefixV1 = "queryCoord-collectionMeta"
|
||||
ReplicaMetaPrefixV1 = "queryCoord-ReplicaMeta"
|
||||
ResourceGroupPrefix = "queryCoord-ResourceGroup"
|
||||
|
||||
MetaOpsBatchSize = 128
|
||||
)
|
||||
|
||||
type Catalog struct {
|
||||
@ -195,27 +197,42 @@ func (s Catalog) ReleaseCollection(collection int64) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
partitions := make([]*querypb.PartitionLoadInfo, 0)
|
||||
partitionIDs := make([]int64, 0)
|
||||
for _, v := range values {
|
||||
info := querypb.PartitionLoadInfo{}
|
||||
if err = proto.Unmarshal([]byte(v), &info); err != nil {
|
||||
return err
|
||||
}
|
||||
partitions = append(partitions, &info)
|
||||
partitionIDs = append(partitionIDs, (&info).GetPartitionID())
|
||||
}
|
||||
// remove collection and obtained partitions
|
||||
keys := lo.Map(partitions, func(partition *querypb.PartitionLoadInfo, _ int) string {
|
||||
return EncodePartitionLoadInfoKey(collection, partition.GetPartitionID())
|
||||
})
|
||||
k := EncodeCollectionLoadInfoKey(collection)
|
||||
keys = append(keys, k)
|
||||
return s.cli.MultiRemove(keys)
|
||||
collectionKey := EncodeCollectionLoadInfoKey(collection)
|
||||
err = s.cli.Remove(collectionKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.ReleasePartition(collection, partitionIDs...)
|
||||
}
|
||||
|
||||
func (s Catalog) ReleasePartition(collection int64, partitions ...int64) error {
|
||||
keys := lo.Map(partitions, func(partition int64, _ int) string {
|
||||
return EncodePartitionLoadInfoKey(collection, partition)
|
||||
})
|
||||
if len(partitions) >= MetaOpsBatchSize {
|
||||
index := 0
|
||||
for index < len(partitions) {
|
||||
endIndex := index + MetaOpsBatchSize
|
||||
if endIndex > len(partitions) {
|
||||
endIndex = len(partitions)
|
||||
}
|
||||
err := s.cli.MultiRemove(keys[index:endIndex])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
index = endIndex
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return s.cli.MultiRemove(keys)
|
||||
}
|
||||
|
||||
|
@ -121,6 +121,23 @@ func (suite *CatalogTestSuite) TestPartition() {
|
||||
suite.Len(partitions, 1)
|
||||
}
|
||||
|
||||
func (suite *CatalogTestSuite) TestReleaseManyPartitions() {
|
||||
partitionIds := make([]int64, 0)
|
||||
for i := 1; i <= 150; i++ {
|
||||
suite.catalog.SavePartition(&querypb.PartitionLoadInfo{
|
||||
CollectionID: 1,
|
||||
PartitionID: int64(i),
|
||||
})
|
||||
partitionIds = append(partitionIds, int64(i))
|
||||
}
|
||||
|
||||
err := suite.catalog.ReleasePartition(1, partitionIds...)
|
||||
suite.NoError(err)
|
||||
partitions, err := suite.catalog.GetPartitions()
|
||||
suite.NoError(err)
|
||||
suite.Len(partitions, 0)
|
||||
}
|
||||
|
||||
func (suite *CatalogTestSuite) TestReplica() {
|
||||
suite.catalog.SaveReplica(&querypb.Replica{
|
||||
CollectionID: 1,
|
||||
|
Loading…
Reference in New Issue
Block a user