enhance: Enable setting the replica number and resource group during collection creation (#34403)

issue: #30040

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-07-10 10:20:13 +08:00 committed by GitHub
parent d60e628aed
commit 9b37d3f517
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 280 additions and 31 deletions

2
go.sum
View File

@ -607,8 +607,6 @@ github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 h1:xnIeuG1nuTEH
github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016 h1:8WV4maXLeGEyJCCYIc1DmZ18H+VFAjMrwXJg5iI2nX4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53 h1:hLeTFOV/IXUoTbm4slVWFSnR296yALJ8Zo+YCMEvAy0=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=

View File

@ -107,23 +107,48 @@ func (broker *CoordinatorBroker) DescribeDatabase(ctx context.Context, dbName st
// try to get collection level replica_num and resource groups first, then fall back to database level for whichever is unset; return (resource_groups, replica_num, error)
func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) {
// to do by weiliu1031: querycoord should cache mappings: collectionID->dbName
collectionInfo, err := broker.DescribeCollection(ctx, collectionID)
if err != nil {
return nil, 0, err
}
dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
replicaNum, err := common.CollectionLevelReplicaNumber(collectionInfo.GetProperties())
if err != nil {
return nil, 0, err
log.Warn("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if replicaNum > 0 {
log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
replicaNum, err := common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
rgs, err := common.CollectionLevelResourceGroups(collectionInfo.GetProperties())
if err != nil {
return nil, 0, err
log.Warn("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if len(rgs) > 0 {
log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
rgs, err := common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
if err != nil {
return nil, 0, err
if replicaNum <= 0 || len(rgs) == 0 {
dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
if err != nil {
return nil, 0, err
}
if replicaNum <= 0 {
replicaNum, err = common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
if err != nil {
log.Warn("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if replicaNum > 0 {
log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
}
if len(rgs) == 0 {
rgs, err = common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
if err != nil {
log.Warn("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if len(rgs) > 0 {
log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
}
}
return rgs, replicaNum, nil

View File

@ -572,7 +572,7 @@ func (s *CoordinatorBrokerRootCoordSuite) TestGetCollectionLoadInfo() {
Properties: []*commonpb.KeyValuePair{},
}, nil)
_, _, err := s.broker.GetCollectionLoadInfo(ctx, 1)
s.Error(err)
s.NoError(err)
s.resetMock()
})
}

View File

@ -217,20 +217,18 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
}
if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
// when replica number or resource groups is not set, use database level config
// when replica number or resource groups is not set, use pre-defined load config
rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to get data base level load info", zap.Error(err))
}
log.Warn("failed to get pre-defined load info", zap.Error(err))
} else {
if req.GetReplicaNumber() <= 0 && replicas > 0 {
req.ReplicaNumber = int32(replicas)
}
if req.GetReplicaNumber() <= 0 {
log.Info("load collection use database level replica number", zap.Int64("databaseLevelReplicaNum", replicas))
req.ReplicaNumber = int32(replicas)
}
if len(req.GetResourceGroups()) == 0 {
log.Info("load collection use database level resource groups", zap.Strings("databaseLevelResourceGroups", rgs))
req.ResourceGroups = rgs
if len(req.GetResourceGroups()) == 0 && len(rgs) > 0 {
req.ResourceGroups = rgs
}
}
}

View File

@ -153,6 +153,10 @@ const (
DatabaseDiskQuotaKey = "database.diskQuota.mb"
DatabaseMaxCollectionsKey = "database.max.collections"
DatabaseForceDenyWritingKey = "database.force.deny.writing"
// collection level load properties
CollectionReplicaNumber = "collection.replica.number"
CollectionResourceGroups = "collection.resource_groups"
)
// common properties
@ -259,3 +263,38 @@ func DatabaseLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error)
return nil, fmt.Errorf("database property not found: %s", DatabaseResourceGroups)
}
// CollectionLevelReplicaNumber looks up the CollectionReplicaNumber property in kvs
// and returns its value parsed as a base-10 int64.
// An error is returned when the property is absent or its value is not a valid integer.
func CollectionLevelReplicaNumber(kvs []*commonpb.KeyValuePair) (int64, error) {
	for i := range kvs {
		prop := kvs[i]
		if prop.Key != CollectionReplicaNumber {
			continue
		}
		// The first matching key decides the outcome; later duplicates are ignored.
		parsed, parseErr := strconv.ParseInt(prop.Value, 10, 64)
		if parseErr != nil {
			return 0, fmt.Errorf("invalid collection property: [key=%s] [value=%s]", prop.Key, prop.Value)
		}
		return parsed, nil
	}
	return 0, fmt.Errorf("collection property not found: %s", CollectionReplicaNumber)
}
// CollectionLevelResourceGroups looks up the CollectionResourceGroups property in kvs
// and returns its value split on "," into resource group names.
// An error is returned when the property is absent or its value is empty.
func CollectionLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error) {
	for _, kv := range kvs {
		if kv.Key != CollectionResourceGroups {
			continue
		}
		if len(kv.Value) == 0 {
			return nil, fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value)
		}
		// strings.Split with a non-empty separator always yields at least one
		// element for a non-empty input, so no further emptiness check is needed.
		return strings.Split(kv.Value, ","), nil
	}
	// Report the key that was actually searched for (resource groups, not replica number).
	return nil, fmt.Errorf("collection property not found: %s", CollectionResourceGroups)
}

