enhance: enable to set load config in cluster level (#35293)

issue: #35170
pr: #35169
This PR enable to set load configs in cluster level, such as replicas
and resource groups. then when load collections will use the load
config.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-08-07 12:38:21 +08:00 committed by GitHub
parent 4f0c1982d3
commit 0201e00a2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 124 additions and 1 deletions

View File

@ -151,6 +151,22 @@ func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, coll
} }
} }
if replicaNum <= 0 || len(rgs) == 0 {
if replicaNum <= 0 {
replicaNum = paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.GetAsInt64()
if replicaNum > 0 {
log.Info("get cluster level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
}
if len(rgs) == 0 {
rgs = paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.GetAsStrings()
if len(rgs) > 0 {
log.Info("get cluster level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
}
}
return rgs, replicaNum, nil return rgs, replicaNum, nil
} }

View File

@ -1660,6 +1660,8 @@ type queryCoordConfig struct {
CheckExecutedFlagInterval ParamItem `refreshable:"false"` CheckExecutedFlagInterval ParamItem `refreshable:"false"`
UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"` UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"`
CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"` CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"`
ClusterLevelLoadReplicaNumber ParamItem `refreshable:"true"`
ClusterLevelLoadResourceGroups ParamItem `refreshable:"true"`
} }
func (p *queryCoordConfig) init(base *BaseTable) { func (p *queryCoordConfig) init(base *BaseTable) {
@ -2193,6 +2195,24 @@ func (p *queryCoordConfig) init(base *BaseTable) {
Export: false, Export: false,
} }
p.CollectionBalanceSegmentBatchSize.Init(base.mgr) p.CollectionBalanceSegmentBatchSize.Init(base.mgr)
p.ClusterLevelLoadReplicaNumber = ParamItem{
Key: "queryCoord.clusterLevelLoadReplicaNumber",
Version: "2.4.7",
DefaultValue: "0",
Doc: "the cluster level default value for load replica number",
Export: false,
}
p.ClusterLevelLoadReplicaNumber.Init(base.mgr)
p.ClusterLevelLoadResourceGroups = ParamItem{
Key: "queryCoord.clusterLevelLoadResourceGroups",
Version: "2.4.7",
DefaultValue: "",
Doc: "resource group names for load collection should be at least equal to queryCoord.clusterLevelLoadReplicaNumber, separate with commas",
Export: false,
}
p.ClusterLevelLoadResourceGroups.Init(base.mgr)
} }
// ///////////////////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////////////////

View File

@ -103,7 +103,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, "defaultMilvus", Params.DefaultRootPassword.GetValue()) assert.Equal(t, "defaultMilvus", Params.DefaultRootPassword.GetValue())
params.Save("common.security.superUsers", "") params.Save("common.security.superUsers", "")
assert.Equal(t, []string{""}, Params.SuperUsers.GetAsStrings()) assert.Equal(t, []string{}, Params.SuperUsers.GetAsStrings())
assert.Equal(t, false, Params.PreCreatedTopicEnabled.GetAsBool()) assert.Equal(t, false, Params.PreCreatedTopicEnabled.GetAsBool())
@ -340,6 +340,9 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 0.1, Params.DelegatorMemoryOverloadFactor.GetAsFloat()) assert.Equal(t, 0.1, Params.DelegatorMemoryOverloadFactor.GetAsFloat())
assert.Equal(t, 5, Params.CollectionBalanceSegmentBatchSize.GetAsInt()) assert.Equal(t, 5, Params.CollectionBalanceSegmentBatchSize.GetAsInt())
assert.Equal(t, 0, Params.ClusterLevelLoadReplicaNumber.GetAsInt())
assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0)
}) })
t.Run("test queryNodeConfig", func(t *testing.T) { t.Run("test queryNodeConfig", func(t *testing.T) {

View File

@ -313,6 +313,9 @@ func ParseAsStings(v string) []string {
} }
func getAsStrings(v string) []string { func getAsStrings(v string) []string {
if len(v) == 0 {
return []string{}
}
return getAndConvert(v, func(value string) ([]string, error) { return getAndConvert(v, func(value string) ([]string, error) {
return strings.Split(value, ","), nil return strings.Split(value, ","), nil
}, []string{}) }, []string{})

View File

@ -355,6 +355,87 @@ func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() {
s.releaseCollection(newDbName, collectionName) s.releaseCollection(newDbName, collectionName)
} }
func (s *LoadTestSuite) TestLoadWithPredefineClusterLevelConfig() {
ctx := context.Background()
// prepare resource groups
rgNum := 3
rgs := make([]string, 0)
for i := 0; i < rgNum; i++ {
rgs = append(rgs, fmt.Sprintf("rg_%d", i))
s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
ResourceGroup: rgs[i],
Config: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
TransferFrom: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
TransferTo: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
},
})
}
resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.Len(resp.GetResourceGroups(), rgNum+1)
for i := 1; i < rgNum; i++ {
s.Cluster.AddQueryNode()
}
s.Eventually(func() bool {
matchCounter := 0
for _, rg := range rgs {
resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
ResourceGroup: rg,
})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
if len(resp1.ResourceGroup.Nodes) == 1 {
matchCounter += 1
}
}
return matchCounter == rgNum
}, 30*time.Second, time.Second)
s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
DBName: dbName,
Dim: dim,
CollectionName: collectionName,
ChannelNum: 1,
SegmentNum: 3,
RowNumPerSegment: 2000,
})
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key, "3")
defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key)
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key, strings.Join(rgs, ","))
defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key)
// load collection without specified replica and rgs
s.loadCollection(collectionName, dbName, 0, nil)
resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName,
CollectionName: collectionName,
})
s.NoError(err)
s.True(merr.Ok(resp2.Status))
s.Len(resp2.GetReplicas(), 3)
s.releaseCollection(dbName, collectionName)
}
func TestReplicas(t *testing.T) { func TestReplicas(t *testing.T) {
suite.Run(t, new(LoadTestSuite)) suite.Run(t, new(LoadTestSuite))
} }