diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go
index 94ea9773d9..44b72aa06f 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -148,7 +148,8 @@ func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]Segment
     }
 
     if len(nodes) == len(stoppingNodesSegments) {
-        return b.handleStoppingNodes(replica, stoppingNodesSegments)
+        // no available nodes to balance
+        return nil, nil
     }
 
     if len(nodesSegments) == 0 {
@@ -237,35 +238,6 @@ outer:
     return plans, b.getChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))
 }
 
-func (b *RowCountBasedBalancer) handleStoppingNodes(replica *meta.Replica, nodeSegments map[int64][]*meta.Segment) ([]SegmentAssignPlan, []ChannelAssignPlan) {
-    segmentPlans := make([]SegmentAssignPlan, 0, len(nodeSegments))
-    channelPlans := make([]ChannelAssignPlan, 0, len(nodeSegments))
-    for nodeID, segments := range nodeSegments {
-        for _, segment := range segments {
-            segmentPlan := SegmentAssignPlan{
-                ReplicaID: replica.ID,
-                From:      nodeID,
-                To:        -1,
-                Segment:   segment,
-                Weight:    GetWeight(1),
-            }
-            segmentPlans = append(segmentPlans, segmentPlan)
-        }
-        for _, dmChannel := range b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID) {
-            channelPlan := ChannelAssignPlan{
-                ReplicaID: replica.ID,
-                From:      nodeID,
-                To:        -1,
-                Channel:   dmChannel,
-                Weight:    GetWeight(1),
-            }
-            channelPlans = append(channelPlans, channelPlan)
-        }
-    }
-
-    return segmentPlans, channelPlans
-}
-
 func (b *RowCountBasedBalancer) collectionStoppingSegments(stoppingNodesSegments map[int64][]*meta.Segment) ([]*meta.Segment, int) {
     var (
         segments []*meta.Segment
diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go
index 7367b49488..e3e520e5d2 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go
@@ -172,11 +172,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
                     {SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
                 },
             },
-            expectPlans: []SegmentAssignPlan{
-                {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2}, From: 2, To: -1, ReplicaID: 1, Weight: weightHigh},
-                {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2}, From: 2, To: -1, ReplicaID: 1, Weight: weightHigh},
-                {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}, From: 1, To: -1, ReplicaID: 1, Weight: weightHigh},
-            },
+            expectPlans:        []SegmentAssignPlan{},
             expectChannelPlans: []ChannelAssignPlan{},
         },
         {
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index 22267930db..6a60f7a840 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -55,6 +55,7 @@ var (
     ErrListResourceGroupsFailed    = errors.New("failed to list resource group")
     ErrDescribeResourceGroupFailed = errors.New("failed to describe resource group")
     ErrLoadUseWrongRG              = errors.New("load operation should use collection's resource group")
+    ErrLoadWithDefaultRG           = errors.New("load operation can't use default resource group and other resource group together")
 )
 
 func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
@@ -359,6 +360,10 @@ func (s *Server) checkResourceGroup(collectionID int64, resourceGroups []string)
             if len(collectionUsedRG) > 0 && !collectionUsedRG.Contain(rgName) {
                 return ErrLoadUseWrongRG
             }
+
+            if len(resourceGroups) > 1 && rgName == meta.DefaultResourceGroupName {
+                return ErrLoadWithDefaultRG
+            }
         }
     }
 
@@ -997,6 +1002,12 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour
         return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrDropResourceGroupFailed.Error(), ErrNotHealthy), nil
     }
 
+    replicas := s.meta.ReplicaManager.GetByResourceGroup(req.GetResourceGroup())
+    if len(replicas) > 0 {
+        return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
+            fmt.Sprintf("some replicas still loaded in resource group[%s], release it first", req.GetResourceGroup()), meta.ErrDeleteNonEmptyRG), nil
+    }
+
     err := s.meta.ResourceManager.RemoveResourceGroup(req.GetResourceGroup())
     if err != nil {
         log.Warn(ErrDropResourceGroupFailed.Error(), zap.Error(err))
@@ -1028,6 +1039,11 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
             fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
     }
 
+    if req.GetNumNode() <= 0 {
+        return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
+            fmt.Sprintf("transfer node num can't be [%d]", req.GetNumNode()), nil), nil
+    }
+
     err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
     if err != nil {
         log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err))
@@ -1064,7 +1080,12 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
     if len(replicas) > 0 {
         return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
             fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported",
-                len(replicas), req.GetSourceResourceGroup())), nil
+                len(replicas), req.GetTargetResourceGroup())), nil
+    }
+
+    if req.GetNumReplica() <= 0 {
+        return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
+            fmt.Sprintf("transfer replica num can't be [%d]", req.GetNumReplica()), nil), nil
     }
 
     // for now, we don't support to transfer replica of same collection to same resource group
diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go
index fca57b7fe0..4b29ff45fc 100644
--- a/internal/querycoordv2/services_test.go
+++ b/internal/querycoordv2/services_test.go
@@ -533,11 +533,20 @@ func (suite *ServiceSuite) TestTransferNode() {
     suite.NoError(err)
     suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
 
+    resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
+        SourceResourceGroup: meta.DefaultResourceGroupName,
+        TargetResourceGroup: "rg1",
+        NumNode:             -1,
+    })
+    suite.NoError(err)
+    suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
+
     // server unhealthy
     server.status.Store(commonpb.StateCode_Abnormal)
     resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
         SourceResourceGroup: meta.DefaultResourceGroupName,
         TargetResourceGroup: "rg1",
