diff --git a/internal/util/mqclient/pulsar_client.go b/internal/util/mqclient/pulsar_client.go index cf8f3ffb05..c46e05d371 100644 --- a/internal/util/mqclient/pulsar_client.go +++ b/internal/util/mqclient/pulsar_client.go @@ -74,6 +74,7 @@ func (pc *pulsarClient) CreateReader(options ReaderOptions) (Reader, error) { return reader, nil } +// Subscribe creates a pulsar consumer instance and subscribe a topic func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) { receiveChannel := make(chan pulsar.ConsumerMessage, options.BufSize) consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{