// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

// Package gqueue provides a dynamic/static concurrent-safe queue.
//
// Features:
//
// 1. FIFO queue (data -> list -> chan);
//
// 2. Fast creation and initialization;
//
// 3. Supports dynamic queue size (unlimited queue size);
//
// 4. Blocks when reading data from an empty queue.
package gqueue
import (
	"math"

	"github.com/gogf/gf/container/glist"
	"github.com/gogf/gf/container/gtype"
)
// Queue is a concurrent-safe queue built on doubly linked list and channel.
type Queue struct {
2019-06-19 09:06:52 +08:00
limit int // Limit for queue size.
list *glist.List // Underlying list structure for data maintaining.
closed *gtype.Bool // Whether queue is closed.
events chan struct{} // Events for data writing.
C chan interface{} // Underlying channel for data reading.
}
// defaultQueueSize is the size for queue buffer.
const defaultQueueSize = 10000

// defaultBatchSize is the max batch size per-fetching from list.
const defaultBatchSize = 10
// New returns an empty queue object.
2019-06-09 10:33:16 +08:00
// Optional parameter <limit> is used to limit the size of the queue, which is unlimited in default.
// When <limit> is given, the queue will be static and high performance which is comparable with stdlib channel.
2019-06-19 09:06:52 +08:00
func New(limit ...int) *Queue {
q := &Queue{
closed: gtype.NewBool(),
}
if len(limit) > 0 && limit[0] > 0 {
2019-06-19 09:06:52 +08:00
q.limit = limit[0]
q.C = make(chan interface{}, limit[0])
} else {
q.list = glist.New(true)
2019-06-19 09:06:52 +08:00
q.events = make(chan struct{}, math.MaxInt32)
q.C = make(chan interface{}, defaultQueueSize)
2020-03-06 11:01:03 +08:00
go q.asyncLoopFromListToChannel()
2019-06-19 09:06:52 +08:00
}
return q
}
2020-03-06 11:01:03 +08:00
// asyncLoopFromListToChannel starts an asynchronous goroutine,
// which handles the data synchronization from list <q.list> to channel <q.C>.
2020-03-06 11:01:03 +08:00
func (q *Queue) asyncLoopFromListToChannel() {
2019-06-19 09:06:52 +08:00
defer func() {
if q.closed.Val() {
_ = recover()
}
}()
for !q.closed.Val() {
<-q.events
for !q.closed.Val() {
if length := q.list.Len(); length > 0 {
if length > defaultBatchSize {
length = defaultBatchSize
2019-06-19 09:06:52 +08:00
}
for _, v := range q.list.PopFronts(length) {
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
// If any error occurs here, it will be caught by recover and be ignored.
q.C <- v
}
} else {
break
}
}
// Clear q.events to remain just one event to do the next synchronization check.
for i := 0; i < len(q.events)-1; i++ {
<-q.events
}
}
// It should be here to close q.C if <q> is unlimited size.
2019-06-19 09:06:52 +08:00
// It's the sender's responsibility to close channel when it should be closed.
2019-06-17 19:54:00 +08:00
close(q.C)
}
// Push pushes the data <v> into the queue.
// Note that it would panics if Push is called after the queue is closed.
func (q *Queue) Push(v interface{}) {
2019-06-19 09:06:52 +08:00
if q.limit > 0 {
q.C <- v
} else {
q.list.PushBack(v)
if len(q.events) < defaultQueueSize {
2019-06-19 09:06:52 +08:00
q.events <- struct{}{}
}
}
}
// Pop pops an item from the queue in FIFO way.
// Note that it would return nil immediately if Pop is called after the queue is closed.
func (q *Queue) Pop() interface{} {
2019-06-19 09:06:52 +08:00
return <-q.C
}
// Close closes the queue.
// Notice: It would notify all goroutines return immediately,
2019-06-09 10:33:16 +08:00
// which are being blocked reading using Pop method.
func (q *Queue) Close() {
2019-06-19 09:06:52 +08:00
q.closed.Set(true)
2019-06-17 19:54:00 +08:00
if q.events != nil {
close(q.events)
}
if q.limit > 0 {
close(q.C)
}
for i := 0; i < defaultBatchSize; i++ {
2019-06-19 09:06:52 +08:00
q.Pop()
}
}
2019-06-15 18:30:09 +08:00
// Len returns the length of the queue.
2019-10-25 17:32:03 +08:00
// Note that the result might not be accurate as there's a
2021-05-15 18:13:51 +08:00
// asynchronous channel reading the list constantly.
2019-06-15 18:30:09 +08:00
func (q *Queue) Len() (length int) {
if q.list != nil {
length += q.list.Len()
}
length += len(q.C)
2019-06-19 09:06:52 +08:00
return
2018-03-13 15:54:56 +08:00
}
2019-06-15 18:30:09 +08:00
// Size returns the length of the queue; it is an alias of Len.
func (q *Queue) Size() int {
	size := q.Len()
	return size
}