milvus/internal/proxyservice/task_scheduler.go

133 lines
3.1 KiB
Go
Raw Normal View History

package proxyservice
import (
"context"
"sync"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
)
type TaskScheduler struct {
RegisterLinkTaskQueue TaskQueue
RegisterNodeTaskQueue TaskQueue
InvalidateCollectionMetaCacheTaskQueue TaskQueue
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewTaskScheduler(ctx context.Context) *TaskScheduler {
ctx1, cancel := context.WithCancel(ctx)
return &TaskScheduler{
RegisterLinkTaskQueue: NewBaseTaskQueue(),
RegisterNodeTaskQueue: NewBaseTaskQueue(),
InvalidateCollectionMetaCacheTaskQueue: NewBaseTaskQueue(),
ctx: ctx1,
cancel: cancel,
}
}
func (sched *TaskScheduler) scheduleRegisterLinkTask() task {
return sched.RegisterLinkTaskQueue.PopTask()
}
func (sched *TaskScheduler) scheduleRegisterNodeTask() task {
return sched.RegisterNodeTaskQueue.PopTask()
}
func (sched *TaskScheduler) scheduleInvalidateCollectionMetaCacheTask() task {
return sched.InvalidateCollectionMetaCacheTaskQueue.PopTask()
}
func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
span, ctx := trace.StartSpanFromContext(t.Ctx(),
opentracing.Tags{
"Type": t.Name(),
})
defer span.Finish()
span.LogFields(oplog.String("scheduler process PreExecute", t.Name()))
err := t.PreExecute(ctx)
defer func() {
trace.LogError(span, err)
t.Notify(err)
}()
if err != nil {
return
}
span.LogFields(oplog.String("scheduler process Execute", t.Name()))
err = t.Execute(ctx)
if err != nil {
trace.LogError(span, err)
return
}
span.LogFields(oplog.String("scheduler process PostExecute", t.Name()))
err = t.PostExecute(ctx)
}
func (sched *TaskScheduler) registerLinkLoop() {
defer sched.wg.Done()
for {
select {
case <-sched.ctx.Done():
return
case <-sched.RegisterLinkTaskQueue.Chan():
if !sched.RegisterLinkTaskQueue.Empty() {
t := sched.scheduleRegisterLinkTask()
go sched.processTask(t, sched.RegisterLinkTaskQueue)
}
}
}
}
func (sched *TaskScheduler) registerNodeLoop() {
defer sched.wg.Done()
for {
select {
case <-sched.ctx.Done():
return
case <-sched.RegisterNodeTaskQueue.Chan():
if !sched.RegisterNodeTaskQueue.Empty() {
t := sched.scheduleRegisterNodeTask()
go sched.processTask(t, sched.RegisterNodeTaskQueue)
}
}
}
}
func (sched *TaskScheduler) invalidateCollectionMetaCacheLoop() {
defer sched.wg.Done()
for {
select {
case <-sched.ctx.Done():
return
case <-sched.InvalidateCollectionMetaCacheTaskQueue.Chan():
if !sched.InvalidateCollectionMetaCacheTaskQueue.Empty() {
t := sched.scheduleInvalidateCollectionMetaCacheTask()
go sched.processTask(t, sched.InvalidateCollectionMetaCacheTaskQueue)
}
}
}
}
func (sched *TaskScheduler) Start() {
sched.wg.Add(1)
go sched.registerLinkLoop()
sched.wg.Add(1)
go sched.registerNodeLoop()
sched.wg.Add(1)
go sched.invalidateCollectionMetaCacheLoop()
}
func (sched *TaskScheduler) Close() {
sched.cancel()
sched.wg.Wait()
}