From de1047edc984026d615f96b1fabca9db52ceb17f Mon Sep 17 00:00:00 2001 From: John Date: Wed, 28 Feb 2018 11:45:06 +0800 Subject: [PATCH] =?UTF-8?q?gchan,gqueue=E5=8C=85=E6=80=A7=E8=83=BD?= =?UTF-8?q?=E6=94=B9=E8=BF=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/container/gchan/gchan.go | 15 +++++++++------ g/container/gqueue/gqueue.go | 34 +++++++++++++--------------------- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/g/container/gchan/gchan.go b/g/container/gchan/gchan.go index 69bcab9f8..a70930763 100644 --- a/g/container/gchan/gchan.go +++ b/g/container/gchan/gchan.go @@ -10,13 +10,12 @@ package gchan import ( "sync" "errors" - "sync/atomic" ) type Chan struct { mu sync.RWMutex list chan interface{} - closed int32 + closed bool } func New(limit int) *Chan { @@ -27,25 +26,29 @@ func New(limit int) *Chan { // 将数据压入队列 func (q *Chan) Push(v interface{}) error { - if atomic.LoadInt32(&q.closed) > 0 { + q.mu.RLock() + if q.closed { + q.mu.RUnlock() return errors.New("closed") } q.list <- v + q.mu.RUnlock() return nil } // 先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 -// 第二个返回值表示队列是否关闭 func (q *Chan) Pop() interface{} { return <- q.list } // 关闭队列(通知所有通过Pop阻塞的协程退出) func (q *Chan) Close() { - if atomic.LoadInt32(&q.closed) == 0 { - atomic.StoreInt32(&q.closed, 1) + q.mu.Lock() + if !q.closed { + q.closed = true close(q.list) } + q.mu.Unlock() } // 获取当前队列大小 diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index 4ff09808c..f3088b362 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -14,7 +14,6 @@ import ( "math" "sync" "errors" - "sync/atomic" "container/list" ) @@ -24,7 +23,7 @@ type Queue struct { limit int // 队列限制大小 limits chan struct{} // 用于队列大小限制 events chan struct{} // 用于内部数据写入事件通知 - closed int32 // 队列是否关闭 + closed bool // 队列是否关闭 } // 队列大小为非必须参数,默认不限制 @@ -42,78 +41,71 @@ func New(limit...int) *Queue { // 将数据压入队列, 队尾 func (q *Queue) PushBack(v interface{}) error { - if atomic.LoadInt32(&q.closed) > 0 { + q.mu.RLock() + if q.closed { + q.mu.RUnlock() return errors.New("closed") } if q.limit > 0 { q.limits <- struct{}{} } - q.mu.Lock() q.list.PushBack(v) - q.mu.Unlock() q.events <- struct{}{} + q.mu.RUnlock() return nil } // 将数据压入队列, 队头 func (q *Queue) PushFront(v interface{}) error { - if atomic.LoadInt32(&q.closed) > 0 { + q.mu.RLock() + if q.closed { + q.mu.RUnlock() return errors.New("closed") } if q.limit > 0 { q.limits <- struct{}{} } - q.mu.Lock() q.list.PushFront(v) - q.mu.Unlock() q.events <- struct{}{} + q.mu.RUnlock() return nil } // 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 func (q *Queue) PopFront() interface{} { <- q.events - if q.limit > 0 { <- q.limits } - q.mu.Lock() if elem := q.list.Front(); elem != nil { item := q.list.Remove(elem) - q.mu.Unlock() return item } - q.mu.Unlock() - return nil } // 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 -// 第二个返回值表示队列是否关闭 func (q *Queue) PopBack() interface{} { <- q.events - if q.limit > 0 { <- q.limits } - q.mu.Lock() if elem := q.list.Front(); elem != nil { item := q.list.Remove(elem) - q.mu.Unlock() return item } - q.mu.Unlock() - return nil } // 关闭队列(通知所有通过Pop*阻塞的协程退出) func (q *Queue) Close() { - if atomic.LoadInt32(&q.closed) == 0 { - atomic.StoreInt32(&q.closed, 1) + q.mu.Lock() + if !q.closed { + q.closed = true close(q.limits) close(q.events) } + q.mu.Unlock() } // 获取当前队列大小