[Improvement] add pulsar metrics and fix timeout (#26907)

Signed-off-by: chyezh <ye.zhen@zilliz.com>
This commit is contained in:
chyezh 2023-09-13 12:03:19 +08:00 committed by GitHub
parent c033580af4
commit 791e6ef6c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 54 additions and 9 deletions

View File

@ -114,6 +114,7 @@ pulsar:
tenant: public
namespace: default
requestTimeout: 60 # pulsar client global request timeout in seconds
enableClientMetrics: false # Whether to register pulsar client metrics into milvus metrics path.
# If you want to enable kafka, needs to comment the pulsar configs
# kafka:

View File

@ -113,10 +113,23 @@ var (
lockType,
lockOp,
})
metricRegisterer prometheus.Registerer
)
// GetRegisterer returns the global prometheus registerer
// metricsRegistry must be call after Register is called or no Register is called.
func GetRegisterer() prometheus.Registerer {
if metricRegisterer == nil {
return prometheus.DefaultRegisterer
}
return metricRegisterer
}
// Register serves prometheus http service
func Register(r *prometheus.Registry) {
// Should be called by init function.
func Register(r prometheus.Registerer) {
r.MustRegister(NumNodes)
r.MustRegister(LockCosts)
metricRegisterer = r
}

View File

@ -39,3 +39,14 @@ func TestRegisterMetrics(t *testing.T) {
RegisterMsgStreamMetrics(r)
})
}
func TestGetRegisterer(t *testing.T) {
register := GetRegisterer()
assert.NotNil(t, register)
assert.Equal(t, prometheus.DefaultRegisterer, register)
r := prometheus.NewRegistry()
Register(r)
register = GetRegisterer()
assert.NotNil(t, register)
assert.Equal(t, r, register)
}

View File

@ -23,11 +23,13 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/cockroachdb/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/streamnative/pulsarctl/pkg/cli"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
kafkawrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/kafka"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/nmq"
@ -49,11 +51,12 @@ type PmsFactory struct {
PulsarTenant string
PulsarNameSpace string
RequestTimeout time.Duration
metricRegisterer prometheus.Registerer
}
func NewPmsFactory(serviceParam *paramtable.ServiceParam) *PmsFactory {
config := &serviceParam.PulsarCfg
return &PmsFactory{
f := &PmsFactory{
MQBufSize: serviceParam.MQCfg.MQBufSize.GetAsInt64(),
ReceiveBufSize: serviceParam.MQCfg.ReceiveBufSize.GetAsInt64(),
PulsarAddress: config.Address.GetValue(),
@ -64,6 +67,11 @@ func NewPmsFactory(serviceParam *paramtable.ServiceParam) *PmsFactory {
PulsarNameSpace: config.Namespace.GetValue(),
RequestTimeout: config.RequestTimeout.GetAsDuration(time.Second),
}
if config.EnableClientMetrics.GetAsBool() {
// Enable client metrics if config.EnableClientMetrics is true, use pkg-defined registerer.
f.metricRegisterer = metrics.GetRegisterer()
}
return f
}
// NewMsgStream is used to generate a new Msgstream object
@ -82,9 +90,10 @@ func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
return nil, err
}
clientOpts := pulsar.ClientOptions{
URL: f.PulsarAddress,
Authentication: auth,
OperationTimeout: timeout,
URL: f.PulsarAddress,
Authentication: auth,
OperationTimeout: timeout,
MetricsRegisterer: f.metricRegisterer,
}
pulsarClient, err := pulsarmqwrapper.NewClient(f.PulsarTenant, f.PulsarNameSpace, clientOpts)
@ -108,9 +117,10 @@ func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
return nil, err
}
clientOpts := pulsar.ClientOptions{
URL: f.PulsarAddress,
Authentication: auth,
OperationTimeout: timeout,
URL: f.PulsarAddress,
Authentication: auth,
OperationTimeout: timeout,
MetricsRegisterer: f.metricRegisterer,
}
pulsarClient, err := pulsarmqwrapper.NewClient(f.PulsarTenant, f.PulsarNameSpace, clientOpts)

View File

@ -502,6 +502,9 @@ type PulsarConfig struct {
// Global request timeout
RequestTimeout ParamItem `refreshable:"false"`
// Enable Client side metrics
EnableClientMetrics ParamItem `refreshable:"false"`
}
func (p *PulsarConfig) Init(base *BaseTable) {
@ -615,6 +618,14 @@ func (p *PulsarConfig) Init(base *BaseTable) {
Export: true,
}
p.RequestTimeout.Init(base.mgr)
p.EnableClientMetrics = ParamItem{
Key: "pulsar.enableClientMetrics",
Version: "2.3.0",
DefaultValue: "false",
Export: true,
}
p.EnableClientMetrics.Init(base.mgr)
}
// --- kafka ---
@ -1067,5 +1078,4 @@ Leave it empty if you want to use AWS default endpoint`,
Export: true,
}
p.UseVirtualHost.Init(base.mgr)
}