fixing gtimer

This commit is contained in:
John Guo 2021-01-18 21:20:10 +08:00
parent 826b9b1b1f
commit 93a648ba15
6 changed files with 132 additions and 43 deletions

View File

@ -36,7 +36,7 @@ const (
panicExit = "exit" // Internal usage for custom job exit function with panic.
defaultTimes = math.MaxInt32 // Default limit running times, a big number.
defaultSlotNumber = 10 // Default slot number.
defaultWheelInterval = 50 // Default wheel interval.
defaultWheelInterval = 60 // Default wheel interval.
defaultWheelLevel = 5 // Default wheel level.
cmdEnvKey = "gf.gtimer" // Configuration key for command argument or environment.
)

View File

@ -14,16 +14,16 @@ import (
// Entry is the timing job entry to wheel.
type Entry struct {
name string
wheel *wheel // Belonged wheel.
job JobFunc // The job function.
singleton *gtype.Bool // Singleton mode.
status *gtype.Int // Job status.
times *gtype.Int // Limit running times.
create int64 // Timer ticks when the job installed.
interval int64 // The interval ticks of the job.
intervalTicks int64 // The interval ticks of the job.
createTicks int64 // Timer ticks when the job installed.
createMs int64 // The timestamp in milliseconds when job installed.
intervalMs int64 // The interval milliseconds of the job.
rawIntervalMs int64 // Raw input interval in milliseconds.
}
// JobFunc is the job function.
@ -50,12 +50,11 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti
job: job,
times: gtype.NewInt(times),
status: gtype.NewInt(status),
create: ticks,
interval: num,
createTicks: ticks,
intervalTicks: num,
singleton: gtype.NewBool(singleton),
createMs: nowMs,
intervalMs: ms,
rawIntervalMs: ms,
}
// Install the job to the list of the slot.
w.slots[(ticks+num)%w.number].PushBack(entry)
@ -63,26 +62,31 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti
}
// addEntryByParent adds a timing job with parent entry.
func (w *wheel) addEntryByParent(interval int64, parent *Entry) *Entry {
num := interval / w.intervalMs
if num == 0 {
num = 1
func (w *wheel) addEntryByParent(jobRan bool, nowMs, interval int64, parent *Entry) *Entry {
intervalTicks := interval / w.intervalMs
if intervalTicks == 0 {
intervalTicks = 1
}
nowMs := time.Now().UnixNano() / 1e6
ticks := w.ticks.Val()
nowTicks := w.ticks.Val()
entry := &Entry{
name: parent.name,
wheel: w,
job: parent.job,
times: parent.times,
status: parent.status,
create: ticks,
interval: num,
intervalTicks: intervalTicks,
singleton: parent.singleton,
createTicks: nowTicks,
createMs: nowMs,
intervalMs: interval,
rawIntervalMs: parent.rawIntervalMs,
}
w.slots[(ticks+num)%w.number].PushBack(entry)
if !jobRan {
entry.createMs = parent.createMs
if parent.wheel.level == w.level {
entry.createTicks = parent.createTicks
}
}
w.slots[(nowTicks+intervalTicks)%w.number].PushBack(entry)
return entry
}
@ -147,14 +151,17 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool)
return false, true
}
// Firstly checks using the ticks, this may be low precision as one tick is a little bit long.
if diff := nowTicks - entry.create; diff > 0 && diff%entry.interval == 0 {
//if entry.name == "1" {
// intlog.Print("check:", nowTicks-entry.createTicks, nowTicks, entry.createTicks, entry.interval)
//}
if diff := nowTicks - entry.createTicks; diff > 0 && diff%entry.intervalTicks == 0 {
// If not the lowest level wheel.
if entry.wheel.level > 0 {
diffMs := nowMs - entry.createMs
switch {
// Add it to the next slot, which means it will run on next interval.
case diffMs < entry.wheel.timer.intervalMs:
entry.wheel.slots[(nowTicks+entry.interval)%entry.wheel.number].PushBack(entry)
entry.wheel.slots[(nowTicks+entry.intervalTicks)%entry.wheel.number].PushBack(entry)
return false, false
// Normal rolls on the job.
@ -163,7 +170,7 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool)
// if it is greater than the minimum interval, then re-install it.
if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.timer.intervalMs {
// Re-calculate and re-installs the job proper slot.
entry.wheel.timer.doAddEntryByParent(leftMs, entry)
entry.wheel.timer.doAddEntryByParent(true, nowMs, leftMs, entry)
return false, false
}
}
@ -188,6 +195,9 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool)
if times < 2000000000 && times > 1000000000 {
entry.times.Set(defaultTimes)
}
//if entry.name == "1" {
// intlog.Print("runnable:", nowTicks-entry.createTicks, nowTicks, entry.createTicks, entry.createTicks, entry.interval)
//}
return true, true
}
return false, true

