Add isClosed for rocksmq (#12267)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
This commit is contained in:
yukun 2021-11-26 11:27:16 +08:00 committed by GitHub
parent d82022ec46
commit 080eaa9a5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 96 additions and 3 deletions

View File

@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/milvus-io/milvus/internal/allocator"
@ -36,6 +37,9 @@ import (
// UniqueID is the type of message ID
type UniqueID = typeutil.UniqueID
// Rocksmq state
type RmqState = int64
// RocksmqPageSize is the size of a message page, default 2GB
var RocksmqPageSize int64 = 2 << 30
@ -57,6 +61,15 @@ const (
CurrentIDSuffix = "current_id"
ReaderNamePrefix = "reader-"
RmqNotServingErrMsg = "Rocksmq is not serving"
)
const (
// RmqStateStopped state stands for just created or stopped `Rocksmq` instance
RmqStateStopped RmqState = 0
// RmqStateHealthy state stands for healthy `Rocksmq` instance
RmqStateHealthy RmqState = 1
)
/**
@ -143,6 +156,7 @@ type rocksmq struct {
retentionInfo *retentionInfo
readers sync.Map
state RmqState
}
// NewRocksMQ step:
@ -189,18 +203,21 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
if checkRetention() {
rmq.retentionInfo.startRetentionInfo()
}
atomic.StoreInt64(&rmq.state, RmqStateHealthy)
return rmq, nil
}
func (rmq *rocksmq) isClosed() bool {
return atomic.LoadInt64(&rmq.state) != RmqStateHealthy
}
// Close step:
// 1. Stop retention
// 2. Destroy all consumer groups and topics
// 3. Close rocksdb instance
func (rmq *rocksmq) Close() {
atomic.StoreInt64(&rmq.state, RmqStateStopped)
rmq.stopRetention()
rmq.storeMu.Lock()
defer rmq.storeMu.Unlock()
rmq.consumers.Range(func(k, v interface{}) bool {
var topic string
for _, consumer := range v.([]*Consumer) {
@ -218,6 +235,8 @@ func (rmq *rocksmq) Close() {
}
return true
})
rmq.storeMu.Lock()
defer rmq.storeMu.Unlock()
rmq.store.Close()
}
@ -234,6 +253,9 @@ func (rmq *rocksmq) checkKeyExist(key string) bool {
// CreateTopic writes initialized messages for topic in rocksdb
func (rmq *rocksmq) CreateTopic(topicName string) error {
if rmq.isClosed() {
return errors.New(RmqNotServingErrMsg)
}
start := time.Now()
beginKey := topicName + "/begin_id"
endKey := topicName + "/end_id"
@ -353,6 +375,9 @@ func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Cons
// CreateConsumerGroup creates an nonexistent consumer group for topic
func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
if rmq.isClosed() {
return errors.New(RmqNotServingErrMsg)
}
start := time.Now()
key := constructCurrentID(topicName, groupName)
if rmq.checkKeyExist(key) {
@ -371,6 +396,9 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
// RegisterConsumer registers a consumer in rocksmq consumers
func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) {
if rmq.isClosed() {
return
}
start := time.Now()
if vals, ok := rmq.consumers.Load(consumer.Topic); ok {
for _, v := range vals.([]*Consumer) {
@ -427,6 +455,9 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
// Produce produces messages for topic and updates page infos for retention
func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error) {
if rmq.isClosed() {
return nil, errors.New(RmqNotServingErrMsg)
}
start := time.Now()
ll, ok := topicMu.Load(topicName)
if !ok {
@ -578,6 +609,9 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes
// 2. Update current_id to the last consumed message
// 3. Update ack informations in rocksdb
func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) {
if rmq.isClosed() {
return nil, errors.New(RmqNotServingErrMsg)
}
start := time.Now()
ll, ok := topicMu.Load(topicName)
if !ok {
@ -719,6 +753,9 @@ func (rmq *rocksmq) moveConsumePos(topicName string, groupName string, msgID Uni
// Seek updates the current id to the given msgID
func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) error {
if rmq.isClosed() {
return errors.New(RmqNotServingErrMsg)
}
/* Step I: Check if key exists */
ll, ok := topicMu.Load(topicName)
if !ok {
@ -736,6 +773,9 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
// SeekToLatest updates current id to the msg id of latest message + 1
func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
if rmq.isClosed() {
return errors.New(RmqNotServingErrMsg)
}
rmq.storeMu.Lock()
defer rmq.storeMu.Unlock()
key := constructCurrentID(topicName, groupName)
@ -933,6 +973,9 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
}
func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error) {
if rmq.isClosed() {
return "", errors.New(RmqNotServingErrMsg)
}
readOpts := gorocksdb.NewDefaultReadOptions()
readOpts.SetPrefixSameAsStart(true)
iter := rmq.store.NewIterator(readOpts)
@ -986,6 +1029,9 @@ func (rmq *rocksmq) getReader(topicName, readerName string) *rocksmqReader {
}
func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) {
if rmq.isClosed() {
return
}
reader := rmq.getReader(topicName, readerName)
if reader == nil {
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
@ -995,6 +1041,9 @@ func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID Unique
}
func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error) {
if rmq.isClosed() {
return ConsumerMessage{}, errors.New(RmqNotServingErrMsg)
}
reader := rmq.getReader(topicName, readerName)
if reader == nil {
return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName)
@ -1003,6 +1052,9 @@ func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName strin
}
func (rmq *rocksmq) HasNext(topicName string, readerName string, messageIDInclusive bool) bool {
if rmq.isClosed() {
return false
}
reader := rmq.getReader(topicName, readerName)
if reader == nil {
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
@ -1012,6 +1064,9 @@ func (rmq *rocksmq) HasNext(topicName string, readerName string, messageIDInclus
}
func (rmq *rocksmq) CloseReader(topicName string, readerName string) {
if rmq.isClosed() {
return
}
reader := rmq.getReader(topicName, readerName)
if reader == nil {
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))

View File

@ -19,6 +19,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -766,3 +767,40 @@ func TestRocksmq_Reader(t *testing.T) {
}
assert.False(t, rmq.HasNext(channelName, readerName, false))
}
func TestRocksmq_Close(t *testing.T) {
ep := etcdEndpoints()
etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root")
assert.Nil(t, err)
defer etcdKV.Close()
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_close"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
defer rmq.Close()
atomic.StoreInt64(&rmq.state, RmqStateStopped)
assert.Error(t, rmq.CreateTopic(""))
assert.Error(t, rmq.CreateConsumerGroup("", ""))
rmq.RegisterConsumer(&Consumer{})
_, err = rmq.Produce("", nil)
assert.Error(t, err)
_, err = rmq.Consume("", "", 0)
assert.Error(t, err)
assert.Error(t, rmq.seek("", "", 0))
assert.Error(t, rmq.SeekToLatest("", ""))
_, err = rmq.CreateReader("", 0, false, "")
assert.Error(t, err)
rmq.ReaderSeek("", "", 0)
_, err = rmq.Next(nil, "", "", false)
assert.Error(t, err)
rmq.HasNext("", "", false)
rmq.CloseReader("", "")
}