support configurable sasl mechanisms for kafka (#17542)

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
This commit is contained in:
jaime 2022-06-29 10:32:17 +08:00 committed by GitHub
parent 4ae1ca2cac
commit 762e3b78a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 6 deletions

View File

@ -84,6 +84,8 @@ pulsar:
# brokerList: localhost1:9092,localhost2:9092,localhost3:9092
# saslUsername: username
# saslPassword: password
# saslMechanisms: PLAIN
# securityProtocol: SASL_SSL
rocksmq:
# please adjust in embedded Milvus: /tmp/milvus/rdb_data

View File

@ -43,8 +43,8 @@ func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClie
}
if config.SaslUsername != "" && config.SaslPassword != "" {
kafkaConfig.SetKey("sasl.mechanisms", "PLAIN")
kafkaConfig.SetKey("security.protocol", "SASL_SSL")
kafkaConfig.SetKey("sasl.mechanisms", config.SaslMechanisms)
kafkaConfig.SetKey("security.protocol", config.SecurityProtocol)
kafkaConfig.SetKey("sasl.username", config.SaslUsername)
kafkaConfig.SetKey("sasl.password", config.SaslPassword)
}

View File

@ -246,10 +246,12 @@ func (p *PulsarConfig) initMaxMessageSize() {
// --- kafka ---
type KafkaConfig struct {
Base *BaseTable
Address string
SaslUsername string
SaslPassword string
Base *BaseTable
Address string
SaslUsername string
SaslPassword string
SaslMechanisms string
SecurityProtocol string
}
func (k *KafkaConfig) init(base *BaseTable) {
@ -257,6 +259,8 @@ func (k *KafkaConfig) init(base *BaseTable) {
k.initAddress()
k.initSaslUsername()
k.initSaslPassword()
k.initSaslMechanisms()
k.initSecurityProtocol()
}
func (k *KafkaConfig) initAddress() {
@ -275,6 +279,14 @@ func (k *KafkaConfig) initSaslPassword() {
k.SaslPassword = k.Base.LoadWithDefault("kafka.saslPassword", "")
}
func (k *KafkaConfig) initSaslMechanisms() {
k.SaslMechanisms = k.Base.LoadWithDefault("kafka.saslMechanisms", "PLAIN")
}
func (k *KafkaConfig) initSecurityProtocol() {
k.SecurityProtocol = k.Base.LoadWithDefault("kafka.securityProtocol", "SASL_SSL")
}
///////////////////////////////////////////////////////////////////////////////
// --- rocksmq ---
type RocksmqConfig struct {