enhance: enable setting load config at cluster level (#35169)

issue: #35170
This PR enables setting load configs at the cluster level, such as the
replica number and resource groups. Collections that are loaded without an
explicit load config will then use the cluster-level one.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-08-07 12:38:21 +08:00 committed by GitHub
parent c725416288
commit 344dc6a9f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 124 additions and 1 deletions

View File

@ -151,6 +151,22 @@ func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, coll
}
}
if replicaNum <= 0 || len(rgs) == 0 {
if replicaNum <= 0 {
replicaNum = paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.GetAsInt64()
if replicaNum > 0 {
log.Info("get cluster level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
}
if len(rgs) == 0 {
rgs = paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.GetAsStrings()
if len(rgs) > 0 {
log.Info("get cluster level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
}
}
return rgs, replicaNum, nil
}

View File

@ -1702,6 +1702,8 @@ type queryCoordConfig struct {
CheckExecutedFlagInterval ParamItem `refreshable:"false"`
CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"`
UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"`
ClusterLevelLoadReplicaNumber ParamItem `refreshable:"true"`
ClusterLevelLoadResourceGroups ParamItem `refreshable:"true"`
}
func (p *queryCoordConfig) init(base *BaseTable) {
@ -2238,6 +2240,24 @@ If this parameter is set false, Milvus simply searches the growing segments with
Export: false,
}
p.CollectionBalanceSegmentBatchSize.Init(base.mgr)
p.ClusterLevelLoadReplicaNumber = ParamItem{
Key: "queryCoord.clusterLevelLoadReplicaNumber",
Version: "2.4.7",
DefaultValue: "0",
Doc: "the cluster level default value for load replica number",
Export: false,
}
p.ClusterLevelLoadReplicaNumber.Init(base.mgr)
p.ClusterLevelLoadResourceGroups = ParamItem{
Key: "queryCoord.clusterLevelLoadResourceGroups",
Version: "2.4.7",
DefaultValue: "",
Doc: "resource group names for load collection should be at least equal to queryCoord.clusterLevelLoadReplicaNumber, separate with commas",
Export: false,
}
p.ClusterLevelLoadResourceGroups.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@ -108,7 +108,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, "defaultMilvus", Params.DefaultRootPassword.GetValue())
params.Save("common.security.superUsers", "")
assert.Equal(t, []string{""}, Params.SuperUsers.GetAsStrings())
assert.Equal(t, []string{}, Params.SuperUsers.GetAsStrings())
assert.Equal(t, false, Params.PreCreatedTopicEnabled.GetAsBool())
@ -345,6 +345,9 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 0.1, Params.DelegatorMemoryOverloadFactor.GetAsFloat())
assert.Equal(t, 5, Params.CollectionBalanceSegmentBatchSize.GetAsInt())
assert.Equal(t, 0, Params.ClusterLevelLoadReplicaNumber.GetAsInt())
assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0)
})
t.Run("test queryNodeConfig", func(t *testing.T) {

View File

@ -331,6 +331,9 @@ func ParseAsStings(v string) []string {
}
func getAsStrings(v string) []string {
if len(v) == 0 {
return []string{}
}
return getAndConvert(v, func(value string) ([]string, error) {
return strings.Split(value, ","), nil
}, []string{})

View File

@ -355,6 +355,87 @@ func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() {
s.releaseCollection(newDbName, collectionName)
}
// TestLoadWithPredefineClusterLevelConfig checks that a collection loaded
// without an explicit replica number or resource groups ends up with the
// replica count configured through the cluster-level QueryCoord parameters
// (ClusterLevelLoadReplicaNumber / ClusterLevelLoadResourceGroups).
func (s *LoadTestSuite) TestLoadWithPredefineClusterLevelConfig() {
	ctx := context.Background()

	// prepare resource groups
	rgNum := 3
	rgs := make([]string, 0, rgNum)
	for i := 0; i < rgNum; i++ {
		rgs = append(rgs, fmt.Sprintf("rg_%d", i))
		// The result is not checked here; the ListResourceGroups assertion
		// below verifies that the expected number of groups exists.
		s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
			ResourceGroup: rgs[i],
			Config: &rgpb.ResourceGroupConfig{
				Requests: &rgpb.ResourceGroupLimit{
					NodeNum: 1,
				},
				Limits: &rgpb.ResourceGroupLimit{
					NodeNum: 1,
				},

				TransferFrom: []*rgpb.ResourceGroupTransfer{
					{
						ResourceGroup: meta.DefaultResourceGroupName,
					},
				},
				TransferTo: []*rgpb.ResourceGroupTransfer{
					{
						ResourceGroup: meta.DefaultResourceGroupName,
					},
				},
			},
		})
	}

	// Expect the newly created groups plus one more.
	// NOTE(review): the extra one is presumably the default resource group — confirm.
	resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
	s.NoError(err)
	s.True(merr.Ok(resp.GetStatus()))
	s.Len(resp.GetResourceGroups(), rgNum+1)

	// Add rgNum-1 query nodes.
	// NOTE(review): assumes the cluster already runs one query node, so that
	// every resource group can be assigned exactly one — confirm.
	for i := 1; i < rgNum; i++ {
		s.Cluster.AddQueryNode()
	}

	// Wait until every resource group reports exactly one node.
	s.Eventually(func() bool {
		matchCounter := 0
		for _, rg := range rgs {
			resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
				ResourceGroup: rg,
			})
			s.NoError(err)
			// Check the status of this Describe call (resp1), not the earlier
			// ListResourceGroups response.
			s.True(merr.Ok(resp1.GetStatus()))
			// Getters are nil-safe, so a failed RPC does not panic here.
			if len(resp1.GetResourceGroup().GetNodes()) == 1 {
				matchCounter++
			}
		}
		return matchCounter == rgNum
	}, 30*time.Second, time.Second)

	s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
		DBName:           dbName,
		Dim:              dim,
		CollectionName:   collectionName,
		ChannelNum:       1,
		SegmentNum:       3,
		RowNumPerSegment: 2000,
	})

	// Set the cluster-level load config for the duration of this test; the
	// deferred Reset calls undo the overrides on return.
	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key, "3")
	defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key)
	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key, strings.Join(rgs, ","))
	defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key)

	// load collection without specified replica and rgs
	s.loadCollection(collectionName, dbName, 0, nil)
	resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
		DbName:         dbName,
		CollectionName: collectionName,
	})
	s.NoError(err)
	s.True(merr.Ok(resp2.Status))
	// The replica count must match the cluster-level value saved above.
	s.Len(resp2.GetReplicas(), 3)
	s.releaseCollection(dbName, collectionName)
}
func TestReplicas(t *testing.T) {
suite.Run(t, new(LoadTestSuite))
}