milvus/internal/querycoordv2/meta/resource_manager.go
chyezh a8c8a6bb0f
fix: parameter check of TransferReplica and TransferNode (#32297)
issue: #30647 

- Same dst and src resource group should not be allowed in
`TransferReplica` and `TransferNode`.

-  Remove redundant parameter check.

Signed-off-by: chyezh <chyezh@outlook.com>
2024-04-17 15:27:19 +08:00

902 lines
30 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package meta
import (
"fmt"
"sync"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var ErrNodeNotEnough = errors.New("nodes not enough")
type ResourceManager struct {
incomingNode typeutil.UniqueSet // incomingNode is a temporary set for incoming hangup node,
// after node is assigned to resource group, it will be removed from this set.
groups map[string]*ResourceGroup // primary index from resource group name to resource group
nodeIDMap map[int64]string // secondary index from node id to resource group
catalog metastore.QueryCoordCatalog
nodeMgr *session.NodeManager // TODO: ResourceManager is watch node status with service discovery, so it can handle node up and down as fast as possible.
// All function can get latest online node without checking with node manager.
// so node manager is a redundant type here.
rwmutex sync.RWMutex
rgChangedNotifier *syncutil.VersionedNotifier // used to notify that resource group has been changed.
// resource_observer will listen this notifier to do a resource group recovery.
nodeChangedNotifier *syncutil.VersionedNotifier // used to notify that node distribution in resource group has been changed.
// replica_observer will listen this notifier to do a replica recovery.
}
// NewResourceManager is used to create a ResourceManager instance.
func NewResourceManager(catalog metastore.QueryCoordCatalog, nodeMgr *session.NodeManager) *ResourceManager {
groups := make(map[string]*ResourceGroup)
// Always create a default resource group to keep compatibility.
groups[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupName, newResourceGroupConfig(0, defaultResourceGroupCapacity))
return &ResourceManager{
incomingNode: typeutil.NewUniqueSet(),
groups: groups,
nodeIDMap: make(map[int64]string),
catalog: catalog,
nodeMgr: nodeMgr,
rwmutex: sync.RWMutex{},
rgChangedNotifier: syncutil.NewVersionedNotifier(),
nodeChangedNotifier: syncutil.NewVersionedNotifier(),
}
}
// Recover recover resource group from meta, other interface of ResourceManager can be only called after recover is done.
func (rm *ResourceManager) Recover() error {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
rgs, err := rm.catalog.GetResourceGroups()
if err != nil {
return errors.Wrap(err, "failed to recover resource group from store")
}
// Resource group meta upgrade to latest version.
upgrades := make([]*querypb.ResourceGroup, 0)
for _, meta := range rgs {
needUpgrade := meta.Config == nil
rg := NewResourceGroupFromMeta(meta)
rm.groups[rg.GetName()] = rg
for _, node := range rg.GetNodes() {
if _, ok := rm.nodeIDMap[node]; ok {
// unreachable code, should never happen.
panic(fmt.Sprintf("dirty meta, node has been assign to multi resource group, %s, %s", rm.nodeIDMap[node], rg.GetName()))
}
rm.nodeIDMap[node] = rg.GetName()
}
log.Info("Recover resource group",
zap.String("rgName", rg.GetName()),
zap.Int64s("nodes", rm.groups[rg.GetName()].GetNodes()),
zap.Any("config", rg.GetConfig()),
)
if needUpgrade {
upgrades = append(upgrades, rg.GetMeta())
}
}
if len(upgrades) > 0 {
log.Info("upgrade resource group meta into latest", zap.Int("num", len(upgrades)))
return rm.catalog.SaveResourceGroup(upgrades...)
}
return nil
}
// AddResourceGroup create a new ResourceGroup.
// Do no changed with node, all node will be reassign to new resource group by auto recover.
func (rm *ResourceManager) AddResourceGroup(rgName string, cfg *rgpb.ResourceGroupConfig) error {
if len(rgName) == 0 {
return merr.WrapErrParameterMissing("resource group name couldn't be empty")
}
if cfg == nil {
// Use default config if not set, compatible with old client.
cfg = newResourceGroupConfig(0, 0)
}
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
if rm.groups[rgName] != nil {
// Idempotent promise.
// If resource group already exist, check if configuration is the same,
if proto.Equal(rm.groups[rgName].GetConfig(), cfg) {
return nil
}
return merr.WrapErrResourceGroupAlreadyExist(rgName)
}
maxResourceGroup := paramtable.Get().QuotaConfig.MaxResourceGroupNumOfQueryNode.GetAsInt()
if len(rm.groups) >= maxResourceGroup {
return merr.WrapErrResourceGroupReachLimit(rgName, maxResourceGroup)
}
if err := rm.validateResourceGroupConfig(rgName, cfg); err != nil {
return err
}
rg := NewResourceGroup(rgName, cfg)
if err := rm.catalog.SaveResourceGroup(rg.GetMeta()); err != nil {
log.Warn("failed to add resource group",
zap.String("rgName", rgName),
zap.Any("config", cfg),
zap.Error(err),
)
return merr.WrapErrResourceGroupServiceAvailable()
}
rm.groups[rgName] = rg
log.Info("add resource group",
zap.String("rgName", rgName),
zap.Any("config", cfg),
)
// notify that resource group config has been changed.
rm.rgChangedNotifier.NotifyAll()
return nil
}
// UpdateResourceGroups update resource group configuration.
// Only change the configuration, no change with node. all node will be reassign by auto recover.
func (rm *ResourceManager) UpdateResourceGroups(rgs map[string]*rgpb.ResourceGroupConfig) error {
if len(rgs) == 0 {
return nil
}
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
return rm.updateResourceGroups(rgs)
}
// updateResourceGroups update resource group configuration.
func (rm *ResourceManager) updateResourceGroups(rgs map[string]*rgpb.ResourceGroupConfig) error {
modifiedRG := make([]*ResourceGroup, 0, len(rgs))
updates := make([]*querypb.ResourceGroup, 0, len(rgs))
for rgName, cfg := range rgs {
if _, ok := rm.groups[rgName]; !ok {
return merr.WrapErrResourceGroupNotFound(rgName)
}
if err := rm.validateResourceGroupConfig(rgName, cfg); err != nil {
return err
}
// Update with copy on write.
mrg := rm.groups[rgName].CopyForWrite()
mrg.UpdateConfig(cfg)
rg := mrg.ToResourceGroup()
updates = append(updates, rg.GetMeta())
modifiedRG = append(modifiedRG, rg)
}
if err := rm.catalog.SaveResourceGroup(updates...); err != nil {
for rgName, cfg := range rgs {
log.Warn("failed to update resource group",
zap.String("rgName", rgName),
zap.Any("config", cfg),
zap.Error(err),
)
}
return merr.WrapErrResourceGroupServiceAvailable()
}
// Commit updates to memory.
for _, rg := range modifiedRG {
log.Info("update resource group",
zap.String("rgName", rg.GetName()),
zap.Any("config", rg.GetConfig()),
)
rm.groups[rg.GetName()] = rg
}
// notify that resource group config has been changed.
rm.rgChangedNotifier.NotifyAll()
return nil
}
// go:deprecated TransferNode transfer node from source resource group to target resource group.
// Deprecated, use Declarative API `UpdateResourceGroups` instead.
func (rm *ResourceManager) TransferNode(sourceRGName string, targetRGName string, nodeNum int) error {
if sourceRGName == targetRGName {
return merr.WrapErrParameterInvalidMsg("source resource group and target resource group should not be the same, resource group: %s", sourceRGName)
}
if nodeNum <= 0 {
return merr.WrapErrParameterInvalid("NumNode > 0", fmt.Sprintf("invalid NumNode %d", nodeNum))
}
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
if rm.groups[sourceRGName] == nil {
return merr.WrapErrResourceGroupNotFound(sourceRGName)
}
if rm.groups[targetRGName] == nil {
return merr.WrapErrResourceGroupNotFound(targetRGName)
}
sourceRG := rm.groups[sourceRGName]
targetRG := rm.groups[targetRGName]
// Check if source resource group has enough node to transfer.
if len(sourceRG.GetNodes()) < nodeNum {
return merr.WrapErrResourceGroupNodeNotEnough(sourceRGName, len(sourceRG.GetNodes()), nodeNum)
}
// Compatible with old version.
sourceCfg := sourceRG.GetConfigCloned()
targetCfg := targetRG.GetConfigCloned()
sourceCfg.Requests.NodeNum -= int32(nodeNum)
if sourceCfg.Requests.NodeNum < 0 {
sourceCfg.Requests.NodeNum = 0
}
targetCfg.Requests.NodeNum += int32(nodeNum)
if targetCfg.Requests.NodeNum > targetCfg.Limits.NodeNum {
targetCfg.Limits.NodeNum = targetCfg.Requests.NodeNum
}
// transfer node from source resource group to target resource group at high priority.
targetCfg.TransferFrom = append(targetCfg.TransferFrom, &rgpb.ResourceGroupTransfer{
ResourceGroup: sourceRGName,
})
return rm.updateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
sourceRGName: sourceCfg,
targetRGName: targetCfg,
})
}
// RemoveResourceGroup remove resource group.
func (rm *ResourceManager) RemoveResourceGroup(rgName string) error {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
if rm.groups[rgName] == nil {
// Idempotent promise: delete a non-exist rg should be ok
return nil
}
// validateResourceGroupIsDeletable will check if rg is deletable.
if err := rm.validateResourceGroupIsDeletable(rgName); err != nil {
return err
}
// Nodes may be still assign to these group,
// recover the resource group from redundant status before remove it.
if rm.groups[rgName].NodeNum() > 0 {
if err := rm.recoverRedundantNodeRG(rgName); err != nil {
log.Info("failed to recover redundant node resource group before remove it",
zap.String("rgName", rgName),
zap.Error(err),
)
return err
}
}
// Remove it from meta storage.
if err := rm.catalog.RemoveResourceGroup(rgName); err != nil {
log.Info("failed to remove resource group",
zap.String("rgName", rgName),
zap.Error(err),
)
return merr.WrapErrResourceGroupServiceAvailable()
}
// After recovering, all node assigned to these rg has been removed.
// no secondary index need to be removed.
delete(rm.groups, rgName)
log.Info("remove resource group",
zap.String("rgName", rgName),
)
// notify that resource group has been changed.
rm.rgChangedNotifier.NotifyAll()
return nil
}
// GetNodesOfMultiRG return nodes of multi rg, it can be used to get a consistent view of nodes of multi rg.
func (rm *ResourceManager) GetNodesOfMultiRG(rgName []string) (map[string]typeutil.UniqueSet, error) {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
ret := make(map[string]typeutil.UniqueSet)
for _, name := range rgName {
if rm.groups[name] == nil {
return nil, merr.WrapErrResourceGroupNotFound(name)
}
ret[name] = typeutil.NewUniqueSet(rm.groups[name].GetNodes()...)
}
return ret, nil
}
// GetNodes return nodes of given resource group.
func (rm *ResourceManager) GetNodes(rgName string) ([]int64, error) {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
if rm.groups[rgName] == nil {
return nil, merr.WrapErrResourceGroupNotFound(rgName)
}
return rm.groups[rgName].GetNodes(), nil
}
// GetOutgoingNodeNumByReplica return outgoing node num on each rg from this replica.
func (rm *ResourceManager) GetOutgoingNodeNumByReplica(replica *Replica) map[string]int32 {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
if rm.groups[replica.GetResourceGroup()] == nil {
return nil
}
rg := rm.groups[replica.GetResourceGroup()]
ret := make(map[string]int32)
replica.RangeOverRONodes(func(node int64) bool {
// if rgOfNode is not equal to rg of replica, outgoing node found.
if rgOfNode := rm.getResourceGroupByNodeID(node); rgOfNode != nil && rgOfNode.GetName() != rg.GetName() {
ret[rgOfNode.GetName()]++
}
return true
})
return ret
}
// getResourceGroupByNodeID get resource group by node id.
func (rm *ResourceManager) getResourceGroupByNodeID(nodeID int64) *ResourceGroup {
if rgName, ok := rm.nodeIDMap[nodeID]; ok {
return rm.groups[rgName]
}
return nil
}
// ContainsNode return whether given node is in given resource group.
func (rm *ResourceManager) ContainsNode(rgName string, node int64) bool {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
if rm.groups[rgName] == nil {
return false
}
return rm.groups[rgName].ContainNode(node)
}
// ContainResourceGroup return whether given resource group is exist.
func (rm *ResourceManager) ContainResourceGroup(rgName string) bool {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
return rm.groups[rgName] != nil
}
// GetResourceGroup return resource group snapshot by name.
func (rm *ResourceManager) GetResourceGroup(rgName string) *ResourceGroup {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
if rm.groups[rgName] == nil {
return nil
}
return rm.groups[rgName].Snapshot()
}
// ListResourceGroups return all resource groups names.
func (rm *ResourceManager) ListResourceGroups() []string {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
return lo.Keys(rm.groups)
}
// MeetRequirement return whether resource group meet requirement.
// Return error with reason if not meet requirement.
func (rm *ResourceManager) MeetRequirement(rgName string) error {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
if rm.groups[rgName] == nil {
return nil
}
return rm.groups[rgName].MeetRequirement()
}
// CheckIncomingNodeNum return incoming node num.
func (rm *ResourceManager) CheckIncomingNodeNum() int {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
return rm.incomingNode.Len()
}
// HandleNodeUp handle node when new node is incoming.
func (rm *ResourceManager) HandleNodeUp(node int64) {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
rm.incomingNode.Insert(node)
// Trigger assign incoming node right away.
// error can be ignored here, because `AssignPendingIncomingNode`` will retry assign node.
rgName, err := rm.assignIncomingNodeWithNodeCheck(node)
log.Info("HandleNodeUp: add node to resource group",
zap.String("rgName", rgName),
zap.Int64("node", node),
zap.Error(err),
)
}
// HandleNodeDown handle the node when node is leave.
func (rm *ResourceManager) HandleNodeDown(node int64) {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
// failure of node down can be ignored, node down can be done by `RemoveAllDownNode`.
rm.incomingNode.Remove(node)
rgName, err := rm.unassignNode(node)
log.Info("HandleNodeDown: remove node from resource group",
zap.String("rgName", rgName),
zap.Int64("node", node),
zap.Error(err),
)
}
// ListenResourceGroupChanged return a listener for resource group changed.
func (rm *ResourceManager) ListenResourceGroupChanged() *syncutil.VersionedListener {
return rm.rgChangedNotifier.Listen(syncutil.VersionedListenAtEarliest)
}
// ListenNodeChanged return a listener for node changed.
func (rm *ResourceManager) ListenNodeChanged() *syncutil.VersionedListener {
return rm.nodeChangedNotifier.Listen(syncutil.VersionedListenAtEarliest)
}
// AssignPendingIncomingNode assign incoming node to resource group.
func (rm *ResourceManager) AssignPendingIncomingNode() {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
for node := range rm.incomingNode {
rgName, err := rm.assignIncomingNodeWithNodeCheck(node)
log.Info("Pending HandleNodeUp: add node to resource group",
zap.String("rgName", rgName),
zap.Int64("node", node),
zap.Error(err),
)
}
}
// RemoveAllDownNode remove all down node from resource group.
func (rm *ResourceManager) RemoveAllDownNode() {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
for nodeID := range rm.nodeIDMap {
if node := rm.nodeMgr.Get(nodeID); node == nil || node.IsStoppingState() {
// unassignNode failure can be skip.
rgName, err := rm.unassignNode(nodeID)
log.Info("remove down node from resource group",
zap.Bool("nodeExist", node != nil),
zap.Int64("nodeID", nodeID),
zap.String("rgName", rgName),
zap.Error(err),
)
}
}
}
// AutoRecoverResourceGroup auto recover rg, return recover used node num
func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) error {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
rg := rm.groups[rgName]
if rg == nil {
return nil
}
if rg.MissingNumOfNodes() > 0 {
return rm.recoverMissingNodeRG(rgName)
}
// DefaultResourceGroup is the backup resource group of redundant recovery,
// So after all other resource group is reach the `limits`, rest redundant node will be transfer to DefaultResourceGroup.
if rg.RedundantNumOfNodes() > 0 {
return rm.recoverRedundantNodeRG(rgName)
}
return nil
}
// recoverMissingNodeRG recover resource group by transfer node from other resource group.
func (rm *ResourceManager) recoverMissingNodeRG(rgName string) error {
for rm.groups[rgName].MissingNumOfNodes() > 0 {
rg := rm.groups[rgName]
sourceRG := rm.selectMissingRecoverSourceRG(rg)
if sourceRG == nil {
log.Warn("fail to select source resource group", zap.String("rgName", rg.GetName()))
return ErrNodeNotEnough
}
nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, rg)
if err != nil {
log.Warn("failed to recover missing node by transfer node from other resource group",
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", rg.GetName()),
zap.Error(err))
return err
}
log.Info("recover missing node by transfer node from other resource group",
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", rg.GetName()),
zap.Int64("nodeID", nodeID),
)
}
return nil
}
// selectMissingRecoverSourceRG select source resource group for recover missing resource group.
func (rm *ResourceManager) selectMissingRecoverSourceRG(rg *ResourceGroup) *ResourceGroup {
// First, Transfer node from most redundant resource group first. `len(nodes) > limits`
if redundantRG := rm.findMaxRGWithGivenFilter(
func(sourceRG *ResourceGroup) bool {
return rg.GetName() != sourceRG.GetName() && sourceRG.RedundantNumOfNodes() > 0
},
func(sourceRG *ResourceGroup) int {
return sourceRG.RedundantNumOfNodes()
},
); redundantRG != nil {
return redundantRG
}
// Second, Transfer node from most oversized resource group. `len(nodes) > requests`
// `TransferFrom` configured resource group at high priority.
return rm.findMaxRGWithGivenFilter(
func(sourceRG *ResourceGroup) bool {
return rg.GetName() != sourceRG.GetName() && sourceRG.OversizedNumOfNodes() > 0
},
func(sourceRG *ResourceGroup) int {
if rg.HasFrom(sourceRG.GetName()) {
// give a boost if sourceRG is configured as `TransferFrom` to set as high priority to select.
return sourceRG.OversizedNumOfNodes() * resourceGroupTransferBoost
}
return sourceRG.OversizedNumOfNodes()
})
}
// recoverRedundantNodeRG recover resource group by transfer node to other resource group.
func (rm *ResourceManager) recoverRedundantNodeRG(rgName string) error {
for rm.groups[rgName].RedundantNumOfNodes() > 0 {
rg := rm.groups[rgName]
targetRG := rm.selectRedundantRecoverTargetRG(rg)
if targetRG == nil {
log.Info("failed to select redundant recover target resource group, please check resource group configuration if as expected.",
zap.String("rgName", rg.GetName()))
return errors.New("all resource group reach limits")
}
nodeID, err := rm.transferOneNodeFromRGToRG(rg, targetRG)
if err != nil {
log.Warn("failed to recover redundant node by transfer node to other resource group",
zap.String("sourceRG", rg.GetName()),
zap.String("targetRG", targetRG.GetName()),
zap.Error(err))
return err
}
log.Info("recover redundant node by transfer node to other resource group",
zap.String("sourceRG", rg.GetName()),
zap.String("targetRG", targetRG.GetName()),
zap.Int64("nodeID", nodeID),
)
}
return nil
}
// selectRedundantRecoverTargetRG select target resource group for recover redundant resource group.
func (rm *ResourceManager) selectRedundantRecoverTargetRG(rg *ResourceGroup) *ResourceGroup {
// First, Transfer node to most missing resource group first.
if missingRG := rm.findMaxRGWithGivenFilter(
func(targetRG *ResourceGroup) bool {
return rg.GetName() != targetRG.GetName() && targetRG.MissingNumOfNodes() > 0
},
func(targetRG *ResourceGroup) int {
return targetRG.MissingNumOfNodes()
},
); missingRG != nil {
return missingRG
}
// Second, Transfer node to max reachLimit resource group.
// `TransferTo` configured resource group at high priority.
if selectRG := rm.findMaxRGWithGivenFilter(
func(targetRG *ResourceGroup) bool {
return rg.GetName() != targetRG.GetName() && targetRG.ReachLimitNumOfNodes() > 0
},
func(targetRG *ResourceGroup) int {
if rg.HasTo(targetRG.GetName()) {
// give a boost if targetRG is configured as `TransferTo` to set as high priority to select.
return targetRG.ReachLimitNumOfNodes() * resourceGroupTransferBoost
}
return targetRG.ReachLimitNumOfNodes()
},
); selectRG != nil {
return selectRG
}
// Finally, Always transfer node to default resource group.
if rg.GetName() != DefaultResourceGroupName {
return rm.groups[DefaultResourceGroupName]
}
return nil
}
// transferOneNodeFromRGToRG transfer one node from source resource group to target resource group.
func (rm *ResourceManager) transferOneNodeFromRGToRG(sourceRG *ResourceGroup, targetRG *ResourceGroup) (int64, error) {
if sourceRG.NodeNum() == 0 {
return -1, ErrNodeNotEnough
}
// TODO: select node by some load strategy, such as segment loaded.
node := sourceRG.GetNodes()[0]
if err := rm.transferNode(targetRG.GetName(), node); err != nil {
return -1, err
}
return node, nil
}
// assignIncomingNodeWithNodeCheck assign node to resource group with node status check.
func (rm *ResourceManager) assignIncomingNodeWithNodeCheck(node int64) (string, error) {
// node is on stopping or stopped, remove it from incoming node set.
if rm.nodeMgr.Get(node) == nil {
rm.incomingNode.Remove(node)
return "", errors.New("node is not online")
}
if ok, _ := rm.nodeMgr.IsStoppingNode(node); ok {
rm.incomingNode.Remove(node)
return "", errors.New("node has been stopped")
}
rgName, err := rm.assignIncomingNode(node)
if err != nil {
return "", err
}
// node assignment is finished, remove the node from incoming node set.
rm.incomingNode.Remove(node)
return rgName, nil
}
// assignIncomingNode assign node to resource group.
func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) {
// If node already assign to rg.
rg := rm.getResourceGroupByNodeID(node)
if rg != nil {
log.Info("HandleNodeUp: node already assign to resource group",
zap.String("rgName", rg.GetName()),
zap.Int64("node", node),
)
return rg.GetName(), nil
}
// select a resource group to assign incoming node.
rg = rm.mustSelectAssignIncomingNodeTargetRG()
if err := rm.transferNode(rg.GetName(), node); err != nil {
return "", errors.Wrap(err, "at finally assign to default resource group")
}
return rg.GetName(), nil
}
// mustSelectAssignIncomingNodeTargetRG select resource group for assign incoming node.
func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG() *ResourceGroup {
// First, Assign it to rg with the most missing nodes at high priority.
if rg := rm.findMaxRGWithGivenFilter(
func(rg *ResourceGroup) bool {
return rg.MissingNumOfNodes() > 0
},
func(rg *ResourceGroup) int {
return rg.MissingNumOfNodes()
},
); rg != nil {
return rg
}
// Second, assign it to rg do not reach limit.
if rg := rm.findMaxRGWithGivenFilter(
func(rg *ResourceGroup) bool {
return rg.ReachLimitNumOfNodes() > 0
},
func(rg *ResourceGroup) int {
return rg.ReachLimitNumOfNodes()
},
); rg != nil {
return rg
}
// Finally, add node to default rg.
return rm.groups[DefaultResourceGroupName]
}
// findMaxRGWithGivenFilter find resource group with given filter and return the max one.
// not efficient, but it's ok for low nodes and low resource group.
func (rm *ResourceManager) findMaxRGWithGivenFilter(filter func(rg *ResourceGroup) bool, attr func(rg *ResourceGroup) int) *ResourceGroup {
var maxRG *ResourceGroup
for _, rg := range rm.groups {
if filter == nil || filter(rg) {
if maxRG == nil || attr(rg) > attr(maxRG) {
maxRG = rg
}
}
}
return maxRG
}
// transferNode transfer given node to given resource group.
// if given node is assigned in given resource group, do nothing.
// if given node is assigned to other resource group, it will be unassigned first.
func (rm *ResourceManager) transferNode(rgName string, node int64) error {
if rm.groups[rgName] == nil {
return merr.WrapErrResourceGroupNotFound(rgName)
}
updates := make([]*querypb.ResourceGroup, 0, 2)
modifiedRG := make([]*ResourceGroup, 0, 2)
originalRG := "_"
// Check if node is already assign to rg.
if rg := rm.getResourceGroupByNodeID(node); rg != nil {
if rg.GetName() == rgName {
// node is already assign to rg.
log.Info("node already assign to resource group",
zap.String("rgName", rgName),
zap.Int64("node", node),
)
return nil
}
// Apply update.
mrg := rg.CopyForWrite()
mrg.UnassignNode(node)
rg := mrg.ToResourceGroup()
updates = append(updates, rg.GetMeta())
modifiedRG = append(modifiedRG, rg)
originalRG = rg.GetName()
}
// assign the node to rg.
mrg := rm.groups[rgName].CopyForWrite()
mrg.AssignNode(node)
rg := mrg.ToResourceGroup()
updates = append(updates, rg.GetMeta())
modifiedRG = append(modifiedRG, rg)
// Commit updates to meta storage.
if err := rm.catalog.SaveResourceGroup(updates...); err != nil {
log.Warn("failed to transfer node to resource group",
zap.String("rgName", rgName),
zap.String("originalRG", originalRG),
zap.Int64("node", node),
zap.Error(err),
)
return merr.WrapErrResourceGroupServiceAvailable()
}
// Commit updates to memory.
for _, rg := range modifiedRG {
rm.groups[rg.GetName()] = rg
}
rm.nodeIDMap[node] = rgName
log.Info("transfer node to resource group",
zap.String("rgName", rgName),
zap.String("originalRG", originalRG),
zap.Int64("node", node),
)
// notify that node distribution has been changed.
rm.nodeChangedNotifier.NotifyAll()
return nil
}
// unassignNode remove a node from resource group where it belongs to.
func (rm *ResourceManager) unassignNode(node int64) (string, error) {
if rg := rm.getResourceGroupByNodeID(node); rg != nil {
mrg := rg.CopyForWrite()
mrg.UnassignNode(node)
rg := mrg.ToResourceGroup()
if err := rm.catalog.SaveResourceGroup(rg.GetMeta()); err != nil {
log.Warn("unassign node from resource group",
zap.String("rgName", rg.GetName()),
zap.Int64("node", node),
zap.Error(err),
)
return "", merr.WrapErrResourceGroupServiceAvailable()
}
// Commit updates to memory.
rm.groups[rg.GetName()] = rg
delete(rm.nodeIDMap, node)
log.Info("unassign node to resource group",
zap.String("rgName", rg.GetName()),
zap.Int64("node", node),
)
// notify that node distribution has been changed.
rm.nodeChangedNotifier.NotifyAll()
return rg.GetName(), nil
}
return "", nil
}
// validateResourceGroupConfig validate resource group config.
// validateResourceGroupConfig must be called after lock, because it will check with other resource group.
func (rm *ResourceManager) validateResourceGroupConfig(rgName string, cfg *rgpb.ResourceGroupConfig) error {
if cfg.GetLimits() == nil || cfg.GetRequests() == nil {
return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, "requests or limits is required")
}
if cfg.GetRequests().GetNodeNum() < 0 || cfg.GetLimits().GetNodeNum() < 0 {
return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, "node num in `requests` or `limits` should not less than 0")
}
if cfg.GetLimits().GetNodeNum() < cfg.GetRequests().GetNodeNum() {
return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, "limits node num should not less than requests node num")
}
for _, transferCfg := range cfg.GetTransferFrom() {
if transferCfg.GetResourceGroup() == rgName {
return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, fmt.Sprintf("resource group in `TransferFrom` %s should not be itself", rgName))
}
if rm.groups[transferCfg.GetResourceGroup()] == nil {
return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, fmt.Sprintf("resource group in `TransferFrom` %s not exist", transferCfg.GetResourceGroup()))
}
}
for _, transferCfg := range cfg.GetTransferTo() {
if transferCfg.GetResourceGroup() == rgName {
return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, fmt.Sprintf("resource group in `TransferTo` %s should not be itself", rgName))
}
if rm.groups[transferCfg.GetResourceGroup()] == nil {
return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, fmt.Sprintf("resource group in `TransferTo` %s not exist", transferCfg.GetResourceGroup()))
}
}
return nil
}
// validateResourceGroupIsDeletable validate a resource group is deletable.
func (rm *ResourceManager) validateResourceGroupIsDeletable(rgName string) error {
// default rg is not deletable.
if rgName == DefaultResourceGroupName {
return merr.WrapErrParameterInvalid("not default resource group", rgName, "default resource group is not deletable")
}
// If rg is not empty, it's not deletable.
if rm.groups[rgName].GetConfig().GetLimits().GetNodeNum() != 0 {
return merr.WrapErrParameterInvalid("not empty resource group", rgName, "resource group's limits node num is not 0")
}
// If rg is used by other rg, it's not deletable.
for _, rg := range rm.groups {
for _, transferCfg := range rg.GetConfig().GetTransferFrom() {
if transferCfg.GetResourceGroup() == rgName {
return merr.WrapErrParameterInvalid("not `TransferFrom` of resource group", rgName, fmt.Sprintf("resource group %s is used by %s's `TransferFrom`, remove that configuration first", rgName, rg.name))
}
}
for _, transferCfg := range rg.GetConfig().GetTransferTo() {
if transferCfg.GetResourceGroup() == rgName {
return merr.WrapErrParameterInvalid("not `TransferTo` of resource group", rgName, fmt.Sprintf("resource group %s is used by %s's `TransferTo`, remove that configuration first", rgName, rg.name))
}
}
}
return nil
}