This commit is contained in:
GLYASAI 2019-08-23 15:55:54 +08:00
commit ee813624fa
32 changed files with 497 additions and 839 deletions

View File

@ -32,6 +32,7 @@ func Routes() chi.Router {
r.Get("/monitor_message", controller.GetMonitorMessage().Get)
r.Get("/new_monitor_message", controller.GetMonitorMessage().Get)
r.Get("/event_log", controller.GetEventLog().Get)
r.Get("/services/{serviceID}/pubsub", controller.GetPubSubControll().Get)
return r
}
@ -43,7 +44,7 @@ func LogRoutes() chi.Router {
return r
}
//LogRoutes 应用导出包下载路由
//AppRoutes 应用导出包下载路由
func AppRoutes() chi.Router {
r := chi.NewRouter()
r.Get("/download/{format}/{fileName}", controller.GetManager().Download)

View File

@ -19,15 +19,15 @@
package controller
import (
"context"
"net/http"
"os"
"github.com/goodrain/rainbond/api/discover"
"github.com/goodrain/rainbond/api/proxy"
"github.com/Sirupsen/logrus"
"github.com/go-chi/chi"
"github.com/goodrain/rainbond/api/discover"
"github.com/goodrain/rainbond/api/handler"
"github.com/goodrain/rainbond/api/proxy"
)
//DockerConsole docker console
@ -175,3 +175,32 @@ func (d LogFile) GetInstallLog(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
}
}
var pubSubControll *PubSubControll
//PubSubControll service pub sub
type PubSubControll struct {
socketproxy proxy.Proxy
}
//GetPubSubControll get service pub sub controller
func GetPubSubControll() *PubSubControll {
if pubSubControll == nil {
pubSubControll = &PubSubControll{
socketproxy: proxy.CreateProxy("dockerlog", "websocket", defaultEventLogEndpoints),
}
discover.GetEndpointDiscover(defaultEtcdEndpoints).AddProject("event_log_event_http", pubSubControll.socketproxy)
}
return pubSubControll
}
//Get get
func (d PubSubControll) Get(w http.ResponseWriter, r *http.Request) {
serviceID := chi.URLParam(r, "serviceID")
name, _ := handler.GetEventHandler().GetLogInstance(serviceID)
if name != "" {
r.URL.Query().Add("host_id", name)
r = r.WithContext(context.WithValue(r.Context(), proxy.ContextKey("host_id"), name))
}
d.socketproxy.Proxy(w, r)
}

View File

