From d6f96ec9fcf1a07ec6f252c4e279b6e83fd8c5cf Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Wed, 29 Sep 2021 09:54:04 +0800 Subject: [PATCH] Add unit tests for mq_msgstream.go (#8812) Signed-off-by: Xiangyu Wang --- internal/metrics/metrics.go | 2 +- internal/msgstream/mq_msgstream_test.go | 40 +++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 11c73ef14a..ce149fffc7 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -608,7 +608,7 @@ var ( Help: "Counter of flush segments", }, []string{"type"}) - // DataNodeWatchDmChannelCounter used to count the num of calls of WatchDmChannels + // DataNodeWatchDmChannelsCounter used to count the num of calls of WatchDmChannels DataNodeWatchDmChannelsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: milvusNamespace, diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index f0d85b753a..7984afc4d8 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -298,6 +298,46 @@ func TestMqMsgStream_Consume(t *testing.T) { } } +func TestMqMsgStream_Chan(t *testing.T) { + f := &fixture{t: t} + parameters := f.setup() + defer f.teardown() + + factory := &ProtoUDFactory{} + for i := range parameters { + func(client mqclient.Client) { + m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) + assert.Nil(t, err) + + ch := m.Chan() + assert.NotNil(t, ch) + }(parameters[i].client) + } +} + +func TestMqMsgStream_Seek(t *testing.T) { + f := &fixture{t: t} + parameters := f.setup() + defer f.teardown() + + factory := &ProtoUDFactory{} + for i := range parameters { + func(client mqclient.Client) { + m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) + assert.Nil(t, err) + + // seek in not subscribed channel + p := []*internalpb.MsgPosition{ + { + ChannelName: "b", + }, + } + err = m.Seek(p) + assert.NotNil(t, err) + }(parameters[i].client) + } +} + /* ========================== Pulsar & RocksMQ Tests ========================== */ func TestStream_PulsarMsgStream_Insert(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress")