Rainbond/cmd/eventlog/server/server.go

280 lines
14 KiB
Go
Raw Normal View History

// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package server
import (
"os/signal"
"path"
"syscall"
"github.com/goodrain/rainbond/discover"
"github.com/goodrain/rainbond/eventlog/cluster"
"github.com/goodrain/rainbond/eventlog/conf"
"github.com/goodrain/rainbond/eventlog/entry"
"github.com/goodrain/rainbond/eventlog/exit/web"
"github.com/goodrain/rainbond/eventlog/store"
2017-11-07 11:40:44 +08:00
"os"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/eventlog/db"
"github.com/goodrain/rainbond/util"
2019-12-24 22:41:19 +08:00
etcdutil "github.com/goodrain/rainbond/util/etcd"
2018-11-20 11:34:03 +08:00
"github.com/spf13/pflag"
2017-11-07 11:40:44 +08:00
)
// LogServer bundles the configuration and the runtime components of the
// event log server process.
type LogServer struct {
	// Conf holds the configuration; its fields are bound to command-line
	// flags by AddFlags.
	Conf conf.Conf
	// Entry is the message entry component, created and started by Run.
	Entry *entry.Entry
	// Logger is the server logger, built by InitLog.
	Logger *logrus.Logger
	// SocketServer is the web socket server, created and started by Run.
	SocketServer *web.SocketServer
	// Cluster is only created by Run when Conf.ClusterMode is true.
	Cluster cluster.Cluster
}
// NewLogServer returns a LogServer whose configuration is the zero value;
// every other field is left unset.
func NewLogServer() *LogServer {
	return &LogServer{Conf: conf.Conf{}}
}
//AddFlags 添加参数
func (s *LogServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Conf.Entry.EventLogServer.BindIP, "eventlog.bind.ip", "0.0.0.0", "Collect the log service to listen the IP")
fs.IntVar(&s.Conf.Entry.EventLogServer.BindPort, "eventlog.bind.port", 6366, "Collect the log service to listen the Port")
fs.IntVar(&s.Conf.Entry.EventLogServer.CacheMessageSize, "eventlog.cache", 100, "the event log server cache the receive message size")
fs.StringVar(&s.Conf.Entry.DockerLogServer.BindIP, "dockerlog.bind.ip", "0.0.0.0", "Collect the log service to listen the IP")
fs.StringVar(&s.Conf.Entry.DockerLogServer.Mode, "dockerlog.mode", "stream", "the server mode zmq or stream")
fs.IntVar(&s.Conf.Entry.DockerLogServer.BindPort, "dockerlog.bind.port", 6362, "Collect the log service to listen the Port")
fs.IntVar(&s.Conf.Entry.DockerLogServer.CacheMessageSize, "dockerlog.cache", 200, "the docker log server cache the receive message size")
fs.StringSliceVar(&s.Conf.Entry.MonitorMessageServer.SubAddress, "monitor.subaddress", []string{"tcp://127.0.0.1:9442"}, "monitor message source address")
fs.IntVar(&s.Conf.Entry.MonitorMessageServer.CacheMessageSize, "monitor.cache", 200, "the monitor sub server cache the receive message size")
fs.StringVar(&s.Conf.Entry.MonitorMessageServer.SubSubscribe, "monitor.subscribe", "ceptop", "the monitor message sub server subscribe info")
fs.BoolVar(&s.Conf.ClusterMode, "cluster", true, "Whether open cluster mode")
fs.StringVar(&s.Conf.Cluster.Discover.InstanceIP, "cluster.instance.ip", "", "The current instance IP in the cluster can be communications.")
fs.StringVar(&s.Conf.Cluster.Discover.Type, "discover.type", "etcd", "the instance in cluster auto discover way.")
fs.StringSliceVar(&s.Conf.Cluster.Discover.EtcdAddr, "discover.etcd.addr", []string{"http://127.0.0.1:2379"}, "set all etcd server addr in cluster for message instence auto discover.")
2019-12-24 22:41:19 +08:00
fs.StringVar(&s.Conf.Cluster.Discover.EtcdCaFile, "discover.etcd.ca", "", "verify etcd certificates of TLS-enabled secure servers using this CA bundle")
fs.StringVar(&s.Conf.Cluster.Discover.EtcdCertFile, "discover.etcd.cert", "", "identify secure etcd client using this TLS certificate file")
fs.StringVar(&s.Conf.Cluster.Discover.EtcdKeyFile, "discover.etcd.key", "", "identify secure etcd client using this TLS key file")
2017-11-07 11:40:44 +08:00
fs.StringVar(&s.Conf.Cluster.Discover.HomePath, "discover.etcd.homepath", "/event", "etcd home key")
fs.StringVar(&s.Conf.Cluster.Discover.EtcdUser, "discover.etcd.user", "", "etcd server user info")
fs.StringVar(&s.Conf.Cluster.Discover.EtcdPass, "discover.etcd.pass", "", "etcd server user password")
fs.StringVar(&s.Conf.Cluster.PubSub.PubBindIP, "cluster.bind.ip", "0.0.0.0", "Cluster communication to listen the IP")
fs.IntVar(&s.Conf.Cluster.PubSub.PubBindPort, "cluster.bind.port", 6365, "Cluster communication to listen the Port")
fs.StringVar(&s.Conf.EventStore.MessageType, "message.type", "json", "Receive and transmit the log message type.")
fs.StringVar(&s.Conf.EventStore.GarbageMessageSaveType, "message.garbage.save", "file", "garbage message way of storage")
fs.StringVar(&s.Conf.EventStore.GarbageMessageFile, "message.garbage.file", "/var/log/envent_garbage_message.log", "save garbage message file path when save type is file")
fs.Int64Var(&s.Conf.EventStore.PeerEventMaxLogNumber, "message.max.number", 100000, "the max number log message for peer event")
fs.IntVar(&s.Conf.EventStore.PeerEventMaxCacheLogNumber, "message.cache.number", 256, "Maintain log the largest number in the memory peer event")
fs.Int64Var(&s.Conf.EventStore.PeerDockerMaxCacheLogNumber, "dockermessage.cache.number", 128, "Maintain log the largest number in the memory peer docker service")
2017-11-07 11:40:44 +08:00
fs.IntVar(&s.Conf.EventStore.HandleMessageCoreNumber, "message.handle.core.number", 2, "The number of concurrent processing receive log data.")
fs.IntVar(&s.Conf.EventStore.HandleSubMessageCoreNumber, "message.sub.handle.core.number", 3, "The number of concurrent processing receive log data. more than message.handle.core.number")
fs.IntVar(&s.Conf.EventStore.HandleDockerLogCoreNumber, "message.dockerlog.handle.core.number", 2, "The number of concurrent processing receive log data. more than message.handle.core.number")
fs.StringVar(&s.Conf.Log.LogLevel, "log.level", "info", "app log level")
fs.StringVar(&s.Conf.Log.LogOutType, "log.type", "stdout", "app log output type. stdout or file ")
fs.StringVar(&s.Conf.Log.LogPath, "log.path", "/var/log/", "app log output file path.it is effective when log.type=file")
fs.StringVar(&s.Conf.WebSocket.BindIP, "websocket.bind.ip", "0.0.0.0", "the bind ip of websocket for push event message")
fs.IntVar(&s.Conf.WebSocket.BindPort, "websocket.bind.port", 6363, "the bind port of websocket for push event message")
fs.IntVar(&s.Conf.WebSocket.SSLBindPort, "websocket.ssl.bind.port", 6364, "the ssl bind port of websocket for push event message")
fs.BoolVar(&s.Conf.WebSocket.EnableCompression, "websocket.compression", true, "weither enable compression for web socket")
fs.IntVar(&s.Conf.WebSocket.ReadBufferSize, "websocket.readbuffersize", 4096, "the readbuffersize of websocket for push event message")
fs.IntVar(&s.Conf.WebSocket.WriteBufferSize, "websocket.writebuffersize", 4096, "the writebuffersize of websocket for push event message")
fs.IntVar(&s.Conf.WebSocket.MaxRestartCount, "websocket.maxrestart", 5, "the max restart count of websocket for push event message")
fs.BoolVar(&s.Conf.WebSocket.SSL, "websocket.ssl", false, "whether to enable websocket SSL")
fs.StringVar(&s.Conf.WebSocket.CertFile, "websocket.certfile", "/etc/ssl/goodrain.com/goodrain.com.crt", "websocket ssl cert file")
fs.StringVar(&s.Conf.WebSocket.KeyFile, "websocket.keyfile", "/etc/ssl/goodrain.com/goodrain.com.key", "websocket ssl cert file")
fs.StringVar(&s.Conf.WebSocket.TimeOut, "websocket.timeout", "1m", "Keep websocket service the longest time when without message ")
fs.StringVar(&s.Conf.WebSocket.PrometheusMetricPath, "monitor-path", "/metrics", "promethesu monitor metrics path")
fs.StringVar(&s.Conf.EventStore.DB.Type, "db.type", "mysql", "Data persistence type.")
fs.StringVar(&s.Conf.EventStore.DB.URL, "db.url", "root:admin@tcp(127.0.0.1:3306)/event", "Data persistence db url.")
fs.IntVar(&s.Conf.EventStore.DB.PoolSize, "db.pool.size", 3, "Data persistence db pool init size.")
fs.IntVar(&s.Conf.EventStore.DB.PoolMaxSize, "db.pool.maxsize", 10, "Data persistence db pool max size.")
fs.StringVar(&s.Conf.EventStore.DB.HomePath, "docker.log.homepath", "/grdata/logs/", "container log persistent home path")
fs.StringVar(&s.Conf.Entry.NewMonitorMessageServerConf.ListenerHost, "monitor.udp.host", "0.0.0.0", "receive new monitor udp server host")
fs.IntVar(&s.Conf.Entry.NewMonitorMessageServerConf.ListenerPort, "monitor.udp.port", 6166, "receive new monitor udp server port")
fs.StringVar(&s.Conf.Cluster.Discover.NodeIDFile, "nodeid-file", "/opt/rainbond/etc/node/node_host_uuid.conf", "the unique ID for this node. Just specify, don't modify")
2017-11-07 11:40:44 +08:00
}
// InitLog builds the server logger from s.Conf.Log and stores it in s.Logger.
//
// An unparsable log level leaves the logger at the level logrus.New gives it.
// With log.type "file" the log path may be an existing file or a directory;
// for a directory the file "event_log.log" inside it is used, and a missing
// path is created as a directory. If the file cannot be prepared the problem
// is reported on the global logrus logger and the logger keeps the output
// that logrus.New gave it.
func (s *LogServer) InitLog() {
	log := logrus.New()
	if level, err := logrus.ParseLevel(s.Conf.Log.LogLevel); err == nil {
		log.Level = level
	} else {
		logrus.Warning("log.level is not valid.will set it is 'info'")
	}

	switch s.Conf.Log.LogOutType {
	case "stdout":
		log.Out = os.Stdout
	case "file":
		logPath := s.Conf.Log.LogPath
		info, err := os.Stat(logPath)
		if err != nil && os.IsNotExist(err) {
			// Real permission bits are required here: os.ModeDir alone
			// yields a directory nobody can enter.
			if err = os.MkdirAll(logPath, 0755); err != nil {
				logrus.Errorf("Create log dir error.%s,The log configuration does not take effect", err.Error())
				break
			}
			info, err = os.Stat(logPath)
		}
		if err != nil {
			// info is nil on this path, so it must not be dereferenced.
			logrus.Errorf("Open log file error.%s,The log configuration does not take effect", err.Error())
			break
		}

		target := logPath // use the configured path directly when it is a file
		if info.IsDir() {
			target = path.Join(logPath, "event_log.log")
		}
		// Create the file when missing and append to it so earlier log
		// content is kept; the mode is octal.
		file, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
		if err != nil {
			logrus.Errorf("Open log file error.%s,The log configuration does not take effect", err.Error())
			break
		}
		log.Out = file
	}

	log.Formatter = &logrus.TextFormatter{}
	s.Logger = log
}
// InitConf completes the configuration after flag parsing.
//
// It copies the cluster mode switch and the docker log / web socket ports
// into the sub-configurations that need them, then applies environment
// overrides:
//   - MYSQL_HOST, MYSQL_USER and MYSQL_PASSWORD, when all set, replace the
//     database URL. MYSQL_PORT falls back to 3306 when unset so that the
//     generated address is never "host:" with an empty port. MYSQL_DATABASE
//     is used as given.
//   - CLUSTER_BIND_IP, when set, replaces the cluster publish bind IP.
func (s *LogServer) InitConf() {
	clusterMode := s.Conf.ClusterMode
	s.Conf.Cluster.Discover.ClusterMode = clusterMode
	s.Conf.Cluster.PubSub.ClusterMode = clusterMode
	s.Conf.EventStore.ClusterMode = clusterMode
	s.Conf.Cluster.Discover.DockerLogPort = s.Conf.Entry.DockerLogServer.BindPort
	s.Conf.Cluster.Discover.WebPort = s.Conf.WebSocket.BindPort

	host, user, password := os.Getenv("MYSQL_HOST"), os.Getenv("MYSQL_USER"), os.Getenv("MYSQL_PASSWORD")
	if host != "" && user != "" && password != "" {
		port := os.Getenv("MYSQL_PORT")
		if port == "" {
			port = "3306"
		}
		s.Conf.EventStore.DB.URL = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", user, password,
			host, port, os.Getenv("MYSQL_DATABASE"))
	}

	if bindIP := os.Getenv("CLUSTER_BIND_IP"); bindIP != "" {
		s.Conf.Cluster.PubSub.PubBindIP = bindIP
	}
}
//Run 执行
func (s *LogServer) Run() error {
s.Logger.Debug("Start run server.")
log := s.Logger
2020-02-03 23:59:11 +08:00
etcdClientArgs := &etcdutil.ClientArgs{
Endpoints: s.Conf.Cluster.Discover.EtcdAddr,
CaFile: s.Conf.Cluster.Discover.EtcdCaFile,
CertFile: s.Conf.Cluster.Discover.EtcdCertFile,
KeyFile: s.Conf.Cluster.Discover.EtcdKeyFile,
}
//init new db
if err := db.CreateDBManager(s.Conf.EventStore.DB); err != nil {
2017-11-16 18:13:28 +08:00
logrus.Infof("create db manager error, %v", err)
return err
}
2017-11-07 11:40:44 +08:00
storeManager, err := store.NewManager(s.Conf.EventStore, log.WithField("module", "MessageStore"))
if err != nil {
return err
}
healthInfo := storeManager.HealthCheck()
2017-11-07 11:40:44 +08:00
if err := storeManager.Run(); err != nil {
return err
}
defer storeManager.Stop()
if s.Conf.ClusterMode {
2020-02-03 23:59:11 +08:00
s.Cluster, err = cluster.NewCluster(etcdClientArgs, s.Conf.Cluster, log.WithField("module", "Cluster"), storeManager)
if err != nil {
return err
}
2017-11-07 11:40:44 +08:00
if err := s.Cluster.Start(); err != nil {
return err
}
defer s.Cluster.Stop()
}
2020-02-03 23:59:11 +08:00
s.SocketServer, err = web.NewSocket(s.Conf.WebSocket, s.Conf.Cluster.Discover, etcdClientArgs,
log.WithField("module", "SocketServer"), storeManager, s.Cluster, healthInfo)
if err != nil {
return err
}
2017-11-07 11:40:44 +08:00
if err := s.SocketServer.Run(); err != nil {
return err
}
defer s.SocketServer.Stop()
s.Entry = entry.NewEntry(s.Conf.Entry, log.WithField("module", "EntryServer"), storeManager)
if err := s.Entry.Start(); err != nil {
return err
}
defer s.Entry.Stop()
//服务注册
2019-12-24 22:41:19 +08:00
grpckeepalive, err := discover.CreateKeepAlive(etcdClientArgs, "event_log_event_grpc",
2018-11-20 11:34:03 +08:00
s.Conf.Cluster.Discover.InstanceIP, s.Conf.Cluster.Discover.InstanceIP, s.Conf.Entry.EventLogServer.BindPort)
if err != nil {
return err
}
if err := grpckeepalive.Start(); err != nil {
return err
}
defer grpckeepalive.Stop()
2019-12-24 22:41:19 +08:00
udpkeepalive, err := discover.CreateKeepAlive(etcdClientArgs, "event_log_event_udp",
s.Conf.Cluster.Discover.InstanceIP, s.Conf.Cluster.Discover.InstanceIP, s.Conf.Entry.NewMonitorMessageServerConf.ListenerPort)
if err != nil {
return err
}
if err := udpkeepalive.Start(); err != nil {
return err
}
defer udpkeepalive.Stop()
hostID, err := util.ReadHostID(s.Conf.Cluster.Discover.NodeIDFile)
if err != nil {
return err
}
var id string
if len(hostID) < 12 {
id = hostID
} else {
id = hostID[len(hostID)-12:]
}
2019-12-24 22:41:19 +08:00
httpkeepalive, err := discover.CreateKeepAlive(etcdClientArgs, "event_log_event_http",
id, s.Conf.Cluster.Discover.InstanceIP, s.Conf.WebSocket.BindPort)
if err != nil {
return err
}
if err := httpkeepalive.Start(); err != nil {
return err
}
defer httpkeepalive.Stop()
2017-11-07 11:40:44 +08:00
term := make(chan os.Signal)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
select {
case <-term:
log.Warn("Received SIGTERM, exiting gracefully...")
case err := <-s.SocketServer.ListenError():
log.Errorln("Error listen web socket server, exiting gracefully:", err)
case err := <-storeManager.Error():
log.Errorln("Store receive a error, exiting gracefully:", err)
}
log.Info("See you next time!")
return nil
}