Fix rmq client unit test may panic (#22759)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-03-14 19:15:54 +08:00 committed by GitHub
parent 8799b06dbc
commit 59989e2806
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -18,6 +18,8 @@ package rmq
import (
"context"
"fmt"
"math/rand"
"os"
"testing"
"time"
@ -32,9 +34,11 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMain(m *testing.M) {
rand.Seed(time.Now().UnixNano())
path := "/tmp/milvus/rdb_data"
defer os.RemoveAll(path)
paramtable.Init()
@ -88,7 +92,7 @@ func TestRmqClient_GetLatestMsg(t *testing.T) {
assert.Nil(t, err)
defer client.Close()
topic := "t2GetLatestMsg"
topic := fmt.Sprintf("t2GetLatestMsg-%d", rand.Int())
proOpts := mqwrapper.ProducerOptions{Topic: topic}
producer, err := client.CreateProducer(proOpts)
assert.Nil(t, err)
@ -113,25 +117,27 @@ func TestRmqClient_GetLatestMsg(t *testing.T) {
consumer, err := client.Subscribe(consumerOpts)
assert.Nil(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()
expectLastMsg, err := consumer.GetLatestMsgID()
assert.Nil(t, err)
var actualLastMsg mqwrapper.Message
for {
var actualLastMsg mqwrapper.Message
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
ret, err := expectLastMsg.LessOrEqualThan(actualLastMsg.ID().Serialize())
assert.Nil(t, err)
assert.True(t, ret)
return
fmt.Println(i)
assert.FailNow(t, "consumer failed to yield message in 100 milliseconds")
case msg := <-consumer.Chan():
consumer.Ack(msg)
actualLastMsg = msg
}
}
require.NotNil(t, actualLastMsg)
ret, err := expectLastMsg.LessOrEqualThan(actualLastMsg.ID().Serialize())
assert.Nil(t, err)
assert.True(t, ret)
}
func TestRmqClient_Subscribe(t *testing.T) {
@ -173,25 +179,23 @@ func TestRmqClient_Subscribe(t *testing.T) {
_, err = producer.Send(context.TODO(), msg)
assert.Nil(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
for {
select {
case <-ctx.Done():
return
case msg := <-consumer.Chan():
consumer.Ack(msg)
rmqmsg := msg.(*rmqMessage)
msgPayload := rmqmsg.Payload()
assert.NotEmpty(t, msgPayload)
msgTopic := rmqmsg.Topic()
assert.Equal(t, msgTopic, topic)
msgProp := rmqmsg.Properties()
assert.Empty(t, msgProp)
msgID := rmqmsg.ID()
rID := msgID.(*rmqID)
assert.NotZero(t, rID)
}
select {
case <-ctx.Done():
assert.FailNow(t, "consumer failed to yield message in 100 milliseconds")
case msg := <-consumer.Chan():
consumer.Ack(msg)
rmqmsg := msg.(*rmqMessage)
msgPayload := rmqmsg.Payload()
assert.NotEmpty(t, msgPayload)
msgTopic := rmqmsg.Topic()
assert.Equal(t, msgTopic, topic)
msgProp := rmqmsg.Properties()
assert.Empty(t, msgProp)
msgID := rmqmsg.ID()
rID := msgID.(*rmqID)
assert.NotZero(t, rID)
}
}