Align kafka consume from latest with pulsar and rocksmq (#17433)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
parent a32f1da293
commit dbef4c6ec2
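In short: confluent-kafka-go only resolves auto.offset.reset=latest when the partition is actually assigned, and this wrapper assigns lazily inside Chan(), so messages produced between consumer creation and the first Chan() call used to be skipped. Pulsar and RocksMQ pin the "latest" position at consumer-creation time. This commit aligns Kafka with them: createKafkaConsumer now records the topic's latest offset up front, and Chan() seeks to that recorded position instead of assigning at kafka.OffsetEnd. Hedged Go sketches follow the relevant hunks below to illustrate the moving parts.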
@@ -18,6 +18,7 @@ type Consumer struct {
 	hasSeek bool
 	hasConsume bool
 	skipMsg bool
+	latestMsgOffset kafka.Offset
 	topic string
 	groupID string
 	closeCh chan struct{}
@@ -48,6 +49,25 @@ func (kc *Consumer) createKafkaConsumer() error {
 		log.Error("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err))
 		return err
 	}
+
+	latestMsgID, err := kc.GetLatestMsgID()
+	if err != nil {
+		switch v := err.(type) {
+		case kafka.Error:
+			if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownPartition || v.Code() == kafka.ErrUnknownTopicOrPart {
+				log.Warn("get latest msg ID failed, topic or partition does not exists!",
+					zap.String("topic", kc.topic),
+					zap.String("err msg", v.String()))
+				kc.latestMsgOffset = kafka.OffsetBeginning
+			}
+		default:
+			log.Error("get latest msg ID failed", zap.String("topic", kc.topic), zap.Error(err))
+			return err
+		}
+	} else {
+		kc.latestMsgOffset = kafka.Offset(latestMsgID.(*kafkaID).messageID)
+	}
+
 	return nil
 }
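GetLatestMsgID itself is not part of this diff. As a rough orientation, a watermark query is the usual way to pin "latest" in confluent-kafka-go; the sketch below is an assumption about its shape (the helper name, partition argument, and 1-second timeout are illustrative, not the project's code):

package main

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// latestOffset pins the end of a partition at call time. QueryWatermarkOffsets
// returns the low and high watermarks; high is the offset the next produced
// message will receive, so high-1 is the newest existing message.
func latestOffset(c *kafka.Consumer, topic string, partition int32) (kafka.Offset, error) {
	_, high, err := c.QueryWatermarkOffsets(topic, partition, 1000)
	if err != nil {
		return kafka.OffsetBeginning, err
	}
	if high > 0 {
		high-- // step back from "next offset" to "last existing message"
	}
	return kafka.Offset(high), nil
}

That also explains the kafka.OffsetBeginning fallback above: when the topic does not exist yet there is no watermark to pin, and OffsetBeginning doubles as the "nothing recorded" sentinel that Chan() checks for.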
@@ -77,13 +97,35 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message {
 		// we assume that case is Chan starting before producing message with auto create topic config,
 		// consuming messages will fail that error is 'Subscribed topic not available'
 		// if invoke Subscribe method of kafka, so we use Assign instead of Subscribe.
-		tps := []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}
-		if err := kc.c.Assign(tps); err != nil {
-			log.Error("kafka consumer subscribe failed ", zap.String("topic name", kc.topic), zap.Error(err))
-			panic(err)
-		}
+		var tps []kafka.TopicPartition
+		if offset == kafka.OffsetEnd && kc.latestMsgOffset != kafka.OffsetBeginning {
+			// kafka consumer will start when assign invoked, in order to guarantee the latest message
+			// position is same with created consumer time, there will use a seek to the latest to
+			// replace consuming from the latest position.
+			if err := kc.internalSeek(kc.latestMsgOffset, false); err != nil {
+				log.Error("kafka consumer subscribe failed ",
+					zap.String("topic name", kc.topic),
+					zap.Any("latestMsgOffset", kc.latestMsgOffset),
+					zap.Any("offset", offset),
+					zap.Error(err))
+				panic(err)
+			}
+		} else {
+			tps = []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}
+			if err := kc.c.Assign(tps); err != nil {
+				log.Error("kafka consumer subscribe failed ",
+					zap.String("topic name", kc.topic),
+					zap.Any("latestMsgOffset", kc.latestMsgOffset),
+					zap.Any("offset", offset),
+					zap.Error(err))
+				panic(err)
+			}
+		}
 
-		log.Debug("starting kafka consume", zap.String("topic name", kc.topic), zap.Any("offset", offset))
+		log.Debug("starting kafka consume",
+			zap.String("topic name", kc.topic),
+			zap.Any("latestMsgOffset", kc.latestMsgOffset),
+			zap.Any("offset", offset))
 	}
 
 	go func() {
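The branch above is the heart of the fix: kafka.OffsetEnd is resolved by the broker only when Assign executes, so anything produced between consumer creation and the first Chan() call would be invisible, while seeking to the offset pinned at creation time replays it. A standalone sketch of the two behaviors, with an assumed topic and partition 0 standing in for mqwrapper.DefaultPartitionIdx:

package main

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// startFrom assigns the single partition this wrapper uses at the given
// offset. Passing kafka.OffsetEnd reproduces the old, racy "latest"; passing
// an offset pinned earlier (the latestMsgOffset path) delivers everything
// produced since that pin.
func startFrom(c *kafka.Consumer, topic string, offset kafka.Offset) error {
	return c.Assign([]kafka.TopicPartition{{
		Topic:     &topic,
		Partition: 0, // stands in for mqwrapper.DefaultPartitionIdx
		Offset:    offset,
	}})
}

Note the hunk routes the pinned case through internalSeek rather than a bare Assign, keeping the seek bookkeeping (hasSeek, skipMsg) in one place; both arms still panic on failure, as Chan() did before.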
@@ -117,6 +159,11 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message {
 }
 
 func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
+	offset := kafka.Offset(id.(*kafkaID).messageID)
+	return kc.internalSeek(offset, inclusive)
+}
+
+func (kc *Consumer) internalSeek(offset kafka.Offset, inclusive bool) error {
 	if kc.hasSeek {
 		return errors.New("unsupported multiple seek with the same kafka consumer")
 	}
@@ -125,11 +172,10 @@ func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
 		return errors.New("unsupported seek after consume message with the same kafka consumer")
 	}
 
-	start := time.Now()
-	offset := kafka.Offset(id.(*kafkaID).messageID)
 	log.Debug("kafka consumer seek start", zap.String("topic name", kc.topic),
 		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))
 
+	start := time.Now()
 	err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
 	if err != nil {
 		log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err))
@@ -151,13 +197,11 @@ func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
 		Offset: offset}, 1000); err != nil {
 		return err
 	}
 
-	kc.hasSeek = true
-
 	cost = time.Since(start).Milliseconds()
 	log.Debug("kafka consumer seek finished", zap.String("topic name", kc.topic),
 		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
 
+	kc.hasSeek = true
 	return nil
 }
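Two behavioral details fall out of this refactor: the public Seek now just unwraps the kafkaID and delegates, and hasSeek is only set after the seek path has fully completed, so a failed seek no longer marks the consumer as sought; correspondingly, the test below now expects a second Seek to return an error rather than panic. A standalone sketch of the contract, with a hypothetical seekOnce type standing in for the wrapper's Consumer (the skipMsg handling is an assumption based on the struct field above):

package main

import (
	"errors"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// seekOnce illustrates the single-seek contract internalSeek enforces: one
// seek per consumer, never after consumption has started, and the flag is
// only raised once the assign has succeeded.
type seekOnce struct {
	c          *kafka.Consumer
	topic      string
	hasSeek    bool
	hasConsume bool
	skipMsg    bool
}

func (s *seekOnce) internalSeek(offset kafka.Offset, inclusive bool) error {
	if s.hasSeek {
		return errors.New("unsupported multiple seek with the same kafka consumer")
	}
	if s.hasConsume {
		return errors.New("unsupported seek after consume message with the same kafka consumer")
	}
	if err := s.c.Assign([]kafka.TopicPartition{{
		Topic: &s.topic, Partition: 0, Offset: offset,
	}}); err != nil {
		return err
	}
	s.skipMsg = !inclusive // exclusive seek: drop the first delivered message
	s.hasSeek = true       // raised only after the seek path succeeded
	return nil
}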
@@ -61,9 +61,7 @@ func TestKafkaConsumer_GetSeek(t *testing.T) {
 	err = consumer.Seek(msgID, false)
 	assert.Nil(t, err)
 
-	assert.Panics(t, func() {
-		consumer.Seek(msgID, false)
-	})
+	assert.Error(t, consumer.Seek(msgID, false))
 }
 
 func TestKafkaConsumer_SeekAfterChan(t *testing.T) {
@@ -108,6 +106,29 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
 	assert.Nil(t, err)
 }
 
+func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) {
+	rand.Seed(time.Now().UnixNano())
+	groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
+	topic := fmt.Sprintf("test-topicName-%d", rand.Int())
+
+	data := []int{111, 222, 333}
+	testKafkaConsumerProduceData(t, topic, data)
+
+	config := createConfig(groupID)
+	config.SetKey("auto.offset.reset", "latest")
+	consumer, err := newKafkaConsumer(config, topic, groupID)
+	assert.NoError(t, err)
+	defer consumer.Close()
+
+	data = []int{444, 555}
+	testKafkaConsumerProduceData(t, topic, data)
+
+	msg := <-consumer.Chan()
+	assert.Equal(t, 444, BytesToInt(msg.Payload()))
+	msg = <-consumer.Chan()
+	assert.Equal(t, 555, BytesToInt(msg.Payload()))
+}
+
 func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) {
 	ctx := context.Background()
 	kc := createKafkaClient(t)
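The new test produces three messages before the consumer exists and two after, then asserts that only 444 and 555 are delivered, which is exactly the pulsar/rocksmq "latest" semantics being aligned. createConfig, newKafkaConsumer, testKafkaConsumerProduceData, and BytesToInt are existing helpers elsewhere in this test file; for orientation, hedged stand-ins for the int codec the assertions rely on (the 8-byte little-endian layout is an assumption, not necessarily the project's encoding):

package main

import "encoding/binary"

// IntToBytes/BytesToInt sketch the payload codec the test relies on: ints are
// round-tripped through a fixed-width byte slice.
func IntToBytes(n int) []byte {
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, uint64(n))
	return buf
}

func BytesToInt(b []byte) int {
	return int(binary.LittleEndian.Uint64(b))
}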