Add complex unittest for rocksmq

Signed-off-by: yukun <kun.yu@zilliz.com>
This commit is contained in:
yukun 2021-01-19 10:22:27 +08:00 committed by yefu.chen
parent 76d92e73d1
commit b874a55c35
3 changed files with 128 additions and 1 deletions

View File

@ -348,6 +348,10 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons
return nil, err
}
if len(consumerMessage) == 0 {
return consumerMessage, nil
}
newID := consumerMessage[len(consumerMessage)-1].msgID
err = rmq.Seek(groupName, channelName, newID)
if err != nil {

View File

@ -1,6 +1,8 @@
package rocksmq
import (
"os"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
@ -23,10 +25,13 @@ func TestRocksMQ(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq"
_ = os.RemoveAll(name)
defer os.RemoveAll(name)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
@ -64,3 +69,121 @@ func TestRocksMQ(t *testing.T) {
assert.Equal(t, string(cMsgs[0].payload), "b_message")
assert.Equal(t, string(cMsgs[1].payload), "c_message")
}
func TestRocksMQ_Loop(t *testing.T) {
master.Init()
etcdAddr := master.Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_1"
_ = os.RemoveAll(name)
defer os.RemoveAll(name)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
loopNum := 100
channelName := "channel_test"
// Produce one message once
for i := 0; i < loopNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{payload: []byte(msg)}
pMsgs := make([]ProducerMessage, 1)
pMsgs[0] = pMsg
err := rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
}
// Produce loopNum messages once
pMsgs := make([]ProducerMessage, loopNum)
for i := 0; i < loopNum; i++ {
msg := "message_" + strconv.Itoa(i+loopNum)
pMsg := ProducerMessage{payload: []byte(msg)}
pMsgs[i] = pMsg
}
err = rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
// Consume loopNum message once
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(groupName, channelName)
err = rmq.CreateConsumerGroup(groupName, channelName)
assert.Nil(t, err)
cMsgs, err := rmq.Consume(groupName, channelName, loopNum)
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), loopNum)
assert.Equal(t, string(cMsgs[0].payload), "message_"+strconv.Itoa(0))
assert.Equal(t, string(cMsgs[loopNum-1].payload), "message_"+strconv.Itoa(loopNum-1))
// Consume one message once
for i := 0; i < loopNum; i++ {
oneMsgs, err := rmq.Consume(groupName, channelName, 1)
assert.Nil(t, err)
assert.Equal(t, len(oneMsgs), 1)
assert.Equal(t, string(oneMsgs[0].payload), "message_"+strconv.Itoa(i+loopNum))
}
cMsgs, err = rmq.Consume(groupName, channelName, 1)
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 0)
}
//func TestRocksMQ_Goroutines(t *testing.T) {
// master.Init()
//
// etcdAddr := master.Params.EtcdAddress
// cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
// assert.Nil(t, err)
// etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
// defer etcdKV.Close()
// idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV)
// _ = idAllocator.Initialize()
//
// name := "/tmp/rocksmq"
// defer os.RemoveAll(name)
// rmq, err := NewRocksMQ(name, idAllocator)
// assert.Nil(t, err)
//
// loopNum := 100
// channelName := "channel_test"
// // Produce two message in each goroutine
// var wg sync.WaitGroup
// wg.Add(1)
// for i := 0; i < loopNum/2; i++ {
// go func() {
// wg.Add(2)
// msg_0 := "message_" + strconv.Itoa(i)
// msg_1 := "message_" + strconv.Itoa(i+1)
// pMsg_0 := ProducerMessage{payload: []byte(msg_0)}
// pMsg_1 := ProducerMessage{payload: []byte(msg_1)}
// pMsgs := make([]ProducerMessage, 2)
// pMsgs[0] = pMsg_0
// pMsgs[1] = pMsg_1
//
// err := rmq.Produce(channelName, pMsgs)
// assert.Nil(t, err)
// }()
// }
//
// groupName := "test_group"
// _ = rmq.DestroyConsumerGroup(groupName, channelName)
// err = rmq.CreateConsumerGroup(groupName, channelName)
// assert.Nil(t, err)
// // Consume one message in each goroutine
// for i := 0; i < loopNum; i++ {
// go func() {
// wg.Done()
// cMsgs, err := rmq.Consume(groupName, channelName, 1)
// fmt.Println(string(cMsgs[0].payload))
// assert.Nil(t, err)
// assert.Equal(t, len(cMsgs), 1)
// }()
// }
// wg.Done()
// wg.Wait()
//}

View File

@ -474,4 +474,4 @@ func badlock(m fluent.Matcher) {
m.Match(`$mu.Lock(); defer $mu.RUnlock()`).Report(`maybe $mu.RLock() was intended?`)
m.Match(`$mu.RLock(); defer $mu.Unlock()`).Report(`maybe $mu.Lock() was intended?`)
}
}