enhance: [2.4] Add param item to ignore bad message id in checkpoint (#33123) (#33249)

Cherry-pick from master
pr: #33123 #33158
See also  #33122

This PR adds the param item `mq.ignoreBadPosition` to control the behavior when
the MQ fails to parse a message ID from a checkpoint.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-05-23 15:27:41 +08:00 committed by GitHub
parent 3bd8137062
commit d6bc95de55
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 109 additions and 11 deletions

View File

@ -481,7 +481,16 @@ func (ms *mqMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPositi
}
messageID, err := ms.client.BytesToMsgID(mp.MsgID)
if err != nil {
return err
if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() {
// try to use latest message ID first
messageID, err = consumer.GetLatestMsgID()
if err != nil {
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
continue
}
} else {
return err
}
}
log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
@ -835,34 +844,44 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi
var consumer mqwrapper.Consumer
var mp *MsgPosition
var err error
fn := func() error {
fn := func() (bool, error) {
var ok bool
consumer, ok = ms.consumers[mp.ChannelName]
if !ok {
return fmt.Errorf("please subcribe the channel, channel name =%s", mp.ChannelName)
return false, fmt.Errorf("please subcribe the channel, channel name =%s", mp.ChannelName)
}
if consumer == nil {
return fmt.Errorf("consumer is nil")
return false, fmt.Errorf("consumer is nil")
}
seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
if err != nil {
return err
if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() {
// try to use latest message ID first
seekMsgID, err = consumer.GetLatestMsgID()
if err != nil {
log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err))
return false, nil
}
} else {
return false, err
}
}
log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
err = consumer.Seek(seekMsgID, true)
if err != nil {
log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err))
// stop retry if consumer topic not exist
if errors.Is(err, merr.ErrMqTopicNotFound) {
return retry.Unrecoverable(err)
return false, err
}
return err
return true, err
}
log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName))
return nil
return false, nil
}
ms.consumerLock.Lock()
@ -873,7 +892,8 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi
if len(mp.MsgID) == 0 {
return fmt.Errorf("when msgID's length equal to 0, please use AsConsumer interface")
}
err = retry.Do(ctx, fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
err = retry.Handle(ctx, fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
// err = retry.Do(ctx, fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
if err != nil {
return fmt.Errorf("failed to seek, error %s", err.Error())
}

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
kafkawrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/kafka"
pulsarwrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/pulsar"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -1013,6 +1014,74 @@ func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) {
assert.Equal(t, result.Msgs[0].ID(), int64(1))
}
// TestSTream_MqMsgStream_SeekBadMessageID checks how Seek behaves when the
// checkpoint carries a message ID that the Pulsar client cannot parse (the
// position below is built with a Kafka-serialized ID).
//
// With mq.ignoreBadPosition set to "false" both the plain and the time-tick
// msgstream are expected to return an error from Seek; with "true" both are
// expected to succeed.
func TestSTream_MqMsgStream_SeekBadMessageID(t *testing.T) {
	pulsarAddress := getPulsarAddress()
	c := funcutil.RandomString(8)
	producerChannels := []string{c}
	consumerChannels := []string{c}

	msgPack := &MsgPack{}
	ctx := context.Background()
	inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels)
	defer inputStream.Close()

	outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, funcutil.RandomString(8))
	defer outputStream.Close()

	for i := 0; i < 10; i++ {
		insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i))
		msgPack.Msgs = append(msgPack.Msgs, insertMsg)
	}
	err := inputStream.Produce(msgPack)
	assert.NoError(t, err)

	// Consume everything once so that seekPosition ends up holding the
	// channel name, timestamp and group of a real position.
	var seekPosition *msgpb.MsgPosition
	for i := 0; i < 10; i++ {
		result := consumer(ctx, outputStream)
		assert.Equal(t, int64(i), result.Msgs[0].ID())
		seekPosition = result.EndPositions[0]
	}

	// produce timetick for mqtt msgstream seek
	msgPack = &MsgPack{}
	msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(1000))
	err = inputStream.Produce(msgPack)
	assert.NoError(t, err)

	// Every constructor error is checked before the returned value is used,
	// so a setup failure is reported as a test failure instead of a nil
	// dereference further down.
	factory := ProtoUDFactory{}
	pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
	require.NoError(t, err)

	outputStream2, err := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
	require.NoError(t, err)
	outputStream2.AsConsumer(ctx, consumerChannels, funcutil.RandomString(8), mqwrapper.SubscriptionPositionLatest)
	defer outputStream2.Close()

	// NOTE(review): outputStream3 is never closed. It shares pulsarClient with
	// outputStream2, so confirm that closing both streams is safe before
	// adding a deferred Close here.
	outputStream3, err := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
	require.NoError(t, err)
	outputStream3.AsConsumer(ctx, consumerChannels, funcutil.RandomString(8), mqwrapper.SubscriptionPositionEarliest)

	// Restore the config item once the test is done so other tests see the default.
	defer paramtable.Get().Reset(paramtable.Get().MQCfg.IgnoreBadPosition.Key)

	// Same channel/timestamp/group as the real position, but with a message ID
	// serialized by the Kafka wrapper instead of the Pulsar one.
	p := []*msgpb.MsgPosition{
		{
			ChannelName: seekPosition.ChannelName,
			Timestamp:   seekPosition.Timestamp,
			MsgGroup:    seekPosition.MsgGroup,
			MsgID:       kafkawrapper.SerializeKafkaID(123),
		},
	}

	// Switch off: the bad message ID must surface as an error.
	paramtable.Get().Save(paramtable.Get().MQCfg.IgnoreBadPosition.Key, "false")
	err = outputStream2.Seek(ctx, p)
	assert.Error(t, err)
	err = outputStream3.Seek(ctx, p)
	assert.Error(t, err)

	// Switch on: the bad message ID is tolerated and Seek succeeds.
	paramtable.Get().Save(paramtable.Get().MQCfg.IgnoreBadPosition.Key, "true")
	err = outputStream2.Seek(ctx, p)
	assert.NoError(t, err)
	err = outputStream3.Seek(ctx, p)
	assert.NoError(t, err)
}
func TestStream_MqMsgStream_SeekLatest(t *testing.T) {
pulsarAddress := getPulsarAddress()
c := funcutil.RandomString(8)

View File

@ -472,8 +472,9 @@ type MQConfig struct {
PursuitLag ParamItem `refreshable:"true"`
PursuitBufferSize ParamItem `refreshable:"true"`
MQBufSize ParamItem `refreshable:"false"`
ReceiveBufSize ParamItem `refreshable:"false"`
MQBufSize ParamItem `refreshable:"false"`
ReceiveBufSize ParamItem `refreshable:"false"`
IgnoreBadPosition ParamItem `refreshable:"true"`
}
// Init initializes the MQConfig object with a BaseTable.
@ -531,6 +532,14 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`,
Doc: "MQ consumer chan buffer length",
}
p.ReceiveBufSize.Init(base.mgr)
p.IgnoreBadPosition = ParamItem{
Key: "mq.ignoreBadPosition",
Version: "2.3.16",
DefaultValue: "false",
Doc: "A switch for ignoring message queue failing to parse message ID from checkpoint position. Usually caused by switching among different mq implementations. May caused data loss when used by mistake",
}
p.IgnoreBadPosition.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////