Remove reader API from Mqstream and replace it with consumer API (#15971)

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
This commit is contained in:
jaime 2022-03-15 14:45:22 +08:00 committed by GitHub
parent 1ca71a2a65
commit 97b1ed7bca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 705 additions and 712 deletions

2
go.mod
View File

@ -54,7 +54,7 @@ require (
)
replace (
github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.5
github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.1-0.20220310065106-1ef6d309ead7
github.com/dgrijalva/jwt-go => github.com/golang-jwt/jwt v3.2.2+incompatible // Fix security alert for jwt-go 3.2.0
github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4
google.golang.org/grpc => google.golang.org/grpc v1.38.0

8
go.sum
View File

@ -380,12 +380,8 @@ github.com/mattn/go-runewidth v0.0.8/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/pulsar-client-go v0.6.1 h1:7uB71ZDP9aQvB9eQ1WRGh8IA1LMUmVYmRGtxxJDrYvU=
github.com/milvus-io/pulsar-client-go v0.6.1/go.mod h1:A1P5VjjljsFKAD13w7/jmU3Dly2gcRvcobiULqQXhz4=
github.com/milvus-io/pulsar-client-go v0.6.4 h1:RAPdcbNU49A7A8q5GciZqj+8EwQ3aMZcghU3/V2vW1M=
github.com/milvus-io/pulsar-client-go v0.6.4/go.mod h1:A1P5VjjljsFKAD13w7/jmU3Dly2gcRvcobiULqQXhz4=
github.com/milvus-io/pulsar-client-go v0.6.5 h1:oTKYQObVpW4vXgK6lwYAXHmUXopSohMtH+vgsEb7QBY=
github.com/milvus-io/pulsar-client-go v0.6.5/go.mod h1:A1P5VjjljsFKAD13w7/jmU3Dly2gcRvcobiULqQXhz4=
github.com/milvus-io/pulsar-client-go v0.6.1-0.20220310065106-1ef6d309ead7 h1:oe+20W/i8wBJ05HznX+HKbk1xDk0+vuVNfNh90l6weo=
github.com/milvus-io/pulsar-client-go v0.6.1-0.20220310065106-1ef6d309ead7/go.mod h1:A1P5VjjljsFKAD13w7/jmU3Dly2gcRvcobiULqQXhz4=
github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/minio-go/v7 v7.0.10 h1:1oUKe4EOPUEhw2qnPQaPsJ0lmVTYLFu03SiItauXs94=

View File

@ -67,7 +67,6 @@ func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack {
}
func (mtm *mockTtMsgStream) AsProducer(channels []string) {}
func (mtm *mockTtMsgStream) AsReader(channels []string, subName string) {}
func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string) {}
func (mtm *mockTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
}
@ -94,16 +93,10 @@ func (mtm *mockTtMsgStream) BroadcastMark(*msgstream.MsgPack) (map[string][]msgs
func (mtm *mockTtMsgStream) Seek(offset []*internalpb.MsgPosition) error {
return nil
}
func (mtm *mockTtMsgStream) SeekReaders(msgPositions []*internalpb.MsgPosition) error {
return nil
}
func (mtm *mockTtMsgStream) Next(ctx context.Context, channelName string) (msgstream.TsMsg, error) {
func (mtm *mockTtMsgStream) GetLatestMsgID(channel string) (msgstream.MessageID, error) {
return nil, nil
}
func (mtm *mockTtMsgStream) HasNext(channelName string) bool {
return true
}
func TestNewDmInputNode(t *testing.T) {
ctx := context.Background()

View File

@ -76,4 +76,7 @@ type Consumer interface {
// Close consumer
Close()
// GetLatestMsgID get the latest msgID
GetLatestMsgID() (int64, error)
}

View File

@ -133,3 +133,12 @@ func (c *consumer) Close() {
log.Debug("Consumer close failed", zap.Any("topicName", c.topic), zap.Any("groupName", c.consumerName), zap.Any("error", err))
}
}
// GetLatestMsgID returns the ID of the latest message in this consumer's
// topic, exactly as reported by the rocksmq server (value and error are
// passed through unchanged).
func (c *consumer) GetLatestMsgID() (int64, error) {
	return c.client.server.GetLatestMsg(c.topic)
}

View File

@ -42,6 +42,7 @@ type RocksMQ interface {
Close()
RegisterConsumer(consumer *Consumer) error
GetLatestMsg(topicName string) (int64, error)
Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error)
Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error)

View File

@ -458,6 +458,18 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) error {
return nil
}
// GetLatestMsg returns the ID of the latest message stored for topicName.
// It returns DefaultMessageID together with an error when the queue is
// closed or when the lookup fails.
func (rmq *rocksmq) GetLatestMsg(topicName string) (int64, error) {
	if rmq.isClosed() {
		return DefaultMessageID, errors.New(RmqNotServingErrMsg)
	}
	latestID, err := rmq.getLatestMsg(topicName)
	if err == nil {
		return latestID, nil
	}
	// Never leak a partially computed ID alongside an error.
	return DefaultMessageID, err
}
// DestroyConsumerGroup removes a consumer group from rocksdb_kv
func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
if rmq.isClosed() {
@ -809,6 +821,19 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
}
msgID, err := rmq.getLatestMsg(topicName)
if err != nil {
return err
}
// current msgID should not be included
rmq.moveConsumePos(topicName, groupName, msgID+1)
log.Debug("successfully seek to latest", zap.String("topic", topicName),
zap.String("group", groupName), zap.Uint64("latest", uint64(msgID+1)))
return nil
}
func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) {
readOpts := gorocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
iter := rocksdbkv.NewRocksIterator(rmq.store, readOpts)
@ -820,12 +845,12 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
// if iterate fail
if err := iter.Err(); err != nil {
return err
return DefaultMessageID, err
}
// should find the last key we written into, start with fixTopicName/
// if not find, start from 0
if !iter.Valid() {
return nil
return DefaultMessageID, nil
}
iKey := iter.Key()
@ -833,20 +858,18 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
if iKey != nil {
iKey.Free()
}
// if find message is not belong to current channel, start from 0
if !strings.Contains(seekMsgID, prefix) {
return nil
return DefaultMessageID, nil
}
msgID, err := strconv.ParseInt(seekMsgID[len(topicName)+1:], 10, 64)
if err != nil {
return err
return DefaultMessageID, err
}
// current msgID should not be included
rmq.moveConsumePos(topicName, groupName, msgID+1)
log.Debug("successfully seek to latest", zap.String("topic", topicName),
zap.String("group", groupName), zap.Uint64("latest", uint64(msgID+1)))
return nil
return msgID, nil
}
// Notify sends a mutex in MsgMutex channel to tell consumers to consume

View File

