gf/g/os/grpool/grpool.go

120 lines
3.4 KiB
Go
Raw Normal View History

2018-07-27 15:48:49 +08:00
// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
2019-01-16 13:02:59 +08:00
// Package grpool implements a goroutine reusable pool.
2018-07-27 15:48:49 +08:00
// Goroutine池.
2018-12-30 11:08:07 +08:00
// 用于goroutine复用提升异步操作执行效率(避免goroutine限制并节约内存开销).
2018-07-27 15:48:49 +08:00
// 需要注意的是grpool提供给的公共池不提供关闭方法自创建的池可以手动关闭掉。
package grpool
import (
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
2018-12-30 11:08:07 +08:00
"math"
2018-07-27 15:48:49 +08:00
)
// goroutine池对象
type Pool struct {
workerChan chan struct{} // 使用channel限制最大的goroutine数量
workerNum *gtype.Int // 当前正在运行的worker/goroutine数量
jobQueue *glist.List // 待处理任务操作队列
jobEvents chan struct{} // 任务添加事件(jobQueue+jobEvents结合使用)
closed *gtype.Bool
}
// 默认的goroutine池管理对象
// 该对象与进程同生命周期无需Close
var defaultPool = New()
2018-09-19 09:47:50 +08:00
// 创建goroutine池管理对象 参数用于限制限制最大的goroutine数量/线程数/worker数量非必需参数默认不做限制
2018-07-27 15:48:49 +08:00
func New(size...int) *Pool {
s := 0
if len(size) > 0 {
s = size[0]
}
p := &Pool {
workerNum : gtype.NewInt(),
jobQueue : glist.New(),
jobEvents : make(chan struct{}, math.MaxInt32),
workerChan : make(chan struct{}, s),
closed : gtype.NewBool(),
}
return p
}
// 添加异步任务(使用默认的池对象)
func Add(f func()) error {
return defaultPool.Add(f)
}
// 查询当前goroutine总数
func Size() int {
return defaultPool.workerNum.Val()
}
// 查询当前等待处理的任务总数
func Jobs() int {
return len(defaultPool.jobEvents)
}
// 添加异步任务
func (p *Pool) Add(f func()) error {
p.jobQueue.PushBack(f)
p.jobEvents <- struct{}{}
// 判断是否创建新的worker
if p.Jobs() > 1 || p.workerNum.Val() == 0 {
p.ForkWorker()
}
return nil
}
// 查询当前goroutine worker总数
func (p *Pool) Size() int {
return p.workerNum.Val()
}
// 查询当前等待处理的任务总数
func (p *Pool) Jobs() int {
return p.jobQueue.Len()
}
// 创建新的worker执行任务
func (p *Pool) ForkWorker() {
if cap(p.workerChan) > 0 {
// 如果worker数量已经达到限制那么不创建新worker直接返回
if p.workerNum.Val() == cap(p.workerChan) {
return
}
p.workerNum.Add(1)
p.workerChan <- struct{}{}
} else {
p.workerNum.Add(1)
}
go func() {
for !p.closed.Val() {
select {
case <- p.jobEvents:
if job := p.jobQueue.PopFront(); job != nil {
job.(func())()
} else {
goto WorkerDone
}
default:
goto WorkerDone
}
}
WorkerDone:
p.workerNum.Add(-1)
if cap(p.workerChan) > 0 {
<- p.workerChan
}
}()
}
// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行
func (p *Pool) Close() {
p.closed.Set(true)
}