From bf25a3a601e6b49a84fc912c03a659b35fd9f157 Mon Sep 17 00:00:00 2001 From: John Date: Thu, 17 Jan 2019 14:15:23 +0800 Subject: [PATCH] gtimer updates; grand updates, change buffer slice type from uint64 to uint32 --- g/os/gtimer/gtimer.go | 1 + g/os/gtimer/gtimer_entry.go | 33 ++++++++++++++++++++--------- g/os/gtimer/gtimer_loop.go | 15 ++++++++----- g/os/gtimer/gtimer_timer.go | 21 ++++++++++++------ g/os/gtimer/gtimer_wheel.go | 8 +------ g/os/gtimer/gtimer_z_bench_test.go | 14 +++++++++--- g/os/gtimer/gtimer_z_unit_2_test.go | 8 ++++++- g/util/grand/grand_intn.go | 12 +++++------ geg/os/gtimer/gtimer1.go | 2 +- 9 files changed, 75 insertions(+), 39 deletions(-) diff --git a/g/os/gtimer/gtimer.go b/g/os/gtimer/gtimer.go index 8b7bd5ac3..acc4c8e86 100644 --- a/g/os/gtimer/gtimer.go +++ b/g/os/gtimer/gtimer.go @@ -20,6 +20,7 @@ import ( const ( STATUS_READY = 0 STATUS_RUNNING = 1 + STATUS_STOPPED = 2 STATUS_CLOSED = -1 gPANIC_EXIT = "exit" gDEFAULT_TIMES = math.MaxInt32 diff --git a/g/os/gtimer/gtimer_entry.go b/g/os/gtimer/gtimer_entry.go index a7e3c765a..3659f3376 100644 --- a/g/os/gtimer/gtimer_entry.go +++ b/g/os/gtimer/gtimer_entry.go @@ -16,12 +16,11 @@ type Entry struct { job JobFunc // 注册循环任务方法 wheel *wheel // 所属时间轮 singleton *gtype.Bool // 任务是否单例运行 - status *gtype.Int // 任务状态(0: ready; 1: running; -1: closed) + status *gtype.Int // 任务状态(0: ready; 1: running; 2: stopped; -1: closed) times *gtype.Int // 还需运行次数 create int64 // 注册时的时间轮ticks interval int64 // 设置的运行间隔(时间轮刻度数量) createMs int64 // 创建时间(毫秒) - updateMs int64 // 更新时间(上一次检查/运行时间, 毫秒) intervalMs int64 // 间隔时间(毫秒) } @@ -48,7 +47,6 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti create : ticks, interval : num, createMs : nowMs, - updateMs : nowMs, intervalMs : ms, } // 安装任务 @@ -56,14 +54,13 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti return entry } -// 递增添加任务 +// 重新添加任务 func (w *wheel) reAddEntry(entry *Entry, nowTicks int64, nowMs int64) { left := entry.interval - (nowTicks - entry.create) if left <= 0 { left = entry.interval entry.create = nowTicks entry.createMs = nowMs - entry.updateMs = nowMs } w.slots[(nowTicks + left) % w.number].PushBack(entry) } @@ -78,6 +75,16 @@ func (entry *Entry) SetStatus(status int) int { return entry.status.Set(status) } +// 启动当前任务 +func (entry *Entry) Start() { + entry.status.Set(STATUS_READY) +} + +// 停止当前任务 +func (entry *Entry) Stop() { + entry.status.Set(STATUS_STOPPED) +} + // 关闭当前任务 func (entry *Entry) Close() { entry.status.Set(STATUS_CLOSED) @@ -105,6 +112,10 @@ func (entry *Entry) Run() { // 检测当前任务是否可运行, 参数为当前时间的纳秒数, 精度更高 func (entry *Entry) check(nowTicks int64, nowMs int64) bool { + // 是否停止 + if entry.status.Val() == STATUS_STOPPED { + return false + } // 运行检查 if diff := nowTicks - entry.create; diff > 0 && diff%entry.interval == 0 { // 是否单例 @@ -126,16 +137,18 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) bool { times = gDEFAULT_TIMES entry.times.Set(gDEFAULT_TIMES) } - // 分层转换 + // 分层转换处理 if entry.wheel.level > 0 { - if diffMs := nowMs - entry.updateMs; diffMs < entry.intervalMs { - if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.wheels.intervalMs { + // 是否达到任务运行间隔 + if diffMs := nowMs - entry.createMs; diffMs < entry.intervalMs { + // 任务是否有必要进行分层转换 + if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.timer.intervalMs { delay := time.Duration(leftMs)*time.Millisecond // 往底层添加 - entry.wheel.wheels.addEntry(delay, entry.job, false, 1) + entry.wheel.timer.addEntry(delay, entry.job, false, 1) // 延迟重新添加 if times > 0 { - entry.wheel.wheels.DelayAddTimes( + entry.wheel.timer.DelayAddTimes( delay, time.Duration(entry.intervalMs)*time.Millisecond, times, diff --git a/g/os/gtimer/gtimer_loop.go b/g/os/gtimer/gtimer_loop.go index d21613606..965ce3c93 100644 --- a/g/os/gtimer/gtimer_loop.go +++ b/g/os/gtimer/gtimer_loop.go @@ -17,12 +17,17 @@ func (w *wheel) start() { ticker := time.NewTicker(time.Duration(w.intervalMs)*time.Millisecond) for { select { - case <- w.closed: - ticker.Stop() - return - case <- ticker.C: - w.proceed() + switch w.timer.status.Val() { + case STATUS_READY: fallthrough + case STATUS_RUNNING: + w.proceed() + case STATUS_STOPPED: + case STATUS_CLOSED: + ticker.Stop() + return + } + } } }() diff --git a/g/os/gtimer/gtimer_timer.go b/g/os/gtimer/gtimer_timer.go index 3fcae6e4e..6af82e1a1 100644 --- a/g/os/gtimer/gtimer_timer.go +++ b/g/os/gtimer/gtimer_timer.go @@ -14,6 +14,7 @@ import ( // 定时器/分层时间轮 type Timer struct { + status *gtype.Int // 状态 wheels []*wheel // 分层 length int // 层数 number int // 每一层Slot Number @@ -27,6 +28,7 @@ func New(slot int, interval time.Duration, level...int) *Timer { length = level[0] } t := &Timer { + status : gtype.NewInt(STATUS_RUNNING), wheels : make([]*wheel, length), length : length, number : slot, @@ -49,11 +51,10 @@ func New(slot int, interval time.Duration, level...int) *Timer { // 创建自定义的循环任务管理对象 func (t *Timer) newWheel(level int, slot int, interval time.Duration) *wheel { w := &wheel { - wheels : t, + timer : t, level : level, slots : make([]*glist.List, slot), number : int64(slot), - closed : make(chan struct{}, 1), ticks : gtype.NewInt64(), totalMs : int64(slot)*interval.Nanoseconds()/1e6, createMs : time.Now().UnixNano()/1e6, @@ -113,11 +114,19 @@ func (t *Timer) DelayAddTimes(delay time.Duration, interval time.Duration, times }) } -// 关闭分层时间轮 +// 启动定时器 +func (t *Timer) Start() { + t.status.Set(STATUS_RUNNING) +} + +// 定制定时器 +func (t *Timer) Stop() { + t.status.Set(STATUS_STOPPED) +} + +// 关闭定时器 func (t *Timer) Close() { - for _, w := range t.wheels { - w.Close() - } + t.status.Set(STATUS_CLOSED) } // 添加定时任务 diff --git a/g/os/gtimer/gtimer_wheel.go b/g/os/gtimer/gtimer_wheel.go index 91a37eacd..eccc94fde 100644 --- a/g/os/gtimer/gtimer_wheel.go +++ b/g/os/gtimer/gtimer_wheel.go @@ -13,18 +13,12 @@ import ( // 单层时间轮 type wheel struct { - wheels *Timer // 所属定时器 + timer *Timer // 所属定时器 level int // 所属分层索引号 slots []*glist.List // 所有的循环任务项, 按照Slot Number进行分组 number int64 // Slot Number - closed chan struct{} // 停止事件 ticks *gtype.Int64 // 当前时间轮已转动的刻度数量 totalMs int64 // 整个时间轮的时间长度(毫秒)=number*interval createMs int64 // 创建时间(毫秒) intervalMs int64 // 时间间隔(slot时间长度, 毫秒) } - -// 关闭循环任务 -func (w *wheel) Close() { - w.closed <- struct{}{} -} \ No newline at end of file diff --git a/g/os/gtimer/gtimer_z_bench_test.go b/g/os/gtimer/gtimer_z_bench_test.go index c009fa829..441bfd676 100644 --- a/g/os/gtimer/gtimer_z_bench_test.go +++ b/g/os/gtimer/gtimer_z_bench_test.go @@ -12,12 +12,20 @@ import ( "time" ) - +var ( + timer = gtimer.New(5, 30*time.Millisecond) +) func Benchmark_Add(b *testing.B) { for i := 0; i < b.N; i++ { - // 基准测试的时候不能设置为1秒,否则大量的任务会崩掉系统 - gtimer.Add(time.Hour, func() { + timer.Add(time.Hour, func() { }) } } + +func Benchmark_StartStop(b *testing.B) { + for i := 0; i < b.N; i++ { + timer.Start() + timer.Stop() + } +} diff --git a/g/os/gtimer/gtimer_z_unit_2_test.go b/g/os/gtimer/gtimer_z_unit_2_test.go index d0da4e344..66a45880f 100644 --- a/g/os/gtimer/gtimer_z_unit_2_test.go +++ b/g/os/gtimer/gtimer_z_unit_2_test.go @@ -23,9 +23,15 @@ func TestTimer_Entry_Operation(t *testing.T) { }) time.Sleep(1200*time.Millisecond) gtest.Assert(array.Len(), 1) - entry.Close() + entry.Stop() time.Sleep(1200*time.Millisecond) gtest.Assert(array.Len(), 1) + entry.Start() + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 2) + entry.Close() + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 2) } func TestTimer_Entry_Singleton(t *testing.T) { diff --git a/g/util/grand/grand_intn.go b/g/util/grand/grand_intn.go index 61a154594..2309b7b4b 100644 --- a/g/util/grand/grand_intn.go +++ b/g/util/grand/grand_intn.go @@ -12,11 +12,11 @@ import ( ) const ( - gBUFFER_SIZE = 10000 // 缓冲区uint64数量大小 + gBUFFER_SIZE = 10000 // 缓冲区uint32数量大小 ) var ( - bufferChan = make(chan uint64, gBUFFER_SIZE) + bufferChan = make(chan uint32, gBUFFER_SIZE) ) // 使用缓冲区实现快速的随机数生成 @@ -29,14 +29,14 @@ func init() { panic(err) } else { // 使用缓冲区数据进行一次完整的随机数生成 - for i := 0; i < n - 8; { - bufferChan <- binary.LittleEndian.Uint64(buffer[i : i + 8]) + for i := 0; i < n - 4; { + bufferChan <- binary.LittleEndian.Uint32(buffer[i : i + 4]) i ++ } // 充分利用缓冲区数据,随机索引递增 step = int(buffer[0])%10 - for i := 0; i < n - 8; { - bufferChan <- binary.BigEndian.Uint64(buffer[i : i + 8]) + for i := 0; i < n - 4; { + bufferChan <- binary.BigEndian.Uint32(buffer[i : i + 4]) i += step } } diff --git a/geg/os/gtimer/gtimer1.go b/geg/os/gtimer/gtimer1.go index 9d1d525bd..f086d880e 100644 --- a/geg/os/gtimer/gtimer1.go +++ b/geg/os/gtimer/gtimer1.go @@ -8,7 +8,7 @@ import ( func main() { now := time.Now() - interval := 1000*time.Millisecond + interval := 1400*time.Millisecond gtimer.Add(interval, func() { fmt.Println(time.Now(), time.Duration(time.Now().UnixNano() - now.UnixNano())) now = time.Now()