@ -848,6 +848,80 @@ func TestReader_CornerCase(t *testing.T) {
assert.Equal(t, string(msg.Payload), "extra_message")
}
func TestRocksmq_GetLatestMsg(t *testing.T) {
ep := etcdEndpoints()
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
assert.Nil(t, err)
defer etcdCli.Close()
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
assert.Nil(t, err)
defer etcdKV.Close()
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_data"
defer os.RemoveAll(name)
kvName := name + "_meta_kv"
_ = os.RemoveAll(kvName)
defer os.RemoveAll(kvName)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
channelName := newChanName()
err = rmq.CreateTopic(channelName)
assert.Nil(t, err)
// Consume loopNum message once
groupName := "last_msg_test"
_ = rmq.DestroyConsumerGroup(channelName, groupName)
err = rmq.CreateConsumerGroup(channelName, groupName)
assert.Nil(t, err)
msgID, err := rmq.GetLatestMsg(channelName)
assert.Equal(t, msgID, int64(DefaultMessageID))
assert.Nil(t, err)
loopNum := 10
pMsgs1 := make([]ProducerMessage, loopNum)
pMsgs2 := make([]ProducerMessage, loopNum)
for i := 0; i < loopNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
pMsgs1[i] = pMsg
msg = "2message_" + strconv.Itoa(i)
pMsg = ProducerMessage{Payload: []byte(msg)}
pMsgs2[i] = pMsg
}
ids, err := rmq.Produce(channelName, pMsgs1)
assert.Nil(t, err)
assert.Equal(t, len(ids), loopNum)
// test latest msg when one topic is created
msgID, err = rmq.GetLatestMsg(channelName)
assert.Nil(t, err)
assert.Equal(t, msgID, ids[loopNum-1])
// test latest msg when two topics are created
channelName2 := newChanName()
err = rmq.CreateTopic(channelName2)
assert.Nil(t, err)
ids, err = rmq.Produce(channelName2, pMsgs2)
assert.Nil(t, err)
msgID, err = rmq.GetLatestMsg(channelName2)
assert.Nil(t, err)
assert.Equal(t, msgID, ids[loopNum-1])
// test close rmq
rmq.DestroyTopic(channelName)
rmq.Close()
msgID, err = rmq.GetLatestMsg(channelName)
assert.Equal(t, msgID, int64(DefaultMessageID))
assert.NotNil(t, err)
}
func TestRocksmq_Close(t *testing.T) {
ep := etcdEndpoints()
etcdCli, err := etcd.GetRemoteEtcdClient(ep)

View File

@ -45,17 +45,16 @@ type mqMsgStream struct {
producerChannels []string
consumers map[string]mqwrapper.Consumer
consumerChannels []string
readers map[string]mqwrapper.Reader
readerChannels []string
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
wait *sync.WaitGroup
streamCancel func()
bufSize int64
producerLock *sync.Mutex
consumerLock *sync.Mutex
readerLock *sync.Mutex
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
wait *sync.WaitGroup
streamCancel func()
bufSize int64
producerLock *sync.Mutex
consumerLock *sync.Mutex
readerLock *sync.Mutex
}
// NewMqMsgStream is used to generate a new mqMsgStream object
@ -68,10 +67,8 @@ func NewMqMsgStream(ctx context.Context,
streamCtx, streamCancel := context.WithCancel(ctx)
producers := make(map[string]mqwrapper.Producer)
consumers := make(map[string]mqwrapper.Consumer)
readers := make(map[string]mqwrapper.Reader)
producerChannels := make([]string, 0)
consumerChannels := make([]string, 0)
readerChannels := make([]string, 0)
receiveBuf := make(chan *MsgPack, receiveBufSize)
stream := &mqMsgStream{
@ -81,16 +78,15 @@ func NewMqMsgStream(ctx context.Context,
producerChannels: producerChannels,
consumers: consumers,
consumerChannels: consumerChannels,
readers: readers,
readerChannels: readerChannels,
unmarshal: unmarshal,
bufSize: bufSize,
receiveBuf: receiveBuf,
streamCancel: streamCancel,
producerLock: &sync.Mutex{},
consumerLock: &sync.Mutex{},
readerLock: &sync.Mutex{},
wait: &sync.WaitGroup{},
unmarshal: unmarshal,
bufSize: bufSize,
receiveBuf: receiveBuf,
streamCancel: streamCancel,
producerLock: &sync.Mutex{},
consumerLock: &sync.Mutex{},
readerLock: &sync.Mutex{},
wait: &sync.WaitGroup{},
}
return stream, nil
@ -131,6 +127,15 @@ func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
ms.AsConsumerWithPosition(channels, subName, mqwrapper.SubscriptionPositionEarliest)
}
// GetLatestMsgID returns the ID of the latest message in channel, as reported
// by the consumer this stream holds for that channel.
//
// An error is returned when the stream has no consumer for channel (the
// channel was never passed to AsConsumer/AsConsumerWithPosition) or when the
// consumer itself fails to report its latest message ID.
func (ms *mqMsgStream) GetLatestMsgID(channel string) (MessageID, error) {
	// Indexing the map with an unknown channel yields a nil interface value;
	// calling a method on it would panic, so reject it explicitly.
	consumer, ok := ms.consumers[channel]
	if !ok || consumer == nil {
		errMsg := "Failed to get latest MsgID from channel: " + channel + ", error = channel is not subscribed"
		return nil, errors.New(errMsg)
	}
	lastMsg, err := consumer.GetLatestMsgID()
	if err != nil {
		errMsg := "Failed to get latest MsgID from channel: " + channel + ", error = " + err.Error()
		return nil, errors.New(errMsg)
	}
	return lastMsg, nil
}
// AsConsumerWithPosition Create consumer to receive message from channels, with initial position
// if initial position is set to latest, last message in the channel is exclusive
func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
@ -142,7 +147,6 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string,
pc, err := ms.client.Subscribe(mqwrapper.ConsumerOptions{
Topic: channel,
SubscriptionName: subName,
Type: mqwrapper.Exclusive,
SubscriptionInitialPosition: position,
BufSize: ms.bufSize,
})
@ -167,40 +171,6 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string,
}
}
// AsReader create producer to send message to channels
func (ms *mqMsgStream) AsReader(channels []string, subName string) {
for _, channel := range channels {
if len(channel) == 0 {
log.Error("MsgStream asProducer's channel is an empty string")
break
}
fn := func() error {
r, err := ms.client.CreateReader(mqwrapper.ReaderOptions{
Topic: channel,
StartMessageID: ms.client.EarliestMessageID(),
SubscriptionRolePrefix: subName,
})
if err != nil {
return err
}
if r == nil {
return errors.New("reader is nil")
}
ms.readerLock.Lock()
defer ms.readerLock.Unlock()
ms.readers[channel] = r
ms.readerChannels = append(ms.readerChannels, channel)
return nil
}
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
if err != nil {
errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
panic(errMsg)
}
}
}
func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
ms.repackFunc = repackFunc
}
@ -227,11 +197,6 @@ func (ms *mqMsgStream) Close() {
}
}
for _, reader := range ms.readers {
if reader != nil {
reader.Close()
}
}
ms.client.Close()
}
@ -551,60 +516,6 @@ func (ms *mqMsgStream) Chan() <-chan *MsgPack {
return ms.receiveBuf
}
func (ms *mqMsgStream) SeekReaders(msgPositions []*internalpb.MsgPosition) error {
for _, mp := range msgPositions {
reader, ok := ms.readers[mp.ChannelName]
if !ok {
return fmt.Errorf("channel %s not subscribed", mp.ChannelName)
}
messageID, err := ms.client.BytesToMsgID(mp.MsgID)
if err != nil {
return err
}
log.Debug("MsgStream reader begin to seek", zap.Any("MessageID", mp.MsgID))
err = reader.Seek(messageID)
if err != nil {
log.Debug("Failed to seek", zap.Error(err))
return err
}
}
return nil
}
func (ms *mqMsgStream) Next(ctx context.Context, channelName string) (TsMsg, error) {
reader, ok := ms.readers[channelName]
if !ok {
return nil, fmt.Errorf("reader for channel %s is not exist", channelName)
}
msg, err := reader.Next(ctx)
if err != nil {
return nil, err
}
if msg.Payload() == nil {
log.Warn("mqMsgStream reader Next get msg whose payload is nil")
return nil, nil
}
tsMsg, err := ms.getTsMsgFromConsumerMsg(msg)
if err != nil {
log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err))
return nil, errors.New("Failed to getTsMsgFromConsumerMsg")
}
pos := tsMsg.Position()
tsMsg.SetPosition(&MsgPosition{
ChannelName: pos.ChannelName,
MsgID: pos.MsgID,
Timestamp: tsMsg.BeginTs(),
})
return tsMsg, nil
}
func (ms *mqMsgStream) HasNext(channelName string) bool {
reader, ok := ms.readers[channelName]
if !ok {
return false
}
return reader.HasNext()
}
// Seek reset the subscription associated with this consumer to a specific position, the seek position is exclusive
// User has to ensure mq_msgstream is not closed before seek, and the seek position is already written.
func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
@ -617,7 +528,8 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
if err != nil {
return err
}
log.Debug("MsgStream begin to seek", zap.Any("MessageID", mp.MsgID))
log.Debug("MsgStream begin to seek start msg: ", zap.Any("MessageID", messageID))
err = consumer.Seek(messageID, false)
if err != nil {
log.Debug("Failed to seek", zap.Error(err))
@ -704,7 +616,6 @@ func (ms *MqTtMsgStream) AsConsumerWithPosition(channels []string, subName strin
pc, err := ms.client.Subscribe(mqwrapper.ConsumerOptions{
Topic: channel,
SubscriptionName: subName,
Type: mqwrapper.Exclusive,
SubscriptionInitialPosition: position,
BufSize: ms.bufSize,
})
@ -752,11 +663,6 @@ func (ms *MqTtMsgStream) Close() {
consumer.Close()
}
}
for _, reader := range ms.readers {
if reader != nil {
reader.Close()
}
}
ms.client.Close()
}

View File

