diff --git a/TODO.MD b/TODO.MD index 3345805b4..3aae6623c 100644 --- a/TODO.MD +++ b/TODO.MD @@ -47,6 +47,13 @@ 1. 改进WebServer获取POST参数处理逻辑,当提交非form数据时,例如json数据,针对某些方法可以直接解析; 1. WebServer增加可选择的路由覆盖配置,默认情况下不覆盖; 1. grpool性能压测结果变慢的问题; +1. glist增加Element类型的并发安全处理; + + + + + + # DONE diff --git a/g/container/glist/glist.go b/g/container/glist/glist.go index f6eb2643d..6baeea906 100644 --- a/g/container/glist/glist.go +++ b/g/container/glist/glist.go @@ -5,274 +5,276 @@ // You can obtain one at https://gitee.com/johng/gf. // -// Package glist provides a concurrent-safe(alternative) doubly linked list. -// 并发安全的双向链表. +// Package glist provides a concurrent-safe(alternative) doubly linked list/并发安全的双向链表. package glist import ( - "container/list" - "gitee.com/johng/gf/g/container/internal/rwmutex" + "gitee.com/johng/gf/g/container/gtype" ) // 变长双向链表 type List struct { - mu *rwmutex.RWMutex - list *list.List + safe bool + root *Element + length *gtype.Int } // 获得一个变长链表指针 func New(safe...bool) *List { - return &List { - mu : rwmutex.New(safe...), - list : list.New(), + l := &List{ + length : gtype.NewInt(), } + l.root = newElement(nil, safe...) + l.root.list = l + l.root.setNext(l.root) + l.root.setPrev(l.root) + if len(safe) > 0 { + l.safe = safe[0] + } else { + l.safe = true + } + return l } // 往链表头入栈数据项 -func (l *List) PushFront(v interface{}) *list.Element { - l.mu.Lock() - e := l.list.PushFront(v) - l.mu.Unlock() - return e +func (l *List) PushFront(v interface{}) *Element { + return l.insertValue(v, l.root) } // 往链表尾入栈数据项 -func (l *List) PushBack(v interface{}) *list.Element { - l.mu.Lock() - r := l.list.PushBack(v) - l.mu.Unlock() - return r +func (l *List) PushBack(v interface{}) *Element { + return l.insertValue(v, l.root.getPrev()) } // 在list 中元素mark之后插入一个值为v的元素,并返回该元素,如果mark不是list中元素,则list不改变。 -func (l *List) InsertAfter(v interface{}, mark *list.Element) *list.Element { - l.mu.Lock() - r := l.list.InsertAfter(v, mark) - l.mu.Unlock() - return r +func (l *List) InsertAfter(v interface{}, mark *Element) *Element { + if mark.list != l { + return nil + } + return l.insertValue(v, mark) } // 在list 中元素mark之前插入一个值为v的元素,并返回该元素,如果mark不是list中元素,则list不改变。 -func (l *List) InsertBefore(v interface{}, mark *list.Element) *list.Element { - l.mu.Lock() - r := l.list.InsertBefore(v, mark) - l.mu.Unlock() - return r +func (l *List) InsertBefore(v interface{}, mark *Element) *Element { + if mark.list != l { + return nil + } + return l.insertValue(v, mark.getPrev()) } - // 批量往链表头入栈数据项 func (l *List) BatchPushFront(vs []interface{}) { - l.mu.Lock() for _, item := range vs { - l.list.PushFront(item) + l.PushFront(item) } - l.mu.Unlock() } // 从链表尾端出栈数据项(删除) func (l *List) PopBack() interface{} { - l.mu.Lock() - if elem := l.list.Back(); elem != nil { - item := l.list.Remove(elem) - l.mu.Unlock() - return item + if e := l.Back(); e != nil { + return l.Remove(e) } - l.mu.Unlock() return nil } // 从链表头端出栈数据项(删除) func (l *List) PopFront() interface{} { - l.mu.Lock() - if elem := l.list.Front(); elem != nil { - item := l.list.Remove(elem) - l.mu.Unlock() - return item + if e := l.Front(); e != nil { + return l.Remove(e) } - l.mu.Unlock() return nil } // 批量从链表尾端出栈数据项(删除) func (l *List) BatchPopBack(max int) []interface{} { - l.mu.Lock() - count := l.list.Len() + count := l.Len() if count == 0 { - l.mu.Unlock() return []interface{}{} } - if count > max { count = max } items := make([]interface{}, count) for i := 0; i < count; i++ { - items[i] = l.list.Remove(l.list.Back()) + items[i] = l.PopBack() } - l.mu.Unlock() return items } // 批量从链表头端出栈数据项(删除) func (l *List) BatchPopFront(max int) []interface{} { - l.mu.Lock() - count := l.list.Len() + count := l.Len() if count == 0 { - l.mu.Unlock() return []interface{}{} } - if count > max { count = max } items := make([]interface{}, count) for i := 0; i < count; i++ { - items[i] = l.list.Remove(l.list.Front()) + items[i] = l.PopFront() } - l.mu.Unlock() return items } // 批量从链表尾端依次获取所有数据(删除) func (l *List) PopBackAll() []interface{} { - l.mu.Lock() - count := l.list.Len() - if count == 0 { - l.mu.Unlock() - return []interface{}{} - } - items := make([]interface{}, count) - for i := 0; i < count; i++ { - items[i] = l.list.Remove(l.list.Back()) - } - l.mu.Unlock() - return items + return l.BatchPopFront(l.Len()) } // 批量从链表头端依次获取所有数据(删除) func (l *List) PopFrontAll() []interface{} { - l.mu.Lock() - count := l.list.Len() - if count == 0 { - l.mu.Unlock() - return []interface{}{} - } - items := make([]interface{}, count) - for i := 0; i < count; i++ { - items[i] = l.list.Remove(l.list.Front()) - } - l.mu.Unlock() - return items + return l.BatchPopFront(l.Len()) } -// 删除数据项 -func (l *List) Remove(e *list.Element) interface{} { - l.mu.Lock() - r := l.list.Remove(e) - l.mu.Unlock() - return r +// 删除数据项e +func (l *List) Remove(e *Element) interface{} { + if e.list == l { + l.remove(e) + } + return e.Value() } // 删除所有数据项 func (l *List) RemoveAll() { - l.mu.Lock() - l.list = list.New() - l.mu.Unlock() + l.length.Set(0) + l.root.setNext(l.root) + l.root.setPrev(l.root) } // 从链表头获取所有数据(不删除) func (l *List) FrontAll() []interface{} { - l.mu.RLock() - count := l.list.Len() + count := l.Len() if count == 0 { - l.mu.RUnlock() return []interface{}{} } - items := make([]interface{}, 0, count) - for e := l.list.Front(); e != nil; e = e.Next() { - items = append(items, e.Value) + for e := l.Front(); e != nil; e = e.Next() { + items = append(items, e.Value()) } - l.mu.RUnlock() return items } // 从链表尾获取所有数据(不删除) func (l *List) BackAll() []interface{} { - l.mu.RLock() - count := l.list.Len() + count := l.Len() if count == 0 { - l.mu.RUnlock() return []interface{}{} } - items := make([]interface{}, 0, count) - for e := l.list.Back(); e != nil; e = e.Prev() { - items = append(items, e.Value) + for e := l.Back(); e != nil; e = e.Prev() { + items = append(items, e.Value()) } - l.mu.RUnlock() return items } // 获取链表头值(不删除) func (l *List) FrontItem() interface{} { - l.mu.RLock() - if f := l.list.Front(); f != nil { - l.mu.RUnlock() - return f.Value + if e := l.Front(); e != nil { + return e.Value() } - - l.mu.RUnlock() return nil } // 获取链表尾值(不删除) func (l *List) BackItem() interface{} { - l.mu.RLock() - if f := l.list.Back(); f != nil { - l.mu.RUnlock() - return f.Value + if e := l.Back(); e != nil { + return e.Value() } - - l.mu.RUnlock() return nil } // 获取表头指针 -func (l *List) Front() *list.Element { - l.mu.RLock() - r := l.list.Front() - l.mu.RUnlock() - return r +func (l *List) Front() *Element { + if l.length.Val() == 0 { + return nil + } + return l.root.getNext() } // 获取表位指针 -func (l *List) Back() *list.Element { - l.mu.RLock() - r := l.list.Back() - l.mu.RUnlock() - return r +func (l *List) Back() *Element { + if l.length.Val() == 0 { + return nil + } + return l.root.getPrev() } // 获取链表长度 func (l *List) Len() int { - l.mu.RLock() - length := l.list.Len() - l.mu.RUnlock() - return length + return l.length.Val() } -// 读锁操作 -func (l *List) RLockFunc(f func(l *list.List)) { - l.mu.RLock() - defer l.mu.RUnlock() - f(l.list) +func (l *List) MoveBefore(e, mark *Element) { + if e.list != l || e == mark || mark.list != l { + return + } + l.insert(l.remove(e), mark.getPrev()) } -// 写锁操作 -func (l *List) LockFunc(f func(l *list.List)) { - l.mu.Lock() - defer l.mu.Unlock() - f(l.list) +func (l *List) MoveAfter(e, mark *Element) { + if e.list != l || e == mark || mark.list != l { + return + } + l.insert(l.remove(e), mark) +} + +func (l *List) MoveToFront(e *Element) { + if e.list != l || l.root.getNext() == e { + return + } + l.insert(l.remove(e), l.root) +} + +func (l *List) MoveToBack(e *Element) { + if e.list != l || l.root.getPrev() == e { + return + } + l.insert(l.remove(e), l.root.getPrev()) +} + +func (l *List) PushBackList(other *List) { + for i, e := other.Len(), other.Front(); i > 0; i, e = i - 1, e.Next() { + l.insertValue(e.Value(), l.root.getPrev()) + } +} + +func (l *List) PushFrontList(other *List) { + for i, e := other.Len(), other.Back(); i > 0; i, e = i - 1, e.Prev() { + l.insertValue(e.Value(), l.root) + } +} + +// 在元素项p后添加数值value, 自动创建元素项 +func (l *List) insertValue(value interface{}, p *Element) *Element { + return l.insert(newElement(value, l.safe), p) +} + +// 在元素项p后添加元素项e +func (l *List) insert(e, p *Element) *Element { + p.LockFunc(func(p *Element) { + e.LockFunc(func(e *Element) { + n := p.next + p.next = e + e.prev = p + e.next = n + n.prev = e + e.list = l + }) + }) + l.length.Add(1) + return e +} + +// 从列表中删除元素项e +func (l *List) remove(e *Element) *Element { + e.LockFunc(func(e *Element) { + e.prev.setNext(e.next) + e.next.setPrev(e.prev) + e.next = nil + e.prev = nil + e.list = nil + }) + l.length.Add(-1) + return e } diff --git a/g/container/glist/glist_element.go b/g/container/glist/glist_element.go new file mode 100644 index 000000000..989ee13c8 --- /dev/null +++ b/g/container/glist/glist_element.go @@ -0,0 +1,103 @@ +// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with l file, +// You can obtain one at https://gitee.com/johng/gf. +// + +package glist + +import ( + "gitee.com/johng/gf/g/container/internal/rwmutex" +) + +// 链表元素项 +type Element struct { + mu *rwmutex.RWMutex + list *List + prev *Element + next *Element + value interface{} +} + +// 创建一个并发安全的列表元素项 +func newElement(value interface{}, safe...bool) *Element { + return &Element { + mu : rwmutex.New(safe...), + value : value, + } +} + +// 获得元素项值 +func (e *Element) Value() interface{} { + e.mu.RLock() + r := e.value + e.mu.RUnlock() + return r +} + +// 获得下一个元素项(遍历使用) +func (e *Element) Next() *Element { + e.mu.RLock() + defer e.mu.RUnlock() + if p := e.next; e.list != nil && p != e.list.root { + return p + } + return nil +} + +// 获得前一个元素项(遍历使用) +func (e *Element) Prev() *Element { + e.mu.RLock() + defer e.mu.RUnlock() + if p := e.prev; e.list != nil && p != e.list.root { + return p + } + return nil +} + +// 只读锁操作 +func (e *Element) RLockFunc(f func(e *Element)) { + e.mu.RLock() + defer e.mu.RUnlock() + f(e) +} + +// 写锁操作 +func (e *Element) LockFunc(f func(e *Element)) { + e.mu.Lock() + defer e.mu.Unlock() + f(e) +} + +func (e *Element) setPrev(prev *Element) (old *Element) { + e.mu.Lock() + old = e.prev + e.prev = prev + e.mu.Unlock() + return +} + +func (e *Element) setNext(next *Element) (old *Element) { + e.mu.Lock() + old = e.next + e.next = next + e.mu.Unlock() + return +} + +// 获得前一个元素项(内部并发安全使用) +func (e *Element) getPrev() (prev *Element) { + e.mu.RLock() + prev = e.prev + e.mu.RUnlock() + return +} + +// 获得下一个元素项(内部并发安全使用) +func (e *Element) getNext() (next *Element) { + e.mu.RLock() + next = e.next + e.mu.RUnlock() + return +} \ No newline at end of file diff --git a/g/container/glist/glist_test.go b/g/container/glist/glist_z_bench_test.go similarity index 100% rename from g/container/glist/glist_test.go rename to g/container/glist/glist_z_bench_test.go diff --git a/g/container/glist/glist_z_unit_test.go b/g/container/glist/glist_z_unit_test.go new file mode 100644 index 000000000..759f504d9 --- /dev/null +++ b/g/container/glist/glist_z_unit_test.go @@ -0,0 +1,354 @@ +// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package glist + +import "testing" + +func checkListLen(t *testing.T, l *List, len int) bool { + if n := l.Len(); n != len { + t.Errorf("l.Len() = %d, want %d", n, len) + return false + } + return true +} + +func checkListPointers(t *testing.T, l *List, es []*Element) { + root := l.root + + if !checkListLen(t, l, len(es)) { + return + } + + // zero length lists must be the zero value or properly initialized (sentinel circle) + if len(es) == 0 { + if l.root.next != nil && l.root.next != root || l.root.prev != nil && l.root.prev != root { + t.Errorf("l.root.next = %p, l.root.prev = %p; both should both be nil or %p", l.root.next, l.root.prev, root) + } + return + } + // len(es) > 0 + + // check internal and external prev/next connections + for i, e := range es { + prev := root + Prev := (*Element)(nil) + if i > 0 { + prev = es[i-1] + Prev = prev + } + if p := e.prev; p != prev { + t.Errorf("elt[%d](%p).prev = %p, want %p", i, e, p, prev) + } + if p := e.Prev(); p != Prev { + t.Errorf("elt[%d](%p).Prev() = %p, want %p", i, e, p, Prev) + } + + next := root + Next := (*Element)(nil) + if i < len(es)-1 { + next = es[i+1] + Next = next + } + if n := e.next; n != next { + t.Errorf("elt[%d](%p).next = %p, want %p", i, e, n, next) + } + if n := e.Next(); n != Next { + t.Errorf("elt[%d](%p).Next() = %p, want %p", i, e, n, Next) + } + } +} + +func TestList(t *testing.T) { + l := New() + checkListPointers(t, l, []*Element{}) + + // Single element list + e := l.PushFront("a") + checkListPointers(t, l, []*Element{e}) + l.MoveToFront(e) + checkListPointers(t, l, []*Element{e}) + l.MoveToBack(e) + checkListPointers(t, l, []*Element{e}) + l.Remove(e) + checkListPointers(t, l, []*Element{}) + + // Bigger list + e2 := l.PushFront(2) + e1 := l.PushFront(1) + e3 := l.PushBack(3) + e4 := l.PushBack("banana") + checkListPointers(t, l, []*Element{e1, e2, e3, e4}) + + l.Remove(e2) + checkListPointers(t, l, []*Element{e1, e3, e4}) + + l.MoveToFront(e3) // move from middle + checkListPointers(t, l, []*Element{e3, e1, e4}) + + l.MoveToFront(e1) + l.MoveToBack(e3) // move from middle + checkListPointers(t, l, []*Element{e1, e4, e3}) + + l.MoveToFront(e3) // move from back + checkListPointers(t, l, []*Element{e3, e1, e4}) + l.MoveToFront(e3) // should be no-op + checkListPointers(t, l, []*Element{e3, e1, e4}) + + l.MoveToBack(e3) // move from front + checkListPointers(t, l, []*Element{e1, e4, e3}) + l.MoveToBack(e3) // should be no-op + checkListPointers(t, l, []*Element{e1, e4, e3}) + + e2 = l.InsertBefore(2, e1) // insert before front + checkListPointers(t, l, []*Element{e2, e1, e4, e3}) + l.Remove(e2) + e2 = l.InsertBefore(2, e4) // insert before middle + checkListPointers(t, l, []*Element{e1, e2, e4, e3}) + l.Remove(e2) + e2 = l.InsertBefore(2, e3) // insert before back + checkListPointers(t, l, []*Element{e1, e4, e2, e3}) + l.Remove(e2) + + e2 = l.InsertAfter(2, e1) // insert after front + checkListPointers(t, l, []*Element{e1, e2, e4, e3}) + l.Remove(e2) + e2 = l.InsertAfter(2, e4) // insert after middle + checkListPointers(t, l, []*Element{e1, e4, e2, e3}) + l.Remove(e2) + e2 = l.InsertAfter(2, e3) // insert after back + checkListPointers(t, l, []*Element{e1, e4, e3, e2}) + l.Remove(e2) + + // Check standard iteration. + sum := 0 + for e := l.Front(); e != nil; e = e.Next() { + if i, ok := e.Value().(int); ok { + sum += i + } + } + if sum != 4 { + t.Errorf("sum over l = %d, want 4", sum) + } + + // Clear all elements by iterating + var next *Element + for e := l.Front(); e != nil; e = next { + next = e.Next() + l.Remove(e) + } + checkListPointers(t, l, []*Element{}) +} + +func checkList(t *testing.T, l *List, es []interface{}) { + if !checkListLen(t, l, len(es)) { + return + } + + i := 0 + for e := l.Front(); e != nil; e = e.Next() { + le := e.Value().(int) + if le != es[i] { + t.Errorf("elt[%d].Value() = %v, want %v", i, le, es[i]) + } + i++ + } +} + +func TestExtending(t *testing.T) { + l1 := New() + l2 := New() + + l1.PushBack(1) + l1.PushBack(2) + l1.PushBack(3) + + l2.PushBack(4) + l2.PushBack(5) + + l3 := New() + l3.PushBackList(l1) + checkList(t, l3, []interface{}{1, 2, 3}) + l3.PushBackList(l2) + checkList(t, l3, []interface{}{1, 2, 3, 4, 5}) + + l3 = New() + l3.PushFrontList(l2) + checkList(t, l3, []interface{}{4, 5}) + l3.PushFrontList(l1) + checkList(t, l3, []interface{}{1, 2, 3, 4, 5}) + + checkList(t, l1, []interface{}{1, 2, 3}) + checkList(t, l2, []interface{}{4, 5}) + + l3 = New() + l3.PushBackList(l1) + checkList(t, l3, []interface{}{1, 2, 3}) + l3.PushBackList(l3) + checkList(t, l3, []interface{}{1, 2, 3, 1, 2, 3}) + + l3 = New() + l3.PushFrontList(l1) + checkList(t, l3, []interface{}{1, 2, 3}) + l3.PushFrontList(l3) + checkList(t, l3, []interface{}{1, 2, 3, 1, 2, 3}) + + l3 = New() + l1.PushBackList(l3) + checkList(t, l1, []interface{}{1, 2, 3}) + l1.PushFrontList(l3) + checkList(t, l1, []interface{}{1, 2, 3}) +} + +func TestRemove(t *testing.T) { + l := New() + e1 := l.PushBack(1) + e2 := l.PushBack(2) + checkListPointers(t, l, []*Element{e1, e2}) + e := l.Front() + l.Remove(e) + checkListPointers(t, l, []*Element{e2}) + l.Remove(e) + checkListPointers(t, l, []*Element{e2}) +} + +func TestIssue4103(t *testing.T) { + l1 := New() + l1.PushBack(1) + l1.PushBack(2) + + l2 := New() + l2.PushBack(3) + l2.PushBack(4) + + e := l1.Front() + l2.Remove(e) // l2 should not change because e is not an element of l2 + if n := l2.Len(); n != 2 { + t.Errorf("l2.Len() = %d, want 2", n) + } + + l1.InsertBefore(8, e) + if n := l1.Len(); n != 3 { + t.Errorf("l1.Len() = %d, want 3", n) + } +} + +func TestIssue6349(t *testing.T) { + l := New() + l.PushBack(1) + l.PushBack(2) + + e := l.Front() + l.Remove(e) + if e.Value() != 1 { + t.Errorf("e.value = %d, want 1", e.Value()) + } + if e.Next() != nil { + t.Errorf("e.Next() != nil") + } + if e.Prev() != nil { + t.Errorf("e.Prev() != nil") + } +} + +func TestMove(t *testing.T) { + l := New() + e1 := l.PushBack(1) + e2 := l.PushBack(2) + e3 := l.PushBack(3) + e4 := l.PushBack(4) + + l.MoveAfter(e3, e3) + checkListPointers(t, l, []*Element{e1, e2, e3, e4}) + l.MoveBefore(e2, e2) + checkListPointers(t, l, []*Element{e1, e2, e3, e4}) + + l.MoveAfter(e3, e2) + checkListPointers(t, l, []*Element{e1, e2, e3, e4}) + l.MoveBefore(e2, e3) + checkListPointers(t, l, []*Element{e1, e2, e3, e4}) + + l.MoveBefore(e2, e4) + checkListPointers(t, l, []*Element{e1, e3, e2, e4}) + e2, e3 = e3, e2 + + l.MoveBefore(e4, e1) + checkListPointers(t, l, []*Element{e4, e1, e2, e3}) + e1, e2, e3, e4 = e4, e1, e2, e3 + + l.MoveAfter(e4, e1) + checkListPointers(t, l, []*Element{e1, e4, e2, e3}) + e2, e3, e4 = e4, e2, e3 + + l.MoveAfter(e2, e3) + checkListPointers(t, l, []*Element{e1, e3, e2, e4}) + e2, e3 = e3, e2 +} + +// Test PushFront, PushBack, PushFrontList, PushBackList with uninitialized List +func TestZeroList(t *testing.T) { + var l1 = New() + l1.PushFront(1) + checkList(t, l1, []interface{}{1}) + + var l2 = New() + l2.PushBack(1) + checkList(t, l2, []interface{}{1}) + + var l3 = New() + l3.PushFrontList(l1) + checkList(t, l3, []interface{}{1}) + + var l4 = New() + l4.PushBackList(l2) + checkList(t, l4, []interface{}{1}) +} + +// Test that a list l is not modified when calling InsertBefore with a mark that is not an element of l. +func TestInsertBeforeUnknownMark(t *testing.T) { + l := New() + l.PushBack(1) + l.PushBack(2) + l.PushBack(3) + l.InsertBefore(1, new(Element)) + checkList(t, l, []interface{}{1, 2, 3}) +} + +// Test that a list l is not modified when calling InsertAfter with a mark that is not an element of l. +func TestInsertAfterUnknownMark(t *testing.T) { + l := New() + l.PushBack(1) + l.PushBack(2) + l.PushBack(3) + l.InsertAfter(1, new(Element)) + checkList(t, l, []interface{}{1, 2, 3}) +} + +// Test that a list l is not modified when calling MoveAfter or MoveBefore with a mark that is not an element of l. +func TestMoveUnknownMark(t *testing.T) { + l1 := New() + e1 := l1.PushBack(1) + + l2 := New() + e2 := l2.PushBack(2) + + l1.MoveAfter(e1, e2) + checkList(t, l1, []interface{}{1}) + checkList(t, l2, []interface{}{2}) + + l1.MoveBefore(e1, e2) + checkList(t, l1, []interface{}{1}) + checkList(t, l2, []interface{}{2}) +} + +func TestList_RemoveAll(t *testing.T) { + l := New() + l.PushBack(1) + l.RemoveAll() + checkList(t, l, []interface{}{}) + l.PushBack(2) + checkList(t, l, []interface{}{2}) +} \ No newline at end of file diff --git a/g/container/gpool/gpool.go b/g/container/gpool/gpool.go index 7381df2f9..527f8c512 100644 --- a/g/container/gpool/gpool.go +++ b/g/container/gpool/gpool.go @@ -12,7 +12,7 @@ import ( "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/os/gtime" - "gitee.com/johng/gf/g/os/gtimew" + "gitee.com/johng/gf/g/os/gwheel" ) // 对象池 @@ -43,7 +43,7 @@ func New(expire int, newFunc...func() (interface{}, error)) *Pool { if len(newFunc) > 0 { r.NewFunc = newFunc[0] } - gtimew.AddSingleton(1, r.checkExpire) + gwheel.AddSingleton(1, r.checkExpire) return r } @@ -101,7 +101,7 @@ func (p *Pool) Close() { // 超时检测循环 func (p *Pool) checkExpire() { if p.closed.Val() { - gtimew.ExitJob() + gwheel.ExitJob() } for { if r := p.list.PopFront(); r != nil { diff --git a/g/os/gcache/gcache_cache.go b/g/os/gcache/gcache_cache.go index a25b20ca5..df7302b8b 100644 --- a/g/os/gcache/gcache_cache.go +++ b/g/os/gcache/gcache_cache.go @@ -7,7 +7,7 @@ package gcache import ( - "gitee.com/johng/gf/g/os/gtimew" + "gitee.com/johng/gf/g/os/gwheel" "sync/atomic" "unsafe" ) @@ -23,7 +23,7 @@ func New(lruCap...int) *Cache { c := &Cache { memCache : newMemCache(lruCap...), } - gtimew.AddSingleton(1, c.syncEventAndClearExpired) + gwheel.AddSingleton(1, c.syncEventAndClearExpired) return c } diff --git a/g/os/gcache/gcache_mem_cache.go b/g/os/gcache/gcache_mem_cache.go index fd53a952a..8aa74213f 100644 --- a/g/os/gcache/gcache_mem_cache.go +++ b/g/os/gcache/gcache_mem_cache.go @@ -11,7 +11,7 @@ import ( "gitee.com/johng/gf/g/container/gset" "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/os/gtime" - "gitee.com/johng/gf/g/os/gtimew" + "gitee.com/johng/gf/g/os/gwheel" "gitee.com/johng/gf/g/util/gconv" "math" "sync" @@ -292,7 +292,7 @@ func (c *memCache) syncEventAndClearExpired() { oldExpireTime := int64(0) newExpireTime := int64(0) if c.closed.Val() { - gtimew.ExitJob() + gwheel.ExitJob() return } // ======================== diff --git a/g/os/gcache/gcache_mem_cache_lru.go b/g/os/gcache/gcache_mem_cache_lru.go index bc2588014..b95680ffe 100644 --- a/g/os/gcache/gcache_mem_cache_lru.go +++ b/g/os/gcache/gcache_mem_cache_lru.go @@ -7,12 +7,11 @@ package gcache import ( - "container/list" "fmt" "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gtype" - "gitee.com/johng/gf/g/os/gtimew" + "gitee.com/johng/gf/g/os/gwheel" ) // LRU算法实现对象,底层双向链表使用了标准库的list.List @@ -33,7 +32,7 @@ func newMemCacheLru(cache *memCache) *memCacheLru { rawList : glist.New(), closed : gtype.NewBool(), } - gtimew.AddSingleton(1, lru.SyncAndClear) + gwheel.AddSingleton(1, lru.SyncAndClear) return lru } @@ -46,7 +45,7 @@ func (lru *memCacheLru) Close() { func (lru *memCacheLru) Remove(key interface{}) { if v := lru.data.Get(key); v != nil { lru.data.Remove(key) - lru.list.Remove(v.(*list.Element)) + lru.list.Remove(v.(*glist.Element)) } } @@ -80,7 +79,7 @@ func (lru *memCacheLru) Print() { // 异步执行协程,将queue中的数据同步到list中 func (lru *memCacheLru) SyncAndClear() { if lru.closed.Val() { - gtimew.ExitJob() + gwheel.ExitJob() return } // 数据同步 @@ -88,7 +87,7 @@ func (lru *memCacheLru) SyncAndClear() { if v := lru.rawList.PopFront(); v != nil { // 删除对应链表项 if v := lru.data.Get(v); v != nil { - lru.list.Remove(v.(*list.Element)) + lru.list.Remove(v.(*glist.Element)) } // 将数据插入到链表头,并记录对应的链表项到哈希表中,便于检索 lru.data.Set(v, lru.list.PushFront(v)) diff --git a/g/os/gcron/gcron_cron.go b/g/os/gcron/gcron_cron.go index 15e4273a6..4d1752365 100644 --- a/g/os/gcron/gcron_cron.go +++ b/g/os/gcron/gcron_cron.go @@ -12,7 +12,7 @@ import ( "gitee.com/johng/gf/g/container/garray" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gtype" - "gitee.com/johng/gf/g/os/gtimew" + "gitee.com/johng/gf/g/os/gwheel" "strconv" ) @@ -76,7 +76,7 @@ func (c *Cron) AddOnce(pattern string, job func(), name ... string) (*Entry, err // 延迟添加定时任务,delay参数单位为秒 func (c *Cron) DelayAdd(delay int, pattern string, job func(), name ... string) { - gtimew.AddOnce(delay, func() { + gwheel.AddOnce(delay, func() { if _, err := c.Add(pattern, job, name ...); err != nil { panic(err) } @@ -85,7 +85,7 @@ func (c *Cron) DelayAdd(delay int, pattern string, job func(), name ... string) // 延迟添加单例定时任务,delay参数单位为秒 func (c *Cron) DelayAddSingleton(delay int, pattern string, job func(), name ... string) { - gtimew.AddOnce(delay, func() { + gwheel.AddOnce(delay, func() { if _, err := c.AddSingleton(pattern, job, name ...); err != nil { panic(err) } @@ -94,7 +94,7 @@ func (c *Cron) DelayAddSingleton(delay int, pattern string, job func(), name ... // 延迟添加只运行一次的定时任务,delay参数单位为秒 func (c *Cron) DelayAddOnce(delay int, pattern string, job func(), name ... string) { - gtimew.AddOnce(delay, func() { + gwheel.AddOnce(delay, func() { if _, err := c.AddOnce(pattern, job, name ...); err != nil { panic(err) } diff --git a/g/os/gcron/gcron_jobloop.go b/g/os/gcron/gcron_jobloop.go index 91cb686ec..73f7b7a96 100644 --- a/g/os/gcron/gcron_jobloop.go +++ b/g/os/gcron/gcron_jobloop.go @@ -7,15 +7,15 @@ package gcron import ( - "gitee.com/johng/gf/g/os/gtimew" + "gitee.com/johng/gf/g/os/gwheel" "time" ) // 延迟添加定时任务,delay参数单位为秒 func (c *Cron) startLoop() { - gtimew.Add(1, func() { + gwheel.Add(1, func() { if c.status.Val() == STATUS_CLOSED { - gtimew.ExitJob() + gwheel.ExitJob() } if c.status.Val() == STATUS_RUNNING { go c.checkEntries(time.Now()) diff --git a/g/os/gcron/gcron_unit_1_test.go b/g/os/gcron/gcron_unit_1_test.go index 3f53ea83b..28f8b3417 100644 --- a/g/os/gcron/gcron_unit_1_test.go +++ b/g/os/gcron/gcron_unit_1_test.go @@ -8,6 +8,7 @@ package gcron_test import ( + "fmt" "gitee.com/johng/gf/g/container/garray" "gitee.com/johng/gf/g/os/gcron" "gitee.com/johng/gf/g/util/gtest" @@ -52,9 +53,10 @@ func TestCron_Method(t *testing.T) { gtest.Case(func() { cron := gcron.New() cron.Add("* * * * * *", func() {}, "add") + fmt.Println("start", time.Now()) cron.DelayAdd(1, "* * * * * *", func() {}, "delay_add") gtest.Assert(cron.Size(), 1) - time.Sleep(1500*time.Millisecond) + time.Sleep(1200*time.Millisecond) gtest.Assert(cron.Size(), 2) cron.Remove("delay_add") diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index c141f7eff..dbc8096b8 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -8,9 +8,9 @@ package gfsnotify import ( - "container/list" "errors" "fmt" + "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gqueue" "gitee.com/johng/gf/g/container/gtype" @@ -32,7 +32,7 @@ type Callback struct { Id int // 唯一ID Func func(event *Event) // 回调方法 Path string // 监听的文件/目录 - elem *list.Element // 指向回调函数链表中的元素项位置(便于删除) + elem *glist.Element // 指向回调函数链表中的元素项位置(便于删除) recursive bool // 当目录时,是否递归监听(使用在子文件/目录回溯查找回调函数时) } diff --git a/g/os/gmlock/gmlock_locker.go b/g/os/gmlock/gmlock_locker.go index 12e105316..912f98b77 100644 --- a/g/os/gmlock/gmlock_locker.go +++ b/g/os/gmlock/gmlock_locker.go @@ -8,7 +8,7 @@ package gmlock import ( "gitee.com/johng/gf/g/container/gmap" - "gitee.com/johng/gf/g/os/gtimew" + "gitee.com/johng/gf/g/os/gwheel" ) // 内存锁管理对象 @@ -78,7 +78,7 @@ func (l *Locker) doLock(key string, expire int, try bool) bool { if ok && expire > 0 { // 异步goroutine计时处理 wid := mu.wid.Val() - gtimew.AddOnce(expire, func() { + gwheel.AddOnce(expire, func() { if wid == mu.wid.Val() { mu.Unlock() } @@ -98,7 +98,7 @@ func (l *Locker) doRLock(key string, expire int, try bool) bool { } if ok && expire > 0 { rid := mu.rid.Val() - gtimew.AddOnce(expire, func() { + gwheel.AddOnce(expire, func() { if rid == mu.rid.Val() { mu.RUnlock() } diff --git a/g/os/gtimew/gtimew_entry.go b/g/os/gtimew/gtimew_entry.go deleted file mode 100644 index 671557d97..000000000 --- a/g/os/gtimew/gtimew_entry.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package gtimew - -import ( - "gitee.com/johng/gf/g/container/gtype" - "time" -) - -// 循环任务项 -type Entry struct { - mode *gtype.Int // 任务运行模式(0: normal; 1: singleton; 2: once) - status *gtype.Int // 循环任务状态(0: ready; 1: running; -1: stopped) - Job JobFunc // 注册循环任务方法 - Create int64 // 创建时间戳(秒) - Interval int64 // 运行间隔(秒) -} - -// 任务执行方法 -type JobFunc func() - -// 创建循环任务 -func newEntry(interval int, job JobFunc, mode int) *Entry { - return &Entry { - mode : gtype.NewInt(mode), - status : gtype.NewInt(), - Job : job, - Create : time.Now().Unix(), - Interval : int64(interval), - } -} - -// 获取任务运行模式 -func (entry *Entry) Mode() int { - return entry.mode.Val() -} - -// 设置任务运行模式(0: normal; 1: singleton; 2: once) -func (entry *Entry) SetMode(mode int) { - entry.mode.Set(mode) -} - -// 循环任务状态 -func (entry *Entry) Status() int { - return entry.status.Val() -} - -// 启动循环任务 -func (entry *Entry) Start() { - entry.status.Set(STATUS_READY) -} - -// 停止循环任务 -func (entry *Entry) Stop() { - entry.status.Set(STATUS_CLOSED) -} - -// 给定时间是否满足当前循环任务运行间隔 -func (entry *Entry) Meet(t time.Time) bool { - diff := t.Unix() - entry.Create - if diff > 0 { - return diff%entry.Interval == 0 - } - return false -} - diff --git a/g/os/gtimew/gtimew_jobloop.go b/g/os/gtimew/gtimew_jobloop.go deleted file mode 100644 index 17039261b..000000000 --- a/g/os/gtimew/gtimew_jobloop.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package gtimew - -import ( - "container/list" - "time" -) - -// 延迟添加循环任务,delay参数单位为秒 -func (w *Wheel) startLoop() { - go func() { - for w.status.Val() != STATUS_CLOSED { - time.Sleep(time.Second) - if w.status.Val() == STATUS_RUNNING { - go w.checkEntries(time.Now()) - } - } - }() -} - -// 遍历检查可执行循环任务,并异步执行 -func (w *Wheel) checkEntries(t time.Time) { - w.entries.RLockFunc(func(l *list.List) { - for e := l.Front(); e != nil; e = e.Next() { - entry := e.Value.(*Entry) - if entry.Meet(t) { - // 是否已命令停止运行 - if entry.status.Val() == STATUS_CLOSED { - continue - } - // 判断任务的运行模式 - switch entry.mode.Val() { - // 是否只允许单例运行 - case MODE_SINGLETON: - if entry.status.Set(STATUS_RUNNING) == STATUS_RUNNING { - continue - } - // 只运行一次的任务 - case MODE_ONCE: - if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED { - continue - } - } - // 执行异步运行 - go func(element *list.Element) { - defer func() { - if err := recover(); err != nil { - if err == gPANIC_EXIT { - entry.status.Set(STATUS_CLOSED) - } else { - panic(err) - } - } - if entry.status.Val() != STATUS_CLOSED { - entry.status.Set(STATUS_READY) - } else { - // 异步删除,不受锁机制的影响 - w.entries.Remove(element) - } - }() - - entry.Job() - }(e) - } - } - }) -} \ No newline at end of file diff --git a/g/os/gtimew/gtimew_unit_1_test.go b/g/os/gtimew/gtimew_unit_1_test.go deleted file mode 100644 index 7b4afcf19..000000000 --- a/g/os/gtimew/gtimew_unit_1_test.go +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -// 包方法操作 - -package gtimew_test - -import ( - "gitee.com/johng/gf/g/container/garray" - "gitee.com/johng/gf/g/os/gtimew" - "gitee.com/johng/gf/g/util/gtest" - "testing" - "time" -) - -func TestWheel_Add_Close(t *testing.T) { - gtest.Case(func() { - wheel := gtimew.New() - array := garray.New(0, 0) - entry1 := wheel.Add(1, func() { - array.Append(1) - }) - entry2 := wheel.Add(1, func() { - array.Append(1) - }) - entry3 := wheel.Add(2, func() { - array.Append(1) - }) - gtest.AssertNE(entry1, nil) - gtest.AssertNE(entry2, nil) - gtest.AssertNE(entry3, nil) - gtest.Assert(wheel.Size(), 3) - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 2) - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 5) - wheel.Close() - time.Sleep(1100*time.Millisecond) - fixedLength := array.Len() - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), fixedLength) - }) -} - -func TestWheel_Singlton(t *testing.T) { - gtest.Case(func() { - wheel := gtimew.New() - array := garray.New(0, 0) - entry := wheel.AddSingleton(1, func() { - array.Append(1) - time.Sleep(10*time.Second) - }) - gtest.AssertNE(entry, nil) - gtest.Assert(wheel.Size(), 1) - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 1) - - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 1) - }) -} - -func TestWheel_Once(t *testing.T) { - gtest.Case(func() { - wheel := gtimew.New() - array := garray.New(0, 0) - entry1 := wheel.AddOnce(1, func() { - array.Append(1) - }) - entry2 := wheel.AddOnce(1, func() { - array.Append(1) - }) - gtest.AssertNE(entry1, nil) - gtest.AssertNE(entry2, nil) - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 2) - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 2) - wheel.Close() - time.Sleep(1100*time.Millisecond) - fixedLength := array.Len() - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), fixedLength) - }) -} - -func TestWheel_DelayAdd(t *testing.T) { - gtest.Case(func() { - wheel := gtimew.New() - wheel.DelayAdd(1, 1, func() {}) - gtest.Assert(wheel.Size(), 0) - time.Sleep(1100*time.Millisecond) - gtest.Assert(wheel.Size(), 1) - }) -} - -func TestWheel_DelayAdd_Singleton(t *testing.T) { - gtest.Case(func() { - wheel := gtimew.New() - array := garray.New(0, 0) - wheel.DelayAddSingleton(1, 1, func() { - array.Append(1) - time.Sleep(10*time.Second) - }) - gtest.Assert(wheel.Size(), 0) - time.Sleep(1100*time.Millisecond) - gtest.Assert(wheel.Size(), 1) - gtest.Assert(array.Len(), 0) - - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 1) - }) -} - -func TestWheel_DelayAdd_Once(t *testing.T) { - gtest.Case(func() { - wheel := gtimew.New() - array := garray.New(0, 0) - wheel.DelayAddOnce(1, 1, func() { - array.Append(1) - }) - gtest.Assert(wheel.Size(), 0) - time.Sleep(1100*time.Millisecond) - gtest.Assert(wheel.Size(), 1) - gtest.Assert(array.Len(), 0) - - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 1) - - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 1) - }) -} - -func TestWheel_ExitJob(t *testing.T) { - gtest.Case(func() { - wheel := gtimew.New() - array := garray.New(0, 0) - wheel.Add(1, func() { - array.Append(1) - gtimew.ExitJob() - }) - time.Sleep(1100*time.Millisecond) - gtest.Assert(array.Len(), 1) - gtest.Assert(wheel.Size(), 0) - }) -} diff --git a/g/os/gtimew/gtimew_wheel.go b/g/os/gtimew/gtimew_wheel.go deleted file mode 100644 index 62c55863c..000000000 --- a/g/os/gtimew/gtimew_wheel.go +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package gtimew - -import ( - "container/list" - "gitee.com/johng/gf/g/container/garray" - "gitee.com/johng/gf/g/container/glist" - "gitee.com/johng/gf/g/container/gtype" - "time" -) - -// 循环任务管理对象 -type Wheel struct { - status *gtype.Int // 循环任务状态(0: 未执行; 1: 运行中; -1:删除关闭) - entries *glist.List // 所有的循环任务项 -} - -// 创建自定义的循环任务管理对象 -func New() *Wheel { - wheel := &Wheel { - status : gtype.NewInt(STATUS_RUNNING), - entries : glist.New(), - } - wheel.startLoop() - return wheel -} - -// 添加循环任务 -func (w *Wheel) Add(interval int, job JobFunc) *Entry { - entry := newEntry(interval, job, MODE_NORMAL) - w.entries.PushBack(entry) - return entry -} - -// 添加单例运行循环任务 -func (w *Wheel) AddSingleton(interval int, job JobFunc) *Entry { - entry := newEntry(interval, job, MODE_SINGLETON) - w.entries.PushBack(entry) - return entry -} - -// 添加只运行一次的循环任务 -func (w *Wheel) AddOnce(interval int, job JobFunc) *Entry { - entry := newEntry(interval, job, MODE_ONCE) - w.entries.PushBack(entry) - return entry -} - -// 延迟添加循环任务,delay参数单位为秒 -func (w *Wheel) DelayAdd(delay int, interval int, job JobFunc) { - go func() { - time.Sleep(time.Duration(delay)*time.Second) - w.Add(interval, job) - }() -} - -// 延迟添加单例循环任务,delay参数单位为秒 -func (w *Wheel) DelayAddSingleton(delay int, interval int, job JobFunc) { - go func() { - time.Sleep(time.Duration(delay)*time.Second) - w.AddSingleton(interval, job) - }() -} - -// 延迟添加只运行一次的循环任务,delay参数单位为秒 -func (w *Wheel) DelayAddOnce(delay int, interval int, job JobFunc) { - go func() { - time.Sleep(time.Duration(delay)*time.Second) - w.AddOnce(interval, job) - }() -} - -// 当前时间轮已注册的任务数 -func (w *Wheel) Size() int { - return w.entries.Len() -} - -// 关闭循环任务 -func (w *Wheel) Close() { - w.status.Set(STATUS_CLOSED) -} - -// 获取所有已注册的循环任务项(按照注册时间从小到大进行排序) -func (w *Wheel) Entries() []*Entry { - array := garray.NewSortedArray(w.entries.Len(), func(v1, v2 interface{}) int { - entry1 := v1.(*Entry) - entry2 := v2.(*Entry) - if entry1.Create > entry2.Create { - return 1 - } - return -1 - }, false) - w.entries.RLockFunc(func(l *list.List) { - for e := l.Front(); e != nil; e = e.Next() { - array.Add(e.Value.(*Entry)) - } - }) - entries := make([]*Entry, array.Len()) - array.RLockFunc(func(array []interface{}) { - for k, v := range array { - entries[k] = v.(*Entry) - } - }) - return entries -} diff --git a/g/os/gtimew/gtimew.go b/g/os/gwheel/gwheel.go similarity index 60% rename from g/os/gtimew/gtimew.go rename to g/os/gwheel/gwheel.go index 8cb147310..6c6ee19a8 100644 --- a/g/os/gtimew/gtimew.go +++ b/g/os/gwheel/gwheel.go @@ -4,56 +4,73 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. -// Package gtimew provides Time Wheel for interval jobs running management/时间轮. -// 高效的时间轮任务执行管理,用于管理异步的间隔运行任务,或者异步只运行一次的任务(最小时间粒度为秒)。 +// Package gwheel provides Timing Wheel for interval jobs running and management/时间轮. +// 高效的时间轮任务执行管理,用于管理异步的间隔运行任务,或者异步只运行一次的任务(默认最小时间粒度为秒)。 // 与其他定时任务管理模块的区别是,时间轮模块只管理间隔执行任务,并且更注重执行效率(纳秒级别)。 -package gtimew +package gwheel + +import "time" const ( MODE_NORMAL = 0 MODE_SINGLETON = 1 MODE_ONCE = 2 + MODE_TIMES = 3 STATUS_READY = 0 STATUS_RUNNING = 1 STATUS_CLOSED = -1 gPANIC_EXIT = "exit" + + gDEFAULT_SLOT_NUMBER = 10 + gDEFAULT_WHEEL_INTERVAL = 100*time.Millisecond ) var ( // 默认的wheel管理对象 - defaultWheel = New() + defaultWheel = NewDefault() ) // 添加执行方法,可以给定名字,以便于后续执行删除 func Add(interval int, job JobFunc) *Entry { - return defaultWheel.Add(interval, job) + return defaultWheel.Add(10*interval, job) } // 添加单例运行循环任务 func AddSingleton(interval int, job JobFunc) *Entry { - return defaultWheel.AddSingleton(interval, job) + return nil + return defaultWheel.AddSingleton(10*interval, job) } // 添加只运行一次的循环任务 func AddOnce(interval int, job JobFunc) *Entry { - return defaultWheel.AddOnce(interval, job) + return defaultWheel.AddOnce(10*interval, job) +} + +// 添加运行指定次数的循环任务 +func AddTimes(interval int, times int, job JobFunc) *Entry { + return defaultWheel.AddTimes(10*interval, times, job) } // 延迟添加循环任务,delay参数单位为秒 func DelayAdd(delay int, interval int, job JobFunc) { - defaultWheel.DelayAdd(delay, interval, job) + defaultWheel.DelayAdd(delay, 10*interval, job) } // 延迟添加单例循环任务,delay参数单位为秒 func DelayAddSingleton(delay int, interval int, job JobFunc) { - defaultWheel.DelayAddSingleton(delay, interval, job) + defaultWheel.DelayAddSingleton(delay, 10*interval, job) } // 延迟添加只运行一次的循环任务,delay参数单位为秒 func DelayAddOnce(delay int, interval int, job JobFunc) { - defaultWheel.DelayAddOnce(delay, interval, job) + defaultWheel.DelayAddOnce(delay, 10*interval, job) +} + +// 延迟添加运行指定次数的循环任务,delay参数单位为秒 +func DelayAddTimes(delay int, interval int, times int, job JobFunc) { + defaultWheel.DelayAddTimes(delay, 10*interval, times, job) } // 获取所有已注册的循环任务项 diff --git a/g/os/gwheel/gwheel_entry.go b/g/os/gwheel/gwheel_entry.go new file mode 100644 index 000000000..819c13674 --- /dev/null +++ b/g/os/gwheel/gwheel_entry.go @@ -0,0 +1,99 @@ +// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gwheel + +import ( + "gitee.com/johng/gf/g/container/gtype" + "time" +) + +// 循环任务项 +type Entry struct { + wheel *Wheel // 所属时间轮 + mode *gtype.Int // 任务运行模式(0: normal; 1: singleton; 2: once; 3: times) + status *gtype.Int // 循环任务状态(0: ready; 1: running; -1: stopped) + times *gtype.Int // 还需运行次数, 当mode=3时启用, 当times值为0时表示不再执行(自动该任务删除) + latest time.Time // 任务上一次的运行时间点 + Job JobFunc // 注册循环任务方法 + Create time.Time // 任务的创建时间点 + Interval time.Duration // 运行间隔 +} + +// 任务执行方法 +type JobFunc func() + +// 创建循环任务 +func (w *Wheel) newEntry(interval int, job JobFunc, mode int, times int) *Entry { + now := time.Now() + pos := (interval + w.index.Val()) % w.number + entry := &Entry { + wheel : w, + mode : gtype.NewInt(mode), + status : gtype.NewInt(), + times : gtype.NewInt(times), + latest : now, + Job : job, + Create : now, + Interval : w.interval*time.Duration(interval), + } + w.slots[pos].PushBack(entry) + return entry +} + +// 获取任务运行模式 +func (entry *Entry) Mode() int { + return entry.mode.Val() +} + +// 设置任务运行模式(0: normal; 1: singleton; 2: once) +func (entry *Entry) SetMode(mode int) { + entry.mode.Set(mode) +} + +// 循环任务状态 +func (entry *Entry) Status() int { + return entry.status.Val() +} + +// 启动循环任务 +func (entry *Entry) Start() { + entry.status.Set(STATUS_READY) +} + +// 停止循环任务 +func (entry *Entry) Stop() { + entry.status.Set(STATUS_CLOSED) +} + +// 检测当前任务是否可运行, 内部将事件转换为微秒数来计算(int64), 精度更高 +func (entry *Entry) runnableCheck(t time.Time) bool { + if t.UnixNano() - entry.latest.UnixNano() >= entry.Interval.Nanoseconds() { + // 判断任务的运行模式 + switch entry.mode.Val() { + // 是否只允许单例运行 + case MODE_SINGLETON: + if entry.status.Set(STATUS_RUNNING) == STATUS_RUNNING { + return false + } + // 只运行一次的任务 + case MODE_ONCE: + if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED { + return false + } + // 运行指定次数的任务 + case MODE_TIMES: + if entry.times.Add(-1) <= 0 { + if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED { + return false + } + } + } + entry.latest = t + return true + } + return false +} diff --git a/g/os/gwheel/gwheel_jobloop.go b/g/os/gwheel/gwheel_jobloop.go new file mode 100644 index 000000000..b75baa4b4 --- /dev/null +++ b/g/os/gwheel/gwheel_jobloop.go @@ -0,0 +1,66 @@ +// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gwheel + +import ( + "gitee.com/johng/gf/g/container/glist" + "time" +) + +// 延迟添加循环任务,delay参数单位为秒 +func (w *Wheel) startLoop() { + go func() { + for { + select { + case <- w.closed: + return + case t := <- w.ticker.C: + if w.status.Val() == STATUS_RUNNING { + index := w.index.Val() + go w.checkEntries(t, w.slots[index]) + w.index.Set((index + 1) % w.number) + } + } + } + }() +} + +// 遍历检查可执行循环任务,并异步执行 +func (w *Wheel) checkEntries(t time.Time, l *glist.List) { + for e := l.Front(); e != nil; e = e.Next() { + entry := e.Value().(*Entry) + // 是否已命令停止运行 + if entry.status.Val() == STATUS_CLOSED { + continue + } + // 是否满足运行时间间隔 + if !entry.runnableCheck(t) { + continue + } + // 执行异步运行 + go func(e *glist.Element, l *glist.List) { + defer func() { + if err := recover(); err != nil { + if err == gPANIC_EXIT { + entry.status.Set(STATUS_CLOSED) + } else { + panic(err) + } + } + if entry.status.Val() != STATUS_CLOSED { + entry.status.Set(STATUS_READY) + } else { + // 异步删除,不受锁机制的影响 + l.Remove(e) + } + }() + + entry.Job() + }(e, l) + + } +} \ No newline at end of file diff --git a/g/os/gwheel/gwheel_wheel.go b/g/os/gwheel/gwheel_wheel.go new file mode 100644 index 000000000..c1cd07351 --- /dev/null +++ b/g/os/gwheel/gwheel_wheel.go @@ -0,0 +1,124 @@ +// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gwheel + +import ( + "gitee.com/johng/gf/g/container/glist" + "gitee.com/johng/gf/g/container/gtype" + "time" +) + +// 循环任务管理对象 +type Wheel struct { + status *gtype.Int // 循环任务状态(0: 未执行; 1: 运行中; -1:删除关闭) + index *gtype.Int // 时间轮处理的当前索引位置 + slots []*glist.List // 所有的循环任务项, 按照Slot Number进行分组 + number int // Slot Number + closed chan struct{} // 停止事件 + create time.Time // 创建时间 + ticker *time.Ticker // 时间轮间隔 + interval time.Duration // 时间间隔(slot时间长度) +} + +// 创建使用默认值的时间轮 +func NewDefault() *Wheel { + return New(gDEFAULT_SLOT_NUMBER, gDEFAULT_WHEEL_INTERVAL) +} + +// 创建自定义的循环任务管理对象 +func New(slot int, interval time.Duration) *Wheel { + w := &Wheel { + status : gtype.NewInt(STATUS_RUNNING), + index : gtype.NewInt(), + slots : make([]*glist.List, slot), + number : slot, + closed : make(chan struct{}, 1), + create : time.Now(), + ticker : time.NewTicker(interval), + interval : interval, + } + for i := 0; i < w.number; i++ { + w.slots[i] = glist.New() + } + w.startLoop() + return w +} + +// 添加循环任务 +func (w *Wheel) Add(interval int, job JobFunc) *Entry { + return w.newEntry(interval, job, MODE_NORMAL, 0) +} + +// 添加单例运行循环任务 +func (w *Wheel) AddSingleton(interval int, job JobFunc) *Entry { + return w.newEntry(interval, job, MODE_SINGLETON, 0) +} + +// 添加只运行一次的循环任务 +func (w *Wheel) AddOnce(interval int, job JobFunc) *Entry { + return w.newEntry(interval, job, MODE_ONCE, 0) +} + +// 添加运行指定次数的循环任务 +func (w *Wheel) AddTimes(interval int, times int, job JobFunc) *Entry { + return w.newEntry(interval, job, MODE_TIMES, times) +} + +// 延迟添加循环任务,delay参数单位为时间轮刻度 +func (w *Wheel) DelayAdd(delay int, interval int, job JobFunc) { + w.AddOnce(delay, func() { + w.Add(interval, job) + }) +} + +// 延迟添加单例循环任务,delay参数单位为时间轮刻度 +func (w *Wheel) DelayAddSingleton(delay int, interval int, job JobFunc) { + w.AddOnce(delay, func() { + w.AddSingleton(interval, job) + }) +} + +// 延迟添加只运行一次的循环任务,delay参数单位为时间轮刻度 +func (w *Wheel) DelayAddOnce(delay int, interval int, job JobFunc) { + w.AddOnce(delay, func() { + w.AddOnce(interval, job) + }) +} + +// 延迟添加只运行一次的循环任务,delay参数单位为时间轮刻度 +func (w *Wheel) DelayAddTimes(delay int, interval int, times int, job JobFunc) { + w.AddOnce(delay, func() { + w.AddTimes(interval, times, job) + }) +} + +// 当前时间轮已注册的任务数 +func (w *Wheel) Size() int { + size := 0 + for _, l := range w.slots { + size += l.Len() + } + return size +} + +// 关闭循环任务 +func (w *Wheel) Close() { + w.status.Set(STATUS_CLOSED) + w.ticker.Stop() + w.closed <- struct{}{} +} + +// 获取所有已注册的循环任务项(按照注册时间从小到大进行排序) +func (w *Wheel) Entries() []*Entry { + entries := make([]*Entry, 0) + for _, l := range w.slots { + for e := l.Front(); e != nil; e = e.Next() { + entries = append(entries, e.Value().(*Entry)) + } + } + return entries +} diff --git a/g/os/gtimew/gtimew_bench_test.go b/g/os/gwheel/gwheel_z_bench_test.go similarity index 83% rename from g/os/gtimew/gtimew_bench_test.go rename to g/os/gwheel/gwheel_z_bench_test.go index 27fefac60..9e495ae28 100644 --- a/g/os/gtimew/gtimew_bench_test.go +++ b/g/os/gwheel/gwheel_z_bench_test.go @@ -4,17 +4,17 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. -package gtimew_test +package gwheel_test import ( - "gitee.com/johng/gf/g/os/gtimew" + "gitee.com/johng/gf/g/os/gwheel" "testing" ) func Benchmark_Add(b *testing.B) { for i := 0; i < b.N; i++ { // 基准测试的时候不能设置为1秒,否则大量的任务会崩掉系统 - gtimew.Add(100000, func() { + gwheel.Add(100000, func() { }) } diff --git a/g/os/gwheel/gwheel_z_unit_1_test.go b/g/os/gwheel/gwheel_z_unit_1_test.go new file mode 100644 index 000000000..6c6aef4b5 --- /dev/null +++ b/g/os/gwheel/gwheel_z_unit_1_test.go @@ -0,0 +1,150 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// 包方法操作 + +package gwheel_test + +import ( + "gitee.com/johng/gf/g/container/garray" + "gitee.com/johng/gf/g/os/gwheel" + "gitee.com/johng/gf/g/util/gtest" + "testing" + "time" +) + +func TestWheel_Add_Close(t *testing.T) { + gtest.Case(func() { + wheel := gwheel.NewDefault() + array := garray.New(0, 0) + entry1 := wheel.Add(10, func() { + array.Append(1) + }) + entry2 := wheel.Add(10, func() { + array.Append(1) + }) + entry3 := wheel.Add(20, func() { + array.Append(1) + }) + gtest.AssertNE(entry1, nil) + gtest.AssertNE(entry2, nil) + gtest.AssertNE(entry3, nil) + gtest.Assert(wheel.Size(), 3) + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 2) + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 5) + wheel.Close() + time.Sleep(1200*time.Millisecond) + fixedLength := array.Len() + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), fixedLength) + }) +} + +func TestWheel_Singlton(t *testing.T) { + gtest.Case(func() { + wheel := gwheel.NewDefault() + array := garray.New(0, 0) + entry := wheel.AddSingleton(10, func() { + array.Append(1) + time.Sleep(10*time.Second) + }) + gtest.AssertNE(entry, nil) + gtest.Assert(wheel.Size(), 1) + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 1) + + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 1) + }) +} + +func TestWheel_Once(t *testing.T) { + gtest.Case(func() { + wheel := gwheel.NewDefault() + array := garray.New(0, 0) + entry1 := wheel.AddOnce(10, func() { + array.Append(1) + }) + entry2 := wheel.AddOnce(10, func() { + array.Append(1) + }) + gtest.AssertNE(entry1, nil) + gtest.AssertNE(entry2, nil) + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 2) + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 2) + wheel.Close() + time.Sleep(1200*time.Millisecond) + fixedLength := array.Len() + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), fixedLength) + }) +} + +func TestWheel_DelayAdd(t *testing.T) { + gtest.Case(func() { + wheel := gwheel.NewDefault() + array := garray.New(0, 0) + wheel.DelayAdd(10, 10, func() { + array.Append(1) + }) + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 0) + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 1) + }) +} + +func TestWheel_DelayAdd_Singleton(t *testing.T) { + gtest.Case(func() { + wheel := gwheel.NewDefault() + array := garray.New(0, 0) + wheel.DelayAddSingleton(10, 10, func() { + array.Append(1) + time.Sleep(10*time.Second) + }) + time.Sleep(1100*time.Millisecond) + gtest.Assert(array.Len(), 0) + + time.Sleep(1100*time.Millisecond) + gtest.Assert(array.Len(), 1) + }) +} + +func TestWheel_DelayAdd_Once(t *testing.T) { + gtest.Case(func() { + wheel := gwheel.NewDefault() + array := garray.New(0, 0) + wheel.DelayAddOnce(10, 10, func() { + array.Append(1) + }) + time.Sleep(1100*time.Millisecond) + gtest.Assert(array.Len(), 0) + + time.Sleep(1100*time.Millisecond) + gtest.Assert(array.Len(), 1) + + time.Sleep(1100*time.Millisecond) + gtest.Assert(array.Len(), 1) + }) +} + +func TestWheel_ExitJob(t *testing.T) { + gtest.Case(func() { + wheel := gwheel.NewDefault() + array := garray.New(0, 0) + wheel.Add(10, func() { + array.Append(1) + gwheel.ExitJob() + }) + time.Sleep(1200*time.Millisecond) + gtest.Assert(array.Len(), 1) + gtest.Assert(wheel.Size(), 0) + }) +} diff --git a/g/os/gtimew/gtimew_unit_2_test.go b/g/os/gwheel/gwheel_z_unit_2_test.go similarity index 65% rename from g/os/gtimew/gtimew_unit_2_test.go rename to g/os/gwheel/gwheel_z_unit_2_test.go index 99bf32dab..fa9858bf6 100644 --- a/g/os/gtimew/gtimew_unit_2_test.go +++ b/g/os/gwheel/gwheel_z_unit_2_test.go @@ -6,63 +6,63 @@ // Entry操作 -package gtimew_test +package gwheel_test import ( "gitee.com/johng/gf/g/container/garray" - "gitee.com/johng/gf/g/os/gtimew" + "gitee.com/johng/gf/g/os/gwheel" "gitee.com/johng/gf/g/util/gtest" "testing" "time" ) func TestWheel_Entry_Operation(t *testing.T) { - wheel := gtimew.New() + wheel := gwheel.NewDefault() array := garray.New(0, 0) - entry := wheel.Add(1, func() { + entry := wheel.Add(10, func() { array.Append(1) }) gtest.AssertNE(entry, nil) gtest.Assert(wheel.Size(), 1) - time.Sleep(1100*time.Millisecond) + time.Sleep(1200*time.Millisecond) gtest.Assert(array.Len(), 1) entry.Stop() - time.Sleep(1100*time.Millisecond) + time.Sleep(1200*time.Millisecond) gtest.Assert(array.Len(), 1) entry.Start() - time.Sleep(1100*time.Millisecond) + time.Sleep(1200*time.Millisecond) gtest.Assert(array.Len(), 2) } -func TestWheel_Entry_Singlton(t *testing.T) { - wheel := gtimew.New() +func TestWheel_Entry_Singleton(t *testing.T) { + wheel := gwheel.NewDefault() array := garray.New(0, 0) - entry := wheel.Add(1, func() { + entry := wheel.Add(10, func() { array.Append(1) time.Sleep(10*time.Second) }) - entry.SetMode(gtimew.MODE_SINGLETON) + entry.SetMode(gwheel.MODE_SINGLETON) gtest.AssertNE(entry, nil) gtest.Assert(wheel.Size(), 1) - time.Sleep(1100*time.Millisecond) + time.Sleep(1200*time.Millisecond) gtest.Assert(array.Len(), 1) - time.Sleep(1100*time.Millisecond) + time.Sleep(1200*time.Millisecond) gtest.Assert(array.Len(), 1) } func TestWheel_Entry_Once(t *testing.T) { - wheel := gtimew.New() + wheel := gwheel.NewDefault() array := garray.New(0, 0) - entry := wheel.Add(1, func() { + entry := wheel.Add(10, func() { array.Append(1) }) - entry.SetMode(gtimew.MODE_ONCE) + entry.SetMode(gwheel.MODE_ONCE) gtest.AssertNE(entry, nil) gtest.Assert(wheel.Size(), 1) - time.Sleep(1100*time.Millisecond) + time.Sleep(1200*time.Millisecond) gtest.Assert(array.Len(), 1) gtest.Assert(wheel.Size(), 0) } diff --git a/g/os/gwheel/gwheel_z_unit_3_test.go b/g/os/gwheel/gwheel_z_unit_3_test.go new file mode 100644 index 000000000..3c0e05d9c --- /dev/null +++ b/g/os/gwheel/gwheel_z_unit_3_test.go @@ -0,0 +1,28 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// 指定次数运行测试 + +package gwheel_test + +import ( + "gitee.com/johng/gf/g/container/garray" + "gitee.com/johng/gf/g/os/gwheel" + "gitee.com/johng/gf/g/util/gtest" + "testing" + "time" +) + +func TestWheel_Times(t *testing.T) { + wheel := gwheel.NewDefault() + array := garray.New(0, 0) + entry := wheel.AddTimes(10, 20, func() { + array.Append(1) + }) + gtest.AssertNE(entry, nil) + time.Sleep(3500*time.Millisecond) + gtest.Assert(array.Len(), 2) +} diff --git a/geg/other/test.go b/geg/other/test.go index ca62b666a..cfd115023 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -4,6 +4,6 @@ import "time" func main() { for { - time.Sleep(time.Second) + time.Sleep(10*time.Millisecond) } }