From 9fe62cb5f3ca9bc03d7d0fd2ad0e9f59c25c1d9f Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Tue, 18 Jul 2023 18:24:57 +0800 Subject: [PATCH] Fix kafka producer panic by nil (#25691) Signed-off-by: Enwei Jiao --- internal/rootcoord/dml_channels.go | 4 ++-- .../msgstream/mqwrapper/kafka/kafka_client.go | 21 ++++++++----------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 79790fd80e..9be083f4af 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -150,7 +150,7 @@ type dmlChannels struct { } func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePrefixDefault string, chanNumDefault int64) *dmlChannels { - params := paramtable.Get().CommonCfg + params := ¶mtable.Get().CommonCfg var ( chanNamePrefix string chanNum int64 @@ -347,7 +347,7 @@ func (d *dmlChannels) removeChannels(names ...string) { } func getChannelName(prefix string, idx int64) string { - params := paramtable.Get().CommonCfg + params := ¶mtable.Get().CommonCfg if params.PreCreatedTopicEnabled.GetAsBool() { return params.TopicNames.GetAsStrings()[idx] } diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go index 4c657e689f..0ee3679ada 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -15,7 +15,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/timerecord" ) -var Producer *kafka.Producer +var producer *kafka.Producer var once sync.Once @@ -85,13 +85,15 @@ func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap { } func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) { - var err error + config := kc.newProducerConfig() + producer, err := kafka.NewProducer(config) + if err != nil { + log.Error("create sync kafka producer failed", zap.Error(err)) + return nil, err + } once.Do(func() { - config := kc.newProducerConfig() - Producer, err = kafka.NewProducer(config) - go func() { - for e := range Producer.Events() { + for e := range producer.Events() { switch ev := e.(type) { case kafka.Error: // Generic client instance-level errors, such as broker connection failures, @@ -109,12 +111,7 @@ func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) { }() }) - if err != nil { - log.Error("create sync kafka producer failed", zap.Error(err)) - return nil, err - } - - return Producer, nil + return producer, nil } func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {