// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package meta

import (
	"fmt"
	"sync"

	"github.com/cockroachdb/errors"
	"github.com/golang/protobuf/proto"
	"github.com/samber/lo"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
	"github.com/milvus-io/milvus/internal/metastore"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/querycoordv2/session"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/syncutil"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var ErrNodeNotEnough = errors.New("nodes not enough")
type ResourceManager struct {
|
|
incomingNode typeutil.UniqueSet // incomingNode is a temporary set for incoming hangup node,
|
|
// after node is assigned to resource group, it will be removed from this set.
|
|
groups map[string]*ResourceGroup // primary index from resource group name to resource group
|
|
nodeIDMap map[int64]string // secondary index from node id to resource group
|
|
|
|
catalog metastore.QueryCoordCatalog
|
|
nodeMgr *session.NodeManager // TODO: ResourceManager is watch node status with service discovery, so it can handle node up and down as fast as possible.
|
|
// All function can get latest online node without checking with node manager.
|
|
// so node manager is a redundant type here.
|
|
|
|
rwmutex sync.RWMutex
|
|
rgChangedNotifier *syncutil.VersionedNotifier // used to notify that resource group has been changed.
|
|
// resource_observer will listen this notifier to do a resource group recovery.
|
|
nodeChangedNotifier *syncutil.VersionedNotifier // used to notify that node distribution in resource group has been changed.
|
|
// replica_observer will listen this notifier to do a replica recovery.
|
|
}
|
|
|
|
// NewResourceManager is used to create a ResourceManager instance.
|
|
func NewResourceManager(catalog metastore.QueryCoordCatalog, nodeMgr *session.NodeManager) *ResourceManager {
|
|
groups := make(map[string]*ResourceGroup)
|
|
// Always create a default resource group to keep compatibility.
|
|
groups[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupName, newResourceGroupConfig(0, defaultResourceGroupCapacity))
|
|
return &ResourceManager{
|
|
incomingNode: typeutil.NewUniqueSet(),
|
|
groups: groups,
|
|
nodeIDMap: make(map[int64]string),
|
|
catalog: catalog,
|
|
nodeMgr: nodeMgr,
|
|
|
|
rwmutex: sync.RWMutex{},
|
|
rgChangedNotifier: syncutil.NewVersionedNotifier(),
|
|
nodeChangedNotifier: syncutil.NewVersionedNotifier(),
|
|
}
|
|
}
|
|
|
|

// Recover recovers resource groups from meta. Other ResourceManager interfaces can only be called after Recover is done.
func (rm *ResourceManager) Recover() error {
	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()

	rgs, err := rm.catalog.GetResourceGroups()
	if err != nil {
		return errors.Wrap(err, "failed to recover resource group from store")
	}

	// Upgrade resource group meta to the latest version.
	upgrades := make([]*querypb.ResourceGroup, 0)
	for _, meta := range rgs {
		needUpgrade := meta.Config == nil

		rg := NewResourceGroupFromMeta(meta)
		rm.groups[rg.GetName()] = rg
		for _, node := range rg.GetNodes() {
			if _, ok := rm.nodeIDMap[node]; ok {
				// unreachable code, should never happen.
				panic(fmt.Sprintf("dirty meta, node has been assign to multi resource group, %s, %s", rm.nodeIDMap[node], rg.GetName()))
			}
			rm.nodeIDMap[node] = rg.GetName()
		}
		log.Info("Recover resource group",
			zap.String("rgName", rg.GetName()),
			zap.Int64s("nodes", rm.groups[rg.GetName()].GetNodes()),
			zap.Any("config", rg.GetConfig()),
		)
		if needUpgrade {
			upgrades = append(upgrades, rg.GetMeta())
		}
	}
	if len(upgrades) > 0 {
		log.Info("upgrade resource group meta into latest", zap.Int("num", len(upgrades)))
		return rm.catalog.SaveResourceGroup(upgrades...)
	}
	return nil
}

// AddResourceGroup creates a new ResourceGroup.
// It does not change any node assignment; nodes will be reassigned to the new resource group by auto recover.
func (rm *ResourceManager) AddResourceGroup(rgName string, cfg *rgpb.ResourceGroupConfig) error {
	if len(rgName) == 0 {
		return merr.WrapErrParameterMissing("resource group name couldn't be empty")
	}
	if cfg == nil {
		// Use the default config if not set, compatible with old clients.
		cfg = newResourceGroupConfig(0, 0)
	}

	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()
	if rm.groups[rgName] != nil {
		// Idempotent promise.
		// If the resource group already exists, check whether the configuration is the same.
		if proto.Equal(rm.groups[rgName].GetConfig(), cfg) {
			return nil
		}
		return merr.WrapErrResourceGroupAlreadyExist(rgName)
	}

	maxResourceGroup := paramtable.Get().QuotaConfig.MaxResourceGroupNumOfQueryNode.GetAsInt()
	if len(rm.groups) >= maxResourceGroup {
		return merr.WrapErrResourceGroupReachLimit(rgName, maxResourceGroup)
	}

	if err := rm.validateResourceGroupConfig(rgName, cfg); err != nil {
		return err
	}

	rg := NewResourceGroup(rgName, cfg)
	if err := rm.catalog.SaveResourceGroup(rg.GetMeta()); err != nil {
		log.Warn("failed to add resource group",
			zap.String("rgName", rgName),
			zap.Any("config", cfg),
			zap.Error(err),
		)
		return merr.WrapErrResourceGroupServiceAvailable()
	}

	rm.groups[rgName] = rg
	log.Info("add resource group",
		zap.String("rgName", rgName),
		zap.Any("config", cfg),
	)

	// Notify that the resource group config has been changed.
	rm.rgChangedNotifier.NotifyAll()
	return nil
}
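
// Illustrative usage (not part of the original file): a caller could declare a resource
// group that requests one node and allows at most two. The `rgpb.ResourceGroupLimit`
// shape below is an assumption inferred from the requests/limits node-num checks in
// validateResourceGroupConfig; auto recover then moves nodes into "rg1" until the
// request is satisfied.
//
//	_ = rm.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
//		Requests: &rgpb.ResourceGroupLimit{NodeNum: 1},
//		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 2},
//	})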

// UpdateResourceGroups updates resource group configurations.
// It only changes the configuration, not the node assignment; nodes will be reassigned by auto recover.
func (rm *ResourceManager) UpdateResourceGroups(rgs map[string]*rgpb.ResourceGroupConfig) error {
	if len(rgs) == 0 {
		return nil
	}

	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()
	return rm.updateResourceGroups(rgs)
}

// updateResourceGroups updates resource group configurations.
func (rm *ResourceManager) updateResourceGroups(rgs map[string]*rgpb.ResourceGroupConfig) error {
	modifiedRG := make([]*ResourceGroup, 0, len(rgs))
	updates := make([]*querypb.ResourceGroup, 0, len(rgs))
	for rgName, cfg := range rgs {
		if _, ok := rm.groups[rgName]; !ok {
			return merr.WrapErrResourceGroupNotFound(rgName)
		}
		if err := rm.validateResourceGroupConfig(rgName, cfg); err != nil {
			return err
		}
		// Update with copy-on-write.
		mrg := rm.groups[rgName].CopyForWrite()
		mrg.UpdateConfig(cfg)
		rg := mrg.ToResourceGroup()

		updates = append(updates, rg.GetMeta())
		modifiedRG = append(modifiedRG, rg)
	}

	if err := rm.catalog.SaveResourceGroup(updates...); err != nil {
		for rgName, cfg := range rgs {
			log.Warn("failed to update resource group",
				zap.String("rgName", rgName),
				zap.Any("config", cfg),
				zap.Error(err),
			)
		}
		return merr.WrapErrResourceGroupServiceAvailable()
	}

	// Commit updates to memory.
	for _, rg := range modifiedRG {
		log.Info("update resource group",
			zap.String("rgName", rg.GetName()),
			zap.Any("config", rg.GetConfig()),
		)
		rm.groups[rg.GetName()] = rg
	}

	// Notify that the resource group config has been changed.
	rm.rgChangedNotifier.NotifyAll()
	return nil
}

// TransferNode transfers nodes from the source resource group to the target resource group.
// Deprecated: use the declarative API `UpdateResourceGroups` instead.
func (rm *ResourceManager) TransferNode(sourceRGName string, targetRGName string, nodeNum int) error {
	if sourceRGName == targetRGName {
		return merr.WrapErrParameterInvalidMsg("source resource group and target resource group should not be the same, resource group: %s", sourceRGName)
	}
	if nodeNum <= 0 {
		return merr.WrapErrParameterInvalid("NumNode > 0", fmt.Sprintf("invalid NumNode %d", nodeNum))
	}

	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()

	if rm.groups[sourceRGName] == nil {
		return merr.WrapErrResourceGroupNotFound(sourceRGName)
	}
	if rm.groups[targetRGName] == nil {
		return merr.WrapErrResourceGroupNotFound(targetRGName)
	}

	sourceRG := rm.groups[sourceRGName]
	targetRG := rm.groups[targetRGName]

	// Check if the source resource group has enough nodes to transfer.
	if len(sourceRG.GetNodes()) < nodeNum {
		return merr.WrapErrResourceGroupNodeNotEnough(sourceRGName, len(sourceRG.GetNodes()), nodeNum)
	}

	// Compatible with old version.
	sourceCfg := sourceRG.GetConfigCloned()
	targetCfg := targetRG.GetConfigCloned()
	sourceCfg.Requests.NodeNum -= int32(nodeNum)
	if sourceCfg.Requests.NodeNum < 0 {
		sourceCfg.Requests.NodeNum = 0
	}
	// Special case for compatibility with old version.
	if sourceRGName != DefaultResourceGroupName {
		sourceCfg.Limits.NodeNum -= int32(nodeNum)
		if sourceCfg.Limits.NodeNum < 0 {
			sourceCfg.Limits.NodeNum = 0
		}
	}

	targetCfg.Requests.NodeNum += int32(nodeNum)
	if targetCfg.Requests.NodeNum > targetCfg.Limits.NodeNum {
		targetCfg.Limits.NodeNum = targetCfg.Requests.NodeNum
	}
	return rm.updateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
		sourceRGName: sourceCfg,
		targetRGName: targetCfg,
	})
}
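
// Migration sketch (illustrative, not part of the original file): a caller of the
// deprecated TransferNode("rg_src", "rg_dst", 1) can instead adjust the declared node
// counts and let auto recover move a node; this only uses methods defined in this file,
// and the bounds handling done inside TransferNode above is omitted for brevity.
//
//	srcCfg := rm.GetResourceGroup("rg_src").GetConfigCloned()
//	dstCfg := rm.GetResourceGroup("rg_dst").GetConfigCloned()
//	srcCfg.Requests.NodeNum--
//	dstCfg.Requests.NodeNum++
//	_ = rm.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
//		"rg_src": srcCfg,
//		"rg_dst": dstCfg,
//	})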

// RemoveResourceGroup removes a resource group.
func (rm *ResourceManager) RemoveResourceGroup(rgName string) error {
	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()

	if rm.groups[rgName] == nil {
		// Idempotent promise: deleting a non-existent rg should be ok.
		return nil
	}

	// validateResourceGroupIsDeletable checks whether the rg is deletable.
	if err := rm.validateResourceGroupIsDeletable(rgName); err != nil {
		return err
	}

	// Nodes may still be assigned to this group;
	// recover the resource group from redundant status before removing it.
	if rm.groups[rgName].NodeNum() > 0 {
		if err := rm.recoverRedundantNodeRG(rgName); err != nil {
			log.Info("failed to recover redundant node resource group before remove it",
				zap.String("rgName", rgName),
				zap.Error(err),
			)
			return err
		}
	}

	// Remove it from meta storage.
	if err := rm.catalog.RemoveResourceGroup(rgName); err != nil {
		log.Info("failed to remove resource group",
			zap.String("rgName", rgName),
			zap.Error(err),
		)
		return merr.WrapErrResourceGroupServiceAvailable()
	}

	// After recovering, all nodes assigned to this rg have been removed,
	// so no secondary index entries need to be removed.
	delete(rm.groups, rgName)

	log.Info("remove resource group",
		zap.String("rgName", rgName),
	)
	// Notify that the resource group has been changed.
	rm.rgChangedNotifier.NotifyAll()
	return nil
}

// GetNodesOfMultiRG returns the nodes of multiple resource groups; it can be used to get a consistent view of their nodes.
func (rm *ResourceManager) GetNodesOfMultiRG(rgName []string) (map[string]typeutil.UniqueSet, error) {
	rm.rwmutex.RLock()
	defer rm.rwmutex.RUnlock()

	ret := make(map[string]typeutil.UniqueSet)
	for _, name := range rgName {
		if rm.groups[name] == nil {
			return nil, merr.WrapErrResourceGroupNotFound(name)
		}
		ret[name] = typeutil.NewUniqueSet(rm.groups[name].GetNodes()...)
	}
	return ret, nil
}

// GetNodes returns the nodes of the given resource group.
func (rm *ResourceManager) GetNodes(rgName string) ([]int64, error) {
	rm.rwmutex.RLock()
	defer rm.rwmutex.RUnlock()
	if rm.groups[rgName] == nil {
		return nil, merr.WrapErrResourceGroupNotFound(rgName)
	}
	return rm.groups[rgName].GetNodes(), nil
}

// GetOutgoingNodeNumByReplica returns the outgoing node count on each rg for this replica.
func (rm *ResourceManager) GetOutgoingNodeNumByReplica(replica *Replica) map[string]int32 {
	rm.rwmutex.RLock()
	defer rm.rwmutex.RUnlock()

	if rm.groups[replica.GetResourceGroup()] == nil {
		return nil
	}
	rg := rm.groups[replica.GetResourceGroup()]

	ret := make(map[string]int32)
	replica.RangeOverRONodes(func(node int64) bool {
		// If rgOfNode differs from the replica's rg, an outgoing node is found.
		if rgOfNode := rm.getResourceGroupByNodeID(node); rgOfNode != nil && rgOfNode.GetName() != rg.GetName() {
			ret[rgOfNode.GetName()]++
		}
		return true
	})
	return ret
}

// getResourceGroupByNodeID gets the resource group by node id.
func (rm *ResourceManager) getResourceGroupByNodeID(nodeID int64) *ResourceGroup {
	if rgName, ok := rm.nodeIDMap[nodeID]; ok {
		return rm.groups[rgName]
	}
	return nil
}

// ContainsNode returns whether the given node is in the given resource group.
func (rm *ResourceManager) ContainsNode(rgName string, node int64) bool {
	rm.rwmutex.RLock()
	defer rm.rwmutex.RUnlock()
	if rm.groups[rgName] == nil {
		return false
	}
	return rm.groups[rgName].ContainNode(node)
}

// ContainResourceGroup returns whether the given resource group exists.
func (rm *ResourceManager) ContainResourceGroup(rgName string) bool {
	rm.rwmutex.RLock()
	defer rm.rwmutex.RUnlock()
	return rm.groups[rgName] != nil
}

// GetResourceGroup returns a resource group snapshot by name.
func (rm *ResourceManager) GetResourceGroup(rgName string) *ResourceGroup {
	rm.rwmutex.RLock()
	defer rm.rwmutex.RUnlock()

	if rm.groups[rgName] == nil {
		return nil
	}
	return rm.groups[rgName].Snapshot()
}

// ListResourceGroups returns the names of all resource groups.
func (rm *ResourceManager) ListResourceGroups() []string {
	rm.rwmutex.RLock()
	defer rm.rwmutex.RUnlock()

	return lo.Keys(rm.groups)
}

// MeetRequirement returns whether the resource group meets its requirement.
// Returns an error with the reason if the requirement is not met.
func (rm *ResourceManager) MeetRequirement(rgName string) error {
	rm.rwmutex.RLock()
	defer rm.rwmutex.RUnlock()
	if rm.groups[rgName] == nil {
		return nil
	}
	return rm.groups[rgName].MeetRequirement()
}

// CheckIncomingNodeNum returns the number of pending incoming nodes.
func (rm *ResourceManager) CheckIncomingNodeNum() int {
	rm.rwmutex.RLock()
	defer rm.rwmutex.RUnlock()
	return rm.incomingNode.Len()
}

// HandleNodeUp handles a new incoming node.
func (rm *ResourceManager) HandleNodeUp(node int64) {
	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()

	rm.incomingNode.Insert(node)
	// Trigger assigning the incoming node right away.
	// The error can be ignored here, because `AssignPendingIncomingNode` will retry the assignment.
	rgName, err := rm.assignIncomingNodeWithNodeCheck(node)
	log.Info("HandleNodeUp: add node to resource group",
		zap.String("rgName", rgName),
		zap.Int64("node", node),
		zap.Error(err),
	)
}

// HandleNodeDown handles a node when it leaves.
func (rm *ResourceManager) HandleNodeDown(node int64) {
	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()

	// Failure of node-down handling can be ignored; down nodes are also cleaned up by `RemoveAllDownNode`.
	rm.incomingNode.Remove(node)

	// When a stopping query node goes offline, the node change won't be triggered,
	// because the node was already removed from the resource manager when it entered the stopping state,
	// so `unassignNode` will do nothing.
	rgName, err := rm.unassignNode(node)

	// Trigger node changes; the RO node is expected to be removed from the replica immediately.
	rm.nodeChangedNotifier.NotifyAll()
	log.Info("HandleNodeDown: remove node from resource group",
		zap.String("rgName", rgName),
		zap.Int64("node", node),
		zap.Error(err),
	)
}

// ListenResourceGroupChanged returns a listener for resource group changes.
func (rm *ResourceManager) ListenResourceGroupChanged() *syncutil.VersionedListener {
	return rm.rgChangedNotifier.Listen(syncutil.VersionedListenAtEarliest)
}

// ListenNodeChanged returns a listener for node changes.
func (rm *ResourceManager) ListenNodeChanged() *syncutil.VersionedListener {
	return rm.nodeChangedNotifier.Listen(syncutil.VersionedListenAtEarliest)
}

// AssignPendingIncomingNode assigns pending incoming nodes to resource groups.
func (rm *ResourceManager) AssignPendingIncomingNode() {
	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()

	for node := range rm.incomingNode {
		rgName, err := rm.assignIncomingNodeWithNodeCheck(node)
		log.Info("Pending HandleNodeUp: add node to resource group",
			zap.String("rgName", rgName),
			zap.Int64("node", node),
			zap.Error(err),
		)
	}
}

// RemoveAllDownNode removes all down nodes from their resource groups.
func (rm *ResourceManager) RemoveAllDownNode() {
	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()

	for nodeID := range rm.nodeIDMap {
		if node := rm.nodeMgr.Get(nodeID); node == nil || node.IsStoppingState() {
			// unassignNode failure can be skipped.
			rgName, err := rm.unassignNode(nodeID)
			log.Info("remove down node from resource group",
				zap.Bool("nodeExist", node != nil),
				zap.Int64("nodeID", nodeID),
				zap.String("rgName", rgName),
				zap.Error(err),
			)
		}
	}
}

// AutoRecoverResourceGroup automatically recovers the given resource group.
func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) error {
	rm.rwmutex.Lock()
	defer rm.rwmutex.Unlock()

	rg := rm.groups[rgName]
	if rg == nil {
		return nil
	}

	if rg.MissingNumOfNodes() > 0 {
		return rm.recoverMissingNodeRG(rgName)
	}

	// DefaultResourceGroup is the fallback resource group for redundant recovery,
	// so after all other resource groups reach their `limits`, the remaining redundant nodes are transferred to DefaultResourceGroup.
	if rg.RedundantNumOfNodes() > 0 {
		return rm.recoverRedundantNodeRG(rgName)
	}
	return nil
}
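
// Recovery order (descriptive summary of the helpers below): a group missing nodes
// pulls first from groups holding more nodes than their limits, then from groups above
// their requests, preferring groups named in its `TransferFrom` list; a group with
// redundant nodes pushes first to groups still missing nodes, then to groups below
// their limits, preferring groups named in its `TransferTo` list, and finally to the
// default resource group.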

// recoverMissingNodeRG recovers a resource group by transferring nodes from other resource groups.
func (rm *ResourceManager) recoverMissingNodeRG(rgName string) error {
	for rm.groups[rgName].MissingNumOfNodes() > 0 {
		rg := rm.groups[rgName]
		sourceRG := rm.selectMissingRecoverSourceRG(rg)
		if sourceRG == nil {
			log.Warn("fail to select source resource group", zap.String("rgName", rg.GetName()))
			return ErrNodeNotEnough
		}
		nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, rg)
		if err != nil {
			log.Warn("failed to recover missing node by transfer node from other resource group",
				zap.String("sourceRG", sourceRG.GetName()),
				zap.String("targetRG", rg.GetName()),
				zap.Error(err))
			return err
		}
		log.Info("recover missing node by transfer node from other resource group",
			zap.String("sourceRG", sourceRG.GetName()),
			zap.String("targetRG", rg.GetName()),
			zap.Int64("nodeID", nodeID),
		)
	}
	return nil
}

// selectMissingRecoverSourceRG selects a source resource group for recovering a resource group that is missing nodes.
func (rm *ResourceManager) selectMissingRecoverSourceRG(rg *ResourceGroup) *ResourceGroup {
	// First, transfer a node from the most redundant resource group. `len(nodes) > limits`
	if redundantRG := rm.findMaxRGWithGivenFilter(
		func(sourceRG *ResourceGroup) bool {
			return rg.GetName() != sourceRG.GetName() && sourceRG.RedundantNumOfNodes() > 0
		},
		func(sourceRG *ResourceGroup) int {
			return sourceRG.RedundantNumOfNodes()
		},
	); redundantRG != nil {
		return redundantRG
	}

	// Second, transfer a node from the most oversized resource group. `len(nodes) > requests`
	// Resource groups configured in `TransferFrom` are selected with high priority.
	return rm.findMaxRGWithGivenFilter(
		func(sourceRG *ResourceGroup) bool {
			return rg.GetName() != sourceRG.GetName() && sourceRG.OversizedNumOfNodes() > 0
		},
		func(sourceRG *ResourceGroup) int {
			if rg.HasFrom(sourceRG.GetName()) {
				// Give a boost if sourceRG is configured in `TransferFrom` so it is selected with high priority.
				return sourceRG.OversizedNumOfNodes() * resourceGroupTransferBoost
			}
			return sourceRG.OversizedNumOfNodes()
		})
}

// recoverRedundantNodeRG recovers a resource group by transferring nodes to other resource groups.
func (rm *ResourceManager) recoverRedundantNodeRG(rgName string) error {
	for rm.groups[rgName].RedundantNumOfNodes() > 0 {
		rg := rm.groups[rgName]
		targetRG := rm.selectRedundantRecoverTargetRG(rg)
		if targetRG == nil {
			log.Info("failed to select redundant recover target resource group, please check resource group configuration if as expected.",
				zap.String("rgName", rg.GetName()))
			return errors.New("all resource group reach limits")
		}

		nodeID, err := rm.transferOneNodeFromRGToRG(rg, targetRG)
		if err != nil {
			log.Warn("failed to recover redundant node by transfer node to other resource group",
				zap.String("sourceRG", rg.GetName()),
				zap.String("targetRG", targetRG.GetName()),
				zap.Error(err))
			return err
		}
		log.Info("recover redundant node by transfer node to other resource group",
			zap.String("sourceRG", rg.GetName()),
			zap.String("targetRG", targetRG.GetName()),
			zap.Int64("nodeID", nodeID),
		)
	}
	return nil
}

// selectRedundantRecoverTargetRG selects a target resource group for recovering a resource group that has redundant nodes.
func (rm *ResourceManager) selectRedundantRecoverTargetRG(rg *ResourceGroup) *ResourceGroup {
	// First, transfer the node to the resource group that is missing the most nodes.
	if missingRG := rm.findMaxRGWithGivenFilter(
		func(targetRG *ResourceGroup) bool {
			return rg.GetName() != targetRG.GetName() && targetRG.MissingNumOfNodes() > 0
		},
		func(targetRG *ResourceGroup) int {
			return targetRG.MissingNumOfNodes()
		},
	); missingRG != nil {
		return missingRG
	}

	// Second, transfer the node to the resource group that is furthest from reaching its limit.
	// Resource groups configured in `TransferTo` are selected with high priority.
	if selectRG := rm.findMaxRGWithGivenFilter(
		func(targetRG *ResourceGroup) bool {
			return rg.GetName() != targetRG.GetName() && targetRG.ReachLimitNumOfNodes() > 0
		},
		func(targetRG *ResourceGroup) int {
			if rg.HasTo(targetRG.GetName()) {
				// Give a boost if targetRG is configured in `TransferTo` so it is selected with high priority.
				return targetRG.ReachLimitNumOfNodes() * resourceGroupTransferBoost
			}
			return targetRG.ReachLimitNumOfNodes()
		},
	); selectRG != nil {
		return selectRG
	}

	// Finally, always transfer the node to the default resource group.
	if rg.GetName() != DefaultResourceGroupName {
		return rm.groups[DefaultResourceGroupName]
	}
	return nil
}

// transferOneNodeFromRGToRG transfers one node from the source resource group to the target resource group.
func (rm *ResourceManager) transferOneNodeFromRGToRG(sourceRG *ResourceGroup, targetRG *ResourceGroup) (int64, error) {
	if sourceRG.NodeNum() == 0 {
		return -1, ErrNodeNotEnough
	}
	// TODO: select node by some load strategy, such as segment loaded.
	node := sourceRG.GetNodes()[0]
	if err := rm.transferNode(targetRG.GetName(), node); err != nil {
		return -1, err
	}
	return node, nil
}

// assignIncomingNodeWithNodeCheck assigns a node to a resource group after checking the node status.
func (rm *ResourceManager) assignIncomingNodeWithNodeCheck(node int64) (string, error) {
	// If the node is stopping or stopped, remove it from the incoming node set.
	if rm.nodeMgr.Get(node) == nil {
		rm.incomingNode.Remove(node)
		return "", errors.New("node is not online")
	}
	if ok, _ := rm.nodeMgr.IsStoppingNode(node); ok {
		rm.incomingNode.Remove(node)
		return "", errors.New("node has been stopped")
	}

	rgName, err := rm.assignIncomingNode(node)
	if err != nil {
		return "", err
	}
	// Node assignment is finished; remove the node from the incoming node set.
	rm.incomingNode.Remove(node)
	return rgName, nil
}

// assignIncomingNode assigns a node to a resource group.
func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) {
	// If the node is already assigned to a rg, keep it there.
	rg := rm.getResourceGroupByNodeID(node)
	if rg != nil {
		log.Info("HandleNodeUp: node already assign to resource group",
			zap.String("rgName", rg.GetName()),
			zap.Int64("node", node),
		)
		return rg.GetName(), nil
	}

	// Select a resource group to assign the incoming node to.
	rg = rm.mustSelectAssignIncomingNodeTargetRG()
	if err := rm.transferNode(rg.GetName(), node); err != nil {
		return "", errors.Wrap(err, "at finally assign to default resource group")
	}
	return rg.GetName(), nil
}

// mustSelectAssignIncomingNodeTargetRG selects a resource group for an incoming node.
func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG() *ResourceGroup {
	// First, assign it to the rg with the most missing nodes at high priority.
	if rg := rm.findMaxRGWithGivenFilter(
		func(rg *ResourceGroup) bool {
			return rg.MissingNumOfNodes() > 0
		},
		func(rg *ResourceGroup) int {
			return rg.MissingNumOfNodes()
		},
	); rg != nil {
		return rg
	}

	// Second, assign it to a rg that has not reached its limit.
	if rg := rm.findMaxRGWithGivenFilter(
		func(rg *ResourceGroup) bool {
			return rg.ReachLimitNumOfNodes() > 0
		},
		func(rg *ResourceGroup) int {
			return rg.ReachLimitNumOfNodes()
		},
	); rg != nil {
		return rg
	}

	// Finally, add the node to the default rg.
	return rm.groups[DefaultResourceGroupName]
}

// findMaxRGWithGivenFilter finds the resource group that maximizes attr among those passing the filter.
// Not efficient, but acceptable for small numbers of nodes and resource groups.
func (rm *ResourceManager) findMaxRGWithGivenFilter(filter func(rg *ResourceGroup) bool, attr func(rg *ResourceGroup) int) *ResourceGroup {
	var maxRG *ResourceGroup
	for _, rg := range rm.groups {
		if filter == nil || filter(rg) {
			if maxRG == nil || attr(rg) > attr(maxRG) {
				maxRG = rg
			}
		}
	}
	return maxRG
}

// transferNode transfers the given node to the given resource group.
// If the node is already assigned to the given resource group, it does nothing.
// If the node is assigned to another resource group, it is unassigned first.
func (rm *ResourceManager) transferNode(rgName string, node int64) error {
	if rm.groups[rgName] == nil {
		return merr.WrapErrResourceGroupNotFound(rgName)
	}

	updates := make([]*querypb.ResourceGroup, 0, 2)
	modifiedRG := make([]*ResourceGroup, 0, 2)
	originalRG := "_"
	// Check if the node is already assigned to a rg.
	if rg := rm.getResourceGroupByNodeID(node); rg != nil {
		if rg.GetName() == rgName {
			// The node is already assigned to the target rg.
			log.Info("node already assign to resource group",
				zap.String("rgName", rgName),
				zap.Int64("node", node),
			)
			return nil
		}
		// Apply update.
		mrg := rg.CopyForWrite()
		mrg.UnassignNode(node)
		rg := mrg.ToResourceGroup()

		updates = append(updates, rg.GetMeta())
		modifiedRG = append(modifiedRG, rg)
		originalRG = rg.GetName()
	}

	// Assign the node to the target rg.
	mrg := rm.groups[rgName].CopyForWrite()
	mrg.AssignNode(node)
	rg := mrg.ToResourceGroup()
	updates = append(updates, rg.GetMeta())
	modifiedRG = append(modifiedRG, rg)

	// Commit updates to meta storage.
	if err := rm.catalog.SaveResourceGroup(updates...); err != nil {
		log.Warn("failed to transfer node to resource group",
			zap.String("rgName", rgName),
			zap.String("originalRG", originalRG),
			zap.Int64("node", node),
			zap.Error(err),
		)
		return merr.WrapErrResourceGroupServiceAvailable()
	}

	// Commit updates to memory.
	for _, rg := range modifiedRG {
		rm.groups[rg.GetName()] = rg
	}
	rm.nodeIDMap[node] = rgName
	log.Info("transfer node to resource group",
		zap.String("rgName", rgName),
		zap.String("originalRG", originalRG),
		zap.Int64("node", node),
	)

	// Notify that the node distribution has been changed.
	rm.nodeChangedNotifier.NotifyAll()
	return nil
}

// unassignNode removes a node from the resource group it belongs to.
func (rm *ResourceManager) unassignNode(node int64) (string, error) {
	if rg := rm.getResourceGroupByNodeID(node); rg != nil {
		mrg := rg.CopyForWrite()
		mrg.UnassignNode(node)
		rg := mrg.ToResourceGroup()
		if err := rm.catalog.SaveResourceGroup(rg.GetMeta()); err != nil {
			log.Warn("unassign node from resource group",
				zap.String("rgName", rg.GetName()),
				zap.Int64("node", node),
				zap.Error(err),
			)
			return "", merr.WrapErrResourceGroupServiceAvailable()
		}

		// Commit updates to memory.
		rm.groups[rg.GetName()] = rg
		delete(rm.nodeIDMap, node)
		log.Info("unassign node to resource group",
			zap.String("rgName", rg.GetName()),
			zap.Int64("node", node),
		)

		// Notify that the node distribution has been changed.
		rm.nodeChangedNotifier.NotifyAll()
		return rg.GetName(), nil
	}
	return "", nil
}

// validateResourceGroupConfig validates a resource group config.
// It must be called while holding the lock, because it checks against other resource groups.
func (rm *ResourceManager) validateResourceGroupConfig(rgName string, cfg *rgpb.ResourceGroupConfig) error {
	if cfg.GetLimits() == nil || cfg.GetRequests() == nil {
		return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, "requests or limits is required")
	}
	if cfg.GetRequests().GetNodeNum() < 0 || cfg.GetLimits().GetNodeNum() < 0 {
		return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, "node num in `requests` or `limits` should not less than 0")
	}
	if cfg.GetLimits().GetNodeNum() < cfg.GetRequests().GetNodeNum() {
		return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, "limits node num should not less than requests node num")
	}

	for _, transferCfg := range cfg.GetTransferFrom() {
		if transferCfg.GetResourceGroup() == rgName {
			return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, fmt.Sprintf("resource group in `TransferFrom` %s should not be itself", rgName))
		}
		if rm.groups[transferCfg.GetResourceGroup()] == nil {
			return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, fmt.Sprintf("resource group in `TransferFrom` %s not exist", transferCfg.GetResourceGroup()))
		}
	}
	for _, transferCfg := range cfg.GetTransferTo() {
		if transferCfg.GetResourceGroup() == rgName {
			return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, fmt.Sprintf("resource group in `TransferTo` %s should not be itself", rgName))
		}
		if rm.groups[transferCfg.GetResourceGroup()] == nil {
			return merr.WrapErrResourceGroupIllegalConfig(rgName, cfg, fmt.Sprintf("resource group in `TransferTo` %s not exist", transferCfg.GetResourceGroup()))
		}
	}
	return nil
}

// validateResourceGroupIsDeletable validates that a resource group is deletable.
func (rm *ResourceManager) validateResourceGroupIsDeletable(rgName string) error {
	// The default rg is not deletable.
	if rgName == DefaultResourceGroupName {
		return merr.WrapErrParameterInvalid("not default resource group", rgName, "default resource group is not deletable")
	}

	// If the rg's configured limits is not 0, it is not deletable.
	if rm.groups[rgName].GetConfig().GetLimits().GetNodeNum() != 0 {
		return merr.WrapErrParameterInvalid("not empty resource group", rgName, "resource group's limits node num is not 0")
	}

	// If the rg is referenced by another rg's transfer config, it is not deletable.
	for _, rg := range rm.groups {
		for _, transferCfg := range rg.GetConfig().GetTransferFrom() {
			if transferCfg.GetResourceGroup() == rgName {
				return merr.WrapErrParameterInvalid("not `TransferFrom` of resource group", rgName, fmt.Sprintf("resource group %s is used by %s's `TransferFrom`, remove that configuration first", rgName, rg.name))
			}
		}
		for _, transferCfg := range rg.GetConfig().GetTransferTo() {
			if transferCfg.GetResourceGroup() == rgName {
				return merr.WrapErrParameterInvalid("not `TransferTo` of resource group", rgName, fmt.Sprintf("resource group %s is used by %s's `TransferTo`, remove that configuration first", rgName, rg.name))
			}
		}
	}
	return nil
}