改进grpool协程池内部细节,提升操作性能

This commit is contained in:
John 2018-02-07 10:52:18 +08:00
parent 5db721a250
commit 9a32b203fb
3 changed files with 59 additions and 54 deletions

View File

@ -72,38 +72,38 @@ func (q *Queue) PushFront(v interface{}) error {
// 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 // 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *Queue) PopFront() interface{} { func (q *Queue) PopFront() interface{} {
select { <- q.events
case <- q.events:
if q.limit > 0 { if q.limit > 0 {
<- q.limits <- q.limits
}
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
} }
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
return nil return nil
} }
// 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 // 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
// 第二个返回值表示队列是否关闭 // 第二个返回值表示队列是否关闭
func (q *Queue) PopBack() interface{} { func (q *Queue) PopBack() interface{} {
select { <- q.events
case <- q.events:
if q.limit > 0 { if q.limit > 0 {
<- q.limits <- q.limits
}
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
} }
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
return nil return nil
} }

View File

@ -10,31 +10,32 @@ package grpool
import ( import (
"math" "math"
"runtime"
"sync/atomic" "sync/atomic"
"gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/glist"
) )
const ( const (
gDEFAULT_EXPIRE_TIME = 60 // 默认goroutine过期时间 gDEFAULT_EXPIRE_TIME = 60 // 默认goroutine过期时间(秒)
gDEFAULT_CLEAR_INTERVAL = 60 // 定期检查任务过期时间间隔 gDEFAULT_CLEAR_INTERVAL = 60 // 定期检查任务过期时间间隔(秒)
) )
// goroutine池对象 // goroutine池对象
type Pool struct { type Pool struct {
size int32 // 限制最大的goroutine数量/线程数/worker数量 size int32 // 限制最大的goroutine数量/程数/worker数量
expire int32 // goroutine过期时间(秒) expire int32 // goroutine过期时间(秒)
number int32 // 当前goroutine数量(非任务数) number int32 // 当前goroutine数量(非任务数)
queue *glist.SafeList // 空闲任务队列(*PoolJob) queue *glist.SafeList // 空闲任务队列(*PoolJob)
funcs *glist.SafeList // 待处理任务操作队列 funcs *glist.SafeList // 待处理任务操作队列
freeEvents chan struct{} // 空闲线程通知事件 freeEvents chan struct{} // 空闲程通知事件
funcEvents chan struct{} // 任务操作处理事件(用于任务事件通知) funcEvents chan struct{} // 任务操作处理事件(用于任务事件通知)
stopEvents chan struct{} // 池关闭事件(用于池相关异步线程通知) stopEvents chan struct{} // 池关闭事件(用于池相关异步程通知)
} }
// goroutine任务 // goroutine任务
type PoolJob struct { type PoolJob struct {
job chan func() // 当前任务(当为nil时表示关闭) job chan func() // 当前任务(当为nil时表示关闭)
pool *Pool // 所属 pool *Pool // 所属协程
update int64 // 更新时间 update int64 // 更新时间
} }
@ -65,8 +66,7 @@ func New(expire int, sizes...int) *Pool {
// 添加异步任务(使用默认的池对象) // 添加异步任务(使用默认的池对象)
func Add(f func()) { func Add(f func()) {
defaultPool.funcs.PushBack(f) defaultPool.Add(f)
defaultPool.funcEvents <- struct{}{}
} }
// 查询当前goroutine总数 // 查询当前goroutine总数
@ -119,6 +119,8 @@ func (p *Pool) SetExpire(expire int) {
func (p *Pool) Close() { func (p *Pool) Close() {
// 必须首先标识让任务过期自动关闭 // 必须首先标识让任务过期自动关闭
p.SetExpire(-1) p.SetExpire(-1)
p.stopEvents <- struct{}{} // 通知workloop // 使用stopEvents事件通知所有的异步协程自动退出
p.stopEvents <- struct{}{} // 通知clearloop for i := 0; i < runtime.GOMAXPROCS(-1) + 1; i++ {
p.stopEvents <- struct{}{}
}
} }

View File

@ -8,30 +8,37 @@ package grpool
import ( import (
"time" "time"
"runtime"
"sync/atomic" "sync/atomic"
"gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/os/gtime"
) )
// 任务分配循环这是一个独立的goroutine单线程处理 // 任务分配循环协程,使用基于 runtime.GOMAXPROCS 数量的协程来实现抢占调度
func (p *Pool) startWorkLoop() { func (p *Pool) startWorkLoop() {
go func() { for i := 0; i < runtime.GOMAXPROCS(-1); i++ {
for { go func() {
select { for {
case <-p.funcEvents: select {
p.getJob().setJob(p.funcs.PopFront().(func())) case <-p.funcEvents:
case <-p.stopEvents: p.getJob().setJob(p.funcs.PopFront().(func()))
return case <-p.stopEvents:
return
}
} }
} }()
}() }
} }
// 定时清理过期任务,单线程处理 // 定时清理过期任务,单线程处理
func (p *Pool) startClearLoop() { func (p *Pool) startClearLoop() {
go func() { go func() {
for { for {
// 如果接收到关闭通知(池已经关闭),那么不再执行清理操作,直接退出
if len(p.stopEvents) > 0 {
break
}
time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second) time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second)
if len(p.stopEvents) > 0 || len(p.funcEvents) == 0 { if len(p.funcEvents) == 0 {
var j *PoolJob var j *PoolJob
for { for {
if r := p.queue.PopFront(); r != nil { if r := p.queue.PopFront(); r != nil {
@ -48,10 +55,6 @@ func (p *Pool) startClearLoop() {
} }
} }
} }
// 判断是池已经关闭并且所有goroutine已退出那么该goroutine终止执行
if len(p.stopEvents) > 0 && atomic.LoadInt32(&p.number) == 0 {
break
}
} }
}() }()
} }
@ -70,11 +73,11 @@ func (p *Pool) getExpire() int32 {
func (p *Pool) newJob() *PoolJob { func (p *Pool) newJob() *PoolJob {
// 如果达到goroutine数限制那么阻塞等待有空闲goroutine后继续 // 如果达到goroutine数限制那么阻塞等待有空闲goroutine后继续
if p.reachSizeLimit() { if p.reachSizeLimit() {
// 阻塞等待空闲goroutine // 阻塞等待空闲的协程资源,
select { // 这是一个递归循环,因为该流程中存在协程抢占机制,
case <- p.freeEvents: // 如果进入getJob方法没有抢占到协程资源那么该任务执行会继续等待下一个freeEvents
return p.getJob() <- p.freeEvents
} return p.getJob()
} }
j := &PoolJob { j := &PoolJob {
job : make(chan func(), 1), job : make(chan func(), 1),