mirror of
https://gitee.com/johng/gf.git
synced 2024-12-02 20:28:17 +08:00
Merge branch 'qiangg_groutine'
This commit is contained in:
commit
38850545ba
@ -78,6 +78,18 @@ func (this *SafeList) PopBack() interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 从链表头端出栈数据项(删除)
|
||||
func (this *SafeList) PopFront() interface{} {
|
||||
this.Lock()
|
||||
if elem := this.L.Front(); elem != nil {
|
||||
item := this.L.Remove(elem)
|
||||
this.Unlock()
|
||||
return item
|
||||
}
|
||||
this.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// 批量从链表尾端出栈数据项(删除)
|
||||
func (this *SafeList) BatchPopBack(max int) []interface{} {
|
||||
this.Lock()
|
||||
@ -90,35 +102,66 @@ func (this *SafeList) BatchPopBack(max int) []interface{} {
|
||||
if count > max {
|
||||
count = max
|
||||
}
|
||||
items := make([]interface{}, 0, count)
|
||||
items := make([]interface{}, count)
|
||||
for i := 0; i < count; i++ {
|
||||
item := this.L.Remove(this.L.Back())
|
||||
items = append(items, item)
|
||||
items[i] = this.L.Remove(this.L.Back())
|
||||
}
|
||||
this.Unlock()
|
||||
return items
|
||||
}
|
||||
|
||||
// 批量从链表尾端依次获取所有数据
|
||||
func (this *SafeList) PopBackAll() []interface{} {
|
||||
// 批量从链表头端出栈数据项(删除)
|
||||
func (this *SafeList) BatchPopFront(max int) []interface{} {
|
||||
this.Lock()
|
||||
|
||||
count := this.L.Len()
|
||||
if count == 0 {
|
||||
this.Unlock()
|
||||
return []interface{}{}
|
||||
}
|
||||
|
||||
items := make([]interface{}, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
item := this.L.Remove(this.L.Back())
|
||||
items = append(items, item)
|
||||
if count > max {
|
||||
count = max
|
||||
}
|
||||
items := make([]interface{}, count)
|
||||
for i := 0; i < count; i++ {
|
||||
items[i] = this.L.Remove(this.L.Front())
|
||||
}
|
||||
|
||||
this.Unlock()
|
||||
return items
|
||||
}
|
||||
|
||||
// 批量从链表尾端依次获取所有数据(删除)
|
||||
func (this *SafeList) PopBackAll() []interface{} {
|
||||
this.Lock()
|
||||
count := this.L.Len()
|
||||
if count == 0 {
|
||||
this.Unlock()
|
||||
return []interface{}{}
|
||||
}
|
||||
items := make([]interface{}, count)
|
||||
for i := 0; i < count; i++ {
|
||||
items[i] = this.L.Remove(this.L.Back())
|
||||
}
|
||||
this.Unlock()
|
||||
return items
|
||||
}
|
||||
|
||||
// 批量从链表头端依次获取所有数据(删除)
|
||||
func (this *SafeList) PopFrontAll() []interface{} {
|
||||
this.Lock()
|
||||
count := this.L.Len()
|
||||
if count == 0 {
|
||||
this.Unlock()
|
||||
return []interface{}{}
|
||||
}
|
||||
items := make([]interface{}, count)
|
||||
for i := 0; i < count; i++ {
|
||||
items[i] = this.L.Remove(this.L.Front())
|
||||
}
|
||||
this.Unlock()
|
||||
return items
|
||||
}
|
||||
|
||||
// 删除数据项
|
||||
func (this *SafeList) Remove(e *list.Element) interface{} {
|
||||
this.Lock()
|
||||
|
@ -22,6 +22,15 @@ func NewIntSet() *IntSet {
|
||||
return &IntSet{M: make(map[int]struct{})}
|
||||
}
|
||||
|
||||
// 给定回调函数对原始内容进行遍历
|
||||
func (this *IntSet) Iterator(f func (v int)) {
|
||||
this.RLock()
|
||||
for k, _ := range this.M {
|
||||
f(k)
|
||||
}
|
||||
this.RUnlock()
|
||||
}
|
||||
|
||||
// 设置键
|
||||
func (this *IntSet) Add(item int) *IntSet {
|
||||
if this.Contains(item) {
|
||||
|
@ -21,7 +21,16 @@ func NewInterfaceSet() *InterfaceSet {
|
||||
return &InterfaceSet{M: make(map[interface{}]struct{})}
|
||||
}
|
||||
|
||||
// 设置键
|
||||
// 给定回调函数对原始内容进行遍历
|
||||
func (this *InterfaceSet) Iterator(f func (v interface{})) {
|
||||
this.RLock()
|
||||
for k, _ := range this.M {
|
||||
f(k)
|
||||
}
|
||||
this.RUnlock()
|
||||
}
|
||||
|
||||
// 添加
|
||||
func (this *InterfaceSet) Add(item interface{}) *InterfaceSet {
|
||||
if this.Contains(item) {
|
||||
return this
|
||||
@ -32,7 +41,7 @@ func (this *InterfaceSet) Add(item interface{}) *InterfaceSet {
|
||||
return this
|
||||
}
|
||||
|
||||
// 批量添加设置键
|
||||
// 批量添加
|
||||
func (this *InterfaceSet) BatchAdd(items []interface{}) *InterfaceSet {
|
||||
count := len(items)
|
||||
if count == 0 {
|
||||
@ -97,13 +106,12 @@ func (this *InterfaceSet) Clear() {
|
||||
// 转换为数组
|
||||
func (this *InterfaceSet) Slice() []interface{} {
|
||||
this.RLock()
|
||||
i := 0
|
||||
ret := make([]interface{}, len(this.M))
|
||||
i := 0
|
||||
for item := range this.M {
|
||||
ret[i] = item
|
||||
i++
|
||||
}
|
||||
|
||||
this.RUnlock()
|
||||
return ret
|
||||
}
|
||||
|
@ -21,6 +21,15 @@ func NewStringSet() *StringSet {
|
||||
return &StringSet{M: make(map[string]struct{})}
|
||||
}
|
||||
|
||||
// 给定回调函数对原始内容进行遍历
|
||||
func (this *StringSet) Iterator(f func (v string)) {
|
||||
this.RLock()
|
||||
for k, _ := range this.M {
|
||||
f(k)
|
||||
}
|
||||
this.RUnlock()
|
||||
}
|
||||
|
||||
// 设置键
|
||||
func (this *StringSet) Add(item string) *StringSet {
|
||||
if this.Contains(item) {
|
||||
|
57
g/os/groutine/groutine_api.go
Normal file
57
g/os/groutine/groutine_api.go
Normal file
@ -0,0 +1,57 @@
|
||||
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
// Goroutine池.
|
||||
// 用于goroutine复用,提升异步操作执行效率.
|
||||
package groutine
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"gitee.com/johng/gf/g/container/gset"
|
||||
"gitee.com/johng/gf/g/container/glist"
|
||||
)
|
||||
|
||||
// goroutine池对象
|
||||
type Pool struct {
|
||||
jobs *gset.InterfaceSet // 当前任务对象(*PoolJob)
|
||||
queue *glist.SafeList // 空闲任务队列(*PoolJob)
|
||||
funcs *glist.SafeList // 待处理任务操作队列
|
||||
events chan struct{} // 任务操作处理事件(用于任务事件通知)
|
||||
}
|
||||
|
||||
// goroutine任务
|
||||
type PoolJob struct {
|
||||
mu sync.RWMutex
|
||||
job chan func() // 当前任务(当为nil时表示关闭)
|
||||
pool *Pool // 所属池
|
||||
}
|
||||
|
||||
// 创建goroutine池管理对象
|
||||
func New() *Pool {
|
||||
p := &Pool {
|
||||
jobs : gset.NewInterfaceSet(),
|
||||
queue : glist.NewSafeList(),
|
||||
funcs : glist.NewSafeList(),
|
||||
events : make(chan struct{}, math.MaxUint32),
|
||||
}
|
||||
p.loop()
|
||||
return p
|
||||
}
|
||||
|
||||
// 添加异步任务
|
||||
func (p *Pool) Add(f func()) {
|
||||
p.funcs.PushBack(f)
|
||||
p.events <- struct{}{}
|
||||
}
|
||||
|
||||
// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行
|
||||
func (p *Pool) Close() {
|
||||
p.Add(nil)
|
||||
p.jobs.Iterator(func(v interface{}){
|
||||
v.(*PoolJob).stop()
|
||||
})
|
||||
}
|
36
g/os/groutine/groutine_job.go
Normal file
36
g/os/groutine/groutine_job.go
Normal file
@ -0,0 +1,36 @@
|
||||
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
package groutine
|
||||
|
||||
// 开始任务
|
||||
func (j *PoolJob) start() {
|
||||
go func() {
|
||||
for {
|
||||
if f := <- j.job; f != nil {
|
||||
// 执行任务
|
||||
f()
|
||||
// 清空任务(GC可回收f对应资源)
|
||||
j.job = nil
|
||||
// 执行完毕后添加到空闲队列
|
||||
j.pool.addJob(j)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 关闭当前任务
|
||||
func (j *PoolJob) stop() {
|
||||
j.setJob(nil)
|
||||
}
|
||||
|
||||
// 设置当前任务的执行函数
|
||||
func (j *PoolJob) setJob(f func()) {
|
||||
j.job <- f
|
||||
}
|
||||
|
48
g/os/groutine/groutine_pool.go
Normal file
48
g/os/groutine/groutine_pool.go
Normal file
@ -0,0 +1,48 @@
|
||||
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
package groutine
|
||||
|
||||
// 任务分配循环
|
||||
func (p *Pool) loop() {
|
||||
go func() {
|
||||
for {
|
||||
// 阻塞监听任务事件
|
||||
if _, ok := <- p.events; ok {
|
||||
// 如果任务为nil,表示池关闭
|
||||
if r := p.funcs.PopFront(); r != nil {
|
||||
p.getJob().setJob(r.(func()))
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 创建一个空的任务对象
|
||||
func (p *Pool) newJob() *PoolJob {
|
||||
j := &PoolJob {
|
||||
job : make(chan func(), 1),
|
||||
pool : p,
|
||||
}
|
||||
j.start()
|
||||
p.jobs.Add(j)
|
||||
return j
|
||||
}
|
||||
|
||||
// 添加任务对象到队列
|
||||
func (p *Pool) addJob(j *PoolJob) {
|
||||
p.queue.PushBack(j)
|
||||
}
|
||||
|
||||
// 获取/创建任务
|
||||
func (p *Pool) getJob() *PoolJob {
|
||||
if r := p.queue.PopFront(); r != nil {
|
||||
return r.(*PoolJob)
|
||||
}
|
||||
return p.newJob()
|
||||
}
|
36
g/os/groutine/groutine_test.go
Normal file
36
g/os/groutine/groutine_test.go
Normal file
@ -0,0 +1,36 @@
|
||||
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
// go test *.go -bench=".*"
|
||||
|
||||
package groutine_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"gitee.com/johng/gf/g/os/groutine"
|
||||
)
|
||||
|
||||
func test() {
|
||||
num := 0
|
||||
for i := 0; i < 1000000; i++ {
|
||||
num += i
|
||||
}
|
||||
}
|
||||
|
||||
var pool = groutine.New()
|
||||
|
||||
func BenchmarkGroutine(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
pool.Add(test)
|
||||
}
|
||||
//pool.Close()
|
||||
}
|
||||
|
||||
func BenchmarkGoRoutine(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
go test()
|
||||
}
|
||||
}
|
27
geg/os/groutine.go
Normal file
27
geg/os/groutine.go
Normal file
@ -0,0 +1,27 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/groutine"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func job() {
|
||||
time.Sleep(3*time.Second)
|
||||
fmt.Println("job done")
|
||||
}
|
||||
|
||||
func main() {
|
||||
p := groutine.New()
|
||||
p.Add(job)
|
||||
p.Add(job)
|
||||
p.Add(job)
|
||||
p.Add(job)
|
||||
|
||||
|
||||
time.Sleep(1*time.Second)
|
||||
|
||||
p.Close()
|
||||
|
||||
time.Sleep(5*time.Second)
|
||||
}
|
@ -1,9 +1,21 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/container/glist"
|
||||
"math"
|
||||
)
|
||||
|
||||
func main() {
|
||||
glog.Error("发生错误!")
|
||||
|
||||
t1 := gtime.Microsecond()
|
||||
c := make(chan struct{}, math.MaxInt64)
|
||||
c <- struct{}{}
|
||||
fmt.Println(gtime.Microsecond() - t1)
|
||||
|
||||
t2 := gtime.Microsecond()
|
||||
l := glist.NewSafeList()
|
||||
l.PushBack(func() {})
|
||||
fmt.Println(gtime.Microsecond() - t2)
|
||||
}
|
Loading…
Reference in New Issue
Block a user