@ -770,6 +770,83 @@ func TestStream_PulsarTtMsgStream_NoSeek(t *testing.T) {
assert.Equal(t, o3.BeginTs, p3.BeginTs)
}
// TestStream_PulsarMsgStream_SeekToLast produces ten messages, consumes them
// once to record the end position of the pack holding message 5, then
// re-subscribes, seeks to that position (seek is exclusive) and checks that
// exactly messages 6..9 are delivered before the latest message ID is reached.
func TestStream_PulsarMsgStream_SeekToLast(t *testing.T) {
	pulsarAddress, _ := Params.Load("_PulsarAddress")
	c := funcutil.RandomString(8)
	producerChannels := []string{c}
	consumerChannels := []string{c}
	consumerSubName := funcutil.RandomString(8)

	msgPack := &MsgPack{}
	ctx := context.Background()
	inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels)
	defer inputStream.Close()

	outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)

	for i := 0; i < 10; i++ {
		insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
		msgPack.Msgs = append(msgPack.Msgs, insertMsg)
	}

	// produce test data
	err := inputStream.Produce(msgPack)
	assert.Nil(t, err)

	// pick a seekPosition
	var seekPosition *internalpb.MsgPosition
	for i := 0; i < 10; i++ {
		result := consumer(ctx, outputStream)
		assert.Equal(t, result.Msgs[0].ID(), int64(i))
		if i == 5 {
			seekPosition = result.EndPositions[0]
		}
	}
	outputStream.Close()

	// create a consumer can consume data from seek position to last msg
	factory := ProtoUDFactory{}
	pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
	outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
	outputStream2.AsConsumer(consumerChannels, consumerSubName)
	lastMsgID, err := outputStream2.GetLatestMsgID(c)
	defer outputStream2.Close()
	assert.Nil(t, err)

	err = outputStream2.Seek([]*internalpb.MsgPosition{seekPosition})
	assert.Nil(t, err)
	outputStream2.Start()

	cnt := 0
	var value int64 = 6
	hasMore := true
	for hasMore {
		select {
		case <-ctx.Done():
			hasMore = false
		case msgPack, ok := <-outputStream2.Chan():
			if !ok {
				// A closed channel yields a nil pack; stop here instead of
				// dereferencing it below and panicking.
				assert.Fail(t, "Should not reach here")
				return
			}

			assert.Equal(t, 1, len(msgPack.Msgs))
			for _, tsMsg := range msgPack.Msgs {
				assert.Equal(t, value, tsMsg.ID())
				value++
				cnt++

				// Stop once the message at the latest ID has been received.
				ret, err := lastMsgID.LessOrEqualThan(tsMsg.Position().MsgID)
				assert.Nil(t, err)
				if ret {
					hasMore = false
					break
				}
			}
		}
	}

	assert.Equal(t, 4, cnt)
}
func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1 := funcutil.RandomString(8)
@ -1311,70 +1388,6 @@ func TestStream_MqMsgStream_SeekLatest(t *testing.T) {
outputStream2.Close()
}
func TestStream_MqMsgStream_Reader(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c := funcutil.RandomString(8)
producerChannels := []string{c}
readerChannels := []string{c}
msgPack := &MsgPack{}
inputStream := getPulsarInputStream(context.Background(), pulsarAddress, producerChannels)
defer inputStream.Close()
n := 10
p := 5
for i := 0; i < n; i++ {
insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
}
err := inputStream.Produce(msgPack)
assert.Nil(t, err)
readStream := getPulsarReader(pulsarAddress, readerChannels)
defer readStream.Close()
var seekPosition *internalpb.MsgPosition
for i := 0; i < n; i++ {
hasNext := readStream.HasNext(c)
assert.True(t, hasNext)
result, err := readStream.Next(ctx, c)
assert.Nil(t, err)
assert.Equal(t, result.ID(), int64(i))
if i == p {
seekPosition = result.Position()
}
}
hasNext := readStream.HasNext(c)
assert.False(t, hasNext)
timeoutCtx1, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
result, err := readStream.Next(timeoutCtx1, c)
assert.NotNil(t, err)
assert.Nil(t, result)
readStream2 := getPulsarReader(pulsarAddress, readerChannels)
defer readStream2.Close()
readStream2.SeekReaders([]*internalpb.MsgPosition{seekPosition})
for i := p; i < 10; i++ {
hasNext := readStream2.HasNext(c)
assert.True(t, hasNext)
result, err := readStream2.Next(ctx, c)
assert.Nil(t, err)
assert.Equal(t, result.ID(), int64(i))
}
hasNext = readStream2.HasNext(c)
assert.False(t, hasNext)
timeoutCtx2, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
result2, err := readStream2.Next(timeoutCtx2, c)
assert.NotNil(t, err)
assert.Nil(t, result2)
}
/****************************************Rmq test******************************************/
func initRmq(name string) *etcdkv.EtcdKV {
@ -1990,14 +2003,6 @@ func getPulsarOutputStream(ctx context.Context, pulsarAddress string, consumerCh
return outputStream
}
func getPulsarReader(pulsarAddress string, consumerChannels []string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsReader(consumerChannels, "pulsar-reader-prefix-")
return outputStream
}
func getPulsarTtOutputStream(ctx context.Context, pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})

View File

@ -18,9 +18,6 @@ package mqwrapper
// Client is the interface that provides operations of message queues
type Client interface {
// CreateReader creates a reader instance
CreateReader(options ReaderOptions) (Reader, error)
// CreateProducer creates a producer instance
CreateProducer(options ProducerOptions) (Producer, error)

View File

@ -27,27 +27,7 @@ const (
SubscriptionPositionEarliest
)
// SubscriptionType is the type of subscription position
type SubscriptionType int
const (
// Exclusive there can be only 1 consumer on the same topic with the same subscription name
Exclusive SubscriptionType = iota
// Shared subscription mode, multiple consumer will be able to use the same subscription name
// and the messages will be dispatched according to
// a round-robin rotation between the connected consumers
Shared
// Failover subscription mode, multiple consumer will be able to use the same subscription name
// but only 1 consumer will receive the messages.
// If that consumer disconnects, one of the other connected consumers will start receiving messages.
Failover
// KeyShared subscription mode, multiple consumer will be able to use the same
// subscription and all messages with the same key will be dispatched to only one consumer
KeyShared
)
const DefaultPartitionIdx = 0
// UniqueID is the type of message id
type UniqueID = int64
@ -66,10 +46,6 @@ type ConsumerOptions struct {
// Set receive channel size
BufSize int64
// Select the subscription type to be used when subscribing to the topic.
// Default is `Exclusive`
Type SubscriptionType
}
// Consumer is the interface that provides operations of a consumer
@ -88,4 +64,6 @@ type Consumer interface {
// Close consumer
Close()
GetLatestMsgID() (MessageID, error)
}

View File

@ -21,15 +21,7 @@ type MessageID interface {
// Serialize the message id into a sequence of bytes that can be stored somewhere else
Serialize() []byte
// Get the message ledgerID
LedgerID() int64
AtEarliestPosition() bool
// Get the message entryID
EntryID() int64
// Get the message batchIdx
BatchIdx() int32
// Get the message partitionIdx
PartitionIdx() int32
LessOrEqualThan(msgID []byte) (bool, error)
}

View File

@ -82,32 +82,13 @@ func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwra
return producer, nil
}
// CreateReader creates a pulsar reader instance
func (pc *pulsarClient) CreateReader(options mqwrapper.ReaderOptions) (mqwrapper.Reader, error) {
opts := pulsar.ReaderOptions{
Topic: options.Topic,
StartMessageID: options.StartMessageID.(*pulsarID).messageID,
StartMessageIDInclusive: options.StartMessageIDInclusive,
SubscriptionRolePrefix: options.SubscriptionRolePrefix,
}
pr, err := pc.client.CreateReader(opts)
if err != nil {
return nil, err
}
if pr == nil {
return nil, errors.New("pulsar is not ready, producer is nil")
}
reader := &pulsarReader{r: pr}
return reader, nil
}
// Subscribe creates a pulsar consumer instance and subscribe a topic
func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) {
receiveChannel := make(chan pulsar.ConsumerMessage, options.BufSize)
consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{
Topic: options.Topic,
SubscriptionName: options.SubscriptionName,
Type: pulsar.SubscriptionType(options.Type),
Type: pulsar.Exclusive,
SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(options.SubscriptionInitialPosition),
MessageChannel: receiveChannel,
})

View File

