From 0f409da136ea4b37af6afc9b27e043fa2af5512d Mon Sep 17 00:00:00 2001 From: yukun Date: Thu, 24 Jun 2021 16:44:07 +0800 Subject: [PATCH] Fix standalone seek hang after restart (#6073) Signed-off-by: fishpenguin --- internal/msgstream/mq_msgstream.go | 19 ++++++++++--------- internal/proxy/impl.go | 5 +++++ internal/util/mqclient/rmq_client.go | 2 +- internal/util/mqclient/rmq_consumer.go | 12 ++++++------ 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 930b62ced7..2556098257 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -392,15 +392,16 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { if err != nil { return err } - msg, ok := <-consumer.Chan() - if !ok { - return errors.New("consumer closed") - } - consumer.Ack(msg) - - if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) { - err = fmt.Errorf("seek msg not correct") - log.Error("msMsgStream seek", zap.Error(err)) + if _, ok := consumer.(*mqclient.RmqConsumer); !ok { + msg, ok := <-consumer.Chan() + if !ok { + return errors.New("consumer closed") + } + consumer.Ack(msg) + if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) { + err = fmt.Errorf("seek msg not correct") + log.Error("msMsgStream seek", zap.Error(err)) + } } return nil diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 580fe763c4..f48fef92a6 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1450,6 +1450,11 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* return nil, err } + if request.Expr == "" { + errMsg := "Query expression is empty!" + return nil, fmt.Errorf(errMsg) + } + parseRetrieveTask := func(exprString string) ([]int64, error) { expr, err := parseQueryExpr(schema, exprString) if err != nil { diff --git a/internal/util/mqclient/rmq_client.go b/internal/util/mqclient/rmq_client.go index a8a3253224..6dc0ac10ea 100644 --- a/internal/util/mqclient/rmq_client.go +++ b/internal/util/mqclient/rmq_client.go @@ -55,7 +55,7 @@ func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) { return nil, err } - rConsumer := &rmqConsumer{c: cli} + rConsumer := &RmqConsumer{c: cli} return rConsumer, nil } diff --git a/internal/util/mqclient/rmq_consumer.go b/internal/util/mqclient/rmq_consumer.go index c210dc98b4..b2d4d007ef 100644 --- a/internal/util/mqclient/rmq_consumer.go +++ b/internal/util/mqclient/rmq_consumer.go @@ -15,16 +15,16 @@ import ( "github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq" ) -type rmqConsumer struct { +type RmqConsumer struct { c rocksmq.Consumer msgChannel chan ConsumerMessage } -func (rc *rmqConsumer) Subscription() string { +func (rc *RmqConsumer) Subscription() string { return rc.c.Subscription() } -func (rc *rmqConsumer) Chan() <-chan ConsumerMessage { +func (rc *RmqConsumer) Chan() <-chan ConsumerMessage { if rc.msgChannel == nil { rc.msgChannel = make(chan ConsumerMessage) @@ -44,13 +44,13 @@ func (rc *rmqConsumer) Chan() <-chan ConsumerMessage { return rc.msgChannel } -func (rc *rmqConsumer) Seek(id MessageID) error { +func (rc *RmqConsumer) Seek(id MessageID) error { msgID := id.(*rmqID).messageID return rc.c.Seek(msgID) } -func (rc *rmqConsumer) Ack(message ConsumerMessage) { +func (rc *RmqConsumer) Ack(message ConsumerMessage) { } -func (rc *rmqConsumer) Close() { +func (rc *RmqConsumer) Close() { }