From 1efeb2515d7f7fcb0dc606a3494329a9ae5c1295 Mon Sep 17 00:00:00 2001 From: John Date: Sun, 30 Dec 2018 18:56:21 +0800 Subject: [PATCH] refract package 'gcron'; add package 'gtimew' for Time Wheel style job management --- g/container/gpool/gpool.go | 37 ++++++++------- g/os/gcache/gcache_mem_cache.go | 7 ++- g/os/gcron/gcron_cron.go | 17 +++---- g/os/gcron/gcron_jobloop.go | 23 ++++----- g/os/gcron/gcron_unit_1_test.go | 82 +++++++++++++++++---------------- g/os/gcron/gcron_unit_2_test.go | 18 ++++---- g/os/gcron/gcron_unit_3_test.go | 24 +++++----- 7 files changed, 104 insertions(+), 104 deletions(-) diff --git a/g/container/gpool/gpool.go b/g/container/gpool/gpool.go index cb9b8cb1b..7381df2f9 100644 --- a/g/container/gpool/gpool.go +++ b/g/container/gpool/gpool.go @@ -4,16 +4,15 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. -// Package gpool provides a object-reusable concurrent-safe pool. -// 对象复用池. +// Package gpool provides a object-reusable concurrent-safe pool/对象复用池. package gpool import ( - "time" "errors" - "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gtype" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/os/gtimew" ) // 对象池 @@ -44,7 +43,7 @@ func New(expire int, newFunc...func() (interface{}, error)) *Pool { if len(newFunc) > 0 { r.NewFunc = newFunc[0] } - go r.expireCheckingLoop() + gtimew.AddSingleton(1, r.checkExpire) return r } @@ -100,22 +99,22 @@ func (p *Pool) Close() { } // 超时检测循环 -func (p *Pool) expireCheckingLoop() { - for !p.closed.Val() { - for { - if r := p.list.PopFront(); r != nil { - item := r.(*poolItem) - if item.expire == 0 || item.expire > gtime.Millisecond() { - p.list.PushFront(item) - break - } - if p.ExpireFunc != nil { - p.ExpireFunc(item.value) - } - } else { +func (p *Pool) checkExpire() { + if p.closed.Val() { + gtimew.ExitJob() + } + for { + if r := p.list.PopFront(); r != nil { + item := r.(*poolItem) + if item.expire == 0 || item.expire > gtime.Millisecond() { + p.list.PushFront(item) break } + if p.ExpireFunc != nil { + p.ExpireFunc(item.value) + } + } else { + break } - time.Sleep(time.Second) } } \ No newline at end of file diff --git a/g/os/gcache/gcache_mem_cache.go b/g/os/gcache/gcache_mem_cache.go index f56a1462b..fd53a952a 100644 --- a/g/os/gcache/gcache_mem_cache.go +++ b/g/os/gcache/gcache_mem_cache.go @@ -344,7 +344,8 @@ func (c *memCache) syncEventAndClearExpired() { if expireSet := c.getExpireSet(expireTime); expireSet != nil { // 遍历Set,执行数据过期删除 expireSet.Iterator(func(key interface{}) bool { - return c.clearByKey(key) + c.clearByKey(key) + return true }) // Set数据处理完之后删除该Set c.expireSetMu.Lock() @@ -355,7 +356,7 @@ func (c *memCache) syncEventAndClearExpired() { } // 删除对应键名的缓存数据 -func (c *memCache) clearByKey(key interface{}, force...bool) bool { +func (c *memCache) clearByKey(key interface{}, force...bool) { // 删除缓存数据 c.dataMu.Lock() // 删除核对,真正的过期才删除 @@ -373,6 +374,4 @@ func (c *memCache) clearByKey(key interface{}, force...bool) bool { if c.cap > 0 { c.lru.Remove(key) } - - return true } diff --git a/g/os/gcron/gcron_cron.go b/g/os/gcron/gcron_cron.go index 3270a9e32..bdce36055 100644 --- a/g/os/gcron/gcron_cron.go +++ b/g/os/gcron/gcron_cron.go @@ -12,8 +12,8 @@ import ( "gitee.com/johng/gf/g/container/garray" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gtype" + "gitee.com/johng/gf/g/os/gtimew" "strconv" - "time" ) // 定时任务管理对象 @@ -76,32 +76,29 @@ func (c *Cron) AddOnce(pattern string, job func(), name ... string) (*Entry, err // 延迟添加定时任务,delay参数单位为秒 func (c *Cron) DelayAdd(delay int, pattern string, job func(), name ... string) { - go func() { - time.Sleep(time.Duration(delay)*time.Second) + gtimew.AddOnce(delay, func() { if _, err := c.Add(pattern, job, name ...); err != nil { panic(err) } - }() + }) } // 延迟添加单例定时任务,delay参数单位为秒 func (c *Cron) DelayAddSingleton(delay int, pattern string, job func(), name ... string) { - go func() { - time.Sleep(time.Duration(delay)*time.Second) + gtimew.AddOnce(delay, func() { if _, err := c.AddSingleton(pattern, job, name ...); err != nil { panic(err) } - }() + }) } // 延迟添加只运行一次的定时任务,delay参数单位为秒 func (c *Cron) DelayAddOnce(delay int, pattern string, job func(), name ... string) { - go func() { - time.Sleep(time.Duration(delay)*time.Second) + gtimew.AddOnce(delay, func() { if _, err := c.AddOnce(pattern, job, name ...); err != nil { panic(err) } - }() + }) } // 检索指定名称的定时任务 diff --git a/g/os/gcron/gcron_jobloop.go b/g/os/gcron/gcron_jobloop.go index 0f341f85d..91cb686ec 100644 --- a/g/os/gcron/gcron_jobloop.go +++ b/g/os/gcron/gcron_jobloop.go @@ -7,25 +7,24 @@ package gcron import ( - "gitee.com/johng/gf/g/container/garray" + "gitee.com/johng/gf/g/os/gtimew" "time" ) // 延迟添加定时任务,delay参数单位为秒 func (c *Cron) startLoop() { - go func() { - for c.status.Val() != STATUS_CLOSED { - time.Sleep(time.Second) - if c.status.Val() == STATUS_RUNNING { - go c.checkEntries(time.Now()) - } + gtimew.Add(1, func() { + if c.status.Val() == STATUS_CLOSED { + gtimew.ExitJob() } - }() + if c.status.Val() == STATUS_RUNNING { + go c.checkEntries(time.Now()) + } + }) } // 遍历检查可执行定时任务,并异步执行 func (c *Cron) checkEntries(t time.Time) { - removeArray := garray.NewStringArray(0, 0, false) c.entries.RLockFunc(func(m map[string]interface{}) { for _, v := range m { entry := v.(*Entry) @@ -45,13 +44,14 @@ func (c *Cron) checkEntries(t time.Time) { if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED { continue } - removeArray.Append(entry.Name) } // 执行异步运行 go func() { defer func() { if entry.status.Val() != STATUS_CLOSED { entry.status.Set(STATUS_READY) + } else { + c.Remove(entry.Name) } }() entry.Job() @@ -59,7 +59,4 @@ func (c *Cron) checkEntries(t time.Time) { } } }) - if removeArray.Len() > 0 { - c.entries.BatchRemove(removeArray.Slice()) - } } \ No newline at end of file diff --git a/g/os/gcron/gcron_unit_1_test.go b/g/os/gcron/gcron_unit_1_test.go index 08efbc75c..fbfba372e 100644 --- a/g/os/gcron/gcron_unit_1_test.go +++ b/g/os/gcron/gcron_unit_1_test.go @@ -16,49 +16,53 @@ import ( ) func TestCron_Add_Close(t *testing.T) { - cron := gcron.New() - array := garray.New(0, 0) - _, err1 := cron.Add("* * * * * *", func() { - array.Append(1) + gtest.Case(func() { + cron := gcron.New() + array := garray.New(0, 0) + _, err1 := cron.Add("* * * * * *", func() { + array.Append(1) + }) + _, err2 := cron.Add("* * * * * *", func() { + array.Append(1) + }, "test") + _, err3 := cron.Add("* * * * * *", func() { + array.Append(1) + }, "test") + _, err4 := cron.Add("@every 2s", func() { + array.Append(1) + }) + gtest.Assert(err1, nil) + gtest.Assert(err2, nil) + gtest.AssertNE(err3, nil) + gtest.Assert(err4, nil) + gtest.Assert(len(cron.Entries()), 3) + time.Sleep(1100*time.Millisecond) + gtest.Assert(array.Len(), 2) + time.Sleep(1100*time.Millisecond) + gtest.Assert(array.Len(), 5) + cron.Close() + time.Sleep(1100*time.Millisecond) + fixedLength := array.Len() + time.Sleep(1100*time.Millisecond) + gtest.Assert(array.Len(), fixedLength) }) - _, err2 := cron.Add("* * * * * *", func() { - array.Append(1) - }, "test") - _, err3 := cron.Add("* * * * * *", func() { - array.Append(1) - }, "test") - _, err4 := cron.Add("@every 2s", func() { - array.Append(1) - }) - gtest.Assert(err1, nil) - gtest.Assert(err2, nil) - gtest.AssertNE(err3, nil) - gtest.Assert(err4, nil) - gtest.Assert(len(cron.Entries()), 3) - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 2) - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 5) - cron.Close() - time.Sleep(1100*time.Millisecond) - fixedLength := array.Len() - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), fixedLength) } func TestCron_Mathod(t *testing.T) { - cron := gcron.New() - cron.Add("* * * * * *", func() {}, "add") - cron.DelayAdd(1, "* * * * * *", func() {}, "delay_add") - gtest.Assert(len(cron.Entries()), 1) - time.Sleep(1100*time.Millisecond) - gtest.Assert(len(cron.Entries()), 2) + gtest.Case(func() { + cron := gcron.New() + cron.Add("* * * * * *", func() {}, "add") + cron.DelayAdd(1, "* * * * * *", func() {}, "delay_add") + gtest.Assert(len(cron.Entries()), 1) + time.Sleep(1100*time.Millisecond) + gtest.Assert(len(cron.Entries()), 2) - cron.Remove("delay_add") - gtest.Assert(len(cron.Entries()), 1) + cron.Remove("delay_add") + gtest.Assert(len(cron.Entries()), 1) - entry1 := cron.Search("add") - entry2 := cron.Search("test-none") - gtest.AssertNE(entry1, nil) - gtest.Assert(entry2, nil) + entry1 := cron.Search("add") + entry2 := cron.Search("test-none") + gtest.AssertNE(entry1, nil) + gtest.Assert(entry2, nil) + }) } diff --git a/g/os/gcron/gcron_unit_2_test.go b/g/os/gcron/gcron_unit_2_test.go index bd3ce9842..761cf1f6c 100644 --- a/g/os/gcron/gcron_unit_2_test.go +++ b/g/os/gcron/gcron_unit_2_test.go @@ -15,14 +15,16 @@ import ( ) func TestCron_AddSingleton(t *testing.T) { - cron := gcron.New() - array := garray.New(0, 0) - cron.AddSingleton("* * * * * *", func() { - array.Append(1) - time.Sleep(5*time.Second) + gtest.Case(func() { + cron := gcron.New() + array := garray.New(0, 0) + cron.AddSingleton("* * * * * *", func() { + array.Append(1) + time.Sleep(5*time.Second) + }) + gtest.Assert(len(cron.Entries()), 1) + time.Sleep(3500*time.Millisecond) + gtest.Assert(array.Len(), 1) }) - gtest.Assert(len(cron.Entries()), 1) - time.Sleep(3500*time.Millisecond) - gtest.Assert(array.Len(), 1) } diff --git a/g/os/gcron/gcron_unit_3_test.go b/g/os/gcron/gcron_unit_3_test.go index 411c04416..9d3baaac2 100644 --- a/g/os/gcron/gcron_unit_3_test.go +++ b/g/os/gcron/gcron_unit_3_test.go @@ -15,16 +15,18 @@ import ( ) func TestCron_AddOnce(t *testing.T) { - cron := gcron.New() - array := garray.New(0, 0) - cron.AddOnce("* * * * * *", func() { - array.Append(1) + gtest.Case(func() { + cron := gcron.New() + array := garray.New(0, 0) + cron.AddOnce("* * * * * *", func() { + array.Append(1) + }) + cron.AddOnce("* * * * * *", func() { + array.Append(1) + }) + gtest.Assert(len(cron.Entries()), 2) + time.Sleep(2500*time.Millisecond) + gtest.Assert(array.Len(), 2) + gtest.Assert(len(cron.Entries()), 0) }) - cron.AddOnce("* * * * * *", func() { - array.Append(1) - }) - gtest.Assert(len(cron.Entries()), 2) - time.Sleep(2500*time.Millisecond) - gtest.Assert(array.Len(), 2) - gtest.Assert(len(cron.Entries()), 0) }