enhance: Enable to dynamic update balancer policy in querycoord (#33037) (#33272)

issue: #33036
pr: #33037
This PR enable to dynamic update balancer policy without restart
querycoord.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-05-23 15:43:41 +08:00 committed by GitHub
parent d6bc95de55
commit 32bfd9befa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 73 additions and 55 deletions

View File

@ -39,28 +39,28 @@ import (
// BalanceChecker checks the cluster distribution and generates balance tasks.
type BalanceChecker struct {
*checkerActivation
balance.Balance
meta *meta.Meta
nodeManager *session.NodeManager
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
scheduler task.Scheduler
targetMgr *meta.TargetManager
getBalancerFunc GetBalancerFunc
}
func NewBalanceChecker(meta *meta.Meta,
targetMgr *meta.TargetManager,
balancer balance.Balance,
nodeMgr *session.NodeManager,
scheduler task.Scheduler,
getBalancerFunc GetBalancerFunc,
) *BalanceChecker {
return &BalanceChecker{
checkerActivation: newCheckerActivation(),
Balance: balancer,
meta: meta,
targetMgr: targetMgr,
nodeManager: nodeMgr,
normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
scheduler: scheduler,
getBalancerFunc: getBalancerFunc,
}
}
@ -155,7 +155,7 @@ func (b *BalanceChecker) balanceReplicas(replicaIDs []int64) ([]balance.SegmentA
if replica == nil {
continue
}
sPlans, cPlans := b.Balance.BalanceReplica(replica)
sPlans, cPlans := b.getBalancerFunc().BalanceReplica(replica)
segmentPlans = append(segmentPlans, sPlans...)
channelPlans = append(channelPlans, cPlans...)
if len(segmentPlans) != 0 || len(channelPlans) != 0 {

View File

@ -78,7 +78,7 @@ func (suite *BalanceCheckerTestSuite) SetupTest() {
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.balancer = balance.NewMockBalancer(suite.T())
suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.balancer, suite.nodeMgr, suite.scheduler)
suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.nodeMgr, suite.scheduler, func() balance.Balance { return suite.balancer })
}
func (suite *BalanceCheckerTestSuite) TearDownTest() {

View File

@ -36,27 +36,27 @@ import (
// TODO(sunby): have too much similar codes with SegmentChecker
type ChannelChecker struct {
*checkerActivation
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
nodeMgr *session.NodeManager
balancer balance.Balance
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
nodeMgr *session.NodeManager
getBalancerFunc GetBalancerFunc
}
func NewChannelChecker(
meta *meta.Meta,
dist *meta.DistributionManager,
targetMgr *meta.TargetManager,
balancer balance.Balance,
nodeMgr *session.NodeManager,
getBalancerFunc GetBalancerFunc,
) *ChannelChecker {
return &ChannelChecker{
checkerActivation: newCheckerActivation(),
meta: meta,
dist: dist,
targetMgr: targetMgr,
balancer: balancer,
nodeMgr: nodeMgr,
getBalancerFunc: getBalancerFunc,
}
}
@ -215,7 +215,7 @@ func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*
if len(rwNodes) == 0 {
rwNodes = replica.GetRWNodes()
}
plan := c.balancer.AssignChannel([]*meta.DmChannel{ch}, rwNodes, false)
plan := c.getBalancerFunc().AssignChannel([]*meta.DmChannel{ch}, rwNodes, false)
plans = append(plans, plan...)
}

View File

@ -77,7 +77,7 @@ func (suite *ChannelCheckerTestSuite) SetupTest() {
distManager := meta.NewDistributionManager()
balancer := suite.createMockBalancer()
suite.checker = NewChannelChecker(suite.meta, distManager, targetManager, balancer, suite.nodeMgr)
suite.checker = NewChannelChecker(suite.meta, distManager, targetManager, suite.nodeMgr, func() balance.Balance { return balancer })
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()
}

View File

@ -35,6 +35,8 @@ import (
var errTypeNotFound = errors.New("checker type not found")
type GetBalancerFunc = func() balance.Balance
type CheckerController struct {
cancel context.CancelFunc
manualCheckChs map[utils.CheckerType]chan struct{}
@ -55,17 +57,17 @@ func NewCheckerController(
meta *meta.Meta,
dist *meta.DistributionManager,
targetMgr *meta.TargetManager,
balancer balance.Balance,
nodeMgr *session.NodeManager,
scheduler task.Scheduler,
broker meta.Broker,
getBalancerFunc GetBalancerFunc,
) *CheckerController {
// CheckerController runs checkers with the order,
// the former checker has higher priority
checkers := map[utils.CheckerType]Checker{
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, balancer, nodeMgr),
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr),
utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, balancer, nodeMgr, scheduler),
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, nodeMgr, scheduler, getBalancerFunc),
utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr),
utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr),
}

