mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-04 12:59:23 +08:00
fix: forbid balancing level zero segments (#29130)
we can't balance the L0 segments related #29128 --------- Signed-off-by: yah01 <yah2er0ne@outlook.com> Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
cb5c06c718
commit
2f0c7a6544
@ -24,6 +24,7 @@ import (
|
|||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||||
@ -191,7 +192,8 @@ func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, on
|
|||||||
dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID)
|
dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID)
|
||||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil
|
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||||
|
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||||
})
|
})
|
||||||
plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes)
|
plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes)
|
||||||
for i := range plans {
|
for i := range plans {
|
||||||
@ -213,7 +215,8 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNode
|
|||||||
dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), node)
|
dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), node)
|
||||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil
|
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||||
|
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||||
})
|
})
|
||||||
rowCount := 0
|
rowCount := 0
|
||||||
for _, s := range segments {
|
for _, s := range segments {
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/exp/maps"
|
"golang.org/x/exp/maps"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||||
@ -136,7 +137,9 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
|
|||||||
segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
|
segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
|
||||||
// Only balance segments in targets
|
// Only balance segments in targets
|
||||||
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
|
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
|
||||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil
|
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||||
|
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||||
|
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||||
})
|
})
|
||||||
|
|
||||||
if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
|
if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
|
||||||
|
@ -604,6 +604,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
|
|||||||
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
|
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
|
||||||
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
|
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
|
||||||
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
|
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
|
||||||
|
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
|
||||||
|
|
||||||
// 2. set up target for distribution for multi collections
|
// 2. set up target for distribution for multi collections
|
||||||
for node, s := range c.distributions {
|
for node, s := range c.distributions {
|
||||||
|
Loading…
Reference in New Issue
Block a user