Merge branch 'master' into 测试gqueue

This commit is contained in:
jroam 2019-06-16 23:16:18 +08:00
commit 211678c7d3

View File

@ -20,14 +20,15 @@ package gqueue
import ( import (
"github.com/gogf/gf/g/container/glist" "github.com/gogf/gf/g/container/glist"
"github.com/gogf/gf/g/container/gtype"
"math" "math"
) )
type Queue struct { type Queue struct {
limit int // Limit for queue size. limit int // Limit for queue size.
list *glist.List // Underlying list structure for data maintaining. list *glist.List // Underlying list structure for data maintaining.
closed *gtype.Bool // Whether queue is closed.
events chan struct{} // Events for data writing. events chan struct{} // Events for data writing.
closed chan struct{} // Events for queue closing.
C chan interface{} // Underlying channel for data reading. C chan interface{} // Underlying channel for data reading.
} }
@ -41,7 +42,7 @@ const (
// When <limit> is given, the queue will be static and high performance which is comparable with stdlib channel. // When <limit> is given, the queue will be static and high performance which is comparable with stdlib channel.
func New(limit...int) *Queue { func New(limit...int) *Queue {
q := &Queue { q := &Queue {
closed : make(chan struct{}, 0), closed : gtype.NewBool(),
} }
if len(limit) > 0 { if len(limit) > 0 {
q.limit = limit[0] q.limit = limit[0]
@ -58,28 +59,27 @@ func New(limit...int) *Queue {
// startAsyncLoop starts an asynchronous goroutine, // startAsyncLoop starts an asynchronous goroutine,
// which handles the data synchronization from list <q.list> to channel <q.C>. // which handles the data synchronization from list <q.list> to channel <q.C>.
func (q *Queue) startAsyncLoop() { func (q *Queue) startAsyncLoop() {
for { defer func() {
select { if q.closed.Val() {
case <- q.closed: _ = recover()
return }
case <- q.events: }()
for { for !q.closed.Val() {
if length := q.list.Len(); length > 0 { <- q.events
array := make([]interface{}, length) if length := q.list.Len(); length > 0 {
for i := 0; i < length; i++ { array := make([]interface{}, length)
if e := q.list.Front(); e != nil { for i := 0; i < length; i++ {
array[i] = q.list.Remove(e) if e := q.list.Front(); e != nil {
} else { array[i] = q.list.Remove(e)
break } else {
} break
}
for _, v := range array {
q.C <- v
}
} else {
break
}
} }
}
for _, v := range array {
// When q.C closes, it will panic here, especially q.C is being blocked for writing.
// It will be caught by recover and be ignored, if any error occurs here.
q.C <- v
}
} }
} }
} }
@ -105,14 +105,12 @@ func (q *Queue) Pop() interface{} {
// Notice: It would notify all goroutines return immediately, // Notice: It would notify all goroutines return immediately,
// which are being blocked reading using Pop method. // which are being blocked reading using Pop method.
func (q *Queue) Close() { func (q *Queue) Close() {
if q.C != nil { q.closed.Set(true)
close(q.C)
}
if q.events != nil { if q.events != nil {
close(q.events) close(q.events)
} }
if q.closed != nil { if q.C != nil {
close(q.closed) close(q.C)
} }
} }