support pulsar tenant and auth (#19727)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2022-11-11 10:55:05 +08:00 committed by GitHub
parent 19aec18372
commit 701e7b5a6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 333 additions and 68 deletions

View File

@ -102,6 +102,8 @@ pulsar:
port: 6650 # Port of pulsar
webport: 80 # Web port of pulsar, if you connect direcly without proxy, should use 8080
maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes, Maximum size of each message in pulsar.
tenant: public
namespace: default
# If you want to enable kafka, needs to comment the pulsar configs
kafka:

View File

@ -18,6 +18,7 @@ package msgstream
import (
"context"
"errors"
"strings"
"github.com/apache/pulsar-client-go/pulsar"
@ -25,14 +26,12 @@ import (
rmqimplserver "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
kafkawrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka"
puslarmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/pulsar"
pulsarmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/pulsar"
rmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/streamnative/pulsarctl/pkg/cli"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"go.uber.org/zap"
)
@ -44,6 +43,10 @@ type PmsFactory struct {
PulsarWebAddress string
ReceiveBufSize int64
PulsarBufSize int64
PulsarAuthPlugin string
PulsarAuthParams string
PulsarTenant string
PulsarNameSpace string
}
func NewPmsFactory(config *paramtable.PulsarConfig) *PmsFactory {
@ -52,12 +55,25 @@ func NewPmsFactory(config *paramtable.PulsarConfig) *PmsFactory {
ReceiveBufSize: 1024,
PulsarAddress: config.Address,
PulsarWebAddress: config.WebAddress,
PulsarAuthPlugin: config.AuthPlugin,
PulsarAuthParams: config.AuthParams,
PulsarTenant: config.Tenant,
PulsarNameSpace: config.Namespace,
}
}
// NewMsgStream is used to generate a new Msgstream object
func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
pulsarClient, err := puslarmqwrapper.NewClient(pulsar.ClientOptions{URL: f.PulsarAddress})
auth, err := f.getAuthentication()
if err != nil {
return nil, err
}
clientOpts := pulsar.ClientOptions{
URL: f.PulsarAddress,
Authentication: auth,
}
pulsarClient, err := pulsarmqwrapper.NewClient(f.PulsarTenant, f.PulsarNameSpace, clientOpts)
if err != nil {
return nil, err
}
@ -66,13 +82,34 @@ func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
// NewTtMsgStream is used to generate a new TtMsgstream object
func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
pulsarClient, err := puslarmqwrapper.NewClient(pulsar.ClientOptions{URL: f.PulsarAddress})
auth, err := f.getAuthentication()
if err != nil {
return nil, err
}
clientOpts := pulsar.ClientOptions{
URL: f.PulsarAddress,
Authentication: auth,
}
pulsarClient, err := pulsarmqwrapper.NewClient(f.PulsarTenant, f.PulsarNameSpace, clientOpts)
if err != nil {
return nil, err
}
return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.PulsarBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher())
}
func (f *PmsFactory) getAuthentication() (pulsar.Authentication, error) {
auth, err := pulsar.NewAuthentication(f.PulsarAuthPlugin, f.PulsarAuthParams)
if err != nil {
log.Error("build authencation from config failed, please check it!",
zap.String("authPlugin", f.PulsarAuthPlugin),
zap.Error(err))
return nil, errors.New("build authencation from config failed")
}
return auth, nil
}
// NewQueryMsgStream is used to generate a new QueryMsgstream object
func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
return f.NewMsgStream(ctx)
@ -81,9 +118,16 @@ func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
func (f *PmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error {
return func(channels []string, subname string) error {
// try to delete the old subscription
admin := cmdutils.NewPulsarClient()
admin, err := pulsarmqwrapper.NewAdminClient(f.PulsarWebAddress, f.PulsarAuthPlugin, f.PulsarAuthParams)
if err != nil {
return err
}
for _, channel := range channels {
topic, err := utils.GetTopicName(channel)
fullTopicName, err := pulsarmqwrapper.GetFullTopicName(f.PulsarTenant, f.PulsarNameSpace, channel)
if err != nil {
return err
}
topic, err := utils.GetTopicName(fullTopicName)
if err != nil {
log.Warn("failed to get topic name", zap.Error(err))
return retry.Unrecoverable(err)

View File

@ -21,6 +21,7 @@ import (
"os"
"testing"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
)
@ -36,6 +37,45 @@ func TestPmsFactory(t *testing.T) {
_, err = pmsFactory.NewQueryMsgStream(ctx)
assert.Nil(t, err)
err = pmsFactory.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx")
assert.Nil(t, err)
}
func TestPmsFactoryWithAuth(t *testing.T) {
config := &paramtable.PulsarConfig{
Address: Params.PulsarCfg.Address,
WebAddress: Params.PulsarCfg.WebAddress,
MaxMessageSize: Params.PulsarCfg.MaxMessageSize,
AuthPlugin: "token",
AuthParams: "{\"token\":\"fake_token\"}",
}
pmsFactory := NewPmsFactory(config)
ctx := context.Background()
_, err := pmsFactory.NewMsgStream(ctx)
assert.Nil(t, err)
_, err = pmsFactory.NewTtMsgStream(ctx)
assert.Nil(t, err)
_, err = pmsFactory.NewQueryMsgStream(ctx)
assert.Nil(t, err)
config.AuthParams = ""
pmsFactory = NewPmsFactory(config)
ctx = context.Background()
_, err = pmsFactory.NewMsgStream(ctx)
assert.Error(t, err)
_, err = pmsFactory.NewTtMsgStream(ctx)
assert.Error(t, err)
_, err = pmsFactory.NewQueryMsgStream(ctx)
assert.Error(t, err)
}
func TestRmsFactory(t *testing.T) {
@ -54,6 +94,9 @@ func TestRmsFactory(t *testing.T) {
_, err = rmsFactory.NewQueryMsgStream(ctx)
assert.Nil(t, err)
err = rmsFactory.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx")
assert.Nil(t, err)
}
func TestKafkaFactory(t *testing.T) {
@ -68,4 +111,7 @@ func TestKafkaFactory(t *testing.T) {
_, err = kmsFactory.NewQueryMsgStream(ctx)
assert.Nil(t, err)
// err = kmsFactory.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx")
// assert.Nil(t, err)
}

View File

@ -51,6 +51,11 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable"
)
const (
DefaultPulsarTenant = "public"
DefaultPulsarNamespace = "default"
)
var Params paramtable.ComponentParam
func TestMain(m *testing.M) {
@ -94,7 +99,7 @@ type parameters struct {
func (f *fixture) setup() []parameters {
pulsarAddress := getPulsarAddress()
pulsarClient, err := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
assert.Nil(f.t, err)
rocksdbName := "/tmp/rocksmq_unittest_" + f.t.Name()
@ -544,12 +549,12 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
factory := ProtoUDFactory{}
ctx := context.Background()
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream.Start()
pulsarClient2, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
@ -599,12 +604,12 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
factory := ProtoUDFactory{}
ctx := context.Background()
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream.Start()
pulsarClient2, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
@ -633,12 +638,12 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
factory := ProtoUDFactory{}
ctx := context.Background()
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream.Start()
pulsarClient2, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
@ -788,7 +793,7 @@ func TestStream_PulsarMsgStream_SeekToLast(t *testing.T) {
// create a consumer can consume data from seek position to last msg
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
lastMsgID, err := outputStream2.GetLatestMsgID(c)
@ -1194,7 +1199,7 @@ func TestStream_MqMsgStream_Seek(t *testing.T) {
outputStream.Close()
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream2.Seek([]*internalpb.MsgPosition{seekPosition})
@ -1237,7 +1242,7 @@ func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) {
}
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, funcutil.RandomString(8), mqwrapper.SubscriptionPositionEarliest)
defer outputStream2.Close()
@ -1348,7 +1353,7 @@ func TestStream_MqMsgStream_SeekLatest(t *testing.T) {
err := inputStream.Produce(msgPack)
assert.Nil(t, err)
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest)
outputStream2.Start()
@ -1689,7 +1694,7 @@ func TestStream_BroadcastMark(t *testing.T) {
producerChannels := []string{c1, c2}
factory := ProtoUDFactory{}
pulsarClient, err := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
assert.Nil(t, err)
outputStream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
assert.Nil(t, err)
@ -1751,7 +1756,7 @@ func TestStream_ProduceMark(t *testing.T) {
producerChannels := []string{c1, c2}
factory := ProtoUDFactory{}
pulsarClient, err := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
assert.Nil(t, err)
outputStream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
assert.Nil(t, err)
@ -2043,7 +2048,7 @@ func getTimeTickMsgPack(reqID UniqueID) *MsgPack {
func getPulsarInputStream(ctx context.Context, pulsarAddress string, producerChannels []string, opts ...RepackFunc) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
for _, opt := range opts {
@ -2055,7 +2060,7 @@ func getPulsarInputStream(ctx context.Context, pulsarAddress string, producerCha
func getPulsarOutputStream(ctx context.Context, pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
@ -2064,7 +2069,7 @@ func getPulsarOutputStream(ctx context.Context, pulsarAddress string, consumerCh
func getPulsarTtOutputStream(ctx context.Context, pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest)
outputStream.Start()
@ -2073,7 +2078,7 @@ func getPulsarTtOutputStream(ctx context.Context, pulsarAddress string, consumer
func getPulsarTtOutputStreamAndSeek(ctx context.Context, pulsarAddress string, positions []*MsgPosition) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
consumerName := []string{}
for _, c := range positions {

View File

@ -18,6 +18,7 @@ package pulsar
import (
"errors"
"fmt"
"strings"
"sync"
"time"
@ -26,10 +27,14 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/util/retry"
pulsarctl "github.com/streamnative/pulsarctl/pkg/pulsar"
"github.com/streamnative/pulsarctl/pkg/pulsar/common"
"go.uber.org/zap"
)
type pulsarClient struct {
tenant string
namespace string
client pulsar.Client
}
@ -38,14 +43,18 @@ var once sync.Once
// NewClient creates a pulsarClient object
// according to the parameter opts of type pulsar.ClientOptions
func NewClient(opts pulsar.ClientOptions) (*pulsarClient, error) {
func NewClient(tenant string, namespace string, opts pulsar.ClientOptions) (*pulsarClient, error) {
once.Do(func() {
c, err := pulsar.NewClient(opts)
if err != nil {
log.Error("Failed to set pulsar client: ", zap.Error(err))
return
}
cli := &pulsarClient{client: c}
cli := &pulsarClient{
client: c,
tenant: tenant,
namespace: namespace,
}
sc = cli
})
return sc, nil
@ -53,7 +62,11 @@ func NewClient(opts pulsar.ClientOptions) (*pulsarClient, error) {
// CreateProducer create a pulsar producer from options
func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, error) {
opts := pulsar.ProducerOptions{Topic: options.Topic}
fullTopicName, err := GetFullTopicName(pc.tenant, pc.namespace, options.Topic)
if err != nil {
return nil, err
}
opts := pulsar.ProducerOptions{Topic: fullTopicName}
if options.EnableCompression {
opts.CompressionType = pulsar.ZSTD
opts.CompressionLevel = pulsar.Faster
@ -77,8 +90,12 @@ func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwra
// Subscribe creates a pulsar consumer instance and subscribe a topic
func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) {
receiveChannel := make(chan pulsar.ConsumerMessage, options.BufSize)
fullTopicName, err := GetFullTopicName(pc.tenant, pc.namespace, options.Topic)
if err != nil {
return nil, err
}
consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{
Topic: options.Topic,
Topic: fullTopicName,
SubscriptionName: options.SubscriptionName,
Type: pulsar.Exclusive,
SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(options.SubscriptionInitialPosition),
@ -100,6 +117,32 @@ func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.
return pConsumer, nil
}
func GetFullTopicName(tenant string, namespace string, topic string) (string, error) {
if len(tenant) == 0 || len(namespace) == 0 || len(topic) == 0 {
log.Error("build full topic name failed",
zap.String("tenant", tenant),
zap.String("namesapce", namespace),
zap.String("topic", topic))
return "", errors.New("build full topic name failed")
}
return fmt.Sprintf("%s/%s/%s", tenant, namespace, topic), nil
}
func NewAdminClient(address, authPlugin, authParams string) (pulsarctl.Client, error) {
config := common.Config{
WebServiceURL: address,
AuthPlugin: authPlugin,
AuthParams: authParams,
}
admin, err := pulsarctl.New(&config)
if err != nil {
return nil, fmt.Errorf("failed to build pulsar admin client due to %s", err.Error())
}
return admin, nil
}
// EarliestMessageID returns the earliest message id
func (pc *pulsarClient) EarliestMessageID() mqwrapper.MessageID {
msgID := pulsar.EarliestMessageID()

View File

@ -34,12 +34,16 @@ import (
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
const (
DefaultPulsarTenant = "public"
DefaultPulsarNamespace = "default"
)
var Params paramtable.BaseTable
func TestMain(m *testing.M) {
@ -204,7 +208,7 @@ func Consume3(ctx context.Context, t *testing.T, pc *pulsarClient, topic string,
func TestPulsarClient_Consume1(t *testing.T) {
pulsarAddress := getPulsarAddress()
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
defer pc.Close()
assert.NoError(t, err)
assert.NotNil(t, pc)
@ -355,7 +359,7 @@ func Consume23(ctx context.Context, t *testing.T, pc *pulsarClient, topic string
func TestPulsarClient_Consume2(t *testing.T) {
pulsarAddress := getPulsarAddress()
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
defer pc.Close()
assert.NoError(t, err)
assert.NotNil(t, pc)
@ -405,7 +409,7 @@ func TestPulsarClient_Consume2(t *testing.T) {
func TestPulsarClient_SeekPosition(t *testing.T) {
pulsarAddress := getPulsarAddress()
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
defer pc.Close()
assert.NoError(t, err)
assert.NotNil(t, pc)
@ -478,7 +482,7 @@ func TestPulsarClient_SeekPosition(t *testing.T) {
func TestPulsarClient_SeekLatest(t *testing.T) {
pulsarAddress := getPulsarAddress()
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
defer pc.Close()
assert.NoError(t, err)
assert.NotNil(t, pc)
@ -541,7 +545,7 @@ func TestPulsarClient_SeekLatest(t *testing.T) {
func TestPulsarClient_EarliestMessageID(t *testing.T) {
pulsarAddress := getPulsarAddress()
client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
client, _ := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
defer client.Close()
mid := client.EarliestMessageID()
@ -550,7 +554,7 @@ func TestPulsarClient_EarliestMessageID(t *testing.T) {
func TestPulsarClient_StringToMsgID(t *testing.T) {
pulsarAddress := getPulsarAddress()
client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
client, _ := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
defer client.Close()
mid := pulsar.EarliestMessageID()
@ -568,7 +572,7 @@ func TestPulsarClient_StringToMsgID(t *testing.T) {
func TestPulsarClient_BytesToMsgID(t *testing.T) {
pulsarAddress := getPulsarAddress()
client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
client, _ := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
defer client.Close()
mid := pulsar.EarliestMessageID()
@ -647,21 +651,52 @@ func (c *mockPulsarClient) Close() {
func TestPulsarClient_SubscribeExclusiveFail(t *testing.T) {
t.Run("exclusive pulsar consumer failure", func(t *testing.T) {
pc := &pulsarClient{
tenant: DefaultPulsarTenant,
namespace: DefaultPulsarNamespace,
client: &mockPulsarClient{},
}
_, err := pc.Subscribe(mqwrapper.ConsumerOptions{})
_, err := pc.Subscribe(mqwrapper.ConsumerOptions{Topic: "test_topic_name"})
assert.Error(t, err)
assert.True(t, retry.IsUnRecoverable(err))
})
}
func TestPulsarClient_WithTenantAndNamespace(t *testing.T) {
tenant := "public"
namespace := "default"
topic := "test"
subName := "hello_world"
pulsarAddress := getPulsarAddress()
pc, err := NewClient(tenant, namespace, pulsar.ClientOptions{URL: pulsarAddress})
assert.Nil(t, err)
producer, err := pc.CreateProducer(mqwrapper.ProducerOptions{Topic: topic})
defer producer.Close()
assert.Nil(t, err)
assert.NotNil(t, producer)
fullTopicName, err := GetFullTopicName(tenant, namespace, topic)
assert.Nil(t, err)
assert.Equal(t, fullTopicName, producer.(*pulsarProducer).Topic())
consumer, err := pc.Subscribe(mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
BufSize: 1024,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
defer consumer.Close()
assert.Nil(t, err)
assert.NotNil(t, consumer)
}
func TestPulsarCtl(t *testing.T) {
topic := "test"
subName := "hello"
pulsarAddress := getPulsarAddress()
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
assert.Nil(t, err)
consumer, err := pc.Subscribe(mqwrapper.ConsumerOptions{
Topic: topic,
@ -690,7 +725,9 @@ func TestPulsarCtl(t *testing.T) {
})
assert.Error(t, err)
topicName, err := utils.GetTopicName(topic)
fullTopicName, err := GetFullTopicName(DefaultPulsarTenant, DefaultPulsarNamespace, topic)
assert.Nil(t, err)
topicName, err := utils.GetTopicName(fullTopicName)
assert.NoError(t, err)
pulsarURL, err := url.ParseRequestURI(pulsarAddress)
@ -698,12 +735,14 @@ func TestPulsarCtl(t *testing.T) {
panic(err)
}
webport := Params.LoadWithDefault("pulsar.webport", "80")
cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + webport
admin := cmdutils.NewPulsarClient()
webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport
admin, err := NewAdminClient(webServiceURL, "", "")
assert.NoError(t, err)
err = admin.Subscriptions().Delete(*topicName, subName, true)
if err != nil {
cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080"
admin := cmdutils.NewPulsarClient()
webServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080"
admin, err := NewAdminClient(webServiceURL, "", "")
assert.NoError(t, err)
err = admin.Subscriptions().Delete(*topicName, subName, true)
assert.NoError(t, err)
}
@ -714,7 +753,29 @@ func TestPulsarCtl(t *testing.T) {
BufSize: 1024,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
defer consumer2.Close()
assert.Nil(t, err)
assert.NotNil(t, consumer2)
defer consumer2.Close()
}
func NewPulsarAdminClient() {
panic("unimplemented")
}
func TestPulsarClient_GetFullTopicName(t *testing.T) {
fullTopicName, err := GetFullTopicName("", "", "topic")
assert.Error(t, err)
assert.Empty(t, fullTopicName)
fullTopicName, err = GetFullTopicName("tenant", "", "topic")
assert.Error(t, err)
assert.Empty(t, fullTopicName)
fullTopicName, err = GetFullTopicName("", "namespace", "topic")
assert.Error(t, err)
assert.Empty(t, fullTopicName)
fullTopicName, err = GetFullTopicName("tenant", "namespace", "topic")
assert.Nil(t, err)
assert.Equal(t, "tenant/namespace/topic", fullTopicName)
}

View File

@ -24,7 +24,6 @@ import (
"testing"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"github.com/apache/pulsar-client-go/pulsar"
@ -33,7 +32,7 @@ import (
func TestPulsarConsumer_Subscription(t *testing.T) {
pulsarAddress := getPulsarAddress()
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
assert.Nil(t, err)
defer pc.Close()
@ -65,7 +64,7 @@ func Test_PatchEarliestMessageID(t *testing.T) {
func TestComsumeCompressedMessage(t *testing.T) {
pulsarAddress := getPulsarAddress()
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
assert.Nil(t, err)
defer pc.Close()
@ -113,7 +112,7 @@ func TestComsumeCompressedMessage(t *testing.T) {
func TestPulsarConsumer_Close(t *testing.T) {
pulsarAddress := getPulsarAddress()
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
assert.Nil(t, err)
receiveChannel := make(chan pulsar.ConsumerMessage, 100)
@ -173,12 +172,14 @@ func TestPulsarClientCloseUnsubscribeError(t *testing.T) {
panic(err)
}
webport := Params.LoadWithDefault("pulsar.webport", "80")
cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + webport
admin := cmdutils.NewPulsarClient()
webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport
admin, err := NewAdminClient(webServiceURL, "", "")
assert.NoError(t, err)
err = admin.Subscriptions().Delete(*topicName, subName, true)
if err != nil {
cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080"
admin := cmdutils.NewPulsarClient()
webServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080"
admin, err := NewAdminClient(webServiceURL, "", "")
assert.NoError(t, err)
err = admin.Subscriptions().Delete(*topicName, subName, true)
assert.NoError(t, err)
}

View File

@ -28,7 +28,7 @@ import (
func TestPulsarProducer(t *testing.T) {
pulsarAddress := getPulsarAddress()
pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
defer pc.Close()
assert.NoError(t, err)
assert.NotNil(t, pc)
@ -39,7 +39,9 @@ func TestPulsarProducer(t *testing.T) {
assert.NotNil(t, producer)
pulsarProd := producer.(*pulsarProducer)
assert.Equal(t, pulsarProd.Topic(), topic)
fullTopicName, err := GetFullTopicName(DefaultPulsarTenant, DefaultPulsarNamespace, topic)
assert.Nil(t, err)
assert.Equal(t, pulsarProd.Topic(), fullTopicName)
msg := &mqwrapper.ProducerMessage{
Payload: []byte{},

View File

@ -17,22 +17,19 @@
package paramtable
import (
"encoding/json"
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"go.uber.org/zap"
)
var pulsarOnce sync.Once
const (
// SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message.
SuggestPulsarMaxMessageSize = 5 * 1024 * 1024
@ -72,7 +69,7 @@ func (p *ServiceParam) Init() {
p.MinioCfg.init(&p.BaseTable)
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- etcd ---
type EtcdConfig struct {
Base *BaseTable
@ -230,7 +227,7 @@ func (p *MetaStoreConfig) initMetaStoreType() {
p.MetaStoreType = p.Base.LoadWithDefault("metastore.type", util.MetaStoreTypeEtcd)
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- meta db ---
type MetaDBConfig struct {
Base *BaseTable
@ -306,7 +303,7 @@ func (p *MetaDBConfig) initMaxIdleConns() {
p.MaxIdleConns = maxIdleConns
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- pulsar ---
type PulsarConfig struct {
Base *BaseTable
@ -314,6 +311,14 @@ type PulsarConfig struct {
Address string
WebAddress string
MaxMessageSize int
// support auth
AuthPlugin string
AuthParams string
// support tenant
Tenant string
Namespace string
}
func (p *PulsarConfig) init(base *BaseTable) {
@ -322,6 +327,10 @@ func (p *PulsarConfig) init(base *BaseTable) {
p.initAddress()
p.initWebAddress()
p.initMaxMessageSize()
p.initAuthPlugin()
p.initAuthParams()
p.initTenant()
p.initNamespace()
}
func (p *PulsarConfig) initAddress() {
@ -350,9 +359,6 @@ func (p *PulsarConfig) initWebAddress() {
webport := p.Base.LoadWithDefault("pulsar.webport", "80")
p.WebAddress = "http://" + pulsarURL.Hostname() + ":" + webport
}
pulsarOnce.Do(func() {
cmdutils.PulsarCtlConfig.WebServiceURL = p.WebAddress
})
}
func (p *PulsarConfig) initMaxMessageSize() {
@ -369,6 +375,41 @@ func (p *PulsarConfig) initMaxMessageSize() {
}
}
func (p *PulsarConfig) initAuthPlugin() {
p.AuthPlugin = p.Base.LoadWithDefault("pulsar.authPlugin", "")
}
func (p *PulsarConfig) initAuthParams() {
paramString := p.Base.LoadWithDefault("pulsar.authParams", "")
// need to parse params to json due to .yaml config file doesn't support json format config item
// official pulsar client JWT config : {"token","fake_token_string"}
// milvus config: token:fake_token_string
jsonMap := make(map[string]string)
params := strings.Split(paramString, ",")
for _, param := range params {
kv := strings.Split(param, ":")
if len(kv) == 2 {
jsonMap[kv[0]] = kv[1]
}
}
if len(jsonMap) == 0 {
p.AuthParams = ""
} else {
jsonData, _ := json.Marshal(&jsonMap)
p.AuthParams = string(jsonData)
}
}
func (p *PulsarConfig) initTenant() {
p.Tenant = p.Base.LoadWithDefault("pulsar.tenant", "public")
}
func (p *PulsarConfig) initNamespace() {
p.Namespace = p.Base.LoadWithDefault("pulsar.namespace", "default")
}
// --- kafka ---
type KafkaConfig struct {
Base *BaseTable
@ -416,7 +457,7 @@ func (k *KafkaConfig) initExtraKafkaConfig() {
k.ProducerExtraConfig = k.Base.GetConfigSubSet(KafkaProducerConfigPrefix)
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- rocksmq ---
type RocksmqConfig struct {
Base *BaseTable
@ -434,7 +475,7 @@ func (p *RocksmqConfig) initPath() {
p.Path = p.Base.LoadWithDefault("rocksmq.path", "")
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
type MinioConfig struct {
Base *BaseTable

View File

@ -105,6 +105,26 @@ func TestServiceParam(t *testing.T) {
}
})
t.Run("test pulsar auth config", func(t *testing.T) {
Params := SParams.PulsarCfg
Params.initAuthPlugin()
assert.Equal(t, "", Params.AuthPlugin)
Params.initAuthParams()
assert.Equal(t, "", Params.AuthParams)
})
t.Run("test pulsar tenant/namespace config", func(t *testing.T) {
Params := SParams.PulsarCfg
Params.initTenant()
assert.Equal(t, "public", Params.Tenant)
Params.initNamespace()
assert.Equal(t, "default", Params.Namespace)
})
t.Run("test rocksmqConfig", func(t *testing.T) {
Params := SParams.RocksmqCfg