mirror of
https://gitee.com/rainbond/Rainbond.git
synced 2024-12-05 13:17:42 +08:00
519 lines
12 KiB
Go
519 lines
12 KiB
Go
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
|
|
// RAINBOND, Application Management Platform
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version. For any non-GPL usage of Rainbond,
|
|
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
|
|
// must be obtained first.
|
|
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package event
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/goodrain/rainbond/discover"
|
|
"github.com/goodrain/rainbond/discover/config"
|
|
eventclient "github.com/goodrain/rainbond/eventlog/entry/grpc/client"
|
|
eventpb "github.com/goodrain/rainbond/eventlog/entry/grpc/pb"
|
|
"github.com/goodrain/rainbond/util"
|
|
etcdutil "github.com/goodrain/rainbond/util/etcd"
|
|
"github.com/pquerna/ffjson/ffjson"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
//Manager 操作日志,客户端服务
|
|
//客户端负载均衡
|
|
type Manager interface {
|
|
GetLogger(eventID string) Logger
|
|
Start() error
|
|
Close() error
|
|
ReleaseLogger(Logger)
|
|
}
|
|
|
|
// EventConfig event config struct
|
|
type EventConfig struct {
|
|
EventLogServers []string
|
|
DiscoverArgs *etcdutil.ClientArgs
|
|
}
|
|
type manager struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
config EventConfig
|
|
qos int32
|
|
loggers map[string]Logger
|
|
handles map[string]handle
|
|
lock sync.Mutex
|
|
eventServer []string
|
|
abnormalServer map[string]string
|
|
dis discover.Discover
|
|
}
|
|
|
|
// defaultManager is the package-level manager set by NewManager or
// NewTestManager and returned by GetManager.
var defaultManager Manager
|
|
|
|
const (
	// REQUESTTIMEOUT is the request timeout: one second.
	REQUESTTIMEOUT = time.Second
	// MAXRETRIES is the number of retries before giving up.
	MAXRETRIES = 3
	// buffersize is the capacity given to every handle's message channel.
	buffersize = 1000
)
|
|
|
|
//NewManager 创建manager
|
|
func NewManager(conf EventConfig) error {
|
|
dis, err := discover.GetDiscover(config.DiscoverConfig{EtcdClientArgs: conf.DiscoverArgs})
|
|
if err != nil {
|
|
logrus.Error("create discover manager error.", err.Error())
|
|
if len(conf.EventLogServers) < 1 {
|
|
return err
|
|
}
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defaultManager = &manager{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
config: conf,
|
|
loggers: make(map[string]Logger, 1024),
|
|
handles: make(map[string]handle),
|
|
eventServer: conf.EventLogServers,
|
|
dis: dis,
|
|
abnormalServer: make(map[string]string),
|
|
}
|
|
return defaultManager.Start()
|
|
}
|
|
|
|
//GetManager 获取日志服务
|
|
func GetManager() Manager {
|
|
return defaultManager
|
|
}
|
|
|
|
// NewTestManager -
|
|
func NewTestManager(m Manager) {
|
|
defaultManager = m
|
|
}
|
|
|
|
//CloseManager 关闭日志服务
|
|
func CloseManager() {
|
|
if defaultManager != nil {
|
|
defaultManager.Close()
|
|
}
|
|
}
|
|
|
|
func (m *manager) Start() error {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
for i := 0; i < len(m.eventServer); i++ {
|
|
h := handle{
|
|
cacheChan: make(chan []byte, buffersize),
|
|
stop: make(chan struct{}),
|
|
server: m.eventServer[i],
|
|
manager: m,
|
|
ctx: m.ctx,
|
|
}
|
|
m.handles[m.eventServer[i]] = h
|
|
go h.HandleLog()
|
|
}
|
|
if m.dis != nil {
|
|
m.dis.AddProject("event_log_event_grpc", m)
|
|
}
|
|
go m.GC()
|
|
return nil
|
|
}
|
|
|
|
func (m *manager) UpdateEndpoints(endpoints ...*config.Endpoint) {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
if endpoints == nil || len(endpoints) < 1 {
|
|
return
|
|
}
|
|
//清空不可用节点信息,以服务发现为主
|
|
m.abnormalServer = make(map[string]string)
|
|
//增加新节点
|
|
var new = make(map[string]string)
|
|
for _, end := range endpoints {
|
|
new[end.URL] = end.URL
|
|
if _, ok := m.handles[end.URL]; !ok {
|
|
h := handle{
|
|
cacheChan: make(chan []byte, buffersize),
|
|
stop: make(chan struct{}),
|
|
server: end.URL,
|
|
manager: m,
|
|
ctx: m.ctx,
|
|
}
|
|
m.handles[end.URL] = h
|
|
logrus.Infof("Add event server endpoint,%s", end.URL)
|
|
go h.HandleLog()
|
|
}
|
|
}
|
|
//删除旧节点
|
|
for k := range m.handles {
|
|
if _, ok := new[k]; !ok {
|
|
delete(m.handles, k)
|
|
logrus.Infof("Remove event server endpoint,%s", k)
|
|
}
|
|
}
|
|
var eventServer []string
|
|
for k := range new {
|
|
eventServer = append(eventServer, k)
|
|
}
|
|
m.eventServer = eventServer
|
|
logrus.Debugf("update event handle core success,handle core count:%d, event server count:%d", len(m.handles), len(m.eventServer))
|
|
}
|
|
|
|
func (m *manager) Error(err error) {
|
|
|
|
}
|
|
func (m *manager) Close() error {
|
|
m.cancel()
|
|
if m.dis != nil {
|
|
m.dis.Stop()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *manager) GC() {
|
|
util.IntermittentExec(m.ctx, func() {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
var needRelease []string
|
|
for k, l := range m.loggers {
|
|
//1min 未release ,自动gc
|
|
if l.CreateTime().Add(time.Minute).Before(time.Now()) {
|
|
needRelease = append(needRelease, k)
|
|
}
|
|
}
|
|
if len(needRelease) > 0 {
|
|
for _, event := range needRelease {
|
|
logrus.Infof("start auto release event logger. %s", event)
|
|
delete(m.loggers, event)
|
|
}
|
|
}
|
|
}, time.Second*20)
|
|
}
|
|
|
|
//GetLogger
|
|
//使用完成后必须调用ReleaseLogger方法
|
|
func (m *manager) GetLogger(eventID string) Logger {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
if eventID == " " || len(eventID) == 0 {
|
|
eventID = "system"
|
|
}
|
|
if l, ok := m.loggers[eventID]; ok {
|
|
return l
|
|
}
|
|
l := NewLogger(eventID, m.getLBChan())
|
|
m.loggers[eventID] = l
|
|
return l
|
|
}
|
|
|
|
func (m *manager) ReleaseLogger(l Logger) {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
if l, ok := m.loggers[l.Event()]; ok {
|
|
delete(m.loggers, l.Event())
|
|
}
|
|
}
|
|
|
|
type handle struct {
|
|
server string
|
|
stop chan struct{}
|
|
cacheChan chan []byte
|
|
ctx context.Context
|
|
manager *manager
|
|
}
|
|
|
|
func (m *manager) DiscardedLoggerChan(cacheChan chan []byte) {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
for k, v := range m.handles {
|
|
if v.cacheChan == cacheChan {
|
|
logrus.Warnf("event server %s can not link, will ignore it.", k)
|
|
m.abnormalServer[k] = k
|
|
}
|
|
}
|
|
for _, v := range m.loggers {
|
|
if v.GetChan() == cacheChan {
|
|
v.SetChan(m.getLBChan())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *manager) getLBChan() chan []byte {
|
|
for i := 0; i < len(m.eventServer); i++ {
|
|
index := m.qos % int32(len(m.eventServer))
|
|
m.qos = atomic.AddInt32(&(m.qos), 1)
|
|
server := m.eventServer[index]
|
|
if _, ok := m.abnormalServer[server]; ok {
|
|
logrus.Warnf("server[%s] is abnormal, skip it", server)
|
|
continue
|
|
}
|
|
if h, ok := m.handles[server]; ok {
|
|
return h.cacheChan
|
|
}
|
|
h := handle{
|
|
cacheChan: make(chan []byte, buffersize),
|
|
stop: make(chan struct{}),
|
|
server: server,
|
|
manager: m,
|
|
ctx: m.ctx,
|
|
}
|
|
m.handles[server] = h
|
|
go h.HandleLog()
|
|
return h.cacheChan
|
|
}
|
|
//not select, return first handle chan
|
|
for _, v := range m.handles {
|
|
return v.cacheChan
|
|
}
|
|
return nil
|
|
}
|
|
func (m *manager) RemoveHandle(server string) {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
if _, ok := m.handles[server]; ok {
|
|
delete(m.handles, server)
|
|
}
|
|
}
|
|
func (m *handle) HandleLog() error {
|
|
defer m.manager.RemoveHandle(m.server)
|
|
return util.Exec(m.ctx, func() error {
|
|
ctx, cancel := context.WithCancel(m.ctx)
|
|
defer cancel()
|
|
client, err := eventclient.NewEventClient(ctx, m.server)
|
|
if err != nil {
|
|
logrus.Error("create event client error.", err.Error())
|
|
return err
|
|
}
|
|
logrus.Infof("start a event log handle core. connect server %s", m.server)
|
|
logClient, err := client.Log(ctx)
|
|
if err != nil {
|
|
logrus.Error("create event log client error.", err.Error())
|
|
//切换使用此chan的logger到其他chan
|
|
m.manager.DiscardedLoggerChan(m.cacheChan)
|
|
return err
|
|
}
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
logClient.CloseSend()
|
|
return nil
|
|
case <-m.stop:
|
|
logClient.CloseSend()
|
|
return nil
|
|
case me := <-m.cacheChan:
|
|
err := logClient.Send(&eventpb.LogMessage{Log: me})
|
|
if err != nil {
|
|
logrus.Error("send event log error.", err.Error())
|
|
logClient.CloseSend()
|
|
//切换使用此chan的logger到其他chan
|
|
m.manager.DiscardedLoggerChan(m.cacheChan)
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
}, time.Second*3)
|
|
}
|
|
|
|
func (m *handle) Stop() {
|
|
close(m.stop)
|
|
}
|
|
|
|
//Logger 日志发送器
|
|
type Logger interface {
|
|
Info(string, map[string]string)
|
|
Error(string, map[string]string)
|
|
Debug(string, map[string]string)
|
|
Event() string
|
|
CreateTime() time.Time
|
|
GetChan() chan []byte
|
|
SetChan(chan []byte)
|
|
GetWriter(step, level string) LoggerWriter
|
|
}
|
|
|
|
// NewLogger creates a new Logger.
|
|
func NewLogger(eventID string, sendCh chan []byte) Logger {
|
|
return &logger{
|
|
event: eventID,
|
|
sendChan: sendCh,
|
|
createTime: time.Now(),
|
|
}
|
|
}
|
|
|
|
// logger is the Logger implementation returned by NewLogger.
type logger struct {
	// event is the ID of the event the logger belongs to.
	event string
	// sendChan is the channel encoded messages are pushed to.
	sendChan chan []byte
	// createTime is the moment the logger was built.
	createTime time.Time
}

// GetChan returns the channel the logger currently sends on.
func (l *logger) GetChan() chan []byte {
	return l.sendChan
}

// SetChan replaces the channel the logger sends on.
func (l *logger) SetChan(ch chan []byte) {
	l.sendChan = ch
}

// Event returns the ID of the event the logger belongs to.
func (l *logger) Event() string {
	return l.event
}

// CreateTime returns the moment the logger was built.
func (l *logger) CreateTime() time.Time {
	return l.createTime
}
|
|
func (l *logger) Info(message string, info map[string]string) {
|
|
if info == nil {
|
|
info = make(map[string]string)
|
|
}
|
|
info["level"] = "info"
|
|
l.send(message, info)
|
|
}
|
|
func (l *logger) Error(message string, info map[string]string) {
|
|
if info == nil {
|
|
info = make(map[string]string)
|
|
}
|
|
info["level"] = "error"
|
|
l.send(message, info)
|
|
}
|
|
func (l *logger) Debug(message string, info map[string]string) {
|
|
if info == nil {
|
|
info = make(map[string]string)
|
|
}
|
|
info["level"] = "debug"
|
|
l.send(message, info)
|
|
}
|
|
func (l *logger) send(message string, info map[string]string) {
|
|
info["event_id"] = l.event
|
|
info["message"] = message
|
|
info["time"] = time.Now().Format(time.RFC3339)
|
|
log, err := ffjson.Marshal(info)
|
|
if err == nil && l.sendChan != nil {
|
|
util.SendNoBlocking(log, l.sendChan)
|
|
}
|
|
}
|
|
|
|
// LoggerWriter is an io.Writer that additionally accepts a format map
// through SetFormat.
type LoggerWriter interface {
	io.Writer
	// SetFormat installs the format map used by the writer.
	SetFormat(map[string]interface{})
}
|
|
|
|
func (l *logger) GetWriter(step, level string) LoggerWriter {
|
|
return &loggerWriter{
|
|
l: l,
|
|
step: step,
|
|
level: level,
|
|
}
|
|
}
|
|
|
|
type loggerWriter struct {
|
|
l *logger
|
|
step string
|
|
level string
|
|
fmt map[string]interface{}
|
|
tmp []byte
|
|
lastMessage string
|
|
}
|
|
|
|
func (l *loggerWriter) SetFormat(f map[string]interface{}) {
|
|
l.fmt = f
|
|
}
|
|
func (l *loggerWriter) Write(b []byte) (n int, err error) {
|
|
if b != nil && len(b) > 0 {
|
|
if !strings.HasSuffix(string(b), "\n") {
|
|
l.tmp = append(l.tmp, b...)
|
|
return len(b), nil
|
|
}
|
|
var message string
|
|
if len(l.tmp) > 0 {
|
|
message = string(append(l.tmp, b...))
|
|
l.tmp = l.tmp[:0]
|
|
} else {
|
|
message = string(b)
|
|
}
|
|
// if loggerWriter has format, and then use it format message
|
|
if len(l.fmt) > 0 {
|
|
newLineMap := make(map[string]interface{}, len(l.fmt))
|
|
for k, v := range l.fmt {
|
|
if v == "%s" {
|
|
newLineMap[k] = fmt.Sprintf(v.(string), message)
|
|
} else {
|
|
newLineMap[k] = v
|
|
}
|
|
}
|
|
messageb, _ := ffjson.Marshal(newLineMap)
|
|
message = string(messageb)
|
|
}
|
|
if l.step == "build-progress" {
|
|
if strings.HasPrefix(message, "Progress ") && strings.HasPrefix(l.lastMessage, "Progress ") {
|
|
l.lastMessage = message
|
|
return len(b), nil
|
|
}
|
|
// send last message
|
|
if !strings.HasPrefix(message, "Progress ") && strings.HasPrefix(l.lastMessage, "Progress ") {
|
|
l.l.send(message, map[string]string{"step": l.lastMessage, "level": l.level})
|
|
}
|
|
}
|
|
l.l.send(message, map[string]string{"step": l.step, "level": l.level})
|
|
l.lastMessage = message
|
|
}
|
|
return len(b), nil
|
|
}
|
|
|
|
//GetTestLogger GetTestLogger
|
|
func GetTestLogger() Logger {
|
|
return &testLogger{}
|
|
}
|
|
|
|
// testLogger is a Logger that prints messages to standard output.
type testLogger struct{}

// GetChan always returns nil; testLogger has no channel.
func (l *testLogger) GetChan() chan []byte {
	return nil
}

// SetChan ignores ch.
func (l *testLogger) SetChan(ch chan []byte) {
}

// Event returns the fixed event ID "test".
func (l *testLogger) Event() string {
	return "test"
}

// CreateTime returns the current time on every call.
func (l *testLogger) CreateTime() time.Time {
	return time.Now()
}

// Info prints message with the "info:" prefix; info is ignored.
func (l *testLogger) Info(message string, info map[string]string) {
	fmt.Println("info:", message)
}

// Error prints message with the "error:" prefix; info is ignored.
func (l *testLogger) Error(message string, info map[string]string) {
	fmt.Println("error:", message)
}

// Debug prints message with the "debug:" prefix; info is ignored.
func (l *testLogger) Debug(message string, info map[string]string) {
	fmt.Println("debug:", message)
}
|
|
|
|
// testLoggerWriter is a LoggerWriter that writes straight to standard output.
type testLoggerWriter struct{}

// SetFormat ignores f.
func (l *testLoggerWriter) SetFormat(f map[string]interface{}) {
}

// Write forwards b to os.Stdout and returns its result.
func (l *testLoggerWriter) Write(b []byte) (n int, err error) {
	return os.Stdout.Write(b)
}
|
|
|
|
func (l *testLogger) GetWriter(step, level string) LoggerWriter {
|
|
return &testLoggerWriter{}
|
|
}
|