milvus/internal/msgstream/msgstream_test.go
bigsheeper c9c8fb9def Close pulsar channel
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
2020-11-12 11:18:23 +08:00

530 lines
16 KiB
Go

package msgstream
import (
"context"
"fmt"
"log"
"testing"
commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
func repackFunc(msgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range msgs {
keys := hashKeys[i]
for _, channelID := range keys {
_, ok := result[channelID]
if ok == false {
msgPack := MsgPack{}
result[channelID] = &msgPack
}
result[channelID].Msgs = append(result[channelID].Msgs, request)
}
}
return result, nil
}
func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
var tsMsg TsMsg
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{hashValue},
}
switch msgType {
case internalPb.MsgType_kInsert:
insertRequest := internalPb.InsertRequest{
MsgType: internalPb.MsgType_kInsert,
ReqId: reqID,
CollectionName: "Collection",
PartitionTag: "Partition",
SegmentId: 1,
ChannelId: 1,
ProxyId: 1,
Timestamps: []Timestamp{1},
RowIds: []int64{1},
RowData: []*commonPb.Blob{{}},
}
insertMsg := &InsertMsg{
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
tsMsg = insertMsg
case internalPb.MsgType_kDelete:
deleteRequest := internalPb.DeleteRequest{
MsgType: internalPb.MsgType_kDelete,
ReqId: reqID,
CollectionName: "Collection",
ChannelId: 1,
ProxyId: 1,
Timestamps: []Timestamp{1},
PrimaryKeys: []IntPrimaryKey{1},
}
deleteMsg := &DeleteMsg{
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
tsMsg = deleteMsg
case internalPb.MsgType_kSearch:
searchRequest := internalPb.SearchRequest{
MsgType: internalPb.MsgType_kSearch,
ReqId: reqID,
ProxyId: 1,
Timestamp: 1,
ResultChannelId: 1,
}
searchMsg := &SearchMsg{
BaseMsg: baseMsg,
SearchRequest: searchRequest,
}
tsMsg = searchMsg
case internalPb.MsgType_kSearchResult:
searchResult := internalPb.SearchResult{
MsgType: internalPb.MsgType_kSearchResult,
Status: &commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS},
ReqId: reqID,
ProxyId: 1,
QueryNodeId: 1,
Timestamp: 1,
ResultChannelId: 1,
}
searchResultMsg := &SearchResultMsg{
BaseMsg: baseMsg,
SearchResult: searchResult,
}
tsMsg = searchResultMsg
case internalPb.MsgType_kTimeTick:
timeTickResult := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerId: reqID,
Timestamp: 1,
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
tsMsg = timeTickMsg
}
return &tsMsg
}
func getTimeTickMsg(reqID UniqueID, hashValue int32, time uint64) *TsMsg {
var tsMsg TsMsg
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{hashValue},
}
timeTickResult := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerId: reqID,
Timestamp: time,
}
timeTickMsg := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
tsMsg = timeTickMsg
return &tsMsg
}
func initPulsarStream(pulsarAddress string,
producerChannels []string,
consumerChannels []string,
consumerSubName string,
opts ...RepackFunc) (*MsgStream, *MsgStream) {
// set input stream
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
inputStream.Start()
var input MsgStream = inputStream
// set output stream
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
return &input, &output
}
func initPulsarTtStream(pulsarAddress string,
producerChannels []string,
consumerChannels []string,
consumerSubName string,
opts ...RepackFunc) (*MsgStream, *MsgStream) {
// set input stream
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
inputStream.Start()
var input MsgStream = inputStream
// set output stream
outputStream := NewPulsarTtMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
return &input, &output
}
func receiveMsg(outputStream *MsgStream, msgCount int) {
receiveCount := 0
for {
result := (*outputStream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
}
}
if receiveCount >= msgCount {
break
}
}
}
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(outputStream, len(msgPack.Msgs))
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_Delete(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"delete"}
consumerChannels := []string{"delete"}
consumerSubName := "subDelete"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(outputStream, len(msgPack.Msgs))
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_Search(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"search"}
consumerChannels := []string{"search"}
consumerSubName := "subSearch"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(outputStream, len(msgPack.Msgs))
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"searchResult"}
consumerChannels := []string{"searchResult"}
consumerSubName := "subSearchResult"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(outputStream, len(msgPack.Msgs))
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"timeTick"}
consumerChannels := []string{"timeTick"}
consumerSubName := "subTimeTick"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(outputStream, len(msgPack.Msgs))
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Broadcast(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(outputStream, len(consumerChannels)*len(msgPack.Msgs))
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc)
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(outputStream, len(msgPack.Msgs))
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{1, 3},
}
insertRequest := internalPb.InsertRequest{
MsgType: internalPb.MsgType_kInsert,
ReqId: 1,
CollectionName: "Collection",
PartitionTag: "Partition",
SegmentId: 1,
ChannelId: 1,
ProxyId: 1,
Timestamps: []Timestamp{1, 1},
RowIds: []int64{1, 3},
RowData: []*commonPb.Blob{{}, {}},
}
insertMsg := &InsertMsg{
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
var tsMsg TsMsg = insertMsg
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(&output, len(msgPack.Msgs)*2)
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{1, 3},
}
deleteRequest := internalPb.DeleteRequest{
MsgType: internalPb.MsgType_kDelete,
ReqId: 1,
CollectionName: "Collection",
ChannelId: 1,
ProxyId: 1,
Timestamps: []Timestamp{1, 1},
PrimaryKeys: []int64{1, 3},
}
deleteMsg := &DeleteMsg{
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
var tsMsg TsMsg = deleteMsg
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(&output, len(msgPack.Msgs)*2)
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{1},
}
timeTickRequest := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerId: int64(1),
Timestamp: uint64(1),
}
timeTick := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickRequest,
}
var tsMsg TsMsg = timeTick
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
var output MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(&output, len(msgPack.Msgs))
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
msgPack0 := MsgPack{}
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
msgPack1 := MsgPack{}
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
msgPack2 := MsgPack{}
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := (*inputStream).Broadcast(&msgPack0)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
err = (*inputStream).Produce(&msgPack1)
if err != nil {
log.Fatalf("produce error = %v", err)
}
err = (*inputStream).Broadcast(&msgPack2)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
receiveMsg(outputStream, len(msgPack1.Msgs))
outputTtStream := (*outputStream).(*PulsarTtMsgStream)
fmt.Printf("timestamp = %v", outputTtStream.lastTimeStamp)
(*inputStream).Close()
(*outputStream).Close()
}