diff --git a/g/container/glist/safelist.go b/g/container/glist/safelist.go index 20ba7a1d9..c1a241f8c 100644 --- a/g/container/glist/safelist.go +++ b/g/container/glist/safelist.go @@ -78,6 +78,18 @@ func (this *SafeList) PopBack() interface{} { return nil } +// 从链表头端出栈数据项(删除) +func (this *SafeList) PopFront() interface{} { + this.Lock() + if elem := this.L.Front(); elem != nil { + item := this.L.Remove(elem) + this.Unlock() + return item + } + this.Unlock() + return nil +} + // 批量从链表尾端出栈数据项(删除) func (this *SafeList) BatchPopBack(max int) []interface{} { this.Lock() @@ -90,35 +102,66 @@ func (this *SafeList) BatchPopBack(max int) []interface{} { if count > max { count = max } - items := make([]interface{}, 0, count) + items := make([]interface{}, count) for i := 0; i < count; i++ { - item := this.L.Remove(this.L.Back()) - items = append(items, item) + items[i] = this.L.Remove(this.L.Back()) } this.Unlock() return items } -// 批量从链表尾端依次获取所有数据 -func (this *SafeList) PopBackAll() []interface{} { +// 批量从链表头端出栈数据项(删除) +func (this *SafeList) BatchPopFront(max int) []interface{} { this.Lock() - count := this.L.Len() if count == 0 { this.Unlock() return []interface{}{} } - items := make([]interface{}, 0, count) - for i := 0; i < count; i++ { - item := this.L.Remove(this.L.Back()) - items = append(items, item) + if count > max { + count = max + } + items := make([]interface{}, count) + for i := 0; i < count; i++ { + items[i] = this.L.Remove(this.L.Front()) } - this.Unlock() return items } +// 批量从链表尾端依次获取所有数据(删除) +func (this *SafeList) PopBackAll() []interface{} { + this.Lock() + count := this.L.Len() + if count == 0 { + this.Unlock() + return []interface{}{} + } + items := make([]interface{}, count) + for i := 0; i < count; i++ { + items[i] = this.L.Remove(this.L.Back()) + } + this.Unlock() + return items +} + +// 批量从链表头端依次获取所有数据(删除) +func (this *SafeList) PopFrontAll() []interface{} { + this.Lock() + count := this.L.Len() + if count == 0 { + this.Unlock() + return []interface{}{} + } + items := make([]interface{}, count) + for i := 0; i < count; i++ { + items[i] = this.L.Remove(this.L.Front()) + } + this.Unlock() + return items +} + // 删除数据项 func (this *SafeList) Remove(e *list.Element) interface{} { this.Lock() diff --git a/g/container/gset/int_set.go b/g/container/gset/int_set.go index 509e4234e..e0a31e147 100644 --- a/g/container/gset/int_set.go +++ b/g/container/gset/int_set.go @@ -22,6 +22,15 @@ func NewIntSet() *IntSet { return &IntSet{M: make(map[int]struct{})} } +// 给定回调函数对原始内容进行遍历 +func (this *IntSet) Iterator(f func (v int)) { + this.RLock() + for k, _ := range this.M { + f(k) + } + this.RUnlock() +} + // 设置键 func (this *IntSet) Add(item int) *IntSet { if this.Contains(item) { diff --git a/g/container/gset/interface_set.go b/g/container/gset/interface_set.go index 065f770a2..53416f956 100644 --- a/g/container/gset/interface_set.go +++ b/g/container/gset/interface_set.go @@ -21,7 +21,16 @@ func NewInterfaceSet() *InterfaceSet { return &InterfaceSet{M: make(map[interface{}]struct{})} } -// 设置键 +// 给定回调函数对原始内容进行遍历 +func (this *InterfaceSet) Iterator(f func (v interface{})) { + this.RLock() + for k, _ := range this.M { + f(k) + } + this.RUnlock() +} + +// 添加 func (this *InterfaceSet) Add(item interface{}) *InterfaceSet { if this.Contains(item) { return this @@ -32,7 +41,7 @@ func (this *InterfaceSet) Add(item interface{}) *InterfaceSet { return this } -// 批量添加设置键 +// 批量添加 func (this *InterfaceSet) BatchAdd(items []interface{}) *InterfaceSet { count := len(items) if count == 0 { @@ -97,13 +106,12 @@ func (this *InterfaceSet) Clear() { // 转换为数组 func (this *InterfaceSet) Slice() []interface{} { this.RLock() + i := 0 ret := make([]interface{}, len(this.M)) - i := 0 for item := range this.M { ret[i] = item i++ } - this.RUnlock() return ret } diff --git a/g/container/gset/string_set.go b/g/container/gset/string_set.go index 1f9495c74..d16ed21e7 100644 --- a/g/container/gset/string_set.go +++ b/g/container/gset/string_set.go @@ -21,6 +21,15 @@ func NewStringSet() *StringSet { return &StringSet{M: make(map[string]struct{})} } +// 给定回调函数对原始内容进行遍历 +func (this *StringSet) Iterator(f func (v string)) { + this.RLock() + for k, _ := range this.M { + f(k) + } + this.RUnlock() +} + // 设置键 func (this *StringSet) Add(item string) *StringSet { if this.Contains(item) { diff --git a/g/os/groutine/groutine_api.go b/g/os/groutine/groutine_api.go new file mode 100644 index 000000000..d00e1a79b --- /dev/null +++ b/g/os/groutine/groutine_api.go @@ -0,0 +1,57 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// Goroutine池. +// 用于goroutine复用,提升异步操作执行效率. +package groutine + +import ( + "math" + "sync" + "gitee.com/johng/gf/g/container/gset" + "gitee.com/johng/gf/g/container/glist" +) + +// goroutine池对象 +type Pool struct { + jobs *gset.InterfaceSet // 当前任务对象(*PoolJob) + queue *glist.SafeList // 空闲任务队列(*PoolJob) + funcs *glist.SafeList // 待处理任务操作队列 + events chan struct{} // 任务操作处理事件(用于任务事件通知) +} + +// goroutine任务 +type PoolJob struct { + mu sync.RWMutex + job chan func() // 当前任务(当为nil时表示关闭) + pool *Pool // 所属池 +} + +// 创建goroutine池管理对象 +func New() *Pool { + p := &Pool { + jobs : gset.NewInterfaceSet(), + queue : glist.NewSafeList(), + funcs : glist.NewSafeList(), + events : make(chan struct{}, math.MaxUint32), + } + p.loop() + return p +} + +// 添加异步任务 +func (p *Pool) Add(f func()) { + p.funcs.PushBack(f) + p.events <- struct{}{} +} + +// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 +func (p *Pool) Close() { + p.Add(nil) + p.jobs.Iterator(func(v interface{}){ + v.(*PoolJob).stop() + }) +} \ No newline at end of file diff --git a/g/os/groutine/groutine_job.go b/g/os/groutine/groutine_job.go new file mode 100644 index 000000000..933dc5baa --- /dev/null +++ b/g/os/groutine/groutine_job.go @@ -0,0 +1,36 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package groutine + +// 开始任务 +func (j *PoolJob) start() { + go func() { + for { + if f := <- j.job; f != nil { + // 执行任务 + f() + // 清空任务(GC可回收f对应资源) + j.job = nil + // 执行完毕后添加到空闲队列 + j.pool.addJob(j) + } else { + break + } + } + }() +} + +// 关闭当前任务 +func (j *PoolJob) stop() { + j.setJob(nil) +} + +// 设置当前任务的执行函数 +func (j *PoolJob) setJob(f func()) { + j.job <- f +} + diff --git a/g/os/groutine/groutine_pool.go b/g/os/groutine/groutine_pool.go new file mode 100644 index 000000000..4aff9d7c6 --- /dev/null +++ b/g/os/groutine/groutine_pool.go @@ -0,0 +1,48 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package groutine + +// 任务分配循环 +func (p *Pool) loop() { + go func() { + for { + // 阻塞监听任务事件 + if _, ok := <- p.events; ok { + // 如果任务为nil,表示池关闭 + if r := p.funcs.PopFront(); r != nil { + p.getJob().setJob(r.(func())) + } else { + return + } + } + } + }() +} + +// 创建一个空的任务对象 +func (p *Pool) newJob() *PoolJob { + j := &PoolJob { + job : make(chan func(), 1), + pool : p, + } + j.start() + p.jobs.Add(j) + return j +} + +// 添加任务对象到队列 +func (p *Pool) addJob(j *PoolJob) { + p.queue.PushBack(j) +} + +// 获取/创建任务 +func (p *Pool) getJob() *PoolJob { + if r := p.queue.PopFront(); r != nil { + return r.(*PoolJob) + } + return p.newJob() +} diff --git a/g/os/groutine/groutine_test.go b/g/os/groutine/groutine_test.go new file mode 100644 index 000000000..2d0bcee75 --- /dev/null +++ b/g/os/groutine/groutine_test.go @@ -0,0 +1,36 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// go test *.go -bench=".*" + +package groutine_test + +import ( + "testing" + "gitee.com/johng/gf/g/os/groutine" +) + +func test() { + num := 0 + for i := 0; i < 1000000; i++ { + num += i + } +} + +var pool = groutine.New() + +func BenchmarkGroutine(b *testing.B) { + for i := 0; i < b.N; i++ { + pool.Add(test) + } + //pool.Close() +} + +func BenchmarkGoRoutine(b *testing.B) { + for i := 0; i < b.N; i++ { + go test() + } +} \ No newline at end of file diff --git a/geg/os/groutine.go b/geg/os/groutine.go new file mode 100644 index 000000000..6113b20bd --- /dev/null +++ b/geg/os/groutine.go @@ -0,0 +1,27 @@ +package main + +import ( + "time" + "gitee.com/johng/gf/g/os/groutine" + "fmt" +) + +func job() { + time.Sleep(3*time.Second) + fmt.Println("job done") +} + +func main() { + p := groutine.New() + p.Add(job) + p.Add(job) + p.Add(job) + p.Add(job) + + + time.Sleep(1*time.Second) + + p.Close() + + time.Sleep(5*time.Second) +} diff --git a/geg/other/test.go b/geg/other/test.go index 2ca2821be..3f77e3182 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,9 +1,21 @@ package main import ( - "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gtime" + "fmt" + "gitee.com/johng/gf/g/container/glist" + "math" ) func main() { - glog.Error("发生错误!") + + t1 := gtime.Microsecond() + c := make(chan struct{}, math.MaxInt64) + c <- struct{}{} + fmt.Println(gtime.Microsecond() - t1) + + t2 := gtime.Microsecond() + l := glist.NewSafeList() + l.PushBack(func() {}) + fmt.Println(gtime.Microsecond() - t2) } \ No newline at end of file