View File

@ -7,6 +7,7 @@
package gtimer
import (
"fmt"
"time"
"github.com/gogf/gf/container/glist"
@ -15,7 +16,10 @@ import (
// start starts the ticker using a standalone goroutine.
func (w *wheel) start() {
go func() {
ticker := time.NewTicker(time.Duration(w.intervalMs) * time.Millisecond)
var (
tickDuration = time.Duration(w.intervalMs) * time.Millisecond
ticker = time.NewTicker(tickDuration)
)
for {
select {
case <-ticker.C:
@ -41,13 +45,15 @@ func (w *wheel) start() {
// or else it removes from current slot and re-installs the job to another wheel and slot
// according to its leftover interval in milliseconds.
func (w *wheel) proceed() {
n := w.ticks.Add(1)
l := w.slots[int(n%w.number)]
length := l.Len()
var (
n = w.ticks.Add(1)
l = w.slots[int(n%w.number)]
length = l.Len()
nowMs = w.timer.nowFunc().UnixNano() / 1e6
)
if length > 0 {
go func(l *glist.List, nowTicks int64) {
entry := (*Entry)(nil)
nowMs := time.Now().UnixNano() / 1e6
for i := length; i > 0; i-- {
if v := l.PopFront(); v == nil {
break
@ -71,16 +77,19 @@ func (w *wheel) proceed() {
entry.SetStatus(StatusReady)
}
}()
if entry.wheel.level == 5 {
fmt.Println(entry.name, entry.wheel.level)
}
entry.job()
}(entry)
}
// If rolls on the job.
// Add job again, which make the job continuous running.
if addable {
//If StatusReset , reset to runnable state.
// If StatusReset, reset to runnable state.
if entry.Status() == StatusReset {
entry.SetStatus(StatusReady)
}
entry.wheel.timer.doAddEntryByParent(entry.rawIntervalMs, entry)
entry.wheel.timer.doAddEntryByParent(runnable, nowMs, entry.intervalMs, entry)
}
}
}(l, n)

View File

@ -16,11 +16,12 @@ import (
// Timer is a Hierarchical Timing Wheel manager for timing jobs.
type Timer struct {
status *gtype.Int // Timer status.
wheels []*wheel // The underlying wheels.
length int // Max level of the wheels.
number int // Slot Number of each wheel.
intervalMs int64 // Interval of the slot in milliseconds.
status *gtype.Int // Timer status.
wheels []*wheel // The underlying wheels.
length int // Max level of the wheels.
number int // Slot Number of each wheel.
intervalMs int64 // Interval of the slot in milliseconds.
nowFunc func() time.Time // nowFunc returns the current time, which can be custom.
}
// Wheel is a slot wrapper for timing job install and uninstall.
@ -40,6 +41,12 @@ type wheel struct {
// The optional parameter <level> specifies the wheels count of the timer,
// which is defaultWheelLevel in default.
func New(slot int, interval time.Duration, level ...int) *Timer {
t := doNewWithoutAutoStart(slot, interval, level...)
t.wheels[0].start()
return t
}
func doNewWithoutAutoStart(slot int, interval time.Duration, level ...int) *Timer {
if slot <= 0 {
panic(fmt.Sprintf("invalid slot number: %d", slot))
}
@ -53,6 +60,9 @@ func New(slot int, interval time.Duration, level ...int) *Timer {
length: length,
number: slot,
intervalMs: interval.Nanoseconds() / 1e6,
nowFunc: func() time.Time {
return time.Now()
},
}
for i := 0; i < length; i++ {
if i > 0 {
@ -63,11 +73,14 @@ func New(slot int, interval time.Duration, level ...int) *Timer {
w := t.newWheel(i, slot, n)
t.wheels[i] = w
t.wheels[i-1].addEntry(n, w.proceed, false, defaultTimes, StatusReady)
if i == length-1 {
t.wheels[i].addEntry(n, w.proceed, false, defaultTimes, StatusReady)
}
} else {
t.wheels[i] = t.newWheel(i, slot, interval)
w := t.newWheel(i, slot, interval)
t.wheels[i] = w
}
}
t.wheels[0].start()
return t
}
@ -185,8 +198,8 @@ func (t *Timer) doAddEntry(interval time.Duration, job JobFunc, singleton bool,
}
// doAddEntryByParent adds a timing job to timer with parent entry for internal usage.
func (t *Timer) doAddEntryByParent(interval int64, parent *Entry) *Entry {
return t.wheels[t.getLevelByIntervalMs(interval)].addEntryByParent(interval, parent)
func (t *Timer) doAddEntryByParent(jobRan bool, nowMs, interval int64, parent *Entry) *Entry {
return t.wheels[t.getLevelByIntervalMs(interval)].addEntryByParent(jobRan, nowMs, interval, parent)
}
// getLevelByIntervalMs calculates and returns the level of timer wheel with given milliseconds.

View File

@ -31,11 +31,11 @@ func TestSetTimeout(t *testing.T) {
func TestSetInterval(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
array := garray.New(true)
gtimer.SetInterval(200*time.Millisecond, func() {
gtimer.SetInterval(300*time.Millisecond, func() {
array.Append(1)
})
time.Sleep(1100 * time.Millisecond)
t.Assert(array.Len(), 5)
time.Sleep(1000 * time.Millisecond)
t.Assert(array.Len(), 3)
})
}
@ -76,12 +76,12 @@ func TestAddTimes(t *testing.T) {
func TestDelayAdd(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
array := garray.New(true)
gtimer.DelayAdd(200*time.Millisecond, 200*time.Millisecond, func() {
gtimer.DelayAdd(500*time.Millisecond, 500*time.Millisecond, func() {
array.Append(1)
})
time.Sleep(300 * time.Millisecond)
time.Sleep(600 * time.Millisecond)
t.Assert(array.Len(), 0)
time.Sleep(200 * time.Millisecond)
time.Sleep(600 * time.Millisecond)
t.Assert(array.Len(), 1)
})
}

View File

@ -0,0 +1,57 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
// Entry Operations
package gtimer
import (
"testing"
"time"
)
func TestTimer(t *testing.T) {
timer := doNewWithoutAutoStart(10, 60*time.Millisecond, 4)
timer.AddOnce(2*time.Second, func() {
t.Log("2*time.Second")
})
timer.AddOnce(1*time.Minute, func() {
t.Log("1*time.Minute")
})
timer.AddOnce(5*time.Minute, func() {
t.Log("5*time.Minute")
})
timer.AddOnce(1*time.Hour, func() {
t.Log("1*time.Hour")
})
timer.AddOnce(100*time.Minute, func() {
t.Log("100*time.Minute")
})
timer.AddOnce(2*time.Hour, func() {
t.Log("2*time.Hour")
})
timer.AddOnce(1000*time.Minute, func() {
t.Log("1000*time.Minute")
})
entry1 := timer.AddOnce(1100*time.Minute, func() {
t.Log("1100*time.Minute")
})
entry1.name = "1"
entry2 := timer.AddOnce(1200*time.Minute, func() {
t.Log("1200*time.Minute")
})
entry2.name = "2"
for i := 0; i < 10000000; i++ {
timer.nowFunc = func() time.Time {
return time.Now().Add(time.Duration(i) * time.Millisecond * 60)
}
timer.wheels[0].proceed()
time.Sleep(time.Microsecond)
}
t.Log("测试执行完成")
time.Sleep(time.Second)
}