// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package etcd
import (
"fmt"
v3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
// Queue implements a multi-reader, multi-writer distributed queue.
// Its elements live in etcd under a common key prefix.
type Queue struct {
	// client is the etcd v3 client used for every queue operation.
	client *v3.Client
	// ctx is handed to the etcd calls issued by Enqueue and Dequeue.
	ctx context.Context
	// keyPrefix is the etcd key prefix shared by all elements of the queue.
	keyPrefix string
}
2019-12-24 22:41:19 +08:00
// NewQueue new queue
func NewQueue(ctx context.Context, client *v3.Client, keyPrefix string) *Queue {
2017-11-07 11:40:44 +08:00
return &Queue{client, ctx, keyPrefix}
}
2019-12-24 22:41:19 +08:00
// Enqueue en queue
2017-11-07 11:40:44 +08:00
func (q *Queue) Enqueue(val string) error {
_, err := newUniqueKV(q.ctx, q.client, q.keyPrefix, val)
return err
}
// Dequeue returns Enqueue()'d elements in FIFO order. If the
// queue is empty, Dequeue blocks until elements are available.
func (q *Queue) Dequeue() (string, error) {
2020-07-18 15:50:18 +08:00
for {
// TODO: fewer round trips by fetching more than one key
resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
if err != nil {
return "", err
}
2017-11-07 11:40:44 +08:00
2020-07-18 15:50:18 +08:00
kv, err := claimFirstKey(q.ctx, q.client, resp.Kvs)
if err != nil {
return "", err
} else if kv != nil {
return string(kv.Value), nil
} else if resp.More {
// missed some items, retry to read in more
return q.Dequeue()
}
2017-11-07 11:40:44 +08:00
2020-07-18 15:50:18 +08:00
// nothing yet; wait on elements
ev, err := WaitPrefixEvents(
q.client,
q.keyPrefix,
resp.Header.Revision,
[]mvccpb.Event_EventType{mvccpb.PUT})
if err != nil {
if err.Error() == "not updated for a long time" {
continue
}
return "", err
}
if ev == nil {
return "", fmt.Errorf("event is nil")
}
if ev.Kv == nil {
return "", fmt.Errorf("event key value is nil")
}
ok, err := deleteRevKey(q.ctx, q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
if err != nil {
return "", err
} else if !ok {
return q.Dequeue()
}
return string(ev.Kv.Value), err
2017-11-07 11:40:44 +08:00
}
}