fix balance generate reduce task (#22236)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Authored by wei liu on 2023-02-21 19:06:27 +08:00, committed by GitHub
parent abbf1efa0e
commit c3e8ad3629
5 changed files with 116 additions and 40 deletions


@@ -148,7 +148,8 @@ func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]Segment
}
if len(nodes) == len(stoppingNodesSegments) {
return b.handleStoppingNodes(replica, stoppingNodesSegments)
// no available nodes to balance
return nil, nil
}
if len(nodesSegments) == 0 {
@@ -237,35 +238,6 @@ outer:
return plans, b.getChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))
}
func (b *RowCountBasedBalancer) handleStoppingNodes(replica *meta.Replica, nodeSegments map[int64][]*meta.Segment) ([]SegmentAssignPlan, []ChannelAssignPlan) {
segmentPlans := make([]SegmentAssignPlan, 0, len(nodeSegments))
channelPlans := make([]ChannelAssignPlan, 0, len(nodeSegments))
for nodeID, segments := range nodeSegments {
for _, segment := range segments {
segmentPlan := SegmentAssignPlan{
ReplicaID: replica.ID,
From: nodeID,
To: -1,
Segment: segment,
Weight: GetWeight(1),
}
segmentPlans = append(segmentPlans, segmentPlan)
}
for _, dmChannel := range b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID) {
channelPlan := ChannelAssignPlan{
ReplicaID: replica.ID,
From: nodeID,
To: -1,
Channel: dmChannel,
Weight: GetWeight(1),
}
channelPlans = append(channelPlans, channelPlan)
}
}
return segmentPlans, channelPlans
}
func (b *RowCountBasedBalancer) collectionStoppingSegments(stoppingNodesSegments map[int64][]*meta.Segment) ([]*meta.Segment, int) {
var (
segments []*meta.Segment
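
The balancer fix above removes handleStoppingNodes entirely: when every node that holds the replica's segments is stopping, there is no destination left, so the balancer now returns no plans instead of emitting reduce plans with To: -1 (which is what the deleted helper produced and what the adjusted test below no longer expects). A minimal, self-contained Go sketch of that control flow, using simplified stand-in types rather than the actual Milvus balancer API:

```go
package main

import "fmt"

// segmentPlan is a simplified stand-in for SegmentAssignPlan, kept only to
// illustrate the control flow of the fix; it is not the real Milvus type.
type segmentPlan struct {
	SegmentID int64
	From, To  int64
}

// balance sketches the post-fix behavior: if every node holding segments is
// stopping, there is nowhere to move data, so no plans are generated
// (previously, reduce plans with To == -1 were emitted for those nodes).
func balance(nodeSegments map[int64][]int64, stopping map[int64]bool) []segmentPlan {
	allStopping := true
	for nodeID := range nodeSegments {
		if !stopping[nodeID] {
			allStopping = false
			break
		}
	}
	if allStopping {
		// no available nodes to balance
		return nil
	}
	// normal balancing between healthy nodes would happen here
	return make([]segmentPlan, 0)
}

func main() {
	segs := map[int64][]int64{1: {10, 11}, 2: {12}}
	stopping := map[int64]bool{1: true, 2: true}
	fmt.Println(len(balance(segs, stopping))) // 0: nothing to do when all nodes are stopping
}
```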


@@ -172,11 +172,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
},
},
expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2}, From: 2, To: -1, ReplicaID: 1, Weight: weightHigh},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2}, From: 2, To: -1, ReplicaID: 1, Weight: weightHigh},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}, From: 1, To: -1, ReplicaID: 1, Weight: weightHigh},
},
expectPlans: []SegmentAssignPlan{},
expectChannelPlans: []ChannelAssignPlan{},
},
{


@@ -55,6 +55,7 @@ var (
ErrListResourceGroupsFailed = errors.New("failed to list resource group")
ErrDescribeResourceGroupFailed = errors.New("failed to describe resource group")
ErrLoadUseWrongRG = errors.New("load operation should use collection's resource group")
ErrLoadWithDefaultRG = errors.New("load operation can't use default resource group and other resource group together")
)
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
@@ -359,6 +360,10 @@ func (s *Server) checkResourceGroup(collectionID int64, resourceGroups []string)
if len(collectionUsedRG) > 0 && !collectionUsedRG.Contain(rgName) {
return ErrLoadUseWrongRG
}
if len(resourceGroups) > 1 && rgName == meta.DefaultResourceGroupName {
return ErrLoadWithDefaultRG
}
}
}
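
This hunk adds ErrLoadWithDefaultRG: a load request may not name the default resource group together with any other resource group. A short, self-contained Go sketch of that rule under assumed names (defaultRG and validateResourceGroups are illustrative, not the actual QueryCoord identifiers):

```go
package main

import (
	"errors"
	"fmt"
)

// defaultRG stands in for meta.DefaultResourceGroupName; the value here is
// only illustrative.
const defaultRG = "__default_resource_group"

var errLoadWithDefaultRG = errors.New(
	"load operation can't use default resource group and other resource group together")

// validateResourceGroups mirrors the added check: mixing the default resource
// group with any other group in a single load request is rejected.
func validateResourceGroups(resourceGroups []string) error {
	if len(resourceGroups) <= 1 {
		return nil
	}
	for _, rg := range resourceGroups {
		if rg == defaultRG {
			return errLoadWithDefaultRG
		}
	}
	return nil
}

func main() {
	fmt.Println(validateResourceGroups([]string{defaultRG}))        // <nil>: default RG alone is fine
	fmt.Println(validateResourceGroups([]string{defaultRG, "rg1"})) // rejected
}
```

The LoadCollection and LoadPartitions test cases added further below exercise exactly this path and expect ErrorCode_IllegalArgument.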
@@ -997,6 +1002,12 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrDropResourceGroupFailed.Error(), ErrNotHealthy), nil
}
replicas := s.meta.ReplicaManager.GetByResourceGroup(req.GetResourceGroup())
if len(replicas) > 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("some replicas still loaded in resource group[%s], release it first", req.GetResourceGroup()), meta.ErrDeleteNonEmptyRG), nil
}
err := s.meta.ResourceManager.RemoveResourceGroup(req.GetResourceGroup())
if err != nil {
log.Warn(ErrDropResourceGroupFailed.Error(), zap.Error(err))
@@ -1028,6 +1039,11 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
}
if req.GetNumNode() <= 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("transfer node num can't be [%d]", req.GetNumNode()), nil), nil
}
err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
if err != nil {
log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err))
@@ -1064,7 +1080,12 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
if len(replicas) > 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported",
len(replicas), req.GetSourceResourceGroup())), nil
len(replicas), req.GetTargetResourceGroup())), nil
}
if req.GetNumReplica() <= 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("transfer replica num can't be [%d]", req.GetNumReplica()), nil), nil
}
// for now, we don't support to transfer replica of same collection to same resource group
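
The remaining hunks in this file add guard rails: DropResourceGroup refuses to drop a group that still has loaded replicas, TransferNode and TransferReplica reject non-positive counts, and the TransferReplica error message now reports the target resource group instead of the source. A brief Go sketch of the count check, with validateTransferCount as an assumed helper name:

```go
package main

import "fmt"

// validateTransferCount sketches the added checks in TransferNode and
// TransferReplica: transferring zero or a negative number of nodes/replicas
// is treated as an illegal argument rather than a silent no-op.
func validateTransferCount(num int32, what string) error {
	if num <= 0 {
		return fmt.Errorf("transfer %s num can't be [%d]", what, num)
	}
	return nil
}

func main() {
	fmt.Println(validateTransferCount(-1, "node"))   // rejected
	fmt.Println(validateTransferCount(0, "replica")) // rejected
	fmt.Println(validateTransferCount(3, "node"))    // <nil>
}
```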


@@ -533,11 +533,20 @@ func (suite *ServiceSuite) TestTransferNode() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg1",
NumNode: -1,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
// server unhealthy
server.status.Store(commonpb.StateCode_Abnormal)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg1",
NumNode: 3,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
@@ -581,6 +590,15 @@ func (suite *ServiceSuite) TestTransferReplica() {
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg1",
CollectionID: 1,
NumReplica: 0,
})
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
suite.server.meta.Put(meta.NewReplica(&querypb.Replica{
CollectionID: 1,
ID: 111,
@@ -655,6 +673,15 @@ func (suite *ServiceSuite) TestLoadCollectionFailed() {
suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
}
req := &querypb.LoadCollectionRequest{
CollectionID: 0,
ReplicaNumber: 2,
ResourceGroups: []string{meta.DefaultResourceGroupName, "rg"},
}
resp, err := server.LoadCollection(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
// Test load with partitions loaded
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
@@ -713,13 +740,22 @@ func (suite *ServiceSuite) TestLoadPartition() {
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
}
req := &querypb.LoadPartitionsRequest{
CollectionID: suite.collections[0],
PartitionIDs: suite.partitions[suite.collections[0]],
ResourceGroups: []string{meta.DefaultResourceGroupName, "rg"},
}
resp, err := server.LoadPartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
// Test when server is not healthy
server.UpdateStateCode(commonpb.StateCode_Initializing)
req := &querypb.LoadPartitionsRequest{
req = &querypb.LoadPartitionsRequest{
CollectionID: suite.collections[0],
PartitionIDs: suite.partitions[suite.collections[0]],
}
resp, err := server.LoadPartitions(ctx, req)
resp, err = server.LoadPartitions(ctx, req)
suite.NoError(err)
suite.Contains(resp.Reason, ErrNotHealthy.Error())
}


@@ -7,7 +7,7 @@ from common.common_type import CaseLabel, CheckTasks
from utils.util_pymilvus import *
@pytest.mark.skip(reason="debugging")
@pytest.mark.skip(reason="will cause concurrent problems")
class TestResourceGroupParams(TestcaseBase):
@pytest.mark.tags(CaseLabel.L0)
def test_rg_default(self):
@@ -311,7 +311,7 @@ class TestResourceGroupParams(TestcaseBase):
self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
check_task=ct.CheckTasks.check_rg_property,
check_items=default_rg_info)
error = {ct.err_code: 999, ct.err_msg: 'failed to drop resource group, err=delete default rg is not permitted'}
error = {ct.err_code: 5, ct.err_msg: 'delete default rg is not permitted'}
self.utility_wrap.drop_resource_group(name=ct.default_resource_group_name,
check_task=CheckTasks.err_res,
check_items=error)
@@ -1344,6 +1344,57 @@ class TestResourceGroupMultiNodes(TestcaseBase):
"limit": ct.default_limit}
)
@pytest.mark.tags(CaseLabel.L0)
def test_transfer_nodes_back(self):
self._connect()
rgA_name = "rgA"
self.init_resource_group(name=rgA_name)
self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
target=rgA_name,
num_node=2)
dim = 128
collection_w = self.init_collection_wrap(shards_num=1)
insert_ids = []
nb = 500
for i in range(5):
res, _ = collection_w.insert(cf.gen_default_list_data(nb=nb, dim=dim, start=i * nb))
collection_w.flush()
insert_ids.extend(res.primary_keys)
collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
collection_w.load(replica_number=2, _resource_groups=[rgA_name, ct.default_resource_group_name])
nq = 5
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
# verify search succ
collection_w.search(vectors[:nq],
ct.default_float_vec_field_name,
ct.default_search_params,
ct.default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids.copy(),
"limit": ct.default_limit}
)
self.utility_wrap.transfer_node(source=rgA_name,
target=ct.default_resource_group_name,
num_node=2)
time.sleep(10)
collection_w.search(vectors[:nq],
ct.default_float_vec_field_name,
ct.default_search_params,
ct.default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids.copy(),
"limit": ct.default_limit}
)
collection_w.get_replicas()
@pytest.mark.tags(CaseLabel.L0)
def test_transfer_replica_not_enough_replicas_to_transfer(self):
"""