diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index b27e3b15a..4ff09808c 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -72,38 +72,38 @@ func (q *Queue) PushFront(v interface{}) error { // 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 func (q *Queue) PopFront() interface{} { - select { - case <- q.events: - if q.limit > 0 { - <- q.limits - } - q.mu.Lock() - if elem := q.list.Front(); elem != nil { - item := q.list.Remove(elem) - q.mu.Unlock() - return item - } - q.mu.Unlock() + <- q.events + + if q.limit > 0 { + <- q.limits } + q.mu.Lock() + if elem := q.list.Front(); elem != nil { + item := q.list.Remove(elem) + q.mu.Unlock() + return item + } + q.mu.Unlock() + return nil } // 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 // 第二个返回值表示队列是否关闭 func (q *Queue) PopBack() interface{} { - select { - case <- q.events: - if q.limit > 0 { - <- q.limits - } - q.mu.Lock() - if elem := q.list.Front(); elem != nil { - item := q.list.Remove(elem) - q.mu.Unlock() - return item - } - q.mu.Unlock() + <- q.events + + if q.limit > 0 { + <- q.limits } + q.mu.Lock() + if elem := q.list.Front(); elem != nil { + item := q.list.Remove(elem) + q.mu.Unlock() + return item + } + q.mu.Unlock() + return nil } diff --git a/g/os/grpool/grpool_api.go b/g/os/grpool/grpool_api.go index 773fabdb1..bf4c3a76e 100644 --- a/g/os/grpool/grpool_api.go +++ b/g/os/grpool/grpool_api.go @@ -10,31 +10,32 @@ package grpool import ( "math" + "runtime" "sync/atomic" "gitee.com/johng/gf/g/container/glist" ) const ( - gDEFAULT_EXPIRE_TIME = 60 // 默认goroutine过期时间 - gDEFAULT_CLEAR_INTERVAL = 60 // 定期检查任务过期时间间隔 + gDEFAULT_EXPIRE_TIME = 60 // 默认goroutine过期时间(秒) + gDEFAULT_CLEAR_INTERVAL = 60 // 定期检查任务过期时间间隔(秒) ) // goroutine池对象 type Pool struct { - size int32 // 限制最大的goroutine数量/线程数/worker数量 + size int32 // 限制最大的goroutine数量/协程数/worker数量 expire int32 // goroutine过期时间(秒) number int32 // 当前goroutine数量(非任务数) queue *glist.SafeList // 空闲任务队列(*PoolJob) funcs *glist.SafeList // 待处理任务操作队列 - freeEvents chan struct{} // 空闲线程通知事件 + freeEvents chan struct{} // 空闲协程通知事件 funcEvents chan struct{} // 任务操作处理事件(用于任务事件通知) - stopEvents chan struct{} // 池关闭事件(用于池相关异步线程通知) + stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知) } // goroutine任务 type PoolJob struct { job chan func() // 当前任务(当为nil时表示关闭) - pool *Pool // 所属池 + pool *Pool // 所属协程池 update int64 // 更新时间 } @@ -65,8 +66,7 @@ func New(expire int, sizes...int) *Pool { // 添加异步任务(使用默认的池对象) func Add(f func()) { - defaultPool.funcs.PushBack(f) - defaultPool.funcEvents <- struct{}{} + defaultPool.Add(f) } // 查询当前goroutine总数 @@ -119,6 +119,8 @@ func (p *Pool) SetExpire(expire int) { func (p *Pool) Close() { // 必须首先标识让任务过期自动关闭 p.SetExpire(-1) - p.stopEvents <- struct{}{} // 通知workloop - p.stopEvents <- struct{}{} // 通知clearloop + // 使用stopEvents事件通知所有的异步协程自动退出 + for i := 0; i < runtime.GOMAXPROCS(-1) + 1; i++ { + p.stopEvents <- struct{}{} + } } \ No newline at end of file diff --git a/g/os/grpool/grpool_pool.go b/g/os/grpool/grpool_pool.go index 17b52bddd..3d21da382 100644 --- a/g/os/grpool/grpool_pool.go +++ b/g/os/grpool/grpool_pool.go @@ -8,30 +8,37 @@ package grpool import ( "time" + "runtime" "sync/atomic" "gitee.com/johng/gf/g/os/gtime" ) -// 任务分配循环,这是一个独立的goroutine,单线程处理 +// 任务分配循环协程,使用基于 runtime.GOMAXPROCS 数量的协程来实现抢占调度 func (p *Pool) startWorkLoop() { - go func() { - for { - select { - case <-p.funcEvents: - p.getJob().setJob(p.funcs.PopFront().(func())) - case <-p.stopEvents: - return + for i := 0; i < runtime.GOMAXPROCS(-1); i++ { + go func() { + for { + select { + case <-p.funcEvents: + p.getJob().setJob(p.funcs.PopFront().(func())) + case <-p.stopEvents: + return + } } - } - }() + }() + } } // 定时清理过期任务,单线程处理 func (p *Pool) startClearLoop() { go func() { for { + // 如果接收到关闭通知(池已经关闭),那么不再执行清理操作,直接退出 + if len(p.stopEvents) > 0 { + break + } time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second) - if len(p.stopEvents) > 0 || len(p.funcEvents) == 0 { + if len(p.funcEvents) == 0 { var j *PoolJob for { if r := p.queue.PopFront(); r != nil { @@ -48,10 +55,6 @@ func (p *Pool) startClearLoop() { } } } - // 判断是池已经关闭,并且所有goroutine已退出,那么该goroutine终止执行 - if len(p.stopEvents) > 0 && atomic.LoadInt32(&p.number) == 0 { - break - } } }() } @@ -70,11 +73,11 @@ func (p *Pool) getExpire() int32 { func (p *Pool) newJob() *PoolJob { // 如果达到goroutine数限制,那么阻塞等待有空闲goroutine后继续 if p.reachSizeLimit() { - // 阻塞等待空闲goroutine - select { - case <- p.freeEvents: - return p.getJob() - } + // 阻塞等待空闲的协程资源, + // 这是一个递归循环,因为该流程中存在协程抢占机制, + // 如果进入getJob方法没有抢占到协程资源,那么该任务执行会继续等待下一个freeEvents + <- p.freeEvents + return p.getJob() } j := &PoolJob { job : make(chan func(), 1),