Rainbond/monitor/prometheus/service_monitor.go
2021-04-01 19:08:57 +08:00

409 lines
13 KiB
Go

// RAINBOND, Application Management Platform
// Copyright (C) 2020-2020 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package prometheus
import (
"context"
"fmt"
"net/url"
"regexp"
"sort"
"strings"
"time"
mv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
externalversions "github.com/prometheus-operator/prometheus-operator/pkg/client/informers/externalversions"
"github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
//ServiceMonitorController service monitor
type ServiceMonitorController struct {
ctx context.Context
Prometheus *Manager
lastScrapes []*ScrapeConfig
smClient *versioned.Clientset
smInf cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
}
//NewServiceMonitorController new sm controller
func NewServiceMonitorController(ctx context.Context, config *rest.Config, pm *Manager) (*ServiceMonitorController, error) {
var smc ServiceMonitorController
c, err := versioned.NewForConfig(config)
if err != nil {
return nil, err
}
smc.smClient = c
smc.ctx = ctx
smFactory := externalversions.NewSharedInformerFactoryWithOptions(c, 5*time.Minute,
externalversions.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = "creator=Rainbond"
}))
informer := smFactory.Monitoring().V1().ServiceMonitors().Informer()
informer.AddEventHandlerWithResyncPeriod(&smc, time.Second*30)
smc.smInf = informer
smc.Prometheus = pm
smc.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "sm-monitor")
return &smc, nil
}
//Run run controller
func (s *ServiceMonitorController) Run(stopCh <-chan struct{}) {
go s.worker(s.ctx)
go func() {
defer s.queue.ShutDown()
s.smInf.Run(stopCh)
}()
cache.WaitForCacheSync(stopCh, s.smInf.HasSynced)
logrus.Info("service monitor controller start success")
}
//OnAdd sm add
func (s *ServiceMonitorController) OnAdd(obj interface{}) {
s.enqueue(obj)
}
//OnUpdate sm update
func (s *ServiceMonitorController) OnUpdate(oldObj, newObj interface{}) {
s.enqueue(newObj)
}
//OnDelete sm delete
func (s *ServiceMonitorController) OnDelete(obj interface{}) {
s.enqueue(obj)
}
func (s *ServiceMonitorController) enqueue(obj interface{}) {
if obj == nil {
return
}
key, ok := obj.(string)
if !ok {
key, ok = s.keyFunc(obj)
if !ok {
return
}
}
s.queue.Add(key)
}
func (s *ServiceMonitorController) keyFunc(obj interface{}) (string, bool) {
//namespace and name
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
logrus.Errorf("creating key failed %s", err.Error())
return k, false
}
return k, true
}
func (s *ServiceMonitorController) worker(ctx context.Context) {
for s.processNextWorkItem(ctx) {
}
}
func (s *ServiceMonitorController) processNextWorkItem(ctx context.Context) bool {
key, quit := s.queue.Get()
if quit {
return false
}
defer s.queue.Done(key)
s.sync()
s.queue.Forget(key)
return true
}
func (s *ServiceMonitorController) sync() {
logrus.Debug("start sync service monitor config to prometheus config")
smList := s.smInf.GetStore().List()
var scrapes []*ScrapeConfig
sMonIdentifiers := make([]string, len(smList))
sMons := make(map[string]*mv1.ServiceMonitor, len(smList))
i := 0
for j := range smList {
if sm, ok := smList[j].(*mv1.ServiceMonitor); ok && sm != nil {
sMonIdentifiers[i] = sm.GetName()
sMons[sm.GetName()] = sm
i++
}
}
// Sorting ensures, that we always generate the config in the same order.
sort.Strings(sMonIdentifiers)
var scrapeMap = make(map[string]*ScrapeConfig, len(sMonIdentifiers))
for _, name := range sMonIdentifiers {
for i, end := range sMons[name].Spec.Endpoints {
scrape := s.createScrapeBySM(sMons[name], end, i)
scrapeMap[scrape.JobName] = scrape
scrapes = append(scrapes, scrape)
}
}
var remove []*ScrapeConfig
for i, ls := range s.lastScrapes {
if _, ok := scrapeMap[ls.JobName]; !ok {
remove = append(remove, s.lastScrapes[i])
}
}
s.Prometheus.UpdateAndRemoveScrape(remove, scrapes...)
s.lastScrapes = scrapes
logrus.Debugf("success sync service monitor config and or update %d , remove %d", len(scrapes), len(remove))
}
func (s *ServiceMonitorController) createScrapeBySM(sm *mv1.ServiceMonitor, ep mv1.Endpoint, i int) *ScrapeConfig {
var sc = ScrapeConfig{
JobName: fmt.Sprintf("%s/%s/%d", sm.Namespace, sm.Name, i),
ServiceDiscoveryConfig: ServiceDiscoveryConfig{
KubernetesSDConfigs: []*SDConfig{
&SDConfig{
Role: RoleEndpoint,
NamespaceDiscovery: NamespaceDiscovery{
Names: []string{sm.Namespace},
},
Selectors: []SelectorConfig{
SelectorConfig{
Role: RoleEndpoint,
},
},
},
},
},
}
if ep.Interval != "" {
sc.ScrapeInterval = parseDuration(ep.Interval, time.Second*15)
}
if ep.ScrapeTimeout != "" {
sc.ScrapeTimeout = parseDuration(ep.ScrapeTimeout, time.Second*10)
}
if ep.Path != "" {
sc.MetricsPath = ep.Path
}
if ep.ProxyURL != nil && *ep.ProxyURL != "" {
purl, _ := url.Parse(*ep.ProxyURL)
if purl != nil {
sc.HTTPClientConfig.ProxyURL = URL{purl}
}
}
if ep.Params != nil {
sc.Params = ep.Params
}
if ep.Scheme != "" {
sc.Scheme = ep.Scheme
}
if ep.TLSConfig != nil {
sc.HTTPClientConfig.TLSConfig = TLSConfig{
CAFile: ep.TLSConfig.CAFile,
CertFile: ep.TLSConfig.CertFile,
KeyFile: ep.TLSConfig.KeyFile,
}
}
if ep.BearerTokenFile != "" {
sc.HTTPClientConfig.BearerTokenFile = ep.BearerTokenFile
}
var labelKeys []string
for k := range sm.Spec.Selector.MatchLabels {
labelKeys = append(labelKeys, k)
}
sort.Strings(labelKeys)
for _, k := range labelKeys {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
Action: "keep",
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_service_label_" + sanitizeLabelName(k))},
Regex: MustNewRegexp(sm.Spec.Selector.MatchLabels[k]),
})
}
// Set based label matching. We have to map the valid relations
// `In`, `NotIn`, `Exists`, and `DoesNotExist`, into relabeling rules.
for _, exp := range sm.Spec.Selector.MatchExpressions {
switch exp.Operator {
case metav1.LabelSelectorOpIn:
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
Action: "keep",
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_service_label_" + sanitizeLabelName(exp.Key))},
Regex: MustNewRegexp(strings.Join(exp.Values, "|")),
})
case metav1.LabelSelectorOpNotIn:
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
Action: "drop",
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_service_label_" + sanitizeLabelName(exp.Key))},
Regex: MustNewRegexp(strings.Join(exp.Values, "|")),
})
case metav1.LabelSelectorOpExists:
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
Action: "keep",
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_service_label_" + sanitizeLabelName(exp.Key))},
Regex: MustNewRegexp(".+"),
})
case metav1.LabelSelectorOpDoesNotExist:
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
Action: "drop",
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_service_label_" + sanitizeLabelName(exp.Key))},
Regex: MustNewRegexp(".+"),
})
}
}
// Filter targets based on correct port for the endpoint.
if ep.Port != "" {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
Action: "keep",
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_endpoint_port_name")},
Regex: MustNewRegexp(ep.Port),
})
} else if ep.TargetPort != nil {
if ep.TargetPort.StrVal != "" {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
Action: "keep",
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_pod_container_port_name")},
Regex: MustNewRegexp(ep.TargetPort.String()),
})
} else if ep.TargetPort.IntVal != 0 {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
Action: "keep",
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_pod_container_port_number")},
Regex: MustNewRegexp(ep.TargetPort.String()),
})
}
}
sc.RelabelConfigs = append(sc.RelabelConfigs, []*RelabelConfig{
{ // Relabel node labels for pre v2.3 meta labels
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_endpoint_address_target_kind"), model.LabelName("__meta_kubernetes_endpoint_address_target_name")},
Regex: MustNewRegexp("Node;(.*)"),
Replacement: "${1}",
TargetLabel: "node",
Separator: ";",
},
{ // Relabel pod labels for >=v2.3 meta labels
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_endpoint_address_target_kind"), model.LabelName("__meta_kubernetes_endpoint_address_target_name")},
Regex: MustNewRegexp("Pod;(.*)"),
Replacement: "${1}",
TargetLabel: "pod",
Separator: ";",
},
{
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_namespace")},
TargetLabel: "namespace",
},
{
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_service_name")},
TargetLabel: "service",
},
{
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_pod_name")},
TargetLabel: "pod",
},
{
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_pod_container_name")},
TargetLabel: "container",
},
}...)
// Relabel targetLabels from Service onto target.
for _, l := range sm.Spec.TargetLabels {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_service_label_" + sanitizeLabelName(l))},
TargetLabel: sanitizeLabelName(l),
Regex: MustNewRegexp("(.+)"),
Replacement: "${1}",
})
}
for _, l := range sm.Spec.PodTargetLabels {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_pod_label_" + sanitizeLabelName(l))},
TargetLabel: sanitizeLabelName(l),
Regex: MustNewRegexp("(.+)"),
Replacement: "${1}",
})
}
// By default, generate a safe job name from the service name. We also keep
// this around if a jobLabel is set in case the targets don't actually have a
// value for it. A single service may potentially have multiple metrics
// endpoints, therefore the endpoints labels is filled with the ports name or
// as a fallback the port number.
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_service_name")},
TargetLabel: "job",
Replacement: "${1}",
})
if sm.Spec.JobLabel != "" {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
SourceLabels: model.LabelNames{model.LabelName("__meta_kubernetes_service_label_" + sanitizeLabelName(sm.Spec.JobLabel))},
TargetLabel: "job",
Replacement: "${1}",
Regex: MustNewRegexp("(.+)"),
})
}
if ep.Port != "" {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
TargetLabel: "endpoint",
Replacement: ep.Port,
})
} else if ep.TargetPort != nil && ep.TargetPort.String() != "" {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
TargetLabel: "endpoint",
Replacement: ep.TargetPort.String(),
})
}
if ep.RelabelConfigs != nil {
for _, c := range ep.RelabelConfigs {
sc.RelabelConfigs = append(sc.RelabelConfigs, &RelabelConfig{
SourceLabels: func() (re model.LabelNames) {
for _, l := range c.SourceLabels {
re = append(re, model.LabelName(l))
}
return
}(),
Separator: c.Separator,
TargetLabel: c.TargetLabel,
Regex: MustNewRegexp(c.Regex),
Modulus: c.Modulus,
Replacement: c.Replacement,
Action: RelabelAction(c.Action),
})
}
}
return &sc
}
func parseDuration(source string, def time.Duration) model.Duration {
d, err := time.ParseDuration(source)
if err != nil {
return model.Duration(def)
}
return model.Duration(d)
}
var (
invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
)
func sanitizeLabelName(name string) string {
return invalidLabelCharRE.ReplaceAllString(name, "_")
}