+        NumNode:             3,
     })
     suite.NoError(err)
     suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
@@ -581,6 +590,15 @@ func (suite *ServiceSuite) TestTransferReplica() {
     suite.NoError(err)
     suite.Equal(resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
 
+    resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
+        SourceResourceGroup: meta.DefaultResourceGroupName,
+        TargetResourceGroup: "rg1",
+        CollectionID:        1,
+        NumReplica:          0,
+    })
+    suite.NoError(err)
+    suite.Equal(resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
+
     suite.server.meta.Put(meta.NewReplica(&querypb.Replica{
         CollectionID: 1,
         ID:           111,
@@ -655,6 +673,15 @@ func (suite *ServiceSuite) TestLoadCollectionFailed() {
         suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
     }
 
+    req := &querypb.LoadCollectionRequest{
+        CollectionID:   0,
+        ReplicaNumber:  2,
+        ResourceGroups: []string{meta.DefaultResourceGroupName, "rg"},
+    }
+    resp, err := server.LoadCollection(ctx, req)
+    suite.NoError(err)
+    suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
+
     // Test load with partitions loaded
     for _, collection := range suite.collections {
         if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
@@ -713,13 +740,22 @@ func (suite *ServiceSuite) TestLoadPartition() {
         suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
     }
 
+    req := &querypb.LoadPartitionsRequest{
+        CollectionID:   suite.collections[0],
+        PartitionIDs:   suite.partitions[suite.collections[0]],
+        ResourceGroups: []string{meta.DefaultResourceGroupName, "rg"},
+    }
+    resp, err := server.LoadPartitions(ctx, req)
+    suite.NoError(err)
+    suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
+
     // Test when server is not healthy
     server.UpdateStateCode(commonpb.StateCode_Initializing)
-    req := &querypb.LoadPartitionsRequest{
+    req = &querypb.LoadPartitionsRequest{
         CollectionID: suite.collections[0],
         PartitionIDs: suite.partitions[suite.collections[0]],
     }
-    resp, err := server.LoadPartitions(ctx, req)
+    resp, err = server.LoadPartitions(ctx, req)
     suite.NoError(err)
     suite.Contains(resp.Reason, ErrNotHealthy.Error())
 }
diff --git a/tests/python_client/testcases/test_resourcegroup.py b/tests/python_client/testcases/test_resourcegroup.py
index 7011a06145..29b580565d 100644
--- a/tests/python_client/testcases/test_resourcegroup.py
+++ b/tests/python_client/testcases/test_resourcegroup.py
@@ -7,7 +7,7 @@ from common.common_type import CaseLabel, CheckTasks
 from utils.util_pymilvus import *
 
 
-@pytest.mark.skip(reason="debugging")
+@pytest.mark.skip(reason="will cause concurrent problems")
 class TestResourceGroupParams(TestcaseBase):
     @pytest.mark.tags(CaseLabel.L0)
     def test_rg_default(self):
@@ -311,7 +311,7 @@ class TestResourceGroupParams(TestcaseBase):
         self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
                                                   check_task=ct.CheckTasks.check_rg_property,
                                                   check_items=default_rg_info)
-        error = {ct.err_code: 999, ct.err_msg: 'failed to drop resource group, err=delete default rg is not permitted'}
+        error = {ct.err_code: 5, ct.err_msg: 'delete default rg is not permitted'}
         self.utility_wrap.drop_resource_group(name=ct.default_resource_group_name,
                                               check_task=CheckTasks.err_res,
                                               check_items=error)
@@ -1344,6 +1344,57 @@ class TestResourceGroupMultiNodes(TestcaseBase):
                                          "limit": ct.default_limit}
                             )
 
+    @pytest.mark.tags(CaseLabel.L0)
+    def test_transfer_nodes_back(self):
+        self._connect()
+
+        rgA_name = "rgA"
+        self.init_resource_group(name=rgA_name)
+
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rgA_name,
+                                        num_node=2)
+        dim = 128
+        collection_w = self.init_collection_wrap(shards_num=1)
+        insert_ids = []
+        nb = 500
+        for i in range(5):
+            res, _ = collection_w.insert(cf.gen_default_list_data(nb=nb, dim=dim, start=i * nb))
+            collection_w.flush()
+            insert_ids.extend(res.primary_keys)
+        collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
+
+        collection_w.load(replica_number=2, _resource_groups=[rgA_name, ct.default_resource_group_name])
+
+        nq = 5
+        vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
+        # verify search succ
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids.copy(),
+                                         "limit": ct.default_limit}
+                            )
+        self.utility_wrap.transfer_node(source=rgA_name,
+                                        target=ct.default_resource_group_name,
+                                        num_node=2)
+
+        time.sleep(10)
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids.copy(),
+                                         "limit": ct.default_limit}
+                            )
+
+        collection_w.get_replicas()
+
     @pytest.mark.tags(CaseLabel.L0)
     def test_transfer_replica_not_enough_replicas_to_transfer(self):
         """