From 5e25041254dd4de26cf798316567daf2fdeb9db3 Mon Sep 17 00:00:00 2001 From: yukun Date: Fri, 24 Sep 2021 21:45:56 +0800 Subject: [PATCH] Add rocksmq comments (#8474) Signed-off-by: fishpenguin --- internal/util/rocksmq/server/rocksmq/global_rmq.go | 8 ++++++++ internal/util/rocksmq/server/rocksmq/rocksmq_impl.go | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/internal/util/rocksmq/server/rocksmq/global_rmq.go b/internal/util/rocksmq/server/rocksmq/global_rmq.go index e1b8e27904..e8c9acdc91 100644 --- a/internal/util/rocksmq/server/rocksmq/global_rmq.go +++ b/internal/util/rocksmq/server/rocksmq/global_rmq.go @@ -24,16 +24,23 @@ import ( rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" ) +// Global rocksmq instance that will be initialized only once var Rmq *rocksmq + +// once is used to init global rocksmq var once sync.Once + +// Params provide params that rocksmq needs var params paramtable.BaseTable +// InitRmq is deprecate implementation of global rocksmq. will be removed later func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error { var err error Rmq, err = NewRocksMQ(rocksdbName, idAllocator) return err } +// InitRocksMQ init global rocksmq single instance func InitRocksMQ() error { var err error once.Do(func() { @@ -68,6 +75,7 @@ func InitRocksMQ() error { return err } +// CloseRocksMQ is used to close global rocksmq func CloseRocksMQ() { log.Debug("Close Rocksmq!") if Rmq != nil && Rmq.store != nil { diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 5bac3ca972..b39bbd0fe6 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -28,10 +28,13 @@ import ( rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" ) +// UniqueID is the type of message ID type UniqueID = typeutil.UniqueID +// RocksmqPageSize is the size of a message page, default 2GB var RocksmqPageSize int64 = 2 << 30 +// Const variable that will be used in rocksmqs const ( DefaultMessageID = "-1" FixedChannelNameLen = 320 @@ -109,6 +112,9 @@ type rocksmq struct { retentionInfo *retentionInfo } +// 1. New rocksmq instance based on rocksdb with name and rocksdbkv with kvname +// 2. Init retention info, load retention info to memory +// 3. Start retention goroutine func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) { bbto := gorocksdb.NewDefaultBlockBasedTableOptions() bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity))