Fix snapshot of timestmap statistics (#6543)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
This commit is contained in:
dragondriver 2021-07-15 17:47:55 +08:00 committed by GitHub
parent 68a5d6219f
commit 84b9791221
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -210,41 +210,23 @@ type pChanStatInfo struct {
type DmTaskQueue struct {
BaseTaskQueue
lock sync.Mutex
statsLock sync.RWMutex
pChanStatisticsInfos map[pChan]*pChanStatInfo
}
func (queue *DmTaskQueue) Enqueue(t task) error {
err := t.OnEnqueue()
queue.lock.Lock()
defer queue.lock.Unlock()
err := queue.BaseTaskQueue.Enqueue(t)
if err != nil {
return err
}
ts, err := queue.sched.tsoAllocator.AllocOne()
if err != nil {
return err
}
t.SetTs(ts)
_ = queue.addPChanStats(t)
reqID, err := queue.sched.idAllocator.AllocOne()
if err != nil {
return err
}
t.SetID(reqID)
return queue.addUnissuedTask(t)
}
func (queue *DmTaskQueue) addUnissuedTask(t task) error {
queue.utLock.Lock()
defer queue.utLock.Unlock()
if queue.utFull() {
return errors.New("task queue is full")
}
queue.unissuedTasks.PushBack(t)
queue.addPChanStats(t)
queue.utBufChan <- 1
return nil
}
@ -281,12 +263,12 @@ func (queue *DmTaskQueue) addPChanStats(t task) error {
queue.pChanStatisticsInfos[cName] = info
} else {
if info.minTs > stat.minTs {
info.minTs = stat.minTs
queue.pChanStatisticsInfos[cName].minTs = stat.minTs
}
if info.maxTs < stat.maxTs {
info.maxTs = stat.maxTs
queue.pChanStatisticsInfos[cName].maxTs = stat.maxTs
}
info.refCnt++
queue.pChanStatisticsInfos[cName].refCnt++
}
}
queue.statsLock.Unlock()
@ -446,7 +428,16 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
"ID": t.ID(),
})
defer span.Finish()
span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID()))
q.AddActiveTask(t)
defer func() {
span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID()))
q.PopActiveTask(t.ID())
}()
span.LogFields(oplog.Int64("scheduler process PreExecute", t.ID()))
err := t.PreExecute(ctx)
defer func() {
@ -457,19 +448,13 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
return
}
span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID()))
q.AddActiveTask(t)
defer func() {
span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID()))
q.PopActiveTask(t.ID())
}()
span.LogFields(oplog.Int64("scheduler process Execute", t.ID()))
err = t.Execute(ctx)
if err != nil {
trace.LogError(span, err)
return
}
span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID()))
err = t.PostExecute(ctx)
}