View File

@ -77,7 +77,8 @@ func (suite *ControllerBaseTestSuite) SetupTest() {
suite.balancer = balance.NewMockBalancer(suite.T())
suite.scheduler = task.NewMockScheduler(suite.T())
suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.balancer, suite.nodeMgr, suite.scheduler, suite.broker)
suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.nodeMgr, suite.scheduler, suite.broker, func() balance.Balance { return suite.balancer })
}
func (s *ControllerBaseTestSuite) TestActivation() {

View File

@ -81,7 +81,7 @@ func (suite *CheckerControllerSuite) SetupTest() {
suite.balancer = balance.NewMockBalancer(suite.T())
suite.scheduler = task.NewMockScheduler(suite.T())
suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.balancer, suite.nodeMgr, suite.scheduler, suite.broker)
suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.nodeMgr, suite.scheduler, suite.broker, func() balance.Balance { return suite.balancer })
}
func (suite *CheckerControllerSuite) TestBasic() {

View File

@ -41,27 +41,27 @@ const initialTargetVersion = int64(0)
type SegmentChecker struct {
*checkerActivation
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
balancer balance.Balance
nodeMgr *session.NodeManager
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
nodeMgr *session.NodeManager
getBalancerFunc GetBalancerFunc
}
func NewSegmentChecker(
meta *meta.Meta,
dist *meta.DistributionManager,
targetMgr *meta.TargetManager,
balancer balance.Balance,
nodeMgr *session.NodeManager,
getBalancerFunc GetBalancerFunc,
) *SegmentChecker {
return &SegmentChecker{
checkerActivation: newCheckerActivation(),
meta: meta,
dist: dist,
targetMgr: targetMgr,
balancer: balancer,
nodeMgr: nodeMgr,
getBalancerFunc: getBalancerFunc,
}
}
@ -403,7 +403,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
SegmentInfo: s,
}
})
shardPlans := c.balancer.AssignSegment(replica.GetCollectionID(), segmentInfos, rwNodes, false)
shardPlans := c.getBalancerFunc().AssignSegment(replica.GetCollectionID(), segmentInfos, rwNodes, false)
for i := range shardPlans {
shardPlans[i].Replica = replica
}

View File

@ -77,7 +77,7 @@ func (suite *SegmentCheckerTestSuite) SetupTest() {
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
balancer := suite.createMockBalancer()
suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, balancer, suite.nodeMgr)
suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, func() balance.Balance { return balancer })
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()
}

View File

