diff --git a/pkg/mq/mqimpl/rocksmq/client/client_impl.go b/pkg/mq/mqimpl/rocksmq/client/client_impl.go index 3b540f1911..3334f8a553 100644 --- a/pkg/mq/mqimpl/rocksmq/client/client_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/client_impl.go @@ -15,6 +15,7 @@ import ( "reflect" "sync" + "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" @@ -22,6 +23,10 @@ import ( "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server" ) +const ( + minimalConsumePendingBufferSize = 16 +) + type client struct { server RocksMQ wg *sync.WaitGroup @@ -118,64 +123,89 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { func (c *client) consume(consumer *consumer) { defer c.wg.Done() + + if err := c.blockUntilInitDone(consumer); err != nil { + log.Warn("consumer init failed", zap.Error(err)) + return + } + + var pendingMsgs []*RmqMessage for { + if len(pendingMsgs) == 0 { + pendingMsgs = c.tryToConsume(consumer) + } + + var consumerCh chan<- common.Message + var waitForSent *RmqMessage + var newIncomingMsgCh <-chan struct{} + if len(pendingMsgs) > 0 { + // If there's pending sent messages, we can try to deliver them first. + consumerCh = consumer.messageCh + waitForSent = pendingMsgs[0] + } else { + // If there's no more pending messages, we can wait for new incoming messages. + // !!! TODO: MsgMutex may lost, not sync up with the consumer, + // so the tailing message cannot be consumed if no new producing message. + newIncomingMsgCh = consumer.MsgMutex() + } + select { case <-c.closeCh: + log.Info("Client is closed, consumer goroutine exit") return - case _, ok := <-consumer.initCh: - if !ok { - return - } - c.deliver(consumer) - case _, ok := <-consumer.MsgMutex(): + case consumerCh <- waitForSent: + pendingMsgs = pendingMsgs[1:] + case _, ok := <-newIncomingMsgCh: if !ok { // consumer MsgMutex closed, goroutine exit - log.Debug("Consumer MsgMutex closed") + log.Info("Consumer MsgMutex closed") return } - c.deliver(consumer) } } } -func (c *client) deliver(consumer *consumer) { - for { - n := cap(consumer.messageCh) - len(consumer.messageCh) - if n == 0 { - return - } - - msgs, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, n) - if err != nil { - log.Warn("Consumer's goroutine cannot consume from (" + consumer.topic + "," + consumer.consumerName + "): " + err.Error()) - break - } - - // no more msgs - if len(msgs) == 0 { - break - } - for _, msg := range msgs { - // This is the hack, we put property into pl - properties := make(map[string]string, 0) - pl, err := UnmarshalHeader(msg.Payload) - if err == nil && pl != nil && pl.Base != nil { - properties = pl.Base.Properties - } - select { - case consumer.messageCh <- &RmqMessage{ - msgID: msg.MsgID, - payload: msg.Payload, - properties: properties, - topic: consumer.Topic(), - }: - case <-c.closeCh: - return - } +// blockUntilInitDone block until consumer is initialized +func (c *client) blockUntilInitDone(consumer *consumer) error { + select { + case <-c.closeCh: + return errors.New("client is closed") + case _, ok := <-consumer.initCh: + if !ok { + return errors.New("consumer init failure") } + return nil } } +func (c *client) tryToConsume(consumer *consumer) []*RmqMessage { + n := cap(consumer.messageCh) - len(consumer.messageCh) + if n <= minimalConsumePendingBufferSize { + n = minimalConsumePendingBufferSize + } + msgs, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, n) + if err != nil { + log.Warn("Consumer's goroutine cannot consume from (" + consumer.topic + "," + consumer.consumerName + "): " + err.Error()) + return nil + } + rmqMsgs := make([]*RmqMessage, 0, len(msgs)) + for _, msg := range msgs { + // This is the hack, we put property into pl + properties := make(map[string]string, 0) + pl, err := UnmarshalHeader(msg.Payload) + if err == nil && pl != nil && pl.Base != nil { + properties = pl.Base.Properties + } + rmqMsgs = append(rmqMsgs, &RmqMessage{ + msgID: msg.MsgID, + payload: msg.Payload, + properties: properties, + topic: consumer.Topic(), + }) + } + return rmqMsgs +} + // Close close the channel to notify rocksmq to stop operation and close rocksmq server func (c *client) Close() { c.closeOnce.Do(func() {