fix unstable auto balance config ut (#28288)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2023-11-09 10:00:22 +08:00 committed by GitHub
parent 7dda2e8814
commit b9bf910039
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 52 deletions

View File

@ -401,7 +401,7 @@ func (s *Server) startDataCoord() {
}
// afterStart invokes updateBalanceConfigLoop with the server context.
// NOTE(review): this listing is a rendered diff whose +/- markers were stripped;
// the two calls below look like the removed (`go ...`) and the added (direct call)
// variants of a single line rather than two real calls — confirm against the repo.
func (s *Server) afterStart() {
go s.updateBalanceConfigLoop(s.ctx)
s.updateBalanceConfigLoop(s.ctx)
}
func (s *Server) initCluster() error {
@ -1117,30 +1117,47 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
}
// updateBalanceConfigLoop polls for pre-2.3.0 data nodes and enables DataCoord
// auto balance once none remain.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so two
// versions of the body are interleaved and the braces do not balance as written.
// The first `for` loop (inline ticker + session check) appears to be the removed
// code; the part starting at `success := s.updateBalanceConfig()` appears to be
// the replacement — confirm against the repository before relying on either.
func (s *Server) updateBalanceConfigLoop(ctx context.Context) {
// Context-bound logger registered under the "dc.updateBalanceConfigLoop" rate group.
log := log.Ctx(s.ctx).WithRateGroup("dc.updateBalanceConfigLoop", 1, 60)
ticker := time.NewTicker(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second))
for {
select {
case <-ctx.Done():
log.Info("update balance config loop exit!")
return
case <-ticker.C:
// Look up data node sessions whose version matches the "<2.3.0" range.
r := semver.MustParseRange("<2.3.0")
sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.DataNodeRole, r)
if err != nil {
log.Warn("check data node version occur error on etcd", zap.Error(err))
continue
}
if len(sessions) == 0 {
// only balance channel when all data node's version > 2.3.0
Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true")
log.Info("all old data node down, enable auto balance!")
return
}
log.RatedDebug(10, "old data node exist", zap.Strings("sessions", lo.Keys(sessions)))
}
// Run the check once synchronously; if it already succeeded there is nothing to poll.
success := s.updateBalanceConfig()
if success {
return
}
// Otherwise retry in a background goroutine that is tracked by serverLoopWg.
s.serverLoopWg.Add(1)
go func() {
defer s.serverLoopWg.Done()
ticker := time.NewTicker(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second))
// Stop the ticker on every exit path of the goroutine.
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("update balance config loop exit!")
return
case <-ticker.C:
// Re-check on every tick and stop polling after the first success.
success := s.updateBalanceConfig()
if success {
return
}
}
}
}()
}
// updateBalanceConfig checks whether any data node session with a version
// below 2.3.0 is still registered and, if none is, enables DataCoord auto balance.
//
// It returns true once the AutoBalance param has been saved as "true", meaning
// the caller can stop polling. It returns false when the session lookup failed
// or at least one pre-2.3.0 data node session still exists.
func (s *Server) updateBalanceConfig() bool {
	// Use the same context-bound, rate-grouped logger as updateBalanceConfigLoop
	// (and as the QueryCoord counterpart) so the RatedDebug call below is tied to
	// the "dc.updateBalanceConfigLoop" rate group instead of the package logger.
	log := log.Ctx(s.ctx).WithRateGroup("dc.updateBalanceConfigLoop", 1, 60)

	r := semver.MustParseRange("<2.3.0")
	sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.DataNodeRole, r)
	if err != nil {
		log.Warn("check data node version occur error on etcd", zap.Error(err))
		return false
	}

	if len(sessions) == 0 {
		// No session matches "<2.3.0", i.e. every data node is >= 2.3.0:
		// only then is it safe to balance channels automatically.
		Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true")
		log.Info("all old data node down, enable auto balance!")
		return true
	}

	// Old nodes are still around; report them (rate limited) and keep polling.
	log.RatedDebug(10, "old data node exist", zap.Strings("sessions", lo.Keys(sessions)))
	return false
}

