From e2596a74c2802f10c0827c0aba17de5e1a514643 Mon Sep 17 00:00:00 2001 From: barnett <576501057@qq.com> Date: Tue, 13 Aug 2019 14:13:46 +0800 Subject: [PATCH 1/3] change readme --- README.md | 55 +++++++++++------------- README_EN.md | 115 --------------------------------------------------- 2 files changed, 24 insertions(+), 146 deletions(-) delete mode 100644 README_EN.md diff --git a/README.md b/README.md index f2e46d851..c4a775ffc 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ - + [![Go Report Card](https://goreportcard.com/badge/github.com/goodrain/rainbond)](https://goreportcard.com/report/github.com/goodrain/rainbond) [![GitHub stars](https://img.shields.io/github/stars/goodrain/rainbond.svg?style=flat-square)](https://github.com/goodrain/rainbond/stargazers) @@ -6,15 +6,19 @@ [![Build Status](https://travis-ci.org/goodrain/rainbond.svg?branch=master)](https://travis-ci.org/goodrain/rainbond) [![GoDoc](https://godoc.org/github.com/goodrain/rainbond?status.svg)](https://godoc.org/github.com/goodrain/rainbond) -[项目官网](http://www.rainbond.com) • [文档](https://www.rainbond.com/docs/) • [README in English](https://github.com/goodrain/rainbond/blob/master/README_EN.md) +[项目官网](http://www.rainbond.com) • [文档](https://www.rainbond.com/docs/) **Rainbond** 企业应用云操作系统 -Rainbond(云帮)是企业应用的操作系统。 Rainbond支撑企业应用的开发、架构、交付和运维的全流程,通过“无侵入”架构,无缝衔接各类企业应用,底层资源可以对接和管理IaaS、虚拟机和物理服务器。 +Rainbond(云帮)是云原生下企业应用操作系统。 Rainbond支撑企业应用的开发、架构、交付和运维的全流程,通过“无侵入”架构,无缝衔接各类企业应用,底层资源可以对接和管理IaaS、虚拟机和物理服务器。 + ``` 企业应用包括: -各类信息系统、OA、CRM、ERP、数据库、大数据、物联网、互联网平台、微服务架构等运行在企业内部的各种系统 +各类信息系统、OA、CRM、ERP、数据库、大数据、物联网、互联网平台、微服务架构等运行在企业内部的各种系统。 ``` + + + ## 应用场景 * 企业应用开发 @@ -33,21 +37,22 @@ DevOps开发流水线、微服务架构、服务治理及各类技术工具“ | 特性 | 描述 | | -------------------------- | ------------------------------------------------------------ | -| 超越Kubernetes | 平台底层基于Kubernetes,但用户无需学习和编辑复杂的yaml文件,通过应用级图形界面操作使用,实现业务流程开箱即用。 | -| 原生Service Mesh微服务架构 | 跨语言、跨协议、代码无侵入的Service Mesh微服务架构原生支持,传统应用直接变成微服务架构。同时支持常见微服务架构Spring 
Cloud、Dubbo等,通过插件扩展架构能力及治理功能。 | +| Kubernetes | 平台底层基于Kubernetes,但用户无需学习和编辑复杂的yaml文件,通过应用级图形界面操作使用,实现业务流程开箱即用。 | +| Service Mesh微服务架构 | 内置跨语言、跨协议、代码无侵入的Service Mesh微服务架构原生支持,传统应用直接变成微服务架构。同时支持常见微服务架构Spring Cloud、Dubbo等,通过插件扩展架构能力及治理功能。 | | 一体化DevOps | 衔接需求、开发、测试、构建、上线、运维的一体化DevOps。支持对接第三方软件(Jira、Sonar、Jenkins、Gitlab等) | | 企业级应用市场 | 非镜像市场和服务目录,支持各类企业级应用,像手机应用一样即点即用,全流程管理(应用开发、应用发布、应用展示、应用离线导入/导出、应用安装/升级、应用运维) | | 自动化运维 | 应用自动化运维。节点自动安装、扩容、监控、容错。平台支持高可用、多数据中心管理、多租户管理。 | | Serverless PaaS | 以应用为核心,使用过程不需要了解服务器相关概念,简单灵活。通过对接行业应用,快速构建行业专有PaaS。 | | 应用网关 | 基于HTTP、HTTPs、TCP、UDP等协议应用访问控制策略,轻松操作应用灰度发布、A/B测试。 | -| 异构服务统一管理 | 支持集群内外不同架构服务统一管理和通信治理 | +| 异构服务统一管理 | 支持集群内外不同架构服务统一管理、监控和通信治理。 | 更多功能特性详见: [Rainbond功能特性说明](https://www.rainbond.com/docs/quick-start/edition/) ## 快速开始 -1. [安装 Rainbond 集群](https://www.rainbond.com/docs/quick-start/rainbond_install/) +1. [快速安装 Rainbond 集群](https://www.rainbond.com/docs/quick-start/rainbond_install/) 2. [创建第一个应用(服务)](https://www.rainbond.com/docs/user-manual/app-creation/) -3. [搭建 ServiceMesh 微服务架构](https://www.rainbond.com/docs/advanced-scenarios/micro/) +3. [观看教程视频,快速学习Rainbond](https://www.rainbond.com/video.html) +4. [搭建 ServiceMesh 微服务架构](https://www.rainbond.com/docs/advanced-scenarios/micro/) ## 社区 @@ -55,40 +60,28 @@ DevOps开发流水线、微服务架构、服务治理及各类技术工具“ [Rainbond 项目官网](https://www.rainbond.com) -
+
添加微信,申请加入微信群,了解Rainbond更多资讯
-## 开发路线计划 - -点击查看 Rainbond 版本开发计划 [Roadmap](https://www.rainbond.com/docs/quick-start/roadmap/) - -## 架构 - - - -## 产品图示 - - - -- 应用组装部署示意图 - - - -- 应用网关管理示意图 - ## 参与贡献 你可以参与Rainbond社区关于平台、应用、插件等领域的贡献和分享。 + [参与Rainbond项目](https://www.rainbond.com/docs/contribution/) + [Rainbond 贡献者社区](https://t.goodrain.com/c/contribution) ## 相关项目 - * [Rainbond-Console](https://github.com/goodrain/rainbond-console) Rainbond控制台业务层 - * [Rainbond-Console-UI](https://github.com/goodrain/rainbond-ui) Rainbond控制台UI组件 - * [Rainbond-Install](https://github.com/goodrain/rainbond-ansible) Rainbond安装工具 +当前仓库为Rainbond数据中心端核心服务实现代码,项目还包括以下子项目: + + * [Rainbond-Console](https://github.com/goodrain/rainbond-console) Rainbond控制台服务端 + * [Rainbond-Console-UI](https://github.com/goodrain/rainbond-ui) Rainbond控制台前端 + * [Rainbond-Ansible](https://github.com/goodrain/rainbond-ansible) Rainbond安装工具 * [Rainbond-Builder](https://github.com/goodrain/builder) Rainbond源码构建工具集 * [Rainbond-Docs](https://github.com/goodrain/rainbond-docs) Rainbond文档 + * Rainbond-Resource/UI (企业版) + * Rainbond-APP-Store (企业版) ## License diff --git a/README_EN.md b/README_EN.md deleted file mode 100644 index 3da02e323..000000000 --- a/README_EN.md +++ /dev/null @@ -1,115 +0,0 @@ - - -[![Go Report Card](https://goreportcard.com/badge/github.com/goodrain/rainbond)](https://goreportcard.com/report/github.com/goodrain/rainbond) -[![GitHub stars](https://img.shields.io/github/stars/goodrain/rainbond.svg?style=flat-square)](https://github.com/goodrain/rainbond/stargazers) -![Rainbond version](https://img.shields.io/badge/version-v5.1-brightgreen.svg) -[![Build Status](https://travis-ci.org/goodrain/rainbond.svg?branch=master)](https://travis-ci.org/goodrain/rainbond) -[![GoDoc](https://godoc.org/github.com/goodrain/rainbond?status.svg)](https://godoc.org/github.com/goodrain/rainbond) - -[项目官网](http://www.rainbond.com) • [文档](https://www.rainbond.com/docs/) • [README in 
English](https://github.com/goodrain/rainbond/blob/master/README_EN.md) - -## **Rainbond** ENTERPRISE APPLICATION CLOUD OS - -Rainbond(云帮)是企业应用的操作系统。 Rainbond支撑企业应用的开发、架构、交付和运维的全流程,通过“无侵入”架构,无缝衔接各类企业应用,底层资源可以对接和管理IaaS、虚拟机和物理服务器。 - -Rainbond is a cloud OS for enterprise applications. It provides complete set of supports for enterprise applications' development, architecture, delivery and operation, can seamlessly docking almost all kinds of enterprise applications through "non-invasive" platform architecture, interface and manage underlying computing resources such as IaaS, virtual machine and physical servers. - -``` -Enterprise Applications include: -information system, OA, CRM, ERP, database, big data, IOT, internet platform and microservice architecture etc. -``` -## Be applied to - -* Enterprise Application Developement - -The development environment, micro-service architecture, service governance and various technical tools are “out of the box”, without changing development habits, allowing companies to focus on their business and improving efficiency by 10 times. - -* Enterprise Application Delivery - -Support continuous delivery, enterprise application market delivery, SaaS, enterprise application sales, secondary development and other delivery processes, unified customer management, and balanced delivery and personalized delivery. - -* Enterprise Application Operation - -Transparently interfaces and manages a variety of computing resources, naturally achieves cloudy and hybrid clouds, enterprise application automation and operation, and doubles resource utilization. 
- -## Features - -| Features | Description | -| -------------------------- | ------------------------------------------------------------ | -| beyond Kubernetes | based on kubernetes, but users do not need to learn and edit complex yaml files, achieved "out-of-the-box" business process by application-level graphical interface | -| native Service Mesh microservice architecture | Thanks to the cross-language, cross-protocol, code-free service Mesh microservices architecture native support, traditional applications can become microservice architecture directly. Support Spring Cloud, Dubbo, etc., and can easily expand the architectural capabilities and governance functions by adding plug-ins. | -| Integrated DevOps | Integrate DevOps for demand, development, testing, construction, online, and operation and maintenance. Support for docking third party software (Jira, Sonar, Jenkins, Gitlab, etc.) | -| Enterpeise-level application market | Not a simple mirror market and service catalog, but supports all kinds of enterprise-level applications, just like install and manage mobile apps, click-to-use, full process management (application development, application publishing, application display, application offline import/export, application installation/upgrade, application operation and maintenance) | -| Automated operation and maintenance | Automated application operation and maintenance. Nodes are automatically installed, expanded, monitored, and fault tolerant. The platform supports high availability, multiple data center management, and multi-tenant management. | -| Serverless PaaS | With the application-centric design philosophy, users do not need to understand the server-related concepts, and is simple and flexible. Quickly build industry-specific PaaS through docking industry applications. | -| Application Gateway | Applying access control policies based on protocols such as HTTP, HTTPs, TCP, and UDP, it is easy to operate grayscale publishing and A/B testing. 
| - -More features: [Rainbond features description](https://www.rainbond.com/docs/quick-start/edition/) - -## Quick Start - -1. [install Rainbond cluster](https://www.rainbond.com/docs/quick-start/rainbond_install/) -2. [create the first application / service](https://www.rainbond.com/docs/user-manual/app-creation/service_create/) -3. [build ServiceMesh microservice architecture](https://www.rainbond.com/docs/advanced-scenarios/micro/) - -## Community - -[Rainbond forum](https://t.goodrain.com) - -[Rainbond website](https://www.rainbond.com) - -## Roadmap - -See [Rainbond Roadmap](https://www.rainbond.com/docs/quick-start/roadmap/) - -## Architecture - - - -## Snapshot - - - -- Application assembly deployment diagram - - - -- Application gateway management schematic diagram - -## Contribution - -You can participate in the Rainbond community's contributions and sharing on platforms, applications, plugins, and more. -[Participate in Rainbond Project](https://www.rainbond.com/docs/contribution/) -[Rainbond Contribution](https://t.goodrain.com/c/contribution) - -## Related Projects - * [Rainbond-Console](https://github.com/goodrain/rainbond-console) - * [Rainbond-Console-UI](https://github.com/goodrain/rainbond-ui) - * [Rainbond-Install](https://github.com/goodrain/rainbond-ansible) - * [Rainbond-Builder](https://github.com/goodrain/builder) - * [Rainbond-Docs](https://github.com/goodrain/rainbond-docs) - -## License - -Rainbond is released under LGPL-3.0 license, see [LICENSE](https://github.com/goodrain/rainbond/blob/master/LICENSE) and [Licensing](https://github.com/goodrain/rainbond/blob/master/Licensing.md)。 - -## Special THANKS - -Thanks to the following open source projects - -- [Kubernetes](https://github.com/kubernetes/kubernetes) -- [Docker/Moby](https://github.com/moby/moby) -- [Heroku Buildpacks](https://github.com/heroku?utf8=%E2%9C%93&q=buildpack&type=&language=) -- [OpenResty](https://github.com/openresty/) -- [Calico](https://github.com/projectcalico) 
-- [Midonet](https://github.com/midonet/midonet) -- [Etcd](https://github.com/coreos/etcd) -- [Prometheus](https://github.com/prometheus/prometheus) -- [GlusterFS](https://github.com/gluster/glusterfs) -- [Ceph](https://github.com/ceph/ceph) -- [CockroachDB](https://github.com/cockroachdb/cockroach) -- [MySQL](https://github.com/mysql/mysql-server) -- [Weave Scope](https://github.com/weaveworks/scope) -- [Ant Design](https://github.com/ant-design/ant-design) - - From ee6fbf4a536cdf742920400fdae5697208c52bf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=87=A1=E7=BE=8A=E7=BE=8A?= Date: Fri, 16 Aug 2019 19:44:18 +0800 Subject: [PATCH 2/3] [fix] resolve node install no log --- event/manager.go | 4 +++ node/core/service/node_service.go | 9 ++---- node/core/service/node_service_test.go | 38 ++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 6 deletions(-) create mode 100644 node/core/service/node_service_test.go diff --git a/event/manager.go b/event/manager.go index 916d1e088..401861c31 100644 --- a/event/manager.go +++ b/event/manager.go @@ -49,6 +49,8 @@ type Manager interface { Close() error ReleaseLogger(Logger) } + +// EventConfig event config struct type EventConfig struct { EventLogServers []string DiscoverAddress []string @@ -263,6 +265,7 @@ func (m *manager) getLBChan() chan []byte { m.qos = atomic.AddInt32(&(m.qos), 1) server := m.eventServer[index] if _, ok := m.abnormalServer[server]; ok { + logrus.Warnf("server[%s] is abnormal, skip it", server) continue } if h, ok := m.handles[server]; ok { @@ -431,6 +434,7 @@ func (l *loggerWriter) Write(b []byte) (n int, err error) { if l.fmt != "" { message = fmt.Sprintf(l.fmt, message) } + logrus.Debugf("step: %s, level: %s;write message : %v", l.step, l.level, message) l.l.send(message, map[string]string{"step": l.step, "level": l.level}) } return len(b), nil diff --git a/node/core/service/node_service.go b/node/core/service/node_service.go index 9d0bffaf3..ee65ff0c5 100644 --- 
a/node/core/service/node_service.go +++ b/node/core/service/node_service.go @@ -52,6 +52,7 @@ type NodeService struct { func CreateNodeService(c *option.Conf, nodecluster *node.Cluster, kubecli kubecache.KubeClient) *NodeService { if err := event.NewManager(event.EventConfig{ DiscoverAddress: c.Etcd.Endpoints, + EventLogServers: c.EventLogServer, }); err != nil { logrus.Errorf("create event manager faliure") } @@ -159,11 +160,6 @@ func (n *NodeService) AsynchronousInstall(node *client.HostNode, eventID string) logrus.Error("write hosts file error ", err.Error()) return } - if err := event.NewManager(event.EventConfig{ - DiscoverAddress: n.c.Etcd.Endpoints, - }); err != nil { - logrus.Errorf("create event manager faliure") - } // start add node script logrus.Infof("Begin install node %s", node.ID) // write log to event log @@ -179,9 +175,10 @@ func (n *NodeService) AsynchronousInstall(node *client.HostNode, eventID string) Stdout: logger.GetWriter("node-install", "info"), Stderr: logger.GetWriter("node-install", "err"), } + err = ansibleUtil.RunNodeInstallCmd(option) if err != nil { - logrus.Error("Error executing shell script", err) + logrus.Error("Error executing shell script : ", err) node.Status = client.InstallFailed node.NodeStatus.Status = client.InstallFailed n.nodecluster.UpdateNode(node) diff --git a/node/core/service/node_service_test.go b/node/core/service/node_service_test.go new file mode 100644 index 000000000..b7c4bc411 --- /dev/null +++ b/node/core/service/node_service_test.go @@ -0,0 +1,38 @@ +package service + +import ( + "testing" + + "github.com/goodrain/rainbond/cmd/node/option" + "github.com/goodrain/rainbond/event" + + etcdClient "github.com/coreos/etcd/clientv3" + "github.com/goodrain/rainbond/node/core/store" + "github.com/goodrain/rainbond/node/masterserver" + "github.com/goodrain/rainbond/node/masterserver/node" + "github.com/goodrain/rainbond/node/nodem/client" +) + +func TestAsynchronousInstall(t *testing.T) { + config := &option.Conf{ 
+ Etcd: etcdClient.Config{Endpoints: []string{"192.168.195.1:2379"}}, + EventLogServer: []string{"192.168.195.1:6366"}, + } + + store.NewClient(config) + // node *client.HostNode, eventID string + eventID := "fanyangyang" + // nodemanager := nodem.NewNodeManager(&option.Conf{}) + currentNode := client.HostNode{ + ID: "123", + Role: []string{"manage"}, + InternalIP: "127.0.0.1", + RootPass: "password", + } + cluster := node.CreateCluster(nil, &currentNode, nil) + cluster.CacheNode(&currentNode) + ms := &masterserver.MasterServer{Cluster: cluster} + nodeService := CreateNodeService(config, ms.Cluster, nil) + nodeService.AsynchronousInstall(&currentNode, eventID) + event.GetManager().Close() +} From 1f6033c7e679c7d180f56d26e4684e1e0ba22ced Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=BE=E5=BA=86=E5=9B=BD?= Date: Thu, 22 Aug 2019 15:05:56 +0800 Subject: [PATCH 3/3] Optimize the application log collection link and log push link #371 --- api/api_routers/websocket/websocket.go | 3 +- api/controller/websocket.go | 37 ++- api/proxy/lb.go | 12 + api/proxy/websocket_proxy.go | 88 +---- cmd/eventlog/eventlog.go | 3 +- cmd/eventlog/server/server.go | 12 +- event/manager.go | 5 - eventlog/Makefile | 49 --- eventlog/README.md | 37 --- eventlog/exit/web/manager.go | 46 ++- eventlog/exit/web/pusher.go | 309 ++++++++++++++++++ eventlog/exit/webhook/manager.go | 187 ----------- eventlog/exit/webhook/manager_test.go | 34 -- eventlog/store/barrel.go | 30 +- eventlog/store/docker_log_store.go | 7 +- eventlog/store/handle_message_store.go | 18 +- eventlog/store/manager.go | 130 +------- eventlog/store/monitor_message_store.go | 149 --------- eventlog/store/new_monitor_message_store.go | 16 +- eventlog/store/store.go | 18 - node/nodem/logger/copier.go | 9 +- node/nodem/logger/logger_file.go | 16 +- node/nodem/logger/logger_file_test.go | 4 +- node/nodem/logger/manage.go | 2 +- node/nodem/logger/streamlog/streamlog_test.go | 9 +- node/statsd/exporter/exporter.go | 6 - 26 files changed, 427 insertions(+), 
809 deletions(-) delete mode 100644 eventlog/Makefile delete mode 100644 eventlog/README.md create mode 100644 eventlog/exit/web/pusher.go delete mode 100644 eventlog/exit/webhook/manager.go delete mode 100644 eventlog/exit/webhook/manager_test.go delete mode 100644 eventlog/store/monitor_message_store.go diff --git a/api/api_routers/websocket/websocket.go b/api/api_routers/websocket/websocket.go index eec37ad2e..638bcefb8 100644 --- a/api/api_routers/websocket/websocket.go +++ b/api/api_routers/websocket/websocket.go @@ -32,6 +32,7 @@ func Routes() chi.Router { r.Get("/monitor_message", controller.GetMonitorMessage().Get) r.Get("/new_monitor_message", controller.GetMonitorMessage().Get) r.Get("/event_log", controller.GetEventLog().Get) + r.Get("/services/{serviceID}/pubsub", controller.GetPubSubControll().Get) return r } @@ -43,7 +44,7 @@ func LogRoutes() chi.Router { return r } -//LogRoutes 应用导出包下载路由 +//AppRoutes 应用导出包下载路由 func AppRoutes() chi.Router { r := chi.NewRouter() r.Get("/download/{format}/{fileName}", controller.GetManager().Download) diff --git a/api/controller/websocket.go b/api/controller/websocket.go index 6cbd45513..b3bccf4e5 100644 --- a/api/controller/websocket.go +++ b/api/controller/websocket.go @@ -19,15 +19,15 @@ package controller import ( + "context" "net/http" "os" - "github.com/goodrain/rainbond/api/discover" - "github.com/goodrain/rainbond/api/proxy" - "github.com/Sirupsen/logrus" - "github.com/go-chi/chi" + "github.com/goodrain/rainbond/api/discover" + "github.com/goodrain/rainbond/api/handler" + "github.com/goodrain/rainbond/api/proxy" ) //DockerConsole docker console @@ -175,3 +175,32 @@ func (d LogFile) GetInstallLog(w http.ResponseWriter, r *http.Request) { w.WriteHeader(404) } } + +var pubSubControll *PubSubControll + +//PubSubControll service pub sub +type PubSubControll struct { + socketproxy proxy.Proxy +} + +//GetPubSubControll get service pub sub controller +func GetPubSubControll() *PubSubControll { + if pubSubControll == nil 
{ + pubSubControll = &PubSubControll{ + socketproxy: proxy.CreateProxy("dockerlog", "websocket", defaultEventLogEndpoints), + } + discover.GetEndpointDiscover(defaultEtcdEndpoints).AddProject("event_log_event_http", pubSubControll.socketproxy) + } + return pubSubControll +} + +//Get get +func (d PubSubControll) Get(w http.ResponseWriter, r *http.Request) { + serviceID := chi.URLParam(r, "serviceID") + name, _ := handler.GetEventHandler().GetLogInstance(serviceID) + if name != "" { + r.URL.Query().Add("host_id", name) + r = r.WithContext(context.WithValue(r.Context(), proxy.ContextKey("host_id"), name)) + } + d.socketproxy.Proxy(w, r) +} diff --git a/api/proxy/lb.go b/api/proxy/lb.go index 003db21f3..c233b8c98 100644 --- a/api/proxy/lb.go +++ b/api/proxy/lb.go @@ -22,8 +22,13 @@ import ( "net/http" "strings" "sync/atomic" + + "github.com/Sirupsen/logrus" ) +//ContextKey context key +type ContextKey string + // RoundRobin round robin loadBalance impl type RoundRobin struct { ops *uint64 @@ -170,7 +175,14 @@ func (s *SelectBalance) Select(r *http.Request, endpoints EndpointList) Endpoint if r.URL != nil { hostID := r.URL.Query().Get("host_id") + if hostID == "" { + hostIDFromContext := r.Context().Value(ContextKey("host_id")) + if hostIDFromContext != nil { + hostID = hostIDFromContext.(string) + } + } if e, ok := id2ip[hostID]; ok { + logrus.Debugf("[lb selelct] find host %s from name %s success", e, hostID) return Endpoint(e) } } diff --git a/api/proxy/websocket_proxy.go b/api/proxy/websocket_proxy.go index 99ee27f38..f99b9fdc8 100644 --- a/api/proxy/websocket_proxy.go +++ b/api/proxy/websocket_proxy.go @@ -38,93 +38,7 @@ type WebSocketProxy struct { upgrader *websocket.Upgrader } -//Proxy 代理 -// func (h *WebSocketProxy) Proxy(w http.ResponseWriter, r *http.Request) { -// upgrader := websocket.Upgrader{ -// EnableCompression: true, -// Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) { -// w.WriteHeader(500) -// }, -// CheckOrigin: 
func(r *http.Request) bool { -// return true -// }, -// } -// conn, err := upgrader.Upgrade(w, r, nil) -// if err != nil { -// logrus.Error("Create web socket conn error.", err.Error()) -// return -// } -// defer func() { -// conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) -// conn.Close() -// }() -// endpoint := h.lb.Select(r, h.endpoints) -// path := r.RequestURI -// if strings.Contains(path, "?") { -// path = path[:strings.Index(path, "?")] -// } -// u := url.URL{Scheme: "ws", Host: endpoint.String(), Path: path} -// logrus.Infof("connecting to %s", u.String()) -// c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) -// if err != nil { -// logrus.Errorf("dial websocket endpoint %s error. %s", u.String(), err.Error()) -// w.WriteHeader(500) -// return -// } -// defer c.Close() -// done := make(chan struct{}) -// go func() { -// defer close(done) -// for { -// select { -// case <-r.Context().Done(): -// return -// default: -// } -// t, message, err := c.ReadMessage() -// if err != nil { -// logrus.Println("read proxy websocket message error: ", err) -// return -// } -// err = conn.WriteMessage(t, message) -// if err != nil { -// logrus.Println("write client websocket message error: ", err) -// return -// } -// } -// }() -// for { -// select { -// case <-r.Context().Done(): -// // To cleanly close a connection, a client should send a close -// // frame and wait for the server to close the connection. 
-// err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) -// if err != nil { -// log.Println("write close:", err) -// return -// } -// select { -// case <-done: -// case <-time.After(time.Second): -// } -// return -// case <-done: -// return -// default: -// } -// t, message, err := conn.ReadMessage() -// if err != nil { -// logrus.Errorln("read client websocket message error: ", err) -// return -// } -// err = c.WriteMessage(t, message) -// if err != nil { -// logrus.Errorln("write proxy websocket message error: ", err) -// return -// } -// } -// } - +//Proxy websocket proxy func (h *WebSocketProxy) Proxy(w http.ResponseWriter, req *http.Request) { endpoint := h.lb.Select(req, h.endpoints) logrus.Info("Proxy webSocket to: ", endpoint) diff --git a/cmd/eventlog/eventlog.go b/cmd/eventlog/eventlog.go index 3e794809e..75a1b6508 100644 --- a/cmd/eventlog/eventlog.go +++ b/cmd/eventlog/eventlog.go @@ -21,9 +21,10 @@ package main import ( "os" + _ "net/http/pprof" + "github.com/goodrain/rainbond/cmd" "github.com/goodrain/rainbond/cmd/eventlog/server" - "github.com/spf13/pflag" ) diff --git a/cmd/eventlog/server/server.go b/cmd/eventlog/server/server.go index a7bfaf9dd..5402802c9 100644 --- a/cmd/eventlog/server/server.go +++ b/cmd/eventlog/server/server.go @@ -28,7 +28,6 @@ import ( "github.com/goodrain/rainbond/eventlog/conf" "github.com/goodrain/rainbond/eventlog/entry" "github.com/goodrain/rainbond/eventlog/exit/web" - "github.com/goodrain/rainbond/eventlog/exit/webhook" "github.com/goodrain/rainbond/eventlog/store" "os" @@ -82,7 +81,7 @@ func (s *LogServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.Conf.EventStore.GarbageMessageFile, "message.garbage.file", "/var/log/envent_garbage_message.log", "save garbage message file path when save type is file") fs.Int64Var(&s.Conf.EventStore.PeerEventMaxLogNumber, "message.max.number", 100000, "the max number log message for peer event") 
fs.IntVar(&s.Conf.EventStore.PeerEventMaxCacheLogNumber, "message.cache.number", 256, "Maintain log the largest number in the memory peer event") - fs.Int64Var(&s.Conf.EventStore.PeerDockerMaxCacheLogNumber, "dockermessage.cache.number", 512, "Maintain log the largest number in the memory peer docker service") + fs.Int64Var(&s.Conf.EventStore.PeerDockerMaxCacheLogNumber, "dockermessage.cache.number", 128, "Maintain log the largest number in the memory peer docker service") fs.IntVar(&s.Conf.EventStore.HandleMessageCoreNumber, "message.handle.core.number", 2, "The number of concurrent processing receive log data.") fs.IntVar(&s.Conf.EventStore.HandleSubMessageCoreNumber, "message.sub.handle.core.number", 3, "The number of concurrent processing receive log data. more than message.handle.core.number") fs.IntVar(&s.Conf.EventStore.HandleDockerLogCoreNumber, "message.dockerlog.handle.core.number", 2, "The number of concurrent processing receive log data. more than message.handle.core.number") @@ -106,8 +105,6 @@ func (s *LogServer) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.Conf.EventStore.DB.PoolSize, "db.pool.size", 3, "Data persistence db pool init size.") fs.IntVar(&s.Conf.EventStore.DB.PoolMaxSize, "db.pool.maxsize", 10, "Data persistence db pool max size.") fs.StringVar(&s.Conf.EventStore.DB.HomePath, "docker.log.homepath", "/grdata/logs/", "container log persistent home path") - fs.StringVar(&s.Conf.WebHook.ConsoleURL, "webhook.console.url", "http://console.goodrain.me", "console web api url") - fs.StringVar(&s.Conf.WebHook.ConsoleToken, "webhook.console.token", "", "console web api token") fs.StringVar(&s.Conf.Entry.NewMonitorMessageServerConf.ListenerHost, "monitor.udp.host", "0.0.0.0", "receive new monitor udp server host") fs.IntVar(&s.Conf.Entry.NewMonitorMessageServerConf.ListenerPort, "monitor.udp.port", 6166, "receive new monitor udp server port") fs.StringVar(&s.Conf.Cluster.Discover.NodeIDFile, "nodeid-file", 
"/opt/rainbond/etc/node/node_host_uuid.conf", "the unique ID for this node. Just specify, don't modify") @@ -173,9 +170,6 @@ func (s *LogServer) InitConf() { if os.Getenv("CLUSTER_BIND_IP") != "" { s.Conf.Cluster.PubSub.PubBindIP = os.Getenv("CLUSTER_BIND_IP") } - if os.Getenv("CONSOLE_TOKEN") != "" { - s.Conf.WebHook.ConsoleToken = os.Getenv("CONSOLE_TOKEN") - } } //Run 执行 @@ -183,10 +177,6 @@ func (s *LogServer) Run() error { s.Logger.Debug("Start run server.") log := s.Logger - if err := webhook.InitManager(s.Conf.WebHook, log.WithField("module", "WebHook")); err != nil { - return err - } - //init new db if err := db.CreateDBManager(s.Conf.EventStore.DB); err != nil { logrus.Infof("create db manager error, %v", err) diff --git a/event/manager.go b/event/manager.go index 916d1e088..315abbc1a 100644 --- a/event/manager.go +++ b/event/manager.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "os" - "strings" "sync" "sync/atomic" "time" @@ -424,10 +423,6 @@ func (l *loggerWriter) SetFormat(f string) { func (l *loggerWriter) Write(b []byte) (n int, err error) { if b != nil && len(b) > 0 { message := string(b) - message = strings.Replace(message, "\r", "", -1) - message = strings.Replace(message, "\n", "", -1) - message = strings.Replace(message, "\u0000", "", -1) - message = strings.Replace(message, "\"", "", -1) if l.fmt != "" { message = fmt.Sprintf(l.fmt, message) } diff --git a/eventlog/Makefile b/eventlog/Makefile deleted file mode 100644 index 168331ce8..000000000 --- a/eventlog/Makefile +++ /dev/null @@ -1,49 +0,0 @@ -GO_LDFLAGS=-ldflags " -w" -IMAGE_NAME=hub.goodrain.com/dc-deploy/acp_event-log -IMAGE_TAG=3.2 - -install: - @go build ${GO_LDFLAGS} - -dev:install - ./event_log --log.level=debug --discover.etcd.addr=http://127.0.0.1:2379 \ - --db.url="root:admin@tcp(127.0.0.1:3306)/event" \ - --dockerlog.mode=stream \ - --message.dockerlog.handle.core.number=2 \ - --message.garbage.file="/tmp/garbage.log" \ - --docker.log.homepath="/Users/qingguo/tmp" -dev-cluster: - 
@./event_log --log.level=debug --discover.etcd.addr=http://127.0.0.1:2379 \ - --db.url="root:admin@tcp(127.0.0.1:3306)/event" \ - --message.garbage.file="/tmp/garbage.log" \ - --eventlog.bind.port=2000 \ - --dockerlog.bind.port=2001 \ - --cluster.bind.port=2002 \ - --websocket.bind.port=2003\ - --monitor.subaddress=tcp://127.0.0.1:9441\ - --docker.log.homepath="/Users/qingguo" - -build-alpine: - @echo "🐳 $@" - @docker build -t goodraim.me/event-build:v1 build - @echo "building..." - @docker run --rm -v `pwd`:/go/src/event_log -w /go/src/event_log goodraim.me/event-build:v1 go build ${GO_LDFLAGS} -o event_log - @echo "build done." - - -instance-ali: - @docker run --name event-log --net host -d -v /var/log/event-log/:/var/log/ ${IMAGE_NAME}:${IMAGE_TAG} \ - --db.url="root:admin@tcp(172.30.0.9:3306)/event" \ - --discover.etcd.addr="http://127.0.0.1:4001" - -instance-test:build-alpine - @docker run --name event-log-test --net host -d ${IMAGE_NAME}:${IMAGE_TAG} \ - --db.url="writer1:CeRYK8UzWD@tcp(10.0.55.72:3306)/region" \ - --discover.etcd.addr="http://127.0.0.1:4001" \ - --eventlog.bind.port=2000 \ - --dockerlog.bind.port=2001 \ - --cluster.bind.port=2002 \ - --websocket.bind.port=2003\ - --monitor.subaddress="tcp://10.0.1.11:9442" - -instance-ali-build:build-debian instance-ali diff --git a/eventlog/README.md b/eventlog/README.md deleted file mode 100644 index f70f92253..000000000 --- a/eventlog/README.md +++ /dev/null @@ -1,37 +0,0 @@ -# v1版本说明 -1. 支持zmq-server接收消息。(已完成) -2. 支持集群自动发现和转发消息(已完成) -3. 支持webSocket转发消息,支持ssl协议(已完成) - * 建立连接后先发送缓存消息。 - * 完成后再发送实时消息 -4. 完成mysql存储。(已完成) -5. 完成消息分析和结果回调。(已完成) -6. 支持http接收消息。 - - -# v2版本说明 -1. 支持接收docker日志,并选举接收节点。 - 选举策略: - * etcd中已有对应关系,检测节点是否正常,如果正常返回该节点。 - * 对于新的serviceID根据监控数据选举闲节点。(以每分钟日志量+20倍服务量算标志,标志最小为最优) - * 应用关闭时,删除etcd中的对应数据。(region-api完成)。 - 选举请求: - * 请求由dockerd为容器创建logger时完成。 - * dockerd完成链接状态检测工作,若分配的服务端状态异常。重新请求心得节点。 -2. 
支持集群间各类消息通信。 - * 操作日志消息。(及时) - * docker日志监控数据消息。(用于选举) - * 容器状态检测停止消息。(用于应用启动和停止操作处理节点不在同一节点) -3. docker日志文件存储。 - * 文件缓存写入。(每128条写入或每1分钟写入) - * 分布式文件存储。 - * 历史日志文件压缩。 - - -# v3版本说明 - -1. 支持监控数据websocket消息转发服务。 -2. 消息输入zmq sub订阅数据源。 -3. 进行数据切割,区分主题。 -3. 消息输出以不同的主题进行区分和订阅。 - diff --git a/eventlog/exit/web/manager.go b/eventlog/exit/web/manager.go index 2a2a6bfb5..df02683a9 100644 --- a/eventlog/exit/web/manager.go +++ b/eventlog/exit/web/manager.go @@ -20,31 +20,27 @@ package web import ( "encoding/json" + "fmt" "io/ioutil" "net/http" + "strings" "time" + "github.com/go-chi/chi" + + "github.com/Sirupsen/logrus" "github.com/goodrain/rainbond/eventlog/cluster" "github.com/goodrain/rainbond/eventlog/cluster/discover" "github.com/goodrain/rainbond/eventlog/conf" "github.com/goodrain/rainbond/eventlog/exit/monitor" "github.com/goodrain/rainbond/eventlog/store" - - "golang.org/x/net/context" - - "fmt" - - "strings" - - _ "net/http/pprof" - - "github.com/Sirupsen/logrus" httputil "github.com/goodrain/rainbond/util/http" "github.com/gorilla/websocket" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/version" "github.com/twinj/uuid" + "golang.org/x/net/context" ) //SocketServer socket 服务 @@ -438,15 +434,16 @@ func (s *SocketServer) Run() error { return nil } func (s *SocketServer) listen() { - http.HandleFunc("/event_log", s.pushEventMessage) - http.HandleFunc("/docker_log", s.pushDockerLog) - http.HandleFunc("/monitor_message", s.pushMonitorMessage) - http.HandleFunc("/new_monitor_message", s.pushNewMonitorMessage) - http.HandleFunc("/monitor", func(w http.ResponseWriter, r *http.Request) { + r := chi.NewRouter() + r.Get("/event_log", s.pushEventMessage) + r.Get("/docker_log", s.pushDockerLog) + r.Get("/monitor_message", s.pushMonitorMessage) + r.Get("/new_monitor_message", s.pushNewMonitorMessage) + r.Get("/monitor", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) 
w.Write([]byte("ok")) }) - http.HandleFunc("/docker-instance", func(w http.ResponseWriter, r *http.Request) { + r.Get("/docker-instance", func(w http.ResponseWriter, r *http.Request) { ServiceID := r.FormValue("service_id") if ServiceID == "" { w.WriteHeader(412) @@ -466,21 +463,22 @@ func (s *SocketServer) listen() { url := fmt.Sprintf("tcp://%s:%d", instance.HostIP, instance.DockerLogPort) w.Write([]byte(`{"host":"` + url + `","status":"success"}`)) }) - http.HandleFunc("/event_push", s.receiveEventMessage) - http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + r.Get("/event_push", s.receiveEventMessage) + r.Get("/health", func(w http.ResponseWriter, r *http.Request) { if s.healthInfo["status"] != "health" { httputil.ReturnError(r, w, 400, "eventlog service unusual") } httputil.ReturnSuccess(r, w, s.healthInfo) }) + // new websocket pubsub + r.Get("/services/{serviceID}/pubsub", s.pubsub) //monitor setting - s.prometheus() - + s.prometheus(r) if s.conf.SSL { go func() { addr := fmt.Sprintf("%s:%d", s.conf.BindIP, s.conf.SSLBindPort) s.log.Infof("web socket ssl server listen %s", addr) - err := http.ListenAndServeTLS(addr, s.conf.CertFile, s.conf.KeyFile, nil) + err := http.ListenAndServeTLS(addr, s.conf.CertFile, s.conf.KeyFile, r) if err != nil { s.log.Error("websocket listen error.", err.Error()) s.listenErr <- err @@ -489,7 +487,7 @@ func (s *SocketServer) listen() { } addr := fmt.Sprintf("%s:%d", s.conf.BindIP, s.conf.BindPort) s.log.Infof("web socket server listen %s", addr) - err := http.ListenAndServe(addr, nil) + err := http.ListenAndServe(addr, r) if err != nil { s.log.Error("websocket listen error.", err.Error()) s.listenErr <- err @@ -554,11 +552,11 @@ func (s *SocketServer) receiveEventMessage(w http.ResponseWriter, r *http.Reques return } -func (s *SocketServer) prometheus() { +func (s *SocketServer) prometheus(r *chi.Mux) { prometheus.MustRegister(version.NewCollector("event_log")) exporter := 
monitor.NewExporter(s.storemanager, s.cluster) prometheus.MustRegister(exporter) - http.Handle(s.conf.PrometheusMetricPath, promhttp.Handler()) + r.Handle(s.conf.PrometheusMetricPath, promhttp.Handler()) } //ResponseType 返回内容 diff --git a/eventlog/exit/web/pusher.go b/eventlog/exit/web/pusher.go new file mode 100644 index 000000000..4712bfe2b --- /dev/null +++ b/eventlog/exit/web/pusher.go @@ -0,0 +1,309 @@ +// RAINBOND, Application Management Platform +// Copyright (C) 2014-2019 Goodrain Co., Ltd. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. For any non-GPL usage of Rainbond, +// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. +// must be obtained first. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . 
+ +package web + +import ( + "net/http" + "strings" + "sync" + "time" + + "github.com/goodrain/rainbond/eventlog/db" + + "github.com/goodrain/rainbond/util" + + "github.com/pquerna/ffjson/ffjson" + + "github.com/gorilla/websocket" +) + +//WebsocketMessage websocket message +type WebsocketMessage struct { + Event string `json:"event"` + Data interface{} `json:"data"` + Channel string `json:"channel,omitempty"` +} + +//Encode return json encode data +func (w *WebsocketMessage) Encode() []byte { + reb, _ := ffjson.Marshal(w) + return reb +} + +//PubContext websocket context +type PubContext struct { + ID string + upgrader websocket.Upgrader + conn *websocket.Conn + httpWriter http.ResponseWriter + httpRequest *http.Request + server *SocketServer + chans map[string]*Chan + lock sync.Mutex + close chan struct{} +} + +//Chan handle +type Chan struct { + ch chan *db.EventLogMessage + id string + chtype string + reevent string + channel string + p *PubContext +} + +//NewPubContext create context +func NewPubContext(upgrader websocket.Upgrader, + httpWriter http.ResponseWriter, + httpRequest *http.Request, + s *SocketServer, +) *PubContext { + return &PubContext{ + ID: util.NewUUID(), + upgrader: upgrader, + httpWriter: httpWriter, + httpRequest: httpRequest, + server: s, + chans: make(map[string]*Chan, 2), + close: make(chan struct{}), + } +} +func (p *PubContext) handleMessage(me []byte) { + var wm WebsocketMessage + if err := ffjson.Unmarshal(me, &wm); err != nil { + p.SendMessage(WebsocketMessage{Event: "error", Data: "Invalid message"}) + return + } + switch wm.Event { + case "pusher:subscribe": + p.handleSubscribe(wm) + } +} +func (p *PubContext) createChan(channel, chantype, id string) { + p.lock.Lock() + defer p.lock.Unlock() + if _, exist := p.chans[chantype+"-"+id]; exist { + return + } + ch := p.server.storemanager.WebSocketMessageChan(chantype, id, p.ID) + if ch != nil { + c := &Chan{ + ch: ch, + channel: chantype + "-" + id, + id: id, + chtype: chantype, + 
reevent: func() string { + if chantype == "docker" { + return "service:log" + } + if chantype == "newmonitor" { + return "monitor" + } + if chantype == "event" { + return "event:log" + } + return "" + }(), + p: p, + } + p.chans[chantype+"-"+id] = c + // send success message + p.SendMessage(WebsocketMessage{ + Event: "pusher:succeeded", + Channel: c.channel, + }) + go c.handleChan() + } +} +func (p *PubContext) removeChan(key string) { + p.lock.Lock() + defer p.lock.Unlock() + if _, exist := p.chans[key]; exist { + delete(p.chans, key) + } + if len(p.chans) == 0 { + p.Close() + } +} +func (p *PubContext) handleSubscribe(wm WebsocketMessage) { + data := wm.Data.(map[string]interface{}) + if channel, ok := data["channel"].(string); ok { + channelInfo := strings.SplitN(channel, "-", 2) + if len(channelInfo) < 2 { + p.SendMessage(WebsocketMessage{Event: "error", Data: "Invalid message"}) + return + } + if channelInfo[0] == "s" { + p.createChan(channel, "docker", channelInfo[1]) + p.createChan(channel, "newmonitor", channelInfo[1]) + } + if channelInfo[0] == "e" { + p.createChan(channel, "newmonitor", channelInfo[1]) + } + } +} + +func (c *Chan) handleChan() { + defer func() { + c.p.server.log.Infof("pubsub message chan %s closed", c.channel) + c.p.removeChan(c.chtype + "-" + c.id) + }() + for { + select { + case <-c.p.close: + c.p.server.log.Info("pub sub context closed") + return + case <-c.p.httpRequest.Context().Done(): + c.p.server.log.Info("pub sub request context cancel") + return + case message, ok := <-c.ch: + if !ok { + c.p.SendMessage(WebsocketMessage{Event: "pusher:close", Data: "{}", Channel: c.channel}) + c.p.SendWebsocketMessage(websocket.CloseMessage) + return + } + if message != nil { + if message.Step == "last" { + c.p.SendMessage(WebsocketMessage{Event: "event:success", Data: message.Message, Channel: c.channel}) + c.p.SendMessage(WebsocketMessage{Event: "pusher:close", Data: message.Message, Channel: c.channel}) + return + } + if message.Step == 
"callback" { + c.p.SendMessage(WebsocketMessage{Event: "event:failure", Data: message.Message, Channel: c.channel}) + c.p.SendMessage(WebsocketMessage{Event: "pusher:close", Data: message.Message, Channel: c.channel}) + return + } + if message.MonitorData != nil { + c.p.SendMessage(WebsocketMessage{Event: c.reevent, Data: string(message.MonitorData), Channel: c.channel}) + } else { + c.p.SendMessage(WebsocketMessage{Event: c.reevent, Data: string(message.Content), Channel: c.channel}) + } + } + } + } +} + +func (p *PubContext) readMessage(closed chan struct{}) { + defer func() { + close(closed) + }() + if p.conn == nil { + p.server.log.Errorf("websocket connection is not connect") + } + for { + messageType, me, err := p.conn.ReadMessage() + if err != nil { + p.server.log.Errorf("read websocket message failure %s", err.Error()) + break + } + if messageType == websocket.CloseMessage { + break + } + if messageType == websocket.TextMessage { + p.handleMessage(me) + continue + } + if messageType == websocket.PingMessage { + p.conn.WriteMessage(websocket.PongMessage, []byte{}) + continue + } + if messageType == websocket.BinaryMessage { + continue + } + } +} + +//SendMessage send websocket message +func (p *PubContext) SendMessage(message WebsocketMessage) error { + p.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + return p.conn.WriteMessage(websocket.TextMessage, message.Encode()) +} + +//SendWebsocketMessage send websocket message +func (p *PubContext) SendWebsocketMessage(message int) error { + p.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + return p.conn.WriteMessage(message, []byte{}) +} + +func (p *PubContext) sendPing(closed chan struct{}) { + err := util.Exec(p.httpRequest.Context(), func() error { + if err := p.SendWebsocketMessage(websocket.PingMessage); err != nil { + return err + } + return nil + }, time.Second*20) + if err != nil { + p.server.log.Errorf("send ping message failure %s will closed the connect", err.Error()) + close(closed) 
+ } +} + +//Start start context +func (p *PubContext) Start() { + var err error + p.conn, err = p.upgrader.Upgrade(p.httpWriter, p.httpRequest, nil) + if err != nil { + p.server.log.Error("Create web socket conn error.", err.Error()) + return + } + pingclosed := make(chan struct{}) + readclosed := make(chan struct{}) + go p.sendPing(pingclosed) + go p.readMessage(readclosed) + select { + case <-pingclosed: + case <-readclosed: + case <-p.close: + } +} + +//Stop close context +func (p *PubContext) Stop() { + if p.conn != nil { + p.conn.Close() + } + for _, v := range p.chans { + p.server.storemanager.RealseWebSocketMessageChan(v.chtype, v.id, p.ID) + } +} + +//Close close the context +func (p *PubContext) Close() { + close(p.close) +} + +func (s *SocketServer) pubsub(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + ReadBufferSize: s.conf.ReadBufferSize, + WriteBufferSize: s.conf.WriteBufferSize, + EnableCompression: s.conf.EnableCompression, + Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) { + + }, + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + context := NewPubContext(upgrader, w, r, s) + defer context.Stop() + s.log.Infof("websocket pubsub context running %s", context.ID) + context.Start() + s.log.Infof("websocket pubsub context closed %s", context.ID) +} diff --git a/eventlog/exit/webhook/manager.go b/eventlog/exit/webhook/manager.go deleted file mode 100644 index d65df3100..000000000 --- a/eventlog/exit/webhook/manager.go +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright (C) 2014-2018 Goodrain Co., Ltd. -// RAINBOND, Application Management Platform - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. 
For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package webhook - -import ( - "github.com/goodrain/rainbond/eventlog/conf" - "net/http" - - "sync" - - "bytes" - - "fmt" - - "strings" - - "net/url" - - "strconv" - - "github.com/Sirupsen/logrus" -) - -type Manager struct { - hooks map[string]*WebHook - conf conf.WebHookConf - log *logrus.Entry - lock sync.Mutex -} - -type WebHook struct { - EndPoint string - RequestParameter map[string]interface{} - RequestBody []byte - RequestHeader map[string]string - Name string - Method string -} - -const UpDateEventStatus = "console_update_event_status" -const UpdateEventCodeVersion = "console_update_event_code_version" - -var manager *Manager - -func GetManager() *Manager { - return manager -} - -func InitManager(conf conf.WebHookConf, log *logrus.Entry) error { - ma := &Manager{ - conf: conf, - log: log, - hooks: make(map[string]*WebHook, 0), - } - eventStatus := &WebHook{ - Name: UpDateEventStatus, - Method: "PUT", - EndPoint: conf.ConsoleURL + "/api/event/update", - RequestHeader: make(map[string]string, 0), - } - eventStatus.RequestHeader["Content-Type"] = "application/json" - if conf.ConsoleToken != "" { - eventStatus.RequestHeader["Authorization"] = "Token " + conf.ConsoleToken - } - codeVsersion := &WebHook{ - Name: UpdateEventCodeVersion, - Method: "PUT", - EndPoint: conf.ConsoleURL + "/api/event/update-code", - RequestHeader: make(map[string]string, 0), - } - codeVsersion.RequestHeader["Content-Type"] = "application/json" - if conf.ConsoleToken != "" { - 
codeVsersion.RequestHeader["Authorization"] = "Token " + conf.ConsoleToken - } - ma.Regist(eventStatus) - ma.Regist(codeVsersion) - manager = ma - return nil -} - -func (m *Manager) GetWebhook(name string) *WebHook { - m.lock.Lock() - defer m.lock.Unlock() - return m.hooks[name] -} - -func (m *Manager) RunWebhook(name string, body []byte) { - w := m.GetWebhook(name) - w.RequestBody = body - if err := w.Run(); err != nil { - m.log.Errorf("Webhook %s run error. %s", w.Name, err.Error()) - } - m.log.Debugf("Run web hook %s success", w.Name) -} - -func (m *Manager) RunWebhookWithParameter(name string, body []byte, parameter map[string]interface{}) { - w := m.GetWebhook(name) - w.RequestBody = body - w.RequestParameter = parameter - if err := w.Run(); err != nil { - m.log.Errorf("Webhook %s run error. %s", w.Name, err.Error()) - } -} - -//Regist 注册 -func (m *Manager) Regist(w *WebHook) { - if w == nil || w.Name == "" { - return - } - m.lock.Lock() - defer m.lock.Unlock() - m.hooks[w.Name] = w -} - -//Run 执行 -func (w *WebHook) Run() error { - var err error - if w.RequestParameter != nil && len(w.RequestParameter) > 0 { - if strings.ToUpper(w.Method) == "GET" { - values := make(url.Values) - for k, v := range w.RequestParameter { - switch v.(type) { - case int: - values.Set(k, strconv.Itoa(v.(int))) - case string: - values.Set(k, v.(string)) - } - } - w.EndPoint += "?" 
+ values.Encode() - } else { - var jsonStr = "{" - for k, v := range w.RequestParameter { - switch v.(type) { - case int: - jsonStr += `"` + k + `":` + strconv.Itoa(v.(int)) + `,` - case string: - jsonStr += `"` + k + `":"` + v.(string) + `",` - } - } - jsonStr = jsonStr[0:len(jsonStr)-1] + "}" - w.RequestBody = []byte(jsonStr) - } - } - var request *http.Request - if w.RequestBody != nil { - request, err = http.NewRequest(strings.ToUpper(w.Method), w.EndPoint, bytes.NewReader(w.RequestBody)) - if err != nil { - return err - } - } else { - request, err = http.NewRequest(strings.ToUpper(w.Method), w.EndPoint, nil) - if err != nil { - return err - } - } - request.Header.Add("Content-Type", "application/json") - for k, v := range w.RequestHeader { - request.Header.Add(k, v) - } - res, err := http.DefaultClient.Do(request) - if err != nil { - return err - } - if res.Body != nil { - defer res.Body.Close() - } - if res.StatusCode/100 == 2 { - return nil - } - return fmt.Errorf("Endpoint return status code is %d", res.StatusCode) -} diff --git a/eventlog/exit/webhook/manager_test.go b/eventlog/exit/webhook/manager_test.go deleted file mode 100644 index c8816149e..000000000 --- a/eventlog/exit/webhook/manager_test.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (C) 2014-2018 Goodrain Co., Ltd. -// RAINBOND, Application Management Platform - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. 
- -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package webhook - -import ( - "github.com/goodrain/rainbond/eventlog/conf" - "testing" - - "github.com/Sirupsen/logrus" -) - -func TestEventUpdate(t *testing.T) { - InitManager(conf.WebHookConf{ - ConsoleURL: "http://test.goodrain.com:7070/", - }, logrus.WithField("test", "test")) - GetManager().RunWebhookWithParameter(UpDateEventStatus, nil, - map[string]interface{}{"event_id": "4e9dafa96cb043cdb992adde2af421d4", "status": "success", "message": "操作成功"}) -} diff --git a/eventlog/store/barrel.go b/eventlog/store/barrel.go index af7c0d074..f1c301352 100644 --- a/eventlog/store/barrel.go +++ b/eventlog/store/barrel.go @@ -140,12 +140,6 @@ func (r *readEventBarrel) insertMessage(message *db.EventLogMessage) { func (r *readEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) { r.subLock.Lock() defer r.subLock.Unlock() - // for _, m := range r.barrel { - // select { - // case ch <- m: - // default: - // } - // } r.subSocketChan[subID] = ch } @@ -165,7 +159,8 @@ func (r *readEventBarrel) addSubChan(subID string) chan *db.EventLogMessage { func (r *readEventBarrel) delSubChan(subID string) { r.subLock.Lock() defer r.subLock.Unlock() - if _, ok := r.subSocketChan[subID]; ok { + if ch, ok := r.subSocketChan[subID]; ok { + close(ch) delete(r.subSocketChan, subID) } } @@ -221,13 +216,6 @@ func (r *dockerLogEventBarrel) insertMessage(message *db.EventLogMessage) { func (r *dockerLogEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) { r.subLock.Lock() defer r.subLock.Unlock() - // send cache log will cause user illusion - // for _, m := range r.barrel { - // select { - // case ch <- m: - // default: - // } - // } r.subSocketChan[subID] = ch } @@ -247,7 +235,8 @@ func (r *dockerLogEventBarrel) addSubChan(subID string) chan *db.EventLogMessage func (r *dockerLogEventBarrel) delSubChan(subID string) { r.subLock.Lock() defer 
r.subLock.Unlock() - if _, ok := r.subSocketChan[subID]; ok { + if ch, ok := r.subSocketChan[subID]; ok { + close(ch) delete(r.subSocketChan, subID) } } @@ -299,8 +288,6 @@ func (r *monitorMessageBarrel) empty() { } func (r *monitorMessageBarrel) insertMessage(message *db.EventLogMessage) { - //r.barrel = append(r.barrel, message) - //logrus.Info(string(message.Content)) r.updateTime = time.Now() r.subLock.Lock() defer r.subLock.Unlock() @@ -315,12 +302,6 @@ func (r *monitorMessageBarrel) insertMessage(message *db.EventLogMessage) { func (r *monitorMessageBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) { r.subLock.Lock() defer r.subLock.Unlock() - // for _, m := range r.barrel { - // select { - // case ch <- m: - // default: - // } - // } r.subSocketChan[subID] = ch } @@ -340,7 +321,8 @@ func (r *monitorMessageBarrel) addSubChan(subID string) chan *db.EventLogMessage func (r *monitorMessageBarrel) delSubChan(subID string) { r.subLock.Lock() defer r.subLock.Unlock() - if _, ok := r.subSocketChan[subID]; ok { + if ch, ok := r.subSocketChan[subID]; ok { + close(ch) delete(r.subSocketChan, subID) } } diff --git a/eventlog/store/docker_log_store.go b/eventlog/store/docker_log_store.go index 805ec437f..e2426eee7 100644 --- a/eventlog/store/docker_log_store.go +++ b/eventlog/store/docker_log_store.go @@ -131,7 +131,7 @@ func (h *dockerLogStore) GetMonitorData() *db.MonitorData { return data } func (h *dockerLogStore) Gc() { - tiker := time.NewTicker(time.Minute * 1) + tiker := time.NewTicker(time.Second * 30) for { select { case <-tiker.C: @@ -151,14 +151,15 @@ func (h *dockerLogStore) handle() []string { } var gcEvent []string for k, v := range h.barrels { - if v.updateTime.Add(time.Minute * 1).Before(time.Now()) { // barrel 超时未收到消息 + if v.updateTime.Add(time.Minute * 1).Before(time.Now()) { //gc without client link if v.GetSubChanLength() == 0 { h.saveBeforeGc(k, v) gcEvent = append(gcEvent, k) } } - if v.persistenceTime.Add(time.Minute * 
2).Before(time.Now()) { //超过2分钟未持久化 间隔需要大于1分钟。以分钟为单位 + //The interval not persisted for more than 1 minute should be more than 30 seconds + if v.persistenceTime.Add(time.Minute * 1).Before(time.Now()) { if len(v.barrel) > 0 { v.persistence() } diff --git a/eventlog/store/handle_message_store.go b/eventlog/store/handle_message_store.go index af950cf5a..06b8e5510 100644 --- a/eventlog/store/handle_message_store.go +++ b/eventlog/store/handle_message_store.go @@ -19,7 +19,6 @@ package store import ( - "strings" "sync" "time" @@ -114,7 +113,6 @@ func (h *handleMessageStore) Gc() { } } func (h *handleMessageStore) gcRun() { - //h.log.Debugf("runGC %d", time.Now().UnixNano()) h.lock.Lock() defer h.lock.Unlock() t := time.Now() @@ -123,7 +121,7 @@ func (h *handleMessageStore) gcRun() { } var gcEvent []string for k, v := range h.barrels { - if v.updateTime.Add(time.Second * 30).Before(time.Now()) { // barrel 超时未收到消息 + if v.updateTime.Add(time.Second * 30).Before(time.Now()) { h.saveBeforeGc(v) gcEvent = append(gcEvent, k) } @@ -132,7 +130,7 @@ func (h *handleMessageStore) gcRun() { for _, id := range gcEvent { barrel := h.barrels[id] barrel.empty() - h.pool.Put(barrel) //放回对象池 + h.pool.Put(barrel) delete(h.barrels, id) } } @@ -240,7 +238,6 @@ func (h *handleMessageStore) saveGarbageMessage() { } err := util.AppendToFile(h.conf.GarbageMessageFile, content) if err != nil { - //h.log.Error("Save garbage message to file error.context :\n " + content) h.log.Error("Save garbage message to file error.context", err.Error()) } else { h.log.Info("Save the garbage message to file.") @@ -296,17 +293,6 @@ func (h *handleMessageStore) handleBarrelEvent() { } } } - if event[0] == "code-version" { //代码版本 - if len(event) == 3 { - eventID := event[1] - codeVersion := strings.TrimSpace(event[2]) - event := model.ServiceEvent{} - event.EventID = eventID - event.CodeVersion = codeVersion - cdb.GetManager().ServiceEventDao().UpdateModel(&event) - h.log.Infof("run web hook update code version 
.event_id %s code_version %s", eventID, codeVersion) - } - } case <-h.ctx.Done(): return } diff --git a/eventlog/store/manager.go b/eventlog/store/manager.go index fba416d1c..1e9249a68 100644 --- a/eventlog/store/manager.go +++ b/eventlog/store/manager.go @@ -23,7 +23,6 @@ import ( "strconv" "github.com/goodrain/rainbond/eventlog/db" - "github.com/goodrain/rainbond/eventlog/util" coreutil "github.com/goodrain/rainbond/util" "github.com/goodrain/rainbond/eventlog/conf" @@ -45,7 +44,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/pquerna/ffjson/ffjson" "github.com/prometheus/client_golang/prometheus" - "github.com/tidwall/gjson" ) //Manager 存储管理器 @@ -68,11 +66,6 @@ type Manager interface { //NewManager 存储管理器 func NewManager(conf conf.EventStoreConf, log *logrus.Entry) (Manager, error) { - // event log do not save in db,will save in file - // dbPlugin, err := db.NewManager(conf.DB, log) - // if err != nil { - // return nil, err - // } conf.DB.Type = "eventfile" dbPlugin, err := db.NewManager(conf.DB, log) if err != nil { @@ -103,12 +96,10 @@ func NewManager(conf conf.EventStoreConf, log *logrus.Entry) (Manager, error) { handle := NewStore("handle", storeManager) read := NewStore("read", storeManager) docker := NewStore("docker_log", storeManager) - monitor := NewStore("monitor", storeManager) newmonitor := NewStore("newmonitor", storeManager) storeManager.handleMessageStore = handle storeManager.readMessageStore = read storeManager.dockerLogStore = docker - storeManager.monitorMessageStore = monitor storeManager.newmonitorMessageStore = newmonitor return storeManager, nil } @@ -119,7 +110,6 @@ type storeManager struct { handleMessageStore MessageStore readMessageStore MessageStore dockerLogStore MessageStore - monitorMessageStore MessageStore newmonitorMessageStore MessageStore receiveChan chan []byte pubChan, subChan chan [][]byte @@ -157,7 +147,7 @@ func (s *storeManager) Scrape(ch chan<- prometheus.Metric, namespace, exporter, s.dockerLogStore.Scrape(ch, 
namespace, exporter, from) s.handleMessageStore.Scrape(ch, namespace, exporter, from) - s.monitorMessageStore.Scrape(ch, namespace, exporter, from) + s.newmonitorMessageStore.Scrape(ch, namespace, exporter, from) chanDesc := prometheus.NewDesc( prometheus.BuildFQName(namespace, exporter, "chan_cache_size"), "the handle chan cache size.", @@ -183,7 +173,7 @@ func (s *storeManager) Scrape(ch chan<- prometheus.Metric, namespace, exporter, func (s *storeManager) Monitor() []db.MonitorData { data := s.dockerLogStore.GetMonitorData() - moData := s.monitorMessageStore.GetMonitorData() + moData := s.newmonitorMessageStore.GetMonitorData() if moData != nil { data.LogSizePeerM += moData.LogSizePeerM data.ServiceSize += moData.ServiceSize @@ -248,10 +238,6 @@ func (s *storeManager) WebSocketMessageChan(mode, eventID, subID string) chan *d ch := s.dockerLogStore.SubChan(eventID, subID) return ch } - if mode == "monitor" { - ch := s.monitorMessageStore.SubChan(eventID, subID) - return ch - } if mode == "newmonitor" { ch := s.newmonitorMessageStore.SubChan(eventID, subID) return ch @@ -264,7 +250,6 @@ func (s *storeManager) Run() error { s.handleMessageStore.Run() s.readMessageStore.Run() s.dockerLogStore.Run() - s.monitorMessageStore.Run() s.newmonitorMessageStore.Run() for i := 0; i < s.conf.HandleMessageCoreNumber; i++ { go s.handleReceiveMessage() @@ -275,9 +260,6 @@ func (s *storeManager) Run() error { for i := 0; i < s.conf.HandleDockerLogCoreNumber; i++ { go s.handleDockerLog() } - for i := 0; i < s.conf.HandleMessageCoreNumber; i++ { - go s.handleMonitorMessage() - } go s.handleNewMonitorMessage() go s.cleanLog() return nil @@ -454,10 +436,6 @@ func (s *storeManager) handleSubMessage() { if string(msg[0]) == string(db.EventMessage) { s.readMessageStore.InsertMessage(message) } - if string(msg[0]) == string(db.ServiceMonitorMessage) { - s.monitorMessageStore.InsertMessage(message) - } - } } } @@ -499,14 +477,11 @@ loop: Content: buffer.Bytes(), EventID: serviceID, } - 
//s.log.Debug("Receive docker log:", info) s.dockerLogStore.InsertMessage(&message) buffer.Reset() } } - s.errChan <- fmt.Errorf("handle docker log core exist") - } type event struct { @@ -515,101 +490,6 @@ type event struct { Update string `json:"update_time"` } -func (s *storeManager) handleMonitorMessage() { -loop: - for { - select { - case <-s.context.Done(): - return - case msg, ok := <-s.monitorMessageChan: - if !ok { - s.log.Error("handle monitor message core stop.monitor message log chan closed") - break loop - } - if msg == nil { - continue - } - if len(msg) == 2 { - message := strings.SplitAfterN(string(msg[1]), " ", 5) - name := message[3] - body := message[4] - var currentTopic string - var data []interface{} - switch strings.TrimSpace(name) { - case "SumTimeByUrl": - result := gjson.Parse(body).Array() - for _, r := range result { - var port int - if p, ok := r.Map()["port"]; ok { - port = int(p.Int()) - } - wsTopic := fmt.Sprintf("%s.%s.statistic", r.Map()["tenant"], r.Map()["service"]) - if port != 0 { - wsTopic = fmt.Sprintf("%s.%s.%d.statistic", r.Map()["tenant"], r.Map()["service"], port) - } - if currentTopic == "" { - currentTopic = wsTopic - } - if wsTopic == currentTopic { - data = append(data, util.Format(r.Map())) - } else { - s.sendMonitorData("SumTimeByUrl", data, currentTopic) - currentTopic = wsTopic - data = []interface{}{util.Format(r.Map())} - } - } - s.sendMonitorData("SumTimeByUrl", data, currentTopic) - - case "SumTimeBySql": - result := gjson.Parse(body).Array() - for _, r := range result { - tenantID := r.Map()["tenant_id"].String() - serviceID := r.Map()["service_id"].String() - if len(tenantID) < 12 || len(serviceID) < 12 { - continue - } - tenantAlias := tenantID[len(tenantID)-12:] - serviceAlias := serviceID[len(serviceID)-12:] - wsTopic := fmt.Sprintf("%s.%s.statistic", tenantAlias, serviceAlias) - if currentTopic == "" { - currentTopic = wsTopic - } - if wsTopic == currentTopic { - data = append(data, util.Format(r.Map())) 
- } else { - s.sendMonitorData("SumTimeBySql", data, currentTopic) - currentTopic = wsTopic - data = []interface{}{util.Format(r.Map())} - } - } - s.sendMonitorData("SumTimeBySql", data, currentTopic) - } - } - - } - } - s.errChan <- fmt.Errorf("handle monitor log core exist") -} -func (s *storeManager) sendMonitorData(name string, data []interface{}, topic string) { - e := event{ - Name: name, - Update: time.Now().Format(time.Kitchen), - Data: data, - } - eventByte, _ := json.Marshal(e) - m := &db.EventLogMessage{ - EventID: topic, - MonitorData: eventByte, - } - s.monitorMessageStore.InsertMessage(m) - d, err := json.Marshal(m) - if err != nil { - s.log.Error("Marshal monitor message to byte error.", err.Error()) - return - } - s.pubChan <- [][]byte{[]byte(db.ServiceMonitorMessage), d} -} - func (s *storeManager) RealseWebSocketMessageChan(mode string, eventID, subID string) { if mode == "event" { s.readMessageStore.RealseSubChan(eventID, subID) @@ -617,8 +497,8 @@ func (s *storeManager) RealseWebSocketMessageChan(mode string, eventID, subID st if mode == "docker" { s.dockerLogStore.RealseSubChan(eventID, subID) } - if mode == "monitor" { - s.monitorMessageStore.RealseSubChan(eventID, subID) + if mode == "newmonitor" { + s.newmonitorMessageStore.RealseSubChan(eventID, subID) } } @@ -626,7 +506,7 @@ func (s *storeManager) Stop() { s.handleMessageStore.stop() s.readMessageStore.stop() s.dockerLogStore.stop() - s.monitorMessageStore.stop() + s.newmonitorMessageStore.stop() s.cancel() if s.filePlugin != nil { s.filePlugin.Close() diff --git a/eventlog/store/monitor_message_store.go b/eventlog/store/monitor_message_store.go deleted file mode 100644 index 429067f85..000000000 --- a/eventlog/store/monitor_message_store.go +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright (C) 2014-2018 Goodrain Co., Ltd. 
-// RAINBOND, Application Management Platform - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package store - -import ( - "sync" - "time" - - "github.com/goodrain/rainbond/eventlog/conf" - "github.com/goodrain/rainbond/eventlog/db" - - "golang.org/x/net/context" - - "github.com/Sirupsen/logrus" - "github.com/prometheus/client_golang/prometheus" -) - -type monitorMessageStore struct { - conf conf.EventStoreConf - log *logrus.Entry - barrels map[string]*monitorMessageBarrel - lock sync.RWMutex - cancel func() - ctx context.Context - pool *sync.Pool - size int64 - allLogCount float64 -} - -func (h *monitorMessageStore) Scrape(ch chan<- prometheus.Metric, namespace, exporter, from string) error { - chanDesc := prometheus.NewDesc( - prometheus.BuildFQName(namespace, exporter, "monitor_store_barrel_count"), - "the handle container log count size.", - []string{"from"}, nil, - ) - ch <- prometheus.MustNewConstMetric(chanDesc, prometheus.GaugeValue, float64(len(h.barrels)), from) - logDesc := prometheus.NewDesc( - prometheus.BuildFQName(namespace, exporter, "monitor_store_log_count"), - "the handle monitor log count size.", - []string{"from"}, nil, - ) - ch <- prometheus.MustNewConstMetric(logDesc, prometheus.GaugeValue, h.allLogCount, from) - - return nil -} -func (h 
*monitorMessageStore) insertMessage(message *db.EventLogMessage) bool { - h.lock.RLock() - defer h.lock.RUnlock() - if ba, ok := h.barrels[message.EventID]; ok { - ba.insertMessage(message) - return true - } - return false -} - -func (h *monitorMessageStore) InsertMessage(message *db.EventLogMessage) { - if message == nil || message.EventID == "" { - return - } - //h.log.Debug("Receive a monitor message:" + string(message.Content)) - h.size++ - h.allLogCount++ - if h.insertMessage(message) { - return - } - h.lock.Lock() - defer h.lock.Unlock() - ba := h.pool.Get().(*monitorMessageBarrel) - ba.insertMessage(message) - h.barrels[message.EventID] = ba -} -func (h *monitorMessageStore) GetMonitorData() *db.MonitorData { - data := &db.MonitorData{ - ServiceSize: len(h.barrels), - LogSizePeerM: h.size, - } - return data -} - -func (h *monitorMessageStore) SubChan(eventID, subID string) chan *db.EventLogMessage { - h.lock.Lock() - defer h.lock.Unlock() - if ba, ok := h.barrels[eventID]; ok { - return ba.addSubChan(subID) - } - ba := h.pool.Get().(*monitorMessageBarrel) - h.barrels[eventID] = ba - return ba.addSubChan(subID) -} -func (h *monitorMessageStore) RealseSubChan(eventID, subID string) { - h.lock.RLock() - defer h.lock.RUnlock() - if ba, ok := h.barrels[eventID]; ok { - ba.delSubChan(subID) - } -} -func (h *monitorMessageStore) Run() { - go h.Gc() -} -func (h *monitorMessageStore) Gc() { - tiker := time.NewTicker(time.Second * 30) - for { - select { - case <-tiker.C: - case <-h.ctx.Done(): - h.log.Debug("read message store gc stop.") - tiker.Stop() - return - } - h.size = 0 - if len(h.barrels) == 0 { - continue - } - var gcEvent []string - for k, v := range h.barrels { - if v.updateTime.Add(time.Minute * 3).Before(time.Now()) { // barrel 超时未收到消息 - gcEvent = append(gcEvent, k) - } - } - if gcEvent != nil && len(gcEvent) > 0 { - for _, id := range gcEvent { - barrel := h.barrels[id] - barrel.empty() - h.pool.Put(barrel) //放回对象池 - delete(h.barrels, id) - } - } - } -} 
-func (h *monitorMessageStore) stop() { - h.cancel() -} -func (h *monitorMessageStore) InsertGarbageMessage(message ...*db.EventLogMessage) {} diff --git a/eventlog/store/new_monitor_message_store.go b/eventlog/store/new_monitor_message_store.go index 435d4c9c7..1cd5626cd 100644 --- a/eventlog/store/new_monitor_message_store.go +++ b/eventlog/store/new_monitor_message_store.go @@ -81,7 +81,6 @@ func (h *newMonitorMessageStore) InsertMessage(message *db.EventLogMessage) { if message == nil { return } - //h.log.Debug("Receive a monitor message:" + string(message.Content)) h.size++ h.allLogCount++ mm, ok := h.insertMessage(message) @@ -138,12 +137,15 @@ func (h *newMonitorMessageStore) Gc() { } var gcEvent []string for k, v := range h.barrels { - if v.UpdateTime.Add(time.Minute * 3).Before(time.Now()) { // barrel 超时未收到消息 - gcEvent = append(gcEvent, k) + if len(v.subSocketChan) == 0 { + if v.UpdateTime.Add(time.Minute * 3).Before(time.Now()) { // barrel 超时未收到消息 + gcEvent = append(gcEvent, k) + } } } if gcEvent != nil && len(gcEvent) > 0 { for _, id := range gcEvent { + h.log.Infof("monitor message barrel %s will be gc", id) barrel := h.barrels[id] barrel.empty() delete(h.barrels, id) @@ -285,11 +287,12 @@ func (c *CacheMonitorMessageList) addSubChan(subID string) chan *db.EventLogMess return ch } -//删除socket订阅 +//delSubChan delete socket sub chan func (c *CacheMonitorMessageList) delSubChan(subID string) { c.subLock.Lock() defer c.subLock.Unlock() - if _, ok := c.subSocketChan[subID]; ok { + if ch, ok := c.subSocketChan[subID]; ok { + close(ch) delete(c.subSocketChan, subID) } } @@ -319,11 +322,8 @@ func merge(source, addsource MonitorMessageList) (result MonitorMessageList) { if oldmm, ok := cache[mm.Key]; ok { oldmm.Count += mm.Count oldmm.AbnormalCount += mm.AbnormalCount - //平均时间 oldmm.AverageTime = Round((oldmm.AverageTime+mm.AverageTime)/2, 2) - //�����积���间 oldmm.CumulativeTime = Round(oldmm.CumulativeTime+mm.CumulativeTime, 2) - //最大时间 if mm.MaxTime > 
oldmm.MaxTime { oldmm.MaxTime = mm.MaxTime } diff --git a/eventlog/store/store.go b/eventlog/store/store.go index c63f40170..4ad6cd7fb 100644 --- a/eventlog/store/store.go +++ b/eventlog/store/store.go @@ -97,24 +97,6 @@ func NewStore(storeType string, manager *storeManager) MessageStore { } return read } - if storeType == "monitor" { - monitor := &monitorMessageStore{ - barrels: make(map[string]*monitorMessageBarrel, 100), - conf: manager.conf, - log: manager.log.WithField("module", "MonitorMessageStore"), - ctx: ctx, - cancel: cancel, - } - monitor.pool = &sync.Pool{ - New: func() interface{} { - reb := &monitorMessageBarrel{ - subSocketChan: make(map[string]chan *db.EventLogMessage, 0), - } - return reb - }, - } - return monitor - } if storeType == "docker_log" { docker := &dockerLogStore{ barrels: make(map[string]*dockerLogEventBarrel, 100), diff --git a/node/nodem/logger/copier.go b/node/nodem/logger/copier.go index ce32832ad..27e18b66f 100644 --- a/node/nodem/logger/copier.go +++ b/node/nodem/logger/copier.go @@ -52,7 +52,7 @@ func NewCopier(logfile *LogFile, dst []Logger, since time.Time) *Copier { // Run starts logs copying func (c *Copier) Run() { c.closed = make(chan struct{}) - go c.logfile.ReadLogs(ReadConfig{Follow: true, Since: c.since}, c.reader) + go c.logfile.ReadLogs(ReadConfig{Follow: true, Since: c.since, Tail: 0}, c.reader) go c.copySrc() } @@ -63,6 +63,13 @@ lool: select { case <-c.closed: return + case err := <-c.reader.Err: + logrus.Errorf("read container log file error %s, will retry after 5 seconds", err.Error()) + //If there is an error in the collection log process, + //the collection should be restarted and not stopped + time.Sleep(time.Second * 5) + go c.logfile.ReadLogs(ReadConfig{Follow: true, Since: c.since, Tail: 0}, c.reader) + continue case msg, ok := <-c.reader.Msg: if !ok { break lool diff --git a/node/nodem/logger/logger_file.go b/node/nodem/logger/logger_file.go index 84d7a4f3b..a1ff27157 100644 --- 
a/node/nodem/logger/logger_file.go +++ b/node/nodem/logger/logger_file.go @@ -139,14 +139,11 @@ func decodeFunc(rdr io.Reader) func() (*Message, error) { if err == nil || err == io.EOF { break } - - logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json") // try again, could be due to a an incomplete json object as we read if _, ok := err.(*json.SyntaxError); ok { dec = json.NewDecoder(rdr) continue } - // io.ErrUnexpectedEOF is returned from json.Decoder when there is // remaining data in the parser's buffer while an io.EOF occurs. // If the json logger writes a partial json log entry to the disk @@ -156,6 +153,7 @@ func decodeFunc(rdr io.Reader) func() (*Message, error) { dec = json.NewDecoder(reader) continue } + logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json") } return msg, err } @@ -205,7 +203,6 @@ func (w *LogFile) ReadLogs(config ReadConfig, watcher *LogWatcher) { watcher.Err <- err return } - closeFiles := func() { for _, f := range files { f.Close() @@ -218,7 +215,6 @@ func (w *LogFile) ReadLogs(config ReadConfig, watcher *LogWatcher) { } } } - readers := make([]SizeReaderAt, 0, len(files)+1) for _, f := range files { stat, err := f.Stat() @@ -244,10 +240,7 @@ func (w *LogFile) ReadLogs(config ReadConfig, watcher *LogWatcher) { return } w.mu.RUnlock() - - notifyRotate := w.notifyRotate.Subscribe() - defer w.notifyRotate.Evict(notifyRotate) - followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until) + followLogs(currentFile, watcher, w.createDecoder, config.Since, config.Until) } func tailFiles(files []SizeReaderAt, watcher *LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config ReadConfig) { @@ -398,7 +391,7 @@ func decompressfile(fileName, destFileName string, since time.Time) (*os.File, e return rs, nil } -func followLogs(f *os.File, logWatcher *LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, 
since, until time.Time) { +func followLogs(f *os.File, logWatcher *LogWatcher, createDecoder makeDecoderFunc, since, until time.Time) { decodeLogLine := createDecoder(f) name := f.Name() @@ -445,11 +438,11 @@ func followLogs(f *os.File, logWatcher *LogWatcher, notifyRotate chan interface{ return nil case fsnotify.Rename, fsnotify.Remove: select { - case <-notifyRotate: case <-logWatcher.WatchProducerGone(): return errDone case <-logWatcher.WatchConsumerGone(): return errDone + default: } if err := handleRotate(); err != nil { return err @@ -510,7 +503,6 @@ func followLogs(f *os.File, logWatcher *LogWatcher, notifyRotate chan interface{ // ready to try again continue } - retries = 0 // reset retries since we've succeeded if !since.IsZero() && msg.Timestamp.Before(since) { continue diff --git a/node/nodem/logger/logger_file_test.go b/node/nodem/logger/logger_file_test.go index b0b751273..85ee80d10 100644 --- a/node/nodem/logger/logger_file_test.go +++ b/node/nodem/logger/logger_file_test.go @@ -27,12 +27,12 @@ import ( ) func TestReadFile(t *testing.T) { - reader, err := NewLogFile("/Users/qingguo/gopath/src/github.com/goodrain/rainbond/test/dockerlog/tes.log", 3, false, decodeFunc, 0640, getTailReader) + reader, err := NewLogFile("../../../test/dockerlog/tes.log", 3, false, decodeFunc, 0640, getTailReader) if err != nil { t.Fatal(err) } watch := NewLogWatcher() - reader.ReadLogs(ReadConfig{Follow: true, Tail: 10, Since: time.Now()}, watch) + reader.ReadLogs(ReadConfig{Follow: true, Tail: 0, Since: time.Now()}, watch) defer watch.ConsumerGone() LogLoop: for { diff --git a/node/nodem/logger/manage.go b/node/nodem/logger/manage.go index b2e4decc4..a59494c40 100644 --- a/node/nodem/logger/manage.go +++ b/node/nodem/logger/manage.go @@ -128,7 +128,7 @@ func (c *ContainerLogManage) handleLogger() { retry := 0 for retry < maxJSONDecodeRetry { retry++ - reader, err := NewLogFile(cevent.Container.LogPath, 3, false, decodeFunc, 0640, getTailReader) + reader, err := 
NewLogFile(cevent.Container.LogPath, 2, false, decodeFunc, 0640, getTailReader) if err != nil { logrus.Errorf("create logger failure %s", err.Error()) time.Sleep(time.Second * 1) diff --git a/node/nodem/logger/streamlog/streamlog_test.go b/node/nodem/logger/streamlog/streamlog_test.go index 7a15d110b..fb09e57af 100644 --- a/node/nodem/logger/streamlog/streamlog_test.go +++ b/node/nodem/logger/streamlog/streamlog_test.go @@ -7,15 +7,16 @@ import ( "testing" "time" + "github.com/goodrain/rainbond/node/nodem/logger" + "fmt" - "github.com/docker/docker/daemon/logger" "github.com/pborman/uuid" ) func TestStreamLogBeak(t *testing.T) { - log, err := New(logger.Context{ + log, err := New(logger.Info{ ContainerID: uuid.New(), ContainerEnv: []string{"TENANT_ID=" + uuid.New(), "SERVICE_ID=" + uuid.New()}, Config: map[string]string{"stream-server": "127.0.0.1:6362"}, @@ -51,7 +52,7 @@ func TestStreamLogBeak(t *testing.T) { func TestStreamLog(t *testing.T) { - log, err := New(logger.Context{ + log, err := New(logger.Info{ ContainerID: uuid.New(), ContainerEnv: []string{"TENANT_ID=" + uuid.New(), "SERVICE_ID=" + uuid.New()}, Config: map[string]string{"stream-server": "127.0.0.1:6362"}, @@ -76,7 +77,7 @@ func TestStreamLog(t *testing.T) { } func BenchmarkStreamLog(t *testing.B) { - log, err := New(logger.Context{ + log, err := New(logger.Info{ ContainerID: uuid.New(), ContainerEnv: []string{"TENANT_ID=" + uuid.New(), "SERVICE_ID=" + uuid.New()}, Config: map[string]string{"stream-server": "127.0.0.1:5031"}, diff --git a/node/statsd/exporter/exporter.go b/node/statsd/exporter/exporter.go index 7b3f4769f..203360508 100644 --- a/node/statsd/exporter/exporter.go +++ b/node/statsd/exporter/exporter.go @@ -396,7 +396,6 @@ func (b *Exporter) GCollector() { oldGauges := len(b.Gauges.Elements) oldHistograms := len(b.Histograms.Elements) oldSummaries := len(b.Summaries.Elements) - for k, v := range b.Counters.Elements { oldTime := v.GetTimestamp() if (currentTime - oldTime) > HP { @@ -404,7 
+403,6 @@ func (b *Exporter) GCollector() { b.Counters.Register.Unregister(v) } } - for k, v := range b.Gauges.Elements { oldTime := v.GetTimestamp() if (currentTime - oldTime) > HP { @@ -412,7 +410,6 @@ func (b *Exporter) GCollector() { b.Gauges.Register.Unregister(v) } } - for k, v := range b.Histograms.Elements { oldTime := v.GetTimestamp() if (currentTime - oldTime) > HP { @@ -420,7 +417,6 @@ func (b *Exporter) GCollector() { b.Histograms.Register.Unregister(v) } } - for k, v := range b.Summaries.Elements { oldTime := v.GetTimestamp() if (currentTime - oldTime) > HP { @@ -428,12 +424,10 @@ func (b *Exporter) GCollector() { b.Summaries.Register.Unregister(v) } } - logrus.Debugf("current amount for Counters: %v => %v", oldCounters, len(b.Counters.Elements)) logrus.Debugf("current amount for Gauges: %v => %v", oldGauges, len(b.Gauges.Elements)) logrus.Debugf("current amount for Histograms: %v => %v", oldHistograms, len(b.Histograms.Elements)) logrus.Debugf("current amount for Summaries: %v => %v", oldSummaries, len(b.Summaries.Elements)) - logrus.Info("clean exporter data complete") } }