@ -99,7 +99,6 @@ func Consume1(ctx context.Context, t *testing.T, pc *pulsarClient, topic string,
Topic: topic,
SubscriptionName: subName,
BufSize: 1024,
Type: mqwrapper.KeyShared,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.Nil(t, err)
@ -137,7 +136,6 @@ func Consume2(ctx context.Context, t *testing.T, pc *pulsarClient, topic string,
Topic: topic,
SubscriptionName: subName,
BufSize: 1024,
Type: mqwrapper.KeyShared,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.Nil(t, err)
@ -172,7 +170,6 @@ func Consume3(ctx context.Context, t *testing.T, pc *pulsarClient, topic string,
Topic: topic,
SubscriptionName: subName,
BufSize: 1024,
Type: mqwrapper.KeyShared,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.Nil(t, err)
@ -276,7 +273,7 @@ func Consume21(ctx context.Context, t *testing.T, pc *pulsarClient, topic string
//log.Debug("total", zap.Int("val", *total))
}
}
c <- msg.ID()
c <- &pulsarID{messageID: msg.ID()}
log.Info("Consume1 randomly RECV", zap.Any("number", cnt))
log.Info("Consume1 done")
@ -294,7 +291,7 @@ func Consume22(ctx context.Context, t *testing.T, pc *pulsarClient, topic string
assert.NotNil(t, consumer)
defer consumer.Close()
err = consumer.Seek(msgID)
err = consumer.Seek(msgID.(*pulsarID).messageID)
assert.Nil(t, err)
// skip the last received message
@ -546,7 +543,7 @@ func TestPulsarClient_StringToMsgID(t *testing.T) {
client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
defer client.Close()
mid := client.EarliestMessageID()
mid := pulsar.EarliestMessageID()
str := msgIDToString(mid)
res, err := client.StringToMsgID(str)

View File

@ -32,8 +32,7 @@ import (
// Consumer consumes from pulsar
type Consumer struct {
c pulsar.Consumer
pulsar.Reader
c pulsar.Consumer
msgChannel chan mqwrapper.Message
hasSeek bool
AtLatest bool
@ -66,6 +65,7 @@ func (pc *Consumer) Chan() <-chan mqwrapper.Message {
patchEarliestMessageID(&mid)
pc.c.Seek(mid)
}
go func() {
for { //nolint:gosimple
select {
@ -126,6 +126,11 @@ func (pc *Consumer) Close() {
})
}
// GetLatestMsgID returns the ID of the last message, as reported by the
// underlying pulsar consumer for the default partition, wrapped as a
// mqwrapper.MessageID. On failure it returns a nil ID and the error.
//
// NOTE(review): the first argument passed to GetLastMessageID is the
// consumer's Name(); confirm the client expects that rather than the topic.
func (pc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
	msgID, err := pc.c.GetLastMessageID(pc.c.Name(), mqwrapper.DefaultPartitionIdx)
	if err != nil {
		// Do not hand back a wrapper around an unusable message ID.
		return nil, err
	}
	return &pulsarID{messageID: msgID}, nil
}
// patchEarliestMessageID unsafe patch logic to change messageID partitionIdx to 0
// ONLY used in Chan() function
// DON'T use elsewhere

View File

@ -36,7 +36,6 @@ func TestPulsarConsumer_Subscription(t *testing.T) {
consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{
Topic: "Topic",
SubscriptionName: "SubName",
Type: pulsar.SubscriptionType(mqwrapper.Exclusive),
SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(mqwrapper.SubscriptionPositionEarliest),
MessageChannel: receiveChannel,
})
@ -67,7 +66,6 @@ func TestPulsarConsumer_Close(t *testing.T) {
consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{
Topic: "Topic-1",
SubscriptionName: "SubName-1",
Type: pulsar.SubscriptionType(mqwrapper.Exclusive),
SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(mqwrapper.SubscriptionPositionEarliest),
MessageChannel: receiveChannel,
})

View File

@ -28,28 +28,36 @@ type pulsarID struct {
messageID pulsar.MessageID
}
// Check if pulsarID implements pulsar.MessageID and MessageID interface
var _ pulsar.MessageID = &pulsarID{}
// Check if pulsarID implements the MessageID interface
var _ mqwrapper.MessageID = &pulsarID{}
func (pid *pulsarID) Serialize() []byte {
return pid.messageID.Serialize()
}
func (pid *pulsarID) LedgerID() int64 {
return pid.messageID.LedgerID()
// AtEarliestPosition reports whether every component of the wrapped message
// ID (partition index, ledger ID, entry ID and batch index) is zero or
// negative.
func (pid *pulsarID) AtEarliestPosition() bool {
	id := pid.messageID
	return id.PartitionIdx() <= 0 &&
		id.LedgerID() <= 0 &&
		id.EntryID() <= 0 &&
		id.BatchIdx() <= 0
}
func (pid *pulsarID) EntryID() int64 {
return pid.messageID.EntryID()
}
func (pid *pulsarID) LessOrEqualThan(msgID []byte) (bool, error) {
pMsgID, err := pulsar.DeserializeMessageID(msgID)
if err != nil {
return false, err
}
func (pid *pulsarID) BatchIdx() int32 {
return pid.messageID.BatchIdx()
}
// Message IDs are ordered lexicographically by (ledger, entry, batch index):
// a lower-order field only decides the result when every higher-order field
// is equal. Requiring each field to be <= independently would wrongly reject
// e.g. (ledger 1, entry 5) <= (ledger 2, entry 3).
if pid.messageID.LedgerID() != pMsgID.LedgerID() {
	return pid.messageID.LedgerID() < pMsgID.LedgerID(), nil
}
if pid.messageID.EntryID() != pMsgID.EntryID() {
	return pid.messageID.EntryID() < pMsgID.EntryID(), nil
}
if pid.messageID.BatchIdx() <= pMsgID.BatchIdx() {
	return true, nil
}
func (pid *pulsarID) PartitionIdx() int32 {
return pid.messageID.PartitionIdx()
return false, nil
}
// SerializePulsarMsgID returns the serialized message ID

View File

@ -32,11 +32,44 @@ func TestPulsarID_Serialize(t *testing.T) {
binary := pid.Serialize()
assert.NotNil(t, binary)
assert.NotZero(t, len(binary))
}
pid.LedgerID()
pid.EntryID()
pid.BatchIdx()
pid.PartitionIdx()
func Test_AtEarliestPosition(t *testing.T) {
mid := pulsar.EarliestMessageID()
pid := &pulsarID{
messageID: mid,
}
assert.True(t, pid.AtEarliestPosition())
mid = pulsar.LatestMessageID()
pid = &pulsarID{
messageID: mid,
}
assert.False(t, pid.AtEarliestPosition())
}
func TestLessOrEqualThan(t *testing.T) {
msg1 := pulsar.EarliestMessageID()
pid1 := &pulsarID{
messageID: msg1,
}
msg2 := pulsar.LatestMessageID()
pid2 := &pulsarID{
messageID: msg2,
}
ret, err := pid1.LessOrEqualThan(pid2.Serialize())
assert.Nil(t, err)
assert.True(t, ret)
ret, err = pid2.LessOrEqualThan(pid1.Serialize())
assert.Nil(t, err)
assert.False(t, ret)
ret, err = pid2.LessOrEqualThan([]byte{1})
assert.NotNil(t, err)
assert.False(t, ret)
}
func Test_SerializePulsarMsgID(t *testing.T) {

View File

@ -1,63 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pulsar
import (
"context"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/apache/pulsar-client-go/pulsar"
)
// pulsarReader contains a pulsar reader
type pulsarReader struct {
r pulsar.Reader
}
// Topic returns the topic of pulsar reader
func (pr *pulsarReader) Topic() string {
return pr.r.Topic()
}
// Next read the next message in the topic, blocking until a message is available
func (pr *pulsarReader) Next(ctx context.Context) (mqwrapper.Message, error) {
pm, err := pr.r.Next(ctx)
if err != nil {
return nil, err
}
return &pulsarMessage{msg: pm}, nil
}
// HasNext check if there is any message available to read from the current position
func (pr *pulsarReader) HasNext() bool {
return pr.r.HasNext()
}
func (pr *pulsarReader) Close() {
pr.r.Close()
}
func (pr *pulsarReader) Seek(id mqwrapper.MessageID) error {
messageID := id.(*pulsarID).messageID
err := pr.r.Seek(messageID)
if err != nil {
return err
}
return nil
}

View File

@ -1,112 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pulsar
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"
)
// TestPulsarReader exercises the pulsar reader wrapper against the broker
// configured under "_PulsarAddress". It publishes N messages and then checks
// three ways of reading them back:
//  1. a reader created at the earliest position reads all N messages;
//  2. a reader created inclusively at the 5th message reads the remaining ones;
//  3. a reader created at the earliest position and then moved with Seek to
//     the 5th message reads the remaining ones.
func TestPulsarReader(t *testing.T) {
	ctx := context.Background()
	pulsarAddress, _ := Params.Load("_PulsarAddress")
	pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
	assert.Nil(t, err)
	defer pc.Close()

	// Use a random topic so repeated runs do not see each other's messages.
	rand.Seed(time.Now().UnixNano())
	topic := fmt.Sprintf("test-%d", rand.Int())

	producer, err := pc.CreateProducer(mqwrapper.ProducerOptions{Topic: topic})
	assert.Nil(t, err)
	assert.NotNil(t, producer)
	defer producer.Close()

	const N = 10
	var seekID mqwrapper.MessageID
	for i := 0; i < N; i++ {
		msg := &mqwrapper.ProducerMessage{
			Payload:    []byte(fmt.Sprintf("helloworld-%d", i)),
			Properties: map[string]string{},
		}

		id, err := producer.Send(ctx, msg)
		assert.Nil(t, err)
		// Remember the ID of the 5th message as the start/seek target.
		if i == 4 {
			seekID = &pulsarID{messageID: id.(*pulsarID).messageID}
		}
	}

	// Scenario 1: read everything from the earliest position.
	reader, err := pc.CreateReader(mqwrapper.ReaderOptions{
		Topic:          topic,
		StartMessageID: pc.EarliestMessageID(),
	})
	assert.Nil(t, err)
	assert.NotNil(t, reader)
	defer reader.Close()

	str := reader.Topic()
	assert.NotNil(t, str)

	for i := 0; i < N; i++ {
		revMsg, err := reader.Next(ctx)
		assert.Nil(t, err)
		assert.NotNil(t, revMsg)
	}

	// Scenario 2: start inclusively at the remembered message ID.
	readerOfStartMessageID, err := pc.CreateReader(mqwrapper.ReaderOptions{
		Topic:                   topic,
		StartMessageID:          seekID,
		StartMessageIDInclusive: true,
	})
	assert.Nil(t, err)
	defer readerOfStartMessageID.Close()

	for i := 4; i < N; i++ {
		assert.True(t, readerOfStartMessageID.HasNext())
		revMsg, err := readerOfStartMessageID.Next(ctx)
		assert.Nil(t, err)
		assert.NotNil(t, revMsg)
	}

	// Scenario 3: create at the earliest position, then Seek.
	readerOfSeek, err := pc.CreateReader(mqwrapper.ReaderOptions{
		Topic:          topic,
		StartMessageID: pc.EarliestMessageID(),
	})
	assert.Nil(t, err)
	defer readerOfSeek.Close()

	// Seek the reader that is read below. Seeking the already-drained
	// `reader` from scenario 1 would leave readerOfSeek at the earliest
	// position and the Seek path untested.
	err = readerOfSeek.Seek(seekID)
	assert.Nil(t, err)

	for i := 4; i < N; i++ {
		assert.True(t, readerOfSeek.HasNext())
		revMsg, err := readerOfSeek.Next(ctx)
		assert.Nil(t, err)
		assert.NotNil(t, revMsg)
	}
}

View File

@ -1,72 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mqwrapper
import (
"context"
)
// ReaderMessage bundles a Reader and a Message into a single struct by
// embedding both interfaces.
type ReaderMessage struct {
Reader
Message
}
// ReaderOptions collects the settings used to create a Reader.
type ReaderOptions struct {
// Topic specifies the topic this reader will read from.
// This argument is required when constructing the reader.
Topic string
// Name sets the reader name.
Name string
// Properties attaches a set of application-defined properties to the reader.
// These properties will be visible in the topic stats.
Properties map[string]string
// StartMessageID sets the initial reader position by message ID:
// * `MessageID` : start reading from a particular message ID; the reader positions itself on that
// specific position. The first message to be read will be the message next to the specified
// message ID.
StartMessageID MessageID
// StartMessageIDInclusive, if true, makes the reader start at `StartMessageID` itself.
// The default is `false`, in which case the reader starts from the "next" message.
StartMessageIDInclusive bool
// SubscriptionRolePrefix sets the subscription role prefix. The default prefix is "reader".
SubscriptionRolePrefix string
}
// Reader can be used to scan through all the messages currently available in a topic.
type Reader interface {
// Topic returns the topic this reader is reading from.
Topic() string
// Next reads the next message in the topic, blocking until a message is available.
Next(context.Context) (Message, error)
// HasNext checks whether any message is available to read from the current position.
HasNext() bool
// Close closes the reader and stops the broker from pushing more messages.
Close()
// Seek resets the subscription associated with this reader to a specific message ID.
Seek(MessageID) error
}

View File

@ -17,7 +17,6 @@
package rmq
import (
"errors"
"strconv"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
@ -60,25 +59,6 @@ func (rc *rmqClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrappe
return &rp, nil
}
// CreateReader creates a rocksmq reader from the given reader options.
//
// options.StartMessageID must hold a non-nil *rmqID; otherwise an error is
// returned instead of panicking on the type assertion. An error is also
// returned when the underlying client fails to create the reader or hands
// back a nil reader.
func (rc *rmqClient) CreateReader(options mqwrapper.ReaderOptions) (mqwrapper.Reader, error) {
	startID, ok := options.StartMessageID.(*rmqID)
	if !ok || startID == nil {
		return nil, errors.New("rocksmq reader requires a non-nil rmq start message id")
	}

	opts := client.ReaderOptions{
		Topic:                   options.Topic,
		StartMessageID:          startID.messageID,
		StartMessageIDInclusive: options.StartMessageIDInclusive,
		SubscriptionRolePrefix:  options.SubscriptionRolePrefix,
	}
	rr, err := rc.client.CreateReader(opts)
	if err != nil {
		return nil, err
	}
	if rr == nil {
		// The previous message was copied from the pulsar producer path.
		return nil, errors.New("rocksmq is not ready, reader is nil")
	}
	return &rmqReader{r: rr}, nil
}
// Subscribe subscribes a consumer in rmq client
func (rc *rmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) {
receiveChannel := make(chan client.Message, options.BufSize)
@ -115,10 +95,7 @@ func (rc *rmqClient) StringToMsgID(id string) (mqwrapper.MessageID, error) {
// BytesToMsgID converts a byte array to messageID
func (rc *rmqClient) BytesToMsgID(id []byte) (mqwrapper.MessageID, error) {
rID, err := DeserializeRmqID(id)
if err != nil {
return nil, err
}
rID := DeserializeRmqID(id)
return &rmqID{messageID: rID}, nil
}

View File

@ -18,7 +18,6 @@ package rmq
import (
"context"
"fmt"
"os"
"testing"
"time"
@ -64,7 +63,6 @@ func TestRmqClient_CreateProducer(t *testing.T) {
topic := "TestRmqClient_CreateProducer"
proOpts := mqwrapper.ProducerOptions{Topic: topic}
producer, err := client.CreateProducer(proOpts)
fmt.Println("===========producer:", producer, err)
defer producer.Close()
assert.Nil(t, err)
@ -87,6 +85,57 @@ func TestRmqClient_CreateProducer(t *testing.T) {
assert.Error(t, e)
}
// TestRmqClient_GetLatestMsg produces ten messages, subscribes from the
// earliest position, and compares the last message consumed before a short
// deadline against the ID returned by the consumer's GetLatestMsgID.
func TestRmqClient_GetLatestMsg(t *testing.T) {
	client, err := createRmqClient()
	assert.Nil(t, err)
	defer client.Close()

	topic := "t2GetLatestMsg"
	proOpts := mqwrapper.ProducerOptions{Topic: topic}
	producer, err := client.CreateProducer(proOpts)
	assert.Nil(t, err)
	defer producer.Close()

	for i := 0; i < 10; i++ {
		msg := &mqwrapper.ProducerMessage{
			Payload:    []byte{byte(i)},
			Properties: nil,
		}
		_, err = producer.Send(context.TODO(), msg)
		assert.Nil(t, err)
	}

	subName := "subName"
	consumerOpts := mqwrapper.ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            subName,
		SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
		BufSize:                     1024,
	}

	consumer, err := client.Subscribe(consumerOpts)
	assert.Nil(t, err)
	// Release the subscription when the test ends.
	defer consumer.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
	defer cancel()

	expectLastMsg, err := consumer.GetLatestMsgID()
	assert.Nil(t, err)

	var actualLastMsg mqwrapper.Message
	for {
		select {
		case <-ctx.Done():
			// If nothing was delivered before the deadline, fail cleanly
			// instead of calling ID() on a nil message.
			if !assert.NotNil(t, actualLastMsg) {
				return
			}
			// NOTE(review): the expectation is false even though the last
			// consumed ID may equal the latest ID; confirm this matches the
			// intended LessOrEqualThan semantics.
			ret, err := actualLastMsg.ID().LessOrEqualThan(expectLastMsg.Serialize())
			assert.Nil(t, err)
			assert.False(t, ret)
			return
		case msg := <-consumer.Chan():
			consumer.Ack(msg)
			actualLastMsg = msg
		}
	}
}
func TestRmqClient_Subscribe(t *testing.T) {
client, err := createRmqClient()
defer client.Close()

View File

@ -91,3 +91,8 @@ func (rc *Consumer) Close() {
close(rc.closeCh)
rc.wg.Wait()
}
// GetLatestMsgID asks the underlying rocksmq consumer for its latest message
// ID and wraps it in an rmqID. When the underlying call fails, it returns a
// nil MessageID together with the error, so callers never receive an ID built
// from an unreliable value.
func (rc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
	msgID, err := rc.c.GetLatestMsgID()
	if err != nil {
		return nil, err
	}
	return &rmqID{messageID: msgID}, nil
}

View File

@ -35,24 +35,13 @@ func (rid *rmqID) Serialize() []byte {
return SerializeRmqID(rid.messageID)
}
func (rid *rmqID) LedgerID() int64 {
// TODO
return 0
func (rid *rmqID) AtEarliestPosition() bool {
return rid.messageID <= 0
}
// EntryID is an unimplemented placeholder that always returns 0.
func (rid *rmqID) EntryID() int64 {
// TODO
return 0
}
// BatchIdx is an unimplemented placeholder that always returns 0.
func (rid *rmqID) BatchIdx() int32 {
// TODO
return 0
}
func (rid *rmqID) PartitionIdx() int32 {
// TODO
return 0
func (rid *rmqID) LessOrEqualThan(msgID []byte) (bool, error) {
rMsgID := DeserializeRmqID(msgID)
return rid.messageID < rMsgID, nil
}
// SerializeRmqID is used to serialize a message ID to byte array
@ -63,6 +52,6 @@ func SerializeRmqID(messageID int64) []byte {
}
// DeserializeRmqID is used to deserialize a message ID from byte array
func DeserializeRmqID(messageID []byte) (int64, error) {
return int64(common.Endian.Uint64(messageID)), nil
func DeserializeRmqID(messageID []byte) int64 {
return int64(common.Endian.Uint64(messageID))
}

View File

@ -17,6 +17,7 @@
package rmq
import (
"math"
"testing"
"github.com/stretchr/testify/assert"
@ -30,11 +31,35 @@ func TestRmqID_Serialize(t *testing.T) {
bin := rid.Serialize()
assert.NotNil(t, bin)
assert.NotZero(t, len(bin))
}
rid.LedgerID()
rid.EntryID()
rid.BatchIdx()
rid.PartitionIdx()
// Test_AtEarliestPosition checks that an rmqID with message ID 0 reports the
// earliest position and that one with math.MaxInt64 does not.
func Test_AtEarliestPosition(t *testing.T) {
	earliest := &rmqID{messageID: 0}
	assert.True(t, earliest.AtEarliestPosition())

	farthest := &rmqID{messageID: math.MaxInt64}
	assert.False(t, farthest.AtEarliestPosition())
}
// TestLessOrEqualThan compares the smallest ID (0) with the largest
// (math.MaxInt64) in both directions through their serialized forms.
func TestLessOrEqualThan(t *testing.T) {
	low := &rmqID{messageID: 0}
	high := &rmqID{messageID: math.MaxInt64}

	lowVsHigh, err := low.LessOrEqualThan(high.Serialize())
	assert.Nil(t, err)
	assert.True(t, lowVsHigh)

	highVsLow, err := high.LessOrEqualThan(low.Serialize())
	assert.Nil(t, err)
	assert.False(t, highVsLow)
}
func Test_SerializeRmqID(t *testing.T) {
@ -45,7 +70,6 @@ func Test_SerializeRmqID(t *testing.T) {
func Test_DeserializeRmqID(t *testing.T) {
bin := SerializeRmqID(5)
id, err := DeserializeRmqID(bin)
assert.Nil(t, err)
id := DeserializeRmqID(bin)
assert.Equal(t, id, int64(5))
}

View File

@ -1,47 +0,0 @@
package rmq
import (
"context"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/client"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
)
var _ mqwrapper.Reader = (*rmqReader)(nil)
// rmqReader wraps a rocksmq client.Reader; its methods delegate to the
// wrapped reader and convert between rocksmq values and mqwrapper values.
type rmqReader struct {
// r is the wrapped rocksmq client reader.
r client.Reader
}
// Topic returns the topic name reported by the wrapped rocksmq reader.
func (rr *rmqReader) Topic() string {
return rr.r.Topic()
}
// Next reads the next message from the wrapped rocksmq reader using ctx and
// wraps it in an rmqMessage. When the read fails it returns a nil message
// together with the reader's error.
func (rr *rmqReader) Next(ctx context.Context) (mqwrapper.Message, error) {
	raw, readErr := rr.r.Next(ctx)
	if readErr == nil {
		return &rmqMessage{msg: raw}, nil
	}
	return nil, readErr
}
// HasNext reports whether the wrapped rocksmq reader has a next message.
func (rr *rmqReader) HasNext() bool {
return rr.r.HasNext()
}
// Seek moves the wrapped rocksmq reader to the position identified by id and
// returns the reader's error, if any. id must hold a *rmqID; any other
// concrete type makes the type assertion panic.
func (rr *rmqReader) Seek(id mqwrapper.MessageID) error {
	target := id.(*rmqID)
	return rr.r.Seek(target.messageID)
}
// Close closes the wrapped rocksmq reader.
func (rr *rmqReader) Close() {
rr.r.Close()
}

View File

@ -55,22 +55,22 @@ type RepackFunc func(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, erro
type MsgStream interface {
Start()
Close()
Chan() <-chan *MsgPack
AsProducer(channels []string)
AsConsumer(channels []string, subName string)
AsReader(channels []string, subName string)
AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)
Produce(*MsgPack) error
SetRepackFunc(repackFunc RepackFunc)
ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32
GetProduceChannels() []string
Produce(*MsgPack) error
ProduceMark(*MsgPack) (map[string][]MessageID, error)
Broadcast(*MsgPack) error
BroadcastMark(*MsgPack) (map[string][]MessageID, error)
Next(ctx context.Context, channelName string) (TsMsg, error)
HasNext(channelName string) bool
AsConsumer(channels []string, subName string)
AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)
Chan() <-chan *MsgPack
Seek(offset []*MsgPosition) error
SeekReaders(msgPositions []*internalpb.MsgPosition) error
GetLatestMsgID(channel string) (MessageID, error)
}
// Factory is an interface that can be used to generate a new msgstream object

View File

@ -25,7 +25,6 @@ import (
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/funcutil"
@ -285,20 +284,6 @@ func (ms *simpleMockMsgStream) AsProducer(channels []string) {
// AsConsumer is a no-op in this mock; channels and subName are ignored.
func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string) {
}
// AsReader is a no-op in this mock; channels and subName are ignored.
func (ms *simpleMockMsgStream) AsReader(channels []string, subName string) {
}
// SeekReaders ignores msgPositions and always returns nil.
func (ms *simpleMockMsgStream) SeekReaders(msgPositions []*internalpb.MsgPosition) error {
return nil
}
// Next ignores its arguments and always returns a nil message and a nil error.
func (ms *simpleMockMsgStream) Next(ctx context.Context, channelName string) (msgstream.TsMsg, error) {
return nil, nil
}
// HasNext ignores channelName and always returns true.
func (ms *simpleMockMsgStream) HasNext(channelName string) bool {
return true
}
// AsConsumerWithPosition is a no-op in this mock; all arguments are ignored.
func (ms *simpleMockMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) {
}
@ -374,6 +359,10 @@ func (ms *simpleMockMsgStream) Seek(offset []*msgstream.MsgPosition) error {
return nil
}
// GetLatestMsgID ignores channel and always returns a nil message ID and a
// nil error.
func (ms *simpleMockMsgStream) GetLatestMsgID(channel string) (msgstream.MessageID, error) {
return nil, nil
}
func newSimpleMockMsgStream() *simpleMockMsgStream {
return &simpleMockMsgStream{
msgChan: make(chan *msgstream.MsgPack, 1024),

View File

@ -1059,6 +1059,37 @@ func genSimpleInsertMsg() (*msgstream.InsertMsg, error) {
}, nil
}
// genDeleteMsg builds a delete message for the given collection. reqID is
// used both as the request's MsgID and, truncated to uint32, as the single
// hash value. The message position carries an empty MsgID and timestamp 10;
// collection/partition names, the partition ID, primary keys and timestamps
// come from the package's default test fixtures.
func genDeleteMsg(reqID UniqueID, collectionID int64) msgstream.TsMsg {
	base := msgstream.BaseMsg{
		HashValues: []uint32{uint32(reqID)},
		MsgPosition: &internalpb.MsgPosition{
			MsgID:     []byte{},
			Timestamp: 10,
		},
	}

	request := internalpb.DeleteRequest{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_Delete,
			MsgID:   reqID,
		},
		CollectionName: defaultCollectionName,
		PartitionName:  defaultPartitionName,
		CollectionID:   collectionID,
		PartitionID:    defaultPartitionID,
		PrimaryKeys:    genSimpleDeleteID(),
		Timestamps:     genSimpleTimestampDeletedPK(),
	}

	return &msgstream.DeleteMsg{BaseMsg: base, DeleteRequest: request}
}
func genSimpleDeleteMsg() (*msgstream.DeleteMsg, error) {
return &msgstream.DeleteMsg{
BaseMsg: genMsgStreamBaseMsg(),
@ -1183,12 +1214,16 @@ func genSimpleReplica() (ReplicaInterface, error) {
return r, err
}
func genSimpleSegmentLoader(ctx context.Context, historicalReplica ReplicaInterface, streamingReplica ReplicaInterface) (*segmentLoader, error) {
func genSimpleSegmentLoaderWithMqFactory(ctx context.Context, historicalReplica ReplicaInterface, streamingReplica ReplicaInterface, factory msgstream.Factory) (*segmentLoader, error) {
kv, err := genEtcdKV()
if err != nil {
return nil, err
}
return newSegmentLoader(ctx, historicalReplica, streamingReplica, kv, msgstream.NewPmsFactory()), nil
return newSegmentLoader(ctx, historicalReplica, streamingReplica, kv, factory), nil
}
// genSimpleSegmentLoader builds a segment loader for tests by delegating to
// genSimpleSegmentLoaderWithMqFactory with a factory from msgstream.NewPmsFactory.
func genSimpleSegmentLoader(ctx context.Context, historicalReplica ReplicaInterface, streamingReplica ReplicaInterface) (*segmentLoader, error) {
return genSimpleSegmentLoaderWithMqFactory(ctx, historicalReplica, streamingReplica, msgstream.NewPmsFactory())
}
func genSimpleHistorical(ctx context.Context, tSafeReplica TSafeReplicaInterface) (*historical, error) {
@ -1661,12 +1696,7 @@ func saveChangeInfo(key string, value string) error {
return kv.Save(key, value)
}
// node
func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) {
fac, err := genFactory()
if err != nil {
return nil, err
}
func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac msgstream.Factory) (*QueryNode, error) {
node := NewQueryNode(ctx, fac)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
if err != nil {
@ -1694,7 +1724,7 @@ func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) {
node.streaming = streaming
node.historical = historical
loader, err := genSimpleSegmentLoader(node.queryNodeLoopCtx, historical.replica, streaming.replica)
loader, err := genSimpleSegmentLoaderWithMqFactory(node.queryNodeLoopCtx, historical.replica, streaming.replica, fac)
if err != nil {
return nil, err
}
@ -1712,6 +1742,15 @@ func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) {
return node, nil
}
// genSimpleQueryNode builds a query node for tests using the factory from
// genFactory. It returns genFactory's error unchanged when the factory cannot
// be created.
func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) {
	factory, facErr := genFactory()
	if facErr == nil {
		return genSimpleQueryNodeWithMQFactory(ctx, factory)
	}
	return nil, facErr
}
func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType, fieldValue interface{}, dim int64) *schemapb.FieldData {
var fieldData *schemapb.FieldData
switch fieldType {
@ -1826,3 +1865,25 @@ func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType,
return fieldData
}
// mockMsgStreamFactory is a test factory that hands out a preconfigured
// message stream.
type mockMsgStreamFactory struct {
// mockMqStream is the stream returned by NewMsgStream.
mockMqStream msgstream.MsgStream
}
var _ msgstream.Factory = &mockMsgStreamFactory{}
// SetParams ignores params and always returns nil.
func (mm *mockMsgStreamFactory) SetParams(params map[string]interface{}) error {
return nil
}
// NewMsgStream returns the factory's configured mockMqStream; the same stream
// value is returned on every call and ctx is ignored.
func (mm *mockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return mm.mockMqStream, nil
}
// NewTtMsgStream is not supported by this mock: it always returns a nil
// stream and a nil error.
func (mm *mockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return nil, nil
}
// NewQueryMsgStream is not supported by this mock: it always returns a nil
// stream and a nil error.
func (mm *mockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return nil, nil
}

View File

@ -517,48 +517,79 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
defer stream.Close()
pChannelName := rootcoord.ToPhysicalChannel(position.ChannelName)
position.ChannelName = pChannelName
stream.AsReader([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", Params.QueryNodeCfg.QueryNodeID, collectionID))
metrics.QueryNodeNumReaders.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
err = stream.SeekReaders([]*internalpb.MsgPosition{position})
stream.AsConsumer([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", Params.QueryNodeCfg.QueryNodeID, collectionID))
lastMsgID, err := stream.GetLatestMsgID(pChannelName)
if err != nil {
return err
}
if lastMsgID.AtEarliestPosition() {
log.Debug("there is no more delta msg", zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
return nil
}
metrics.QueryNodeNumReaders.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
err = stream.Seek([]*internalpb.MsgPosition{position})
if err != nil {
return err
}
stream.Start()
delData := &deleteData{
deleteIDs: make(map[UniqueID][]int64),
deleteTimestamps: make(map[UniqueID][]Timestamp),
deleteOffset: make(map[UniqueID]int64),
}
log.Debug("start read msg from stream reader", zap.Any("msg id", position.GetMsgID()))
for stream.HasNext(pChannelName) {
ctx, cancel := context.WithTimeout(ctx, timeoutForEachRead)
tsMsg, err := stream.Next(ctx, pChannelName)
if err != nil {
log.Warn("fail to load delete", zap.String("pChannelName", pChannelName), zap.Any("msg id", position.GetMsgID()), zap.Error(err))
cancel()
return err
}
if tsMsg == nil {
cancel()
continue
}
if tsMsg.Type() == commonpb.MsgType_Delete {
dmsg := tsMsg.(*msgstream.DeleteMsg)
if dmsg.CollectionID != collectionID {
cancel()
log.Debug("start read delta msg from seek position to last position",
zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
hasMore := true
for hasMore {
select {
case <-ctx.Done():
break
case msgPack, ok := <-stream.Chan():
if !ok {
log.Warn("fail to read delta msg", zap.String("pChannelName", pChannelName), zap.Any("msg id", position.GetMsgID()), zap.Error(err))
return err
}
if msgPack == nil {
continue
}
log.Debug("delete pk",
zap.Any("pk", dmsg.PrimaryKeys),
zap.String("vChannelName", position.GetChannelName()),
zap.Any("msg id", position.GetMsgID()),
)
processDeleteMessages(loader.historicalReplica, dmsg, delData)
for _, tsMsg := range msgPack.Msgs {
if tsMsg.Type() == commonpb.MsgType_Delete {
dmsg := tsMsg.(*msgstream.DeleteMsg)
if dmsg.CollectionID != collectionID {
continue
}
log.Debug("delete pk",
zap.Any("pk", dmsg.PrimaryKeys),
zap.String("vChannelName", position.GetChannelName()),
zap.Any("msg id", position.GetMsgID()),
)
processDeleteMessages(loader.historicalReplica, dmsg, delData)
}
ret, err := lastMsgID.LessOrEqualThan(tsMsg.Position().MsgID)
if err != nil {
log.Warn("check whether current MsgID less than last MsgID failed",
zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName), zap.Error(err))
return err
}
if ret {
hasMore = false
break
}
}
}
cancel()
}
log.Debug("All data has been read, there is no more data", zap.String("channel", pChannelName), zap.Any("msg id", position.GetMsgID()))
log.Debug("All data has been read, there is no more data", zap.Int64("Collection ID", collectionID),
zap.String("channel", pChannelName), zap.Any("msg id", position.GetMsgID()))
for segmentID, pks := range delData.deleteIDs {
segment, err := loader.historicalReplica.getSegmentByID(segmentID)
if err != nil {

View File

@ -18,10 +18,15 @@ package querynode
import (
"context"
"errors"
"math/rand"
"runtime"
"testing"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -509,3 +514,160 @@ func TestSegmentLoader_testLoadSealedSegmentWithIndex(t *testing.T) {
assert.NotNil(t, vecFieldInfo)
assert.Equal(t, true, vecFieldInfo.indexInfo.EnableIndex)
}
// TestSegmentLoader_testFromDmlCPLoadDelete drives FromDmlCPLoadDelete through
// a mocked message stream: first with a failing Seek, then through a table of
// scenarios that vary the latest-message-ID mock and whether fetching the
// latest message ID succeeds.
func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	position := &msgstream.MsgPosition{ChannelName: defaultDeltaChannel, MsgID: []byte{1}}

	// newMsgID builds a fresh latest-message-ID mock. The comparison
	// expectation is registered only when withCompare is set.
	newMsgID := func(atEarliest, withCompare bool, compareErr error) *mockMsgID {
		id := &mockMsgID{}
		id.On("AtEarliestPosition").Return(atEarliest, nil)
		if withCompare {
			id.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, compareErr)
		}
		return id
	}

	// test for seek failed
	testSeekFailWhenConsumingDeltaMsg(ctx, t, position, newMsgID(false, false, nil))

	scenarios := []struct {
		atEarliest  bool
		withCompare bool
		compareErr  error
		getLastSucc bool
		wantErr     bool
	}{
		// test no more data when get last msg successfully
		{atEarliest: true, getLastSucc: true},
		// test consume after seeking when get last msg successfully
		{withCompare: true, getLastSucc: true},
		// test compare msgID failed when get last msg successfully
		{withCompare: true, compareErr: errors.New(""), getLastSucc: true, wantErr: true},
		// test consume after seeking when get last msg failed
		{withCompare: true, compareErr: errors.New(""), wantErr: true},
	}

	for _, sc := range scenarios {
		latest := newMsgID(sc.atEarliest, sc.withCompare, sc.compareErr)
		got := testConsumingDeltaMsg(ctx, t, position, sc.getLastSucc, latest)
		if sc.wantErr {
			assert.NotNil(t, got)
		} else {
			assert.Nil(t, got)
		}
	}
}
// testSeekFailWhenConsumingDeltaMsg wires a mocked stream whose Seek returns
// a "seek failed" error and asserts that FromDmlCPLoadDelete surfaces an
// error with exactly that message. mockMsg is returned as the latest message
// ID.
func testSeekFailWhenConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, mockMsg *mockMsgID) {
	const errMsg = "seek failed"

	stream := &LoadDeleteMsgStream{}
	stream.On("AsConsumer", mock.AnythingOfTypeArgument("string"), mock.AnythingOfTypeArgument("string"))
	stream.On("Seek", mock.AnythingOfType("string")).Return(errors.New(errMsg))
	stream.On("GetLatestMsgID", mock.AnythingOfType("string")).Return(mockMsg, nil)

	node, nodeErr := genSimpleQueryNodeWithMQFactory(ctx, &mockMsgStreamFactory{mockMqStream: stream})
	assert.NoError(t, nodeErr)

	segLoader := node.loader
	assert.NotNil(t, segLoader)

	assert.EqualError(t, segLoader.FromDmlCPLoadDelete(ctx, defaultCollectionID, position), errMsg)
}
// testConsumingDeltaMsg runs FromDmlCPLoadDelete against a mocked stream whose
// Seek succeeds and returns the loader's result.
//
// getLastSucc selects whether the mocked GetLatestMsgID returns a nil error
// or a non-nil one; mockMsg is the latest message ID it hands back. The
// stream's channel is preloaded with a nil pack followed by a pack holding two
// delete messages, one for a different collection and one for
// defaultCollectionID.
func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, getLastSucc bool, mockMsg *mockMsgID) error {
	msgStream := &LoadDeleteMsgStream{}
	msgStream.On("AsConsumer", mock.AnythingOfTypeArgument("string"), mock.AnythingOfTypeArgument("string"))
	msgStream.On("Seek", mock.AnythingOfType("string")).Return(nil)

	var getLastErr error
	if !getLastSucc {
		getLastErr = errors.New("")
	}
	msgStream.On("GetLatestMsgID", mock.AnythingOfType("string")).Return(mockMsg, getLastErr)

	// Buffer both packs up front. With an unbuffered channel fed from a
	// goroutine, the sender blocks forever whenever the loader returns
	// before reading (for example when the latest message ID is at the
	// earliest position or cannot be fetched), leaking the goroutine.
	msgChan := make(chan *msgstream.MsgPack, 2)
	msgChan <- nil
	msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{
		genDeleteMsg(int64(1), defaultCollectionID+1),
		genDeleteMsg(int64(1), defaultCollectionID),
	}}
	msgStream.On("Chan").Return(msgChan)

	factory := &mockMsgStreamFactory{mockMqStream: msgStream}
	node, err := genSimpleQueryNodeWithMQFactory(ctx, factory)
	assert.NoError(t, err)

	loader := node.loader
	assert.NotNil(t, loader)

	return loader.FromDmlCPLoadDelete(ctx, defaultCollectionID, position)
}
// mockMsgID is a testify-based mock of msgstream.MessageID. It embeds the
// interface and overrides AtEarliestPosition and LessOrEqualThan.
type mockMsgID struct {
msgstream.MessageID
mock.Mock
}
// AtEarliestPosition records the call and returns the first configured
// return value, which must be a bool.
func (m2 *mockMsgID) AtEarliestPosition() bool {
	return m2.Called().Get(0).(bool)
}
// LessOrEqualThan records the call (without forwarding msgID to the mock) and
// replays the configured return values. When the second configured value is
// non-nil it is returned as the error and the result is forced to false;
// otherwise the first value is returned as the bool result.
func (m2 *mockMsgID) LessOrEqualThan(msgID []byte) (bool, error) {
	recorded := m2.Called()
	result := recorded.Get(0)
	if failure := recorded.Get(1); failure != nil {
		return false, failure.(error)
	}
	return result.(bool), nil
}
// LoadDeleteMsgStream is a testify-based mock of msgstream.MsgStream. It
// embeds the interface and overrides the methods declared on it in this file.
type LoadDeleteMsgStream struct {
msgstream.MsgStream
mock.Mock
}
// Close is a no-op in this mock.
func (ms *LoadDeleteMsgStream) Close() {
}
// AsConsumer is a no-op in this mock; it does not record the call on the
// embedded mock.Mock.
func (ms *LoadDeleteMsgStream) AsConsumer(channels []string, subName string) {
}
// Chan records the call and returns the first configured return value, which
// must be a chan *msgstream.MsgPack.
func (ms *LoadDeleteMsgStream) Chan() <-chan *msgstream.MsgPack {
	return ms.Called().Get(0).(chan *msgstream.MsgPack)
}
// Seek records the call (without forwarding offset to the mock) and returns
// the first configured return value as an error, or nil when that value is
// nil.
func (ms *LoadDeleteMsgStream) Seek(offset []*internalpb.MsgPosition) error {
	configured := ms.Called().Get(0)
	if configured != nil {
		return configured.(error)
	}
	return nil
}
// GetLatestMsgID records the call with channel and replays the two configured
// return values. Each value is converted independently: a nil value stays
// nil, a non-nil first value must be a msgstream.MessageID and a non-nil
// second value must be an error.
func (ms *LoadDeleteMsgStream) GetLatestMsgID(channel string) (msgstream.MessageID, error) {
	recorded := ms.Called(channel)

	var latest msgstream.MessageID
	if raw := recorded.Get(0); raw != nil {
		latest = raw.(msgstream.MessageID)
	}

	var failure error
	if raw := recorded.Get(1); raw != nil {
		failure = raw.(error)
	}

	return latest, failure
}
// Start is a no-op in this mock.
func (ms *LoadDeleteMsgStream) Start() {}

View File

@ -21,6 +21,7 @@ import (
"testing"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/mq/msgstream"

View File

@ -159,9 +159,9 @@ func (ms *FailMsgStream) BroadcastMark(*msgstream.MsgPack) (map[string][]msgstre
}
return nil, nil
}
func (ms *FailMsgStream) Next(ctx context.Context, channelName string) (msgstream.TsMsg, error) {
func (ms *FailMsgStream) Consume() *msgstream.MsgPack { return nil }
func (ms *FailMsgStream) Seek(offset []*msgstream.MsgPosition) error { return nil }
func (ms *FailMsgStream) GetLatestMsgID(channel string) (msgstream.MessageID, error) {
return nil, nil
}
func (ms *FailMsgStream) HasNext(channelName string) bool { return true }
func (ms *FailMsgStream) Seek(offset []*msgstream.MsgPosition) error { return nil }
func (ms *FailMsgStream) SeekReaders(msgPositions []*msgstream.MsgPosition) error { return nil }