milvus/internal/util/rocksmq/global_rmq.go
zhenshan.cao 0679954922 Refactor tso and global id allocator
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
2021-02-24 17:12:06 +08:00

61 lines
1.2 KiB
Go

package rocksmq
import (
"os"
"sync"
"github.com/zilliztech/milvus-distributed/internal/allocator"
rocksdbkv "github.com/zilliztech/milvus-distributed/internal/kv/rocksdb"
)
var Rmq *RocksMQ
var once sync.Once
type Consumer struct {
GroupName string
ChannelName string
MsgNum chan int
}
func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error {
var err error
Rmq, err = NewRocksMQ(rocksdbName, idAllocator)
return err
}
func InitRocksMQ(rocksdbName string) error {
var err error
once.Do(func() {
kvname := rocksdbName + "_kv"
if _, err := os.Stat(kvname); !os.IsNotExist(err) {
_ = os.RemoveAll(kvname)
}
rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvname)
if err != nil {
panic(err)
}
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
_ = idAllocator.Initialize()
if _, err := os.Stat(rocksdbName); !os.IsNotExist(err) {
_ = os.RemoveAll(rocksdbName)
}
Rmq, err = NewRocksMQ(rocksdbName, idAllocator)
if err != nil {
panic(err)
}
})
return err
}
func CloseRocksMQ() {
if Rmq != nil && Rmq.store != nil {
Rmq.store.Close()
rocksdbName := Rmq.store.Name()
_ = os.RemoveAll(rocksdbName)
kvname := rocksdbName + "_kv"
os.RemoveAll(kvname)
}
}