@ -22,8 +22,13 @@ import (
"net/http"
"strings"
"sync/atomic"
"github.com/Sirupsen/logrus"
)
//ContextKey context key
type ContextKey string
// RoundRobin round robin loadBalance impl
type RoundRobin struct {
ops *uint64
@ -170,7 +175,14 @@ func (s *SelectBalance) Select(r *http.Request, endpoints EndpointList) Endpoint
if r.URL != nil {
hostID := r.URL.Query().Get("host_id")
if hostID == "" {
hostIDFromContext := r.Context().Value(ContextKey("host_id"))
if hostIDFromContext != nil {
hostID = hostIDFromContext.(string)
}
}
if e, ok := id2ip[hostID]; ok {
logrus.Debugf("[lb selelct] find host %s from name %s success", e, hostID)
return Endpoint(e)
}
}

View File

@ -38,93 +38,7 @@ type WebSocketProxy struct {
upgrader *websocket.Upgrader
}
//Proxy 代理
// func (h *WebSocketProxy) Proxy(w http.ResponseWriter, r *http.Request) {
// upgrader := websocket.Upgrader{
// EnableCompression: true,
// Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) {
// w.WriteHeader(500)
// },
// CheckOrigin: func(r *http.Request) bool {
// return true
// },
// }
// conn, err := upgrader.Upgrade(w, r, nil)
// if err != nil {
// logrus.Error("Create web socket conn error.", err.Error())
// return
// }
// defer func() {
// conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
// conn.Close()
// }()
// endpoint := h.lb.Select(r, h.endpoints)
// path := r.RequestURI
// if strings.Contains(path, "?") {
// path = path[:strings.Index(path, "?")]
// }
// u := url.URL{Scheme: "ws", Host: endpoint.String(), Path: path}
// logrus.Infof("connecting to %s", u.String())
// c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
// if err != nil {
// logrus.Errorf("dial websocket endpoint %s error. %s", u.String(), err.Error())
// w.WriteHeader(500)
// return
// }
// defer c.Close()
// done := make(chan struct{})
// go func() {
// defer close(done)
// for {
// select {
// case <-r.Context().Done():
// return
// default:
// }
// t, message, err := c.ReadMessage()
// if err != nil {
// logrus.Println("read proxy websocket message error: ", err)
// return
// }
// err = conn.WriteMessage(t, message)
// if err != nil {
// logrus.Println("write client websocket message error: ", err)
// return
// }
// }
// }()
// for {
// select {
// case <-r.Context().Done():
// // To cleanly close a connection, a client should send a close
// // frame and wait for the server to close the connection.
// err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
// if err != nil {
// log.Println("write close:", err)
// return
// }
// select {
// case <-done:
// case <-time.After(time.Second):
// }
// return
// case <-done:
// return
// default:
// }
// t, message, err := conn.ReadMessage()
// if err != nil {
// logrus.Errorln("read client websocket message error: ", err)
// return
// }
// err = c.WriteMessage(t, message)
// if err != nil {
// logrus.Errorln("write proxy websocket message error: ", err)
// return
// }
// }
// }
//Proxy websocket proxy
func (h *WebSocketProxy) Proxy(w http.ResponseWriter, req *http.Request) {
endpoint := h.lb.Select(req, h.endpoints)
logrus.Info("Proxy webSocket to: ", endpoint)

View File

@ -374,16 +374,32 @@ type rollingUpgradeTaskBody struct {
}
func (e *exectorManager) sendAction(tenantID, serviceID, eventID, newVersion, actionType string, configs map[string]string, logger event.Logger) error {
// update build event complete status
logger.Info("Build success", map[string]string{"step": "last", "status": "running"})
switch actionType {
case "upgrade":
//add upgrade event
event := &dbmodel.ServiceEvent{
EventID: util.NewUUID(),
TenantID: tenantID,
ServiceID: serviceID,
StartTime: time.Now().Format(time.RFC3339),
OptType: "upgrade",
Status: "running",
}
if err := db.GetManager().ServiceEventDao().AddModel(event); err != nil {
logrus.Errorf("create upgrade event failure %s, service %s do not auto upgrade", err.Error(), serviceID)
return nil
}
if err := db.GetManager().TenantServiceDao().UpdateDeployVersion(serviceID, newVersion); err != nil {
return fmt.Errorf("Update app service deploy version failure.Please try the upgrade again")
logrus.Errorf("Update app service deploy version failure %s, service %s do not auto upgrade", err.Error(), serviceID)
return nil
}
body := workermodel.RollingUpgradeTaskBody{
TenantID: tenantID,
ServiceID: serviceID,
NewDeployVersion: newVersion,
EventID: eventID,
EventID: event.EventID,
Configs: configs,
}
if err := e.mqClient.SendBuilderTopic(mqclient.TaskStruct{
@ -393,10 +409,9 @@ func (e *exectorManager) sendAction(tenantID, serviceID, eventID, newVersion, ac
}); err != nil {
return err
}
logger.Info("Build success,start upgrade app service", map[string]string{"step": "builder", "status": "running"})
logger.Info("Build success", map[string]string{"step": "last", "status": "running"})
return nil
default:
logger.Info("Build success,do not other action", map[string]string{"step": "last", "status": "success"})
}
return nil
}

View File

@ -21,9 +21,10 @@ package main
import (
"os"
_ "net/http/pprof"
"github.com/goodrain/rainbond/cmd"
"github.com/goodrain/rainbond/cmd/eventlog/server"
"github.com/spf13/pflag"
)

View File

@ -28,7 +28,6 @@ import (
"github.com/goodrain/rainbond/eventlog/conf"
"github.com/goodrain/rainbond/eventlog/entry"
"github.com/goodrain/rainbond/eventlog/exit/web"
"github.com/goodrain/rainbond/eventlog/exit/webhook"
"github.com/goodrain/rainbond/eventlog/store"
"os"
@ -82,7 +81,7 @@ func (s *LogServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Conf.EventStore.GarbageMessageFile, "message.garbage.file", "/var/log/envent_garbage_message.log", "save garbage message file path when save type is file")
fs.Int64Var(&s.Conf.EventStore.PeerEventMaxLogNumber, "message.max.number", 100000, "the max number log message for peer event")
fs.IntVar(&s.Conf.EventStore.PeerEventMaxCacheLogNumber, "message.cache.number", 256, "Maintain log the largest number in the memory peer event")
fs.Int64Var(&s.Conf.EventStore.PeerDockerMaxCacheLogNumber, "dockermessage.cache.number", 512, "Maintain log the largest number in the memory peer docker service")
fs.Int64Var(&s.Conf.EventStore.PeerDockerMaxCacheLogNumber, "dockermessage.cache.number", 128, "Maintain log the largest number in the memory peer docker service")
fs.IntVar(&s.Conf.EventStore.HandleMessageCoreNumber, "message.handle.core.number", 2, "The number of concurrent processing receive log data.")
fs.IntVar(&s.Conf.EventStore.HandleSubMessageCoreNumber, "message.sub.handle.core.number", 3, "The number of concurrent processing receive log data. more than message.handle.core.number")
fs.IntVar(&s.Conf.EventStore.HandleDockerLogCoreNumber, "message.dockerlog.handle.core.number", 2, "The number of concurrent processing receive log data. more than message.handle.core.number")
@ -106,8 +105,6 @@ func (s *LogServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.Conf.EventStore.DB.PoolSize, "db.pool.size", 3, "Data persistence db pool init size.")
fs.IntVar(&s.Conf.EventStore.DB.PoolMaxSize, "db.pool.maxsize", 10, "Data persistence db pool max size.")
fs.StringVar(&s.Conf.EventStore.DB.HomePath, "docker.log.homepath", "/grdata/logs/", "container log persistent home path")
fs.StringVar(&s.Conf.WebHook.ConsoleURL, "webhook.console.url", "http://console.goodrain.me", "console web api url")
fs.StringVar(&s.Conf.WebHook.ConsoleToken, "webhook.console.token", "", "console web api token")
fs.StringVar(&s.Conf.Entry.NewMonitorMessageServerConf.ListenerHost, "monitor.udp.host", "0.0.0.0", "receive new monitor udp server host")
fs.IntVar(&s.Conf.Entry.NewMonitorMessageServerConf.ListenerPort, "monitor.udp.port", 6166, "receive new monitor udp server port")
fs.StringVar(&s.Conf.Cluster.Discover.NodeIDFile, "nodeid-file", "/opt/rainbond/etc/node/node_host_uuid.conf", "the unique ID for this node. Just specify, don't modify")
@ -173,9 +170,6 @@ func (s *LogServer) InitConf() {
if os.Getenv("CLUSTER_BIND_IP") != "" {
s.Conf.Cluster.PubSub.PubBindIP = os.Getenv("CLUSTER_BIND_IP")
}
if os.Getenv("CONSOLE_TOKEN") != "" {
s.Conf.WebHook.ConsoleToken = os.Getenv("CONSOLE_TOKEN")
}
}
//Run 执行
@ -183,10 +177,6 @@ func (s *LogServer) Run() error {
s.Logger.Debug("Start run server.")
log := s.Logger
if err := webhook.InitManager(s.Conf.WebHook, log.WithField("module", "WebHook")); err != nil {
return err
}
//init new db
if err := db.CreateDBManager(s.Conf.EventStore.DB); err != nil {
logrus.Infof("create db manager error, %v", err)

View File

@ -143,7 +143,7 @@ func (a *Conf) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&a.StatsdConfig.ReadBuffer, "statsd.read-buffer", 0, "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.")
fs.DurationVar(&a.MinResyncPeriod, "min-resync-period", time.Second*15, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod")
fs.StringVar(&a.ServiceListFile, "service-list-file", "/opt/rainbond/conf/", "Specifies the configuration file, which can be a directory, that configures the service running on the current node")
fs.BoolVar(&a.EnableInitStart, "enable-init-start", false, "Whether the node daemon launches docker and etcd service")
fs.BoolVar(&a.EnableInitStart, "enable-init-start", false, "Enable dependency - free initialization starts for services that support initialization starts")
fs.BoolVar(&a.AutoRegistNode, "auto-registnode", true, "Whether auto regist node info to cluster where node is not found")
fs.BoolVar(&a.AutoScheduler, "auto-scheduler", true, "Whether auto set node unscheduler where current node is unhealth")
fs.BoolVar(&a.EnableCollectLog, "enabel-collect-log", true, "Whether to collect container logs")

View File

@ -22,7 +22,6 @@ import (
"fmt"
"io"
"os"
"strings"
"sync"
"sync/atomic"
"time"
@ -49,6 +48,8 @@ type Manager interface {
Close() error
ReleaseLogger(Logger)
}
// EventConfig event config struct
type EventConfig struct {
EventLogServers []string
DiscoverAddress []string
@ -263,6 +264,7 @@ func (m *manager) getLBChan() chan []byte {
m.qos = atomic.AddInt32(&(m.qos), 1)
server := m.eventServer[index]
if _, ok := m.abnormalServer[server]; ok {
logrus.Warnf("server[%s] is abnormal, skip it", server)
continue
}
if h, ok := m.handles[server]; ok {
@ -424,13 +426,10 @@ func (l *loggerWriter) SetFormat(f string) {
func (l *loggerWriter) Write(b []byte) (n int, err error) {
if b != nil && len(b) > 0 {
message := string(b)
message = strings.Replace(message, "\r", "", -1)
message = strings.Replace(message, "\n", "", -1)
message = strings.Replace(message, "\u0000", "", -1)
message = strings.Replace(message, "\"", "", -1)
if l.fmt != "" {
message = fmt.Sprintf(l.fmt, message)
}
logrus.Debugf("step: %s, level: %s;write message : %v", l.step, l.level, message)
l.l.send(message, map[string]string{"step": l.step, "level": l.level})
}
return len(b), nil

View File

@ -1,49 +0,0 @@
GO_LDFLAGS=-ldflags " -w"
IMAGE_NAME=hub.goodrain.com/dc-deploy/acp_event-log
IMAGE_TAG=3.2
install:
@go build ${GO_LDFLAGS}
dev:install
./event_log --log.level=debug --discover.etcd.addr=http://127.0.0.1:2379 \
--db.url="root:admin@tcp(127.0.0.1:3306)/event" \
--dockerlog.mode=stream \
--message.dockerlog.handle.core.number=2 \
--message.garbage.file="/tmp/garbage.log" \
--docker.log.homepath="/Users/qingguo/tmp"
dev-cluster:
@./event_log --log.level=debug --discover.etcd.addr=http://127.0.0.1:2379 \
--db.url="root:admin@tcp(127.0.0.1:3306)/event" \
--message.garbage.file="/tmp/garbage.log" \
--eventlog.bind.port=2000 \
--dockerlog.bind.port=2001 \
--cluster.bind.port=2002 \
--websocket.bind.port=2003\
--monitor.subaddress=tcp://127.0.0.1:9441\
--docker.log.homepath="/Users/qingguo"
build-alpine:
@echo "🐳 $@"
@docker build -t goodraim.me/event-build:v1 build
@echo "building..."
@docker run --rm -v `pwd`:/go/src/event_log -w /go/src/event_log goodraim.me/event-build:v1 go build ${GO_LDFLAGS} -o event_log
@echo "build done."
instance-ali:
@docker run --name event-log --net host -d -v /var/log/event-log/:/var/log/ ${IMAGE_NAME}:${IMAGE_TAG} \
--db.url="root:admin@tcp(172.30.0.9:3306)/event" \
--discover.etcd.addr="http://127.0.0.1:4001"
instance-test:build-alpine
@docker run --name event-log-test --net host -d ${IMAGE_NAME}:${IMAGE_TAG} \
--db.url="writer1:CeRYK8UzWD@tcp(10.0.55.72:3306)/region" \
--discover.etcd.addr="http://127.0.0.1:4001" \
--eventlog.bind.port=2000 \
--dockerlog.bind.port=2001 \
--cluster.bind.port=2002 \
--websocket.bind.port=2003\
--monitor.subaddress="tcp://10.0.1.11:9442"
instance-ali-build:build-debian instance-ali

View File

@ -1,37 +0,0 @@
# v1版本说明
1. 支持zmq-server接收消息。已完成
2. 支持集群自动发现和转发消息(已完成)
3. 支持webSocket转发消息,支持ssl协议已完成
* 建立连接后先发送缓存消息。
* 完成后再发送实时消息
4. 完成mysql存储。(已完成)
5. 完成消息分析和结果回调。(已完成)
6. 支持http接收消息。
# v2版本说明
1. 支持接收docker日志并选举接收节点。
选举策略:
* etcd中已有对应关系检测节点是否正常如果正常返回该节点。
* 对于新的serviceID根据监控数据选举闲节点。(以每分钟日志量+20倍服务量算标志标志最小为最优)
* 应用关闭时删除etcd中的对应数据。region-api完成
选举请求:
* 请求由dockerd为容器创建logger时完成。
* dockerd完成链接状态检测工作若分配的服务端状态异常。重新请求心得节点。
2. 支持集群间各类消息通信。
* 操作日志消息。(及时)
* docker日志监控数据消息。用于选举
* 容器状态检测停止消息。(用于应用启动和停止操作处理节点不在同一节点)
3. docker日志文件存储。
* 文件缓存写入。每128条写入或每1分钟写入
* 分布式文件存储。
* 历史日志文件压缩。
# v3版本说明
1. 支持监控数据websocket消息转发服务。
2. 消息输入zmq sub订阅数据源。
3. 进行数据切割,区分主题。
3. 消息输出以不同的主题进行区分和订阅。

View File

@ -20,31 +20,27 @@ package web
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/go-chi/chi"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/eventlog/cluster"
"github.com/goodrain/rainbond/eventlog/cluster/discover"
"github.com/goodrain/rainbond/eventlog/conf"
"github.com/goodrain/rainbond/eventlog/exit/monitor"
"github.com/goodrain/rainbond/eventlog/store"
"golang.org/x/net/context"
"fmt"
"strings"
_ "net/http/pprof"
"github.com/Sirupsen/logrus"
httputil "github.com/goodrain/rainbond/util/http"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/twinj/uuid"
"golang.org/x/net/context"
)
//SocketServer socket 服务
@ -438,15 +434,16 @@ func (s *SocketServer) Run() error {
return nil
}
func (s *SocketServer) listen() {
http.HandleFunc("/event_log", s.pushEventMessage)
http.HandleFunc("/docker_log", s.pushDockerLog)
http.HandleFunc("/monitor_message", s.pushMonitorMessage)
http.HandleFunc("/new_monitor_message", s.pushNewMonitorMessage)
http.HandleFunc("/monitor", func(w http.ResponseWriter, r *http.Request) {
r := chi.NewRouter()
r.Get("/event_log", s.pushEventMessage)
r.Get("/docker_log", s.pushDockerLog)
r.Get("/monitor_message", s.pushMonitorMessage)
r.Get("/new_monitor_message", s.pushNewMonitorMessage)
r.Get("/monitor", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte("ok"))
})
http.HandleFunc("/docker-instance", func(w http.ResponseWriter, r *http.Request) {
r.Get("/docker-instance", func(w http.ResponseWriter, r *http.Request) {
ServiceID := r.FormValue("service_id")
if ServiceID == "" {
w.WriteHeader(412)
@ -466,21 +463,22 @@ func (s *SocketServer) listen() {
url := fmt.Sprintf("tcp://%s:%d", instance.HostIP, instance.DockerLogPort)
w.Write([]byte(`{"host":"` + url + `","status":"success"}`))
})
http.HandleFunc("/event_push", s.receiveEventMessage)
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
r.Get("/event_push", s.receiveEventMessage)
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
if s.healthInfo["status"] != "health" {
httputil.ReturnError(r, w, 400, "eventlog service unusual")
}
httputil.ReturnSuccess(r, w, s.healthInfo)
})
// new websocket pubsub
r.Get("/services/{serviceID}/pubsub", s.pubsub)
//monitor setting
s.prometheus()
s.prometheus(r)
if s.conf.SSL {
go func() {
addr := fmt.Sprintf("%s:%d", s.conf.BindIP, s.conf.SSLBindPort)
s.log.Infof("web socket ssl server listen %s", addr)
err := http.ListenAndServeTLS(addr, s.conf.CertFile, s.conf.KeyFile, nil)
err := http.ListenAndServeTLS(addr, s.conf.CertFile, s.conf.KeyFile, r)
if err != nil {
s.log.Error("websocket listen error.", err.Error())
s.listenErr <- err
@ -489,7 +487,7 @@ func (s *SocketServer) listen() {
}
addr := fmt.Sprintf("%s:%d", s.conf.BindIP, s.conf.BindPort)
s.log.Infof("web socket server listen %s", addr)
err := http.ListenAndServe(addr, nil)
err := http.ListenAndServe(addr, r)
if err != nil {
s.log.Error("websocket listen error.", err.Error())
s.listenErr <- err
@ -554,11 +552,11 @@ func (s *SocketServer) receiveEventMessage(w http.ResponseWriter, r *http.Reques
return
}
func (s *SocketServer) prometheus() {
func (s *SocketServer) prometheus(r *chi.Mux) {
prometheus.MustRegister(version.NewCollector("event_log"))
exporter := monitor.NewExporter(s.storemanager, s.cluster)
prometheus.MustRegister(exporter)
http.Handle(s.conf.PrometheusMetricPath, promhttp.Handler())
r.Handle(s.conf.PrometheusMetricPath, promhttp.Handler())
}
//ResponseType 返回内容

309
eventlog/exit/web/pusher.go Normal file
View File

@ -0,0 +1,309 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2019 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package web
import (
"net/http"
"strings"
"sync"
"time"
"github.com/goodrain/rainbond/eventlog/db"
"github.com/goodrain/rainbond/util"
"github.com/pquerna/ffjson/ffjson"
"github.com/gorilla/websocket"
)
//WebsocketMessage websocket message
type WebsocketMessage struct {
Event string `json:"event"`
Data interface{} `json:"data"`
Channel string `json:"channel,omitempty"`
}
//Encode return json encode data
func (w *WebsocketMessage) Encode() []byte {
reb, _ := ffjson.Marshal(w)
return reb
}
//PubContext websocket context
type PubContext struct {
ID string
upgrader websocket.Upgrader
conn *websocket.Conn
httpWriter http.ResponseWriter
httpRequest *http.Request
server *SocketServer
chans map[string]*Chan
lock sync.Mutex
close chan struct{}
}
//Chan handle
type Chan struct {
ch chan *db.EventLogMessage
id string
chtype string
reevent string
channel string
p *PubContext
}
//NewPubContext create context
func NewPubContext(upgrader websocket.Upgrader,
httpWriter http.ResponseWriter,
httpRequest *http.Request,
s *SocketServer,
) *PubContext {
return &PubContext{
ID: util.NewUUID(),
upgrader: upgrader,
httpWriter: httpWriter,
httpRequest: httpRequest,
server: s,
chans: make(map[string]*Chan, 2),
close: make(chan struct{}),
}
}
func (p *PubContext) handleMessage(me []byte) {
var wm WebsocketMessage
if err := ffjson.Unmarshal(me, &wm); err != nil {
p.SendMessage(WebsocketMessage{Event: "error", Data: "Invalid message"})
return
}
switch wm.Event {
case "pusher:subscribe":
p.handleSubscribe(wm)
}
}
func (p *PubContext) createChan(channel, chantype, id string) {
p.lock.Lock()
defer p.lock.Unlock()
if _, exist := p.chans[chantype+"-"+id]; exist {
return
}
ch := p.server.storemanager.WebSocketMessageChan(chantype, id, p.ID)
if ch != nil {
c := &Chan{
ch: ch,
channel: chantype + "-" + id,
id: id,
chtype: chantype,
reevent: func() string {
if chantype == "docker" {
return "service:log"
}
if chantype == "newmonitor" {
return "monitor"
}
if chantype == "event" {
return "event:log"
}
return ""
}(),
p: p,
}
p.chans[chantype+"-"+id] = c
// send success message
p.SendMessage(WebsocketMessage{
Event: "pusher:succeeded",
Channel: c.channel,
})
go c.handleChan()
}
}
func (p *PubContext) removeChan(key string) {
p.lock.Lock()
defer p.lock.Unlock()
if _, exist := p.chans[key]; exist {
delete(p.chans, key)
}
if len(p.chans) == 0 {
p.Close()
}
}
func (p *PubContext) handleSubscribe(wm WebsocketMessage) {
data := wm.Data.(map[string]interface{})
if channel, ok := data["channel"].(string); ok {
channelInfo := strings.SplitN(channel, "-", 2)
if len(channelInfo) < 2 {
p.SendMessage(WebsocketMessage{Event: "error", Data: "Invalid message"})
return
}
if channelInfo[0] == "s" {
p.createChan(channel, "docker", channelInfo[1])
p.createChan(channel, "newmonitor", channelInfo[1])
}
if channelInfo[0] == "e" {
p.createChan(channel, "newmonitor", channelInfo[1])
}
}
}
func (c *Chan) handleChan() {
defer func() {
c.p.server.log.Infof("pubsub message chan %s closed", c.channel)
c.p.removeChan(c.chtype + "-" + c.id)
}()
for {
select {
case <-c.p.close:
c.p.server.log.Info("pub sub context closed")
return
case <-c.p.httpRequest.Context().Done():
c.p.server.log.Info("pub sub request context cancel")
return
case message, ok := <-c.ch:
if !ok {
c.p.SendMessage(WebsocketMessage{Event: "pusher:close", Data: "{}", Channel: c.channel})
c.p.SendWebsocketMessage(websocket.CloseMessage)
return
}
if message != nil {
if message.Step == "last" {
c.p.SendMessage(WebsocketMessage{Event: "event:success", Data: message.Message, Channel: c.channel})
c.p.SendMessage(WebsocketMessage{Event: "pusher:close", Data: message.Message, Channel: c.channel})
return
}
if message.Step == "callback" {
c.p.SendMessage(WebsocketMessage{Event: "event:failure", Data: message.Message, Channel: c.channel})
c.p.SendMessage(WebsocketMessage{Event: "pusher:close", Data: message.Message, Channel: c.channel})
return
}
if message.MonitorData != nil {
c.p.SendMessage(WebsocketMessage{Event: c.reevent, Data: string(message.MonitorData), Channel: c.channel})
} else {
c.p.SendMessage(WebsocketMessage{Event: c.reevent, Data: string(message.Content), Channel: c.channel})
}
}
}
}
}
func (p *PubContext) readMessage(closed chan struct{}) {
defer func() {
close(closed)
}()
if p.conn == nil {
p.server.log.Errorf("websocket connection is not connect")
}
for {
messageType, me, err := p.conn.ReadMessage()
if err != nil {
p.server.log.Errorf("read websocket message failure %s", err.Error())
break
}
if messageType == websocket.CloseMessage {
break
}
if messageType == websocket.TextMessage {
p.handleMessage(me)
continue
}
if messageType == websocket.PingMessage {
p.conn.WriteMessage(websocket.PongMessage, []byte{})
continue
}
if messageType == websocket.BinaryMessage {
continue
}
}
}
//SendMessage send websocket message
func (p *PubContext) SendMessage(message WebsocketMessage) error {
p.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
return p.conn.WriteMessage(websocket.TextMessage, message.Encode())
}
//SendWebsocketMessage send websocket message
func (p *PubContext) SendWebsocketMessage(message int) error {
p.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
return p.conn.WriteMessage(message, []byte{})
}
func (p *PubContext) sendPing(closed chan struct{}) {
err := util.Exec(p.httpRequest.Context(), func() error {
if err := p.SendWebsocketMessage(websocket.PingMessage); err != nil {
return err
}
return nil
}, time.Second*20)
if err != nil {
p.server.log.Errorf("send ping message failure %s will closed the connect", err.Error())
close(closed)
}
}
//Start start context
func (p *PubContext) Start() {
var err error
p.conn, err = p.upgrader.Upgrade(p.httpWriter, p.httpRequest, nil)
if err != nil {
p.server.log.Error("Create web socket conn error.", err.Error())
return
}
pingclosed := make(chan struct{})
readclosed := make(chan struct{})
go p.sendPing(pingclosed)
go p.readMessage(readclosed)
select {
case <-pingclosed:
case <-readclosed:
case <-p.close:
}
}
//Stop close context
func (p *PubContext) Stop() {
if p.conn != nil {
p.conn.Close()
}
for _, v := range p.chans {
p.server.storemanager.RealseWebSocketMessageChan(v.chtype, v.id, p.ID)
}
}
//Close close the context
func (p *PubContext) Close() {
close(p.close)
}
func (s *SocketServer) pubsub(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
ReadBufferSize: s.conf.ReadBufferSize,
WriteBufferSize: s.conf.WriteBufferSize,
EnableCompression: s.conf.EnableCompression,
Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) {
},
CheckOrigin: func(r *http.Request) bool {
return true
},
}
context := NewPubContext(upgrader, w, r, s)
defer context.Stop()
s.log.Infof("websocket pubsub context running %s", context.ID)
context.Start()
s.log.Infof("websocket pubsub context closed %s", context.ID)
}

View File

@ -1,187 +0,0 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package webhook
import (
"github.com/goodrain/rainbond/eventlog/conf"
"net/http"
"sync"
"bytes"
"fmt"
"strings"
"net/url"
"strconv"
"github.com/Sirupsen/logrus"
)
type Manager struct {
hooks map[string]*WebHook
conf conf.WebHookConf
log *logrus.Entry
lock sync.Mutex
}
type WebHook struct {
EndPoint string
RequestParameter map[string]interface{}
RequestBody []byte
RequestHeader map[string]string
Name string
Method string
}
const UpDateEventStatus = "console_update_event_status"
const UpdateEventCodeVersion = "console_update_event_code_version"
var manager *Manager
func GetManager() *Manager {
return manager
}
func InitManager(conf conf.WebHookConf, log *logrus.Entry) error {
ma := &Manager{
conf: conf,
log: log,
hooks: make(map[string]*WebHook, 0),
}
eventStatus := &WebHook{
Name: UpDateEventStatus,
Method: "PUT",
EndPoint: conf.ConsoleURL + "/api/event/update",
RequestHeader: make(map[string]string, 0),
}
eventStatus.RequestHeader["Content-Type"] = "application/json"
if conf.ConsoleToken != "" {
eventStatus.RequestHeader["Authorization"] = "Token " + conf.ConsoleToken
}
codeVsersion := &WebHook{
Name: UpdateEventCodeVersion,
Method: "PUT",
EndPoint: conf.ConsoleURL + "/api/event/update-code",
RequestHeader: make(map[string]string, 0),
}
codeVsersion.RequestHeader["Content-Type"] = "application/json"
if conf.ConsoleToken != "" {
codeVsersion.RequestHeader["Authorization"] = "Token " + conf.ConsoleToken
}
ma.Regist(eventStatus)
ma.Regist(codeVsersion)
manager = ma
return nil
}
func (m *Manager) GetWebhook(name string) *WebHook {
m.lock.Lock()
defer m.lock.Unlock()
return m.hooks[name]
}
func (m *Manager) RunWebhook(name string, body []byte) {
w := m.GetWebhook(name)
w.RequestBody = body
if err := w.Run(); err != nil {
m.log.Errorf("Webhook %s run error. %s", w.Name, err.Error())
}
m.log.Debugf("Run web hook %s success", w.Name)
}
func (m *Manager) RunWebhookWithParameter(name string, body []byte, parameter map[string]interface{}) {
w := m.GetWebhook(name)
w.RequestBody = body
w.RequestParameter = parameter
if err := w.Run(); err != nil {
m.log.Errorf("Webhook %s run error. %s", w.Name, err.Error())
}
}
//Regist 注册
func (m *Manager) Regist(w *WebHook) {
if w == nil || w.Name == "" {
return
}
m.lock.Lock()
defer m.lock.Unlock()
m.hooks[w.Name] = w
}
//Run 执行
func (w *WebHook) Run() error {
var err error
if w.RequestParameter != nil && len(w.RequestParameter) > 0 {
if strings.ToUpper(w.Method) == "GET" {
values := make(url.Values)
for k, v := range w.RequestParameter {
switch v.(type) {
case int:
values.Set(k, strconv.Itoa(v.(int)))
case string:
values.Set(k, v.(string))
}
}
w.EndPoint += "?" + values.Encode()
} else {
var jsonStr = "{"
for k, v := range w.RequestParameter {
switch v.(type) {
case int:
jsonStr += `"` + k + `":` + strconv.Itoa(v.(int)) + `,`
case string:
jsonStr += `"` + k + `":"` + v.(string) + `",`
}
}
jsonStr = jsonStr[0:len(jsonStr)-1] + "}"
w.RequestBody = []byte(jsonStr)
}
}
var request *http.Request
if w.RequestBody != nil {
request, err = http.NewRequest(strings.ToUpper(w.Method), w.EndPoint, bytes.NewReader(w.RequestBody))
if err != nil {
return err
}
} else {
request, err = http.NewRequest(strings.ToUpper(w.Method), w.EndPoint, nil)
if err != nil {
return err
}
}
request.Header.Add("Content-Type", "application/json")
for k, v := range w.RequestHeader {
request.Header.Add(k, v)
}
res, err := http.DefaultClient.Do(request)
if err != nil {
return err
}
if res.Body != nil {
defer res.Body.Close()
}
if res.StatusCode/100 == 2 {
return nil
}
return fmt.Errorf("Endpoint return status code is %d", res.StatusCode)
}

View File

@ -1,34 +0,0 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package webhook
import (
"github.com/goodrain/rainbond/eventlog/conf"
"testing"
"github.com/Sirupsen/logrus"
)
func TestEventUpdate(t *testing.T) {
InitManager(conf.WebHookConf{
ConsoleURL: "http://test.goodrain.com:7070/",
}, logrus.WithField("test", "test"))
GetManager().RunWebhookWithParameter(UpDateEventStatus, nil,
map[string]interface{}{"event_id": "4e9dafa96cb043cdb992adde2af421d4", "status": "success", "message": "操作成功"})
}

View File

@ -140,12 +140,6 @@ func (r *readEventBarrel) insertMessage(message *db.EventLogMessage) {
func (r *readEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
// for _, m := range r.barrel {
// select {
// case ch <- m:
// default:
// }
// }
r.subSocketChan[subID] = ch
}
@ -165,7 +159,8 @@ func (r *readEventBarrel) addSubChan(subID string) chan *db.EventLogMessage {
func (r *readEventBarrel) delSubChan(subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
if _, ok := r.subSocketChan[subID]; ok {
if ch, ok := r.subSocketChan[subID]; ok {
close(ch)
delete(r.subSocketChan, subID)
}
}
@ -221,13 +216,6 @@ func (r *dockerLogEventBarrel) insertMessage(message *db.EventLogMessage) {
func (r *dockerLogEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
// send cache log will cause user illusion
// for _, m := range r.barrel {
// select {
// case ch <- m:
// default:
// }
// }
r.subSocketChan[subID] = ch
}
@ -247,7 +235,8 @@ func (r *dockerLogEventBarrel) addSubChan(subID string) chan *db.EventLogMessage
func (r *dockerLogEventBarrel) delSubChan(subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
if _, ok := r.subSocketChan[subID]; ok {
if ch, ok := r.subSocketChan[subID]; ok {
close(ch)
delete(r.subSocketChan, subID)
}
}
@ -299,8 +288,6 @@ func (r *monitorMessageBarrel) empty() {
}
func (r *monitorMessageBarrel) insertMessage(message *db.EventLogMessage) {
//r.barrel = append(r.barrel, message)
//logrus.Info(string(message.Content))
r.updateTime = time.Now()
r.subLock.Lock()
defer r.subLock.Unlock()
@ -315,12 +302,6 @@ func (r *monitorMessageBarrel) insertMessage(message *db.EventLogMessage) {
func (r *monitorMessageBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
// for _, m := range r.barrel {
// select {
// case ch <- m:
// default:
// }
// }
r.subSocketChan[subID] = ch
}
@ -340,7 +321,8 @@ func (r *monitorMessageBarrel) addSubChan(subID string) chan *db.EventLogMessage
func (r *monitorMessageBarrel) delSubChan(subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
if _, ok := r.subSocketChan[subID]; ok {
if ch, ok := r.subSocketChan[subID]; ok {
close(ch)
delete(r.subSocketChan, subID)
}
}

View File

@ -131,7 +131,7 @@ func (h *dockerLogStore) GetMonitorData() *db.MonitorData {
return data
}
func (h *dockerLogStore) Gc() {
tiker := time.NewTicker(time.Minute * 1)
tiker := time.NewTicker(time.Second * 30)
for {
select {
case <-tiker.C:
@ -151,14 +151,15 @@ func (h *dockerLogStore) handle() []string {
}
var gcEvent []string
for k, v := range h.barrels {
if v.updateTime.Add(time.Minute * 1).Before(time.Now()) { // barrel 超时未收到消息
if v.updateTime.Add(time.Minute * 1).Before(time.Now()) {
//gc without client link
if v.GetSubChanLength() == 0 {
h.saveBeforeGc(k, v)
gcEvent = append(gcEvent, k)
}
}
if v.persistenceTime.Add(time.Minute * 2).Before(time.Now()) { //超过2分钟未持久化 间隔需要大于1分钟。以分钟为单位
//The interval not persisted for more than 1 minute should be more than 30 seconds
if v.persistenceTime.Add(time.Minute * 1).Before(time.Now()) {
if len(v.barrel) > 0 {
v.persistence()
}

View File

@ -19,7 +19,6 @@
package store
import (
"strings"
"sync"
"time"
@ -114,7 +113,6 @@ func (h *handleMessageStore) Gc() {
}
}
func (h *handleMessageStore) gcRun() {
//h.log.Debugf("runGC %d", time.Now().UnixNano())
h.lock.Lock()
defer h.lock.Unlock()
t := time.Now()
@ -123,7 +121,7 @@ func (h *handleMessageStore) gcRun() {
}
var gcEvent []string
for k, v := range h.barrels {
if v.updateTime.Add(time.Second * 30).Before(time.Now()) { // barrel 超时未收到消息
if v.updateTime.Add(time.Second * 30).Before(time.Now()) {
h.saveBeforeGc(v)
gcEvent = append(gcEvent, k)
}
@ -132,7 +130,7 @@ func (h *handleMessageStore) gcRun() {
for _, id := range gcEvent {
barrel := h.barrels[id]
barrel.empty()
h.pool.Put(barrel) //放回对象池
h.pool.Put(barrel)
delete(h.barrels, id)
}
}
@ -240,7 +238,6 @@ func (h *handleMessageStore) saveGarbageMessage() {
}
err := util.AppendToFile(h.conf.GarbageMessageFile, content)
if err != nil {
//h.log.Error("Save garbage message to file error.context :\n " + content)
h.log.Error("Save garbage message to file error.context", err.Error())
} else {
h.log.Info("Save the garbage message to file.")
@ -296,17 +293,6 @@ func (h *handleMessageStore) handleBarrelEvent() {
}
}
}
if event[0] == "code-version" { //代码版本
if len(event) == 3 {
eventID := event[1]
codeVersion := strings.TrimSpace(event[2])
event := model.ServiceEvent{}
event.EventID = eventID
event.CodeVersion = codeVersion
cdb.GetManager().ServiceEventDao().UpdateModel(&event)
h.log.Infof("run web hook update code version .event_id %s code_version %s", eventID, codeVersion)
}
}
case <-h.ctx.Done():
return
}

View File

@ -23,7 +23,6 @@ import (
"strconv"
"github.com/goodrain/rainbond/eventlog/db"
"github.com/goodrain/rainbond/eventlog/util"
coreutil "github.com/goodrain/rainbond/util"
"github.com/goodrain/rainbond/eventlog/conf"
@ -45,7 +44,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/pquerna/ffjson/ffjson"
"github.com/prometheus/client_golang/prometheus"
"github.com/tidwall/gjson"
)
//Manager 存储管理器
@ -68,11 +66,6 @@ type Manager interface {
//NewManager 存储管理器
func NewManager(conf conf.EventStoreConf, log *logrus.Entry) (Manager, error) {
// event log do not save in db,will save in file
// dbPlugin, err := db.NewManager(conf.DB, log)
// if err != nil {
// return nil, err
// }
conf.DB.Type = "eventfile"
dbPlugin, err := db.NewManager(conf.DB, log)
if err != nil {
@ -103,12 +96,10 @@ func NewManager(conf conf.EventStoreConf, log *logrus.Entry) (Manager, error) {
handle := NewStore("handle", storeManager)
read := NewStore("read", storeManager)
docker := NewStore("docker_log", storeManager)
monitor := NewStore("monitor", storeManager)
newmonitor := NewStore("newmonitor", storeManager)
storeManager.handleMessageStore = handle
storeManager.readMessageStore = read
storeManager.dockerLogStore = docker
storeManager.monitorMessageStore = monitor
storeManager.newmonitorMessageStore = newmonitor
return storeManager, nil
}
@ -119,7 +110,6 @@ type storeManager struct {
handleMessageStore MessageStore
readMessageStore MessageStore
dockerLogStore MessageStore
monitorMessageStore MessageStore
newmonitorMessageStore MessageStore
receiveChan chan []byte
pubChan, subChan chan [][]byte
@ -157,7 +147,7 @@ func (s *storeManager) Scrape(ch chan<- prometheus.Metric, namespace, exporter,
s.dockerLogStore.Scrape(ch, namespace, exporter, from)
s.handleMessageStore.Scrape(ch, namespace, exporter, from)
s.monitorMessageStore.Scrape(ch, namespace, exporter, from)
s.newmonitorMessageStore.Scrape(ch, namespace, exporter, from)
chanDesc := prometheus.NewDesc(
prometheus.BuildFQName(namespace, exporter, "chan_cache_size"),
"the handle chan cache size.",
@ -183,7 +173,7 @@ func (s *storeManager) Scrape(ch chan<- prometheus.Metric, namespace, exporter,
func (s *storeManager) Monitor() []db.MonitorData {
data := s.dockerLogStore.GetMonitorData()
moData := s.monitorMessageStore.GetMonitorData()
moData := s.newmonitorMessageStore.GetMonitorData()
if moData != nil {
data.LogSizePeerM += moData.LogSizePeerM
data.ServiceSize += moData.ServiceSize
@ -248,10 +238,6 @@ func (s *storeManager) WebSocketMessageChan(mode, eventID, subID string) chan *d
ch := s.dockerLogStore.SubChan(eventID, subID)
return ch
}
if mode == "monitor" {
ch := s.monitorMessageStore.SubChan(eventID, subID)
return ch
}
if mode == "newmonitor" {
ch := s.newmonitorMessageStore.SubChan(eventID, subID)
return ch
@ -264,7 +250,6 @@ func (s *storeManager) Run() error {
s.handleMessageStore.Run()
s.readMessageStore.Run()
s.dockerLogStore.Run()
s.monitorMessageStore.Run()
s.newmonitorMessageStore.Run()
for i := 0; i < s.conf.HandleMessageCoreNumber; i++ {
go s.handleReceiveMessage()
@ -275,9 +260,6 @@ func (s *storeManager) Run() error {
for i := 0; i < s.conf.HandleDockerLogCoreNumber; i++ {
go s.handleDockerLog()
}
for i := 0; i < s.conf.HandleMessageCoreNumber; i++ {
go s.handleMonitorMessage()
}
go s.handleNewMonitorMessage()
go s.cleanLog()
return nil
@ -454,10 +436,6 @@ func (s *storeManager) handleSubMessage() {
if string(msg[0]) == string(db.EventMessage) {
s.readMessageStore.InsertMessage(message)
}
if string(msg[0]) == string(db.ServiceMonitorMessage) {
s.monitorMessageStore.InsertMessage(message)
}
}
}
}
@ -499,14 +477,11 @@ loop:
Content: buffer.Bytes(),
EventID: serviceID,
}
//s.log.Debug("Receive docker log:", info)
s.dockerLogStore.InsertMessage(&message)
buffer.Reset()
}
}
s.errChan <- fmt.Errorf("handle docker log core exist")
}
type event struct {
@ -515,101 +490,6 @@ type event struct {
Update string `json:"update_time"`
}
func (s *storeManager) handleMonitorMessage() {
loop:
for {
select {
case <-s.context.Done():
return
case msg, ok := <-s.monitorMessageChan:
if !ok {
s.log.Error("handle monitor message core stop.monitor message log chan closed")
break loop
}
if msg == nil {
continue
}
if len(msg) == 2 {
message := strings.SplitAfterN(string(msg[1]), " ", 5)
name := message[3]
body := message[4]
var currentTopic string
var data []interface{}
switch strings.TrimSpace(name) {
case "SumTimeByUrl":
result := gjson.Parse(body).Array()
for _, r := range result {
var port int
if p, ok := r.Map()["port"]; ok {
port = int(p.Int())
}
wsTopic := fmt.Sprintf("%s.%s.statistic", r.Map()["tenant"], r.Map()["service"])
if port != 0 {
wsTopic = fmt.Sprintf("%s.%s.%d.statistic", r.Map()["tenant"], r.Map()["service"], port)
}
if currentTopic == "" {
currentTopic = wsTopic
}
if wsTopic == currentTopic {
data = append(data, util.Format(r.Map()))
} else {
s.sendMonitorData("SumTimeByUrl", data, currentTopic)
currentTopic = wsTopic
data = []interface{}{util.Format(r.Map())}
}
}
s.sendMonitorData("SumTimeByUrl", data, currentTopic)
case "SumTimeBySql":
result := gjson.Parse(body).Array()
for _, r := range result {
tenantID := r.Map()["tenant_id"].String()
serviceID := r.Map()["service_id"].String()
if len(tenantID) < 12 || len(serviceID) < 12 {
continue
}
tenantAlias := tenantID[len(tenantID)-12:]
serviceAlias := serviceID[len(serviceID)-12:]
wsTopic := fmt.Sprintf("%s.%s.statistic", tenantAlias, serviceAlias)
if currentTopic == "" {
currentTopic = wsTopic
}
if wsTopic == currentTopic {
data = append(data, util.Format(r.Map()))
} else {
s.sendMonitorData("SumTimeBySql", data, currentTopic)
currentTopic = wsTopic
data = []interface{}{util.Format(r.Map())}
}
}
s.sendMonitorData("SumTimeBySql", data, currentTopic)
}
}
}
}
s.errChan <- fmt.Errorf("handle monitor log core exist")
}
func (s *storeManager) sendMonitorData(name string, data []interface{}, topic string) {
e := event{
Name: name,
Update: time.Now().Format(time.Kitchen),
Data: data,
}
eventByte, _ := json.Marshal(e)
m := &db.EventLogMessage{
EventID: topic,
MonitorData: eventByte,
}
s.monitorMessageStore.InsertMessage(m)
d, err := json.Marshal(m)
if err != nil {
s.log.Error("Marshal monitor message to byte error.", err.Error())
return
}
s.pubChan <- [][]byte{[]byte(db.ServiceMonitorMessage), d}
}
func (s *storeManager) RealseWebSocketMessageChan(mode string, eventID, subID string) {
if mode == "event" {
s.readMessageStore.RealseSubChan(eventID, subID)
@ -617,8 +497,8 @@ func (s *storeManager) RealseWebSocketMessageChan(mode string, eventID, subID st
if mode == "docker" {
s.dockerLogStore.RealseSubChan(eventID, subID)
}
if mode == "monitor" {
s.monitorMessageStore.RealseSubChan(eventID, subID)
if mode == "newmonitor" {
s.newmonitorMessageStore.RealseSubChan(eventID, subID)
}
}
@ -626,7 +506,7 @@ func (s *storeManager) Stop() {
s.handleMessageStore.stop()
s.readMessageStore.stop()
s.dockerLogStore.stop()
s.monitorMessageStore.stop()
s.newmonitorMessageStore.stop()
s.cancel()
if s.filePlugin != nil {
s.filePlugin.Close()

View File

@ -1,149 +0,0 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package store
import (
"sync"
"time"
"github.com/goodrain/rainbond/eventlog/conf"
"github.com/goodrain/rainbond/eventlog/db"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
)
type monitorMessageStore struct {
conf conf.EventStoreConf
log *logrus.Entry
barrels map[string]*monitorMessageBarrel
lock sync.RWMutex
cancel func()
ctx context.Context
pool *sync.Pool
size int64
allLogCount float64
}
func (h *monitorMessageStore) Scrape(ch chan<- prometheus.Metric, namespace, exporter, from string) error {
chanDesc := prometheus.NewDesc(
prometheus.BuildFQName(namespace, exporter, "monitor_store_barrel_count"),
"the handle container log count size.",
[]string{"from"}, nil,
)
ch <- prometheus.MustNewConstMetric(chanDesc, prometheus.GaugeValue, float64(len(h.barrels)), from)
logDesc := prometheus.NewDesc(
prometheus.BuildFQName(namespace, exporter, "monitor_store_log_count"),
"the handle monitor log count size.",
[]string{"from"}, nil,
)
ch <- prometheus.MustNewConstMetric(logDesc, prometheus.GaugeValue, h.allLogCount, from)
return nil
}
func (h *monitorMessageStore) insertMessage(message *db.EventLogMessage) bool {
h.lock.RLock()
defer h.lock.RUnlock()
if ba, ok := h.barrels[message.EventID]; ok {
ba.insertMessage(message)
return true
}
return false
}
func (h *monitorMessageStore) InsertMessage(message *db.EventLogMessage) {
if message == nil || message.EventID == "" {
return
}
//h.log.Debug("Receive a monitor message:" + string(message.Content))
h.size++
h.allLogCount++
if h.insertMessage(message) {
return
}
h.lock.Lock()
defer h.lock.Unlock()
ba := h.pool.Get().(*monitorMessageBarrel)
ba.insertMessage(message)
h.barrels[message.EventID] = ba
}
func (h *monitorMessageStore) GetMonitorData() *db.MonitorData {
data := &db.MonitorData{
ServiceSize: len(h.barrels),
LogSizePeerM: h.size,
}
return data
}
func (h *monitorMessageStore) SubChan(eventID, subID string) chan *db.EventLogMessage {
h.lock.Lock()
defer h.lock.Unlock()
if ba, ok := h.barrels[eventID]; ok {
return ba.addSubChan(subID)
}
ba := h.pool.Get().(*monitorMessageBarrel)
h.barrels[eventID] = ba
return ba.addSubChan(subID)
}
func (h *monitorMessageStore) RealseSubChan(eventID, subID string) {
h.lock.RLock()
defer h.lock.RUnlock()
if ba, ok := h.barrels[eventID]; ok {
ba.delSubChan(subID)
}
}
func (h *monitorMessageStore) Run() {
go h.Gc()
}
func (h *monitorMessageStore) Gc() {
tiker := time.NewTicker(time.Second * 30)
for {
select {
case <-tiker.C:
case <-h.ctx.Done():
h.log.Debug("read message store gc stop.")
tiker.Stop()
return
}
h.size = 0
if len(h.barrels) == 0 {
continue
}
var gcEvent []string
for k, v := range h.barrels {
if v.updateTime.Add(time.Minute * 3).Before(time.Now()) { // barrel 超时未收到消息
gcEvent = append(gcEvent, k)
}
}
if gcEvent != nil && len(gcEvent) > 0 {
for _, id := range gcEvent {
barrel := h.barrels[id]
barrel.empty()
h.pool.Put(barrel) //放回对象池
delete(h.barrels, id)
}
}
}
}
func (h *monitorMessageStore) stop() {
h.cancel()
}
func (h *monitorMessageStore) InsertGarbageMessage(message ...*db.EventLogMessage) {}

View File

@ -81,7 +81,6 @@ func (h *newMonitorMessageStore) InsertMessage(message *db.EventLogMessage) {
if message == nil {
return
}
//h.log.Debug("Receive a monitor message:" + string(message.Content))
h.size++
h.allLogCount++
mm, ok := h.insertMessage(message)
@ -138,12 +137,15 @@ func (h *newMonitorMessageStore) Gc() {
}
var gcEvent []string
for k, v := range h.barrels {
if v.UpdateTime.Add(time.Minute * 3).Before(time.Now()) { // barrel 超时未收到消息
gcEvent = append(gcEvent, k)
if len(v.subSocketChan) == 0 {
if v.UpdateTime.Add(time.Minute * 3).Before(time.Now()) { // barrel 超时未收到消息
gcEvent = append(gcEvent, k)
}
}
}
if gcEvent != nil && len(gcEvent) > 0 {
for _, id := range gcEvent {
h.log.Infof("monitor message barrel %s will be gc", id)
barrel := h.barrels[id]
barrel.empty()
delete(h.barrels, id)
@ -285,11 +287,12 @@ func (c *CacheMonitorMessageList) addSubChan(subID string) chan *db.EventLogMess
return ch
}
//删除socket订阅
//delSubChan delete socket sub chan
func (c *CacheMonitorMessageList) delSubChan(subID string) {
c.subLock.Lock()
defer c.subLock.Unlock()
if _, ok := c.subSocketChan[subID]; ok {
if ch, ok := c.subSocketChan[subID]; ok {
close(ch)
delete(c.subSocketChan, subID)
}
}
@ -319,11 +322,8 @@ func merge(source, addsource MonitorMessageList) (result MonitorMessageList) {
if oldmm, ok := cache[mm.Key]; ok {
oldmm.Count += mm.Count
oldmm.AbnormalCount += mm.AbnormalCount
//平均时间
oldmm.AverageTime = Round((oldmm.AverageTime+mm.AverageTime)/2, 2)
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><E7A7AF><EFBFBD>
oldmm.CumulativeTime = Round(oldmm.CumulativeTime+mm.CumulativeTime, 2)
//最大时间
if mm.MaxTime > oldmm.MaxTime {
oldmm.MaxTime = mm.MaxTime
}

View File

@ -97,24 +97,6 @@ func NewStore(storeType string, manager *storeManager) MessageStore {
}
return read
}
if storeType == "monitor" {
monitor := &monitorMessageStore{
barrels: make(map[string]*monitorMessageBarrel, 100),
conf: manager.conf,
log: manager.log.WithField("module", "MonitorMessageStore"),
ctx: ctx,
cancel: cancel,
}
monitor.pool = &sync.Pool{
New: func() interface{} {
reb := &monitorMessageBarrel{
subSocketChan: make(map[string]chan *db.EventLogMessage, 0),
}
return reb
},
}
return monitor
}
if storeType == "docker_log" {
docker := &dockerLogStore{
barrels: make(map[string]*dockerLogEventBarrel, 100),

View File

@ -52,6 +52,7 @@ type NodeService struct {
func CreateNodeService(c *option.Conf, nodecluster *node.Cluster, kubecli kubecache.KubeClient) *NodeService {
if err := event.NewManager(event.EventConfig{
DiscoverAddress: c.Etcd.Endpoints,
EventLogServers: c.EventLogServer,
}); err != nil {
logrus.Errorf("create event manager faliure")
}
@ -159,11 +160,6 @@ func (n *NodeService) AsynchronousInstall(node *client.HostNode, eventID string)
logrus.Error("write hosts file error ", err.Error())
return
}
if err := event.NewManager(event.EventConfig{
DiscoverAddress: n.c.Etcd.Endpoints,
}); err != nil {
logrus.Errorf("create event manager faliure")
}
// start add node script
logrus.Infof("Begin install node %s", node.ID)
// write log to event log
@ -179,9 +175,10 @@ func (n *NodeService) AsynchronousInstall(node *client.HostNode, eventID string)
Stdout: logger.GetWriter("node-install", "info"),
Stderr: logger.GetWriter("node-install", "err"),
}
err = ansibleUtil.RunNodeInstallCmd(option)
if err != nil {
logrus.Error("Error executing shell script", err)
logrus.Error("Error executing shell script : ", err)
node.Status = client.InstallFailed
node.NodeStatus.Status = client.InstallFailed
n.nodecluster.UpdateNode(node)

View File

@ -0,0 +1,38 @@
package service
import (
"testing"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/event"
etcdClient "github.com/coreos/etcd/clientv3"
"github.com/goodrain/rainbond/node/core/store"
"github.com/goodrain/rainbond/node/masterserver"
"github.com/goodrain/rainbond/node/masterserver/node"
"github.com/goodrain/rainbond/node/nodem/client"
)
func TestAsynchronousInstall(t *testing.T) {
config := &option.Conf{
Etcd: etcdClient.Config{Endpoints: []string{"192.168.195.1:2379"}},
EventLogServer: []string{"192.168.195.1:6366"},
}
store.NewClient(config)
// node *client.HostNode, eventID string
eventID := "fanyangyang"
// nodemanager := nodem.NewNodeManager(&option.Conf{})
currentNode := client.HostNode{
ID: "123",
Role: []string{"manage"},
InternalIP: "127.0.0.1",
RootPass: "password",
}
cluster := node.CreateCluster(nil, &currentNode, nil)
cluster.CacheNode(&currentNode)
ms := &masterserver.MasterServer{Cluster: cluster}
nodeService := CreateNodeService(config, ms.Cluster, nil)
nodeService.AsynchronousInstall(&currentNode, eventID)
event.GetManager().Close()
}

View File

@ -105,15 +105,13 @@ func (e *etcdClusterClient) GetOptions() *option.Conf {
func (e *etcdClusterClient) GetEndpoints(key string) (result []string) {
key = "/rainbond/endpoint/" + key
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
resp, err := e.conf.EtcdCli.Get(ctx, key, clientv3.WithPrefix())
if err != nil || len(resp.Kvs) < 1 {
logrus.Errorf("Can not get endpoints of the key %s", key)
return
}
for _, kv := range resp.Kvs {
var res []string
err = json.Unmarshal(kv.Value, &res)
@ -123,23 +121,19 @@ func (e *etcdClusterClient) GetEndpoints(key string) (result []string) {
}
result = append(result, res...)
}
logrus.Infof("Get endpoints %s => %v", key, result)
return
}
func (e *etcdClusterClient) SetEndpoints(key string, value []string) {
key = "/rainbond/endpoint/" + key
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
jsonStr, err := json.Marshal(value)
if err != nil {
logrus.Errorf("Can not marshal %s endpoints to json.", key)
return
}
_, err = e.conf.EtcdCli.Put(ctx, key, string(jsonStr))
if err != nil {
logrus.Errorf("Failed to put endpoint for %s: %v", key, err)
@ -148,22 +142,20 @@ func (e *etcdClusterClient) SetEndpoints(key string, value []string) {
func (e *etcdClusterClient) DelEndpoints(key string) {
key = "/rainbond/endpoint/" + key
logrus.Infof("Delete endpoints: %s", key)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err := e.conf.EtcdCli.Delete(ctx, key)
if err != nil {
logrus.Errorf("Failed to put endpoint for %s: %v", key, Error)
}
logrus.Infof("Delete endpoints: %s", key)
}
//ErrorNotFound node not found.
var ErrorNotFound = fmt.Errorf("node not found")
func (e *etcdClusterClient) GetNode(nodeID string) (*HostNode, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*8)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
res, err := e.conf.EtcdCli.Get(ctx, fmt.Sprintf("%s/%s", e.conf.NodePath, nodeID))
if err != nil {

View File

@ -227,11 +227,6 @@ func (p *probeManager) EnableWatcher(serviceName, watcherID string) {
if s, ok := p.watches[serviceName]; ok {
if w, ok := s[watcherID]; ok {
w.enable = true
// only health can set errornum is 0
// if h, ok := p.status[serviceName]; ok {
// h.ErrorNumber = 0
// h.ErrorTime = 0
// }
}
} else {
logrus.Error("Can not enable the watcher: Not found service: ", serviceName)
@ -281,7 +276,6 @@ func (p *probeManager) GetCurrentServiceHealthy(serviceName string) (*service.He
Info: statusMap["info"],
}
return result, nil
}
if v.ServiceHealth.Model == "cmd" {
statusMap := probe.GetShellHealth(v.ServiceHealth.Address)

View File

@ -52,7 +52,7 @@ func NewCopier(logfile *LogFile, dst []Logger, since time.Time) *Copier {
// Run starts logs copying
func (c *Copier) Run() {
c.closed = make(chan struct{})
go c.logfile.ReadLogs(ReadConfig{Follow: true, Since: c.since}, c.reader)
go c.logfile.ReadLogs(ReadConfig{Follow: true, Since: c.since, Tail: 0}, c.reader)
go c.copySrc()
}
@ -63,6 +63,13 @@ lool:
select {
case <-c.closed:
return
case err := <-c.reader.Err:
logrus.Errorf("read container log file error %s, will retry after 5 seconds", err.Error())
//If there is an error in the collection log process,
//the collection should be restarted and not stopped
time.Sleep(time.Second * 5)
go c.logfile.ReadLogs(ReadConfig{Follow: true, Since: c.since, Tail: 0}, c.reader)
continue
case msg, ok := <-c.reader.Msg:
if !ok {
break lool

View File

@ -139,14 +139,11 @@ func decodeFunc(rdr io.Reader) func() (*Message, error) {
if err == nil || err == io.EOF {
break
}
logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
// try again, could be due to a an incomplete json object as we read
if _, ok := err.(*json.SyntaxError); ok {
dec = json.NewDecoder(rdr)
continue
}
// io.ErrUnexpectedEOF is returned from json.Decoder when there is
// remaining data in the parser's buffer while an io.EOF occurs.
// If the json logger writes a partial json log entry to the disk
@ -156,6 +153,7 @@ func decodeFunc(rdr io.Reader) func() (*Message, error) {
dec = json.NewDecoder(reader)
continue
}
logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
}
return msg, err
}
@ -205,7 +203,6 @@ func (w *LogFile) ReadLogs(config ReadConfig, watcher *LogWatcher) {
watcher.Err <- err
return
}
closeFiles := func() {
for _, f := range files {
f.Close()
@ -218,7 +215,6 @@ func (w *LogFile) ReadLogs(config ReadConfig, watcher *LogWatcher) {
}
}
}
readers := make([]SizeReaderAt, 0, len(files)+1)
for _, f := range files {
stat, err := f.Stat()
@ -244,10 +240,7 @@ func (w *LogFile) ReadLogs(config ReadConfig, watcher *LogWatcher) {
return
}
w.mu.RUnlock()
notifyRotate := w.notifyRotate.Subscribe()
defer w.notifyRotate.Evict(notifyRotate)
followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
followLogs(currentFile, watcher, w.createDecoder, config.Since, config.Until)
}
func tailFiles(files []SizeReaderAt, watcher *LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config ReadConfig) {
@ -398,7 +391,7 @@ func decompressfile(fileName, destFileName string, since time.Time) (*os.File, e
return rs, nil
}
func followLogs(f *os.File, logWatcher *LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
func followLogs(f *os.File, logWatcher *LogWatcher, createDecoder makeDecoderFunc, since, until time.Time) {
decodeLogLine := createDecoder(f)
name := f.Name()
@ -445,11 +438,11 @@ func followLogs(f *os.File, logWatcher *LogWatcher, notifyRotate chan interface{
return nil
case fsnotify.Rename, fsnotify.Remove:
select {
case <-notifyRotate:
case <-logWatcher.WatchProducerGone():
return errDone
case <-logWatcher.WatchConsumerGone():
return errDone
default:
}
if err := handleRotate(); err != nil {
return err
@ -510,7 +503,6 @@ func followLogs(f *os.File, logWatcher *LogWatcher, notifyRotate chan interface{
// ready to try again
continue
}
retries = 0 // reset retries since we've succeeded
if !since.IsZero() && msg.Timestamp.Before(since) {
continue

View File

@ -27,12 +27,12 @@ import (
)
func TestReadFile(t *testing.T) {
reader, err := NewLogFile("/Users/qingguo/gopath/src/github.com/goodrain/rainbond/test/dockerlog/tes.log", 3, false, decodeFunc, 0640, getTailReader)
reader, err := NewLogFile("../../../test/dockerlog/tes.log", 3, false, decodeFunc, 0640, getTailReader)
if err != nil {
t.Fatal(err)
}
watch := NewLogWatcher()
reader.ReadLogs(ReadConfig{Follow: true, Tail: 10, Since: time.Now()}, watch)
reader.ReadLogs(ReadConfig{Follow: true, Tail: 0, Since: time.Now()}, watch)
defer watch.ConsumerGone()
LogLoop:
for {

View File

@ -128,7 +128,7 @@ func (c *ContainerLogManage) handleLogger() {
retry := 0
for retry < maxJSONDecodeRetry {
retry++
reader, err := NewLogFile(cevent.Container.LogPath, 3, false, decodeFunc, 0640, getTailReader)
reader, err := NewLogFile(cevent.Container.LogPath, 2, false, decodeFunc, 0640, getTailReader)
if err != nil {
logrus.Errorf("create logger failure %s", err.Error())
time.Sleep(time.Second * 1)

View File

@ -7,15 +7,16 @@ import (
"testing"
"time"
"github.com/goodrain/rainbond/node/nodem/logger"
"fmt"
"github.com/docker/docker/daemon/logger"
"github.com/pborman/uuid"
)
func TestStreamLogBeak(t *testing.T) {
log, err := New(logger.Context{
log, err := New(logger.Info{
ContainerID: uuid.New(),
ContainerEnv: []string{"TENANT_ID=" + uuid.New(), "SERVICE_ID=" + uuid.New()},
Config: map[string]string{"stream-server": "127.0.0.1:6362"},
@ -51,7 +52,7 @@ func TestStreamLogBeak(t *testing.T) {
func TestStreamLog(t *testing.T) {
log, err := New(logger.Context{
log, err := New(logger.Info{
ContainerID: uuid.New(),
ContainerEnv: []string{"TENANT_ID=" + uuid.New(), "SERVICE_ID=" + uuid.New()},
Config: map[string]string{"stream-server": "127.0.0.1:6362"},
@ -76,7 +77,7 @@ func TestStreamLog(t *testing.T) {
}
func BenchmarkStreamLog(t *testing.B) {
log, err := New(logger.Context{
log, err := New(logger.Info{
ContainerID: uuid.New(),
ContainerEnv: []string{"TENANT_ID=" + uuid.New(), "SERVICE_ID=" + uuid.New()},
Config: map[string]string{"stream-server": "127.0.0.1:5031"},

View File

@ -396,7 +396,6 @@ func (b *Exporter) GCollector() {
oldGauges := len(b.Gauges.Elements)
oldHistograms := len(b.Histograms.Elements)
oldSummaries := len(b.Summaries.Elements)
for k, v := range b.Counters.Elements {
oldTime := v.GetTimestamp()
if (currentTime - oldTime) > HP {
@ -404,7 +403,6 @@ func (b *Exporter) GCollector() {
b.Counters.Register.Unregister(v)
}
}
for k, v := range b.Gauges.Elements {
oldTime := v.GetTimestamp()
if (currentTime - oldTime) > HP {
@ -412,7 +410,6 @@ func (b *Exporter) GCollector() {
b.Gauges.Register.Unregister(v)
}
}
for k, v := range b.Histograms.Elements {
oldTime := v.GetTimestamp()
if (currentTime - oldTime) > HP {
@ -420,7 +417,6 @@ func (b *Exporter) GCollector() {
b.Histograms.Register.Unregister(v)
}
}
for k, v := range b.Summaries.Elements {
oldTime := v.GetTimestamp()
if (currentTime - oldTime) > HP {
@ -428,12 +424,10 @@ func (b *Exporter) GCollector() {
b.Summaries.Register.Unregister(v)
}
}
logrus.Debugf("current amount for Counters: %v => %v", oldCounters, len(b.Counters.Elements))
logrus.Debugf("current amount for Gauges: %v => %v", oldGauges, len(b.Gauges.Elements))
logrus.Debugf("current amount for Histograms: %v => %v", oldHistograms, len(b.Histograms.Elements))
logrus.Debugf("current amount for Summaries: %v => %v", oldSummaries, len(b.Summaries.Elements))
logrus.Info("clean exporter data complete")
}
}