enhance: Skip segment balance when channels need to be balanced (#29116)

issue: #28622

Since #28623 added support for balancing segments by growing segment count, balancing segments and channels at the same time can leave some segments needing to be rebalanced again after the channel balance finishes.

This PR skips segment balancing whenever channels still need to be balanced.

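In short, the balancer now generates channel plans first and falls back to segment plans only when no channel has to move. A minimal, self-contained sketch of that ordering (the plan types and generator callbacks below are simplified stand-ins, not the real balancer API):

```go
package main

import "fmt"

// Simplified stand-ins for the balancer's plan types.
type SegmentAssignPlan struct{ SegmentID, From, To int64 }

type ChannelAssignPlan struct {
	Channel  string
	From, To int64
}

// balanceReplica mirrors the new ordering: generate channel plans first,
// and only fall back to segment plans when no channel has to move.
func balanceReplica(
	genChannelPlan func() []ChannelAssignPlan,
	genSegmentPlan func() []SegmentAssignPlan,
) ([]SegmentAssignPlan, []ChannelAssignPlan) {
	channelPlans := genChannelPlan()
	if len(channelPlans) == 0 {
		// channels are settled, so segment row counts (including growing
		// rows counted in the delegator) are stable enough to balance on
		return genSegmentPlan(), nil
	}
	// a channel still has to move; skip segment balance for this round
	return nil, channelPlans
}

func main() {
	segPlans, chPlans := balanceReplica(
		func() []ChannelAssignPlan { return []ChannelAssignPlan{{Channel: "v3", From: 3, To: 1}} },
		func() []SegmentAssignPlan { return []SegmentAssignPlan{{SegmentID: 4, From: 3, To: 1}} },
	)
	fmt.Println(len(segPlans), len(chPlans)) // 0 1: pending channel plans suppress segment plans
}
```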
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2023-12-14 16:44:43 +08:00 committed by GitHub
parent 5a7fb97e07
commit 008bae675d
4 changed files with 102 additions and 46 deletions


@@ -180,10 +180,21 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment
zap.Any("stoppingNodes", offlineNodes),
zap.Any("onlineNodes", onlineNodes),
)
-return b.genStoppingSegmentPlan(replica, onlineNodes, offlineNodes), b.genStoppingChannelPlan(replica, onlineNodes, offlineNodes)
+channelPlans := b.genStoppingChannelPlan(replica, onlineNodes, offlineNodes)
+if len(channelPlans) == 0 {
+return b.genStoppingSegmentPlan(replica, onlineNodes, offlineNodes), nil
+}
+return nil, channelPlans
}
-return b.genSegmentPlan(replica, onlineNodes), b.genChannelPlan(replica, onlineNodes)
+// segment balance counts the growing rows in the delegator, so it's better to balance channels first
+// to avoid having to balance segments again after the channel balance
+channelPlans := b.genChannelPlan(replica, onlineNodes)
+if len(channelPlans) == 0 {
+return b.genSegmentPlan(replica, onlineNodes), nil
+}
+return nil, channelPlans
}
func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan {


@@ -217,7 +217,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
expectChannelPlans: []ChannelAssignPlan{},
},
{
name: "part stopping balance",
name: "part stopping balance channel",
nodes: []int64{1, 2, 3},
segmentCnts: []int{1, 2, 2},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateStopping},
@@ -241,13 +241,41 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3},
},
},
+expectPlans: []SegmentAssignPlan{},
+expectChannelPlans: []ChannelAssignPlan{
+{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
+},
+},
+{
+name: "part stopping balance segment",
+nodes: []int64{1, 2, 3},
+segmentCnts: []int{1, 2, 2},
+states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateStopping},
+shouldMock: true,
+distributions: map[int64][]*meta.Segment{
+1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
+2: {
+{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
+{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
+},
+3: {
+{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3},
+{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3},
+},
+},
+distributionChannels: map[int64][]*meta.DmChannel{
+2: {
+{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
+},
+1: {
+{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 1},
+},
+},
expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
},
-expectChannelPlans: []ChannelAssignPlan{
-{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
-},
+expectChannelPlans: []ChannelAssignPlan{},
},
{
name: "balance channel",
@@ -488,17 +516,12 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
2: {
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
},
-3: {
-{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3},
-},
},
expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
},
-expectChannelPlans: []ChannelAssignPlan{
-{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
-},
+expectChannelPlans: []ChannelAssignPlan{},
},
{
name: "not exist in next target",
@@ -619,7 +642,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
expectChannelPlans []ChannelAssignPlan
}{
{
name: "balance out bound nodes",
name: "balance channel with outbound nodes",
nodes: []int64{1, 2, 3},
segmentCnts: []int{1, 2, 2},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
@@ -643,13 +666,41 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3},
},
},
+expectPlans: []SegmentAssignPlan{},
+expectChannelPlans: []ChannelAssignPlan{
+{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
+},
+},
+{
+name: "balance segment with outbound node",
+nodes: []int64{1, 2, 3},
+segmentCnts: []int{1, 2, 2},
+states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
+shouldMock: true,
+distributions: map[int64][]*meta.Segment{
+1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
+2: {
+{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
+{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
+},
+3: {
+{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3},
+{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3},
+},
+},
+distributionChannels: map[int64][]*meta.DmChannel{
+2: {
+{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
+},
+1: {
+{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 1},
+},
+},
expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
},
-expectChannelPlans: []ChannelAssignPlan{
-{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
-},
+expectChannelPlans: []ChannelAssignPlan{},
},
}
@@ -714,6 +765,12 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1)
suite.ElementsMatch(c.expectChannelPlans, channelPlans)
suite.ElementsMatch(c.expectPlans, segmentPlans)
+// clean up distribution for next test
+for node := range c.distributions {
+balancer.dist.SegmentDistManager.Update(node)
+balancer.dist.ChannelDistManager.Update(node)
+}
})
}
}
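The cleanup loop added above relies on the dist managers' variadic Update(node, items...) call replacing a node's entries wholesale, so passing only the node ID clears that node's distribution. A minimal sketch of that pattern with a hypothetical stand-in type (not the real SegmentDistManager/ChannelDistManager):

```go
package main

import "fmt"

// distManager is a hypothetical stand-in for the variadic-update pattern
// used by the dist managers: Update(node, items...) replaces the node's
// entries with exactly the items passed.
type distManager struct{ byNode map[int64][]string }

func (m *distManager) Update(node int64, items ...string) {
	m.byNode[node] = items // no items => the node's distribution is cleared
}

func main() {
	m := &distManager{byNode: map[int64][]string{}}
	m.Update(3, "segment-4", "segment-5")
	m.Update(3) // cleanup between sub-tests: node 3 now holds nothing
	fmt.Println(len(m.byNode[3])) // 0
}
```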


@@ -191,13 +191,18 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
zap.Any("available nodes", maps.Keys(nodesSegments)),
)
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
-segmentPlans = append(segmentPlans, b.getStoppedSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...)
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
+if len(channelPlans) == 0 {
+segmentPlans = append(segmentPlans, b.getStoppedSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...)
+}
} else {
// normal balance, find segments from largest score nodes and transfer to smallest score nodes.
-segmentPlans = append(segmentPlans, b.getNormalSegmentPlan(replica, nodesSegments)...)
channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments))...)
+if len(channelPlans) == 0 {
+segmentPlans = append(segmentPlans, b.getNormalSegmentPlan(replica, nodesSegments)...)
+}
}
if len(segmentPlans) != 0 || len(channelPlans) != 0 {
PrintCurrentReplicaDist(replica, stoppingNodesSegments, nodesSegments, b.dist.ChannelDistManager, b.dist.SegmentDistManager)
}


@@ -71,7 +71,8 @@ func NewResourceGroup(capacity int) *ResourceGroup {
// assign node to resource group
func (rg *ResourceGroup) assignNode(id int64, deltaCapacity int) error {
if rg.containsNode(id) {
-return ErrNodeAlreadyAssign
+// adding a node to the same rg more than once should be tolerated
+return nil
}
rg.nodes.Insert(id)
@@ -213,8 +214,12 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
}
rm.checkRGNodeStatus(rgName)
-if rm.checkNodeAssigned(node) {
-return ErrNodeAlreadyAssign
+for name, group := range rm.groups {
+// check whether the node has been assigned to another rg
+if name != rgName && group.containsNode(node) {
+return ErrNodeAlreadyAssign
+}
+}
newNodes := rm.groups[rgName].GetNodes()
@@ -238,10 +243,7 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
return err
}
-err = rm.groups[rgName].assignNode(node, deltaCapacity)
-if err != nil {
-return err
-}
+rm.groups[rgName].assignNode(node, deltaCapacity)
log.Info("add node to resource group",
zap.String("rgName", rgName),
@@ -251,16 +253,6 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
return nil
}
-func (rm *ResourceManager) checkNodeAssigned(node int64) bool {
-for _, group := range rm.groups {
-if group.containsNode(node) {
-return true
-}
-}
-return false
-}
func (rm *ResourceManager) UnassignNode(rgName string, node int64) error {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
@@ -542,17 +534,8 @@ func (rm *ResourceManager) TransferNode(from string, to string, numNode int) ([]
}
for _, node := range movedNodes {
-err := rm.groups[from].unassignNode(node, deltaFromCapacity)
-if err != nil {
-// interrupt transfer, unreachable logic path
-return nil, err
-}
-err = rm.groups[to].assignNode(node, deltaToCapacity)
-if err != nil {
-// interrupt transfer, unreachable logic path
-return nil, err
-}
+rm.groups[from].unassignNode(node, deltaFromCapacity)
+rm.groups[to].assignNode(node, deltaToCapacity)
log.Info("transfer node",
zap.String("sourceRG", from),
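Taken together, the resource-manager changes make assignNode idempotent within a resource group while still rejecting a node that already belongs to a different group. A self-contained sketch of the resulting contract, using simplified stand-in types rather than the real ResourceManager:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrNodeAlreadyAssign = errors.New("node already assigned to another resource group")

// manager is a simplified model of the new assignNode contract.
type manager struct{ groups map[string]map[int64]bool }

func (m *manager) assignNode(rg string, node int64) error {
	for name, nodes := range m.groups {
		// assigning a node that lives in a different group is still an error
		if name != rg && nodes[node] {
			return ErrNodeAlreadyAssign
		}
	}
	// re-assigning to the same group is tolerated as a no-op
	m.groups[rg][node] = true
	return nil
}

func main() {
	m := &manager{groups: map[string]map[int64]bool{"rg1": {}, "rg2": {}}}
	fmt.Println(m.assignNode("rg1", 7)) // <nil>
	fmt.Println(m.assignNode("rg1", 7)) // <nil> (idempotent repeat)
	fmt.Println(m.assignNode("rg2", 7)) // node already assigned to another resource group
}
```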