mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Fix standalone seek hang after restart (#6073)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
This commit is contained in:
parent
710e2ca185
commit
0f409da136
@ -392,15 +392,16 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msg, ok := <-consumer.Chan()
|
||||
if !ok {
|
||||
return errors.New("consumer closed")
|
||||
}
|
||||
consumer.Ack(msg)
|
||||
|
||||
if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) {
|
||||
err = fmt.Errorf("seek msg not correct")
|
||||
log.Error("msMsgStream seek", zap.Error(err))
|
||||
if _, ok := consumer.(*mqclient.RmqConsumer); !ok {
|
||||
msg, ok := <-consumer.Chan()
|
||||
if !ok {
|
||||
return errors.New("consumer closed")
|
||||
}
|
||||
consumer.Ack(msg)
|
||||
if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) {
|
||||
err = fmt.Errorf("seek msg not correct")
|
||||
log.Error("msMsgStream seek", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -1450,6 +1450,11 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if request.Expr == "" {
|
||||
errMsg := "Query expression is empty!"
|
||||
return nil, fmt.Errorf(errMsg)
|
||||
}
|
||||
|
||||
parseRetrieveTask := func(exprString string) ([]int64, error) {
|
||||
expr, err := parseQueryExpr(schema, exprString)
|
||||
if err != nil {
|
||||
|
@ -55,7 +55,7 @@ func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rConsumer := &rmqConsumer{c: cli}
|
||||
rConsumer := &RmqConsumer{c: cli}
|
||||
|
||||
return rConsumer, nil
|
||||
}
|
||||
|
@ -15,16 +15,16 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
|
||||
)
|
||||
|
||||
type rmqConsumer struct {
|
||||
type RmqConsumer struct {
|
||||
c rocksmq.Consumer
|
||||
msgChannel chan ConsumerMessage
|
||||
}
|
||||
|
||||
func (rc *rmqConsumer) Subscription() string {
|
||||
func (rc *RmqConsumer) Subscription() string {
|
||||
return rc.c.Subscription()
|
||||
}
|
||||
|
||||
func (rc *rmqConsumer) Chan() <-chan ConsumerMessage {
|
||||
func (rc *RmqConsumer) Chan() <-chan ConsumerMessage {
|
||||
|
||||
if rc.msgChannel == nil {
|
||||
rc.msgChannel = make(chan ConsumerMessage)
|
||||
@ -44,13 +44,13 @@ func (rc *rmqConsumer) Chan() <-chan ConsumerMessage {
|
||||
return rc.msgChannel
|
||||
}
|
||||
|
||||
func (rc *rmqConsumer) Seek(id MessageID) error {
|
||||
func (rc *RmqConsumer) Seek(id MessageID) error {
|
||||
msgID := id.(*rmqID).messageID
|
||||
return rc.c.Seek(msgID)
|
||||
}
|
||||
|
||||
func (rc *rmqConsumer) Ack(message ConsumerMessage) {
|
||||
func (rc *RmqConsumer) Ack(message ConsumerMessage) {
|
||||
}
|
||||
|
||||
func (rc *rmqConsumer) Close() {
|
||||
func (rc *RmqConsumer) Close() {
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user