gf/os/grpool/grpool.go

176 lines
4.6 KiB
Go
Raw Normal View History

2021-01-17 21:46:25 +08:00
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
2018-07-27 15:48:49 +08:00
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
2018-07-27 15:48:49 +08:00
2019-01-16 13:02:59 +08:00
// Package grpool implements a goroutine reusable pool.
2018-07-27 15:48:49 +08:00
package grpool
import (
"context"
2019-06-22 15:05:15 +08:00
2021-10-11 21:41:56 +08:00
"github.com/gogf/gf/v2/container/glist"
"github.com/gogf/gf/v2/container/gtype"
2021-11-15 20:49:02 +08:00
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
2018-07-27 15:48:49 +08:00
)
// Func is the pool function which contains context parameter.
type Func func(ctx context.Context)
2021-09-28 19:04:36 +08:00
// Pool manages the goroutines using pool.
2018-07-27 15:48:49 +08:00
type Pool struct {
2019-06-19 09:06:52 +08:00
limit int // Max goroutine count limit.
count *gtype.Int // Current running goroutine count.
list *glist.List // Job list for asynchronous job adding purpose.
closed *gtype.Bool // Is pool closed or not.
2018-07-27 15:48:49 +08:00
}
type internalPoolItem struct {
Ctx context.Context
Func Func
}
2019-06-01 19:34:03 +08:00
// Default goroutine pool.
var (
pool = New()
)
2018-07-27 15:48:49 +08:00
2019-06-01 19:34:03 +08:00
// New creates and returns a new goroutine pool object.
// The parameter `limit` is used to limit the max goroutine count,
2019-06-01 19:34:03 +08:00
// which is not limited in default.
2019-06-19 09:06:52 +08:00
func New(limit ...int) *Pool {
p := &Pool{
limit: -1,
count: gtype.NewInt(),
list: glist.New(true),
2019-06-19 09:06:52 +08:00
closed: gtype.NewBool(),
}
if len(limit) > 0 && limit[0] > 0 {
p.limit = limit[0]
}
return p
2018-07-27 15:48:49 +08:00
}
2019-06-01 19:34:03 +08:00
// Add pushes a new job to the pool using default goroutine pool.
// The job will be executed asynchronously.
func Add(ctx context.Context, f Func) error {
return pool.Add(ctx, f)
2018-07-27 15:48:49 +08:00
}
// AddWithRecover pushes a new job to the pool with specified recover function.
// The optional `recoverFunc` is called when any panic during executing of `userFunc`.
// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
// The job will be executed asynchronously.
func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error {
return pool.AddWithRecover(ctx, userFunc, recoverFunc...)
}
2019-06-01 19:34:03 +08:00
// Size returns current goroutine count of default goroutine pool.
2018-07-27 15:48:49 +08:00
func Size() int {
2019-06-22 15:05:15 +08:00
return pool.Size()
2018-07-27 15:48:49 +08:00
}
2019-06-01 19:34:03 +08:00
// Jobs returns current job count of default goroutine pool.
2018-07-27 15:48:49 +08:00
func Jobs() int {
2019-06-22 15:05:15 +08:00
return pool.Jobs()
2018-07-27 15:48:49 +08:00
}
2019-06-01 19:34:03 +08:00
// Add pushes a new job to the pool.
// The job will be executed asynchronously.
func (p *Pool) Add(ctx context.Context, f Func) error {
2019-06-22 15:05:15 +08:00
for p.closed.Val() {
return gerror.NewCode(gcode.CodeInvalidOperation, "pool closed")
2019-06-19 09:06:52 +08:00
}
p.list.PushFront(&internalPoolItem{
Ctx: ctx,
Func: f,
})
2019-10-28 17:16:50 +08:00
// Check whether fork new goroutine or not.
2019-06-22 15:05:15 +08:00
var n int
for {
n = p.count.Val()
if p.limit != -1 && n >= p.limit {
2019-10-28 17:16:50 +08:00
// No need fork new goroutine.
2019-06-22 15:05:15 +08:00
return nil
}
if p.count.Cas(n, n+1) {
2019-10-28 17:16:50 +08:00
// Use CAS to guarantee atomicity.
2019-06-22 15:05:15 +08:00
break
}
}
p.fork()
2019-06-22 15:05:15 +08:00
return nil
}
// AddWithRecover pushes a new job to the pool with specified recover function.
// The optional `recoverFunc` is called when any panic during executing of `userFunc`.
// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
// The job will be executed asynchronously.
func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error {
return p.Add(ctx, func(ctx context.Context) {
defer func() {
2021-07-20 23:02:02 +08:00
if exception := recover(); exception != nil {
if len(recoverFunc) > 0 && recoverFunc[0] != nil {
if v, ok := exception.(error); ok && gerror.HasStack(v) {
recoverFunc[0](v)
2021-07-20 23:02:02 +08:00
} else {
recoverFunc[0](gerror.Newf(`%+v`, exception))
2021-07-20 23:02:02 +08:00
}
}
}
}()
userFunc(ctx)
})
}
2019-06-22 15:05:15 +08:00
// Cap returns the capacity of the pool.
// This capacity is defined when pool is created.
2019-10-28 17:16:50 +08:00
// It returns -1 if there's no limit.
2019-06-22 15:05:15 +08:00
func (p *Pool) Cap() int {
return p.limit
2018-07-27 15:48:49 +08:00
}
2019-06-01 19:34:03 +08:00
// Size returns current goroutine count of the pool.
2018-07-27 15:48:49 +08:00
func (p *Pool) Size() int {
2019-06-19 09:06:52 +08:00
return p.count.Val()
2018-07-27 15:48:49 +08:00
}
2019-06-01 19:34:03 +08:00
// Jobs returns current job count of the pool.
2019-10-28 17:16:50 +08:00
// Note that, it does not return worker/goroutine count but the job/task count.
2018-07-27 15:48:49 +08:00
func (p *Pool) Jobs() int {
2019-06-19 09:06:52 +08:00
return p.list.Size()
2018-07-27 15:48:49 +08:00
}
2019-10-28 17:16:50 +08:00
// fork creates a new goroutine worker.
// Note that the worker dies if the job function panics.
2019-05-31 22:54:57 +08:00
func (p *Pool) fork() {
2019-06-19 09:06:52 +08:00
go func() {
defer p.count.Add(-1)
2019-10-28 17:16:50 +08:00
var (
listItem interface{}
poolItem *internalPoolItem
)
2019-06-19 09:06:52 +08:00
for !p.closed.Val() {
if listItem = p.list.PopBack(); listItem != nil {
poolItem = listItem.(*internalPoolItem)
poolItem.Func(poolItem.Ctx)
2019-06-19 09:06:52 +08:00
} else {
return
}
}
}()
2018-07-27 15:48:49 +08:00
}
2019-06-22 15:05:15 +08:00
// IsClosed returns if pool is closed.
func (p *Pool) IsClosed() bool {
return p.closed.Val()
}
2019-06-01 19:34:03 +08:00
// Close closes the goroutine pool, which makes all goroutines exit.
2018-07-27 15:48:49 +08:00
func (p *Pool) Close() {
2019-06-01 15:11:32 +08:00
p.closed.Set(true)
2019-06-19 09:06:52 +08:00
}