Make pulsar client singleton (#5782)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-06-15 19:03:56 +08:00 committed by GitHub
parent 96cb24e566
commit c2ecce61c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 30 additions and 29 deletions

View File

@ -38,7 +38,7 @@ func (f *PmsFactory) SetParams(params map[string]interface{}) error {
}
func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
pulsarClient, err := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: f.PulsarAddress})
pulsarClient, err := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: f.PulsarAddress})
if err != nil {
return nil, err
}
@ -46,7 +46,7 @@ func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
}
func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
pulsarClient, err := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: f.PulsarAddress})
pulsarClient, err := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: f.PulsarAddress})
if err != nil {
return nil, err
}

View File

@ -168,9 +168,6 @@ func (ms *mqMsgStream) Close() {
consumer.Close()
}
}
if ms.client != nil {
ms.client.Close()
}
}
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
@ -525,9 +522,6 @@ func (ms *MqTtMsgStream) Close() {
consumer.Close()
}
}
if ms.client != nil {
ms.client.Close()
}
}
func (ms *MqTtMsgStream) bufMsgPackToChannel() {

View File

@ -223,7 +223,7 @@ func getTimeTickMsgPack(reqID UniqueID) *MsgPack {
func getPulsarInputStream(pulsarAddress string, producerChannels []string, opts ...RepackFunc) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
for _, opt := range opts {
@ -235,7 +235,7 @@ func getPulsarInputStream(pulsarAddress string, producerChannels []string, opts
func getPulsarOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
@ -244,7 +244,7 @@ func getPulsarOutputStream(pulsarAddress string, consumerChannels []string, cons
func getPulsarTtOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
@ -253,7 +253,7 @@ func getPulsarTtOutputStream(pulsarAddress string, consumerChannels []string, co
func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPosition) MsgStream {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
consumerName := []string{}
for _, c := range positions {
@ -489,12 +489,12 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream.Start()
pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient2, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
@ -543,12 +543,12 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream.Start()
pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient2, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
@ -577,12 +577,12 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_QueryNodeStats, 4))
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream.Start()
pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient2, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
@ -1006,7 +1006,7 @@ func TestStream_MqMsgStream_Seek(t *testing.T) {
outputStream.Close()
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream2, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
outputStream2.AsConsumer(consumerChannels, consumerSubName)
outputStream2.Seek([]*internalpb.MsgPosition{seekPosition})

View File

@ -13,6 +13,7 @@ package mqclient
import (
"errors"
"sync"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/log"
@ -23,14 +24,20 @@ type pulsarClient struct {
client pulsar.Client
}
func NewPulsarClient(opts pulsar.ClientOptions) (*pulsarClient, error) {
c, err := pulsar.NewClient(opts)
if err != nil {
log.Error("Set pulsar client failed, error", zap.Error(err))
return nil, err
}
cli := &pulsarClient{client: c}
return cli, nil
var sc *pulsarClient
var once sync.Once
func GetPulsarClientInstance(opts pulsar.ClientOptions) (*pulsarClient, error) {
once.Do(func() {
c, err := pulsar.NewClient(opts)
if err != nil {
log.Error("Set pulsar client failed, error", zap.Error(err))
return
}
cli := &pulsarClient{client: c}
sc = cli
})
return sc, nil
}
func (pc *pulsarClient) CreateProducer(options ProducerOptions) (Producer, error) {

View File

@ -168,7 +168,7 @@ func Consume3(ctx context.Context, t *testing.T, pc *pulsarClient, topic string,
func TestPulsarClient(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
pc, err := NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
defer pc.Close()
assert.NoError(t, err)
assert.NotNil(t, pc)
@ -319,7 +319,7 @@ func Consume23(ctx context.Context, t *testing.T, pc *pulsarClient, topic string
func TestPulsarClient2(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
pc, err := NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress})
pc, err := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
defer pc.Close()
assert.NoError(t, err)
assert.NotNil(t, pc)