Make mq_msgstream_test log more infos (#11168)

See also: #11146

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2021-11-04 15:58:17 +08:00 committed by GitHub
parent 8863c01ff7
commit 91be4b10cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,6 +14,7 @@ package msgstream
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"os"
@ -24,6 +25,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/allocator"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@ -354,9 +356,7 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(outputStream, len(msgPack.Msgs))
inputStream.Close()
@ -377,9 +377,8 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) {
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(outputStream, len(msgPack.Msgs))
inputStream.Close()
outputStream.Close()
@ -400,9 +399,8 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) {
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(outputStream, len(msgPack.Msgs))
inputStream.Close()
outputStream.Close()
@ -422,9 +420,8 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(outputStream, len(msgPack.Msgs))
inputStream.Close()
outputStream.Close()
@ -444,9 +441,8 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(outputStream, len(msgPack.Msgs))
inputStream.Close()
outputStream.Close()
@ -467,9 +463,8 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
receiveMsg(outputStream, len(consumerChannels)*len(msgPack.Msgs))
inputStream.Close()
outputStream.Close()
@ -489,9 +484,8 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
inputStream := getPulsarInputStream(pulsarAddress, producerChannels, repackFunc)
outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(outputStream, len(msgPack.Msgs))
inputStream.Close()
outputStream.Close()
@ -546,9 +540,8 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
var output MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(output, len(msgPack.Msgs)*2)
(*inputStream).Close()
(*outputStream).Close()
@ -600,9 +593,8 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
var output MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(output, len(msgPack.Msgs)*1)
(*inputStream).Close()
(*outputStream).Close()
@ -634,9 +626,8 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
var output MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(output, len(msgPack.Msgs))
(*inputStream).Close()
(*outputStream).Close()
@ -662,17 +653,14 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack0)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
err = inputStream.Produce(&msgPack1)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
err = inputStream.Broadcast(&msgPack2)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
receiveMsg(outputStream, len(msgPack1.Msgs))
inputStream.Close()
outputStream.Close()
@ -821,17 +809,14 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack0)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
err = inputStream.Produce(&msgPack1)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
err = inputStream.Broadcast(&msgPack2)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
receiveMsg(outputStream, len(msgPack1.Msgs))
inputStream.Close()
outputStream.Close()
@ -1159,9 +1144,7 @@ func TestStream_RmqMsgStream_Insert(t *testing.T) {
etcdKV := initRmq(rocksdbName)
inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerGroupName)
err := inputStream.Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(outputStream, len(msgPack.Msgs))
Close(rocksdbName, inputStream, outputStream, etcdKV)
@ -1187,17 +1170,13 @@ func TestStream_RmqTtMsgStream_Insert(t *testing.T) {
inputStream, outputStream := initRmqTtStream(producerChannels, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack0)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
err = inputStream.Produce(&msgPack1)
if err != nil {
log.Fatalf("produce error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
err = inputStream.Broadcast(&msgPack2)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
receiveMsg(outputStream, len(msgPack1.Msgs))
Close(rocksdbName, inputStream, outputStream, etcdKV)