From d845153de468cd79db3b0af5ac141228818435c7 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Fri, 15 Oct 2021 20:31:16 +0800 Subject: [PATCH] Add Buffer for consumer channel (#9578) Signed-off-by: xiaofan-luan --- internal/datanode/flow_graph_dmstream_input_node.go | 5 ++++- internal/msgstream/mq_msgstream.go | 5 ----- internal/util/mqclient/pulsar_consumer.go | 2 +- internal/util/mqclient/rmq_consumer.go | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 0359b0d025..51b4c9bc88 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -19,6 +19,7 @@ package datanode import ( "context" "fmt" + "time" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -47,11 +48,13 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode if seekPos != nil { seekPos.ChannelName = pchannelName - log.Debug("datanode Seek", zap.String("channelName", seekPos.GetChannelName())) + start := time.Now() + log.Debug("datanode begin to seek: " + seekPos.GetChannelName()) err = insertStream.Seek([]*internalpb.MsgPosition{seekPos}) if err != nil { return nil, err } + log.Debug("datanode Seek successfully: "+seekPos.GetChannelName(), zap.Int64("elapse ", time.Since(start).Milliseconds())) } node := flowgraph.NewInputNode(insertStream, "dmInputNode", dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism) diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 573c4a3870..bce0ac642a 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -874,11 +874,6 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { } ms.addConsumer(consumer, mp.ChannelName) - //TODO: May cause problem - //if len(consumer.Chan()) == 0 { - // return nil - //} - runLoop := true for runLoop { select { diff --git a/internal/util/mqclient/pulsar_consumer.go b/internal/util/mqclient/pulsar_consumer.go index ae83869bb8..8d89469f8a 100644 --- a/internal/util/mqclient/pulsar_consumer.go +++ b/internal/util/mqclient/pulsar_consumer.go @@ -34,7 +34,7 @@ func (pc *pulsarConsumer) Subscription() string { func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage { if pc.msgChannel == nil { pc.once.Do(func() { - pc.msgChannel = make(chan ConsumerMessage) + pc.msgChannel = make(chan ConsumerMessage, 256) // this part handles msgstream expectation when the consumer is not seeked // pulsar's default behavior is setting postition to the earliest pointer when client of the same subscription pointer is not acked // yet, our message stream is to setting to the very start point of the topic diff --git a/internal/util/mqclient/rmq_consumer.go b/internal/util/mqclient/rmq_consumer.go index c18423e06f..9af9a025dc 100644 --- a/internal/util/mqclient/rmq_consumer.go +++ b/internal/util/mqclient/rmq_consumer.go @@ -34,7 +34,7 @@ func (rc *RmqConsumer) Subscription() string { func (rc *RmqConsumer) Chan() <-chan ConsumerMessage { if rc.msgChannel == nil { rc.once.Do(func() { - rc.msgChannel = make(chan ConsumerMessage) + rc.msgChannel = make(chan ConsumerMessage, 256) go func() { for { //nolint:gosimple select {