Rocksmq client should be closed when close a msgstream (#13865)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
This commit is contained in:
yukun 2021-12-29 10:04:57 +08:00 committed by GitHub
parent f6ed8635bc
commit 4ba974d193
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 3 additions and 6 deletions

View File

@ -766,6 +766,7 @@ func (ms *MqTtMsgStream) Close() {
reader.Close()
}
}
ms.client.Close()
}
func (ms *MqTtMsgStream) bufMsgPackToChannel() {

View File

@ -123,5 +123,6 @@ func (pc *pulsarClient) BytesToMsgID(id []byte) (MessageID, error) {
}
func (pc *pulsarClient) Close() {
pc.client.Close()
// FIXME(yukun): pulsar.client is a singleton, so can't invoke this close when server run
// pc.client.Close()
}

View File

@ -183,10 +183,5 @@ func (c *client) Close() {
c.closeOnce.Do(func() {
close(c.closeCh)
c.wg.Wait()
if c.server != nil {
c.server.Close()
}
// Wait all consume goroutines exit
c.consumerOptions = nil
})
}