@ -99,7 +99,7 @@ func (s *Server) balanceSegments(ctx context.Context,
copyMode bool,
) error {
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID), zap.Int64("srcNode", srcNode))
plans := s.balancer.AssignSegment(collectionID, segments, dstNodes, true)
plans := s.getBalancerFunc().AssignSegment(collectionID, segments, dstNodes, true)
for i := range plans {
plans[i].From = srcNode
plans[i].Replica = replica
@ -175,7 +175,7 @@ func (s *Server) balanceChannels(ctx context.Context,
) error {
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
plans := s.balancer.AssignChannel(channels, dstNodes, true)
plans := s.getBalancerFunc().AssignChannel(channels, dstNodes, true)
for i := range plans {
plans[i].From = srcNode
plans[i].Replica = replica

View File

@ -121,7 +121,7 @@ func (suite *OpsServiceSuite) SetupTest() {
suite.distController = dist.NewMockController(suite.T())
suite.checkerController = checkers.NewCheckerController(suite.meta, suite.distMgr,
suite.targetMgr, suite.balancer, suite.nodeMgr, suite.taskScheduler, suite.broker)
suite.targetMgr, suite.nodeMgr, suite.taskScheduler, suite.broker, func() balance.Balance { return suite.balancer })
suite.server = &Server{
kv: suite.kv,
@ -137,7 +137,7 @@ func (suite *OpsServiceSuite) SetupTest() {
cluster: suite.cluster,
jobScheduler: suite.jobScheduler,
taskScheduler: suite.taskScheduler,
balancer: suite.balancer,
getBalancerFunc: func() balance.Balance { return suite.balancer },
distController: suite.distController,
ctx: context.Background(),
checkerController: suite.checkerController,

View File

@ -115,7 +115,9 @@ type Server struct {
resourceObserver *observers.ResourceObserver
leaderCacheObserver *observers.LeaderCacheObserver
balancer balance.Balance
getBalancerFunc checkers.GetBalancerFunc
balancerMap map[string]balance.Balance
balancerLock sync.RWMutex
// Active-standby
enableActiveStandBy bool
@ -137,6 +139,7 @@ func NewQueryCoord(ctx context.Context) (*Server, error) {
cancel: cancel,
nodeUpEventChan: make(chan int64, 10240),
notifyNodeUp: make(chan struct{}),
balancerMap: make(map[string]balance.Balance),
}
server.UpdateStateCode(commonpb.StateCode_Abnormal)
server.queryNodeCreator = session.DefaultQueryNodeCreator
@ -287,34 +290,46 @@ func (s *Server) initQueryCoord() error {
s.taskScheduler,
)
// Init balancer map and balancer
log.Info("init balancer")
switch params.Params.QueryCoordCfg.Balancer.GetValue() {
case meta.RoundRobinBalancerName:
s.balancer = balance.NewRoundRobinBalancer(s.taskScheduler, s.nodeMgr)
case meta.RowCountBasedBalancerName:
s.balancer = balance.NewRowCountBasedBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
case meta.ScoreBasedBalancerName:
s.balancer = balance.NewScoreBasedBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
case meta.MultiTargetBalancerName:
s.balancer = balance.NewMultiTargetBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
case meta.ChannelLevelScoreBalancerName:
s.balancer = balance.NewChannelLevelScoreBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
default:
log.Info(fmt.Sprintf("default to use %s", meta.ScoreBasedBalancerName))
s.balancer = balance.NewScoreBasedBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
}
// Init checker controller
log.Info("init checker controller")
s.getBalancerFunc = func() balance.Balance {
balanceKey := paramtable.Get().QueryCoordCfg.Balancer.GetValue()
s.balancerLock.Lock()
defer s.balancerLock.Unlock()
balancer, ok := s.balancerMap[balanceKey]
if ok {
return balancer
}
log.Info("switch to new balancer", zap.String("name", balanceKey))
switch balanceKey {
case meta.RoundRobinBalancerName:
balancer = balance.NewRoundRobinBalancer(s.taskScheduler, s.nodeMgr)
case meta.RowCountBasedBalancerName:
balancer = balance.NewRowCountBasedBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
case meta.ScoreBasedBalancerName:
balancer = balance.NewScoreBasedBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
case meta.MultiTargetBalancerName:
balancer = balance.NewMultiTargetBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
case meta.ChannelLevelScoreBalancerName:
balancer = balance.NewChannelLevelScoreBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
default:
log.Info(fmt.Sprintf("default to use %s", meta.ScoreBasedBalancerName))
balancer = balance.NewScoreBasedBalancer(s.taskScheduler, s.nodeMgr, s.dist, s.meta, s.targetMgr)
}
s.balancerMap[balanceKey] = balancer
return balancer
}
s.checkerController = checkers.NewCheckerController(
s.meta,
s.dist,
s.targetMgr,
s.balancer,
s.nodeMgr,
s.taskScheduler,
s.broker,
s.getBalancerFunc,
)
// Init observers

View File

@ -567,10 +567,10 @@ func (suite *ServerSuite) hackServer() {
suite.server.meta,
suite.server.dist,
suite.server.targetMgr,
suite.server.balancer,
suite.server.nodeMgr,
suite.server.taskScheduler,
suite.server.broker,
suite.server.getBalancerFunc,
)
suite.server.targetObserver = observers.NewTargetObserver(
suite.server.meta,

View File

@ -201,7 +201,7 @@ func (suite *ServiceSuite) SetupTest() {
cluster: suite.cluster,
jobScheduler: suite.jobScheduler,
taskScheduler: suite.taskScheduler,
balancer: suite.balancer,
getBalancerFunc: func() balance.Balance { return suite.balancer },
distController: suite.distController,
ctx: context.Background(),
}