mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 12:29:36 +08:00
Add log for rocksmq (#8889)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
445f5426d1
commit
fdbfa62b27
@ -162,7 +162,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
|
||||
rmq.retentionInfo = ri
|
||||
|
||||
rmq.retentionInfo.startRetentionInfo()
|
||||
|
||||
log.Debug("Rocksmq start successfully ", zap.String("name", name))
|
||||
return rmq, nil
|
||||
}
|
||||
|
||||
@ -202,15 +202,16 @@ func (rmq *rocksmq) checkKeyExist(key string) bool {
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) CreateTopic(topicName string) error {
|
||||
start := time.Now()
|
||||
beginKey := topicName + "/begin_id"
|
||||
endKey := topicName + "/end_id"
|
||||
|
||||
// Check if topic exist
|
||||
if rmq.checkKeyExist(beginKey) || rmq.checkKeyExist(endKey) {
|
||||
log.Debug("RocksMQ: " + beginKey + " or " + endKey + " existed.")
|
||||
log.Warn("RocksMQ: " + beginKey + " or " + endKey + " existed.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO change rmq kv save logic into a batch
|
||||
err := rmq.kv.Save(beginKey, "0")
|
||||
if err != nil {
|
||||
return err
|
||||
@ -265,11 +266,12 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
|
||||
rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{
|
||||
ackedTs: map[UniqueID]int64{},
|
||||
})
|
||||
log.Debug("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
||||
log.Debug("In DestroyTopic")
|
||||
start := time.Now()
|
||||
beginKey := topicName + "/begin_id"
|
||||
endKey := topicName + "/end_id"
|
||||
|
||||
@ -312,7 +314,7 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
||||
rmq.retentionInfo.ackedInfo.Delete(topicName)
|
||||
rmq.retentionInfo.lastRetentionTime.Delete(topicName)
|
||||
rmq.retentionInfo.pageInfo.Delete(topicName)
|
||||
|
||||
log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -331,6 +333,7 @@ func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Cons
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
|
||||
start := time.Now()
|
||||
key := constructCurrentID(topicName, groupName)
|
||||
if rmq.checkKeyExist(key) {
|
||||
log.Debug("RocksMQ: " + key + " existed.")
|
||||
@ -340,11 +343,14 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug("Rocksmq create consumer group successfully ", zap.String("topic", topicName),
|
||||
zap.String("group", groupName),
|
||||
zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) {
|
||||
start := time.Now()
|
||||
if vals, ok := rmq.consumers.Load(consumer.Topic); ok {
|
||||
for _, v := range vals.([]*Consumer) {
|
||||
if v.GroupName == consumer.GroupName {
|
||||
@ -359,9 +365,11 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) {
|
||||
consumers[0] = consumer
|
||||
rmq.consumers.Store(consumer.Topic, consumers)
|
||||
}
|
||||
log.Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
||||
start := time.Now()
|
||||
ll, ok := topicMu.Load(topicName)
|
||||
if !ok {
|
||||
return fmt.Errorf("topic name = %s not exist", topicName)
|
||||
@ -389,7 +397,9 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("Rocksmq destroy consumer group successfully ", zap.String("topic", topicName),
|
||||
zap.String("group", groupName),
|
||||
zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -479,14 +489,14 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
|
||||
|
||||
// Update message page info
|
||||
// TODO(yukun): Should this be in a go routine
|
||||
err = rmq.UpdatePageInfo(topicName, msgIDs, msgSizes)
|
||||
err = rmq.updatePageInfo(topicName, msgIDs, msgSizes)
|
||||
if err != nil {
|
||||
return []UniqueID{}, err
|
||||
}
|
||||
return msgIDs, nil
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) UpdatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error {
|
||||
func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error {
|
||||
msgSizeKey := MessageSizeTitle + topicName
|
||||
msgSizeVal, err := rmq.kv.Load(msgSizeKey)
|
||||
if err != nil {
|
||||
@ -622,7 +632,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
||||
}
|
||||
|
||||
msgSize := len(consumerMessage[len(consumerMessage)-1].Payload)
|
||||
go rmq.UpdateAckedInfo(topicName, groupName, newID, int64(msgSize))
|
||||
go rmq.updateAckedInfo(topicName, groupName, newID, int64(msgSize))
|
||||
|
||||
return consumerMessage, nil
|
||||
}
|
||||
@ -633,13 +643,13 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
|
||||
defer rmq.storeMu.Unlock()
|
||||
key := constructCurrentID(topicName, groupName)
|
||||
if !rmq.checkKeyExist(key) {
|
||||
log.Debug("RocksMQ: channel " + key + " not exists")
|
||||
log.Warn("RocksMQ: channel " + key + " not exists")
|
||||
return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
|
||||
}
|
||||
|
||||
storeKey, err := combKey(topicName, msgID)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: combKey(" + topicName + "," + strconv.FormatInt(msgID, 10) + ") failed")
|
||||
log.Warn("RocksMQ: combKey(" + topicName + "," + strconv.FormatInt(msgID, 10) + ") failed")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -648,14 +658,14 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
|
||||
val, err := rmq.store.Get(opts, []byte(storeKey))
|
||||
defer val.Free()
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: get " + storeKey + " failed")
|
||||
log.Warn("RocksMQ: get " + storeKey + " failed")
|
||||
return err
|
||||
}
|
||||
|
||||
/* Step II: Save current_id in kv */
|
||||
err = rmq.kv.Save(key, strconv.FormatInt(msgID, 10))
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: save " + key + " failed")
|
||||
log.Warn("RocksMQ: save " + key + " failed")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -708,7 +718,7 @@ func (rmq *rocksmq) Notify(topicName, groupName string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) UpdateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error {
|
||||
func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error {
|
||||
ll, ok := topicMu.Load(topicName)
|
||||
if !ok {
|
||||
return fmt.Errorf("topic name = %s not exist", topicName)
|
||||
|
Loading…
Reference in New Issue
Block a user