Merge branch 'feature/aboutEvent' of https://github.com/fanyangyang/rainbond into feature/aboutEvent

This commit is contained in:
凡羊羊 2019-08-23 09:57:33 +08:00
commit 8cb4b59b85
30 changed files with 496 additions and 960 deletions

View File

@ -1,4 +1,4 @@
<img src="./docs/rainbond_logo.png" width="100%">
<img src="https://grstatic.oss-cn-shanghai.aliyuncs.com/images/rainbond%20log_full.png" width="80%">
[![Go Report Card](https://goreportcard.com/badge/github.com/goodrain/rainbond)](https://goreportcard.com/report/github.com/goodrain/rainbond)
[![GitHub stars](https://img.shields.io/github/stars/goodrain/rainbond.svg?style=flat-square)](https://github.com/goodrain/rainbond/stargazers)
@ -6,15 +6,19 @@
[![Build Status](https://travis-ci.org/goodrain/rainbond.svg?branch=master)](https://travis-ci.org/goodrain/rainbond)
[![GoDoc](https://godoc.org/github.com/goodrain/rainbond?status.svg)](https://godoc.org/github.com/goodrain/rainbond)
[项目官网](http://www.rainbond.com) • [文档](https://www.rainbond.com/docs/) • [README in English](https://github.com/goodrain/rainbond/blob/master/README_EN.md)
[项目官网](http://www.rainbond.com) • [文档](https://www.rainbond.com/docs/)
**Rainbond** 企业应用云操作系统
Rainbond云帮是企业应用的操作系统。 Rainbond支撑企业应用的开发、架构、交付和运维的全流程通过“无侵入”架构无缝衔接各类企业应用底层资源可以对接和管理IaaS、虚拟机和物理服务器。
Rainbond云帮是云原生下企业应用操作系统。 Rainbond支撑企业应用的开发、架构、交付和运维的全流程通过“无侵入”架构无缝衔接各类企业应用底层资源可以对接和管理IaaS、虚拟机和物理服务器。
```
企业应用包括:
各类信息系统、OA、CRM、ERP、数据库、大数据、物联网、互联网平台、微服务架构等运行在企业内部的各种系统
各类信息系统、OA、CRM、ERP、数据库、大数据、物联网、互联网平台、微服务架构等运行在企业内部的各种系统
```
<img src="https://grstatic.oss-cn-shanghai.aliyuncs.com/images/Rainbond%E4%BA%A7%E5%93%81%E6%9E%B6%E6%9E%84.png" width="100%">
## 应用场景
* 企业应用开发
@ -33,21 +37,22 @@ DevOps开发流水线、微服务架构、服务治理及各类技术工具“
| 特性 | 描述 |
| -------------------------- | ------------------------------------------------------------ |
| 超越Kubernetes | 平台底层基于Kubernetes但用户无需学习和编辑复杂的yaml文件通过应用级图形界面操作使用实现业务流程开箱即用。 |
| 原生Service Mesh微服务架构 | 跨语言、跨协议、代码无侵入的Service Mesh微服务架构原生支持传统应用直接变成微服务架构。同时支持常见微服务架构Spring Cloud、Dubbo等通过插件扩展架构能力及治理功能。 |
| Kubernetes | 平台底层基于Kubernetes但用户无需学习和编辑复杂的yaml文件通过应用级图形界面操作使用实现业务流程开箱即用。 |
| Service Mesh微服务架构 | 内置跨语言、跨协议、代码无侵入的Service Mesh微服务架构原生支持传统应用直接变成微服务架构。同时支持常见微服务架构Spring Cloud、Dubbo等通过插件扩展架构能力及治理功能。 |
| 一体化DevOps | 衔接需求、开发、测试、构建、上线、运维的一体化DevOps。支持对接第三方软件Jira、Sonar、Jenkins、Gitlab等 |
| 企业级应用市场 | 非镜像市场和服务目录,支持各类企业级应用,像手机应用一样即点即用,全流程管理(应用开发、应用发布、应用展示、应用离线导入/导出、应用安装/升级、应用运维) |
| 自动化运维 | 应用自动化运维。节点自动安装、扩容、监控、容错。平台支持高可用、多数据中心管理、多租户管理。 |
| Serverless PaaS | 以应用为核心使用过程不需要了解服务器相关概念简单灵活。通过对接行业应用快速构建行业专有PaaS。 |
| 应用网关 | 基于HTTP、HTTPs、TCP、UDP等协议应用访问控制策略轻松操作应用灰度发布、A/B测试。 |
| 异构服务统一管理 | 支持集群内外不同架构服务统一管理和通信治理 |
| 异构服务统一管理 | 支持集群内外不同架构服务统一管理、监控和通信治理。 |
更多功能特性详见: [Rainbond功能特性说明](https://www.rainbond.com/docs/quick-start/edition/)
## 快速开始
1. [安装 Rainbond 集群](https://www.rainbond.com/docs/quick-start/rainbond_install/)
1. [快速安装 Rainbond 集群](https://www.rainbond.com/docs/quick-start/rainbond_install/)
2. [创建第一个应用(服务)](https://www.rainbond.com/docs/user-manual/app-creation/)
3. [搭建 ServiceMesh 微服务架构](https://www.rainbond.com/docs/advanced-scenarios/micro/)
3. [观看教程视频快速学习Rainbond](https://www.rainbond.com/video.html)
4. [搭建 ServiceMesh 微服务架构](https://www.rainbond.com/docs/advanced-scenarios/micro/)
## 社区
@ -55,40 +60,28 @@ DevOps开发流水线、微服务架构、服务治理及各类技术工具“
[Rainbond 项目官网](https://www.rainbond.com)
<center><img width="200px" src="https://t.goodrain.com/uploads/default/original/2X/6/6591ae9e78a9c7d65bfb260f741ac3985662cc51.jpg"/></center>
<center><img width="200px" src="https://grstatic.oss-cn-shanghai.aliyuncs.com/images/12141565594759_.pic_hd.jpg"/></center>
<center> 添加微信申请加入微信群了解Rainbond更多资讯 </center>
## 开发路线计划
点击查看 Rainbond 版本开发计划 [Roadmap](https://www.rainbond.com/docs/quick-start/roadmap/)
## 架构
<img src="https://static.goodrain.com/images/docs/5.0/architecture/architecture.svg" href="https://www.rainbond.com/docs/architecture/architecture/">
## 产品图示
<img src="https://grstatic.oss-cn-shanghai.aliyuncs.com/images/docs/5.0/readme/connect.gif" href="http://www.rainbond.com/docs">
- 应用组装部署示意图
<img src="https://grstatic.oss-cn-shanghai.aliyuncs.com/images/docs/5.0/readme/gateway.gif" href="http://www.rainbond.com/docs">
- 应用网关管理示意图
## 参与贡献
你可以参与Rainbond社区关于平台、应用、插件等领域的贡献和分享。
[参与Rainbond项目](https://www.rainbond.com/docs/contribution/)
[Rainbond 贡献者社区](https://t.goodrain.com/c/contribution)
## 相关项目
* [Rainbond-Console](https://github.com/goodrain/rainbond-console) Rainbond控制台业务层
* [Rainbond-Console-UI](https://github.com/goodrain/rainbond-ui) Rainbond控制台UI组件
* [Rainbond-Install](https://github.com/goodrain/rainbond-ansible) Rainbond安装工具
当前仓库为Rainbond数据中心端核心服务实现代码项目还包括以下子项目
* [Rainbond-Console](https://github.com/goodrain/rainbond-console) Rainbond控制台服务端
* [Rainbond-Console-UI](https://github.com/goodrain/rainbond-ui) Rainbond控制台前端
* [Rainbond-Ansible](https://github.com/goodrain/rainbond-ansible) Rainbond安装工具
* [Rainbond-Builder](https://github.com/goodrain/builder) Rainbond源码构建工具集
* [Rainbond-Docs](https://github.com/goodrain/rainbond-docs) Rainbond文档
* Rainbond-Resource/UI (企业版)
* Rainbond-APP-Store (企业版)
## License

View File

@ -1,115 +0,0 @@
<img src="./docs/rainbond_logo.png" width="100%">
[![Go Report Card](https://goreportcard.com/badge/github.com/goodrain/rainbond)](https://goreportcard.com/report/github.com/goodrain/rainbond)
[![GitHub stars](https://img.shields.io/github/stars/goodrain/rainbond.svg?style=flat-square)](https://github.com/goodrain/rainbond/stargazers)
![Rainbond version](https://img.shields.io/badge/version-v5.1-brightgreen.svg)
[![Build Status](https://travis-ci.org/goodrain/rainbond.svg?branch=master)](https://travis-ci.org/goodrain/rainbond)
[![GoDoc](https://godoc.org/github.com/goodrain/rainbond?status.svg)](https://godoc.org/github.com/goodrain/rainbond)
[项目官网](http://www.rainbond.com) • [文档](https://www.rainbond.com/docs/) • [README in English](https://github.com/goodrain/rainbond/blob/master/README_EN.md)
## **Rainbond** ENTERPRISE APPLICATION CLOUD OS
Rainbond云帮是企业应用的操作系统。 Rainbond支撑企业应用的开发、架构、交付和运维的全流程通过“无侵入”架构无缝衔接各类企业应用底层资源可以对接和管理IaaS、虚拟机和物理服务器。
Rainbond is a cloud OS for enterprise applications. It provides complete set of supports for enterprise applications' development, architecture, delivery and operation, can seamlessly docking almost all kinds of enterprise applications through "non-invasive" platform architecture, interface and manage underlying computing resources such as IaaS, virtual machine and physical servers.
```
Enterprise Applications include
information system, OA, CRM, ERP, database, big data, IOT, internet platform and microservice architecture etc.
```
## Be applied to
* Enterprise Application Developement
The development environment, micro-service architecture, service governance and various technical tools are “out of the box”, without changing development habits, allowing companies to focus on their business and improving efficiency by 10 times.
* Enterprise Application Delivery
Support continuous delivery, enterprise application market delivery, SaaS, enterprise application sales, secondary development and other delivery processes, unified customer management, and balanced delivery and personalized delivery.
* Enterprise Application Operation
Transparently interfaces and manages a variety of computing resources, naturally achieves cloudy and hybrid clouds, enterprise application automation and operation, and doubles resource utilization.
## Features
| Features | Description |
| -------------------------- | ------------------------------------------------------------ |
| beyond Kubernetes | based on kubernetes, but users do not need to learn and edit complex yaml files, achieved "out-of-the-box" business process by application-level graphical interface |
| native Service Mesh microservice architecture | Thanks to the cross-language, cross-protocol, code-free service Mesh microservices architecture native support, traditional applications can become microservice architecture directly. Support Spring Cloud, Dubbo, etc., and can easily expand the architectural capabilities and governance functions by adding plug-ins. |
| Integrated DevOps | Integrate DevOps for demand, development, testing, construction, online, and operation and maintenance. Support for docking third party software (Jira, Sonar, Jenkins, Gitlab, etc.) |
| Enterpeise-level application market | Not a simple mirror market and service catalog, but supports all kinds of enterprise-level applications, just like install and manage mobile apps, click-to-use, full process management (application development, application publishing, application display, application offline import/export, application installation/upgrade, application operation and maintenance) |
| Automated operation and maintenance | Automated application operation and maintenance. Nodes are automatically installed, expanded, monitored, and fault tolerant. The platform supports high availability, multiple data center management, and multi-tenant management. |
| Serverless PaaS | With the application-centric design philosophy, users do not need to understand the server-related concepts, and is simple and flexible. Quickly build industry-specific PaaS through docking industry applications. |
| Application Gateway | Applying access control policies based on protocols such as HTTP, HTTPs, TCP, and UDP, it is easy to operate grayscale publishing and A/B testing. |
More features [Rainbond features description](https://www.rainbond.com/docs/quick-start/edition/)
## Quick Start
1. [install Rainbond cluster](https://www.rainbond.com/docs/quick-start/rainbond_install/)
2. [create the first application / service](https://www.rainbond.com/docs/user-manual/app-creation/service_create/)
3. [build ServiceMesh microservice architecture](https://www.rainbond.com/docs/advanced-scenarios/micro/)
## Community
[Rainbond forum](https://t.goodrain.com)
[Rainbond website](https://www.rainbond.com)
## Roadmap
See [Rainbond Roadmap](https://www.rainbond.com/docs/quick-start/roadmap/)
## Architecture
<img src="https://static.goodrain.com/images/docs/5.0/architecture/architecture.svg" href="https://www.rainbond.com/docs/architecture/architecture/">
## Snapshot
<img src="https://grstatic.oss-cn-shanghai.aliyuncs.com/images/docs/5.0/readme/connect.gif" href="http://www.rainbond.com/docs/">
- Application assembly deployment diagram
<img src="https://grstatic.oss-cn-shanghai.aliyuncs.com/images/docs/5.0/readme/gateway.gif" href="http://www.rainbond.com/docs/">
- Application gateway management schematic diagram
## Contribution
You can participate in the Rainbond community's contributions and sharing on platforms, applications, plugins, and more.
[Participate in Rainbond Project](https://www.rainbond.com/docs/contribution/)
[Rainbond Contribution](https://t.goodrain.com/c/contribution)
## Related Projects
* [Rainbond-Console](https://github.com/goodrain/rainbond-console)
* [Rainbond-Console-UI](https://github.com/goodrain/rainbond-ui)
* [Rainbond-Install](https://github.com/goodrain/rainbond-ansible)
* [Rainbond-Builder](https://github.com/goodrain/builder)
* [Rainbond-Docs](https://github.com/goodrain/rainbond-docs)
## License
Rainbond is released under LGPL-3.0 license, see [LICENSE](https://github.com/goodrain/rainbond/blob/master/LICENSE) and [Licensing](https://github.com/goodrain/rainbond/blob/master/Licensing.md)。
## Special THANKS
Thanks to the following open source projects
- [Kubernetes](https://github.com/kubernetes/kubernetes)
- [Docker/Moby](https://github.com/moby/moby)
- [Heroku Buildpacks](https://github.com/heroku?utf8=%E2%9C%93&q=buildpack&type=&language=)
- [OpenResty](https://github.com/openresty/)
- [Calico](https://github.com/projectcalico)
- [Midonet](https://github.com/midonet/midonet)
- [Etcd](https://github.com/coreos/etcd)
- [Prometheus](https://github.com/prometheus/prometheus)
- [GlusterFS](https://github.com/gluster/glusterfs)
- [Ceph](https://github.com/ceph/ceph)
- [CockroachDB](https://github.com/cockroachdb/cockroach)
- [MySQL](https://github.com/mysql/mysql-server)
- [Weave Scope](https://github.com/weaveworks/scope)
- [Ant Design](https://github.com/ant-design/ant-design)

View File

@ -32,6 +32,7 @@ func Routes() chi.Router {
r.Get("/monitor_message", controller.GetMonitorMessage().Get)
r.Get("/new_monitor_message", controller.GetMonitorMessage().Get)
r.Get("/event_log", controller.GetEventLog().Get)
r.Get("/services/{serviceID}/pubsub", controller.GetPubSubControll().Get)
return r
}
@ -43,7 +44,7 @@ func LogRoutes() chi.Router {
return r
}
//LogRoutes 应用导出包下载路由
//AppRoutes 应用导出包下载路由
func AppRoutes() chi.Router {
r := chi.NewRouter()
r.Get("/download/{format}/{fileName}", controller.GetManager().Download)

View File

@ -19,15 +19,15 @@
package controller
import (
"context"
"net/http"
"os"
"github.com/goodrain/rainbond/api/discover"
"github.com/goodrain/rainbond/api/proxy"
"github.com/Sirupsen/logrus"
"github.com/go-chi/chi"
"github.com/goodrain/rainbond/api/discover"
"github.com/goodrain/rainbond/api/handler"
"github.com/goodrain/rainbond/api/proxy"
)
//DockerConsole docker console
@ -175,3 +175,32 @@ func (d LogFile) GetInstallLog(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
}
}
var pubSubControll *PubSubControll
//PubSubControll service pub sub
type PubSubControll struct {
socketproxy proxy.Proxy
}
//GetPubSubControll get service pub sub controller
func GetPubSubControll() *PubSubControll {
if pubSubControll == nil {
pubSubControll = &PubSubControll{
socketproxy: proxy.CreateProxy("dockerlog", "websocket", defaultEventLogEndpoints),
}
discover.GetEndpointDiscover(defaultEtcdEndpoints).AddProject("event_log_event_http", pubSubControll.socketproxy)
}
return pubSubControll
}
//Get get
func (d PubSubControll) Get(w http.ResponseWriter, r *http.Request) {
serviceID := chi.URLParam(r, "serviceID")
name, _ := handler.GetEventHandler().GetLogInstance(serviceID)
if name != "" {
r.URL.Query().Add("host_id", name)
r = r.WithContext(context.WithValue(r.Context(), proxy.ContextKey("host_id"), name))
}
d.socketproxy.Proxy(w, r)
}

View File

@ -22,8 +22,13 @@ import (
"net/http"
"strings"
"sync/atomic"
"github.com/Sirupsen/logrus"
)
//ContextKey context key
type ContextKey string
// RoundRobin round robin loadBalance impl
type RoundRobin struct {
ops *uint64
@ -170,7 +175,14 @@ func (s *SelectBalance) Select(r *http.Request, endpoints EndpointList) Endpoint
if r.URL != nil {
hostID := r.URL.Query().Get("host_id")
if hostID == "" {
hostIDFromContext := r.Context().Value(ContextKey("host_id"))
if hostIDFromContext != nil {
hostID = hostIDFromContext.(string)
}
}
if e, ok := id2ip[hostID]; ok {
logrus.Debugf("[lb selelct] find host %s from name %s success", e, hostID)
return Endpoint(e)
}
}

View File

@ -38,93 +38,7 @@ type WebSocketProxy struct {
upgrader *websocket.Upgrader
}
//Proxy 代理
// func (h *WebSocketProxy) Proxy(w http.ResponseWriter, r *http.Request) {
// upgrader := websocket.Upgrader{
// EnableCompression: true,
// Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) {
// w.WriteHeader(500)
// },
// CheckOrigin: func(r *http.Request) bool {
// return true
// },
// }
// conn, err := upgrader.Upgrade(w, r, nil)
// if err != nil {
// logrus.Error("Create web socket conn error.", err.Error())
// return
// }
// defer func() {
// conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
// conn.Close()
// }()
// endpoint := h.lb.Select(r, h.endpoints)
// path := r.RequestURI
// if strings.Contains(path, "?") {
// path = path[:strings.Index(path, "?")]
// }
// u := url.URL{Scheme: "ws", Host: endpoint.String(), Path: path}
// logrus.Infof("connecting to %s", u.String())
// c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
// if err != nil {
// logrus.Errorf("dial websocket endpoint %s error. %s", u.String(), err.Error())
// w.WriteHeader(500)
// return
// }
// defer c.Close()
// done := make(chan struct{})
// go func() {
// defer close(done)
// for {
// select {
// case <-r.Context().Done():
// return
// default:
// }
// t, message, err := c.ReadMessage()
// if err != nil {
// logrus.Println("read proxy websocket message error: ", err)
// return
// }
// err = conn.WriteMessage(t, message)
// if err != nil {
// logrus.Println("write client websocket message error: ", err)
// return
// }
// }
// }()
// for {
// select {
// case <-r.Context().Done():
// // To cleanly close a connection, a client should send a close
// // frame and wait for the server to close the connection.
// err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
// if err != nil {
// log.Println("write close:", err)
// return
// }
// select {
// case <-done:
// case <-time.After(time.Second):
// }
// return
// case <-done:
// return
// default:
// }
// t, message, err := conn.ReadMessage()
// if err != nil {
// logrus.Errorln("read client websocket message error: ", err)
// return
// }
// err = c.WriteMessage(t, message)
// if err != nil {
// logrus.Errorln("write proxy websocket message error: ", err)
// return
// }
// }
// }
//Proxy websocket proxy
func (h *WebSocketProxy) Proxy(w http.ResponseWriter, req *http.Request) {
endpoint := h.lb.Select(req, h.endpoints)
logrus.Info("Proxy webSocket to: ", endpoint)

View File

@ -21,9 +21,10 @@ package main
import (
"os"
_ "net/http/pprof"
"github.com/goodrain/rainbond/cmd"
"github.com/goodrain/rainbond/cmd/eventlog/server"
"github.com/spf13/pflag"
)

View File

@ -28,7 +28,6 @@ import (
"github.com/goodrain/rainbond/eventlog/conf"
"github.com/goodrain/rainbond/eventlog/entry"
"github.com/goodrain/rainbond/eventlog/exit/web"
"github.com/goodrain/rainbond/eventlog/exit/webhook"
"github.com/goodrain/rainbond/eventlog/store"
"os"
@ -82,7 +81,7 @@ func (s *LogServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Conf.EventStore.GarbageMessageFile, "message.garbage.file", "/var/log/envent_garbage_message.log", "save garbage message file path when save type is file")
fs.Int64Var(&s.Conf.EventStore.PeerEventMaxLogNumber, "message.max.number", 100000, "the max number log message for peer event")
fs.IntVar(&s.Conf.EventStore.PeerEventMaxCacheLogNumber, "message.cache.number", 256, "Maintain log the largest number in the memory peer event")
fs.Int64Var(&s.Conf.EventStore.PeerDockerMaxCacheLogNumber, "dockermessage.cache.number", 512, "Maintain log the largest number in the memory peer docker service")
fs.Int64Var(&s.Conf.EventStore.PeerDockerMaxCacheLogNumber, "dockermessage.cache.number", 128, "Maintain log the largest number in the memory peer docker service")
fs.IntVar(&s.Conf.EventStore.HandleMessageCoreNumber, "message.handle.core.number", 2, "The number of concurrent processing receive log data.")
fs.IntVar(&s.Conf.EventStore.HandleSubMessageCoreNumber, "message.sub.handle.core.number", 3, "The number of concurrent processing receive log data. more than message.handle.core.number")
fs.IntVar(&s.Conf.EventStore.HandleDockerLogCoreNumber, "message.dockerlog.handle.core.number", 2, "The number of concurrent processing receive log data. more than message.handle.core.number")
@ -106,8 +105,6 @@ func (s *LogServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.Conf.EventStore.DB.PoolSize, "db.pool.size", 3, "Data persistence db pool init size.")
fs.IntVar(&s.Conf.EventStore.DB.PoolMaxSize, "db.pool.maxsize", 10, "Data persistence db pool max size.")
fs.StringVar(&s.Conf.EventStore.DB.HomePath, "docker.log.homepath", "/grdata/logs/", "container log persistent home path")
fs.StringVar(&s.Conf.WebHook.ConsoleURL, "webhook.console.url", "http://console.goodrain.me", "console web api url")
fs.StringVar(&s.Conf.WebHook.ConsoleToken, "webhook.console.token", "", "console web api token")
fs.StringVar(&s.Conf.Entry.NewMonitorMessageServerConf.ListenerHost, "monitor.udp.host", "0.0.0.0", "receive new monitor udp server host")
fs.IntVar(&s.Conf.Entry.NewMonitorMessageServerConf.ListenerPort, "monitor.udp.port", 6166, "receive new monitor udp server port")
fs.StringVar(&s.Conf.Cluster.Discover.NodeIDFile, "nodeid-file", "/opt/rainbond/etc/node/node_host_uuid.conf", "the unique ID for this node. Just specify, don't modify")
@ -173,9 +170,6 @@ func (s *LogServer) InitConf() {
if os.Getenv("CLUSTER_BIND_IP") != "" {
s.Conf.Cluster.PubSub.PubBindIP = os.Getenv("CLUSTER_BIND_IP")
}
if os.Getenv("CONSOLE_TOKEN") != "" {
s.Conf.WebHook.ConsoleToken = os.Getenv("CONSOLE_TOKEN")
}
}
//Run 执行
@ -183,10 +177,6 @@ func (s *LogServer) Run() error {
s.Logger.Debug("Start run server.")
log := s.Logger
if err := webhook.InitManager(s.Conf.WebHook, log.WithField("module", "WebHook")); err != nil {
return err
}
//init new db
if err := db.CreateDBManager(s.Conf.EventStore.DB); err != nil {
logrus.Infof("create db manager error, %v", err)

View File

@ -22,7 +22,6 @@ import (
"fmt"
"io"
"os"
"strings"
"sync"
"sync/atomic"
"time"
@ -49,6 +48,8 @@ type Manager interface {
Close() error
ReleaseLogger(Logger)
}
// EventConfig event config struct
type EventConfig struct {
EventLogServers []string
DiscoverAddress []string
@ -263,6 +264,7 @@ func (m *manager) getLBChan() chan []byte {
m.qos = atomic.AddInt32(&(m.qos), 1)
server := m.eventServer[index]
if _, ok := m.abnormalServer[server]; ok {
logrus.Warnf("server[%s] is abnormal, skip it", server)
continue
}
if h, ok := m.handles[server]; ok {
@ -424,13 +426,10 @@ func (l *loggerWriter) SetFormat(f string) {
func (l *loggerWriter) Write(b []byte) (n int, err error) {
if b != nil && len(b) > 0 {
message := string(b)
message = strings.Replace(message, "\r", "", -1)
message = strings.Replace(message, "\n", "", -1)
message = strings.Replace(message, "\u0000", "", -1)
message = strings.Replace(message, "\"", "", -1)
if l.fmt != "" {
message = fmt.Sprintf(l.fmt, message)
}
logrus.Debugf("step: %s, level: %s;write message : %v", l.step, l.level, message)
l.l.send(message, map[string]string{"step": l.step, "level": l.level})
}
return len(b), nil

View File

@ -1,49 +0,0 @@
GO_LDFLAGS=-ldflags " -w"
IMAGE_NAME=hub.goodrain.com/dc-deploy/acp_event-log
IMAGE_TAG=3.2
install:
@go build ${GO_LDFLAGS}
dev:install
./event_log --log.level=debug --discover.etcd.addr=http://127.0.0.1:2379 \
--db.url="root:admin@tcp(127.0.0.1:3306)/event" \
--dockerlog.mode=stream \
--message.dockerlog.handle.core.number=2 \
--message.garbage.file="/tmp/garbage.log" \
--docker.log.homepath="/Users/qingguo/tmp"
dev-cluster:
@./event_log --log.level=debug --discover.etcd.addr=http://127.0.0.1:2379 \
--db.url="root:admin@tcp(127.0.0.1:3306)/event" \
--message.garbage.file="/tmp/garbage.log" \
--eventlog.bind.port=2000 \
--dockerlog.bind.port=2001 \
--cluster.bind.port=2002 \
--websocket.bind.port=2003\
--monitor.subaddress=tcp://127.0.0.1:9441\
--docker.log.homepath="/Users/qingguo"
build-alpine:
@echo "🐳 $@"
@docker build -t goodraim.me/event-build:v1 build
@echo "building..."
@docker run --rm -v `pwd`:/go/src/event_log -w /go/src/event_log goodraim.me/event-build:v1 go build ${GO_LDFLAGS} -o event_log
@echo "build done."
instance-ali:
@docker run --name event-log --net host -d -v /var/log/event-log/:/var/log/ ${IMAGE_NAME}:${IMAGE_TAG} \
--db.url="root:admin@tcp(172.30.0.9:3306)/event" \
--discover.etcd.addr="http://127.0.0.1:4001"
instance-test:build-alpine
@docker run --name event-log-test --net host -d ${IMAGE_NAME}:${IMAGE_TAG} \
--db.url="writer1:CeRYK8UzWD@tcp(10.0.55.72:3306)/region" \
--discover.etcd.addr="http://127.0.0.1:4001" \
--eventlog.bind.port=2000 \
--dockerlog.bind.port=2001 \
--cluster.bind.port=2002 \
--websocket.bind.port=2003\
--monitor.subaddress="tcp://10.0.1.11:9442"
instance-ali-build:build-debian instance-ali

View File

@ -1,37 +0,0 @@
# v1版本说明
1. 支持zmq-server接收消息。已完成
2. 支持集群自动发现和转发消息(已完成)
3. 支持webSocket转发消息,支持ssl协议已完成
* 建立连接后先发送缓存消息。
* 完成后再发送实时消息
4. 完成mysql存储。(已完成)
5. 完成消息分析和结果回调。(已完成)
6. 支持http接收消息。
# v2版本说明
1. 支持接收docker日志并选举接收节点。
选举策略:
* etcd中已有对应关系检测节点是否正常如果正常返回该节点。
* 对于新的serviceID根据监控数据选举闲节点。(以每分钟日志量+20倍服务量算标志标志最小为最优)
* 应用关闭时删除etcd中的对应数据。region-api完成
选举请求:
* 请求由dockerd为容器创建logger时完成。
* dockerd完成链接状态检测工作若分配的服务端状态异常。重新请求心得节点。
2. 支持集群间各类消息通信。
* 操作日志消息。(及时)
* docker日志监控数据消息。用于选举
* 容器状态检测停止消息。(用于应用启动和停止操作处理节点不在同一节点)
3. docker日志文件存储。
* 文件缓存写入。每128条写入或每1分钟写入
* 分布式文件存储。
* 历史日志文件压缩。
# v3版本说明
1. 支持监控数据websocket消息转发服务。
2. 消息输入zmq sub订阅数据源。
3. 进行数据切割,区分主题。
3. 消息输出以不同的主题进行区分和订阅。

View File

@ -20,31 +20,27 @@ package web
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/go-chi/chi"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/eventlog/cluster"
"github.com/goodrain/rainbond/eventlog/cluster/discover"
"github.com/goodrain/rainbond/eventlog/conf"
"github.com/goodrain/rainbond/eventlog/exit/monitor"
"github.com/goodrain/rainbond/eventlog/store"
"golang.org/x/net/context"
"fmt"
"strings"
_ "net/http/pprof"
"github.com/Sirupsen/logrus"
httputil "github.com/goodrain/rainbond/util/http"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/twinj/uuid"
"golang.org/x/net/context"
)
//SocketServer socket 服务
@ -438,15 +434,16 @@ func (s *SocketServer) Run() error {
return nil
}
func (s *SocketServer) listen() {
http.HandleFunc("/event_log", s.pushEventMessage)
http.HandleFunc("/docker_log", s.pushDockerLog)
http.HandleFunc("/monitor_message", s.pushMonitorMessage)
http.HandleFunc("/new_monitor_message", s.pushNewMonitorMessage)
http.HandleFunc("/monitor", func(w http.ResponseWriter, r *http.Request) {
r := chi.NewRouter()
r.Get("/event_log", s.pushEventMessage)
r.Get("/docker_log", s.pushDockerLog)
r.Get("/monitor_message", s.pushMonitorMessage)
r.Get("/new_monitor_message", s.pushNewMonitorMessage)
r.Get("/monitor", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte("ok"))
})
http.HandleFunc("/docker-instance", func(w http.ResponseWriter, r *http.Request) {
r.Get("/docker-instance", func(w http.ResponseWriter, r *http.Request) {
ServiceID := r.FormValue("service_id")
if ServiceID == "" {
w.WriteHeader(412)
@ -466,21 +463,22 @@ func (s *SocketServer) listen() {
url := fmt.Sprintf("tcp://%s:%d", instance.HostIP, instance.DockerLogPort)
w.Write([]byte(`{"host":"` + url + `","status":"success"}`))
})
http.HandleFunc("/event_push", s.receiveEventMessage)
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
r.Get("/event_push", s.receiveEventMessage)
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
if s.healthInfo["status"] != "health" {
httputil.ReturnError(r, w, 400, "eventlog service unusual")
}
httputil.ReturnSuccess(r, w, s.healthInfo)
})
// new websocket pubsub
r.Get("/services/{serviceID}/pubsub", s.pubsub)
//monitor setting
s.prometheus()
s.prometheus(r)
if s.conf.SSL {
go func() {
addr := fmt.Sprintf("%s:%d", s.conf.BindIP, s.conf.SSLBindPort)
s.log.Infof("web socket ssl server listen %s", addr)
err := http.ListenAndServeTLS(addr, s.conf.CertFile, s.conf.KeyFile, nil)
err := http.ListenAndServeTLS(addr, s.conf.CertFile, s.conf.KeyFile, r)
if err != nil {
s.log.Error("websocket listen error.", err.Error())
s.listenErr <- err
@ -489,7 +487,7 @@ func (s *SocketServer) listen() {
}
addr := fmt.Sprintf("%s:%d", s.conf.BindIP, s.conf.BindPort)
s.log.Infof("web socket server listen %s", addr)
err := http.ListenAndServe(addr, nil)
err := http.ListenAndServe(addr, r)
if err != nil {
s.log.Error("websocket listen error.", err.Error())
s.listenErr <- err
@ -554,11 +552,11 @@ func (s *SocketServer) receiveEventMessage(w http.ResponseWriter, r *http.Reques
return
}
func (s *SocketServer) prometheus() {
func (s *SocketServer) prometheus(r *chi.Mux) {
prometheus.MustRegister(version.NewCollector("event_log"))
exporter := monitor.NewExporter(s.storemanager, s.cluster)
prometheus.MustRegister(exporter)
http.Handle(s.conf.PrometheusMetricPath, promhttp.Handler())
r.Handle(s.conf.PrometheusMetricPath, promhttp.Handler())
}
//ResponseType 返回内容

309
eventlog/exit/web/pusher.go Normal file
View File

@ -0,0 +1,309 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2019 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package web
import (
"net/http"
"strings"
"sync"
"time"
"github.com/goodrain/rainbond/eventlog/db"
"github.com/goodrain/rainbond/util"
"github.com/pquerna/ffjson/ffjson"
"github.com/gorilla/websocket"
)
//WebsocketMessage websocket message
type WebsocketMessage struct {
Event string `json:"event"`
Data interface{} `json:"data"`
Channel string `json:"channel,omitempty"`
}
//Encode return json encode data
func (w *WebsocketMessage) Encode() []byte {
reb, _ := ffjson.Marshal(w)
return reb
}
//PubContext websocket context
type PubContext struct {
ID string
upgrader websocket.Upgrader
conn *websocket.Conn
httpWriter http.ResponseWriter
httpRequest *http.Request
server *SocketServer
chans map[string]*Chan
lock sync.Mutex
close chan struct{}
}
//Chan handle
type Chan struct {
ch chan *db.EventLogMessage
id string
chtype string
reevent string
channel string
p *PubContext
}
//NewPubContext create context
func NewPubContext(upgrader websocket.Upgrader,
httpWriter http.ResponseWriter,
httpRequest *http.Request,
s *SocketServer,
) *PubContext {
return &PubContext{
ID: util.NewUUID(),
upgrader: upgrader,
httpWriter: httpWriter,
httpRequest: httpRequest,
server: s,
chans: make(map[string]*Chan, 2),
close: make(chan struct{}),
}
}
func (p *PubContext) handleMessage(me []byte) {
var wm WebsocketMessage
if err := ffjson.Unmarshal(me, &wm); err != nil {
p.SendMessage(WebsocketMessage{Event: "error", Data: "Invalid message"})
return
}
switch wm.Event {
case "pusher:subscribe":
p.handleSubscribe(wm)
}
}
func (p *PubContext) createChan(channel, chantype, id string) {
p.lock.Lock()
defer p.lock.Unlock()
if _, exist := p.chans[chantype+"-"+id]; exist {
return
}
ch := p.server.storemanager.WebSocketMessageChan(chantype, id, p.ID)
if ch != nil {
c := &Chan{
ch: ch,
channel: chantype + "-" + id,
id: id,
chtype: chantype,
reevent: func() string {
if chantype == "docker" {
return "service:log"
}
if chantype == "newmonitor" {
return "monitor"
}
if chantype == "event" {
return "event:log"
}
return ""
}(),
p: p,
}
p.chans[chantype+"-"+id] = c
// send success message
p.SendMessage(WebsocketMessage{
Event: "pusher:succeeded",
Channel: c.channel,
})
go c.handleChan()
}
}
func (p *PubContext) removeChan(key string) {
p.lock.Lock()
defer p.lock.Unlock()
if _, exist := p.chans[key]; exist {
delete(p.chans, key)
}
if len(p.chans) == 0 {
p.Close()
}
}
func (p *PubContext) handleSubscribe(wm WebsocketMessage) {
data := wm.Data.(map[string]interface{})
if channel, ok := data["channel"].(string); ok {
channelInfo := strings.SplitN(channel, "-", 2)
if len(channelInfo) < 2 {
p.SendMessage(WebsocketMessage{Event: "error", Data: "Invalid message"})
return
}
if channelInfo[0] == "s" {
p.createChan(channel, "docker", channelInfo[1])
p.createChan(channel, "newmonitor", channelInfo[1])
}
if channelInfo[0] == "e" {
p.createChan(channel, "newmonitor", channelInfo[1])
}
}
}
func (c *Chan) handleChan() {
defer func() {
c.p.server.log.Infof("pubsub message chan %s closed", c.channel)
c.p.removeChan(c.chtype + "-" + c.id)
}()
for {
select {
case <-c.p.close:
c.p.server.log.Info("pub sub context closed")
return
case <-c.p.httpRequest.Context().Done():
c.p.server.log.Info("pub sub request context cancel")
return
case message, ok := <-c.ch:
if !ok {
c.p.SendMessage(WebsocketMessage{Event: "pusher:close", Data: "{}", Channel: c.channel})
c.p.SendWebsocketMessage(websocket.CloseMessage)
return
}
if message != nil {
if message.Step == "last" {
c.p.SendMessage(WebsocketMessage{Event: "event:success", Data: message.Message, Channel: c.channel})
c.p.SendMessage(WebsocketMessage{Event: "pusher:close", Data: message.Message, Channel: c.channel})
return
}
if message.Step == "callback" {
c.p.SendMessage(WebsocketMessage{Event: "event:failure", Data: message.Message, Channel: c.channel})
c.p.SendMessage(WebsocketMessage{Event: "pusher:close", Data: message.Message, Channel: c.channel})
return
}
if message.MonitorData != nil {
c.p.SendMessage(WebsocketMessage{Event: c.reevent, Data: string(message.MonitorData), Channel: c.channel})
} else {
c.p.SendMessage(WebsocketMessage{Event: c.reevent, Data: string(message.Content), Channel: c.channel})
}
}
}
}
}
func (p *PubContext) readMessage(closed chan struct{}) {
defer func() {
close(closed)
}()
if p.conn == nil {
p.server.log.Errorf("websocket connection is not connect")
}
for {
messageType, me, err := p.conn.ReadMessage()
if err != nil {
p.server.log.Errorf("read websocket message failure %s", err.Error())
break
}
if messageType == websocket.CloseMessage {
break
}
if messageType == websocket.TextMessage {
p.handleMessage(me)
continue
}
if messageType == websocket.PingMessage {
p.conn.WriteMessage(websocket.PongMessage, []byte{})
continue
}
if messageType == websocket.BinaryMessage {
continue
}
}
}
//SendMessage send websocket message
func (p *PubContext) SendMessage(message WebsocketMessage) error {
p.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
return p.conn.WriteMessage(websocket.TextMessage, message.Encode())
}
//SendWebsocketMessage send websocket message
func (p *PubContext) SendWebsocketMessage(message int) error {
p.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
return p.conn.WriteMessage(message, []byte{})
}
func (p *PubContext) sendPing(closed chan struct{}) {
err := util.Exec(p.httpRequest.Context(), func() error {
if err := p.SendWebsocketMessage(websocket.PingMessage); err != nil {
return err
}
return nil
}, time.Second*20)
if err != nil {
p.server.log.Errorf("send ping message failure %s will closed the connect", err.Error())
close(closed)
}
}
//Start start context
func (p *PubContext) Start() {
var err error
p.conn, err = p.upgrader.Upgrade(p.httpWriter, p.httpRequest, nil)
if err != nil {
p.server.log.Error("Create web socket conn error.", err.Error())
return
}
pingclosed := make(chan struct{})
readclosed := make(chan struct{})
go p.sendPing(pingclosed)
go p.readMessage(readclosed)
select {
case <-pingclosed:
case <-readclosed:
case <-p.close:
}
}
//Stop close context
func (p *PubContext) Stop() {
if p.conn != nil {
p.conn.Close()
}
for _, v := range p.chans {
p.server.storemanager.RealseWebSocketMessageChan(v.chtype, v.id, p.ID)
}
}
//Close close the context
func (p *PubContext) Close() {
close(p.close)
}
func (s *SocketServer) pubsub(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
ReadBufferSize: s.conf.ReadBufferSize,
WriteBufferSize: s.conf.WriteBufferSize,
EnableCompression: s.conf.EnableCompression,
Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) {
},
CheckOrigin: func(r *http.Request) bool {
return true
},
}
context := NewPubContext(upgrader, w, r, s)
defer context.Stop()
s.log.Infof("websocket pubsub context running %s", context.ID)
context.Start()
s.log.Infof("websocket pubsub context closed %s", context.ID)
}

View File

@ -1,187 +0,0 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package webhook
import (
"github.com/goodrain/rainbond/eventlog/conf"
"net/http"
"sync"
"bytes"
"fmt"
"strings"
"net/url"
"strconv"
"github.com/Sirupsen/logrus"
)
type Manager struct {
hooks map[string]*WebHook
conf conf.WebHookConf
log *logrus.Entry
lock sync.Mutex
}
type WebHook struct {
EndPoint string
RequestParameter map[string]interface{}
RequestBody []byte
RequestHeader map[string]string
Name string
Method string
}
const UpDateEventStatus = "console_update_event_status"
const UpdateEventCodeVersion = "console_update_event_code_version"
var manager *Manager
func GetManager() *Manager {
return manager
}
func InitManager(conf conf.WebHookConf, log *logrus.Entry) error {
ma := &Manager{
conf: conf,
log: log,
hooks: make(map[string]*WebHook, 0),
}
eventStatus := &WebHook{
Name: UpDateEventStatus,
Method: "PUT",
EndPoint: conf.ConsoleURL + "/api/event/update",
RequestHeader: make(map[string]string, 0),
}
eventStatus.RequestHeader["Content-Type"] = "application/json"
if conf.ConsoleToken != "" {
eventStatus.RequestHeader["Authorization"] = "Token " + conf.ConsoleToken
}
codeVsersion := &WebHook{
Name: UpdateEventCodeVersion,
Method: "PUT",
EndPoint: conf.ConsoleURL + "/api/event/update-code",
RequestHeader: make(map[string]string, 0),
}
codeVsersion.RequestHeader["Content-Type"] = "application/json"
if conf.ConsoleToken != "" {
codeVsersion.RequestHeader["Authorization"] = "Token " + conf.ConsoleToken
}
ma.Regist(eventStatus)
ma.Regist(codeVsersion)
manager = ma
return nil
}
func (m *Manager) GetWebhook(name string) *WebHook {
m.lock.Lock()
defer m.lock.Unlock()
return m.hooks[name]
}
func (m *Manager) RunWebhook(name string, body []byte) {
w := m.GetWebhook(name)
w.RequestBody = body
if err := w.Run(); err != nil {
m.log.Errorf("Webhook %s run error. %s", w.Name, err.Error())
}
m.log.Debugf("Run web hook %s success", w.Name)
}
func (m *Manager) RunWebhookWithParameter(name string, body []byte, parameter map[string]interface{}) {
w := m.GetWebhook(name)
w.RequestBody = body
w.RequestParameter = parameter
if err := w.Run(); err != nil {
m.log.Errorf("Webhook %s run error. %s", w.Name, err.Error())
}
}
//Regist 注册
func (m *Manager) Regist(w *WebHook) {
if w == nil || w.Name == "" {
return
}
m.lock.Lock()
defer m.lock.Unlock()
m.hooks[w.Name] = w
}
//Run 执行
func (w *WebHook) Run() error {
var err error
if w.RequestParameter != nil && len(w.RequestParameter) > 0 {
if strings.ToUpper(w.Method) == "GET" {
values := make(url.Values)
for k, v := range w.RequestParameter {
switch v.(type) {
case int:
values.Set(k, strconv.Itoa(v.(int)))
case string:
values.Set(k, v.(string))
}
}
w.EndPoint += "?" + values.Encode()
} else {
var jsonStr = "{"
for k, v := range w.RequestParameter {
switch v.(type) {
case int:
jsonStr += `"` + k + `":` + strconv.Itoa(v.(int)) + `,`
case string:
jsonStr += `"` + k + `":"` + v.(string) + `",`
}
}
jsonStr = jsonStr[0:len(jsonStr)-1] + "}"
w.RequestBody = []byte(jsonStr)
}
}
var request *http.Request
if w.RequestBody != nil {
request, err = http.NewRequest(strings.ToUpper(w.Method), w.EndPoint, bytes.NewReader(w.RequestBody))
if err != nil {
return err
}
} else {
request, err = http.NewRequest(strings.ToUpper(w.Method), w.EndPoint, nil)
if err != nil {
return err
}
}
request.Header.Add("Content-Type", "application/json")
for k, v := range w.RequestHeader {
request.Header.Add(k, v)
}
res, err := http.DefaultClient.Do(request)
if err != nil {
return err
}
if res.Body != nil {
defer res.Body.Close()
}
if res.StatusCode/100 == 2 {
return nil
}
return fmt.Errorf("Endpoint return status code is %d", res.StatusCode)
}

View File

@ -1,34 +0,0 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package webhook
import (
"github.com/goodrain/rainbond/eventlog/conf"
"testing"
"github.com/Sirupsen/logrus"
)
func TestEventUpdate(t *testing.T) {
InitManager(conf.WebHookConf{
ConsoleURL: "http://test.goodrain.com:7070/",
}, logrus.WithField("test", "test"))
GetManager().RunWebhookWithParameter(UpDateEventStatus, nil,
map[string]interface{}{"event_id": "4e9dafa96cb043cdb992adde2af421d4", "status": "success", "message": "操作成功"})
}

View File

@ -140,12 +140,6 @@ func (r *readEventBarrel) insertMessage(message *db.EventLogMessage) {
func (r *readEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
// for _, m := range r.barrel {
// select {
// case ch <- m:
// default:
// }
// }
r.subSocketChan[subID] = ch
}
@ -165,7 +159,8 @@ func (r *readEventBarrel) addSubChan(subID string) chan *db.EventLogMessage {
func (r *readEventBarrel) delSubChan(subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
if _, ok := r.subSocketChan[subID]; ok {
if ch, ok := r.subSocketChan[subID]; ok {
close(ch)
delete(r.subSocketChan, subID)
}
}
@ -221,13 +216,6 @@ func (r *dockerLogEventBarrel) insertMessage(message *db.EventLogMessage) {
func (r *dockerLogEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
// send cache log will cause user illusion
// for _, m := range r.barrel {
// select {
// case ch <- m:
// default:
// }
// }
r.subSocketChan[subID] = ch
}
@ -247,7 +235,8 @@ func (r *dockerLogEventBarrel) addSubChan(subID string) chan *db.EventLogMessage
func (r *dockerLogEventBarrel) delSubChan(subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
if _, ok := r.subSocketChan[subID]; ok {
if ch, ok := r.subSocketChan[subID]; ok {
close(ch)
delete(r.subSocketChan, subID)
}
}
@ -299,8 +288,6 @@ func (r *monitorMessageBarrel) empty() {
}
func (r *monitorMessageBarrel) insertMessage(message *db.EventLogMessage) {
//r.barrel = append(r.barrel, message)
//logrus.Info(string(message.Content))
r.updateTime = time.Now()
r.subLock.Lock()
defer r.subLock.Unlock()
@ -315,12 +302,6 @@ func (r *monitorMessageBarrel) insertMessage(message *db.EventLogMessage) {
func (r *monitorMessageBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
// for _, m := range r.barrel {
// select {
// case ch <- m:
// default:
// }
// }
r.subSocketChan[subID] = ch
}
@ -340,7 +321,8 @@ func (r *monitorMessageBarrel) addSubChan(subID string) chan *db.EventLogMessage
func (r *monitorMessageBarrel) delSubChan(subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
if _, ok := r.subSocketChan[subID]; ok {
if ch, ok := r.subSocketChan[subID]; ok {
close(ch)
delete(r.subSocketChan, subID)
}
}

View File

@ -131,7 +131,7 @@ func (h *dockerLogStore) GetMonitorData() *db.MonitorData {
return data
}
func (h *dockerLogStore) Gc() {
tiker := time.NewTicker(time.Minute * 1)
tiker := time.NewTicker(time.Second * 30)
for {
select {
case <-tiker.C:
@ -151,14 +151,15 @@ func (h *dockerLogStore) handle() []string {
}
var gcEvent []string
for k, v := range h.barrels {
if v.updateTime.Add(time.Minute * 1).Before(time.Now()) { // barrel 超时未收到消息
if v.updateTime.Add(time.Minute * 1).Before(time.Now()) {
//gc without client link
if v.GetSubChanLength() == 0 {
h.saveBeforeGc(k, v)
gcEvent = append(gcEvent, k)
}
}
if v.persistenceTime.Add(time.Minute * 2).Before(time.Now()) { //超过2分钟未持久化 间隔需要大于1分钟。以分钟为单位
//The interval not persisted for more than 1 minute should be more than 30 seconds
if v.persistenceTime.Add(time.Minute * 1).Before(time.Now()) {
if len(v.barrel) > 0 {
v.persistence()
}

View File

@ -19,7 +19,6 @@
package store
import (
"strings"
"sync"
"time"
@ -114,7 +113,6 @@ func (h *handleMessageStore) Gc() {
}
}
func (h *handleMessageStore) gcRun() {
//h.log.Debugf("runGC %d", time.Now().UnixNano())
h.lock.Lock()
defer h.lock.Unlock()
t := time.Now()
@ -123,7 +121,7 @@ func (h *handleMessageStore) gcRun() {
}
var gcEvent []string
for k, v := range h.barrels {
if v.updateTime.Add(time.Second * 30).Before(time.Now()) { // barrel 超时未收到消息
if v.updateTime.Add(time.Second * 30).Before(time.Now()) {
h.saveBeforeGc(v)
gcEvent = append(gcEvent, k)
}
@ -132,7 +130,7 @@ func (h *handleMessageStore) gcRun() {
for _, id := range gcEvent {
barrel := h.barrels[id]
barrel.empty()
h.pool.Put(barrel) //放回对象池
h.pool.Put(barrel)
delete(h.barrels, id)
}
}
@ -240,7 +238,6 @@ func (h *handleMessageStore) saveGarbageMessage() {
}
err := util.AppendToFile(h.conf.GarbageMessageFile, content)
if err != nil {
//h.log.Error("Save garbage message to file error.context :\n " + content)
h.log.Error("Save garbage message to file error.context", err.Error())
} else {
h.log.Info("Save the garbage message to file.")
@ -296,16 +293,6 @@ func (h *handleMessageStore) handleBarrelEvent() {
}
}
}
if event[0] == "code-version" { //代码版本
if len(event) == 3 {
eventID := event[1]
codeVersion := strings.TrimSpace(event[2])
event := model.ServiceEvent{}
event.EventID = eventID
cdb.GetManager().ServiceEventDao().UpdateModel(&event)
h.log.Infof("run web hook update code version .event_id %s code_version %s", eventID, codeVersion)
}
}
case <-h.ctx.Done():
return
}

View File

@ -23,7 +23,6 @@ import (
"strconv"
"github.com/goodrain/rainbond/eventlog/db"
"github.com/goodrain/rainbond/eventlog/util"
coreutil "github.com/goodrain/rainbond/util"
"github.com/goodrain/rainbond/eventlog/conf"
@ -45,7 +44,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/pquerna/ffjson/ffjson"
"github.com/prometheus/client_golang/prometheus"
"github.com/tidwall/gjson"
)
//Manager 存储管理器
@ -68,11 +66,6 @@ type Manager interface {
//NewManager 存储管理器
func NewManager(conf conf.EventStoreConf, log *logrus.Entry) (Manager, error) {
// event log do not save in db,will save in file
// dbPlugin, err := db.NewManager(conf.DB, log)
// if err != nil {
// return nil, err
// }
conf.DB.Type = "eventfile"
dbPlugin, err := db.NewManager(conf.DB, log)
if err != nil {
@ -103,12 +96,10 @@ func NewManager(conf conf.EventStoreConf, log *logrus.Entry) (Manager, error) {
handle := NewStore("handle", storeManager)
read := NewStore("read", storeManager)
docker := NewStore("docker_log", storeManager)
monitor := NewStore("monitor", storeManager)
newmonitor := NewStore("newmonitor", storeManager)
storeManager.handleMessageStore = handle
storeManager.readMessageStore = read
storeManager.dockerLogStore = docker
storeManager.monitorMessageStore = monitor
storeManager.newmonitorMessageStore = newmonitor
return storeManager, nil
}
@ -119,7 +110,6 @@ type storeManager struct {
handleMessageStore MessageStore
readMessageStore MessageStore
dockerLogStore MessageStore
monitorMessageStore MessageStore
newmonitorMessageStore MessageStore
receiveChan chan []byte
pubChan, subChan chan [][]byte
@ -157,7 +147,7 @@ func (s *storeManager) Scrape(ch chan<- prometheus.Metric, namespace, exporter,
s.dockerLogStore.Scrape(ch, namespace, exporter, from)
s.handleMessageStore.Scrape(ch, namespace, exporter, from)
s.monitorMessageStore.Scrape(ch, namespace, exporter, from)
s.newmonitorMessageStore.Scrape(ch, namespace, exporter, from)
chanDesc := prometheus.NewDesc(
prometheus.BuildFQName(namespace, exporter, "chan_cache_size"),
"the handle chan cache size.",
@ -183,7 +173,7 @@ func (s *storeManager) Scrape(ch chan<- prometheus.Metric, namespace, exporter,
func (s *storeManager) Monitor() []db.MonitorData {
data := s.dockerLogStore.GetMonitorData()
moData := s.monitorMessageStore.GetMonitorData()
moData := s.newmonitorMessageStore.GetMonitorData()
if moData != nil {
data.LogSizePeerM += moData.LogSizePeerM
data.ServiceSize += moData.ServiceSize
@ -248,10 +238,6 @@ func (s *storeManager) WebSocketMessageChan(mode, eventID, subID string) chan *d
ch := s.dockerLogStore.SubChan(eventID, subID)
return ch
}
if mode == "monitor" {
ch := s.monitorMessageStore.SubChan(eventID, subID)
return ch
}
if mode == "newmonitor" {
ch := s.newmonitorMessageStore.SubChan(eventID, subID)
return ch
@ -264,7 +250,6 @@ func (s *storeManager) Run() error {
s.handleMessageStore.Run()
s.readMessageStore.Run()
s.dockerLogStore.Run()
s.monitorMessageStore.Run()
s.newmonitorMessageStore.Run()
for i := 0; i < s.conf.HandleMessageCoreNumber; i++ {
go s.handleReceiveMessage()
@ -275,9 +260,6 @@ func (s *storeManager) Run() error {
for i := 0; i < s.conf.HandleDockerLogCoreNumber; i++ {
go s.handleDockerLog()
}
for i := 0; i < s.conf.HandleMessageCoreNumber; i++ {
go s.handleMonitorMessage()
}
go s.handleNewMonitorMessage()
go s.cleanLog()
return nil
@ -454,10 +436,6 @@ func (s *storeManager) handleSubMessage() {
if string(msg[0]) == string(db.EventMessage) {
s.readMessageStore.InsertMessage(message)
}
if string(msg[0]) == string(db.ServiceMonitorMessage) {
s.monitorMessageStore.InsertMessage(message)
}
}
}
}
@ -499,14 +477,11 @@ loop:
Content: buffer.Bytes(),
EventID: serviceID,
}
//s.log.Debug("Receive docker log:", info)
s.dockerLogStore.InsertMessage(&message)
buffer.Reset()
}
}
s.errChan <- fmt.Errorf("handle docker log core exist")
}
type event struct {
@ -515,101 +490,6 @@ type event struct {
Update string `json:"update_time"`
}
func (s *storeManager) handleMonitorMessage() {
loop:
for {
select {
case <-s.context.Done():
return
case msg, ok := <-s.monitorMessageChan:
if !ok {
s.log.Error("handle monitor message core stop.monitor message log chan closed")
break loop
}
if msg == nil {
continue
}
if len(msg) == 2 {
message := strings.SplitAfterN(string(msg[1]), " ", 5)
name := message[3]
body := message[4]
var currentTopic string
var data []interface{}
switch strings.TrimSpace(name) {
case "SumTimeByUrl":
result := gjson.Parse(body).Array()
for _, r := range result {
var port int
if p, ok := r.Map()["port"]; ok {
port = int(p.Int())
}
wsTopic := fmt.Sprintf("%s.%s.statistic", r.Map()["tenant"], r.Map()["service"])
if port != 0 {
wsTopic = fmt.Sprintf("%s.%s.%d.statistic", r.Map()["tenant"], r.Map()["service"], port)
}
if currentTopic == "" {
currentTopic = wsTopic
}
if wsTopic == currentTopic {
data = append(data, util.Format(r.Map()))
} else {
s.sendMonitorData("SumTimeByUrl", data, currentTopic)
currentTopic = wsTopic
data = []interface{}{util.Format(r.Map())}
}
}
s.sendMonitorData("SumTimeByUrl", data, currentTopic)
case "SumTimeBySql":
result := gjson.Parse(body).Array()
for _, r := range result {
tenantID := r.Map()["tenant_id"].String()
serviceID := r.Map()["service_id"].String()
if len(tenantID) < 12 || len(serviceID) < 12 {
continue
}
tenantAlias := tenantID[len(tenantID)-12:]
serviceAlias := serviceID[len(serviceID)-12:]
wsTopic := fmt.Sprintf("%s.%s.statistic", tenantAlias, serviceAlias)
if currentTopic == "" {
currentTopic = wsTopic
}
if wsTopic == currentTopic {
data = append(data, util.Format(r.Map()))
} else {
s.sendMonitorData("SumTimeBySql", data, currentTopic)
currentTopic = wsTopic
data = []interface{}{util.Format(r.Map())}
}
}
s.sendMonitorData("SumTimeBySql", data, currentTopic)
}
}
}
}
s.errChan <- fmt.Errorf("handle monitor log core exist")
}
func (s *storeManager) sendMonitorData(name string, data []interface{}, topic string) {
e := event{
Name: name,
Update: time.Now().Format(time.Kitchen),
Data: data,
}
eventByte, _ := json.Marshal(e)
m := &db.EventLogMessage{
EventID: topic,
MonitorData: eventByte,
}
s.monitorMessageStore.InsertMessage(m)
d, err := json.Marshal(m)
if err != nil {
s.log.Error("Marshal monitor message to byte error.", err.Error())
return
}
s.pubChan <- [][]byte{[]byte(db.ServiceMonitorMessage), d}
}
func (s *storeManager) RealseWebSocketMessageChan(mode string, eventID, subID string) {
if mode == "event" {
s.readMessageStore.RealseSubChan(eventID, subID)
@ -617,8 +497,8 @@ func (s *storeManager) RealseWebSocketMessageChan(mode string, eventID, subID st
if mode == "docker" {
s.dockerLogStore.RealseSubChan(eventID, subID)
}
if mode == "monitor" {
s.monitorMessageStore.RealseSubChan(eventID, subID)
if mode == "newmonitor" {
s.newmonitorMessageStore.RealseSubChan(eventID, subID)
}
}
@ -626,7 +506,7 @@ func (s *storeManager) Stop() {
s.handleMessageStore.stop()
s.readMessageStore.stop()
s.dockerLogStore.stop()
s.monitorMessageStore.stop()
s.newmonitorMessageStore.stop()
s.cancel()
if s.filePlugin != nil {
s.filePlugin.Close()

View File

@ -1,149 +0,0 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package store
import (
"sync"
"time"
"github.com/goodrain/rainbond/eventlog/conf"
"github.com/goodrain/rainbond/eventlog/db"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
)
type monitorMessageStore struct {
conf conf.EventStoreConf
log *logrus.Entry
barrels map[string]*monitorMessageBarrel
lock sync.RWMutex
cancel func()
ctx context.Context
pool *sync.Pool
size int64
allLogCount float64
}
func (h *monitorMessageStore) Scrape(ch chan<- prometheus.Metric, namespace, exporter, from string) error {
chanDesc := prometheus.NewDesc(
prometheus.BuildFQName(namespace, exporter, "monitor_store_barrel_count"),
"the handle container log count size.",
[]string{"from"}, nil,
)
ch <- prometheus.MustNewConstMetric(chanDesc, prometheus.GaugeValue, float64(len(h.barrels)), from)
logDesc := prometheus.NewDesc(
prometheus.BuildFQName(namespace, exporter, "monitor_store_log_count"),
"the handle monitor log count size.",
[]string{"from"}, nil,
)
ch <- prometheus.MustNewConstMetric(logDesc, prometheus.GaugeValue, h.allLogCount, from)
return nil
}
func (h *monitorMessageStore) insertMessage(message *db.EventLogMessage) bool {
h.lock.RLock()
defer h.lock.RUnlock()
if ba, ok := h.barrels[message.EventID]; ok {
ba.insertMessage(message)
return true
}
return false
}
func (h *monitorMessageStore) InsertMessage(message *db.EventLogMessage) {
if message == nil || message.EventID == "" {
return
}
//h.log.Debug("Receive a monitor message:" + string(message.Content))
h.size++
h.allLogCount++
if h.insertMessage(message) {
return
}
h.lock.Lock()
defer h.lock.Unlock()
ba := h.pool.Get().(*monitorMessageBarrel)
ba.insertMessage(message)
h.barrels[message.EventID] = ba
}
func (h *monitorMessageStore) GetMonitorData() *db.MonitorData {
data := &db.MonitorData{
ServiceSize: len(h.barrels),
LogSizePeerM: h.size,
}
return data
}
func (h *monitorMessageStore) SubChan(eventID, subID string) chan *db.EventLogMessage {
h.lock.Lock()
defer h.lock.Unlock()
if ba, ok := h.barrels[eventID]; ok {
return ba.addSubChan(subID)
}
ba := h.pool.Get().(*monitorMessageBarrel)
h.barrels[eventID] = ba
return ba.addSubChan(subID)
}
func (h *monitorMessageStore) RealseSubChan(eventID, subID string) {
h.lock.RLock()
defer h.lock.RUnlock()
if ba, ok := h.barrels[eventID]; ok {
ba.delSubChan(subID)
}
}
func (h *monitorMessageStore) Run() {
go h.Gc()
}
func (h *monitorMessageStore) Gc() {
tiker := time.NewTicker(time.Second * 30)
for {
select {
case <-tiker.C:
case <-h.ctx.Done():
h.log.Debug("read message store gc stop.")
tiker.Stop()
return
}
h.size = 0
if len(h.barrels) == 0 {
continue
}
var gcEvent []string
for k, v := range h.barrels {
if v.updateTime.Add(time.Minute * 3).Before(time.Now()) { // barrel 超时未收到消息
gcEvent = append(gcEvent, k)
}
}
if gcEvent != nil && len(gcEvent) > 0 {
for _, id := range gcEvent {
barrel := h.barrels[id]
barrel.empty()
h.pool.Put(barrel) //放回对象池
delete(h.barrels, id)
}
}
}
}
func (h *monitorMessageStore) stop() {
h.cancel()
}
func (h *monitorMessageStore) InsertGarbageMessage(message ...*db.EventLogMessage) {}

View File

@ -81,7 +81,6 @@ func (h *newMonitorMessageStore) InsertMessage(message *db.EventLogMessage) {
if message == nil {
return
}
//h.log.Debug("Receive a monitor message:" + string(message.Content))
h.size++
h.allLogCount++
mm, ok := h.insertMessage(message)
@ -138,12 +137,15 @@ func (h *newMonitorMessageStore) Gc() {
}
var gcEvent []string
for k, v := range h.barrels {
if v.UpdateTime.Add(time.Minute * 3).Before(time.Now()) { // barrel 超时未收到消息
gcEvent = append(gcEvent, k)
if len(v.subSocketChan) == 0 {
if v.UpdateTime.Add(time.Minute * 3).Before(time.Now()) { // barrel 超时未收到消息
gcEvent = append(gcEvent, k)
}
}
}
if gcEvent != nil && len(gcEvent) > 0 {
for _, id := range gcEvent {
h.log.Infof("monitor message barrel %s will be gc", id)
barrel := h.barrels[id]
barrel.empty()
delete(h.barrels, id)
@ -285,11 +287,12 @@ func (c *CacheMonitorMessageList) addSubChan(subID string) chan *db.EventLogMess
return ch
}
//删除socket订阅
//delSubChan delete socket sub chan
func (c *CacheMonitorMessageList) delSubChan(subID string) {
c.subLock.Lock()
defer c.subLock.Unlock()
if _, ok := c.subSocketChan[subID]; ok {
if ch, ok := c.subSocketChan[subID]; ok {
close(ch)
delete(c.subSocketChan, subID)
}
}
@ -319,11 +322,8 @@ func merge(source, addsource MonitorMessageList) (result MonitorMessageList) {
if oldmm, ok := cache[mm.Key]; ok {
oldmm.Count += mm.Count
oldmm.AbnormalCount += mm.AbnormalCount
//平均时间
oldmm.AverageTime = Round((oldmm.AverageTime+mm.AverageTime)/2, 2)
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><E7A7AF><EFBFBD>
oldmm.CumulativeTime = Round(oldmm.CumulativeTime+mm.CumulativeTime, 2)
//最大时间
if mm.MaxTime > oldmm.MaxTime {
oldmm.MaxTime = mm.MaxTime
}

View File

@ -97,24 +97,6 @@ func NewStore(storeType string, manager *storeManager) MessageStore {
}
return read
}
if storeType == "monitor" {
monitor := &monitorMessageStore{
barrels: make(map[string]*monitorMessageBarrel, 100),
conf: manager.conf,
log: manager.log.WithField("module", "MonitorMessageStore"),
ctx: ctx,
cancel: cancel,
}
monitor.pool = &sync.Pool{
New: func() interface{} {
reb := &monitorMessageBarrel{
subSocketChan: make(map[string]chan *db.EventLogMessage, 0),
}
return reb
},
}
return monitor
}
if storeType == "docker_log" {
docker := &dockerLogStore{
barrels: make(map[string]*dockerLogEventBarrel, 100),

View File

@ -52,6 +52,7 @@ type NodeService struct {
func CreateNodeService(c *option.Conf, nodecluster *node.Cluster, kubecli kubecache.KubeClient) *NodeService {
if err := event.NewManager(event.EventConfig{
DiscoverAddress: c.Etcd.Endpoints,
EventLogServers: c.EventLogServer,
}); err != nil {
logrus.Errorf("create event manager faliure")
}
@ -159,11 +160,6 @@ func (n *NodeService) AsynchronousInstall(node *client.HostNode, eventID string)
logrus.Error("write hosts file error ", err.Error())
return
}
if err := event.NewManager(event.EventConfig{
DiscoverAddress: n.c.Etcd.Endpoints,
}); err != nil {
logrus.Errorf("create event manager faliure")
}
// start add node script
logrus.Infof("Begin install node %s", node.ID)
// write log to event log
@ -179,9 +175,10 @@ func (n *NodeService) AsynchronousInstall(node *client.HostNode, eventID string)
Stdout: logger.GetWriter("node-install", "info"),
Stderr: logger.GetWriter("node-install", "err"),
}
err = ansibleUtil.RunNodeInstallCmd(option)
if err != nil {
logrus.Error("Error executing shell script", err)
logrus.Error("Error executing shell script : ", err)
node.Status = client.InstallFailed
node.NodeStatus.Status = client.InstallFailed
n.nodecluster.UpdateNode(node)

View File

@ -0,0 +1,38 @@
package service
import (
"testing"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/event"
etcdClient "github.com/coreos/etcd/clientv3"
"github.com/goodrain/rainbond/node/core/store"
"github.com/goodrain/rainbond/node/masterserver"
"github.com/goodrain/rainbond/node/masterserver/node"
"github.com/goodrain/rainbond/node/nodem/client"
)
func TestAsynchronousInstall(t *testing.T) {
config := &option.Conf{
Etcd: etcdClient.Config{Endpoints: []string{"192.168.195.1:2379"}},
EventLogServer: []string{"192.168.195.1:6366"},
}
store.NewClient(config)
// node *client.HostNode, eventID string
eventID := "fanyangyang"
// nodemanager := nodem.NewNodeManager(&option.Conf{})
currentNode := client.HostNode{
ID: "123",
Role: []string{"manage"},
InternalIP: "127.0.0.1",
RootPass: "password",
}
cluster := node.CreateCluster(nil, &currentNode, nil)
cluster.CacheNode(&currentNode)
ms := &masterserver.MasterServer{Cluster: cluster}
nodeService := CreateNodeService(config, ms.Cluster, nil)
nodeService.AsynchronousInstall(&currentNode, eventID)
event.GetManager().Close()
}

View File

@ -52,7 +52,7 @@ func NewCopier(logfile *LogFile, dst []Logger, since time.Time) *Copier {
// Run starts logs copying
func (c *Copier) Run() {
c.closed = make(chan struct{})
go c.logfile.ReadLogs(ReadConfig{Follow: true, Since: c.since}, c.reader)
go c.logfile.ReadLogs(ReadConfig{Follow: true, Since: c.since, Tail: 0}, c.reader)
go c.copySrc()
}
@ -63,6 +63,13 @@ lool:
select {
case <-c.closed:
return
case err := <-c.reader.Err:
logrus.Errorf("read container log file error %s, will retry after 5 seconds", err.Error())
//If there is an error in the collection log process,
//the collection should be restarted and not stopped
time.Sleep(time.Second * 5)
go c.logfile.ReadLogs(ReadConfig{Follow: true, Since: c.since, Tail: 0}, c.reader)
continue
case msg, ok := <-c.reader.Msg:
if !ok {
break lool

View File

@ -139,14 +139,11 @@ func decodeFunc(rdr io.Reader) func() (*Message, error) {
if err == nil || err == io.EOF {
break
}
logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
// try again, could be due to a an incomplete json object as we read
if _, ok := err.(*json.SyntaxError); ok {
dec = json.NewDecoder(rdr)
continue
}
// io.ErrUnexpectedEOF is returned from json.Decoder when there is
// remaining data in the parser's buffer while an io.EOF occurs.
// If the json logger writes a partial json log entry to the disk
@ -156,6 +153,7 @@ func decodeFunc(rdr io.Reader) func() (*Message, error) {
dec = json.NewDecoder(reader)
continue
}
logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
}
return msg, err
}
@ -205,7 +203,6 @@ func (w *LogFile) ReadLogs(config ReadConfig, watcher *LogWatcher) {
watcher.Err <- err
return
}
closeFiles := func() {
for _, f := range files {
f.Close()
@ -218,7 +215,6 @@ func (w *LogFile) ReadLogs(config ReadConfig, watcher *LogWatcher) {
}
}
}
readers := make([]SizeReaderAt, 0, len(files)+1)
for _, f := range files {
stat, err := f.Stat()
@ -244,10 +240,7 @@ func (w *LogFile) ReadLogs(config ReadConfig, watcher *LogWatcher) {
return
}
w.mu.RUnlock()
notifyRotate := w.notifyRotate.Subscribe()
defer w.notifyRotate.Evict(notifyRotate)
followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
followLogs(currentFile, watcher, w.createDecoder, config.Since, config.Until)
}
func tailFiles(files []SizeReaderAt, watcher *LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config ReadConfig) {
@ -398,7 +391,7 @@ func decompressfile(fileName, destFileName string, since time.Time) (*os.File, e
return rs, nil
}
func followLogs(f *os.File, logWatcher *LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
func followLogs(f *os.File, logWatcher *LogWatcher, createDecoder makeDecoderFunc, since, until time.Time) {
decodeLogLine := createDecoder(f)
name := f.Name()
@ -445,11 +438,11 @@ func followLogs(f *os.File, logWatcher *LogWatcher, notifyRotate chan interface{
return nil
case fsnotify.Rename, fsnotify.Remove:
select {
case <-notifyRotate:
case <-logWatcher.WatchProducerGone():
return errDone
case <-logWatcher.WatchConsumerGone():
return errDone
default:
}
if err := handleRotate(); err != nil {
return err
@ -510,7 +503,6 @@ func followLogs(f *os.File, logWatcher *LogWatcher, notifyRotate chan interface{
// ready to try again
continue
}
retries = 0 // reset retries since we've succeeded
if !since.IsZero() && msg.Timestamp.Before(since) {
continue

View File

@ -27,12 +27,12 @@ import (
)
func TestReadFile(t *testing.T) {
reader, err := NewLogFile("/Users/qingguo/gopath/src/github.com/goodrain/rainbond/test/dockerlog/tes.log", 3, false, decodeFunc, 0640, getTailReader)
reader, err := NewLogFile("../../../test/dockerlog/tes.log", 3, false, decodeFunc, 0640, getTailReader)
if err != nil {
t.Fatal(err)
}
watch := NewLogWatcher()
reader.ReadLogs(ReadConfig{Follow: true, Tail: 10, Since: time.Now()}, watch)
reader.ReadLogs(ReadConfig{Follow: true, Tail: 0, Since: time.Now()}, watch)
defer watch.ConsumerGone()
LogLoop:
for {

View File

@ -128,7 +128,7 @@ func (c *ContainerLogManage) handleLogger() {
retry := 0
for retry < maxJSONDecodeRetry {
retry++
reader, err := NewLogFile(cevent.Container.LogPath, 3, false, decodeFunc, 0640, getTailReader)
reader, err := NewLogFile(cevent.Container.LogPath, 2, false, decodeFunc, 0640, getTailReader)
if err != nil {
logrus.Errorf("create logger failure %s", err.Error())
time.Sleep(time.Second * 1)

View File

@ -7,15 +7,16 @@ import (
"testing"
"time"
"github.com/goodrain/rainbond/node/nodem/logger"
"fmt"
"github.com/docker/docker/daemon/logger"
"github.com/pborman/uuid"
)
func TestStreamLogBeak(t *testing.T) {
log, err := New(logger.Context{
log, err := New(logger.Info{
ContainerID: uuid.New(),
ContainerEnv: []string{"TENANT_ID=" + uuid.New(), "SERVICE_ID=" + uuid.New()},
Config: map[string]string{"stream-server": "127.0.0.1:6362"},
@ -51,7 +52,7 @@ func TestStreamLogBeak(t *testing.T) {
func TestStreamLog(t *testing.T) {
log, err := New(logger.Context{
log, err := New(logger.Info{
ContainerID: uuid.New(),
ContainerEnv: []string{"TENANT_ID=" + uuid.New(), "SERVICE_ID=" + uuid.New()},
Config: map[string]string{"stream-server": "127.0.0.1:6362"},
@ -76,7 +77,7 @@ func TestStreamLog(t *testing.T) {
}
func BenchmarkStreamLog(t *testing.B) {
log, err := New(logger.Context{
log, err := New(logger.Info{
ContainerID: uuid.New(),
ContainerEnv: []string{"TENANT_ID=" + uuid.New(), "SERVICE_ID=" + uuid.New()},
Config: map[string]string{"stream-server": "127.0.0.1:5031"},

View File

@ -396,7 +396,6 @@ func (b *Exporter) GCollector() {
oldGauges := len(b.Gauges.Elements)
oldHistograms := len(b.Histograms.Elements)
oldSummaries := len(b.Summaries.Elements)
for k, v := range b.Counters.Elements {
oldTime := v.GetTimestamp()
if (currentTime - oldTime) > HP {
@ -404,7 +403,6 @@ func (b *Exporter) GCollector() {
b.Counters.Register.Unregister(v)
}
}
for k, v := range b.Gauges.Elements {
oldTime := v.GetTimestamp()
if (currentTime - oldTime) > HP {
@ -412,7 +410,6 @@ func (b *Exporter) GCollector() {
b.Gauges.Register.Unregister(v)
}
}
for k, v := range b.Histograms.Elements {
oldTime := v.GetTimestamp()
if (currentTime - oldTime) > HP {
@ -420,7 +417,6 @@ func (b *Exporter) GCollector() {
b.Histograms.Register.Unregister(v)
}
}
for k, v := range b.Summaries.Elements {
oldTime := v.GetTimestamp()
if (currentTime - oldTime) > HP {
@ -428,12 +424,10 @@ func (b *Exporter) GCollector() {
b.Summaries.Register.Unregister(v)
}
}
logrus.Debugf("current amount for Counters: %v => %v", oldCounters, len(b.Counters.Elements))
logrus.Debugf("current amount for Gauges: %v => %v", oldGauges, len(b.Gauges.Elements))
logrus.Debugf("current amount for Histograms: %v => %v", oldHistograms, len(b.Histograms.Elements))
logrus.Debugf("current amount for Summaries: %v => %v", oldSummaries, len(b.Summaries.Elements))
logrus.Info("clean exporter data complete")
}
}