This commit is contained in:
John 2019-01-01 19:43:31 +08:00
parent e4a7e23c46
commit f1c7b95b33
27 changed files with 1148 additions and 599 deletions

View File

@ -47,6 +47,13 @@
1. 改进WebServer获取POST参数处理逻辑当提交非form数据时例如json数据针对某些方法可以直接解析
1. WebServer增加可选择的路由覆盖配置默认情况下不覆盖
1. grpool性能压测结果变慢的问题
1. glist增加Element类型的并发安全处理
# DONE

View File

@ -5,274 +5,276 @@
// You can obtain one at https://gitee.com/johng/gf.
//
// Package glist provides a concurrent-safe(alternative) doubly linked list.
// 并发安全的双向链表.
// Package glist provides a concurrent-safe(alternative) doubly linked list/并发安全的双向链表.
package glist
import (
"container/list"
"gitee.com/johng/gf/g/container/internal/rwmutex"
"gitee.com/johng/gf/g/container/gtype"
)
// 变长双向链表
type List struct {
mu *rwmutex.RWMutex
list *list.List
safe bool
root *Element
length *gtype.Int
}
// 获得一个变长链表指针
func New(safe...bool) *List {
return &List {
mu : rwmutex.New(safe...),
list : list.New(),
l := &List{
length : gtype.NewInt(),
}
l.root = newElement(nil, safe...)
l.root.list = l
l.root.setNext(l.root)
l.root.setPrev(l.root)
if len(safe) > 0 {
l.safe = safe[0]
} else {
l.safe = true
}
return l
}
// 往链表头入栈数据项
func (l *List) PushFront(v interface{}) *list.Element {
l.mu.Lock()
e := l.list.PushFront(v)
l.mu.Unlock()
return e
func (l *List) PushFront(v interface{}) *Element {
return l.insertValue(v, l.root)
}
// 往链表尾入栈数据项
func (l *List) PushBack(v interface{}) *list.Element {
l.mu.Lock()
r := l.list.PushBack(v)
l.mu.Unlock()
return r
func (l *List) PushBack(v interface{}) *Element {
return l.insertValue(v, l.root.getPrev())
}
// 在list 中元素mark之后插入一个值为v的元素并返回该元素如果mark不是list中元素则list不改变。
func (l *List) InsertAfter(v interface{}, mark *list.Element) *list.Element {
l.mu.Lock()
r := l.list.InsertAfter(v, mark)
l.mu.Unlock()
return r
func (l *List) InsertAfter(v interface{}, mark *Element) *Element {
if mark.list != l {
return nil
}
return l.insertValue(v, mark)
}
// 在list 中元素mark之前插入一个值为v的元素并返回该元素如果mark不是list中元素则list不改变。
func (l *List) InsertBefore(v interface{}, mark *list.Element) *list.Element {
l.mu.Lock()
r := l.list.InsertBefore(v, mark)
l.mu.Unlock()
return r
func (l *List) InsertBefore(v interface{}, mark *Element) *Element {
if mark.list != l {
return nil
}
return l.insertValue(v, mark.getPrev())
}
// 批量往链表头入栈数据项
func (l *List) BatchPushFront(vs []interface{}) {
l.mu.Lock()
for _, item := range vs {
l.list.PushFront(item)
l.PushFront(item)
}
l.mu.Unlock()
}
// 从链表尾端出栈数据项(删除)
func (l *List) PopBack() interface{} {
l.mu.Lock()
if elem := l.list.Back(); elem != nil {
item := l.list.Remove(elem)
l.mu.Unlock()
return item
if e := l.Back(); e != nil {
return l.Remove(e)
}
l.mu.Unlock()
return nil
}
// 从链表头端出栈数据项(删除)
func (l *List) PopFront() interface{} {
l.mu.Lock()
if elem := l.list.Front(); elem != nil {
item := l.list.Remove(elem)
l.mu.Unlock()
return item
if e := l.Front(); e != nil {
return l.Remove(e)
}
l.mu.Unlock()
return nil
}
// 批量从链表尾端出栈数据项(删除)
func (l *List) BatchPopBack(max int) []interface{} {
l.mu.Lock()
count := l.list.Len()
count := l.Len()
if count == 0 {
l.mu.Unlock()
return []interface{}{}
}
if count > max {
count = max
}
items := make([]interface{}, count)
for i := 0; i < count; i++ {
items[i] = l.list.Remove(l.list.Back())
items[i] = l.PopBack()
}
l.mu.Unlock()
return items
}
// 批量从链表头端出栈数据项(删除)
func (l *List) BatchPopFront(max int) []interface{} {
l.mu.Lock()
count := l.list.Len()
count := l.Len()
if count == 0 {
l.mu.Unlock()
return []interface{}{}
}
if count > max {
count = max
}
items := make([]interface{}, count)
for i := 0; i < count; i++ {
items[i] = l.list.Remove(l.list.Front())
items[i] = l.PopFront()
}
l.mu.Unlock()
return items
}
// 批量从链表尾端依次获取所有数据(删除)
func (l *List) PopBackAll() []interface{} {
l.mu.Lock()
count := l.list.Len()
if count == 0 {
l.mu.Unlock()
return []interface{}{}
}
items := make([]interface{}, count)
for i := 0; i < count; i++ {
items[i] = l.list.Remove(l.list.Back())
}
l.mu.Unlock()
return items
return l.BatchPopFront(l.Len())
}
// 批量从链表头端依次获取所有数据(删除)
func (l *List) PopFrontAll() []interface{} {
l.mu.Lock()
count := l.list.Len()
if count == 0 {
l.mu.Unlock()
return []interface{}{}
}
items := make([]interface{}, count)
for i := 0; i < count; i++ {
items[i] = l.list.Remove(l.list.Front())
}
l.mu.Unlock()
return items
return l.BatchPopFront(l.Len())
}
// 删除数据项
func (l *List) Remove(e *list.Element) interface{} {
l.mu.Lock()
r := l.list.Remove(e)
l.mu.Unlock()
return r
// 删除数据项e
func (l *List) Remove(e *Element) interface{} {
if e.list == l {
l.remove(e)
}
return e.Value()
}
// 删除所有数据项
func (l *List) RemoveAll() {
l.mu.Lock()
l.list = list.New()
l.mu.Unlock()
l.length.Set(0)
l.root.setNext(l.root)
l.root.setPrev(l.root)
}
// 从链表头获取所有数据(不删除)
func (l *List) FrontAll() []interface{} {
l.mu.RLock()
count := l.list.Len()
count := l.Len()
if count == 0 {
l.mu.RUnlock()
return []interface{}{}
}
items := make([]interface{}, 0, count)
for e := l.list.Front(); e != nil; e = e.Next() {
items = append(items, e.Value)
for e := l.Front(); e != nil; e = e.Next() {
items = append(items, e.Value())
}
l.mu.RUnlock()
return items
}
// 从链表尾获取所有数据(不删除)
func (l *List) BackAll() []interface{} {
l.mu.RLock()
count := l.list.Len()
count := l.Len()
if count == 0 {
l.mu.RUnlock()
return []interface{}{}
}
items := make([]interface{}, 0, count)
for e := l.list.Back(); e != nil; e = e.Prev() {
items = append(items, e.Value)
for e := l.Back(); e != nil; e = e.Prev() {
items = append(items, e.Value())
}
l.mu.RUnlock()
return items
}
// 获取链表头值(不删除)
func (l *List) FrontItem() interface{} {
l.mu.RLock()
if f := l.list.Front(); f != nil {
l.mu.RUnlock()
return f.Value
if e := l.Front(); e != nil {
return e.Value()
}
l.mu.RUnlock()
return nil
}
// 获取链表尾值(不删除)
func (l *List) BackItem() interface{} {
l.mu.RLock()
if f := l.list.Back(); f != nil {
l.mu.RUnlock()
return f.Value
if e := l.Back(); e != nil {
return e.Value()
}
l.mu.RUnlock()
return nil
}
// 获取表头指针
func (l *List) Front() *list.Element {
l.mu.RLock()
r := l.list.Front()
l.mu.RUnlock()
return r
func (l *List) Front() *Element {
if l.length.Val() == 0 {
return nil
}
return l.root.getNext()
}
// 获取表位指针
func (l *List) Back() *list.Element {
l.mu.RLock()
r := l.list.Back()
l.mu.RUnlock()
return r
func (l *List) Back() *Element {
if l.length.Val() == 0 {
return nil
}
return l.root.getPrev()
}
// 获取链表长度
func (l *List) Len() int {
l.mu.RLock()
length := l.list.Len()
l.mu.RUnlock()
return length
return l.length.Val()
}
// 读锁操作
func (l *List) RLockFunc(f func(l *list.List)) {
l.mu.RLock()
defer l.mu.RUnlock()
f(l.list)
func (l *List) MoveBefore(e, mark *Element) {
if e.list != l || e == mark || mark.list != l {
return
}
l.insert(l.remove(e), mark.getPrev())
}
// 写锁操作
func (l *List) LockFunc(f func(l *list.List)) {
l.mu.Lock()
defer l.mu.Unlock()
f(l.list)
func (l *List) MoveAfter(e, mark *Element) {
if e.list != l || e == mark || mark.list != l {
return
}
l.insert(l.remove(e), mark)
}
func (l *List) MoveToFront(e *Element) {
if e.list != l || l.root.getNext() == e {
return
}
l.insert(l.remove(e), l.root)
}
func (l *List) MoveToBack(e *Element) {
if e.list != l || l.root.getPrev() == e {
return
}
l.insert(l.remove(e), l.root.getPrev())
}
func (l *List) PushBackList(other *List) {
for i, e := other.Len(), other.Front(); i > 0; i, e = i - 1, e.Next() {
l.insertValue(e.Value(), l.root.getPrev())
}
}
func (l *List) PushFrontList(other *List) {
for i, e := other.Len(), other.Back(); i > 0; i, e = i - 1, e.Prev() {
l.insertValue(e.Value(), l.root)
}
}
// 在元素项p后添加数值value, 自动创建元素项
func (l *List) insertValue(value interface{}, p *Element) *Element {
return l.insert(newElement(value, l.safe), p)
}
// 在元素项p后添加元素项e
func (l *List) insert(e, p *Element) *Element {
p.LockFunc(func(p *Element) {
e.LockFunc(func(e *Element) {
n := p.next
p.next = e
e.prev = p
e.next = n
n.prev = e
e.list = l
})
})
l.length.Add(1)
return e
}
// 从列表中删除元素项e
func (l *List) remove(e *Element) *Element {
e.LockFunc(func(e *Element) {
e.prev.setNext(e.next)
e.next.setPrev(e.prev)
e.next = nil
e.prev = nil
e.list = nil
})
l.length.Add(-1)
return e
}

View File

@ -0,0 +1,103 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with l file,
// You can obtain one at https://gitee.com/johng/gf.
//
package glist
import (
"gitee.com/johng/gf/g/container/internal/rwmutex"
)
// 链表元素项
type Element struct {
mu *rwmutex.RWMutex
list *List
prev *Element
next *Element
value interface{}
}
// 创建一个并发安全的列表元素项
func newElement(value interface{}, safe...bool) *Element {
return &Element {
mu : rwmutex.New(safe...),
value : value,
}
}
// 获得元素项值
func (e *Element) Value() interface{} {
e.mu.RLock()
r := e.value
e.mu.RUnlock()
return r
}
// 获得下一个元素项(遍历使用)
func (e *Element) Next() *Element {
e.mu.RLock()
defer e.mu.RUnlock()
if p := e.next; e.list != nil && p != e.list.root {
return p
}
return nil
}
// 获得前一个元素项(遍历使用)
func (e *Element) Prev() *Element {
e.mu.RLock()
defer e.mu.RUnlock()
if p := e.prev; e.list != nil && p != e.list.root {
return p
}
return nil
}
// 只读锁操作
func (e *Element) RLockFunc(f func(e *Element)) {
e.mu.RLock()
defer e.mu.RUnlock()
f(e)
}
// 写锁操作
func (e *Element) LockFunc(f func(e *Element)) {
e.mu.Lock()
defer e.mu.Unlock()
f(e)
}
func (e *Element) setPrev(prev *Element) (old *Element) {
e.mu.Lock()
old = e.prev
e.prev = prev
e.mu.Unlock()
return
}
func (e *Element) setNext(next *Element) (old *Element) {
e.mu.Lock()
old = e.next
e.next = next
e.mu.Unlock()
return
}
// 获得前一个元素项(内部并发安全使用)
func (e *Element) getPrev() (prev *Element) {
e.mu.RLock()
prev = e.prev
e.mu.RUnlock()
return
}
// 获得下一个元素项(内部并发安全使用)
func (e *Element) getNext() (next *Element) {
e.mu.RLock()
next = e.next
e.mu.RUnlock()
return
}

View File

@ -0,0 +1,354 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package glist
import "testing"
func checkListLen(t *testing.T, l *List, len int) bool {
if n := l.Len(); n != len {
t.Errorf("l.Len() = %d, want %d", n, len)
return false
}
return true
}
func checkListPointers(t *testing.T, l *List, es []*Element) {
root := l.root
if !checkListLen(t, l, len(es)) {
return
}
// zero length lists must be the zero value or properly initialized (sentinel circle)
if len(es) == 0 {
if l.root.next != nil && l.root.next != root || l.root.prev != nil && l.root.prev != root {
t.Errorf("l.root.next = %p, l.root.prev = %p; both should both be nil or %p", l.root.next, l.root.prev, root)
}
return
}
// len(es) > 0
// check internal and external prev/next connections
for i, e := range es {
prev := root
Prev := (*Element)(nil)
if i > 0 {
prev = es[i-1]
Prev = prev
}
if p := e.prev; p != prev {
t.Errorf("elt[%d](%p).prev = %p, want %p", i, e, p, prev)
}
if p := e.Prev(); p != Prev {
t.Errorf("elt[%d](%p).Prev() = %p, want %p", i, e, p, Prev)
}
next := root
Next := (*Element)(nil)
if i < len(es)-1 {
next = es[i+1]
Next = next
}
if n := e.next; n != next {
t.Errorf("elt[%d](%p).next = %p, want %p", i, e, n, next)
}
if n := e.Next(); n != Next {
t.Errorf("elt[%d](%p).Next() = %p, want %p", i, e, n, Next)
}
}
}
func TestList(t *testing.T) {
l := New()
checkListPointers(t, l, []*Element{})
// Single element list
e := l.PushFront("a")
checkListPointers(t, l, []*Element{e})
l.MoveToFront(e)
checkListPointers(t, l, []*Element{e})
l.MoveToBack(e)
checkListPointers(t, l, []*Element{e})
l.Remove(e)
checkListPointers(t, l, []*Element{})
// Bigger list
e2 := l.PushFront(2)
e1 := l.PushFront(1)
e3 := l.PushBack(3)
e4 := l.PushBack("banana")
checkListPointers(t, l, []*Element{e1, e2, e3, e4})
l.Remove(e2)
checkListPointers(t, l, []*Element{e1, e3, e4})
l.MoveToFront(e3) // move from middle
checkListPointers(t, l, []*Element{e3, e1, e4})
l.MoveToFront(e1)
l.MoveToBack(e3) // move from middle
checkListPointers(t, l, []*Element{e1, e4, e3})
l.MoveToFront(e3) // move from back
checkListPointers(t, l, []*Element{e3, e1, e4})
l.MoveToFront(e3) // should be no-op
checkListPointers(t, l, []*Element{e3, e1, e4})
l.MoveToBack(e3) // move from front
checkListPointers(t, l, []*Element{e1, e4, e3})
l.MoveToBack(e3) // should be no-op
checkListPointers(t, l, []*Element{e1, e4, e3})
e2 = l.InsertBefore(2, e1) // insert before front
checkListPointers(t, l, []*Element{e2, e1, e4, e3})
l.Remove(e2)
e2 = l.InsertBefore(2, e4) // insert before middle
checkListPointers(t, l, []*Element{e1, e2, e4, e3})
l.Remove(e2)
e2 = l.InsertBefore(2, e3) // insert before back
checkListPointers(t, l, []*Element{e1, e4, e2, e3})
l.Remove(e2)
e2 = l.InsertAfter(2, e1) // insert after front
checkListPointers(t, l, []*Element{e1, e2, e4, e3})
l.Remove(e2)
e2 = l.InsertAfter(2, e4) // insert after middle
checkListPointers(t, l, []*Element{e1, e4, e2, e3})
l.Remove(e2)
e2 = l.InsertAfter(2, e3) // insert after back
checkListPointers(t, l, []*Element{e1, e4, e3, e2})
l.Remove(e2)
// Check standard iteration.
sum := 0
for e := l.Front(); e != nil; e = e.Next() {
if i, ok := e.Value().(int); ok {
sum += i
}
}
if sum != 4 {
t.Errorf("sum over l = %d, want 4", sum)
}
// Clear all elements by iterating
var next *Element
for e := l.Front(); e != nil; e = next {
next = e.Next()
l.Remove(e)
}
checkListPointers(t, l, []*Element{})
}
func checkList(t *testing.T, l *List, es []interface{}) {
if !checkListLen(t, l, len(es)) {
return
}
i := 0
for e := l.Front(); e != nil; e = e.Next() {
le := e.Value().(int)
if le != es[i] {
t.Errorf("elt[%d].Value() = %v, want %v", i, le, es[i])
}
i++
}
}
func TestExtending(t *testing.T) {
l1 := New()
l2 := New()
l1.PushBack(1)
l1.PushBack(2)
l1.PushBack(3)
l2.PushBack(4)
l2.PushBack(5)
l3 := New()
l3.PushBackList(l1)
checkList(t, l3, []interface{}{1, 2, 3})
l3.PushBackList(l2)
checkList(t, l3, []interface{}{1, 2, 3, 4, 5})
l3 = New()
l3.PushFrontList(l2)
checkList(t, l3, []interface{}{4, 5})
l3.PushFrontList(l1)
checkList(t, l3, []interface{}{1, 2, 3, 4, 5})
checkList(t, l1, []interface{}{1, 2, 3})
checkList(t, l2, []interface{}{4, 5})
l3 = New()
l3.PushBackList(l1)
checkList(t, l3, []interface{}{1, 2, 3})
l3.PushBackList(l3)
checkList(t, l3, []interface{}{1, 2, 3, 1, 2, 3})
l3 = New()
l3.PushFrontList(l1)
checkList(t, l3, []interface{}{1, 2, 3})
l3.PushFrontList(l3)
checkList(t, l3, []interface{}{1, 2, 3, 1, 2, 3})
l3 = New()
l1.PushBackList(l3)
checkList(t, l1, []interface{}{1, 2, 3})
l1.PushFrontList(l3)
checkList(t, l1, []interface{}{1, 2, 3})
}
func TestRemove(t *testing.T) {
l := New()
e1 := l.PushBack(1)
e2 := l.PushBack(2)
checkListPointers(t, l, []*Element{e1, e2})
e := l.Front()
l.Remove(e)
checkListPointers(t, l, []*Element{e2})
l.Remove(e)
checkListPointers(t, l, []*Element{e2})
}
func TestIssue4103(t *testing.T) {
l1 := New()
l1.PushBack(1)
l1.PushBack(2)
l2 := New()
l2.PushBack(3)
l2.PushBack(4)
e := l1.Front()
l2.Remove(e) // l2 should not change because e is not an element of l2
if n := l2.Len(); n != 2 {
t.Errorf("l2.Len() = %d, want 2", n)
}
l1.InsertBefore(8, e)
if n := l1.Len(); n != 3 {
t.Errorf("l1.Len() = %d, want 3", n)
}
}
func TestIssue6349(t *testing.T) {
l := New()
l.PushBack(1)
l.PushBack(2)
e := l.Front()
l.Remove(e)
if e.Value() != 1 {
t.Errorf("e.value = %d, want 1", e.Value())
}
if e.Next() != nil {
t.Errorf("e.Next() != nil")
}
if e.Prev() != nil {
t.Errorf("e.Prev() != nil")
}
}
func TestMove(t *testing.T) {
l := New()
e1 := l.PushBack(1)
e2 := l.PushBack(2)
e3 := l.PushBack(3)
e4 := l.PushBack(4)
l.MoveAfter(e3, e3)
checkListPointers(t, l, []*Element{e1, e2, e3, e4})
l.MoveBefore(e2, e2)
checkListPointers(t, l, []*Element{e1, e2, e3, e4})
l.MoveAfter(e3, e2)
checkListPointers(t, l, []*Element{e1, e2, e3, e4})
l.MoveBefore(e2, e3)
checkListPointers(t, l, []*Element{e1, e2, e3, e4})
l.MoveBefore(e2, e4)
checkListPointers(t, l, []*Element{e1, e3, e2, e4})
e2, e3 = e3, e2
l.MoveBefore(e4, e1)
checkListPointers(t, l, []*Element{e4, e1, e2, e3})
e1, e2, e3, e4 = e4, e1, e2, e3
l.MoveAfter(e4, e1)
checkListPointers(t, l, []*Element{e1, e4, e2, e3})
e2, e3, e4 = e4, e2, e3
l.MoveAfter(e2, e3)
checkListPointers(t, l, []*Element{e1, e3, e2, e4})
e2, e3 = e3, e2
}
// Test PushFront, PushBack, PushFrontList, PushBackList with uninitialized List
func TestZeroList(t *testing.T) {
var l1 = New()
l1.PushFront(1)
checkList(t, l1, []interface{}{1})
var l2 = New()
l2.PushBack(1)
checkList(t, l2, []interface{}{1})
var l3 = New()
l3.PushFrontList(l1)
checkList(t, l3, []interface{}{1})
var l4 = New()
l4.PushBackList(l2)
checkList(t, l4, []interface{}{1})
}
// Test that a list l is not modified when calling InsertBefore with a mark that is not an element of l.
func TestInsertBeforeUnknownMark(t *testing.T) {
l := New()
l.PushBack(1)
l.PushBack(2)
l.PushBack(3)
l.InsertBefore(1, new(Element))
checkList(t, l, []interface{}{1, 2, 3})
}
// Test that a list l is not modified when calling InsertAfter with a mark that is not an element of l.
func TestInsertAfterUnknownMark(t *testing.T) {
l := New()
l.PushBack(1)
l.PushBack(2)
l.PushBack(3)
l.InsertAfter(1, new(Element))
checkList(t, l, []interface{}{1, 2, 3})
}
// Test that a list l is not modified when calling MoveAfter or MoveBefore with a mark that is not an element of l.
func TestMoveUnknownMark(t *testing.T) {
l1 := New()
e1 := l1.PushBack(1)
l2 := New()
e2 := l2.PushBack(2)
l1.MoveAfter(e1, e2)
checkList(t, l1, []interface{}{1})
checkList(t, l2, []interface{}{2})
l1.MoveBefore(e1, e2)
checkList(t, l1, []interface{}{1})
checkList(t, l2, []interface{}{2})
}
func TestList_RemoveAll(t *testing.T) {
l := New()
l.PushBack(1)
l.RemoveAll()
checkList(t, l, []interface{}{})
l.PushBack(2)
checkList(t, l, []interface{}{2})
}

View File

@ -12,7 +12,7 @@ import (
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/os/gwheel"
)
// 对象池
@ -43,7 +43,7 @@ func New(expire int, newFunc...func() (interface{}, error)) *Pool {
if len(newFunc) > 0 {
r.NewFunc = newFunc[0]
}
gtimew.AddSingleton(1, r.checkExpire)
gwheel.AddSingleton(1, r.checkExpire)
return r
}
@ -101,7 +101,7 @@ func (p *Pool) Close() {
// 超时检测循环
func (p *Pool) checkExpire() {
if p.closed.Val() {
gtimew.ExitJob()
gwheel.ExitJob()
}
for {
if r := p.list.PopFront(); r != nil {

View File

@ -7,7 +7,7 @@
package gcache
import (
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/os/gwheel"
"sync/atomic"
"unsafe"
)
@ -23,7 +23,7 @@ func New(lruCap...int) *Cache {
c := &Cache {
memCache : newMemCache(lruCap...),
}
gtimew.AddSingleton(1, c.syncEventAndClearExpired)
gwheel.AddSingleton(1, c.syncEventAndClearExpired)
return c
}

View File

@ -11,7 +11,7 @@ import (
"gitee.com/johng/gf/g/container/gset"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/os/gwheel"
"gitee.com/johng/gf/g/util/gconv"
"math"
"sync"
@ -292,7 +292,7 @@ func (c *memCache) syncEventAndClearExpired() {
oldExpireTime := int64(0)
newExpireTime := int64(0)
if c.closed.Val() {
gtimew.ExitJob()
gwheel.ExitJob()
return
}
// ========================

View File

@ -7,12 +7,11 @@
package gcache
import (
"container/list"
"fmt"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/os/gwheel"
)
// LRU算法实现对象底层双向链表使用了标准库的list.List
@ -33,7 +32,7 @@ func newMemCacheLru(cache *memCache) *memCacheLru {
rawList : glist.New(),
closed : gtype.NewBool(),
}
gtimew.AddSingleton(1, lru.SyncAndClear)
gwheel.AddSingleton(1, lru.SyncAndClear)
return lru
}
@ -46,7 +45,7 @@ func (lru *memCacheLru) Close() {
func (lru *memCacheLru) Remove(key interface{}) {
if v := lru.data.Get(key); v != nil {
lru.data.Remove(key)
lru.list.Remove(v.(*list.Element))
lru.list.Remove(v.(*glist.Element))
}
}
@ -80,7 +79,7 @@ func (lru *memCacheLru) Print() {
// 异步执行协程将queue中的数据同步到list中
func (lru *memCacheLru) SyncAndClear() {
if lru.closed.Val() {
gtimew.ExitJob()
gwheel.ExitJob()
return
}
// 数据同步
@ -88,7 +87,7 @@ func (lru *memCacheLru) SyncAndClear() {
if v := lru.rawList.PopFront(); v != nil {
// 删除对应链表项
if v := lru.data.Get(v); v != nil {
lru.list.Remove(v.(*list.Element))
lru.list.Remove(v.(*glist.Element))
}
// 将数据插入到链表头,并记录对应的链表项到哈希表中,便于检索
lru.data.Set(v, lru.list.PushFront(v))

View File

@ -12,7 +12,7 @@ import (
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/os/gwheel"
"strconv"
)
@ -76,7 +76,7 @@ func (c *Cron) AddOnce(pattern string, job func(), name ... string) (*Entry, err
// 延迟添加定时任务delay参数单位为秒
func (c *Cron) DelayAdd(delay int, pattern string, job func(), name ... string) {
gtimew.AddOnce(delay, func() {
gwheel.AddOnce(delay, func() {
if _, err := c.Add(pattern, job, name ...); err != nil {
panic(err)
}
@ -85,7 +85,7 @@ func (c *Cron) DelayAdd(delay int, pattern string, job func(), name ... string)
// 延迟添加单例定时任务delay参数单位为秒
func (c *Cron) DelayAddSingleton(delay int, pattern string, job func(), name ... string) {
gtimew.AddOnce(delay, func() {
gwheel.AddOnce(delay, func() {
if _, err := c.AddSingleton(pattern, job, name ...); err != nil {
panic(err)
}
@ -94,7 +94,7 @@ func (c *Cron) DelayAddSingleton(delay int, pattern string, job func(), name ...
// 延迟添加只运行一次的定时任务delay参数单位为秒
func (c *Cron) DelayAddOnce(delay int, pattern string, job func(), name ... string) {
gtimew.AddOnce(delay, func() {
gwheel.AddOnce(delay, func() {
if _, err := c.AddOnce(pattern, job, name ...); err != nil {
panic(err)
}

View File

@ -7,15 +7,15 @@
package gcron
import (
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/os/gwheel"
"time"
)
// 延迟添加定时任务delay参数单位为秒
func (c *Cron) startLoop() {
gtimew.Add(1, func() {
gwheel.Add(1, func() {
if c.status.Val() == STATUS_CLOSED {
gtimew.ExitJob()
gwheel.ExitJob()
}
if c.status.Val() == STATUS_RUNNING {
go c.checkEntries(time.Now())

View File

@ -8,6 +8,7 @@
package gcron_test
import (
"fmt"
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/gcron"
"gitee.com/johng/gf/g/util/gtest"
@ -52,9 +53,10 @@ func TestCron_Method(t *testing.T) {
gtest.Case(func() {
cron := gcron.New()
cron.Add("* * * * * *", func() {}, "add")
fmt.Println("start", time.Now())
cron.DelayAdd(1, "* * * * * *", func() {}, "delay_add")
gtest.Assert(cron.Size(), 1)
time.Sleep(1500*time.Millisecond)
time.Sleep(1200*time.Millisecond)
gtest.Assert(cron.Size(), 2)
cron.Remove("delay_add")

View File

@ -8,9 +8,9 @@
package gfsnotify
import (
"container/list"
"errors"
"fmt"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gqueue"
"gitee.com/johng/gf/g/container/gtype"
@ -32,7 +32,7 @@ type Callback struct {
Id int // 唯一ID
Func func(event *Event) // 回调方法
Path string // 监听的文件/目录
elem *list.Element // 指向回调函数链表中的元素项位置(便于删除)
elem *glist.Element // 指向回调函数链表中的元素项位置(便于删除)
recursive bool // 当目录时,是否递归监听(使用在子文件/目录回溯查找回调函数时)
}

View File

@ -8,7 +8,7 @@ package gmlock
import (
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/os/gwheel"
)
// 内存锁管理对象
@ -78,7 +78,7 @@ func (l *Locker) doLock(key string, expire int, try bool) bool {
if ok && expire > 0 {
// 异步goroutine计时处理
wid := mu.wid.Val()
gtimew.AddOnce(expire, func() {
gwheel.AddOnce(expire, func() {
if wid == mu.wid.Val() {
mu.Unlock()
}
@ -98,7 +98,7 @@ func (l *Locker) doRLock(key string, expire int, try bool) bool {
}
if ok && expire > 0 {
rid := mu.rid.Val()
gtimew.AddOnce(expire, func() {
gwheel.AddOnce(expire, func() {
if rid == mu.rid.Val() {
mu.RUnlock()
}

View File

@ -1,70 +0,0 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtimew
import (
"gitee.com/johng/gf/g/container/gtype"
"time"
)
// 循环任务项
type Entry struct {
mode *gtype.Int // 任务运行模式(0: normal; 1: singleton; 2: once)
status *gtype.Int // 循环任务状态(0: ready; 1: running; -1: stopped)
Job JobFunc // 注册循环任务方法
Create int64 // 创建时间戳(秒)
Interval int64 // 运行间隔(秒)
}
// 任务执行方法
type JobFunc func()
// 创建循环任务
func newEntry(interval int, job JobFunc, mode int) *Entry {
return &Entry {
mode : gtype.NewInt(mode),
status : gtype.NewInt(),
Job : job,
Create : time.Now().Unix(),
Interval : int64(interval),
}
}
// 获取任务运行模式
func (entry *Entry) Mode() int {
return entry.mode.Val()
}
// 设置任务运行模式(0: normal; 1: singleton; 2: once)
func (entry *Entry) SetMode(mode int) {
entry.mode.Set(mode)
}
// 循环任务状态
func (entry *Entry) Status() int {
return entry.status.Val()
}
// 启动循环任务
func (entry *Entry) Start() {
entry.status.Set(STATUS_READY)
}
// 停止循环任务
func (entry *Entry) Stop() {
entry.status.Set(STATUS_CLOSED)
}
// 给定时间是否满足当前循环任务运行间隔
func (entry *Entry) Meet(t time.Time) bool {
diff := t.Unix() - entry.Create
if diff > 0 {
return diff%entry.Interval == 0
}
return false
}

View File

@ -1,72 +0,0 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtimew
import (
"container/list"
"time"
)
// 延迟添加循环任务delay参数单位为秒
func (w *Wheel) startLoop() {
go func() {
for w.status.Val() != STATUS_CLOSED {
time.Sleep(time.Second)
if w.status.Val() == STATUS_RUNNING {
go w.checkEntries(time.Now())
}
}
}()
}
// 遍历检查可执行循环任务,并异步执行
func (w *Wheel) checkEntries(t time.Time) {
w.entries.RLockFunc(func(l *list.List) {
for e := l.Front(); e != nil; e = e.Next() {
entry := e.Value.(*Entry)
if entry.Meet(t) {
// 是否已命令停止运行
if entry.status.Val() == STATUS_CLOSED {
continue
}
// 判断任务的运行模式
switch entry.mode.Val() {
// 是否只允许单例运行
case MODE_SINGLETON:
if entry.status.Set(STATUS_RUNNING) == STATUS_RUNNING {
continue
}
// 只运行一次的任务
case MODE_ONCE:
if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED {
continue
}
}
// 执行异步运行
go func(element *list.Element) {
defer func() {
if err := recover(); err != nil {
if err == gPANIC_EXIT {
entry.status.Set(STATUS_CLOSED)
} else {
panic(err)
}
}
if entry.status.Val() != STATUS_CLOSED {
entry.status.Set(STATUS_READY)
} else {
// 异步删除,不受锁机制的影响
w.entries.Remove(element)
}
}()
entry.Job()
}(e)
}
}
})
}

View File

@ -1,150 +0,0 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 包方法操作
package gtimew_test
import (
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/util/gtest"
"testing"
"time"
)
func TestWheel_Add_Close(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
entry1 := wheel.Add(1, func() {
array.Append(1)
})
entry2 := wheel.Add(1, func() {
array.Append(1)
})
entry3 := wheel.Add(2, func() {
array.Append(1)
})
gtest.AssertNE(entry1, nil)
gtest.AssertNE(entry2, nil)
gtest.AssertNE(entry3, nil)
gtest.Assert(wheel.Size(), 3)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 5)
wheel.Close()
time.Sleep(1100*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
})
}
func TestWheel_Singlton(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
entry := wheel.AddSingleton(1, func() {
array.Append(1)
time.Sleep(10*time.Second)
})
gtest.AssertNE(entry, nil)
gtest.Assert(wheel.Size(), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_Once(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
entry1 := wheel.AddOnce(1, func() {
array.Append(1)
})
entry2 := wheel.AddOnce(1, func() {
array.Append(1)
})
gtest.AssertNE(entry1, nil)
gtest.AssertNE(entry2, nil)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
wheel.Close()
time.Sleep(1100*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
})
}
func TestWheel_DelayAdd(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
wheel.DelayAdd(1, 1, func() {})
gtest.Assert(wheel.Size(), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(wheel.Size(), 1)
})
}
func TestWheel_DelayAdd_Singleton(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
wheel.DelayAddSingleton(1, 1, func() {
array.Append(1)
time.Sleep(10*time.Second)
})
gtest.Assert(wheel.Size(), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(wheel.Size(), 1)
gtest.Assert(array.Len(), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_DelayAdd_Once(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
wheel.DelayAddOnce(1, 1, func() {
array.Append(1)
})
gtest.Assert(wheel.Size(), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(wheel.Size(), 1)
gtest.Assert(array.Len(), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_ExitJob(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
wheel.Add(1, func() {
array.Append(1)
gtimew.ExitJob()
})
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
gtest.Assert(wheel.Size(), 0)
})
}

View File

@ -1,110 +0,0 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtimew
import (
"container/list"
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
"time"
)
// 循环任务管理对象
type Wheel struct {
status *gtype.Int // 循环任务状态(0: 未执行; 1: 运行中; -1:删除关闭)
entries *glist.List // 所有的循环任务项
}
// 创建自定义的循环任务管理对象
func New() *Wheel {
wheel := &Wheel {
status : gtype.NewInt(STATUS_RUNNING),
entries : glist.New(),
}
wheel.startLoop()
return wheel
}
// 添加循环任务
func (w *Wheel) Add(interval int, job JobFunc) *Entry {
entry := newEntry(interval, job, MODE_NORMAL)
w.entries.PushBack(entry)
return entry
}
// 添加单例运行循环任务
func (w *Wheel) AddSingleton(interval int, job JobFunc) *Entry {
entry := newEntry(interval, job, MODE_SINGLETON)
w.entries.PushBack(entry)
return entry
}
// 添加只运行一次的循环任务
func (w *Wheel) AddOnce(interval int, job JobFunc) *Entry {
entry := newEntry(interval, job, MODE_ONCE)
w.entries.PushBack(entry)
return entry
}
// 延迟添加循环任务delay参数单位为秒
func (w *Wheel) DelayAdd(delay int, interval int, job JobFunc) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
w.Add(interval, job)
}()
}
// 延迟添加单例循环任务delay参数单位为秒
func (w *Wheel) DelayAddSingleton(delay int, interval int, job JobFunc) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
w.AddSingleton(interval, job)
}()
}
// 延迟添加只运行一次的循环任务delay参数单位为秒
func (w *Wheel) DelayAddOnce(delay int, interval int, job JobFunc) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
w.AddOnce(interval, job)
}()
}
// 当前时间轮已注册的任务数
func (w *Wheel) Size() int {
return w.entries.Len()
}
// 关闭循环任务
func (w *Wheel) Close() {
w.status.Set(STATUS_CLOSED)
}
// 获取所有已注册的循环任务项(按照注册时间从小到大进行排序)
func (w *Wheel) Entries() []*Entry {
array := garray.NewSortedArray(w.entries.Len(), func(v1, v2 interface{}) int {
entry1 := v1.(*Entry)
entry2 := v2.(*Entry)
if entry1.Create > entry2.Create {
return 1
}
return -1
}, false)
w.entries.RLockFunc(func(l *list.List) {
for e := l.Front(); e != nil; e = e.Next() {
array.Add(e.Value.(*Entry))
}
})
entries := make([]*Entry, array.Len())
array.RLockFunc(func(array []interface{}) {
for k, v := range array {
entries[k] = v.(*Entry)
}
})
return entries
}

View File

@ -4,56 +4,73 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Package gtimew provides Time Wheel for interval jobs running management/时间轮.
// 高效的时间轮任务执行管理,用于管理异步的间隔运行任务,或者异步只运行一次的任务(最小时间粒度为秒)。
// Package gwheel provides Timing Wheel for interval jobs running and management/时间轮.
// 高效的时间轮任务执行管理,用于管理异步的间隔运行任务,或者异步只运行一次的任务(默认最小时间粒度为秒)。
// 与其他定时任务管理模块的区别是,时间轮模块只管理间隔执行任务,并且更注重执行效率(纳秒级别)。
package gtimew
package gwheel
import "time"
const (
MODE_NORMAL = 0
MODE_SINGLETON = 1
MODE_ONCE = 2
MODE_TIMES = 3
STATUS_READY = 0
STATUS_RUNNING = 1
STATUS_CLOSED = -1
gPANIC_EXIT = "exit"
gDEFAULT_SLOT_NUMBER = 10
gDEFAULT_WHEEL_INTERVAL = 100*time.Millisecond
)
var (
// 默认的wheel管理对象
defaultWheel = New()
defaultWheel = NewDefault()
)
// 添加执行方法,可以给定名字,以便于后续执行删除
func Add(interval int, job JobFunc) *Entry {
return defaultWheel.Add(interval, job)
return defaultWheel.Add(10*interval, job)
}
// 添加单例运行循环任务
func AddSingleton(interval int, job JobFunc) *Entry {
return defaultWheel.AddSingleton(interval, job)
return nil
return defaultWheel.AddSingleton(10*interval, job)
}
// 添加只运行一次的循环任务
func AddOnce(interval int, job JobFunc) *Entry {
return defaultWheel.AddOnce(interval, job)
return defaultWheel.AddOnce(10*interval, job)
}
// 添加运行指定次数的循环任务
func AddTimes(interval int, times int, job JobFunc) *Entry {
return defaultWheel.AddTimes(10*interval, times, job)
}
// 延迟添加循环任务delay参数单位为秒
func DelayAdd(delay int, interval int, job JobFunc) {
defaultWheel.DelayAdd(delay, interval, job)
defaultWheel.DelayAdd(delay, 10*interval, job)
}
// 延迟添加单例循环任务delay参数单位为秒
func DelayAddSingleton(delay int, interval int, job JobFunc) {
defaultWheel.DelayAddSingleton(delay, interval, job)
defaultWheel.DelayAddSingleton(delay, 10*interval, job)
}
// 延迟添加只运行一次的循环任务delay参数单位为秒
func DelayAddOnce(delay int, interval int, job JobFunc) {
defaultWheel.DelayAddOnce(delay, interval, job)
defaultWheel.DelayAddOnce(delay, 10*interval, job)
}
// 延迟添加运行指定次数的循环任务delay参数单位为秒
func DelayAddTimes(delay int, interval int, times int, job JobFunc) {
defaultWheel.DelayAddTimes(delay, 10*interval, times, job)
}
// 获取所有已注册的循环任务项

View File

@ -0,0 +1,99 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gwheel
import (
"gitee.com/johng/gf/g/container/gtype"
"time"
)
// 循环任务项
type Entry struct {
wheel *Wheel // 所属时间轮
mode *gtype.Int // 任务运行模式(0: normal; 1: singleton; 2: once; 3: times)
status *gtype.Int // 循环任务状态(0: ready; 1: running; -1: stopped)
times *gtype.Int // 还需运行次数, 当mode=3时启用, 当times值为0时表示不再执行(自动该任务删除)
latest time.Time // 任务上一次的运行时间点
Job JobFunc // 注册循环任务方法
Create time.Time // 任务的创建时间点
Interval time.Duration // 运行间隔
}
// 任务执行方法
type JobFunc func()
// 创建循环任务
func (w *Wheel) newEntry(interval int, job JobFunc, mode int, times int) *Entry {
now := time.Now()
pos := (interval + w.index.Val()) % w.number
entry := &Entry {
wheel : w,
mode : gtype.NewInt(mode),
status : gtype.NewInt(),
times : gtype.NewInt(times),
latest : now,
Job : job,
Create : now,
Interval : w.interval*time.Duration(interval),
}
w.slots[pos].PushBack(entry)
return entry
}
// 获取任务运行模式
func (entry *Entry) Mode() int {
return entry.mode.Val()
}
// 设置任务运行模式(0: normal; 1: singleton; 2: once)
func (entry *Entry) SetMode(mode int) {
entry.mode.Set(mode)
}
// 循环任务状态
func (entry *Entry) Status() int {
return entry.status.Val()
}
// 启动循环任务
func (entry *Entry) Start() {
entry.status.Set(STATUS_READY)
}
// 停止循环任务
func (entry *Entry) Stop() {
entry.status.Set(STATUS_CLOSED)
}
// 检测当前任务是否可运行, 内部将事件转换为微秒数来计算(int64), 精度更高
func (entry *Entry) runnableCheck(t time.Time) bool {
if t.UnixNano() - entry.latest.UnixNano() >= entry.Interval.Nanoseconds() {
// 判断任务的运行模式
switch entry.mode.Val() {
// 是否只允许单例运行
case MODE_SINGLETON:
if entry.status.Set(STATUS_RUNNING) == STATUS_RUNNING {
return false
}
// 只运行一次的任务
case MODE_ONCE:
if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED {
return false
}
// 运行指定次数的任务
case MODE_TIMES:
if entry.times.Add(-1) <= 0 {
if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED {
return false
}
}
}
entry.latest = t
return true
}
return false
}

View File

@ -0,0 +1,66 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gwheel
import (
"gitee.com/johng/gf/g/container/glist"
"time"
)
// 延迟添加循环任务delay参数单位为秒
func (w *Wheel) startLoop() {
go func() {
for {
select {
case <- w.closed:
return
case t := <- w.ticker.C:
if w.status.Val() == STATUS_RUNNING {
index := w.index.Val()
go w.checkEntries(t, w.slots[index])
w.index.Set((index + 1) % w.number)
}
}
}
}()
}
// 遍历检查可执行循环任务,并异步执行
func (w *Wheel) checkEntries(t time.Time, l *glist.List) {
for e := l.Front(); e != nil; e = e.Next() {
entry := e.Value().(*Entry)
// 是否已命令停止运行
if entry.status.Val() == STATUS_CLOSED {
continue
}
// 是否满足运行时间间隔
if !entry.runnableCheck(t) {
continue
}
// 执行异步运行
go func(e *glist.Element, l *glist.List) {
defer func() {
if err := recover(); err != nil {
if err == gPANIC_EXIT {
entry.status.Set(STATUS_CLOSED)
} else {
panic(err)
}
}
if entry.status.Val() != STATUS_CLOSED {
entry.status.Set(STATUS_READY)
} else {
// 异步删除,不受锁机制的影响
l.Remove(e)
}
}()
entry.Job()
}(e, l)
}
}

124
g/os/gwheel/gwheel_wheel.go Normal file
View File

@ -0,0 +1,124 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gwheel
import (
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
"time"
)
// 循环任务管理对象
type Wheel struct {
status *gtype.Int // 循环任务状态(0: 未执行; 1: 运行中; -1:删除关闭)
index *gtype.Int // 时间轮处理的当前索引位置
slots []*glist.List // 所有的循环任务项, 按照Slot Number进行分组
number int // Slot Number
closed chan struct{} // 停止事件
create time.Time // 创建时间
ticker *time.Ticker // 时间轮间隔
interval time.Duration // 时间间隔(slot时间长度)
}
// 创建使用默认值的时间轮
func NewDefault() *Wheel {
return New(gDEFAULT_SLOT_NUMBER, gDEFAULT_WHEEL_INTERVAL)
}
// 创建自定义的循环任务管理对象
func New(slot int, interval time.Duration) *Wheel {
w := &Wheel {
status : gtype.NewInt(STATUS_RUNNING),
index : gtype.NewInt(),
slots : make([]*glist.List, slot),
number : slot,
closed : make(chan struct{}, 1),
create : time.Now(),
ticker : time.NewTicker(interval),
interval : interval,
}
for i := 0; i < w.number; i++ {
w.slots[i] = glist.New()
}
w.startLoop()
return w
}
// 添加循环任务
func (w *Wheel) Add(interval int, job JobFunc) *Entry {
return w.newEntry(interval, job, MODE_NORMAL, 0)
}
// 添加单例运行循环任务
func (w *Wheel) AddSingleton(interval int, job JobFunc) *Entry {
return w.newEntry(interval, job, MODE_SINGLETON, 0)
}
// 添加只运行一次的循环任务
func (w *Wheel) AddOnce(interval int, job JobFunc) *Entry {
return w.newEntry(interval, job, MODE_ONCE, 0)
}
// 添加运行指定次数的循环任务
func (w *Wheel) AddTimes(interval int, times int, job JobFunc) *Entry {
return w.newEntry(interval, job, MODE_TIMES, times)
}
// 延迟添加循环任务delay参数单位为时间轮刻度
func (w *Wheel) DelayAdd(delay int, interval int, job JobFunc) {
w.AddOnce(delay, func() {
w.Add(interval, job)
})
}
// 延迟添加单例循环任务delay参数单位为时间轮刻度
func (w *Wheel) DelayAddSingleton(delay int, interval int, job JobFunc) {
w.AddOnce(delay, func() {
w.AddSingleton(interval, job)
})
}
// 延迟添加只运行一次的循环任务delay参数单位为时间轮刻度
func (w *Wheel) DelayAddOnce(delay int, interval int, job JobFunc) {
w.AddOnce(delay, func() {
w.AddOnce(interval, job)
})
}
// 延迟添加只运行一次的循环任务delay参数单位为时间轮刻度
func (w *Wheel) DelayAddTimes(delay int, interval int, times int, job JobFunc) {
w.AddOnce(delay, func() {
w.AddTimes(interval, times, job)
})
}
// 当前时间轮已注册的任务数
func (w *Wheel) Size() int {
size := 0
for _, l := range w.slots {
size += l.Len()
}
return size
}
// 关闭循环任务
func (w *Wheel) Close() {
w.status.Set(STATUS_CLOSED)
w.ticker.Stop()
w.closed <- struct{}{}
}
// 获取所有已注册的循环任务项(按照注册时间从小到大进行排序)
func (w *Wheel) Entries() []*Entry {
entries := make([]*Entry, 0)
for _, l := range w.slots {
for e := l.Front(); e != nil; e = e.Next() {
entries = append(entries, e.Value().(*Entry))
}
}
return entries
}

View File

@ -4,17 +4,17 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtimew_test
package gwheel_test
import (
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/os/gwheel"
"testing"
)
func Benchmark_Add(b *testing.B) {
for i := 0; i < b.N; i++ {
// 基准测试的时候不能设置为1秒否则大量的任务会崩掉系统
gtimew.Add(100000, func() {
gwheel.Add(100000, func() {
})
}

View File

@ -0,0 +1,150 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 包方法操作
package gwheel_test
import (
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/gwheel"
"gitee.com/johng/gf/g/util/gtest"
"testing"
"time"
)
func TestWheel_Add_Close(t *testing.T) {
gtest.Case(func() {
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
entry1 := wheel.Add(10, func() {
array.Append(1)
})
entry2 := wheel.Add(10, func() {
array.Append(1)
})
entry3 := wheel.Add(20, func() {
array.Append(1)
})
gtest.AssertNE(entry1, nil)
gtest.AssertNE(entry2, nil)
gtest.AssertNE(entry3, nil)
gtest.Assert(wheel.Size(), 3)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 5)
wheel.Close()
time.Sleep(1200*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
})
}
func TestWheel_Singlton(t *testing.T) {
gtest.Case(func() {
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
entry := wheel.AddSingleton(10, func() {
array.Append(1)
time.Sleep(10*time.Second)
})
gtest.AssertNE(entry, nil)
gtest.Assert(wheel.Size(), 1)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_Once(t *testing.T) {
gtest.Case(func() {
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
entry1 := wheel.AddOnce(10, func() {
array.Append(1)
})
entry2 := wheel.AddOnce(10, func() {
array.Append(1)
})
gtest.AssertNE(entry1, nil)
gtest.AssertNE(entry2, nil)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 2)
wheel.Close()
time.Sleep(1200*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
})
}
func TestWheel_DelayAdd(t *testing.T) {
gtest.Case(func() {
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
wheel.DelayAdd(10, 10, func() {
array.Append(1)
})
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 0)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_DelayAdd_Singleton(t *testing.T) {
gtest.Case(func() {
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
wheel.DelayAddSingleton(10, 10, func() {
array.Append(1)
time.Sleep(10*time.Second)
})
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_DelayAdd_Once(t *testing.T) {
gtest.Case(func() {
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
wheel.DelayAddOnce(10, 10, func() {
array.Append(1)
})
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_ExitJob(t *testing.T) {
gtest.Case(func() {
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
wheel.Add(10, func() {
array.Append(1)
gwheel.ExitJob()
})
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1)
gtest.Assert(wheel.Size(), 0)
})
}

View File

@ -6,63 +6,63 @@
// Entry操作
package gtimew_test
package gwheel_test
import (
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/os/gwheel"
"gitee.com/johng/gf/g/util/gtest"
"testing"
"time"
)
func TestWheel_Entry_Operation(t *testing.T) {
wheel := gtimew.New()
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
entry := wheel.Add(1, func() {
entry := wheel.Add(10, func() {
array.Append(1)
})
gtest.AssertNE(entry, nil)
gtest.Assert(wheel.Size(), 1)
time.Sleep(1100*time.Millisecond)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1)
entry.Stop()
time.Sleep(1100*time.Millisecond)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1)
entry.Start()
time.Sleep(1100*time.Millisecond)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 2)
}
func TestWheel_Entry_Singlton(t *testing.T) {
wheel := gtimew.New()
func TestWheel_Entry_Singleton(t *testing.T) {
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
entry := wheel.Add(1, func() {
entry := wheel.Add(10, func() {
array.Append(1)
time.Sleep(10*time.Second)
})
entry.SetMode(gtimew.MODE_SINGLETON)
entry.SetMode(gwheel.MODE_SINGLETON)
gtest.AssertNE(entry, nil)
gtest.Assert(wheel.Size(), 1)
time.Sleep(1100*time.Millisecond)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1)
time.Sleep(1100*time.Millisecond)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1)
}
func TestWheel_Entry_Once(t *testing.T) {
wheel := gtimew.New()
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
entry := wheel.Add(1, func() {
entry := wheel.Add(10, func() {
array.Append(1)
})
entry.SetMode(gtimew.MODE_ONCE)
entry.SetMode(gwheel.MODE_ONCE)
gtest.AssertNE(entry, nil)
gtest.Assert(wheel.Size(), 1)
time.Sleep(1100*time.Millisecond)
time.Sleep(1200*time.Millisecond)
gtest.Assert(array.Len(), 1)
gtest.Assert(wheel.Size(), 0)
}

View File

@ -0,0 +1,28 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 指定次数运行测试
package gwheel_test
import (
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/gwheel"
"gitee.com/johng/gf/g/util/gtest"
"testing"
"time"
)
func TestWheel_Times(t *testing.T) {
wheel := gwheel.NewDefault()
array := garray.New(0, 0)
entry := wheel.AddTimes(10, 20, func() {
array.Append(1)
})
gtest.AssertNE(entry, nil)
time.Sleep(3500*time.Millisecond)
gtest.Assert(array.Len(), 2)
}

View File

@ -4,6 +4,6 @@ import "time"
func main() {
for {
time.Sleep(time.Second)
time.Sleep(10*time.Millisecond)
}
}