Fix rocksmq retention no trigger at datacoord timetick channel (#24170)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2023-05-19 16:23:24 +08:00 committed by GitHub
parent 3a66e1de65
commit 9729b6079d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 632 additions and 11 deletions

View File

@ -74,6 +74,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
if reflect.ValueOf(c.server).IsNil() {
return nil, newError(0, "Rmq server is nil")
}
exist, con, err := c.server.ExistConsumerGroup(options.Topic, options.SubscriptionName)
if err != nil {
return nil, err
@ -103,12 +104,6 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
return nil, err
}
if options.SubscriptionInitialPosition == mqwrapper.SubscriptionPositionLatest {
err = c.server.SeekToLatest(options.Topic, options.SubscriptionName)
if err != nil {
return nil, err
}
}
// Register self in rocksmq server
cons := &server.Consumer{
Topic: consumer.topic,
@ -117,6 +112,13 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
}
c.server.RegisterConsumer(cons)
if options.SubscriptionInitialPosition == mqwrapper.SubscriptionPositionLatest {
err = c.server.SeekToLatest(options.Topic, options.SubscriptionName)
if err != nil {
return nil, err
}
}
// Take messages from RocksDB and put it into consumer.Chan(),
// trigger by consumer.MsgMutex which trigger by producer
c.consumerOptions = append(c.consumerOptions, options)

View File

@ -12,12 +12,15 @@
package client
import (
"fmt"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -135,6 +138,29 @@ func TestClient_Subscribe(t *testing.T) {
assert.NoError(t, err)
}
func TestClient_SubscribeError(t *testing.T) {
mockMQ := server.NewMockRocksMQ(t)
client, err := NewClient(Options{
Server: mockMQ,
})
testTopic := newTopicName()
testGroupName := newConsumerName()
assert.NoError(t, err)
mockMQ.EXPECT().ExistConsumerGroup(testTopic, testGroupName).Return(false, nil, nil)
mockMQ.EXPECT().CreateConsumerGroup(testTopic, testGroupName).Return(nil)
mockMQ.EXPECT().RegisterConsumer(mock.Anything).Return(nil)
mockMQ.EXPECT().SeekToLatest(testTopic, testGroupName).Return(fmt.Errorf("test error"))
consumer, err := client.Subscribe(ConsumerOptions{
Topic: testTopic,
SubscriptionName: testGroupName,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionLatest,
})
assert.Error(t, err)
assert.Nil(t, consumer)
}
func TestClient_SeekLatest(t *testing.T) {
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/seekLatest"

View File

@ -0,0 +1,583 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
package server
import mock "github.com/stretchr/testify/mock"
// MockRocksMQ is an autogenerated mock type for the RocksMQ type
type MockRocksMQ struct {
mock.Mock
}
type MockRocksMQ_Expecter struct {
mock *mock.Mock
}
func (_m *MockRocksMQ) EXPECT() *MockRocksMQ_Expecter {
return &MockRocksMQ_Expecter{mock: &_m.Mock}
}
// CheckTopicValid provides a mock function with given fields: topicName
func (_m *MockRocksMQ) CheckTopicValid(topicName string) error {
ret := _m.Called(topicName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(topicName)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRocksMQ_CheckTopicValid_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckTopicValid'
type MockRocksMQ_CheckTopicValid_Call struct {
*mock.Call
}
// CheckTopicValid is a helper method to define mock.On call
// - topicName string
func (_e *MockRocksMQ_Expecter) CheckTopicValid(topicName interface{}) *MockRocksMQ_CheckTopicValid_Call {
return &MockRocksMQ_CheckTopicValid_Call{Call: _e.mock.On("CheckTopicValid", topicName)}
}
func (_c *MockRocksMQ_CheckTopicValid_Call) Run(run func(topicName string)) *MockRocksMQ_CheckTopicValid_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockRocksMQ_CheckTopicValid_Call) Return(_a0 error) *MockRocksMQ_CheckTopicValid_Call {
_c.Call.Return(_a0)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockRocksMQ) Close() {
_m.Called()
}
// MockRocksMQ_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockRocksMQ_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockRocksMQ_Expecter) Close() *MockRocksMQ_Close_Call {
return &MockRocksMQ_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockRocksMQ_Close_Call) Run(run func()) *MockRocksMQ_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockRocksMQ_Close_Call) Return() *MockRocksMQ_Close_Call {
_c.Call.Return()
return _c
}
// Consume provides a mock function with given fields: topicName, groupName, n
func (_m *MockRocksMQ) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) {
ret := _m.Called(topicName, groupName, n)
var r0 []ConsumerMessage
if rf, ok := ret.Get(0).(func(string, string, int) []ConsumerMessage); ok {
r0 = rf(topicName, groupName, n)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]ConsumerMessage)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, string, int) error); ok {
r1 = rf(topicName, groupName, n)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockRocksMQ_Consume_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Consume'
type MockRocksMQ_Consume_Call struct {
*mock.Call
}
// Consume is a helper method to define mock.On call
// - topicName string
// - groupName string
// - n int
func (_e *MockRocksMQ_Expecter) Consume(topicName interface{}, groupName interface{}, n interface{}) *MockRocksMQ_Consume_Call {
return &MockRocksMQ_Consume_Call{Call: _e.mock.On("Consume", topicName, groupName, n)}
}
func (_c *MockRocksMQ_Consume_Call) Run(run func(topicName string, groupName string, n int)) *MockRocksMQ_Consume_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string), args[2].(int))
})
return _c
}
func (_c *MockRocksMQ_Consume_Call) Return(_a0 []ConsumerMessage, _a1 error) *MockRocksMQ_Consume_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// CreateConsumerGroup provides a mock function with given fields: topicName, groupName
func (_m *MockRocksMQ) CreateConsumerGroup(topicName string, groupName string) error {
ret := _m.Called(topicName, groupName)
var r0 error
if rf, ok := ret.Get(0).(func(string, string) error); ok {
r0 = rf(topicName, groupName)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRocksMQ_CreateConsumerGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateConsumerGroup'
type MockRocksMQ_CreateConsumerGroup_Call struct {
*mock.Call
}
// CreateConsumerGroup is a helper method to define mock.On call
// - topicName string
// - groupName string
func (_e *MockRocksMQ_Expecter) CreateConsumerGroup(topicName interface{}, groupName interface{}) *MockRocksMQ_CreateConsumerGroup_Call {
return &MockRocksMQ_CreateConsumerGroup_Call{Call: _e.mock.On("CreateConsumerGroup", topicName, groupName)}
}
func (_c *MockRocksMQ_CreateConsumerGroup_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_CreateConsumerGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
})
return _c
}
func (_c *MockRocksMQ_CreateConsumerGroup_Call) Return(_a0 error) *MockRocksMQ_CreateConsumerGroup_Call {
_c.Call.Return(_a0)
return _c
}
// CreateTopic provides a mock function with given fields: topicName
func (_m *MockRocksMQ) CreateTopic(topicName string) error {
ret := _m.Called(topicName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(topicName)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRocksMQ_CreateTopic_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateTopic'
type MockRocksMQ_CreateTopic_Call struct {
*mock.Call
}
// CreateTopic is a helper method to define mock.On call
// - topicName string
func (_e *MockRocksMQ_Expecter) CreateTopic(topicName interface{}) *MockRocksMQ_CreateTopic_Call {
return &MockRocksMQ_CreateTopic_Call{Call: _e.mock.On("CreateTopic", topicName)}
}
func (_c *MockRocksMQ_CreateTopic_Call) Run(run func(topicName string)) *MockRocksMQ_CreateTopic_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockRocksMQ_CreateTopic_Call) Return(_a0 error) *MockRocksMQ_CreateTopic_Call {
_c.Call.Return(_a0)
return _c
}
// DestroyConsumerGroup provides a mock function with given fields: topicName, groupName
func (_m *MockRocksMQ) DestroyConsumerGroup(topicName string, groupName string) error {
ret := _m.Called(topicName, groupName)
var r0 error
if rf, ok := ret.Get(0).(func(string, string) error); ok {
r0 = rf(topicName, groupName)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRocksMQ_DestroyConsumerGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DestroyConsumerGroup'
type MockRocksMQ_DestroyConsumerGroup_Call struct {
*mock.Call
}
// DestroyConsumerGroup is a helper method to define mock.On call
// - topicName string
// - groupName string
func (_e *MockRocksMQ_Expecter) DestroyConsumerGroup(topicName interface{}, groupName interface{}) *MockRocksMQ_DestroyConsumerGroup_Call {
return &MockRocksMQ_DestroyConsumerGroup_Call{Call: _e.mock.On("DestroyConsumerGroup", topicName, groupName)}
}
func (_c *MockRocksMQ_DestroyConsumerGroup_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_DestroyConsumerGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
})
return _c
}
func (_c *MockRocksMQ_DestroyConsumerGroup_Call) Return(_a0 error) *MockRocksMQ_DestroyConsumerGroup_Call {
_c.Call.Return(_a0)
return _c
}
// DestroyTopic provides a mock function with given fields: topicName
func (_m *MockRocksMQ) DestroyTopic(topicName string) error {
ret := _m.Called(topicName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(topicName)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRocksMQ_DestroyTopic_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DestroyTopic'
type MockRocksMQ_DestroyTopic_Call struct {
*mock.Call
}
// DestroyTopic is a helper method to define mock.On call
// - topicName string
func (_e *MockRocksMQ_Expecter) DestroyTopic(topicName interface{}) *MockRocksMQ_DestroyTopic_Call {
return &MockRocksMQ_DestroyTopic_Call{Call: _e.mock.On("DestroyTopic", topicName)}
}
func (_c *MockRocksMQ_DestroyTopic_Call) Run(run func(topicName string)) *MockRocksMQ_DestroyTopic_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockRocksMQ_DestroyTopic_Call) Return(_a0 error) *MockRocksMQ_DestroyTopic_Call {
_c.Call.Return(_a0)
return _c
}
// ExistConsumerGroup provides a mock function with given fields: topicName, groupName
func (_m *MockRocksMQ) ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer, error) {
ret := _m.Called(topicName, groupName)
var r0 bool
if rf, ok := ret.Get(0).(func(string, string) bool); ok {
r0 = rf(topicName, groupName)
} else {
r0 = ret.Get(0).(bool)
}
var r1 *Consumer
if rf, ok := ret.Get(1).(func(string, string) *Consumer); ok {
r1 = rf(topicName, groupName)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*Consumer)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(string, string) error); ok {
r2 = rf(topicName, groupName)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// MockRocksMQ_ExistConsumerGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExistConsumerGroup'
type MockRocksMQ_ExistConsumerGroup_Call struct {
*mock.Call
}
// ExistConsumerGroup is a helper method to define mock.On call
// - topicName string
// - groupName string
func (_e *MockRocksMQ_Expecter) ExistConsumerGroup(topicName interface{}, groupName interface{}) *MockRocksMQ_ExistConsumerGroup_Call {
return &MockRocksMQ_ExistConsumerGroup_Call{Call: _e.mock.On("ExistConsumerGroup", topicName, groupName)}
}
func (_c *MockRocksMQ_ExistConsumerGroup_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_ExistConsumerGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
})
return _c
}
func (_c *MockRocksMQ_ExistConsumerGroup_Call) Return(_a0 bool, _a1 *Consumer, _a2 error) *MockRocksMQ_ExistConsumerGroup_Call {
_c.Call.Return(_a0, _a1, _a2)
return _c
}
// GetLatestMsg provides a mock function with given fields: topicName
func (_m *MockRocksMQ) GetLatestMsg(topicName string) (int64, error) {
ret := _m.Called(topicName)
var r0 int64
if rf, ok := ret.Get(0).(func(string) int64); ok {
r0 = rf(topicName)
} else {
r0 = ret.Get(0).(int64)
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(topicName)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockRocksMQ_GetLatestMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMsg'
type MockRocksMQ_GetLatestMsg_Call struct {
*mock.Call
}
// GetLatestMsg is a helper method to define mock.On call
// - topicName string
func (_e *MockRocksMQ_Expecter) GetLatestMsg(topicName interface{}) *MockRocksMQ_GetLatestMsg_Call {
return &MockRocksMQ_GetLatestMsg_Call{Call: _e.mock.On("GetLatestMsg", topicName)}
}
func (_c *MockRocksMQ_GetLatestMsg_Call) Run(run func(topicName string)) *MockRocksMQ_GetLatestMsg_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockRocksMQ_GetLatestMsg_Call) Return(_a0 int64, _a1 error) *MockRocksMQ_GetLatestMsg_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// Notify provides a mock function with given fields: topicName, groupName
func (_m *MockRocksMQ) Notify(topicName string, groupName string) {
_m.Called(topicName, groupName)
}
// MockRocksMQ_Notify_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Notify'
type MockRocksMQ_Notify_Call struct {
*mock.Call
}
// Notify is a helper method to define mock.On call
// - topicName string
// - groupName string
func (_e *MockRocksMQ_Expecter) Notify(topicName interface{}, groupName interface{}) *MockRocksMQ_Notify_Call {
return &MockRocksMQ_Notify_Call{Call: _e.mock.On("Notify", topicName, groupName)}
}
func (_c *MockRocksMQ_Notify_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_Notify_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
})
return _c
}
func (_c *MockRocksMQ_Notify_Call) Return() *MockRocksMQ_Notify_Call {
_c.Call.Return()
return _c
}
// Produce provides a mock function with given fields: topicName, messages
func (_m *MockRocksMQ) Produce(topicName string, messages []ProducerMessage) ([]int64, error) {
ret := _m.Called(topicName, messages)
var r0 []int64
if rf, ok := ret.Get(0).(func(string, []ProducerMessage) []int64); ok {
r0 = rf(topicName, messages)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, []ProducerMessage) error); ok {
r1 = rf(topicName, messages)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockRocksMQ_Produce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Produce'
type MockRocksMQ_Produce_Call struct {
*mock.Call
}
// Produce is a helper method to define mock.On call
// - topicName string
// - messages []ProducerMessage
func (_e *MockRocksMQ_Expecter) Produce(topicName interface{}, messages interface{}) *MockRocksMQ_Produce_Call {
return &MockRocksMQ_Produce_Call{Call: _e.mock.On("Produce", topicName, messages)}
}
func (_c *MockRocksMQ_Produce_Call) Run(run func(topicName string, messages []ProducerMessage)) *MockRocksMQ_Produce_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].([]ProducerMessage))
})
return _c
}
func (_c *MockRocksMQ_Produce_Call) Return(_a0 []int64, _a1 error) *MockRocksMQ_Produce_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// RegisterConsumer provides a mock function with given fields: consumer
func (_m *MockRocksMQ) RegisterConsumer(consumer *Consumer) error {
ret := _m.Called(consumer)
var r0 error
if rf, ok := ret.Get(0).(func(*Consumer) error); ok {
r0 = rf(consumer)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRocksMQ_RegisterConsumer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterConsumer'
type MockRocksMQ_RegisterConsumer_Call struct {
*mock.Call
}
// RegisterConsumer is a helper method to define mock.On call
// - consumer *Consumer
func (_e *MockRocksMQ_Expecter) RegisterConsumer(consumer interface{}) *MockRocksMQ_RegisterConsumer_Call {
return &MockRocksMQ_RegisterConsumer_Call{Call: _e.mock.On("RegisterConsumer", consumer)}
}
func (_c *MockRocksMQ_RegisterConsumer_Call) Run(run func(consumer *Consumer)) *MockRocksMQ_RegisterConsumer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*Consumer))
})
return _c
}
func (_c *MockRocksMQ_RegisterConsumer_Call) Return(_a0 error) *MockRocksMQ_RegisterConsumer_Call {
_c.Call.Return(_a0)
return _c
}
// Seek provides a mock function with given fields: topicName, groupName, msgID
func (_m *MockRocksMQ) Seek(topicName string, groupName string, msgID int64) error {
ret := _m.Called(topicName, groupName, msgID)
var r0 error
if rf, ok := ret.Get(0).(func(string, string, int64) error); ok {
r0 = rf(topicName, groupName, msgID)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRocksMQ_Seek_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Seek'
type MockRocksMQ_Seek_Call struct {
*mock.Call
}
// Seek is a helper method to define mock.On call
// - topicName string
// - groupName string
// - msgID int64
func (_e *MockRocksMQ_Expecter) Seek(topicName interface{}, groupName interface{}, msgID interface{}) *MockRocksMQ_Seek_Call {
return &MockRocksMQ_Seek_Call{Call: _e.mock.On("Seek", topicName, groupName, msgID)}
}
func (_c *MockRocksMQ_Seek_Call) Run(run func(topicName string, groupName string, msgID int64)) *MockRocksMQ_Seek_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string), args[2].(int64))
})
return _c
}
func (_c *MockRocksMQ_Seek_Call) Return(_a0 error) *MockRocksMQ_Seek_Call {
_c.Call.Return(_a0)
return _c
}
// SeekToLatest provides a mock function with given fields: topicName, groupName
func (_m *MockRocksMQ) SeekToLatest(topicName string, groupName string) error {
ret := _m.Called(topicName, groupName)
var r0 error
if rf, ok := ret.Get(0).(func(string, string) error); ok {
r0 = rf(topicName, groupName)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRocksMQ_SeekToLatest_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SeekToLatest'
type MockRocksMQ_SeekToLatest_Call struct {
*mock.Call
}
// SeekToLatest is a helper method to define mock.On call
// - topicName string
// - groupName string
func (_e *MockRocksMQ_Expecter) SeekToLatest(topicName interface{}, groupName interface{}) *MockRocksMQ_SeekToLatest_Call {
return &MockRocksMQ_SeekToLatest_Call{Call: _e.mock.On("SeekToLatest", topicName, groupName)}
}
func (_c *MockRocksMQ_SeekToLatest_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_SeekToLatest_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
})
return _c
}
func (_c *MockRocksMQ_SeekToLatest_Call) Return(_a0 error) *MockRocksMQ_SeekToLatest_Call {
_c.Call.Return(_a0)
return _c
}
type mockConstructorTestingTNewMockRocksMQ interface {
mock.TestingT
Cleanup(func())
}
// NewMockRocksMQ creates a new instance of MockRocksMQ. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockRocksMQ(t mockConstructorTestingTNewMockRocksMQ) *MockRocksMQ {
mock := &MockRocksMQ{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -1040,7 +1040,8 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI
if vals, ok := rmq.consumers.Load(topicName); ok {
consumers, ok := vals.([]*Consumer)
if !ok || len(consumers) == 0 {
return nil
log.Error("update ack with no consumer", zap.String("topic", topicName))
panic("update ack with no consumer")
}
// find min id of all consumer

View File

@ -1214,15 +1214,22 @@ func TestRocksmq_updateAckedInfoErr(t *testing.T) {
consumer := &Consumer{
Topic: topicName,
GroupName: groupName + strconv.Itoa(i),
MsgMutex: make(chan struct{}),
}
//make sure consumer not in rmq.consumersID
_ = rmq.DestroyConsumerGroup(topicName, groupName)
rmq.DestroyConsumerGroup(topicName, groupName+strconv.Itoa(i))
//add consumer to rmq.consumers
rmq.RegisterConsumer(consumer)
}
// update acked for all page in rmq but some consumer not in rmq.consumers
assert.Error(t, rmq.updateAckedInfo(topicName, groupName, 0, ids[len(ids)-1]))
for i := 0; i < 2; i++ {
rmq.DestroyConsumerGroup(topicName, groupName+strconv.Itoa(i))
}
// update acked for topic without any consumer
assert.Panics(t, func() { rmq.updateAckedInfo(topicName, groupName, 0, ids[len(ids)-1]) })
}
func TestRocksmq_Info(t *testing.T) {

View File

@ -137,6 +137,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
start := time.Now()
var deletedAckedSize int64
var pageCleaned UniqueID
var lastAck int64
var pageEndID UniqueID
var err error
@ -181,6 +182,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
if err != nil {
return err
}
lastAck = ackedTs
if msgTimeExpiredCheck(ackedTs) {
pageEndID = pageID
pValue := pageIter.Value()
@ -201,9 +203,9 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
return err
}
log.Debug("Expired check by retention time", zap.Any("topic", topic),
zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize),
zap.Any("pageCleaned", pageCleaned), zap.Any("time taken", time.Since(start).Milliseconds()))
log.Info("Expired check by retention time", zap.String("topic", topic),
zap.Int64("pageEndID", pageEndID), zap.Int64("deletedAckedSize", deletedAckedSize), zap.Int64("lastAck", lastAck),
zap.Int64("pageCleaned", pageCleaned), zap.Int64("time taken", time.Since(start).Milliseconds()))
for ; pageIter.Valid(); pageIter.Next() {
pValue := pageIter.Value()