Change the key to make read faster in rocksmq (#26404)
Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
parent 94846995bf
commit eae02de8bd
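In outline: rocksmq used to store each message's properties under a second key in the default keyspace, which cost an extra point lookup per message on the read path. This commit stores payload and properties under the same topic/msgID key but in separate RocksDB column families, so Consume can stream both with two cursors over one key range. The key layout, as it appears in the hunks below:

Before (2.2.x, single keyspace):
    payload     ->  <topicName>/<msgID>
    properties  ->  <common.PropertiesKey>/<topicName>/<msgID>

After this change (same key, two column families):
    payload     ->  <topicName>/<msgID>   in "default"      (rmq.cfh[0])
    properties  ->  <topicName>/<msgID>   in "properties"   (rmq.cfh[1], written only when Properties != nil)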
@@ -30,6 +30,17 @@ func NewRocksIterator(db *gorocksdb.DB, opts *gorocksdb.ReadOptions) *RocksIterator {
     return it
 }
 
+func NewRocksIteratorCF(db *gorocksdb.DB, cf *gorocksdb.ColumnFamilyHandle, opts *gorocksdb.ReadOptions) *RocksIterator {
+    iter := db.NewIteratorCF(opts, cf)
+    it := &RocksIterator{iter, nil, false}
+    runtime.SetFinalizer(it, func(rocksit *RocksIterator) {
+        if !rocksit.close {
+            log.Error("iterator is leaking.. please check")
+        }
+    })
+    return it
+}
+
 func NewRocksIteratorWithUpperBound(db *gorocksdb.DB, upperBoundString string, opts *gorocksdb.ReadOptions) *RocksIterator {
     upperBound := []byte(upperBoundString)
     opts.SetIterateUpperBound(upperBound)
@@ -43,6 +54,19 @@ func NewRocksIteratorWithUpperBound(db *gorocksdb.DB, upperBoundString string, opts *gorocksdb.ReadOptions) *RocksIterator {
     return it
 }
 
+func NewRocksIteratorCFWithUpperBound(db *gorocksdb.DB, cf *gorocksdb.ColumnFamilyHandle, upperBoundString string, opts *gorocksdb.ReadOptions) *RocksIterator {
+    upperBound := []byte(upperBoundString)
+    opts.SetIterateUpperBound(upperBound)
+    iter := db.NewIteratorCF(opts, cf)
+    it := &RocksIterator{iter, upperBound, false}
+    runtime.SetFinalizer(it, func(rocksit *RocksIterator) {
+        if !rocksit.close {
+            log.Error("iteratorCF is leaking.. please check")
+        }
+    })
+    return it
+}
+
 // Valid returns false only when an Iterator has iterated past either the
 // first or the last key in the database.
 func (iter *RocksIterator) Valid() bool {
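The two new constructors wrap gorocksdb's per-column-family iterator; NewRocksIteratorCFWithUpperBound also stores the upper-bound slice inside the returned RocksIterator, presumably so the Go byte slice handed to SetIterateUpperBound stays reachable for the iterator's lifetime. A minimal sketch of the prefix scan these wrappers enable, written against gorocksdb directly (the import path, the function name scanTopicCF and the freeing discipline are assumptions of this sketch, not part of the change):

package main

import (
    "fmt"
    "runtime"

    "github.com/tecbot/gorocksdb"
)

// scanTopicCF walks every key of one column family that falls inside
// [prefix, upperBound), the same window rocksmq uses for a single topic.
func scanTopicCF(db *gorocksdb.DB, cf *gorocksdb.ColumnFamilyHandle, prefix, upperBound string) {
    ro := gorocksdb.NewDefaultReadOptions()
    defer ro.Destroy()
    ub := []byte(upperBound)
    ro.SetIterateUpperBound(ub) // keys >= ub are never visited

    it := db.NewIteratorCF(ro, cf)
    defer it.Close()

    for it.Seek([]byte(prefix)); it.Valid(); it.Next() {
        k, v := it.Key(), it.Value()
        fmt.Printf("%s -> %d bytes\n", string(k.Data()), len(v.Data()))
        k.Free()
        v.Free()
    }
    runtime.KeepAlive(ub) // keep the bound alive until iteration is done
}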
@@ -30,7 +30,6 @@ import (
     "github.com/milvus-io/milvus/internal/allocator"
     "github.com/milvus-io/milvus/internal/kv"
     rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
-    "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
     "github.com/milvus-io/milvus/pkg/util/hardware"
@@ -118,6 +117,7 @@ var topicMu = sync.Map{}
 
 type rocksmq struct {
     store       *gorocksdb.DB
+    cfh         []*gorocksdb.ColumnFamilyHandle
     kv          kv.BaseKV
     idAllocator allocator.Interface
     storeMu     *sync.Mutex
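In the rest of the diff the handle slice is indexed positionally: rmq.cfh[0] is the "default" column family (payloads) and rmq.cfh[1] is the "properties" column family, matching the order of giveColumnFamilies in NewRocksMQ below. Purely as a reading aid, hypothetical named constants for those indexes would look like this (the committed code keeps the literal 0 and 1):

// Hypothetical constants, not part of the change; shown only to document the indexes.
const (
    payloadCFIdx    = 0 // "default" column family: message payloads
    propertiesCFIdx = 1 // "properties" column family: marshalled message properties
)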
@@ -222,8 +222,13 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) {
     optsStore.IncreaseParallelism(parallelism)
     // enable background flush
     optsStore.SetMaxBackgroundFlushes(1)
+    // use properties as the column families to store trace id
+    optsStore.SetCreateIfMissingColumnFamilies(true)
 
-    db, err := gorocksdb.OpenDb(optsStore, name)
+    // db, err := gorocksdb.OpenDb(opts, name)
+    // use properties as the column families to store trace id
+    giveColumnFamilies := []string{"default", "properties"}
+    db, cfHandles, err := gorocksdb.OpenDbColumnFamilies(optsStore, name, giveColumnFamilies, []*gorocksdb.Options{optsStore, optsStore})
     if err != nil {
         return nil, err
     }
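SetCreateIfMissingColumnFamilies makes the first open after an upgrade create the new "properties" family while the existing data stays in "default"; OpenDbColumnFamilies then returns one handle per requested family, in the same order as the names. A minimal sketch of that open path, assuming the tecbot/gorocksdb import path and leaving out the store tuning the real code applies:

package main

import "github.com/tecbot/gorocksdb"

// openWithColumnFamilies opens (or creates) a RocksDB with the two column
// families rocksmq asks for and returns their handles in declaration order.
func openWithColumnFamilies(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
    opts := gorocksdb.NewDefaultOptions()
    opts.SetCreateIfMissing(true)
    opts.SetCreateIfMissingColumnFamilies(true) // create "properties" on first open

    cfNames := []string{"default", "properties"}
    cfOpts := []*gorocksdb.Options{opts, opts}
    // handles[0] -> "default", handles[1] -> "properties"
    return gorocksdb.OpenDbColumnFamilies(opts, path, cfNames, cfOpts)
}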
@@ -243,6 +248,7 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) {
 
     rmq := &rocksmq{
         store:       db,
+        cfh:         cfHandles,
         kv:          kv,
         idAllocator: mqIDAllocator,
         storeMu:     &sync.Mutex{},
@@ -634,17 +640,17 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error) {
     for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ {
         msgID := idStart + UniqueID(i)
         key := path.Join(topicName, strconv.FormatInt(msgID, 10))
-        batch.Put([]byte(key), messages[i].Payload)
-        properties, err := json.Marshal(messages[i].Properties)
-        if err != nil {
-            log.Warn("properties marshal failed",
-                zap.Int64("msgID", msgID),
-                zap.String("topicName", topicName),
-                zap.Error(err))
-            return nil, err
-        }
-        pKey := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10))
-        batch.Put([]byte(pKey), properties)
+        batch.PutCF(rmq.cfh[0], []byte(key), messages[i].Payload)
+        // batch.Put([]byte(key), messages[i].Payload)
+        if messages[i].Properties != nil {
+            properties, err := json.Marshal(messages[i].Properties)
+            if err != nil {
+                log.Warn("properties marshal failed", zap.Int64("msgID", msgID), zap.String("topicName", topicName),
+                    zap.Error(err))
+                return nil, err
+            }
+            batch.PutCF(rmq.cfh[1], []byte(key), properties)
+        }
         msgIDs[i] = msgID
         msgSizes[msgID] = int64(len(messages[i].Payload))
     }
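The new Produce path keeps one key per message and splits the value across families: the payload always goes to cfh[0], and the marshalled properties go to cfh[1] only when the message actually has properties, all inside the same WriteBatch so the pair is committed atomically. A reduced sketch of that write (the function name and parameters are this sketch's, not the diff's):

package main

import (
    "encoding/json"
    "path"
    "strconv"

    "github.com/tecbot/gorocksdb"
)

// writeMessage stores one message under topic/msgID: payload in the default
// column family, properties (if any) in the properties column family.
func writeMessage(db *gorocksdb.DB, payloadCF, propsCF *gorocksdb.ColumnFamilyHandle,
    topicName string, msgID int64, payload []byte, props map[string]string) error {
    key := []byte(path.Join(topicName, strconv.FormatInt(msgID, 10)))

    batch := gorocksdb.NewWriteBatch()
    defer batch.Destroy()

    batch.PutCF(payloadCF, key, payload)
    if props != nil {
        val, err := json.Marshal(props)
        if err != nil {
            return err
        }
        batch.PutCF(propsCF, key, val)
    }

    wo := gorocksdb.NewDefaultWriteOptions()
    defer wo.Destroy()
    return db.Write(wo, batch) // payload and properties land atomically
}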
@@ -777,8 +783,10 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) {
     readOpts := gorocksdb.NewDefaultReadOptions()
     defer readOpts.Destroy()
     prefix := topicName + "/"
-    iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.store, typeutil.AddOne(prefix), readOpts)
+    iter := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[0], typeutil.AddOne(prefix), readOpts)
+    iterProperty := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[1], typeutil.AddOne(prefix), readOpts)
     defer iter.Close()
+    defer iterProperty.Close()
 
     var dataKey string
     if currentID == DefaultMessageID {
@@ -787,30 +795,39 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) {
         dataKey = path.Join(topicName, strconv.FormatInt(currentID, 10))
     }
     iter.Seek([]byte(dataKey))
+    iterProperty.Seek([]byte(dataKey))
 
     consumerMessage := make([]ConsumerMessage, 0, n)
     offset := 0
 
     for ; iter.Valid() && offset < n; iter.Next() {
         key := iter.Key()
         val := iter.Value()
         strKey := string(key.Data())
         key.Free()
-        offset++
-        properties := make(map[string]string)
+        var propertiesValue []byte
+
         msgID, err := strconv.ParseInt(strKey[len(topicName)+1:], 10, 64)
         if err != nil {
             val.Free()
             return nil, err
         }
-        askedProperties := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10))
-        opts := gorocksdb.NewDefaultReadOptions()
-        defer opts.Destroy()
-        propertiesValue, err := rmq.store.GetBytes(opts, []byte(askedProperties))
-        if err != nil {
-            return nil, err
+        offset++
+
+        if iterProperty.Valid() && string(iterProperty.Key().Data()) == string(iter.Key().Data()) {
+            // the key of properties is the same with the key of payload
+            // to prevent mix message with or without property column family
+            propertiesValue = iterProperty.Value().Data()
+            iterProperty.Next()
         }
+        properties := make(map[string]string)
+
+        // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload
+        // will ignore the property before 2.3.0, just make sure property empty is ok for 2.3
+
         // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq
-        // when produce before 2.2.0, but consume in 2.2.0, propertiesValue will be []
+        // when produce before 2.2.0, but consume after 2.2.0, propertiesValue will be []
         if len(propertiesValue) != 0 {
             if err = json.Unmarshal(propertiesValue, &properties); err != nil {
                 return nil, err
             }
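Consume now drives two cursors over the same key range: the payload iterator decides which messages are returned, and the properties iterator is only consulted (and advanced) when it sits on exactly the same key, so messages written before this change, which have no row in the properties family, simply come back with empty properties. A condensed sketch of that pairing (names, return shape and the copy of the value slice are choices of this sketch, not the diff's):

package main

import (
    "encoding/json"
    "runtime"

    "github.com/tecbot/gorocksdb"
)

// readTopic returns payloads and properties for every key in [startKey, upperBound),
// pairing the two column families by key.
func readTopic(db *gorocksdb.DB, payloadCF, propsCF *gorocksdb.ColumnFamilyHandle,
    startKey, upperBound string) (map[string][]byte, map[string]map[string]string, error) {
    ro := gorocksdb.NewDefaultReadOptions()
    defer ro.Destroy()
    ub := []byte(upperBound)
    ro.SetIterateUpperBound(ub)

    it := db.NewIteratorCF(ro, payloadCF)
    defer it.Close()
    itProps := db.NewIteratorCF(ro, propsCF)
    defer itProps.Close()

    payloads := map[string][]byte{}
    props := map[string]map[string]string{}

    itProps.Seek([]byte(startKey))
    for it.Seek([]byte(startKey)); it.Valid(); it.Next() {
        k, v := it.Key(), it.Value()
        key := string(k.Data())
        payloads[key] = append([]byte(nil), v.Data()...) // copy out of the iterator-owned buffer
        k.Free()
        v.Free()

        p := map[string]string{}
        if itProps.Valid() {
            pk, pv := itProps.Key(), itProps.Value()
            sameKey := string(pk.Data()) == key
            var raw []byte
            if sameKey {
                raw = append([]byte(nil), pv.Data()...)
            }
            pk.Free()
            pv.Free()
            if sameKey {
                // Same key in both families: these properties belong to this payload.
                if err := json.Unmarshal(raw, &p); err != nil {
                    return nil, nil, err
                }
                itProps.Next()
            }
        }
        props[key] = p
    }
    runtime.KeepAlive(ub)
    return payloads, props, nil
}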
@@ -1008,11 +1025,11 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
 func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) {
     readOpts := gorocksdb.NewDefaultReadOptions()
     defer readOpts.Destroy()
-    iter := rocksdbkv.NewRocksIterator(rmq.store, readOpts)
+    iter := rocksdbkv.NewRocksIteratorCF(rmq.store, rmq.cfh[0], readOpts)
     defer iter.Close()
 
     prefix := topicName + "/"
-    // seek to the last message of thie topic
+    // seek to the last message of the topic
     iter.SeekForPrev([]byte(typeutil.AddOne(prefix)))
 
     // if iterate fail
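getLatestMsg only changes which keyspace it scans; the mechanism it relies on is unchanged: SeekForPrev against typeutil.AddOne(prefix), an exclusive upper bound for the topic, lands the iterator on the topic's last key. A sketch of that lookup on the payload column family, where addOne stands in for typeutil.AddOne and is assumed to return the prefix with its last byte incremented, and the gorocksdb binding is assumed to expose SeekForPrev just as the rocksdbkv wrapper does:

package main

import (
    "strings"

    "github.com/tecbot/gorocksdb"
)

// lastKeyOfTopic returns the largest key that still belongs to the topic, if any.
func lastKeyOfTopic(db *gorocksdb.DB, payloadCF *gorocksdb.ColumnFamilyHandle, topicName string) (string, bool) {
    ro := gorocksdb.NewDefaultReadOptions()
    defer ro.Destroy()

    it := db.NewIteratorCF(ro, payloadCF)
    defer it.Close()

    prefix := topicName + "/"
    // Position on the greatest key <= addOne(prefix), i.e. the topic's last message key.
    it.SeekForPrev(addOne([]byte(prefix)))
    if !it.Valid() {
        return "", false
    }
    k := it.Key()
    defer k.Free()
    key := string(k.Data())
    if !strings.HasPrefix(key, prefix) {
        return "", false // landed on an earlier topic: this topic has no messages
    }
    return key, true
}

// addOne mimics the assumed behaviour of typeutil.AddOne: the smallest byte
// string greater than every key that starts with b (last byte incremented).
func addOne(b []byte) []byte {
    out := append([]byte(nil), b...)
    out[len(out)-1]++
    return out
}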
@@ -1037,6 +1054,7 @@ func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) {
     }
 
     msgID, err := strconv.ParseInt(seekMsgID[len(topicName)+1:], 10, 64)
+
     if err != nil {
         return DefaultMessageID, err
     }
@@ -12,6 +12,7 @@
 package server
 
 import (
+    "encoding/json"
     "fmt"
     "os"
     "path"
@@ -49,7 +50,7 @@ func TestMain(m *testing.M) {
     os.Exit(code)
 }
 
-type producerMessageBefore struct {
+type producerMessageBefore2 struct {
     Payload []byte
 }
 
@@ -81,7 +82,7 @@ func etcdEndpoints() []string {
 }
 
 // to test compatibility concern
-func (rmq *rocksmq) produceBefore(topicName string, messages []producerMessageBefore) ([]UniqueID, error) {
+func (rmq *rocksmq) produceBefore2(topicName string, messages []producerMessageBefore2) ([]UniqueID, error) {
     if rmq.isClosed() {
         return nil, errors.New(RmqNotServingErrMsg)
     }
@@ -161,6 +162,99 @@ func (rmq *rocksmq) produceBefore(topicName string, messages []producerMessageBefore) ([]UniqueID, error) {
     return msgIDs, nil
 }
 
+// to test compatibility concern
+func (rmq *rocksmq) produceIn2(topicName string, messages []ProducerMessage) ([]UniqueID, error) {
+    if rmq.isClosed() {
+        return nil, errors.New(RmqNotServingErrMsg)
+    }
+    start := time.Now()
+    ll, ok := topicMu.Load(topicName)
+    if !ok {
+        return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName)
+    }
+    lock, ok := ll.(*sync.Mutex)
+    if !ok {
+        return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName)
+    }
+    lock.Lock()
+    defer lock.Unlock()
+
+    getLockTime := time.Since(start).Milliseconds()
+
+    msgLen := len(messages)
+    idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))
+
+    if err != nil {
+        return []UniqueID{}, err
+    }
+    allocTime := time.Since(start).Milliseconds()
+    if UniqueID(msgLen) != idEnd-idStart {
+        return []UniqueID{}, errors.New("Obtained id length is not equal that of message")
+    }
+
+    // Insert data to store system
+    batch := gorocksdb.NewWriteBatch()
+    defer batch.Destroy()
+    msgSizes := make(map[UniqueID]int64)
+    msgIDs := make([]UniqueID, msgLen)
+    for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ {
+        msgID := idStart + UniqueID(i)
+        key := path.Join(topicName, strconv.FormatInt(msgID, 10))
+        batch.Put([]byte(key), messages[i].Payload)
+        properties, err := json.Marshal(messages[i].Properties)
+        if err != nil {
+            log.Warn("properties marshal failed",
+                zap.Int64("msgID", msgID),
+                zap.String("topicName", topicName),
+                zap.Error(err))
+            return nil, err
+        }
+        pKey := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10))
+        batch.Put([]byte(pKey), properties)
+        msgIDs[i] = msgID
+        msgSizes[msgID] = int64(len(messages[i].Payload))
+    }
+
+    opts := gorocksdb.NewDefaultWriteOptions()
+    defer opts.Destroy()
+    err = rmq.store.Write(opts, batch)
+    if err != nil {
+        return []UniqueID{}, err
+    }
+    writeTime := time.Since(start).Milliseconds()
+    if vals, ok := rmq.consumers.Load(topicName); ok {
+        for _, v := range vals.([]*Consumer) {
+            select {
+            case v.MsgMutex <- struct{}{}:
+                continue
+            default:
+                continue
+            }
+        }
+    }
+
+    // Update message page info
+    err = rmq.updatePageInfo(topicName, msgIDs, msgSizes)
+    if err != nil {
+        return []UniqueID{}, err
+    }
+
+    // TODO add this to monitor metrics
+    getProduceTime := time.Since(start).Milliseconds()
+    if getProduceTime > 200 {
+        log.Warn("rocksmq produce too slowly", zap.String("topic", topicName),
+            zap.Int64("get lock elapse", getLockTime),
+            zap.Int64("alloc elapse", allocTime-getLockTime),
+            zap.Int64("write elapse", writeTime-allocTime),
+            zap.Int64("updatePage elapse", getProduceTime-writeTime),
+            zap.Int64("produce total elapse", getProduceTime),
+        )
+    }
+
+    rmq.topicLastID.Store(topicName, msgIDs[len(msgIDs)-1])
+    return msgIDs, nil
+}
+
 func TestRocksmq_RegisterConsumer(t *testing.T) {
     suffix := "_register"
     kvPath := rmqPath + kvPathSuffix + suffix
@@ -310,12 +404,12 @@ func TestRocksmq_Compatibility(t *testing.T) {
     defer rmq.DestroyTopic(channelName)
 
     // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq
-    // it aims to test if produce before 2.2.0, but consume after 2.2.0
+    // it aims to test if produce before 2.2.0, will consume after 2.2.0 successfully
     msgD := "d_message"
-    tMsgs := make([]producerMessageBefore, 1)
-    tMsgD := producerMessageBefore{Payload: []byte(msgD)}
+    tMsgs := make([]producerMessageBefore2, 1)
+    tMsgD := producerMessageBefore2{Payload: []byte(msgD)}
     tMsgs[0] = tMsgD
-    _, err = rmq.produceBefore(channelName, tMsgs)
+    _, err = rmq.produceBefore2(channelName, tMsgs)
     assert.NoError(t, err)
 
     groupName := "test_group"
@@ -324,6 +418,9 @@
     assert.NoError(t, err)
 
     cMsgs, err := rmq.Consume(channelName, groupName, 1)
+    if err != nil {
+        log.Info("test", zap.Any("err", err))
+    }
     assert.NoError(t, err)
     assert.Equal(t, len(cMsgs), 1)
     assert.Equal(t, string(cMsgs[0].Payload), "d_message")
@@ -332,6 +429,58 @@
     // it will be set empty map if produce message has no properties field
     expect := make(map[string]string)
     assert.Equal(t, cMsgs[0].Properties, expect)
+
+    // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload
+    // will ignore the property before 2.3.0, just make sure property empty is ok for 2.3
+    // after 2.3, the properties will be stored in column families
+    // it aims to test if produce in 2.2.0, but consume in 2.3.0, will get properties successfully
+    msg1 := "1_message"
+    tMsgs1 := make([]ProducerMessage, 1)
+    properties := make(map[string]string)
+    properties[common.TraceIDKey] = "1"
+    tMsg1 := ProducerMessage{Payload: []byte(msg1), Properties: properties}
+    tMsgs1[0] = tMsg1
+    _, err = rmq.produceIn2(channelName, tMsgs1)
+    assert.NoError(t, err)
+
+    msg2, err := rmq.Consume(channelName, groupName, 1)
+    assert.NoError(t, err)
+    assert.Equal(t, len(msg2), 1)
+    assert.Equal(t, string(msg2[0].Payload), "1_message")
+    _, ok = msg2[0].Properties[common.TraceIDKey]
+    assert.False(t, ok)
+    // will ignore the property before 2.3.0, just make sure property empty is ok for 2.3
+    expect = make(map[string]string)
+    assert.Equal(t, cMsgs[0].Properties, expect)
+
+    // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload
+    // after 2.3, the properties will be stored in column families
+    // it aims to test the mixed message before 2.3.0 and after 2.3.0, will get properties successfully
+    msg3 := "3_message"
+    tMsgs3 := make([]ProducerMessage, 2)
+    properties3 := make(map[string]string)
+    properties3[common.TraceIDKey] = "3"
+    tMsg3 := ProducerMessage{Payload: []byte(msg3), Properties: properties3}
+    tMsgs3[0] = tMsg3
+    msg4 := "4_message"
+    tMsg4 := ProducerMessage{Payload: []byte(msg4)}
+    tMsgs3[1] = tMsg4
+    _, err = rmq.Produce(channelName, tMsgs3)
+    assert.NoError(t, err)
+
+    msg5, err := rmq.Consume(channelName, groupName, 2)
+    assert.NoError(t, err)
+    assert.Equal(t, len(msg5), 2)
+    assert.Equal(t, string(msg5[0].Payload), "3_message")
+    _, ok = msg5[0].Properties[common.TraceIDKey]
+    assert.True(t, ok)
+    assert.Equal(t, msg5[0].Properties, properties3)
+    assert.Equal(t, string(msg5[1].Payload), "4_message")
+    _, ok = msg5[1].Properties[common.TraceIDKey]
+    assert.False(t, ok)
+    // it will be set empty map if produce message has no properties field
+    expect = make(map[string]string)
+    assert.Equal(t, msg5[1].Properties, expect)
 }
 
 func TestRocksmq_MultiConsumer(t *testing.T) {