From 2022a361c055c6e9243b4dfc57456f87a12ae017 Mon Sep 17 00:00:00 2001 From: yah01 Date: Sun, 24 Apr 2022 10:25:43 +0800 Subject: [PATCH] Enable ZSTD compression for pulsar (#16014) Signed-off-by: yah01 --- internal/mq/msgstream/mq_msgstream.go | 2 +- internal/mq/msgstream/mqwrapper/producer.go | 4 ++ .../mqwrapper/pulsar/pulsar_client.go | 4 ++ .../mqwrapper/pulsar/pulsar_consumer_test.go | 50 +++++++++++++++++++ 4 files changed, 59 insertions(+), 1 deletion(-) diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 72a545a2bc..a54359a113 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -104,7 +104,7 @@ func (ms *mqMsgStream) AsProducer(channels []string) { break } fn := func() error { - pp, err := ms.client.CreateProducer(mqwrapper.ProducerOptions{Topic: channel}) + pp, err := ms.client.CreateProducer(mqwrapper.ProducerOptions{Topic: channel, EnableCompression: true}) if err != nil { return err } diff --git a/internal/mq/msgstream/mqwrapper/producer.go b/internal/mq/msgstream/mqwrapper/producer.go index b229e00b44..270e1dec51 100644 --- a/internal/mq/msgstream/mqwrapper/producer.go +++ b/internal/mq/msgstream/mqwrapper/producer.go @@ -22,6 +22,10 @@ import "context" type ProducerOptions struct { // The topic that this Producer will publish Topic string + + // Enable compression + // For Pulsar, this enables ZSTD compression with default compression level + EnableCompression bool } // ProducerMessage contains the messages of a producer diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go index 0337b4f21a..8602c85f60 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go @@ -71,6 +71,10 @@ func NewClient(opts pulsar.ClientOptions) (*pulsarClient, error) { // CreateProducer create a pulsar producer from options func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, error) { opts := pulsar.ProducerOptions{Topic: options.Topic} + if options.EnableCompression { + opts.CompressionType = pulsar.ZSTD + } + pp, err := pc.client.CreateProducer(opts) if err != nil { return nil, err diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go index 2abdf10ecf..0ec8e6de82 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go @@ -17,6 +17,7 @@ package pulsar import ( + "context" "fmt" "testing" @@ -41,6 +42,7 @@ func TestPulsarConsumer_Subscription(t *testing.T) { }) assert.Nil(t, err) assert.NotNil(t, consumer) + defer consumer.Close() str := consumer.Subscription() assert.NotNil(t, str) @@ -57,6 +59,54 @@ func Test_PatchEarliestMessageID(t *testing.T) { assert.Equal(t, "-1:-1:0", fmt.Sprintf("%v", mid)) } +func TestComsumeCompressedMessage(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + assert.Nil(t, err) + defer pc.Close() + + receiveChannel := make(chan pulsar.ConsumerMessage, 100) + consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ + Topic: "TestTopics", + SubscriptionName: "SubName", + Type: pulsar.Exclusive, + SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(mqwrapper.SubscriptionPositionEarliest), + MessageChannel: receiveChannel, + }) + assert.NoError(t, err) + defer consumer.Close() + + producer, err := pc.CreateProducer(mqwrapper.ProducerOptions{Topic: "TestTopics"}) + assert.NoError(t, err) + compressProducer, err := pc.CreateProducer(mqwrapper.ProducerOptions{Topic: "TestTopics", EnableCompression: true}) + assert.NoError(t, err) + + msg := []byte("test message") + compressedMsg := []byte("test compressed message") + _, err = producer.Send(context.Background(), &mqwrapper.ProducerMessage{ + Payload: msg, + Properties: map[string]string{}, + }) + assert.NoError(t, err) + recvMsg, err := consumer.Receive(context.Background()) + assert.NoError(t, err) + consumer.Ack(recvMsg) + assert.Equal(t, msg, recvMsg.Payload()) + + _, err = compressProducer.Send(context.Background(), &mqwrapper.ProducerMessage{ + Payload: compressedMsg, + Properties: map[string]string{}, + }) + assert.NoError(t, err) + recvMsg, err = consumer.Receive(context.Background()) + assert.NoError(t, err) + consumer.Ack(recvMsg) + assert.Equal(t, compressedMsg, recvMsg.Payload()) + + assert.Nil(t, err) + assert.NotNil(t, consumer) +} + func TestPulsarConsumer_Close(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})