mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 19:39:21 +08:00
Fix ddl tt go back (#22617)
Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
parent
f1a60b295c
commit
e3c11069eb
@ -22,11 +22,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/tso"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
)
|
||||
@ -70,6 +69,9 @@ func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAlloc
|
||||
func (s *scheduler) Start() {
|
||||
s.wg.Add(1)
|
||||
go s.taskLoop()
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.syncTsLoop()
|
||||
}
|
||||
|
||||
func (s *scheduler) Stop() {
|
||||
@ -88,6 +90,20 @@ func (s *scheduler) execute(task task) {
|
||||
}
|
||||
|
||||
func (s *scheduler) taskLoop() {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case task := <-s.taskChan:
|
||||
s.execute(task)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// syncTsLoop send a base task into queue periodically, the base task will gain the latest ts which is bigger than
|
||||
// everyone in the queue. The scheduler will update the ts after the task is finished.
|
||||
func (s *scheduler) syncTsLoop() {
|
||||
defer s.wg.Done()
|
||||
ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond))
|
||||
defer ticker.Stop()
|
||||
@ -97,22 +113,14 @@ func (s *scheduler) taskLoop() {
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.updateLatestTsoAsMinDdlTs()
|
||||
case task := <-s.taskChan:
|
||||
s.execute(task)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *scheduler) updateLatestTsoAsMinDdlTs() {
|
||||
if len(s.taskChan) > 0 {
|
||||
return
|
||||
}
|
||||
|
||||
ts, err := s.tsoAllocator.GenerateTSO(1)
|
||||
if err != nil {
|
||||
log.Warn("failed to generate tso, ignore to update min ddl ts", zap.Error(err))
|
||||
} else {
|
||||
s.setMinDdlTs(ts)
|
||||
t := newBaseTask(context.Background(), nil)
|
||||
if err := s.AddTask(&t); err != nil {
|
||||
log.Warn("failed to update latest ddl ts", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,6 +101,7 @@ func Test_scheduler_failed_to_set_id(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
s := newScheduler(ctx, idAlloc, tsoAlloc)
|
||||
s.Start()
|
||||
time.Sleep(time.Second)
|
||||
defer s.Stop()
|
||||
task := newMockNormalTask()
|
||||
err := s.AddTask(task)
|
||||
@ -119,6 +120,7 @@ func Test_scheduler_failed_to_set_ts(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
s := newScheduler(ctx, idAlloc, tsoAlloc)
|
||||
s.Start()
|
||||
time.Sleep(time.Second)
|
||||
defer s.Stop()
|
||||
task := newMockNormalTask()
|
||||
err := s.AddTask(task)
|
||||
|
Loading…
Reference in New Issue
Block a user