mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
Enable ZSTD compression for pulsar (#16014)
Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
0cec1b9448
commit
2022a361c0
@ -104,7 +104,7 @@ func (ms *mqMsgStream) AsProducer(channels []string) {
|
||||
break
|
||||
}
|
||||
fn := func() error {
|
||||
pp, err := ms.client.CreateProducer(mqwrapper.ProducerOptions{Topic: channel})
|
||||
pp, err := ms.client.CreateProducer(mqwrapper.ProducerOptions{Topic: channel, EnableCompression: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -22,6 +22,10 @@ import "context"
|
||||
type ProducerOptions struct {
|
||||
// The topic that this Producer will publish
|
||||
Topic string
|
||||
|
||||
// Enable compression
|
||||
// For Pulsar, this enables ZSTD compression with default compression level
|
||||
EnableCompression bool
|
||||
}
|
||||
|
||||
// ProducerMessage contains the messages of a producer
|
||||
|
@ -71,6 +71,10 @@ func NewClient(opts pulsar.ClientOptions) (*pulsarClient, error) {
|
||||
// CreateProducer create a pulsar producer from options
|
||||
func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, error) {
|
||||
opts := pulsar.ProducerOptions{Topic: options.Topic}
|
||||
if options.EnableCompression {
|
||||
opts.CompressionType = pulsar.ZSTD
|
||||
}
|
||||
|
||||
pp, err := pc.client.CreateProducer(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -17,6 +17,7 @@
|
||||
package pulsar
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
@ -41,6 +42,7 @@ func TestPulsarConsumer_Subscription(t *testing.T) {
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, consumer)
|
||||
defer consumer.Close()
|
||||
|
||||
str := consumer.Subscription()
|
||||
assert.NotNil(t, str)
|
||||
@ -57,6 +59,54 @@ func Test_PatchEarliestMessageID(t *testing.T) {
|
||||
assert.Equal(t, "-1:-1:0", fmt.Sprintf("%v", mid))
|
||||
}
|
||||
|
||||
func TestComsumeCompressedMessage(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
assert.Nil(t, err)
|
||||
defer pc.Close()
|
||||
|
||||
receiveChannel := make(chan pulsar.ConsumerMessage, 100)
|
||||
consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{
|
||||
Topic: "TestTopics",
|
||||
SubscriptionName: "SubName",
|
||||
Type: pulsar.Exclusive,
|
||||
SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(mqwrapper.SubscriptionPositionEarliest),
|
||||
MessageChannel: receiveChannel,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
|
||||
producer, err := pc.CreateProducer(mqwrapper.ProducerOptions{Topic: "TestTopics"})
|
||||
assert.NoError(t, err)
|
||||
compressProducer, err := pc.CreateProducer(mqwrapper.ProducerOptions{Topic: "TestTopics", EnableCompression: true})
|
||||
assert.NoError(t, err)
|
||||
|
||||
msg := []byte("test message")
|
||||
compressedMsg := []byte("test compressed message")
|
||||
_, err = producer.Send(context.Background(), &mqwrapper.ProducerMessage{
|
||||
Payload: msg,
|
||||
Properties: map[string]string{},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
recvMsg, err := consumer.Receive(context.Background())
|
||||
assert.NoError(t, err)
|
||||
consumer.Ack(recvMsg)
|
||||
assert.Equal(t, msg, recvMsg.Payload())
|
||||
|
||||
_, err = compressProducer.Send(context.Background(), &mqwrapper.ProducerMessage{
|
||||
Payload: compressedMsg,
|
||||
Properties: map[string]string{},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
recvMsg, err = consumer.Receive(context.Background())
|
||||
assert.NoError(t, err)
|
||||
consumer.Ack(recvMsg)
|
||||
assert.Equal(t, compressedMsg, recvMsg.Payload())
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, consumer)
|
||||
}
|
||||
|
||||
func TestPulsarConsumer_Close(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
|
Loading…
Reference in New Issue
Block a user