From 59989e2806d972eec8e2663535cd468563400fd2 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 14 Mar 2023 19:15:54 +0800 Subject: [PATCH] Fix rmq client unit test may panic (#22759) Signed-off-by: Congqi Xia --- .../mqwrapper/rmq/rmq_client_test.go | 58 ++++++++++--------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go b/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go index 0270c8cdb2..5018984751 100644 --- a/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go +++ b/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go @@ -18,6 +18,8 @@ package rmq import ( "context" + "fmt" + "math/rand" "os" "testing" "time" @@ -32,9 +34,11 @@ import ( "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestMain(m *testing.M) { + rand.Seed(time.Now().UnixNano()) path := "/tmp/milvus/rdb_data" defer os.RemoveAll(path) paramtable.Init() @@ -88,7 +92,7 @@ func TestRmqClient_GetLatestMsg(t *testing.T) { assert.Nil(t, err) defer client.Close() - topic := "t2GetLatestMsg" + topic := fmt.Sprintf("t2GetLatestMsg-%d", rand.Int()) proOpts := mqwrapper.ProducerOptions{Topic: topic} producer, err := client.CreateProducer(proOpts) assert.Nil(t, err) @@ -113,25 +117,27 @@ func TestRmqClient_GetLatestMsg(t *testing.T) { consumer, err := client.Subscribe(consumerOpts) assert.Nil(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) - defer cancel() expectLastMsg, err := consumer.GetLatestMsgID() assert.Nil(t, err) - var actualLastMsg mqwrapper.Message - for { + var actualLastMsg mqwrapper.Message + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + for i := 0; i < 10; i++ { select { case <-ctx.Done(): - ret, err := expectLastMsg.LessOrEqualThan(actualLastMsg.ID().Serialize()) - assert.Nil(t, err) - assert.True(t, ret) - return + fmt.Println(i) + assert.FailNow(t, "consumer failed to yield message in 100 milliseconds") case msg := <-consumer.Chan(): consumer.Ack(msg) actualLastMsg = msg } } + require.NotNil(t, actualLastMsg) + ret, err := expectLastMsg.LessOrEqualThan(actualLastMsg.ID().Serialize()) + assert.Nil(t, err) + assert.True(t, ret) } func TestRmqClient_Subscribe(t *testing.T) { @@ -173,25 +179,23 @@ func TestRmqClient_Subscribe(t *testing.T) { _, err = producer.Send(context.TODO(), msg) assert.Nil(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - for { - select { - case <-ctx.Done(): - return - case msg := <-consumer.Chan(): - consumer.Ack(msg) - rmqmsg := msg.(*rmqMessage) - msgPayload := rmqmsg.Payload() - assert.NotEmpty(t, msgPayload) - msgTopic := rmqmsg.Topic() - assert.Equal(t, msgTopic, topic) - msgProp := rmqmsg.Properties() - assert.Empty(t, msgProp) - msgID := rmqmsg.ID() - rID := msgID.(*rmqID) - assert.NotZero(t, rID) - } + select { + case <-ctx.Done(): + assert.FailNow(t, "consumer failed to yield message in 100 milliseconds") + case msg := <-consumer.Chan(): + consumer.Ack(msg) + rmqmsg := msg.(*rmqMessage) + msgPayload := rmqmsg.Payload() + assert.NotEmpty(t, msgPayload) + msgTopic := rmqmsg.Topic() + assert.Equal(t, msgTopic, topic) + msgProp := rmqmsg.Properties() + assert.Empty(t, msgProp) + msgID := rmqmsg.ID() + rID := msgID.(*rmqID) + assert.NotZero(t, rID) } }