Change receive msg logic (#5605)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-06-04 16:39:34 +08:00 committed by zhenshan.cao
parent 0a82c6381f
commit 91ef35bad4
5 changed files with 36 additions and 31 deletions

View File

@ -58,22 +58,7 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
msgChannel := make(chan ConsumerMessage) pConsumer := &pulsarConsumer{c: consumer}
pConsumer := &pulsarConsumer{c: consumer, msgChannel: msgChannel}
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-pConsumer.c.Chan():
if !ok {
close(msgChannel)
log.Debug("pulsar consumer channel closed")
return
}
msgChannel <- &pulsarMessage{msg: msg}
}
}
}()
return pConsumer, nil return pConsumer, nil
} }

View File

@ -13,6 +13,7 @@ package mqclient
import ( import (
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/log"
) )
type pulsarConsumer struct { type pulsarConsumer struct {
@ -25,6 +26,22 @@ func (pc *pulsarConsumer) Subscription() string {
} }
func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage { func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage {
if pc.msgChannel == nil {
pc.msgChannel = make(chan ConsumerMessage)
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-pc.c.Chan():
if !ok {
close(pc.msgChannel)
log.Debug("pulsar consumer channel closed")
return
}
pc.msgChannel <- &pulsarMessage{msg: msg}
}
}
}()
}
return pc.msgChannel return pc.msgChannel
} }

View File

@ -55,22 +55,8 @@ func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) {
return nil, err return nil, err
} }
msgChannel := make(chan ConsumerMessage, 1) rConsumer := &rmqConsumer{c: cli}
rConsumer := &rmqConsumer{c: cli, msgChannel: msgChannel}
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-rConsumer.c.Chan():
if !ok {
close(msgChannel)
return
}
msg.Topic = options.Topic
msgChannel <- &rmqMessage{msg: msg}
}
}
}()
return rConsumer, nil return rConsumer, nil
} }

View File

@ -25,6 +25,22 @@ func (rc *rmqConsumer) Subscription() string {
} }
func (rc *rmqConsumer) Chan() <-chan ConsumerMessage { func (rc *rmqConsumer) Chan() <-chan ConsumerMessage {
if rc.msgChannel == nil {
rc.msgChannel = make(chan ConsumerMessage)
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-rc.c.Chan():
if !ok {
close(rc.msgChannel)
return
}
rc.msgChannel <- &rmqMessage{msg: msg}
}
}
}()
}
return rc.msgChannel return rc.msgChannel
} }

View File

@ -134,6 +134,7 @@ func consume(ctx context.Context, consumer *consumer) {
consumer.messageCh <- ConsumerMessage{ consumer.messageCh <- ConsumerMessage{
MsgID: msg[0].MsgID, MsgID: msg[0].MsgID,
Payload: msg[0].Payload, Payload: msg[0].Payload,
Topic: consumer.Topic(),
} }
} }
} }