change some code bug #370

This commit is contained in:
barnettZQG 2020-09-25 10:58:54 +08:00
parent 45aefa196b
commit 4b7644351f
4 changed files with 84 additions and 9 deletions

View File

@ -27,10 +27,10 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/sirupsen/logrus"
"github.com/go-chi/chi" "github.com/go-chi/chi"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@ -87,7 +87,12 @@ func Run(s *option.GWServer) error {
reg := prometheus.NewRegistry() reg := prometheus.NewRegistry()
reg.MustRegister(prometheus.NewGoCollector()) reg.MustRegister(prometheus.NewGoCollector())
reg.MustRegister(prometheus.NewProcessCollector(os.Getpid(), "gateway")) reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
PidFn: func() (int, error) {
return os.Getpid(), nil
},
Namespace: "gateway",
}))
mc := metric.NewDummyCollector() mc := metric.NewDummyCollector()
if s.Config.EnableMetrics { if s.Config.EnableMetrics {
mc, err = metric.NewCollector(s.NodeName, reg) mc, err = metric.NewCollector(s.NodeName, reg)

View File

@ -251,3 +251,35 @@ func (p *Manager) UpdateScrape(scrapes ...*ScrapeConfig) {
logrus.Errorf("save prometheus config failure:%s", err.Error()) logrus.Errorf("save prometheus config failure:%s", err.Error())
} }
} }
// UpdateAndRemoveScrape update and remove scrape
func (p *Manager) UpdateAndRemoveScrape(remove []*ScrapeConfig, scrapes ...*ScrapeConfig) {
p.l.Lock()
defer p.l.Unlock()
for _, scrape := range scrapes {
logrus.Debugf("update scrape: %+v", scrape)
exist := false
for i, s := range p.Config.ScrapeConfigs {
if s.JobName == scrape.JobName {
p.Config.ScrapeConfigs[i] = scrape
exist = true
break
}
}
if !exist {
p.Config.ScrapeConfigs = append(p.Config.ScrapeConfigs, scrape)
}
}
for _, rm := range remove {
for i, s := range p.Config.ScrapeConfigs {
if s.JobName == rm.JobName {
logrus.Infof("remove scrape %s", rm.JobName)
p.Config.ScrapeConfigs = append(p.Config.ScrapeConfigs[0:i], p.Config.ScrapeConfigs[i+1:]...)
break
}
}
}
if err := p.SaveConfig(); err != nil {
logrus.Errorf("save prometheus config failure:%s", err.Error())
}
}

View File

@ -40,11 +40,12 @@ import (
//ServiceMonitorController service monitor //ServiceMonitorController service monitor
type ServiceMonitorController struct { type ServiceMonitorController struct {
ctx context.Context ctx context.Context
Prometheus *Manager Prometheus *Manager
smClient *versioned.Clientset lastScrapes []*ScrapeConfig
smInf cache.SharedIndexInformer smClient *versioned.Clientset
queue workqueue.RateLimitingInterface smInf cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
} }
//NewServiceMonitorController new sm controller //NewServiceMonitorController new sm controller
@ -133,6 +134,7 @@ func (s *ServiceMonitorController) processNextWorkItem(ctx context.Context) bool
} }
func (s *ServiceMonitorController) sync() { func (s *ServiceMonitorController) sync() {
logrus.Debug("start sync service monitor config to prometheus config")
smList := s.smInf.GetStore().List() smList := s.smInf.GetStore().List()
var scrapes []*ScrapeConfig var scrapes []*ScrapeConfig
sMonIdentifiers := make([]string, len(smList)) sMonIdentifiers := make([]string, len(smList))
@ -148,12 +150,23 @@ func (s *ServiceMonitorController) sync() {
// Sorting ensures, that we always generate the config in the same order. // Sorting ensures, that we always generate the config in the same order.
sort.Strings(sMonIdentifiers) sort.Strings(sMonIdentifiers)
var scrapeMap = make(map[string]*ScrapeConfig, len(sMonIdentifiers))
for _, name := range sMonIdentifiers { for _, name := range sMonIdentifiers {
for i, end := range sMons[name].Spec.Endpoints { for i, end := range sMons[name].Spec.Endpoints {
scrapes = append(scrapes, s.createScrapeBySM(sMons[name], end, i)) scrape := s.createScrapeBySM(sMons[name], end, i)
scrapeMap[scrape.JobName] = scrape
scrapes = append(scrapes, scrape)
} }
} }
s.Prometheus.UpdateScrape(scrapes...) var remove []*ScrapeConfig
for i, ls := range s.lastScrapes {
if _, ok := scrapeMap[ls.JobName]; !ok {
remove = append(remove, s.lastScrapes[i])
}
}
s.Prometheus.UpdateAndRemoveScrape(remove, scrapes...)
s.lastScrapes = scrapes
logrus.Debugf("success sync service monitor config and or update %d , remove %d", len(scrapes), len(remove))
} }
func (s *ServiceMonitorController) createScrapeBySM(sm *mv1.ServiceMonitor, ep mv1.Endpoint, i int) *ScrapeConfig { func (s *ServiceMonitorController) createScrapeBySM(sm *mv1.ServiceMonitor, ep mv1.Endpoint, i int) *ScrapeConfig {

View File

@ -20,11 +20,14 @@ package prometheus
import ( import (
"bytes" "bytes"
"fmt"
"testing" "testing"
"time"
mv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1" mv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
k8syaml "k8s.io/apimachinery/pkg/util/yaml" k8syaml "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/util/workqueue"
) )
var smYaml = ` var smYaml = `
@ -67,3 +70,25 @@ func TestCreateScrapeBySM(t *testing.T) {
} }
t.Log(string(out)) t.Log(string(out))
} }
func TestQueue(t *testing.T) {
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "sm-monitor")
defer queue.ShutDown()
go func() {
for i := 0; i < 10; i++ {
queue.Add("abc")
time.Sleep(time.Second * 1)
}
}()
for {
item, close := queue.Get()
if close {
t.Fatal("queue closed")
}
time.Sleep(time.Second * 2)
fmt.Println(item)
queue.Forget(item)
queue.Done(item)
}
}