From 2f0c7a6544de58669061f2a04fdc6e25ba81c7b7 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 12 Dec 2023 20:38:38 +0800 Subject: [PATCH] fix: forbid balancing level zero segments (#29130) we can't balance the L0 segments related #29128 --------- Signed-off-by: yah01 Signed-off-by: yah01 --- internal/querycoordv2/balance/rowcount_based_balancer.go | 7 +++++-- internal/querycoordv2/balance/score_based_balancer.go | 5 ++++- internal/querycoordv2/balance/score_based_balancer_test.go | 1 + 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 23c180621c..ba3a6ff4b8 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -24,6 +24,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" @@ -191,7 +192,8 @@ func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, on dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID) segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool { return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil && - b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil + b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil && + segment.GetLevel() != datapb.SegmentLevel_L0 }) plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes) for i := range plans { @@ -213,7 +215,8 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNode dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), node) segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool { return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil && - b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil + b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil && + segment.GetLevel() != datapb.SegmentLevel_L0 }) rowCount := 0 for _, s := range segments { diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 7351e8ae27..0a8421dd4c 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" "golang.org/x/exp/maps" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" @@ -136,7 +137,9 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid) // Only balance segments in targets segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool { - return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil + return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil && + b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil && + segment.GetLevel() != datapb.SegmentLevel_L0 }) if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil { diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 38efd181cf..9839d89338 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -604,6 +604,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes)) balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID) + balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) // 2. set up target for distribution for multi collections for node, s := range c.distributions {