2022-09-21 15:46:51 +08:00
|
|
|
package rootcoord
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/retry"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Defaults used by the background step executor when no option overrides them.
const (
	// defaultBgExecutingParallel is the maximum number of step stacks the
	// default selection policy picks per scheduling round. randomSelect also
	// falls back to it when asked for a non-positive parallelism.
	defaultBgExecutingParallel = 4

	// defaultBgExecutingInterval is the default period of the ticker that
	// drives scheduleLoop.
	defaultBgExecutingInterval = time.Second
)
|
|
|
|
|
|
|
|
type StepExecutor interface {
|
|
|
|
Start()
|
|
|
|
Stop()
|
|
|
|
AddSteps(s *stepStack)
|
|
|
|
}
|
|
|
|
|
|
|
|
type stepStack struct {
|
|
|
|
steps []nestedStep
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *stepStack) Execute(ctx context.Context) *stepStack {
|
|
|
|
steps := s.steps
|
|
|
|
for len(steps) > 0 {
|
|
|
|
l := len(steps)
|
|
|
|
todo := steps[l-1]
|
|
|
|
childSteps, err := todo.Execute(ctx)
|
2022-10-25 14:19:30 +08:00
|
|
|
// TODO: maybe a interface `step.LogOnError` is better.
|
|
|
|
_, skipLog := todo.(*waitForTsSyncedStep)
|
2022-09-21 15:46:51 +08:00
|
|
|
if retry.IsUnRecoverable(err) {
|
2022-10-25 14:19:30 +08:00
|
|
|
if !skipLog {
|
|
|
|
log.Warn("failed to execute step, not able to reschedule", zap.Error(err), zap.String("step", todo.Desc()))
|
|
|
|
}
|
2022-09-21 15:46:51 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
s.steps = nil // let s can be collected.
|
2022-10-25 14:19:30 +08:00
|
|
|
if !skipLog {
|
|
|
|
log.Warn("failed to execute step, wait for reschedule", zap.Error(err), zap.String("step", todo.Desc()))
|
|
|
|
}
|
2022-09-21 15:46:51 +08:00
|
|
|
return &stepStack{steps: steps}
|
|
|
|
}
|
|
|
|
// this step is done.
|
|
|
|
steps = steps[:l-1]
|
|
|
|
steps = append(steps, childSteps...)
|
|
|
|
}
|
|
|
|
// everything is done.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// selectStepPolicy picks, out of the set of buffered step stacks, the ones to
// run in the next scheduling round.
type selectStepPolicy func(map[*stepStack]struct{}) []*stepStack
|
|
|
|
|
|
|
|
func randomSelect(parallel int, m map[*stepStack]struct{}) []*stepStack {
|
|
|
|
if parallel <= 0 {
|
|
|
|
parallel = defaultBgExecutingParallel
|
|
|
|
}
|
|
|
|
res := make([]*stepStack, 0, parallel)
|
|
|
|
for s := range m {
|
|
|
|
if len(res) >= parallel {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
res = append(res, s)
|
|
|
|
}
|
|
|
|
return res
|
|
|
|
}
|
|
|
|
|
|
|
|
func randomSelectPolicy(parallel int) selectStepPolicy {
|
|
|
|
return func(m map[*stepStack]struct{}) []*stepStack {
|
|
|
|
return randomSelect(parallel, m)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func defaultSelectPolicy() selectStepPolicy {
|
|
|
|
return randomSelectPolicy(defaultBgExecutingParallel)
|
|
|
|
}
|
|
|
|
|
|
|
|
// bgOpt is a functional option that customizes a bgStepExecutor during
// construction.
type bgOpt func(*bgStepExecutor)
|
|
|
|
|
|
|
|
func withSelectStepPolicy(policy selectStepPolicy) bgOpt {
|
|
|
|
return func(bg *bgStepExecutor) {
|
|
|
|
bg.selector = policy
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func withBgInterval(interval time.Duration) bgOpt {
|
|
|
|
return func(bg *bgStepExecutor) {
|
|
|
|
bg.interval = interval
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type bgStepExecutor struct {
|
|
|
|
ctx context.Context
|
|
|
|
cancel context.CancelFunc
|
|
|
|
wg sync.WaitGroup
|
|
|
|
bufferedSteps map[*stepStack]struct{}
|
|
|
|
selector selectStepPolicy
|
|
|
|
mu sync.Mutex
|
2022-09-22 17:36:52 +08:00
|
|
|
notifyChan chan struct{}
|
2022-09-21 15:46:51 +08:00
|
|
|
interval time.Duration
|
|
|
|
}
|
|
|
|
|
|
|
|
func newBgStepExecutor(ctx context.Context, opts ...bgOpt) *bgStepExecutor {
|
|
|
|
ctx1, cancel := context.WithCancel(ctx)
|
|
|
|
bg := &bgStepExecutor{
|
|
|
|
ctx: ctx1,
|
|
|
|
cancel: cancel,
|
|
|
|
wg: sync.WaitGroup{},
|
|
|
|
bufferedSteps: make(map[*stepStack]struct{}),
|
|
|
|
selector: defaultSelectPolicy(),
|
|
|
|
mu: sync.Mutex{},
|
2022-09-22 17:36:52 +08:00
|
|
|
notifyChan: make(chan struct{}, 1),
|
2022-09-21 15:46:51 +08:00
|
|
|
interval: defaultBgExecutingInterval,
|
|
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(bg)
|
|
|
|
}
|
|
|
|
return bg
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bg *bgStepExecutor) Start() {
|
|
|
|
bg.wg.Add(1)
|
|
|
|
go bg.scheduleLoop()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bg *bgStepExecutor) Stop() {
|
|
|
|
bg.cancel()
|
|
|
|
bg.wg.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bg *bgStepExecutor) AddSteps(s *stepStack) {
|
|
|
|
bg.addStepsInternal(s)
|
2022-09-22 17:36:52 +08:00
|
|
|
bg.notify()
|
2022-09-21 15:46:51 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (bg *bgStepExecutor) process(steps []*stepStack) {
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
for i := range steps {
|
|
|
|
s := steps[i]
|
|
|
|
if s == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
child := s.Execute(bg.ctx)
|
|
|
|
if child != nil {
|
2022-09-22 17:36:52 +08:00
|
|
|
// don't notify, wait for reschedule.
|
|
|
|
bg.addStepsInternal(child)
|
2022-09-21 15:46:51 +08:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bg *bgStepExecutor) schedule() {
|
|
|
|
bg.mu.Lock()
|
|
|
|
selected := bg.selector(bg.bufferedSteps)
|
|
|
|
for _, s := range selected {
|
2022-09-22 17:36:52 +08:00
|
|
|
bg.unlockRemoveSteps(s)
|
2022-09-21 15:46:51 +08:00
|
|
|
}
|
|
|
|
bg.mu.Unlock()
|
|
|
|
|
|
|
|
bg.process(selected)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bg *bgStepExecutor) scheduleLoop() {
|
|
|
|
defer bg.wg.Done()
|
|
|
|
|
|
|
|
ticker := time.NewTicker(bg.interval)
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-bg.ctx.Done():
|
|
|
|
return
|
2022-09-22 17:36:52 +08:00
|
|
|
case <-bg.notifyChan:
|
2022-09-21 15:46:51 +08:00
|
|
|
bg.schedule()
|
|
|
|
case <-ticker.C:
|
|
|
|
bg.schedule()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bg *bgStepExecutor) addStepsInternal(s *stepStack) {
|
2022-09-22 17:36:52 +08:00
|
|
|
bg.mu.Lock()
|
|
|
|
bg.unlockAddSteps(s)
|
|
|
|
bg.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bg *bgStepExecutor) unlockAddSteps(s *stepStack) {
|
2022-09-21 15:46:51 +08:00
|
|
|
bg.bufferedSteps[s] = struct{}{}
|
|
|
|
}
|
|
|
|
|
2022-09-22 17:36:52 +08:00
|
|
|
func (bg *bgStepExecutor) unlockRemoveSteps(s *stepStack) {
|
2022-09-21 15:46:51 +08:00
|
|
|
delete(bg.bufferedSteps, s)
|
|
|
|
}
|
2022-09-22 17:36:52 +08:00
|
|
|
|
|
|
|
func (bg *bgStepExecutor) notify() {
|
|
|
|
select {
|
|
|
|
case bg.notifyChan <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|