improve grpool

This commit is contained in:
John 2019-06-01 15:11:32 +08:00
parent 9ad94eccad
commit 2bf2f1b822
8 changed files with 109 additions and 63 deletions

View File

@ -209,6 +209,11 @@ func (l *List) Len() (length int) {
return
}
// Alias of Len.
func (l *List) Size() int {
return l.Len()
}
// MoveBefore moves element <e> to its new position before <p>.
// If <e> or <p> is not an element of <l>, or <e> == <p>, the list is not modified.
// The element and <p> must not be nil.

View File

@ -5,50 +5,42 @@
// You can obtain one at https://github.com/gogf/gf.
// Package grpool implements a goroutine reusable pool.
//
// Goroutine池,
// 用于goroutine复用提升异步操作执行效率(避免goroutine限制并节约内存开销).
// 需要注意的是grpool提供给的公共池不提供关闭方法自创建的池可以手动关闭掉。
package grpool
import (
"github.com/gogf/gf/g/container/glist"
"github.com/gogf/gf/g/container/gtype"
"math"
"github.com/gogf/gf/g/container/glist"
"github.com/gogf/gf/g/container/gtype"
)
// goroutine池对象
// Goroutine Pool
type Pool struct {
limit int // 最大的goroutine数量限制
count *gtype.Int // 当前正在运行的goroutine数量
list *glist.List // 待处理任务操作列表
events chan struct{} // 任务添加事件
closed *gtype.Bool
limit int // 最大的goroutine数量限制
count *gtype.Int // 当前正在运行的goroutine数量
list *glist.List // 待处理任务操作列表
closed *gtype.Bool // 是否关闭
}
// 默认的goroutine池管理对象
// 默认的goroutine池管理对象,
// 该对象与进程同生命周期无需Close
var defaultPool = New()
// 创建goroutine池管理对象参数用于限制限制最大的goroutine数量非必需参数默认不做限制
func New(size...int) *Pool {
limit := -1
if len(size) > 0 {
limit = size[0]
}
func New(limit...int) *Pool {
p := &Pool {
limit : limit,
limit : -1,
count : gtype.NewInt(),
list : glist.New(),
events : make(chan struct{}, math.MaxInt32),
closed : gtype.NewBool(),
}
if len(limit) > 0 {
p.limit = limit[0]
}
return p
}
// 添加异步任务(使用默认的池对象)
func Add(f func()) error {
return defaultPool.Add(f)
func Add(f func()) {
defaultPool.Add(f)
}
// 查询当前goroutine总数
@ -58,18 +50,16 @@ func Size() int {
// 查询当前等待处理的任务总数
func Jobs() int {
return len(defaultPool.events)
return defaultPool.list.Len()
}
// 添加异步任务
func (p *Pool) Add(f func()) error {
p.list.PushBack(f)
p.events <- struct{}{}
// 判断是否创建新的worker
if p.list.Len() > 1 || p.count.Val() == 0 {
func (p *Pool) Add(f func()) {
p.list.PushFront(f)
// 判断是否创建新的goroutine
if p.count.Val() != p.limit {
p.fork()
}
return nil
}
// 查询当前goroutine总数
@ -79,35 +69,26 @@ func (p *Pool) Size() int {
// 查询当前等待处理的任务总数
func (p *Pool) Jobs() int {
return p.list.Len()
return p.list.Size()
}
// 创建新的worker执行任务
// 检查并创建新的goroutine执行任务
func (p *Pool) fork() {
// 如果worker数量已经达到限制那么不创建新worker直接返回
if p.count.Val() == p.limit {
return
}
p.count.Add(1)
go func() {
defer p.count.Add(-1)
job := (interface{})(nil)
for !p.closed.Val() {
select {
case <- p.events:
if job := p.list.PopFront(); job != nil {
job.(func())()
} else {
p.count.Add(-1)
return
}
default:
p.count.Add(-1)
return
}
if job = p.list.PopBack(); job != nil {
job.(func())()
} else {
return
}
}
}()
}
// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行
func (p *Pool) Close() {
p.closed.Set(true)
p.closed.Set(true)
}

View File

@ -22,7 +22,7 @@ func Test_Basic(t *testing.T) {
gtest.Case(t, func() {
wg := sync.WaitGroup{}
array := garray.NewArray()
size := 100000
size := 10000
wg.Add(size)
for i := 0; i < size; i++ {
grpool.Add(func() {
@ -33,7 +33,45 @@ func Test_Basic(t *testing.T) {
wg.Wait()
gtest.Assert(array.Len(), size)
})
}
func Test_Limit1(t *testing.T) {
gtest.Case(t, func() {
wg := sync.WaitGroup{}
array := garray.NewArray()
size := 10000
pool := grpool.New(10)
wg.Add(size)
for i := 0; i < size; i++ {
pool.Add(func() {
array.Append(1)
wg.Done()
})
}
wg.Wait()
gtest.Assert(array.Len(), size)
})
}
func Test_Limit2(t *testing.T) {
gtest.Case(t, func() {
wg := sync.WaitGroup{}
array := garray.NewArray()
size := 10000
pool := grpool.New(1)
wg.Add(size)
for i := 0; i < size; i++ {
pool.Add(func() {
array.Append(1)
wg.Done()
})
}
wg.Wait()
gtest.Assert(array.Len(), size)
})
}
func Test_Limit3(t *testing.T) {
gtest.Case(t, func() {
array := garray.NewArray()
size := 100000
@ -50,7 +88,7 @@ func Test_Basic(t *testing.T) {
gtest.Assert(array.Len(), 10000)
pool.Close()
time.Sleep(2*time.Second)
gtest.Assert(pool.Size(), 10000)
gtest.Assert(pool.Size(), 0)
gtest.Assert(pool.Jobs(), 90000)
gtest.Assert(array.Len(), 10000)
})

View File

@ -18,12 +18,12 @@ func main() {
// 消费者,不停读取队列数据并输出到终端
for {
select {
case v := <-queue.C:
if v != nil {
fmt.Println(v)
} else {
return
}
case v := <-queue.C:
if v != nil {
fmt.Println(v)
} else {
return
}
}
}
}

View File

@ -3,7 +3,7 @@ package main
import (
"fmt"
"github.com/gogf/gf/g/os/grpool"
"github.com/gogf/gf/g/os/gtime"
"github.com/gogf/gf/g/os/gtimer"
"time"
)
@ -18,11 +18,11 @@ func main() {
}
fmt.Println("worker:", pool.Size())
fmt.Println(" jobs:", pool.Jobs())
gtime.SetInterval(time.Second, func() bool {
gtimer.SetInterval(time.Second, func() {
fmt.Println("worker:", pool.Size())
fmt.Println(" jobs:", pool.Jobs())
fmt.Println()
return true
gtimer.Exit()
})
select {}

View File

@ -10,8 +10,9 @@ func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
v := i
grpool.Add(func() {
fmt.Println(i)
fmt.Println(v)
wg.Done()
})
}

View File

@ -2,17 +2,20 @@ package main
import (
"fmt"
"github.com/gogf/gf/g/os/grpool"
"sync"
)
func main() {
p := grpool.New(1)
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(v int) {
v := i
p.Add(func() {
fmt.Println(v)
wg.Done()
}(i)
})
}
wg.Wait()
}

18
geg/os/grpool/grpool4.go Normal file
View File

@ -0,0 +1,18 @@
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(v int) {
fmt.Println(v)
wg.Done()
}(i)
}
wg.Wait()
}