diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go
index 39d77ddff1..a927bd56b2 100644
--- a/internal/querycoordv2/meta/replica_manager.go
+++ b/internal/querycoordv2/meta/replica_manager.go
@@ -162,6 +162,13 @@ func (m *ReplicaManager) putReplicaInMemory(replicas ...*Replica) {
 
 // TransferReplica transfers N replicas from srcRGName to dstRGName.
 func (m *ReplicaManager) TransferReplica(collectionID typeutil.UniqueID, srcRGName string, dstRGName string, replicaNum int) error {
+	if srcRGName == dstRGName {
+		return merr.WrapErrParameterInvalidMsg("source resource group and target resource group should not be the same, resource group: %s", srcRGName)
+	}
+	if replicaNum <= 0 {
+		return merr.WrapErrParameterInvalid("NumReplica > 0", fmt.Sprintf("invalid NumReplica %d", replicaNum))
+	}
+
 	m.rwmutex.Lock()
 	defer m.rwmutex.Unlock()
 
diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go
index e45689332e..c7cf580f81 100644
--- a/internal/querycoordv2/meta/replica_manager_test.go
+++ b/internal/querycoordv2/meta/replica_manager_test.go
@@ -319,21 +319,21 @@ func (suite *ReplicaManagerV2Suite) SetupSuite() {
 		"RG5": typeutil.NewUniqueSet(11, 12, 13, 14, 15),
 	}
 	suite.collections = map[int64]collectionLoadConfig{
-		// 1000: {
-		// 	spawnConfig: map[string]int{"RG1": 1},
-		// },
-		// 1001: {
-		// 	spawnConfig: map[string]int{"RG2": 2},
-		// },
-		// 1002: {
-		// 	spawnConfig: map[string]int{"RG3": 2},
-		// },
-		// 1003: {
-		// 	spawnConfig: map[string]int{"RG1": 1, "RG2": 1, "RG3": 1},
-		// },
-		// 1004: {
-		// 	spawnConfig: map[string]int{"RG4": 2, "RG5": 3},
-		// },
+		1000: {
+			spawnConfig: map[string]int{"RG1": 1},
+		},
+		1001: {
+			spawnConfig: map[string]int{"RG2": 2},
+		},
+		1002: {
+			spawnConfig: map[string]int{"RG3": 2},
+		},
+		1003: {
+			spawnConfig: map[string]int{"RG1": 1, "RG2": 1, "RG3": 1},
+		},
+		1004: {
+			spawnConfig: map[string]int{"RG4": 2, "RG5": 3},
+		},
 		1005: {
 			spawnConfig: map[string]int{"RG4": 3, "RG5": 2},
 		},
@@ -420,7 +420,16 @@
 }
 
 func (suite *ReplicaManagerV2Suite) TestTransferReplica() {
-	suite.mgr.TransferReplica(1005, "RG4", "RG5", 1)
+	// param error
+	err := suite.mgr.TransferReplica(10086, "RG4", "RG5", 1)
+	suite.Error(err)
+	err = suite.mgr.TransferReplica(1005, "RG4", "RG5", 0)
+	suite.Error(err)
+	err = suite.mgr.TransferReplica(1005, "RG4", "RG4", 1)
+	suite.Error(err)
+
+	err = suite.mgr.TransferReplica(1005, "RG4", "RG5", 1)
+	suite.NoError(err)
 	suite.recoverReplica(2, true)
 	suite.testIfBalanced()
 }
diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go
index 483c5f6c60..b074993b14 100644
--- a/internal/querycoordv2/meta/resource_manager.go
+++ b/internal/querycoordv2/meta/resource_manager.go
@@ -226,6 +226,13 @@ func (rm *ResourceManager) updateResourceGroups(rgs map[string]*rgpb.ResourceGroup
 
 // go:deprecated TransferNode transfer node from source resource group to target resource group.
 // Deprecated, use Declarative API `UpdateResourceGroups` instead.
 func (rm *ResourceManager) TransferNode(sourceRGName string, targetRGName string, nodeNum int) error {
+	if sourceRGName == targetRGName {
+		return merr.WrapErrParameterInvalidMsg("source resource group and target resource group should not be the same, resource group: %s", sourceRGName)
+	}
+	if nodeNum <= 0 {
+		return merr.WrapErrParameterInvalid("NumNode > 0", fmt.Sprintf("invalid NumNode %d", nodeNum))
+	}
+
 	rm.rwmutex.Lock()
 	defer rm.rwmutex.Unlock()
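Both managers now run the same fail-fast checks before taking their locks, so every caller sees identical parameter errors regardless of entry point. A minimal, self-contained sketch of that shared contract follows; `validateTransfer` and `errParameterInvalid` are illustrative stand-ins, not Milvus symbols (the real code wraps `merr.ErrParameterInvalid` via `merr.WrapErrParameterInvalid` / `merr.WrapErrParameterInvalidMsg`):

```go
package main

import (
	"errors"
	"fmt"
)

// errParameterInvalid stands in for merr.ErrParameterInvalid.
var errParameterInvalid = errors.New("invalid parameter")

// validateTransfer mirrors the checks added at the top of both
// ReplicaManager.TransferReplica and ResourceManager.TransferNode:
// reject a same source/target pair and non-positive counts before
// any state is touched.
func validateTransfer(srcRG, dstRG string, num int) error {
	if srcRG == dstRG {
		return fmt.Errorf("%w: source and target resource group should not be the same: %s", errParameterInvalid, srcRG)
	}
	if num <= 0 {
		return fmt.Errorf("%w: expected num > 0, got %d", errParameterInvalid, num)
	}
	return nil
}

func main() {
	fmt.Println(validateTransfer("rg1", "rg1", 1)) // same source and target -> error
	fmt.Println(validateTransfer("rg1", "rg2", 0)) // non-positive count -> error
	fmt.Println(validateTransfer("rg1", "rg2", 3)) // <nil>
}
```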
diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go
index da8d6da457..1d6e07b3b1 100644
--- a/internal/querycoordv2/meta/resource_manager_test.go
+++ b/internal/querycoordv2/meta/resource_manager_test.go
@@ -465,6 +465,66 @@ func (suite *ResourceManagerSuite) TestAutoRecover() {
 	suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum())
 	suite.Equal(40, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
 
+	suite.testTransferNode()
+
+	// Test that redundant nodes recover to the default resource group.
+	suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
+		DefaultResourceGroupName: newResourceGroupConfig(1, 1),
+		"rg3": newResourceGroupConfig(0, 0),
+		"rg2": newResourceGroupConfig(0, 0),
+		"rg1": newResourceGroupConfig(0, 0),
+	})
+	// Even though the default resource group has a one-node limit,
+	// all redundant nodes will be assigned to it if no other resource group can hold them.
+	suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
+	suite.manager.AutoRecoverResourceGroup("rg1")
+	suite.manager.AutoRecoverResourceGroup("rg2")
+	suite.manager.AutoRecoverResourceGroup("rg3")
+	suite.Equal(0, suite.manager.GetResourceGroup("rg1").NodeNum())
+	suite.Equal(0, suite.manager.GetResourceGroup("rg2").NodeNum())
+	suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum())
+	suite.Equal(100, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
+
+	// Test recovering redundant nodes into groups that are missing nodes, and vice versa.
+	// Initialize
+	suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
+		DefaultResourceGroupName: newResourceGroupConfig(0, 0),
+		"rg3": newResourceGroupConfig(10, 10),
+		"rg2": newResourceGroupConfig(80, 80),
+		"rg1": newResourceGroupConfig(10, 10),
+	})
+	suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
+	suite.manager.AutoRecoverResourceGroup("rg1")
+	suite.manager.AutoRecoverResourceGroup("rg2")
+	suite.manager.AutoRecoverResourceGroup("rg3")
+	suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
+	suite.Equal(80, suite.manager.GetResourceGroup("rg2").NodeNum())
+	suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
+	suite.Equal(0, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
+
+	suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
+		DefaultResourceGroupName: newResourceGroupConfig(0, 5),
+		"rg3": newResourceGroupConfig(5, 5),
+		"rg2": newResourceGroupConfig(80, 80),
+		"rg1": newResourceGroupConfig(20, 30),
+	})
+	suite.manager.AutoRecoverResourceGroup("rg3") // recover redundant nodes into the rg that is missing nodes.
+	suite.Equal(15, suite.manager.GetResourceGroup("rg1").NodeNum())
+	suite.Equal(80, suite.manager.GetResourceGroup("rg2").NodeNum())
+	suite.Equal(5, suite.manager.GetResourceGroup("rg3").NodeNum())
+	suite.Equal(0, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
+	suite.manager.updateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
+		DefaultResourceGroupName: newResourceGroupConfig(5, 5),
+		"rg3": newResourceGroupConfig(5, 10),
+		"rg2": newResourceGroupConfig(80, 80),
+		"rg1": newResourceGroupConfig(10, 10),
+	})
+	suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName) // recover missing nodes from the rg that has redundant ones.
+	suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
+	suite.Equal(80, suite.manager.GetResourceGroup("rg2").NodeNum())
+	suite.Equal(5, suite.manager.GetResourceGroup("rg3").NodeNum())
+	suite.Equal(5, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
+
 	// Test down all nodes.
 	for i := 1; i <= 100; i++ {
 		suite.manager.nodeMgr.Remove(int64(i))
@@ -475,3 +535,52 @@
 	suite.Zero(suite.manager.GetResourceGroup("rg3").NodeNum())
 	suite.Zero(suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
 }
+
+func (suite *ResourceManagerSuite) testTransferNode() {
+	// Test TransferNode.
+	// param error.
+	err := suite.manager.TransferNode("rg1", "rg1", 1)
+	suite.Error(err)
+
+	err = suite.manager.TransferNode("rg1", "rg2", 0)
+	suite.Error(err)
+
+	err = suite.manager.TransferNode("rg3", "rg2", 1)
+	suite.Error(err)
+
+	err = suite.manager.TransferNode("rg1", "rg10086", 1)
+	suite.Error(err)
+
+	err = suite.manager.TransferNode("rg10086", "rg2", 1)
+	suite.Error(err)
+
+	// success
+	err = suite.manager.TransferNode("rg1", "rg3", 5)
+	suite.NoError(err)
+
+	suite.manager.AutoRecoverResourceGroup("rg1")
+	suite.manager.AutoRecoverResourceGroup("rg2")
+	suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
+	suite.manager.AutoRecoverResourceGroup("rg3")
+
+	suite.Equal(15, suite.manager.GetResourceGroup("rg1").NodeNum())
+	suite.Equal(40, suite.manager.GetResourceGroup("rg2").NodeNum())
+	suite.Equal(5, suite.manager.GetResourceGroup("rg3").NodeNum())
+	suite.Equal(40, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
+}
+
+func (suite *ResourceManagerSuite) TestIncomingNode() {
+	suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.manager.incomingNode.Insert(1)
+
+	suite.Equal(1, suite.manager.CheckIncomingNodeNum())
+	suite.manager.AssignPendingIncomingNode()
+	suite.Equal(0, suite.manager.CheckIncomingNodeNum())
+	nodes, err := suite.manager.GetNodes(DefaultResourceGroupName)
+	suite.NoError(err)
+	suite.Len(nodes, 1)
+}
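To make the assertions above easier to follow, here is a small, self-contained sketch of the bookkeeping `AutoRecoverResourceGroup` reconciles. `rgState`, `missing`, and `redundant` are illustrative stand-ins, not the real meta types (the actual config fields live in `rgpb.ResourceGroupConfig` as requests/limits node numbers):

```go
package main

import "fmt"

// rgState is an illustrative stand-in for a resource group's config and
// current size; the real values come from rgpb.ResourceGroupConfig
// (requests/limits node numbers) and the manager's node assignment.
type rgState struct {
	requests int
	limits   int
	nodes    int
}

// missing counts how far a group is below its requested node number.
func missing(s rgState) int {
	if s.nodes < s.requests {
		return s.requests - s.nodes
	}
	return 0
}

// redundant counts how far a group is above its node limit.
func redundant(s rgState) int {
	if s.nodes > s.limits {
		return s.nodes - s.limits
	}
	return 0
}

func main() {
	// The step asserted above: rg1 requests 20 (limit 30) but holds 10,
	// while rg3 is limited to 5 but holds 10. Recovering rg3 hands its 5
	// redundant nodes to rg1, which ends at 15 nodes.
	rg1 := rgState{requests: 20, limits: 30, nodes: 10}
	rg3 := rgState{requests: 5, limits: 5, nodes: 10}
	fmt.Println(missing(rg1), redundant(rg3)) // 10 5
}
```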
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index 3220b68b94..1b594cfcd6 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -1089,21 +1089,6 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
 		return merr.Status(err), nil
 	}
 
-	if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetSourceResourceGroup()); !ok {
-		err := merr.WrapErrParameterInvalid("valid resource group", req.GetSourceResourceGroup(), "source resource group not found")
-		return merr.Status(err), nil
-	}
-
-	if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetTargetResourceGroup()); !ok {
-		err := merr.WrapErrParameterInvalid("valid resource group", req.GetTargetResourceGroup(), "target resource group not found")
-		return merr.Status(err), nil
-	}
-
-	if req.GetNumNode() <= 0 {
-		err := merr.WrapErrParameterInvalid("NumNode > 0", fmt.Sprintf("invalid NumNode %d", req.GetNumNode()))
-		return merr.Status(err), nil
-	}
-
 	// Move node from source resource group to target resource group.
 	if err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode())); err != nil {
 		log.Warn("failed to transfer node", zap.Error(err))
group", req.GetTargetResourceGroup(), "target resource group not found") - return merr.Status(err), nil - } - - if req.GetNumNode() <= 0 { - err := merr.WrapErrParameterInvalid("NumNode > 0", fmt.Sprintf("invalid NumNode %d", req.GetNumNode())) - return merr.Status(err), nil - } - // Move node from source resource group to target resource group. if err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode())); err != nil { log.Warn("failed to transfer node", zap.Error(err)) @@ -1141,11 +1126,6 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()))), nil } - if req.GetNumReplica() <= 0 { - err := merr.WrapErrParameterInvalid("NumReplica > 0", fmt.Sprintf("invalid NumReplica %d", req.GetNumReplica())) - return merr.Status(err), nil - } - // Apply change into replica manager. err := s.meta.TransferReplica(req.GetCollectionID(), req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumReplica())) return merr.Status(err), nil