refract package 'gcron'; add package 'gtimew' for Time Wheel style job management

This commit is contained in:
John 2018-12-30 18:56:21 +08:00
parent ccf837b2bf
commit 1efeb2515d
7 changed files with 104 additions and 104 deletions

View File

@ -4,16 +4,15 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Package gpool provides a object-reusable concurrent-safe pool.
// 对象复用池.
// Package gpool provides a object-reusable concurrent-safe pool/对象复用池.
package gpool
import (
"time"
"errors"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gtimew"
)
// 对象池
@ -44,7 +43,7 @@ func New(expire int, newFunc...func() (interface{}, error)) *Pool {
if len(newFunc) > 0 {
r.NewFunc = newFunc[0]
}
go r.expireCheckingLoop()
gtimew.AddSingleton(1, r.checkExpire)
return r
}
@ -100,22 +99,22 @@ func (p *Pool) Close() {
}
// 超时检测循环
func (p *Pool) expireCheckingLoop() {
for !p.closed.Val() {
for {
if r := p.list.PopFront(); r != nil {
item := r.(*poolItem)
if item.expire == 0 || item.expire > gtime.Millisecond() {
p.list.PushFront(item)
break
}
if p.ExpireFunc != nil {
p.ExpireFunc(item.value)
}
} else {
func (p *Pool) checkExpire() {
if p.closed.Val() {
gtimew.ExitJob()
}
for {
if r := p.list.PopFront(); r != nil {
item := r.(*poolItem)
if item.expire == 0 || item.expire > gtime.Millisecond() {
p.list.PushFront(item)
break
}
if p.ExpireFunc != nil {
p.ExpireFunc(item.value)
}
} else {
break
}
time.Sleep(time.Second)
}
}

View File

@ -344,7 +344,8 @@ func (c *memCache) syncEventAndClearExpired() {
if expireSet := c.getExpireSet(expireTime); expireSet != nil {
// 遍历Set执行数据过期删除
expireSet.Iterator(func(key interface{}) bool {
return c.clearByKey(key)
c.clearByKey(key)
return true
})
// Set数据处理完之后删除该Set
c.expireSetMu.Lock()
@ -355,7 +356,7 @@ func (c *memCache) syncEventAndClearExpired() {
}
// 删除对应键名的缓存数据
func (c *memCache) clearByKey(key interface{}, force...bool) bool {
func (c *memCache) clearByKey(key interface{}, force...bool) {
// 删除缓存数据
c.dataMu.Lock()
// 删除核对,真正的过期才删除
@ -373,6 +374,4 @@ func (c *memCache) clearByKey(key interface{}, force...bool) bool {
if c.cap > 0 {
c.lru.Remove(key)
}
return true
}

View File

@ -12,8 +12,8 @@ import (
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gtimew"
"strconv"
"time"
)
// 定时任务管理对象
@ -76,32 +76,29 @@ func (c *Cron) AddOnce(pattern string, job func(), name ... string) (*Entry, err
// 延迟添加定时任务delay参数单位为秒
func (c *Cron) DelayAdd(delay int, pattern string, job func(), name ... string) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
gtimew.AddOnce(delay, func() {
if _, err := c.Add(pattern, job, name ...); err != nil {
panic(err)
}
}()
})
}
// 延迟添加单例定时任务delay参数单位为秒
func (c *Cron) DelayAddSingleton(delay int, pattern string, job func(), name ... string) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
gtimew.AddOnce(delay, func() {
if _, err := c.AddSingleton(pattern, job, name ...); err != nil {
panic(err)
}
}()
})
}
// 延迟添加只运行一次的定时任务delay参数单位为秒
func (c *Cron) DelayAddOnce(delay int, pattern string, job func(), name ... string) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
gtimew.AddOnce(delay, func() {
if _, err := c.AddOnce(pattern, job, name ...); err != nil {
panic(err)
}
}()
})
}
// 检索指定名称的定时任务

View File

@ -7,25 +7,24 @@
package gcron
import (
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/gtimew"
"time"
)
// 延迟添加定时任务delay参数单位为秒
func (c *Cron) startLoop() {
go func() {
for c.status.Val() != STATUS_CLOSED {
time.Sleep(time.Second)
if c.status.Val() == STATUS_RUNNING {
go c.checkEntries(time.Now())
}
gtimew.Add(1, func() {
if c.status.Val() == STATUS_CLOSED {
gtimew.ExitJob()
}
}()
if c.status.Val() == STATUS_RUNNING {
go c.checkEntries(time.Now())
}
})
}
// 遍历检查可执行定时任务,并异步执行
func (c *Cron) checkEntries(t time.Time) {
removeArray := garray.NewStringArray(0, 0, false)
c.entries.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
entry := v.(*Entry)
@ -45,13 +44,14 @@ func (c *Cron) checkEntries(t time.Time) {
if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED {
continue
}
removeArray.Append(entry.Name)
}
// 执行异步运行
go func() {
defer func() {
if entry.status.Val() != STATUS_CLOSED {
entry.status.Set(STATUS_READY)
} else {
c.Remove(entry.Name)
}
}()
entry.Job()
@ -59,7 +59,4 @@ func (c *Cron) checkEntries(t time.Time) {
}
}
})
if removeArray.Len() > 0 {
c.entries.BatchRemove(removeArray.Slice())
}
}

View File

@ -16,49 +16,53 @@ import (
)
func TestCron_Add_Close(t *testing.T) {
cron := gcron.New()
array := garray.New(0, 0)
_, err1 := cron.Add("* * * * * *", func() {
array.Append(1)
gtest.Case(func() {
cron := gcron.New()
array := garray.New(0, 0)
_, err1 := cron.Add("* * * * * *", func() {
array.Append(1)
})
_, err2 := cron.Add("* * * * * *", func() {
array.Append(1)
}, "test")
_, err3 := cron.Add("* * * * * *", func() {
array.Append(1)
}, "test")
_, err4 := cron.Add("@every 2s", func() {
array.Append(1)
})
gtest.Assert(err1, nil)
gtest.Assert(err2, nil)
gtest.AssertNE(err3, nil)
gtest.Assert(err4, nil)
gtest.Assert(len(cron.Entries()), 3)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 5)
cron.Close()
time.Sleep(1100*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
})
_, err2 := cron.Add("* * * * * *", func() {
array.Append(1)
}, "test")
_, err3 := cron.Add("* * * * * *", func() {
array.Append(1)
}, "test")
_, err4 := cron.Add("@every 2s", func() {
array.Append(1)
})
gtest.Assert(err1, nil)
gtest.Assert(err2, nil)
gtest.AssertNE(err3, nil)
gtest.Assert(err4, nil)
gtest.Assert(len(cron.Entries()), 3)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 5)
cron.Close()
time.Sleep(1100*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
}
func TestCron_Mathod(t *testing.T) {
cron := gcron.New()
cron.Add("* * * * * *", func() {}, "add")
cron.DelayAdd(1, "* * * * * *", func() {}, "delay_add")
gtest.Assert(len(cron.Entries()), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(len(cron.Entries()), 2)
gtest.Case(func() {
cron := gcron.New()
cron.Add("* * * * * *", func() {}, "add")
cron.DelayAdd(1, "* * * * * *", func() {}, "delay_add")
gtest.Assert(len(cron.Entries()), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(len(cron.Entries()), 2)
cron.Remove("delay_add")
gtest.Assert(len(cron.Entries()), 1)
cron.Remove("delay_add")
gtest.Assert(len(cron.Entries()), 1)
entry1 := cron.Search("add")
entry2 := cron.Search("test-none")
gtest.AssertNE(entry1, nil)
gtest.Assert(entry2, nil)
entry1 := cron.Search("add")
entry2 := cron.Search("test-none")
gtest.AssertNE(entry1, nil)
gtest.Assert(entry2, nil)
})
}

View File

@ -15,14 +15,16 @@ import (
)
func TestCron_AddSingleton(t *testing.T) {
cron := gcron.New()
array := garray.New(0, 0)
cron.AddSingleton("* * * * * *", func() {
array.Append(1)
time.Sleep(5*time.Second)
gtest.Case(func() {
cron := gcron.New()
array := garray.New(0, 0)
cron.AddSingleton("* * * * * *", func() {
array.Append(1)
time.Sleep(5*time.Second)
})
gtest.Assert(len(cron.Entries()), 1)
time.Sleep(3500*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
gtest.Assert(len(cron.Entries()), 1)
time.Sleep(3500*time.Millisecond)
gtest.Assert(array.Len(), 1)
}

View File

@ -15,16 +15,18 @@ import (
)
func TestCron_AddOnce(t *testing.T) {
cron := gcron.New()
array := garray.New(0, 0)
cron.AddOnce("* * * * * *", func() {
array.Append(1)
gtest.Case(func() {
cron := gcron.New()
array := garray.New(0, 0)
cron.AddOnce("* * * * * *", func() {
array.Append(1)
})
cron.AddOnce("* * * * * *", func() {
array.Append(1)
})
gtest.Assert(len(cron.Entries()), 2)
time.Sleep(2500*time.Millisecond)
gtest.Assert(array.Len(), 2)
gtest.Assert(len(cron.Entries()), 0)
})
cron.AddOnce("* * * * * *", func() {
array.Append(1)
})
gtest.Assert(len(cron.Entries()), 2)
time.Sleep(2500*time.Millisecond)
gtest.Assert(array.Len(), 2)
gtest.Assert(len(cron.Entries()), 0)
}