From 8cd1fccfb18bf097e56bfff7c6585c06965dc0d8 Mon Sep 17 00:00:00 2001 From: godchen Date: Mon, 29 Nov 2021 13:27:17 +0800 Subject: [PATCH] Return rocksdb reader timeout error (#12317) Signed-off-by: godchen --- .../util/rocksmq/server/rocksmq/rocksmq.go | 2 +- .../rocksmq/server/rocksmq/rocksmq_impl.go | 6 ++--- .../rocksmq/server/rocksmq/rocksmq_reader.go | 24 +++++++++---------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq.go b/internal/util/rocksmq/server/rocksmq/rocksmq.go index 89e7faa842..172e5e9554 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq.go @@ -52,7 +52,7 @@ type RocksMQ interface { CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error) ReaderSeek(topicName string, readerName string, msgID UniqueID) - Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error) + Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (*ConsumerMessage, error) HasNext(topicName string, readerName string, messageIDInclusive bool) bool CloseReader(topicName string, readerName string) } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 574e7199f1..b9c2ee41b3 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -1040,13 +1040,13 @@ func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID Unique reader.Seek(msgID) } -func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error) { +func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (*ConsumerMessage, error) { if rmq.isClosed() { - return ConsumerMessage{}, errors.New(RmqNotServingErrMsg) + return nil, errors.New(RmqNotServingErrMsg) } reader := rmq.getReader(topicName, readerName) if reader == nil { - return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName) + return nil, fmt.Errorf("reader of %s doesn't exist", topicName) } return reader.Next(ctx, messageIDInclusive) } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go index db09eb6b47..f42bdbf5b4 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go @@ -43,26 +43,26 @@ func (rr *rocksmqReader) Seek(msgID UniqueID) { //nolint:govet } } -func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (ConsumerMessage, error) { +func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (*ConsumerMessage, error) { ll, ok := topicMu.Load(rr.topic) if !ok { - return ConsumerMessage{}, fmt.Errorf("topic name = %s not exist", rr.topic) + return nil, fmt.Errorf("topic name = %s not exist", rr.topic) } lock, ok := ll.(*sync.Mutex) if !ok { - return ConsumerMessage{}, fmt.Errorf("get mutex failed, topic name = %s", rr.topic) + return nil, fmt.Errorf("get mutex failed, topic name = %s", rr.topic) } lock.Lock() defer lock.Unlock() fixChanName, err := fixChannelName(rr.topic) if err != nil { log.Debug("RocksMQ: fixChannelName " + rr.topic + " failed") - return ConsumerMessage{}, err + return nil, err } readOpts := gorocksdb.NewDefaultReadOptions() defer readOpts.Destroy() - var msg ConsumerMessage + var msg *ConsumerMessage readOpts.SetPrefixSameAsStart(true) iter := rr.store.NewIterator(readOpts) defer iter.Close() @@ -71,11 +71,11 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con select { case <-ctx.Done(): log.Debug("Stop get next reader message!") - return ConsumerMessage{}, nil + return nil, ctx.Err() case _, ok := <-rr.readerMutex: if !ok { log.Warn("reader Mutex closed") - return ConsumerMessage{}, nil + return nil, fmt.Errorf("reader Mutex closed") } dataKey := path.Join(fixChanName, strconv.FormatInt(rr.currentID, 10)) if iter.Seek([]byte(dataKey)); !iter.Valid() { @@ -84,12 +84,12 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con if messageIDInclusive { val, err := rr.store.Get(readOpts, []byte(dataKey)) if err != nil { - return ConsumerMessage{}, err + return nil, err } if !val.Exists() { continue } - msg = ConsumerMessage{ + msg = &ConsumerMessage{ MsgID: rr.currentID, } origData := val.Data() @@ -112,7 +112,7 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con key.Free() } if err != nil { - return ConsumerMessage{}, err + return nil, err } rr.readerMutex <- struct{}{} } else { @@ -127,10 +127,10 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con key.Free() id, err := strconv.ParseInt(tmpKey[FixedChannelNameLen+1:], 10, 64) if err != nil { - return ConsumerMessage{}, err + return nil, err } val := iter.Value() - msg = ConsumerMessage{ + msg = &ConsumerMessage{ MsgID: id, } origData := val.Data()