[FIX] Collect disk usage statistics asynchronously

goodrain 2018-01-31 17:04:05 +08:00
parent 3381939e1f
commit cf06567a93
3 changed files with 137 additions and 34 deletions

pkg/worker/monitor/cache/cache.go

@@ -0,0 +1,119 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"context"
"fmt"
"os"
"time"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/pkg/db"
"github.com/goodrain/rainbond/pkg/db/model"
"github.com/goodrain/rainbond/pkg/status"
"github.com/goodrain/rainbond/pkg/util"
)
//DiskCache asynchronously collects disk usage statistics
type DiskCache struct {
cache map[string]float64
dbmanager db.Manager
statusManager status.ServiceStatusManager
ctx context.Context
}
//CreatDiskCache creates a DiskCache
func CreatDiskCache(ctx context.Context, statusManager status.ServiceStatusManager) *DiskCache {
return &DiskCache{
dbmanager: db.GetManager(),
statusManager: statusManager,
ctx: ctx,
}
}
//Start begins the periodic statistics loop
func (d *DiskCache) Start() {
d.setcache()
timer := time.NewTimer(time.Minute * 5)
defer timer.Stop()
for {
select {
case <-d.ctx.Done():
return
case <-timer.C:
d.setcache()
timer.Reset(time.Minute * 5)
}
}
}
func (d *DiskCache) setcache() {
logrus.Info("start get all service disk size")
start := time.Now()
d.cache = nil
d.cache = make(map[string]float64)
services, err := d.dbmanager.TenantServiceDao().GetAllServices()
if err != nil {
logrus.Errorln("Error get tenant service when select db :", err)
}
volumes, err := d.dbmanager.TenantServiceVolumeDao().GetAllVolumes()
if err != nil {
logrus.Errorln("Error get tenant service volume when select db :", err)
}
localPath := os.Getenv("LOCAL_DATA_PATH")
sharePath := os.Getenv("SHARE_DATA_PATH")
if localPath == "" {
localPath = "/grlocaldata"
}
if sharePath == "" {
sharePath = "/grdata"
}
var cache = make(map[string]*model.TenantServices)
for _, service := range services {
//default shared data directory
size := util.GetDirSize(fmt.Sprintf("%s/tenant/%s/service/%s", sharePath, service.TenantID, service.ServiceID))
if size != 0 {
newcache[service.ServiceID+"_"+service.TenantID] = size
}
cache[service.ServiceID] = service
}
gettenantID := func(serviceID string) string {
if service, ok := cache[serviceID]; ok {
return service.TenantID
}
return ""
}
for _, v := range volumes {
if v.VolumeType == string(model.LocalVolumeType) {
//default local data directory
size := util.GetDirSize(fmt.Sprintf("%s/tenant/%s/service/%s", localPath, gettenantID(v.ServiceID), v.ServiceID))
if size != 0 {
newcache[v.ServiceID+"_"+gettenantID(v.ServiceID)] += size
}
}
}
logrus.Infof("end get all service disk size,time consum %d s", time.Now().Sub(start).Seconds())
}
//Get returns the latest disk statistics snapshot
func (d *DiskCache) Get() map[string]float64 {
return d.cache
}
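Note: the end-of-pass swap above keeps Get from returning a half-built map, but the bare pointer assignment is still a data race under the Go memory model when Get is called from the exporter goroutine. A minimal lock-guarded variant is sketched below; it is illustrative only, and the safeDiskCache name and mu field are not part of this commit.

import "sync"

//safeDiskCache guards the statistics map with a RWMutex so the
//refresh goroutine and metric scrapes can run concurrently.
type safeDiskCache struct {
mu    sync.RWMutex
cache map[string]float64
}

//set installs a freshly built statistics map.
func (s *safeDiskCache) set(newcache map[string]float64) {
s.mu.Lock()
s.cache = newcache
s.mu.Unlock()
}

//Get returns a copy so callers can range over it without holding the lock.
func (s *safeDiskCache) Get() map[string]float64 {
s.mu.RLock()
defer s.mu.RUnlock()
out := make(map[string]float64, len(s.cache))
for k, v := range s.cache {
out[k] = v
}
return out
}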

@@ -19,15 +19,16 @@
package collector
import (
"fmt"
"os"
"strings"
"time"
"github.com/goodrain/rainbond/pkg/db/model"
"github.com/goodrain/rainbond/pkg/worker/monitor/cache"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/pkg/db"
"github.com/goodrain/rainbond/pkg/db/model"
"github.com/goodrain/rainbond/pkg/status"
"github.com/goodrain/rainbond/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)
@@ -42,7 +43,7 @@ type Exporter struct {
workerUp prometheus.Gauge
dbmanager db.Manager
statusManager status.ServiceStatusManager
fscache map[string]float64
cache *cache.DiskCache
}
var scrapeDurationDesc = prometheus.NewDesc(
@@ -90,12 +91,6 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
e.scrapeErrors.WithLabelValues("db.getservices").Inc()
e.error.Set(1)
}
volumes, err := e.dbmanager.TenantServiceVolumeDao().GetAllVolumes()
if err != nil {
logrus.Errorln("Error scraping for tenant service when select db :", err)
e.scrapeErrors.WithLabelValues("db.getvolumes").Inc()
e.error.Set(1)
}
localPath := os.Getenv("LOCAL_DATA_PATH")
sharePath := os.Getenv("SHARE_DATA_PATH")
if localPath == "" {
@@ -105,36 +100,20 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
sharePath = "/grdata"
}
//collect memory usage
var cache = make(map[string]*model.TenantServices)
for _, service := range services {
if appstatus, err := e.statusManager.GetStatus(service.ServiceID); err == nil {
if appstatus != status.CLOSED && appstatus != status.UNDEPLOY && appstatus != status.DEPLOYING {
e.memoryUse.WithLabelValues(service.TenantID, service.ServiceID, appstatus).Set(float64(service.ContainerMemory * service.Replicas))
}
}
//default directory
size := util.GetDirSize(fmt.Sprintf("%s/tenant/%s/service/%s", sharePath, service.TenantID, service.ServiceID))
if size != 0 {
e.fsUse.WithLabelValues(service.TenantID, service.ServiceID, string(model.ShareFileVolumeType)).Set(size)
}
cache[service.ServiceID] = service
}
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, time.Since(scrapeTime).Seconds(), "collect.memory")
scrapeTime = time.Now()
gettenantID := func(serviceID string) string {
if service, ok := cache[serviceID]; ok {
return service.TenantID
}
return ""
}
//collect disk usage
for _, volume := range volumes {
var size float64
if volume.VolumeType == string(model.LocalVolumeType) {
size = util.GetDirSize(volume.HostPath)
if size != 0 {
e.fsUse.WithLabelValues(gettenantID(volume.ServiceID), volume.ServiceID, volume.VolumeType).Set(size)
}
diskcache := e.cache.Get()
for k, v := range diskcache {
key := strings.Split(k, "_")
if len(key) == 2 {
e.fsUse.WithLabelValues(key[1], key[0], string(model.ShareFileVolumeType)).Set(v)
}
}
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, time.Since(scrapeTime).Seconds(), "collect.fs")
@@ -143,7 +122,7 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
var namespace = "app_resource"
//New creates a collector
func New(statusManager status.ServiceStatusManager) *Exporter {
func New(statusManager status.ServiceStatusManager, cache *cache.DiskCache) *Exporter {
return &Exporter{
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
@@ -180,6 +159,6 @@ func New(statusManager status.ServiceStatusManager) *Exporter {
}, []string{"tenant_id", "service_id", "volume_type"}),
dbmanager: db.GetManager(),
statusManager: statusManager,
fscache: make(map[string]float64),
cache: cache,
}
}
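The exporter and the cache communicate through composite map keys of the form serviceID_tenantID, which the scrape loop splits back apart with strings.Split. That round trip assumes neither ID ever contains an underscore, which holds for Rainbond's hex-style IDs. The helpers below are hypothetical and only illustrate the key format; they are not part of the commit.

//diskKey and splitDiskKey are illustrative helpers for the composite
//key format; they assume IDs never contain '_'.
func diskKey(serviceID, tenantID string) string {
return serviceID + "_" + tenantID
}

func splitDiskKey(key string) (serviceID, tenantID string, ok bool) {
parts := strings.Split(key, "_")
if len(parts) != 2 {
return "", "", false
}
return parts[0], parts[1], true
}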

@@ -25,6 +25,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/pkg/status"
"github.com/goodrain/rainbond/pkg/worker/monitor/cache"
"github.com/goodrain/rainbond/pkg/worker/monitor/collector"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -38,22 +39,25 @@ type ExporterManager struct {
config option.Config
stopChan chan struct{}
statusManager status.ServiceStatusManager
cache *cache.DiskCache
}
//NewManager returns an *ExporterManager
func NewManager(c option.Config, statusManager status.ServiceStatusManager) *ExporterManager {
ctx, cancel := context.WithCancel(context.Background())
cache := cache.CreatDiskCache(ctx, statusManager)
return &ExporterManager{
ctx: ctx,
cancel: cancel,
config: c,
stopChan: make(chan struct{}),
statusManager: statusManager,
cache: cache,
}
}
func (t *ExporterManager) handler(w http.ResponseWriter, r *http.Request) {
registry := prometheus.NewRegistry()
registry.MustRegister(collector.New(t.statusManager))
registry.MustRegister(collector.New(t.statusManager, t.cache))
gatherers := prometheus.Gatherers{
prometheus.DefaultGatherer,
@@ -77,6 +81,7 @@ func (t *ExporterManager) Start() error {
</html>
`))
})
go t.cache.Start()
log.Infoln("Listening on", t.config.Listen)
go func() {
log.Fatal(http.ListenAndServe(t.config.Listen, nil))
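
Putting the three files together: NewManager builds one DiskCache bound to a cancellable context, Start launches its five-minute refresh goroutine alongside the HTTP listener, and every /metrics request registers a fresh collector that reads the latest snapshot via Get. A condensed usage sketch follows; runMonitor is hypothetical, and option.Config plus the status manager come from the surrounding worker code.

//runMonitor is a hypothetical entry point showing the intended wiring.
func runMonitor(c option.Config, sm status.ServiceStatusManager) error {
m := NewManager(c, sm) // creates the shared DiskCache internally
return m.Start()       // starts the cache refresh goroutine and the HTTP listener
}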