print some group info in rocksmq monitor (#19863)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2022-10-20 16:05:27 +08:00 committed by GitHub
parent 2950353e14
commit 7fe8e50689
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 1 deletions

View File

@ -222,7 +222,8 @@ func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator.
// TODO add this to monitor metrics
go func() {
for {
time.Sleep(5 * time.Minute)
time.Sleep(10 * time.Minute)
log.Info("Rocksmq stats",
zap.String("cache", kv.DB.GetProperty("rocksdb.block-cache-usage")),
zap.String("rockskv memtable ", kv.DB.GetProperty("rocksdb.size-all-mem-tables")),
@ -237,6 +238,7 @@ func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator.
zap.String("store l3 file num", db.GetProperty("rocksdb.num-files-at-level3")),
zap.String("store l4 file num", db.GetProperty("rocksdb.num-files-at-level4")),
)
rmq.Info()
}
}()
@ -272,6 +274,30 @@ func (rmq *rocksmq) Close() {
log.Info("Successfully close rocksmq")
}
//print rmq consumer Info
func (rmq *rocksmq) Info() {
rmq.consumers.Range(func(key, vals interface{}) bool {
topic, _ := key.(string)
consumers, _ := vals.([]*Consumer)
consumersPosition := make([]UniqueID, len(consumers))
consumersName := make([]string, len(consumers))
for id, consumer := range consumers {
groupKey := constructCurrentID(consumer.Topic, consumer.GroupName)
groupPosition, ok := rmq.consumersID.Load(groupKey)
if !ok {
log.Error("some group not regist", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName))
continue
}
consumersPosition[id] = groupPosition.(UniqueID)
consumersName[id] = consumer.GroupName
}
log.Info("Rocksmq Info", zap.String("topic", topic), zap.Int("consumer num", len(consumers)), zap.Any("group names", consumersName), zap.Any("group positions", consumersPosition))
return true
})
}
func (rmq *rocksmq) stopRetention() {
if rmq.retentionInfo != nil {
rmq.retentionInfo.Stop()
@ -603,6 +629,7 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes
// Current page is full
newPageSize := curMsgSize + msgSize
pageEndID := id
log.Info("new page", zap.String("topic", topicName), zap.Int64("pageId", pageEndID))
// Update page message size for current page. key is page end ID
pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10)

View File

@ -91,6 +91,7 @@ func TestRmqRetention_Basic(t *testing.T) {
}
assert.Equal(t, len(cMsgs), msgNum)
rmq.Info()
time.Sleep(time.Duration(checkTimeInterval+1) * time.Second)
// Seek to a previous consumed message, the message should be clean up