Add unit tests for mq_msgstream.go (#8812)

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
This commit is contained in:
Xiangyu Wang 2021-09-29 09:54:04 +08:00 committed by GitHub
parent cf8600077f
commit d6f96ec9fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 1 deletions

View File

@ -608,7 +608,7 @@ var (
Help: "Counter of flush segments",
}, []string{"type"})
// DataNodeWatchDmChannelCounter used to count the num of calls of WatchDmChannels
// DataNodeWatchDmChannelsCounter used to count the num of calls of WatchDmChannels
DataNodeWatchDmChannelsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,

View File

@ -298,6 +298,46 @@ func TestMqMsgStream_Consume(t *testing.T) {
}
}
func TestMqMsgStream_Chan(t *testing.T) {
f := &fixture{t: t}
parameters := f.setup()
defer f.teardown()
factory := &ProtoUDFactory{}
for i := range parameters {
func(client mqclient.Client) {
m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.Nil(t, err)
ch := m.Chan()
assert.NotNil(t, ch)
}(parameters[i].client)
}
}
func TestMqMsgStream_Seek(t *testing.T) {
f := &fixture{t: t}
parameters := f.setup()
defer f.teardown()
factory := &ProtoUDFactory{}
for i := range parameters {
func(client mqclient.Client) {
m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.Nil(t, err)
// seek in not subscribed channel
p := []*internalpb.MsgPosition{
{
ChannelName: "b",
},
}
err = m.Seek(p)
assert.NotNil(t, err)
}(parameters[i].client)
}
}
/* ========================== Pulsar & RocksMQ Tests ========================== */
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")