milvus/internal/mq/msgstream/mqwrapper/rmq/rocksmq_msgstream_test.go
XuanYang-cn 7f1ae35e72
Add timeout in dispatcher, AsConsumer and Seek (#26686)
See also: #25309

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
2023-09-08 09:51:17 +08:00

731 lines
23 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rmq
import (
"context"
"fmt"
"log"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/funcutil"
)
func Test_NewMqMsgStream(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
_, err := msgstream.NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
}
// TODO(wxyu): add a mock implement of mqwrapper.Client, then inject errors to improve coverage
func TestMqMsgStream_AsProducer(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
m, err := msgstream.NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
// empty channel name
m.AsProducer([]string{""})
}
// TODO(wxyu): add a mock implement of mqwrapper.Client, then inject errors to improve coverage
func TestMqMsgStream_AsConsumer(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
m, err := msgstream.NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
// repeat calling AsConsumer
m.AsConsumer(context.Background(), []string{"a"}, "b", mqwrapper.SubscriptionPositionUnknown)
m.AsConsumer(context.Background(), []string{"a"}, "b", mqwrapper.SubscriptionPositionUnknown)
}
func TestMqMsgStream_ComputeProduceChannelIndexes(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
m, err := msgstream.NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
// empty parameters
reBucketValues := m.ComputeProduceChannelIndexes([]msgstream.TsMsg{})
assert.Nil(t, reBucketValues)
// not called AsProducer yet
insertMsg := &msgstream.InsertMsg{
BaseMsg: generateBaseMsg(),
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: 1,
Timestamp: 2,
SourceID: 3,
},
DbName: "test_db",
CollectionName: "test_collection",
PartitionName: "test_partition",
DbID: 4,
CollectionID: 5,
PartitionID: 6,
SegmentID: 7,
ShardName: "test-channel",
Timestamps: []uint64{2, 1, 3},
RowData: []*commonpb.Blob{},
},
}
reBucketValues = m.ComputeProduceChannelIndexes([]msgstream.TsMsg{insertMsg})
assert.Nil(t, reBucketValues)
}
func TestMqMsgStream_GetProduceChannels(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
m, err := msgstream.NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
// empty if not called AsProducer yet
chs := m.GetProduceChannels()
assert.Equal(t, 0, len(chs))
// not empty after AsProducer
m.AsProducer([]string{"a"})
chs = m.GetProduceChannels()
assert.Equal(t, 1, len(chs))
}
func TestMqMsgStream_Produce(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
m, err := msgstream.NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
// Produce before called AsProducer
insertMsg := &msgstream.InsertMsg{
BaseMsg: generateBaseMsg(),
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: 1,
Timestamp: 2,
SourceID: 3,
},
DbName: "test_db",
CollectionName: "test_collection",
PartitionName: "test_partition",
DbID: 4,
CollectionID: 5,
PartitionID: 6,
SegmentID: 7,
ShardName: "test-channel",
Timestamps: []uint64{2, 1, 3},
RowData: []*commonpb.Blob{},
},
}
msgPack := &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{insertMsg},
}
err = m.Produce(msgPack)
assert.Error(t, err)
}
func TestMqMsgStream_Broadcast(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
m, err := msgstream.NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
// Broadcast nil pointer
_, err = m.Broadcast(nil)
assert.Error(t, err)
}
func TestMqMsgStream_Consume(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
// Consume return nil when ctx canceled
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
m, err := msgstream.NewMqMsgStream(ctx, 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
wg.Add(1)
go func() {
defer wg.Done()
msgPack := consumer(ctx, m)
assert.Nil(t, msgPack)
}()
cancel()
wg.Wait()
}
func consumer(ctx context.Context, mq msgstream.MsgStream) *msgstream.MsgPack {
for {
select {
case msgPack, ok := <-mq.Chan():
if !ok {
panic("Should not reach here")
}
return msgPack
case <-ctx.Done():
return nil
}
}
}
func TestMqMsgStream_Chan(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
m, err := msgstream.NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
ch := m.Chan()
assert.NotNil(t, ch)
}
func TestMqMsgStream_SeekNotSubscribed(t *testing.T) {
client, _ := createRmqClient()
defer client.Close()
factory := &msgstream.ProtoUDFactory{}
m, err := msgstream.NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
// seek in not subscribed channel
p := []*msgpb.MsgPosition{
{
ChannelName: "b",
},
}
err = m.Seek(context.Background(), p)
assert.Error(t, err)
}
func generateBaseMsg() msgstream.BaseMsg {
ctx := context.Background()
return msgstream.BaseMsg{
Ctx: ctx,
BeginTimestamp: msgstream.Timestamp(0),
EndTimestamp: msgstream.Timestamp(1),
HashValues: []uint32{2},
MsgPosition: nil,
}
}
/****************************************Rmq test******************************************/
func initRmqStream(ctx context.Context,
producerChannels []string,
consumerChannels []string,
consumerGroupName string,
opts ...msgstream.RepackFunc,
) (msgstream.MsgStream, msgstream.MsgStream) {
factory := msgstream.ProtoUDFactory{}
rmqClient, _ := NewClientWithDefaultOptions(ctx)
inputStream, _ := msgstream.NewMqMsgStream(ctx, 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
var input msgstream.MsgStream = inputStream
rmqClient2, _ := NewClientWithDefaultOptions(ctx)
outputStream, _ := msgstream.NewMqMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(ctx, consumerChannels, consumerGroupName, mqwrapper.SubscriptionPositionEarliest)
var output msgstream.MsgStream = outputStream
return input, output
}
func initRmqTtStream(ctx context.Context,
producerChannels []string,
consumerChannels []string,
consumerGroupName string,
opts ...msgstream.RepackFunc,
) (msgstream.MsgStream, msgstream.MsgStream) {
factory := msgstream.ProtoUDFactory{}
rmqClient, _ := NewClientWithDefaultOptions(ctx)
inputStream, _ := msgstream.NewMqMsgStream(ctx, 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
var input msgstream.MsgStream = inputStream
rmqClient2, _ := NewClientWithDefaultOptions(ctx)
outputStream, _ := msgstream.NewMqTtMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(ctx, consumerChannels, consumerGroupName, mqwrapper.SubscriptionPositionEarliest)
var output msgstream.MsgStream = outputStream
return input, output
}
func TestStream_RmqMsgStream_Insert(t *testing.T) {
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerGroupName := "InsertGroup"
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
ctx := context.Background()
inputStream, outputStream := initRmqStream(ctx, producerChannels, consumerChannels, consumerGroupName)
err := inputStream.Produce(&msgPack)
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(ctx, outputStream, len(msgPack.Msgs))
inputStream.Close()
outputStream.Close()
}
func TestStream_RmqTtMsgStream_Insert(t *testing.T) {
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack0 := msgstream.MsgPack{}
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0))
msgPack1 := msgstream.MsgPack{}
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
msgPack2 := msgstream.MsgPack{}
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5))
ctx := context.Background()
inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName)
_, err := inputStream.Broadcast(&msgPack0)
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
err = inputStream.Produce(&msgPack1)
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
_, err = inputStream.Broadcast(&msgPack2)
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
receiveMsg(ctx, outputStream, len(msgPack1.Msgs))
inputStream.Close()
outputStream.Close()
}
func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) {
c1 := funcutil.RandomString(8)
producerChannels := []string{c1}
consumerChannels := []string{c1}
consumerSubName := funcutil.RandomString(8)
msgPack0 := msgstream.MsgPack{}
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0))
msgPack1 := msgstream.MsgPack{}
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
// would not dedup for non-dml messages
msgPack2 := msgstream.MsgPack{}
msgPack2.Msgs = append(msgPack2.Msgs, getTsMsg(commonpb.MsgType_CreateCollection, 2))
msgPack2.Msgs = append(msgPack2.Msgs, getTsMsg(commonpb.MsgType_CreateCollection, 2))
msgPack3 := msgstream.MsgPack{}
msgPack3.Msgs = append(msgPack3.Msgs, getTimeTickMsg(15))
ctx := context.Background()
inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName)
_, err := inputStream.Broadcast(&msgPack0)
assert.NoError(t, err)
err = inputStream.Produce(&msgPack1)
assert.NoError(t, err)
err = inputStream.Produce(&msgPack2)
assert.NoError(t, err)
_, err = inputStream.Broadcast(&msgPack3)
assert.NoError(t, err)
receivedMsg := consumer(ctx, outputStream)
assert.Equal(t, len(receivedMsg.Msgs), 3)
assert.Equal(t, receivedMsg.BeginTs, uint64(0))
assert.Equal(t, receivedMsg.EndTs, uint64(15))
outputStream.Close()
factory := msgstream.ProtoUDFactory{}
rmqClient, _ := NewClientWithDefaultOptions(ctx)
outputStream, _ = msgstream.NewMqTtMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
consumerSubName = funcutil.RandomString(8)
outputStream.AsConsumer(ctx, consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionUnknown)
outputStream.Seek(ctx, receivedMsg.StartPositions)
seekMsg := consumer(ctx, outputStream)
assert.Equal(t, len(seekMsg.Msgs), 1+2)
assert.EqualValues(t, seekMsg.Msgs[0].BeginTs(), 1)
assert.Equal(t, commonpb.MsgType_CreateCollection, seekMsg.Msgs[1].Type())
assert.Equal(t, commonpb.MsgType_CreateCollection, seekMsg.Msgs[2].Type())
inputStream.Close()
outputStream.Close()
}
func TestStream_RmqTtMsgStream_Seek(t *testing.T) {
c1 := funcutil.RandomString(8)
producerChannels := []string{c1}
consumerChannels := []string{c1}
consumerSubName := funcutil.RandomString(8)
msgPack0 := msgstream.MsgPack{}
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0))
msgPack1 := msgstream.MsgPack{}
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1))
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19))
msgPack2 := msgstream.MsgPack{}
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5))
msgPack3 := msgstream.MsgPack{}
msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14))
msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9))
msgPack4 := msgstream.MsgPack{}
msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11))
msgPack5 := msgstream.MsgPack{}
msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 12))
msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 13))
msgPack6 := msgstream.MsgPack{}
msgPack6.Msgs = append(msgPack6.Msgs, getTimeTickMsg(15))
msgPack7 := msgstream.MsgPack{}
msgPack7.Msgs = append(msgPack7.Msgs, getTimeTickMsg(20))
ctx := context.Background()
inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName)
_, err := inputStream.Broadcast(&msgPack0)
assert.NoError(t, err)
err = inputStream.Produce(&msgPack1)
assert.NoError(t, err)
_, err = inputStream.Broadcast(&msgPack2)
assert.NoError(t, err)
err = inputStream.Produce(&msgPack3)
assert.NoError(t, err)
_, err = inputStream.Broadcast(&msgPack4)
assert.NoError(t, err)
err = inputStream.Produce(&msgPack5)
assert.NoError(t, err)
_, err = inputStream.Broadcast(&msgPack6)
assert.NoError(t, err)
_, err = inputStream.Broadcast(&msgPack7)
assert.NoError(t, err)
receivedMsg := consumer(ctx, outputStream)
assert.Equal(t, len(receivedMsg.Msgs), 2)
assert.Equal(t, receivedMsg.BeginTs, uint64(0))
assert.Equal(t, receivedMsg.EndTs, uint64(5))
assert.Equal(t, receivedMsg.StartPositions[0].Timestamp, uint64(0))
assert.Equal(t, receivedMsg.EndPositions[0].Timestamp, uint64(5))
receivedMsg2 := consumer(ctx, outputStream)
assert.Equal(t, len(receivedMsg2.Msgs), 1)
assert.Equal(t, receivedMsg2.BeginTs, uint64(5))
assert.Equal(t, receivedMsg2.EndTs, uint64(11))
assert.Equal(t, receivedMsg2.StartPositions[0].Timestamp, uint64(5))
assert.Equal(t, receivedMsg2.EndPositions[0].Timestamp, uint64(11))
receivedMsg3 := consumer(ctx, outputStream)
assert.Equal(t, len(receivedMsg3.Msgs), 3)
assert.Equal(t, receivedMsg3.BeginTs, uint64(11))
assert.Equal(t, receivedMsg3.EndTs, uint64(15))
assert.Equal(t, receivedMsg3.StartPositions[0].Timestamp, uint64(11))
assert.Equal(t, receivedMsg3.EndPositions[0].Timestamp, uint64(15))
receivedMsg4 := consumer(ctx, outputStream)
assert.Equal(t, len(receivedMsg4.Msgs), 1)
assert.Equal(t, receivedMsg4.BeginTs, uint64(15))
assert.Equal(t, receivedMsg4.EndTs, uint64(20))
assert.Equal(t, receivedMsg4.StartPositions[0].Timestamp, uint64(15))
assert.Equal(t, receivedMsg4.EndPositions[0].Timestamp, uint64(20))
outputStream.Close()
factory := msgstream.ProtoUDFactory{}
rmqClient, _ := NewClientWithDefaultOptions(ctx)
outputStream, _ = msgstream.NewMqTtMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
consumerSubName = funcutil.RandomString(8)
outputStream.AsConsumer(ctx, consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionUnknown)
outputStream.Seek(ctx, receivedMsg3.StartPositions)
seekMsg := consumer(ctx, outputStream)
assert.Equal(t, len(seekMsg.Msgs), 3)
result := []uint64{14, 12, 13}
for i, msg := range seekMsg.Msgs {
assert.Equal(t, msg.BeginTs(), result[i])
}
seekMsg2 := consumer(ctx, outputStream)
assert.Equal(t, len(seekMsg2.Msgs), 1)
for _, msg := range seekMsg2.Msgs {
assert.Equal(t, msg.BeginTs(), uint64(19))
}
inputStream.Close()
outputStream.Close()
}
func TestStream_RMqMsgStream_SeekInvalidMessage(t *testing.T) {
c := funcutil.RandomString(8)
producerChannels := []string{c}
consumerChannels := []string{c}
consumerSubName := funcutil.RandomString(8)
ctx := context.Background()
inputStream, outputStream := initRmqStream(ctx, producerChannels, consumerChannels, consumerSubName)
msgPack := &msgstream.MsgPack{}
for i := 0; i < 10; i++ {
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
}
err := inputStream.Produce(msgPack)
assert.NoError(t, err)
var seekPosition *msgpb.MsgPosition
for i := 0; i < 10; i++ {
result := consumer(ctx, outputStream)
assert.Equal(t, result.Msgs[0].ID(), int64(i))
seekPosition = result.EndPositions[0]
}
outputStream.Close()
factory := msgstream.ProtoUDFactory{}
rmqClient2, _ := NewClientWithDefaultOptions(ctx)
outputStream2, _ := msgstream.NewMqMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(ctx, consumerChannels, funcutil.RandomString(8), mqwrapper.SubscriptionPositionUnknown)
id := common.Endian.Uint64(seekPosition.MsgID) + 10
bs := make([]byte, 8)
common.Endian.PutUint64(bs, id)
p := []*msgpb.MsgPosition{
{
ChannelName: seekPosition.ChannelName,
Timestamp: seekPosition.Timestamp,
MsgGroup: seekPosition.MsgGroup,
MsgID: bs,
},
}
err = outputStream2.Seek(ctx, p)
assert.NoError(t, err)
for i := 10; i < 20; i++ {
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
}
err = inputStream.Produce(msgPack)
assert.NoError(t, err)
result := consumer(ctx, outputStream2)
assert.Equal(t, result.Msgs[0].ID(), int64(1))
inputStream.Close()
outputStream2.Close()
}
func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) {
producerChannels := []string{"insert1"}
consumerChannels := []string{"insert1"}
consumerSubName := "subInsert"
factory := msgstream.ProtoUDFactory{}
rmqClient, _ := NewClientWithDefaultOptions(context.Background())
otherInputStream, _ := msgstream.NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
otherInputStream.AsProducer([]string{"root_timetick"})
otherInputStream.Produce(getTimeTickMsgPack(999))
inputStream, _ := msgstream.NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
for i := 0; i < 100; i++ {
inputStream.Produce(getTimeTickMsgPack(int64(i)))
}
rmqClient2, _ := NewClientWithDefaultOptions(context.Background())
outputStream, _ := msgstream.NewMqMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(context.Background(), consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
inputStream.Produce(getTimeTickMsgPack(1000))
pack := <-outputStream.Chan()
assert.NotNil(t, pack)
assert.Equal(t, 1, len(pack.Msgs))
assert.EqualValues(t, 1000, pack.Msgs[0].BeginTs())
inputStream.Close()
outputStream.Close()
}
func getTimeTickMsgPack(reqID msgstream.UniqueID) *msgstream.MsgPack {
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(reqID))
return &msgPack
}
func getTsMsg(msgType msgstream.MsgType, reqID msgstream.UniqueID) msgstream.TsMsg {
hashValue := uint32(reqID)
time := uint64(reqID)
switch msgType {
case commonpb.MsgType_Insert:
insertRequest := msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: reqID,
Timestamp: time,
SourceID: reqID,
},
CollectionName: "Collection",
PartitionName: "Partition",
SegmentID: 1,
ShardName: "0",
Timestamps: []msgstream.Timestamp{time},
RowIDs: []int64{1},
RowData: []*commonpb.Blob{{}},
}
insertMsg := &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
},
InsertRequest: insertRequest,
}
return insertMsg
case commonpb.MsgType_CreateCollection:
createCollectionRequest := msgpb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
MsgID: reqID,
Timestamp: 11,
SourceID: reqID,
},
DbName: "test_db",
CollectionName: "test_collection",
PartitionName: "test_partition",
DbID: 4,
CollectionID: 5,
PartitionID: 6,
Schema: []byte{},
VirtualChannelNames: []string{},
PhysicalChannelNames: []string{},
}
createCollectionMsg := &msgstream.CreateCollectionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
},
CreateCollectionRequest: createCollectionRequest,
}
return createCollectionMsg
}
return nil
}
func getTimeTickMsg(reqID msgstream.UniqueID) msgstream.TsMsg {
hashValue := uint32(reqID)
time := uint64(reqID)
timeTickResult := msgpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: reqID,
Timestamp: time,
SourceID: reqID,
},
}
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{hashValue},
},
TimeTickMsg: timeTickResult,
}
return timeTickMsg
}
func receiveMsg(ctx context.Context, outputStream msgstream.MsgStream, msgCount int) {
receiveCount := 0
for {
select {
case <-ctx.Done():
return
case result, ok := <-outputStream.Chan():
if !ok || result == nil || len(result.Msgs) == 0 {
return
}
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
log.Println("msg type: ", v.Type(), ", msg value: ", v)
}
log.Println("================")
}
if receiveCount >= msgCount {
return
}
}
}
}