mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 03:18:29 +08:00
Return rocksdb reader timeout error (#12317)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
parent
45fac75889
commit
8cd1fccfb1
@ -52,7 +52,7 @@ type RocksMQ interface {
|
||||
|
||||
CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error)
|
||||
ReaderSeek(topicName string, readerName string, msgID UniqueID)
|
||||
Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error)
|
||||
Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (*ConsumerMessage, error)
|
||||
HasNext(topicName string, readerName string, messageIDInclusive bool) bool
|
||||
CloseReader(topicName string, readerName string)
|
||||
}
|
||||
|
@ -1040,13 +1040,13 @@ func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID Unique
|
||||
reader.Seek(msgID)
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error) {
|
||||
func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (*ConsumerMessage, error) {
|
||||
if rmq.isClosed() {
|
||||
return ConsumerMessage{}, errors.New(RmqNotServingErrMsg)
|
||||
return nil, errors.New(RmqNotServingErrMsg)
|
||||
}
|
||||
reader := rmq.getReader(topicName, readerName)
|
||||
if reader == nil {
|
||||
return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName)
|
||||
return nil, fmt.Errorf("reader of %s doesn't exist", topicName)
|
||||
}
|
||||
return reader.Next(ctx, messageIDInclusive)
|
||||
}
|
||||
|
@ -43,26 +43,26 @@ func (rr *rocksmqReader) Seek(msgID UniqueID) { //nolint:govet
|
||||
}
|
||||
}
|
||||
|
||||
func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (ConsumerMessage, error) {
|
||||
func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (*ConsumerMessage, error) {
|
||||
ll, ok := topicMu.Load(rr.topic)
|
||||
if !ok {
|
||||
return ConsumerMessage{}, fmt.Errorf("topic name = %s not exist", rr.topic)
|
||||
return nil, fmt.Errorf("topic name = %s not exist", rr.topic)
|
||||
}
|
||||
lock, ok := ll.(*sync.Mutex)
|
||||
if !ok {
|
||||
return ConsumerMessage{}, fmt.Errorf("get mutex failed, topic name = %s", rr.topic)
|
||||
return nil, fmt.Errorf("get mutex failed, topic name = %s", rr.topic)
|
||||
}
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
fixChanName, err := fixChannelName(rr.topic)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: fixChannelName " + rr.topic + " failed")
|
||||
return ConsumerMessage{}, err
|
||||
return nil, err
|
||||
}
|
||||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
defer readOpts.Destroy()
|
||||
|
||||
var msg ConsumerMessage
|
||||
var msg *ConsumerMessage
|
||||
readOpts.SetPrefixSameAsStart(true)
|
||||
iter := rr.store.NewIterator(readOpts)
|
||||
defer iter.Close()
|
||||
@ -71,11 +71,11 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Debug("Stop get next reader message!")
|
||||
return ConsumerMessage{}, nil
|
||||
return nil, ctx.Err()
|
||||
case _, ok := <-rr.readerMutex:
|
||||
if !ok {
|
||||
log.Warn("reader Mutex closed")
|
||||
return ConsumerMessage{}, nil
|
||||
return nil, fmt.Errorf("reader Mutex closed")
|
||||
}
|
||||
dataKey := path.Join(fixChanName, strconv.FormatInt(rr.currentID, 10))
|
||||
if iter.Seek([]byte(dataKey)); !iter.Valid() {
|
||||
@ -84,12 +84,12 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con
|
||||
if messageIDInclusive {
|
||||
val, err := rr.store.Get(readOpts, []byte(dataKey))
|
||||
if err != nil {
|
||||
return ConsumerMessage{}, err
|
||||
return nil, err
|
||||
}
|
||||
if !val.Exists() {
|
||||
continue
|
||||
}
|
||||
msg = ConsumerMessage{
|
||||
msg = &ConsumerMessage{
|
||||
MsgID: rr.currentID,
|
||||
}
|
||||
origData := val.Data()
|
||||
@ -112,7 +112,7 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con
|
||||
key.Free()
|
||||
}
|
||||
if err != nil {
|
||||
return ConsumerMessage{}, err
|
||||
return nil, err
|
||||
}
|
||||
rr.readerMutex <- struct{}{}
|
||||
} else {
|
||||
@ -127,10 +127,10 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con
|
||||
key.Free()
|
||||
id, err := strconv.ParseInt(tmpKey[FixedChannelNameLen+1:], 10, 64)
|
||||
if err != nil {
|
||||
return ConsumerMessage{}, err
|
||||
return nil, err
|
||||
}
|
||||
val := iter.Value()
|
||||
msg = ConsumerMessage{
|
||||
msg = &ConsumerMessage{
|
||||
MsgID: id,
|
||||
}
|
||||
origData := val.Data()
|
||||
|
Loading…
Reference in New Issue
Block a user