gtimer updates; grand updates, change buffer slice type from uint64 to uint32

This commit is contained in:
John 2019-01-17 14:15:23 +08:00
parent 14fcd0b2f9
commit bf25a3a601
9 changed files with 75 additions and 39 deletions

View File

@ -20,6 +20,7 @@ import (
const ( const (
STATUS_READY = 0 STATUS_READY = 0
STATUS_RUNNING = 1 STATUS_RUNNING = 1
STATUS_STOPPED = 2
STATUS_CLOSED = -1 STATUS_CLOSED = -1
gPANIC_EXIT = "exit" gPANIC_EXIT = "exit"
gDEFAULT_TIMES = math.MaxInt32 gDEFAULT_TIMES = math.MaxInt32

View File

@ -16,12 +16,11 @@ type Entry struct {
job JobFunc // 注册循环任务方法 job JobFunc // 注册循环任务方法
wheel *wheel // 所属时间轮 wheel *wheel // 所属时间轮
singleton *gtype.Bool // 任务是否单例运行 singleton *gtype.Bool // 任务是否单例运行
status *gtype.Int // 任务状态(0: ready; 1: running; -1: closed) status *gtype.Int // 任务状态(0: ready; 1: running; 2: stopped; -1: closed)
times *gtype.Int // 还需运行次数 times *gtype.Int // 还需运行次数
create int64 // 注册时的时间轮ticks create int64 // 注册时的时间轮ticks
interval int64 // 设置的运行间隔(时间轮刻度数量) interval int64 // 设置的运行间隔(时间轮刻度数量)
createMs int64 // 创建时间(毫秒) createMs int64 // 创建时间(毫秒)
updateMs int64 // 更新时间(上一次检查/运行时间, 毫秒)
intervalMs int64 // 间隔时间(毫秒) intervalMs int64 // 间隔时间(毫秒)
} }
@ -48,7 +47,6 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti
create : ticks, create : ticks,
interval : num, interval : num,
createMs : nowMs, createMs : nowMs,
updateMs : nowMs,
intervalMs : ms, intervalMs : ms,
} }
// 安装任务 // 安装任务
@ -56,14 +54,13 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti
return entry return entry
} }
// 递增添加任务 // 重新添加任务
func (w *wheel) reAddEntry(entry *Entry, nowTicks int64, nowMs int64) { func (w *wheel) reAddEntry(entry *Entry, nowTicks int64, nowMs int64) {
left := entry.interval - (nowTicks - entry.create) left := entry.interval - (nowTicks - entry.create)
if left <= 0 { if left <= 0 {
left = entry.interval left = entry.interval
entry.create = nowTicks entry.create = nowTicks
entry.createMs = nowMs entry.createMs = nowMs
entry.updateMs = nowMs
} }
w.slots[(nowTicks + left) % w.number].PushBack(entry) w.slots[(nowTicks + left) % w.number].PushBack(entry)
} }
@ -78,6 +75,16 @@ func (entry *Entry) SetStatus(status int) int {
return entry.status.Set(status) return entry.status.Set(status)
} }
// 启动当前任务
func (entry *Entry) Start() {
entry.status.Set(STATUS_READY)
}
// 停止当前任务
func (entry *Entry) Stop() {
entry.status.Set(STATUS_STOPPED)
}
// 关闭当前任务 // 关闭当前任务
func (entry *Entry) Close() { func (entry *Entry) Close() {
entry.status.Set(STATUS_CLOSED) entry.status.Set(STATUS_CLOSED)
@ -105,6 +112,10 @@ func (entry *Entry) Run() {
// 检测当前任务是否可运行, 参数为当前时间的纳秒数, 精度更高 // 检测当前任务是否可运行, 参数为当前时间的纳秒数, 精度更高
func (entry *Entry) check(nowTicks int64, nowMs int64) bool { func (entry *Entry) check(nowTicks int64, nowMs int64) bool {
// 是否停止
if entry.status.Val() == STATUS_STOPPED {
return false
}
// 运行检查 // 运行检查
if diff := nowTicks - entry.create; diff > 0 && diff%entry.interval == 0 { if diff := nowTicks - entry.create; diff > 0 && diff%entry.interval == 0 {
// 是否单例 // 是否单例
@ -126,16 +137,18 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) bool {
times = gDEFAULT_TIMES times = gDEFAULT_TIMES
entry.times.Set(gDEFAULT_TIMES) entry.times.Set(gDEFAULT_TIMES)
} }
// 分层转换 // 分层转换处理
if entry.wheel.level > 0 { if entry.wheel.level > 0 {
if diffMs := nowMs - entry.updateMs; diffMs < entry.intervalMs { // 是否达到任务运行间隔
if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.wheels.intervalMs { if diffMs := nowMs - entry.createMs; diffMs < entry.intervalMs {
// 任务是否有必要进行分层转换
if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.timer.intervalMs {
delay := time.Duration(leftMs)*time.Millisecond delay := time.Duration(leftMs)*time.Millisecond
// 往底层添加 // 往底层添加
entry.wheel.wheels.addEntry(delay, entry.job, false, 1) entry.wheel.timer.addEntry(delay, entry.job, false, 1)
// 延迟重新添加 // 延迟重新添加
if times > 0 { if times > 0 {
entry.wheel.wheels.DelayAddTimes( entry.wheel.timer.DelayAddTimes(
delay, delay,
time.Duration(entry.intervalMs)*time.Millisecond, time.Duration(entry.intervalMs)*time.Millisecond,
times, times,

View File

@ -17,12 +17,17 @@ func (w *wheel) start() {
ticker := time.NewTicker(time.Duration(w.intervalMs)*time.Millisecond) ticker := time.NewTicker(time.Duration(w.intervalMs)*time.Millisecond)
for { for {
select { select {
case <- w.closed:
ticker.Stop()
return
case <- ticker.C: case <- ticker.C:
w.proceed() switch w.timer.status.Val() {
case STATUS_READY: fallthrough
case STATUS_RUNNING:
w.proceed()
case STATUS_STOPPED:
case STATUS_CLOSED:
ticker.Stop()
return
}
} }
} }
}() }()

View File

@ -14,6 +14,7 @@ import (
// 定时器/分层时间轮 // 定时器/分层时间轮
type Timer struct { type Timer struct {
status *gtype.Int // 状态
wheels []*wheel // 分层 wheels []*wheel // 分层
length int // 层数 length int // 层数
number int // 每一层Slot Number number int // 每一层Slot Number
@ -27,6 +28,7 @@ func New(slot int, interval time.Duration, level...int) *Timer {
length = level[0] length = level[0]
} }
t := &Timer { t := &Timer {
status : gtype.NewInt(STATUS_RUNNING),
wheels : make([]*wheel, length), wheels : make([]*wheel, length),
length : length, length : length,
number : slot, number : slot,
@ -49,11 +51,10 @@ func New(slot int, interval time.Duration, level...int) *Timer {
// 创建自定义的循环任务管理对象 // 创建自定义的循环任务管理对象
func (t *Timer) newWheel(level int, slot int, interval time.Duration) *wheel { func (t *Timer) newWheel(level int, slot int, interval time.Duration) *wheel {
w := &wheel { w := &wheel {
wheels : t, timer : t,
level : level, level : level,
slots : make([]*glist.List, slot), slots : make([]*glist.List, slot),
number : int64(slot), number : int64(slot),
closed : make(chan struct{}, 1),
ticks : gtype.NewInt64(), ticks : gtype.NewInt64(),
totalMs : int64(slot)*interval.Nanoseconds()/1e6, totalMs : int64(slot)*interval.Nanoseconds()/1e6,
createMs : time.Now().UnixNano()/1e6, createMs : time.Now().UnixNano()/1e6,
@ -113,11 +114,19 @@ func (t *Timer) DelayAddTimes(delay time.Duration, interval time.Duration, times
}) })
} }
// 关闭分层时间轮 // 启动定时器
func (t *Timer) Start() {
t.status.Set(STATUS_RUNNING)
}
// 定制定时器
func (t *Timer) Stop() {
t.status.Set(STATUS_STOPPED)
}
// 关闭定时器
func (t *Timer) Close() { func (t *Timer) Close() {
for _, w := range t.wheels { t.status.Set(STATUS_CLOSED)
w.Close()
}
} }
// 添加定时任务 // 添加定时任务

View File

@ -13,18 +13,12 @@ import (
// 单层时间轮 // 单层时间轮
type wheel struct { type wheel struct {
wheels *Timer // 所属定时器 timer *Timer // 所属定时器
level int // 所属分层索引号 level int // 所属分层索引号
slots []*glist.List // 所有的循环任务项, 按照Slot Number进行分组 slots []*glist.List // 所有的循环任务项, 按照Slot Number进行分组
number int64 // Slot Number number int64 // Slot Number
closed chan struct{} // 停止事件
ticks *gtype.Int64 // 当前时间轮已转动的刻度数量 ticks *gtype.Int64 // 当前时间轮已转动的刻度数量
totalMs int64 // 整个时间轮的时间长度(毫秒)=number*interval totalMs int64 // 整个时间轮的时间长度(毫秒)=number*interval
createMs int64 // 创建时间(毫秒) createMs int64 // 创建时间(毫秒)
intervalMs int64 // 时间间隔(slot时间长度, 毫秒) intervalMs int64 // 时间间隔(slot时间长度, 毫秒)
} }
// 关闭循环任务
func (w *wheel) Close() {
w.closed <- struct{}{}
}

View File

@ -12,12 +12,20 @@ import (
"time" "time"
) )
var (
timer = gtimer.New(5, 30*time.Millisecond)
)
func Benchmark_Add(b *testing.B) { func Benchmark_Add(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
// 基准测试的时候不能设置为1秒否则大量的任务会崩掉系统 timer.Add(time.Hour, func() {
gtimer.Add(time.Hour, func() {
}) })
} }
} }
func Benchmark_StartStop(b *testing.B) {
for i := 0; i < b.N; i++ {
timer.Start()
timer.Stop()
}
}

View File

@ -23,9 +23,15 @@ func TestTimer_Entry_Operation(t *testing.T) {
}) })
time.Sleep(1200*time.Millisecond) time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1) gtest.Assert(array.Len(), 1)
entry.Close() entry.Stop()
time.Sleep(1200*time.Millisecond) time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1) gtest.Assert(array.Len(), 1)
entry.Start()
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 2)
entry.Close()
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 2)
} }
func TestTimer_Entry_Singleton(t *testing.T) { func TestTimer_Entry_Singleton(t *testing.T) {

View File

@ -12,11 +12,11 @@ import (
) )
const ( const (
gBUFFER_SIZE = 10000 // 缓冲区uint64数量大小 gBUFFER_SIZE = 10000 // 缓冲区uint32数量大小
) )
var ( var (
bufferChan = make(chan uint64, gBUFFER_SIZE) bufferChan = make(chan uint32, gBUFFER_SIZE)
) )
// 使用缓冲区实现快速的随机数生成 // 使用缓冲区实现快速的随机数生成
@ -29,14 +29,14 @@ func init() {
panic(err) panic(err)
} else { } else {
// 使用缓冲区数据进行一次完整的随机数生成 // 使用缓冲区数据进行一次完整的随机数生成
for i := 0; i < n - 8; { for i := 0; i < n - 4; {
bufferChan <- binary.LittleEndian.Uint64(buffer[i : i + 8]) bufferChan <- binary.LittleEndian.Uint32(buffer[i : i + 4])
i ++ i ++
} }
// 充分利用缓冲区数据,随机索引递增 // 充分利用缓冲区数据,随机索引递增
step = int(buffer[0])%10 step = int(buffer[0])%10
for i := 0; i < n - 8; { for i := 0; i < n - 4; {
bufferChan <- binary.BigEndian.Uint64(buffer[i : i + 8]) bufferChan <- binary.BigEndian.Uint32(buffer[i : i + 4])
i += step i += step
} }
} }

View File

@ -8,7 +8,7 @@ import (
func main() { func main() {
now := time.Now() now := time.Now()
interval := 1000*time.Millisecond interval := 1400*time.Millisecond
gtimer.Add(interval, func() { gtimer.Add(interval, func() {
fmt.Println(time.Now(), time.Duration(time.Now().UnixNano() - now.UnixNano())) fmt.Println(time.Now(), time.Duration(time.Now().UnixNano() - now.UnixNano()))
now = time.Now() now = time.Now()