View File

@ -391,7 +391,7 @@ func (s *Server) initObserver() {
}
// afterStart invokes updateBalanceConfigLoop with the server context.
// NOTE(review): this listing is a rendered diff whose +/- markers were stripped;
// the two calls below look like the removed (`go ...`) and the added (direct call)
// variants of a single line rather than two real calls — confirm against the repo.
func (s *Server) afterStart() {
go s.updateBalanceConfigLoop(s.ctx)
s.updateBalanceConfigLoop(s.ctx)
}
func (s *Server) Start() error {
@ -797,30 +797,48 @@ func (s *Server) checkReplicas() {
}
// updateBalanceConfigLoop polls for pre-2.3.0 query nodes and enables QueryCoord
// auto balance once none remain.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so two
// versions of the body are interleaved and the braces do not balance as written.
// The first `for` loop (inline ticker + session check) appears to be the removed
// code; the part starting at `success := s.updateBalanceConfig()` appears to be
// the replacement — confirm against the repository before relying on either.
func (s *Server) updateBalanceConfigLoop(ctx context.Context) {
// Context-bound logger registered under the "qcv2.updateBalanceConfigLoop" rate group.
log := log.Ctx(s.ctx).WithRateGroup("qcv2.updateBalanceConfigLoop", 1, 60)
ticker := time.NewTicker(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second))
for {
select {
case <-ctx.Done():
log.Info("update balance config loop exit!")
return
case <-ticker.C:
// Look up query node sessions whose version matches the "<2.3.0" range.
r := semver.MustParseRange("<2.3.0")
sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.QueryNodeRole, r)
if err != nil {
log.Warn("check query node version occur error on etcd", zap.Error(err))
continue
}
if len(sessions) == 0 {
// only balance channel when all query node's version >= 2.3.0
Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
log.Info("all old query node down, enable auto balance!")
return
}
log.RatedDebug(10, "old query node exist", zap.Strings("sessions", lo.Keys(sessions)))
}
// Run the check once synchronously; if it already succeeded there is nothing to poll.
success := s.updateBalanceConfig()
if success {
return
}
// Otherwise retry in a background goroutine that is tracked by s.wg.
s.wg.Add(1)
go func() {
defer s.wg.Done()
ticker := time.NewTicker(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second))
// Stop the ticker on every exit path of the goroutine.
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("update balance config loop exit!")
return
case <-ticker.C:
// Re-check on every tick and stop polling after the first success.
success := s.updateBalanceConfig()
if success {
return
}
}
}
}()
}
// updateBalanceConfig performs one check for query nodes older than 2.3.0.
//
// When no such session is registered it saves the QueryCoord AutoBalance param
// as "true" and reports true so the caller can stop polling. It reports false
// if the session lookup returned an error or if old query nodes still exist.
func (s *Server) updateBalanceConfig() bool {
	logger := log.Ctx(s.ctx).WithRateGroup("qcv2.updateBalanceConfigLoop", 1, 60)

	legacyRange := semver.MustParseRange("<2.3.0")
	legacyNodes, _, err := s.session.GetSessionsWithVersionRange(typeutil.QueryNodeRole, legacyRange)

	switch {
	case err != nil:
		// The lookup itself failed; leave the config alone and let the caller retry.
		logger.Warn("check query node version occur error on etcd", zap.Error(err))
		return false
	case len(legacyNodes) > 0:
		// At least one pre-2.3.0 query node is still registered.
		logger.RatedDebug(10, "old query node exist", zap.Strings("sessions", lo.Keys(legacyNodes)))
		return false
	}

	// Every query node is >= 2.3.0, so channel balancing can be switched on.
	Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
	logger.Info("all old query node down, enable auto balance!")
	return true
}