2022-10-11 11:39:22 +08:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2022-09-15 18:48:32 +08:00
|
|
|
package checkers
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2023-05-04 12:22:40 +08:00
|
|
|
"sort"
|
2022-12-07 18:01:19 +08:00
|
|
|
"time"
|
2022-09-15 18:48:32 +08:00
|
|
|
|
2023-09-21 09:45:27 +08:00
|
|
|
"github.com/samber/lo"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2023-05-04 12:22:40 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
2022-09-15 18:48:32 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
|
2023-05-04 12:22:40 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
2022-09-15 18:48:32 +08:00
|
|
|
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
2023-05-04 12:22:40 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
2022-09-15 18:48:32 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
2024-01-05 15:54:55 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
2023-05-04 12:22:40 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
2022-09-15 18:48:32 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
// BalanceChecker checks the cluster distribution and generates balance tasks.
|
|
|
|
type BalanceChecker struct {
|
2023-11-24 18:08:24 +08:00
|
|
|
*checkerActivation
|
2022-09-15 18:48:32 +08:00
|
|
|
balance.Balance
|
2023-05-04 12:22:40 +08:00
|
|
|
meta *meta.Meta
|
|
|
|
nodeManager *session.NodeManager
|
|
|
|
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
|
|
|
|
scheduler task.Scheduler
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
|
2023-05-04 12:22:40 +08:00
|
|
|
func NewBalanceChecker(meta *meta.Meta, balancer balance.Balance, nodeMgr *session.NodeManager, scheduler task.Scheduler) *BalanceChecker {
|
2022-09-15 18:48:32 +08:00
|
|
|
return &BalanceChecker{
|
2023-11-24 18:08:24 +08:00
|
|
|
checkerActivation: newCheckerActivation(),
|
2023-05-04 12:22:40 +08:00
|
|
|
Balance: balancer,
|
|
|
|
meta: meta,
|
|
|
|
nodeManager: nodeMgr,
|
|
|
|
normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
|
|
|
|
scheduler: scheduler,
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-05 15:54:55 +08:00
|
|
|
func (b *BalanceChecker) ID() utils.CheckerType {
|
|
|
|
return utils.BalanceChecker
|
2023-10-27 01:08:12 +08:00
|
|
|
}
|
|
|
|
|
2022-09-15 18:48:32 +08:00
|
|
|
func (b *BalanceChecker) Description() string {
|
|
|
|
return "BalanceChecker checks the cluster distribution and generates balance tasks"
|
|
|
|
}
|
|
|
|
|
2023-05-04 12:22:40 +08:00
|
|
|
func (b *BalanceChecker) replicasToBalance() []int64 {
|
|
|
|
ids := b.meta.GetAll()
|
|
|
|
|
|
|
|
// all replicas belonging to loading collection will be skipped
|
|
|
|
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
|
|
|
|
collection := b.meta.GetCollection(cid)
|
2023-05-11 12:47:20 +08:00
|
|
|
return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded
|
2023-05-04 12:22:40 +08:00
|
|
|
})
|
|
|
|
sort.Slice(loadedCollections, func(i, j int) bool {
|
|
|
|
return loadedCollections[i] < loadedCollections[j]
|
|
|
|
})
|
|
|
|
|
|
|
|
// balance collections influenced by stopping nodes
|
|
|
|
stoppingReplicas := make([]int64, 0)
|
|
|
|
for _, cid := range loadedCollections {
|
|
|
|
replicas := b.meta.ReplicaManager.GetByCollection(cid)
|
|
|
|
for _, replica := range replicas {
|
|
|
|
for _, nodeID := range replica.GetNodes() {
|
|
|
|
isStopping, _ := b.nodeManager.IsStoppingNode(nodeID)
|
|
|
|
if isStopping {
|
|
|
|
stoppingReplicas = append(stoppingReplicas, replica.GetID())
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2023-09-21 09:45:27 +08:00
|
|
|
// do stopping balance only in this round
|
2023-05-04 12:22:40 +08:00
|
|
|
if len(stoppingReplicas) > 0 {
|
|
|
|
return stoppingReplicas
|
|
|
|
}
|
|
|
|
|
2023-09-21 09:45:27 +08:00
|
|
|
// no stopping balance and auto balance is disabled, return empty collections for balance
|
2023-01-06 14:45:35 +08:00
|
|
|
if !Params.QueryCoordCfg.AutoBalance.GetAsBool() {
|
2023-05-04 12:22:40 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// scheduler is handling segment task, skip
|
|
|
|
if b.scheduler.GetSegmentTaskNum() != 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-09-21 09:45:27 +08:00
|
|
|
// iterator one normal collection in one round
|
2023-05-04 12:22:40 +08:00
|
|
|
normalReplicasToBalance := make([]int64, 0)
|
2023-06-20 14:14:41 +08:00
|
|
|
hasUnbalancedCollection := false
|
2023-05-04 12:22:40 +08:00
|
|
|
for _, cid := range loadedCollections {
|
|
|
|
if b.normalBalanceCollectionsCurrentRound.Contain(cid) {
|
|
|
|
log.Debug("ScoreBasedBalancer has balanced collection, skip balancing in this round",
|
|
|
|
zap.Int64("collectionID", cid))
|
|
|
|
continue
|
|
|
|
}
|
2023-06-20 14:14:41 +08:00
|
|
|
hasUnbalancedCollection = true
|
2023-05-04 12:22:40 +08:00
|
|
|
b.normalBalanceCollectionsCurrentRound.Insert(cid)
|
|
|
|
for _, replica := range b.meta.ReplicaManager.GetByCollection(cid) {
|
|
|
|
normalReplicasToBalance = append(normalReplicasToBalance, replica.GetID())
|
|
|
|
}
|
|
|
|
break
|
2023-01-06 14:45:35 +08:00
|
|
|
}
|
2023-05-04 12:22:40 +08:00
|
|
|
|
2023-06-20 14:14:41 +08:00
|
|
|
if !hasUnbalancedCollection {
|
2023-05-04 12:22:40 +08:00
|
|
|
b.normalBalanceCollectionsCurrentRound.Clear()
|
2023-06-20 14:14:41 +08:00
|
|
|
log.RatedDebug(10, "ScoreBasedBalancer has balanced all "+
|
2023-05-04 12:22:40 +08:00
|
|
|
"collections in one round, clear collectionIDs for this round")
|
|
|
|
}
|
|
|
|
return normalReplicasToBalance
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *BalanceChecker) balanceReplicas(replicaIDs []int64) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
|
|
|
|
segmentPlans, channelPlans := make([]balance.SegmentAssignPlan, 0), make([]balance.ChannelAssignPlan, 0)
|
|
|
|
for _, rid := range replicaIDs {
|
|
|
|
replica := b.meta.ReplicaManager.Get(rid)
|
2023-07-17 16:56:35 +08:00
|
|
|
if replica == nil {
|
|
|
|
continue
|
|
|
|
}
|
2023-05-04 12:22:40 +08:00
|
|
|
sPlans, cPlans := b.Balance.BalanceReplica(replica)
|
|
|
|
segmentPlans = append(segmentPlans, sPlans...)
|
|
|
|
channelPlans = append(channelPlans, cPlans...)
|
|
|
|
if len(segmentPlans) != 0 || len(channelPlans) != 0 {
|
|
|
|
balance.PrintNewBalancePlans(replica.GetCollectionID(), replica.GetID(), sPlans, cPlans)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return segmentPlans, channelPlans
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
|
2023-11-24 18:08:24 +08:00
|
|
|
if !b.IsActive() {
|
|
|
|
return nil
|
|
|
|
}
|
2023-05-04 12:22:40 +08:00
|
|
|
ret := make([]task.Task, 0)
|
|
|
|
|
|
|
|
replicasToBalance := b.replicasToBalance()
|
|
|
|
segmentPlans, channelPlans := b.balanceReplicas(replicasToBalance)
|
2022-11-07 14:53:02 +08:00
|
|
|
|
2022-12-07 18:01:19 +08:00
|
|
|
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans)
|
2023-04-11 14:38:30 +08:00
|
|
|
task.SetPriority(task.TaskPriorityLow, tasks...)
|
2023-07-24 17:07:00 +08:00
|
|
|
task.SetReason("segment unbalanced", tasks...)
|
2022-09-15 18:48:32 +08:00
|
|
|
ret = append(ret, tasks...)
|
2022-11-07 14:53:02 +08:00
|
|
|
|
2022-12-07 18:01:19 +08:00
|
|
|
tasks = balance.CreateChannelTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), channelPlans)
|
2023-07-24 17:07:00 +08:00
|
|
|
task.SetReason("channel unbalanced", tasks...)
|
2022-09-15 18:48:32 +08:00
|
|
|
ret = append(ret, tasks...)
|
|
|
|
return ret
|
|
|
|
}
|