View File

@ -54,29 +54,29 @@ func (s *LoadTestSuite) SetupSuite() {
s.Require().NoError(s.SetupEmbedEtcd())
}
func (s *LoadTestSuite) loadCollection(collectionName string, replica int, rgs []string) {
func (s *LoadTestSuite) loadCollection(collectionName string, db string, replica int, rgs []string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// load
loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
DbName: db,
CollectionName: collectionName,
ReplicaNumber: int32(replica),
ResourceGroups: rgs,
})
s.NoError(err)
s.True(merr.Ok(loadStatus))
s.WaitForLoad(ctx, collectionName)
s.WaitForLoadWithDB(ctx, db, collectionName)
}
func (s *LoadTestSuite) releaseCollection(collectionName string) {
func (s *LoadTestSuite) releaseCollection(db, collectionName string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// load
status, err := s.Cluster.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
DbName: dbName,
DbName: db,
CollectionName: collectionName,
})
s.NoError(err)
@ -171,7 +171,7 @@ func (s *LoadTestSuite) TestLoadWithDatabaseLevelConfig() {
s.Len(resp1.GetProperties(), 2)
// load collection without specified replica and rgs
s.loadCollection(collectionName, 0, nil)
s.loadCollection(collectionName, dbName, 0, nil)
resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName,
CollectionName: collectionName,
@ -179,7 +179,180 @@ func (s *LoadTestSuite) TestLoadWithDatabaseLevelConfig() {
s.NoError(err)
s.True(merr.Ok(resp2.Status))
s.Len(resp2.GetReplicas(), 3)
s.releaseCollection(collectionName)
s.releaseCollection(dbName, collectionName)
}
// TestLoadWithPredefineCollectionLevelConfig creates a collection with a replica
// number and resource groups supplied at creation time, then loads it without
// passing either value and expects three replicas to be reported.
func (s *LoadTestSuite) TestLoadWithPredefineCollectionLevelConfig() {
	ctx := context.Background()

	// prepare resource groups: each one requests and is limited to a single node,
	// and may exchange nodes with the default resource group
	rgNum := 3
	rgs := make([]string, 0)
	for i := 0; i < rgNum; i++ {
		rgs = append(rgs, fmt.Sprintf("rg_%d", i))
		// NOTE(review): the result of CreateResourceGroup is not checked here;
		// success is verified indirectly by the ListResourceGroups assertion below.
		s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
			ResourceGroup: rgs[i],
			Config: &rgpb.ResourceGroupConfig{
				Requests: &rgpb.ResourceGroupLimit{
					NodeNum: 1,
				},
				Limits: &rgpb.ResourceGroupLimit{
					NodeNum: 1,
				},

				TransferFrom: []*rgpb.ResourceGroupTransfer{
					{
						ResourceGroup: meta.DefaultResourceGroupName,
					},
				},
				TransferTo: []*rgpb.ResourceGroupTransfer{
					{
						ResourceGroup: meta.DefaultResourceGroupName,
					},
				},
			},
		})
	}

	// expect the created groups plus one more (presumably the default group — confirm)
	resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
	s.NoError(err)
	s.True(merr.Ok(resp.GetStatus()))
	s.Len(resp.GetResourceGroups(), rgNum+1)

	// add rgNum-1 query nodes; presumably the cluster already runs one — confirm
	for i := 1; i < rgNum; i++ {
		s.Cluster.AddQueryNode()
	}

	// wait (up to 30s, polling every second) until every resource group holds exactly one node
	s.Eventually(func() bool {
		matchCounter := 0
		for _, rg := range rgs {
			resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
				ResourceGroup: rg,
			})
			s.NoError(err)
			// check the status of this describe call, not the earlier list response
			s.True(merr.Ok(resp1.GetStatus()))
			if len(resp1.ResourceGroup.Nodes) == 1 {
				matchCounter += 1
			}
		}
		return matchCounter == rgNum
	}, 30*time.Second, time.Second)

	s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
		DBName:           dbName,
		Dim:              dim,
		CollectionName:   collectionName,
		ChannelNum:       1,
		SegmentNum:       3,
		RowNumPerSegment: 2000,
		ReplicaNumber:    3,
		ResourceGroups:   rgs,
	})

	// load collection without specified replica and rgs
	s.loadCollection(collectionName, dbName, 0, nil)
	resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
		DbName:         dbName,
		CollectionName: collectionName,
	})
	s.NoError(err)
	s.True(merr.Ok(resp2.Status))
	s.Len(resp2.GetReplicas(), 3)
	s.releaseCollection(dbName, collectionName)
}
// TestLoadWithPredefineDatabaseLevelConfig creates a database whose properties
// carry a replica number and resource groups, creates a collection inside it
// without collection-level values, then loads the collection without passing
// either value and expects three replicas to be reported.
func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() {
	ctx := context.Background()

	// prepare resource groups: each one requests and is limited to a single node,
	// and may exchange nodes with the default resource group
	rgNum := 3
	rgs := make([]string, 0)
	for i := 0; i < rgNum; i++ {
		rgs = append(rgs, fmt.Sprintf("rg_%d", i))
		// NOTE(review): the result of CreateResourceGroup is not checked here;
		// success is verified indirectly by the ListResourceGroups assertion below.
		s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
			ResourceGroup: rgs[i],
			Config: &rgpb.ResourceGroupConfig{
				Requests: &rgpb.ResourceGroupLimit{
					NodeNum: 1,
				},
				Limits: &rgpb.ResourceGroupLimit{
					NodeNum: 1,
				},

				TransferFrom: []*rgpb.ResourceGroupTransfer{
					{
						ResourceGroup: meta.DefaultResourceGroupName,
					},
				},
				TransferTo: []*rgpb.ResourceGroupTransfer{
					{
						ResourceGroup: meta.DefaultResourceGroupName,
					},
				},
			},
		})
	}

	// expect the created groups plus one more (presumably the default group — confirm)
	resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
	s.NoError(err)
	s.True(merr.Ok(resp.GetStatus()))
	s.Len(resp.GetResourceGroups(), rgNum+1)

	// add rgNum-1 query nodes; presumably the cluster already runs one — confirm
	for i := 1; i < rgNum; i++ {
		s.Cluster.AddQueryNode()
	}

	// wait (up to 30s, polling every second) until every resource group holds exactly one node
	s.Eventually(func() bool {
		matchCounter := 0
		for _, rg := range rgs {
			resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
				ResourceGroup: rg,
			})
			s.NoError(err)
			// check the status of this describe call, not the earlier list response
			s.True(merr.Ok(resp1.GetStatus()))
			if len(resp1.ResourceGroup.Nodes) == 1 {
				matchCounter += 1
			}
		}
		return matchCounter == rgNum
	}, 30*time.Second, time.Second)

	// create a database carrying the load configuration as database-level properties
	newDbName := "db_load_test_with_db_level_config"
	resp1, err := s.Cluster.Proxy.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{
		DbName: newDbName,
		Properties: []*commonpb.KeyValuePair{
			{
				Key:   common.DatabaseReplicaNumber,
				Value: "3",
			},
			{
				Key:   common.DatabaseResourceGroups,
				Value: strings.Join(rgs, ","),
			},
		},
	})
	s.NoError(err)
	s.True(merr.Ok(resp1))

	s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
		DBName:           newDbName,
		Dim:              dim,
		CollectionName:   collectionName,
		ChannelNum:       1,
		SegmentNum:       3,
		RowNumPerSegment: 2000,
	})

	// load collection without specified replica and rgs
	s.loadCollection(collectionName, newDbName, 0, nil)
	resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
		DbName:         newDbName,
		CollectionName: collectionName,
	})
	s.NoError(err)
	s.True(merr.Ok(resp2.Status))
	s.Len(resp2.GetReplicas(), 3)
	s.releaseCollection(newDbName, collectionName)
}
func TestReplicas(t *testing.T) {

View File

@ -2,12 +2,16 @@ package integration
import (
"context"
"strconv"
"strings"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
@ -20,6 +24,8 @@ type CreateCollectionConfig struct {
SegmentNum int
RowNumPerSegment int
Dim int
ReplicaNumber int32
ResourceGroups []string
}
func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context, cfg *CreateCollectionConfig) {
@ -33,12 +39,22 @@ func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context
CollectionName: cfg.CollectionName,
Schema: marshaledSchema,
ShardsNum: int32(cfg.ChannelNum),
Properties: []*commonpb.KeyValuePair{
{
Key: common.CollectionReplicaNumber,
Value: strconv.FormatInt(int64(cfg.ReplicaNumber), 10),
},
{
Key: common.CollectionResourceGroups,
Value: strings.Join(cfg.ResourceGroups, ","),
},
},
})
s.NoError(err)
s.True(merr.Ok(createCollectionStatus))
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{DbName: cfg.DBName})
s.NoError(err)
s.True(merr.Ok(showCollectionsResp.Status))
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
@ -80,5 +96,5 @@ func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context
})
s.NoError(err)
s.True(merr.Ok(createIndexStatus))
s.WaitForIndexBuilt(ctx, cfg.CollectionName, FloatVecField)
s.WaitForIndexBuiltWithDB(ctx, cfg.DBName, cfg.CollectionName, FloatVecField)
}