Rainbond/event/manager.go
dp 447ab85000
fix(chaos): Large number of log newline issues (#1215)
* fix(chaos): docker build error log lost & use host network

* add comment on ImageBuildNetworkModeHost

* fix(chaos): Large number of log newline issues

* fix(chaos): Large number of log newline issues
2022-07-07 18:51:24 +08:00

520 lines
12 KiB
Go

// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package event
import (
"fmt"
"io"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/goodrain/rainbond/discover"
"github.com/goodrain/rainbond/discover/config"
eventclient "github.com/goodrain/rainbond/eventlog/entry/grpc/client"
eventpb "github.com/goodrain/rainbond/eventlog/entry/grpc/pb"
"github.com/goodrain/rainbond/util"
etcdutil "github.com/goodrain/rainbond/util/etcd"
"github.com/pquerna/ffjson/ffjson"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
// Manager is the client side of the operation-log (event log) service. It
// hands out per-event Logger instances and balances their output across the
// available event-log servers on the client side.
type Manager interface {
	// Start starts the manager.
	Start() error
	// Close shuts the manager down.
	Close() error

	// GetLogger returns the Logger for eventID. The caller must hand it back
	// with ReleaseLogger once it is done with it.
	GetLogger(eventID string) Logger
	// ReleaseLogger returns a Logger obtained from GetLogger to the manager.
	ReleaseLogger(Logger)
}
// EventConfig configures the event-log Manager built by NewManager.
type EventConfig struct {
	// EventLogServers is a static list of event-log servers; each entry is
	// used as the server passed to eventclient.NewEventClient. NewManager
	// still succeeds without service discovery as long as this list is
	// non-empty.
	EventLogServers []string
	// DiscoverArgs are the etcd client arguments used to create the
	// service-discovery client in NewManager.
	DiscoverArgs *etcdutil.ClientArgs
}
// manager is the Manager implementation created by NewManager.
type manager struct {
	// ctx is the manager's lifetime context; cancel (called by Close) ends it.
	ctx    context.Context
	cancel context.CancelFunc
	// config is the configuration passed to NewManager.
	config EventConfig
	// qos is the round-robin counter advanced by getLBChan.
	qos int32
	// loggers caches live Logger instances by event ID.
	loggers map[string]Logger
	// handles maps a server to the handle that ships log messages to it.
	handles map[string]handle
	// lock is held by every method that reads or writes loggers, handles,
	// eventServer or abnormalServer.
	lock sync.Mutex
	// eventServer is the list of servers getLBChan balances across.
	eventServer []string
	// abnormalServer holds servers that failed (see DiscardedLoggerChan);
	// getLBChan skips them until UpdateEndpoints resets the map.
	abnormalServer map[string]string
	// dis is the service-discovery client; it may be nil (Start and Close
	// check for that).
	dis discover.Discover
}
// defaultManager is the package-level Manager installed by NewManager or
// NewTestManager and returned by GetManager.
var defaultManager Manager
// Client tuning constants.
const (
	// buffersize is the capacity of each handle's outgoing log channel.
	buffersize = 1000

	// MAXRETRIES is the number of retries made before we abandon.
	MAXRETRIES = 3

	// REQUESTTIMEOUT is the request timeout.
	REQUESTTIMEOUT = time.Second
)
// NewManager builds the event-log Manager from conf, installs it as the
// package-level manager (see GetManager) and starts it.
//
// A failure to create the service-discovery client is logged. It is only
// returned when conf lists no static EventLogServers to fall back on.
func NewManager(conf EventConfig) error {
	dis, err := discover.GetDiscover(config.DiscoverConfig{EtcdClientArgs: conf.DiscoverArgs})
	if err != nil {
		logrus.Error("create discover manager error.", err.Error())
		if len(conf.EventLogServers) == 0 {
			return err
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	m := &manager{
		ctx:            ctx,
		cancel:         cancel,
		config:         conf,
		eventServer:    conf.EventLogServers,
		dis:            dis,
		loggers:        make(map[string]Logger, 1024),
		handles:        make(map[string]handle),
		abnormalServer: make(map[string]string),
	}
	defaultManager = m
	return m.Start()
}
// GetManager returns the package-level log-service Manager. It is nil until
// NewManager succeeds in installing one or NewTestManager is called.
func GetManager() Manager {
	return defaultManager
}

// NewTestManager installs m as the package-level Manager; as the name
// suggests, it is meant for substituting a stub in tests.
func NewTestManager(m Manager) {
	defaultManager = m
}

// CloseManager closes the package-level Manager, if any. The error returned
// by Close is discarded.
func CloseManager() {
	if defaultManager == nil {
		return
	}
	defaultManager.Close()
}
func (m *manager) Start() error {
m.lock.Lock()
defer m.lock.Unlock()
for i := 0; i < len(m.eventServer); i++ {
h := handle{
cacheChan: make(chan []byte, buffersize),
stop: make(chan struct{}),
server: m.eventServer[i],
manager: m,
ctx: m.ctx,
}
m.handles[m.eventServer[i]] = h
go h.HandleLog()
}
if m.dis != nil {
m.dis.AddProject("event_log_event_grpc", m)
}
go m.GC()
return nil
}
// UpdateEndpoints reconciles the manager with the endpoints reported by
// service discovery, treating discovery as the source of truth:
//   - the abnormal-server list is reset;
//   - a handle is started for every endpoint that has none;
//   - handles whose endpoint is no longer reported are dropped;
//   - eventServer is rebuilt from the reported endpoints.
//
// An empty update is ignored.
//
// The rebuilt eventServer keeps discovery's order, with duplicates removed.
// Previously it came from map iteration, so the round-robin order used by
// getLBChan changed randomly on every update.
func (m *manager) UpdateEndpoints(endpoints ...*config.Endpoint) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if len(endpoints) == 0 {
		return
	}
	// Discovery is authoritative: forget servers previously marked abnormal.
	m.abnormalServer = make(map[string]string)

	// Start handles for new endpoints and collect the current server list.
	current := make(map[string]struct{}, len(endpoints))
	eventServer := make([]string, 0, len(endpoints))
	for _, end := range endpoints {
		url := end.URL
		if _, seen := current[url]; seen {
			continue
		}
		current[url] = struct{}{}
		eventServer = append(eventServer, url)
		if _, ok := m.handles[url]; ok {
			continue
		}
		h := handle{
			cacheChan: make(chan []byte, buffersize),
			stop:      make(chan struct{}),
			server:    url,
			manager:   m,
			ctx:       m.ctx,
		}
		m.handles[url] = h
		logrus.Infof("Add event server endpoint,%s", url)
		go h.HandleLog()
	}

	// Drop handles whose endpoint is no longer reported.
	// NOTE(review): the dropped handle's HandleLog goroutine is not stopped
	// here, and loggers bound to its cacheChan keep writing to it. They are
	// only moved once a send fails and DiscardedLoggerChan runs.
	for server := range m.handles {
		if _, ok := current[server]; !ok {
			delete(m.handles, server)
			logrus.Infof("Remove event server endpoint,%s", server)
		}
	}

	m.eventServer = eventServer
	logrus.Debugf("update event handle core success,handle core count:%d, event server count:%d", len(m.handles), len(m.eventServer))
}
// Error accepts an error report and ignores it. Presumably it is the error
// callback of the discovery project that Start registers m with; confirm
// against the discover package.
func (m *manager) Error(_ error) {}
// Close does two things, in this order:
//   - cancels the manager context, which the GC loop and every handle's
//     HandleLog watch;
//   - stops the discovery client, if there is one.
//
// It always returns nil.
func (m *manager) Close() error {
	m.cancel()
	if dis := m.dis; dis != nil {
		dis.Stop()
	}
	return nil
}
// GC periodically drops cached loggers that were created more than a minute
// ago and are still in the cache (i.e. never released). The sweep is driven
// by util.IntermittentExec on the manager context with a 20s interval.
func (m *manager) GC() {
	const (
		interval = 20 * time.Second
		maxAge   = time.Minute
	)
	util.IntermittentExec(m.ctx, func() {
		m.lock.Lock()
		defer m.lock.Unlock()
		// Deleting the current entry while ranging over a map is allowed.
		for event, l := range m.loggers {
			if !l.CreateTime().Add(maxAge).Before(time.Now()) {
				continue
			}
			logrus.Infof("start auto release event logger. %s", event)
			delete(m.loggers, event)
		}
	}, interval)
}
// GetLogger returns the cached Logger for eventID. If none is cached, it
// creates one bound to a load-balanced server channel and caches it. An
// empty or single-space eventID is mapped to "system".
//
// The caller must call ReleaseLogger when done. Loggers still cached a minute
// after creation are dropped by GC.
func (m *manager) GetLogger(eventID string) Logger {
	m.lock.Lock()
	defer m.lock.Unlock()
	switch eventID {
	case "", " ":
		eventID = "system"
	}
	if cached, ok := m.loggers[eventID]; ok {
		return cached
	}
	created := NewLogger(eventID, m.getLBChan())
	m.loggers[eventID] = created
	return created
}
// ReleaseLogger removes l from the logger cache.
//
// Only the exact instance cached under l.Event() is evicted. GC may already
// have dropped l, and GetLogger may since have cached a fresh logger for the
// same event. Previously, releasing the stale l then evicted the fresh logger
// as well.
//
// Comparing the two interfaces cannot panic: cached values always come from
// NewLogger and are *logger pointers.
func (m *manager) ReleaseLogger(l Logger) {
	m.lock.Lock()
	defer m.lock.Unlock()
	event := l.Event()
	if cached, ok := m.loggers[event]; ok && cached == l {
		delete(m.loggers, event)
	}
}
// handle owns the outgoing message channel for one event-log server. Loggers
// put serialized entries on cacheChan, and HandleLog forwards them to server
// over the gRPC log client from eventlog/entry/grpc/client.
type handle struct {
	// server is the event-log server this handle sends to.
	server string
	// stop is closed by Stop; HandleLog's send loop returns when it is closed.
	stop chan struct{}
	// cacheChan buffers serialized entries waiting to be sent.
	cacheChan chan []byte
	// ctx is the owning manager's context.
	ctx context.Context
	// manager is the owner, notified on failure and on exit.
	manager *manager
}
// DiscardedLoggerChan is called when the handle owning cacheChan cannot reach
// its server. It does two things:
//   - marks that server abnormal, so getLBChan skips it until the next
//     UpdateEndpoints;
//   - rebinds every cached logger still writing to cacheChan to another
//     load-balanced channel.
func (m *manager) DiscardedLoggerChan(cacheChan chan []byte) {
	m.lock.Lock()
	defer m.lock.Unlock()
	for server, h := range m.handles {
		if h.cacheChan != cacheChan {
			continue
		}
		logrus.Warnf("event server %s can not link, will ignore it.", server)
		m.abnormalServer[server] = server
	}
	for _, lg := range m.loggers {
		if lg.GetChan() != cacheChan {
			continue
		}
		lg.SetChan(m.getLBChan())
	}
}
// getLBChan returns the cacheChan of the next healthy event server, in
// round-robin order over m.eventServer.
//   - Servers marked abnormal are skipped.
//   - A healthy server with no handle (for example because its HandleLog
//     exited and removed it) gets a new handle.
//   - When no server is selected, the channel of an arbitrary existing handle
//     is returned, or nil if there are no handles.
//
// The caller must hold m.lock: the handles and abnormal maps are used
// unguarded here.
func (m *manager) getLBChan() chan []byte {
	count := len(m.eventServer)
	for i := 0; i < count; i++ {
		// Take the pre-increment counter value, as before. The modulo is
		// computed on the unsigned value. The int32 counter eventually wraps
		// past math.MaxInt32; with a signed modulo the index then went
		// negative and indexing eventServer panicked.
		index := uint32(atomic.AddInt32(&m.qos, 1)-1) % uint32(count)
		server := m.eventServer[index]
		if _, ok := m.abnormalServer[server]; ok {
			logrus.Warnf("server[%s] is abnormal, skip it", server)
			continue
		}
		if h, ok := m.handles[server]; ok {
			return h.cacheChan
		}
		h := handle{
			cacheChan: make(chan []byte, buffersize),
			stop:      make(chan struct{}),
			server:    server,
			manager:   m,
			ctx:       m.ctx,
		}
		m.handles[server] = h
		go h.HandleLog()
		return h.cacheChan
	}
	// Nothing selected: fall back to the first handle map iteration yields.
	for _, h := range m.handles {
		return h.cacheChan
	}
	return nil
}
// RemoveHandle forgets the handle registered for server. It is a no-op when
// none is registered. HandleLog defers a call to it, so a handle deregisters
// itself when its loop ends.
func (m *manager) RemoveHandle(server string) {
	m.lock.Lock()
	defer m.lock.Unlock()
	delete(m.handles, server)
}
// HandleLog forwards entries from m.cacheChan to m.server over a gRPC log
// stream. Each attempt creates the client and stream, then runs the send
// loop; attempts are driven by util.Exec with a 3s wait.
// NOTE(review): whether util.Exec re-runs the closure after a nil or a
// non-nil return is not visible in this file. Confirm it before changing any
// return value below.
//
// If the stream cannot be opened or a send fails, loggers bound to this
// handle's channel are moved to another server via DiscardedLoggerChan. When
// HandleLog returns, the handle removes itself from the manager.
func (m *handle) HandleLog() error {
	defer m.manager.RemoveHandle(m.server)
	return util.Exec(m.ctx, func() error {
		// Per-attempt context, cancelled when this attempt returns.
		ctx, cancel := context.WithCancel(m.ctx)
		defer cancel()
		client, err := eventclient.NewEventClient(ctx, m.server)
		if err != nil {
			logrus.Error("create event client error.", err.Error())
			return err
		}
		logrus.Infof("start a event log handle core. connect server %s", m.server)
		logClient, err := client.Log(ctx)
		if err != nil {
			logrus.Error("create event log client error.", err.Error())
			// Switch the loggers using this chan over to another chan.
			m.manager.DiscardedLoggerChan(m.cacheChan)
			return err
		}
		for {
			select {
			case <-m.ctx.Done():
				logClient.CloseSend()
				return nil
			case <-m.stop:
				logClient.CloseSend()
				return nil
			case me := <-m.cacheChan:
				err := logClient.Send(&eventpb.LogMessage{Log: me})
				if err != nil {
					logrus.Error("send event log error.", err.Error())
					logClient.CloseSend()
					// Switch the loggers using this chan over to another chan.
					// NOTE(review): the entry `me` that failed to send is dropped.
					m.manager.DiscardedLoggerChan(m.cacheChan)
					return nil
				}
			}
		}
	}, time.Second*3)
}
// Stop closes the stop channel, which makes HandleLog's send loop return.
// Call it at most once: closing an already-closed channel panics.
func (m *handle) Stop() {
	close(m.stop)
}
// Logger is a log sender for a single event.
type Logger interface {
	// Event returns the event ID this logger writes for.
	Event() string
	// CreateTime reports when the logger was created.
	CreateTime() time.Time

	// Info, Error and Debug send a message at the matching level, with
	// optional extra fields.
	Info(string, map[string]string)
	Error(string, map[string]string)
	Debug(string, map[string]string)

	// GetWriter returns a LoggerWriter for the given step and level.
	GetWriter(step, level string) LoggerWriter

	// GetChan and SetChan expose the channel that serialized entries are
	// sent on, so the manager can move the logger to another server.
	GetChan() chan []byte
	SetChan(chan []byte)
}
// NewLogger returns a Logger for eventID that sends its serialized entries on
// sendCh. The logger's creation time is set to now.
func NewLogger(eventID string, sendCh chan []byte) Logger {
	l := new(logger)
	l.event = eventID
	l.sendChan = sendCh
	l.createTime = time.Now()
	return l
}
// logger is the channel-backed Logger implementation returned by NewLogger.
type logger struct {
	// event is written into every entry as "event_id".
	event string
	// sendChan receives the JSON-encoded entries; SetChan may replace it.
	// NOTE(review): SetChan and send access this field without any
	// synchronization.
	sendChan chan []byte
	// createTime is read by manager.GC to expire loggers left in the cache.
	createTime time.Time
}
// GetChan returns the channel entries are currently sent on.
func (l *logger) GetChan() chan []byte { return l.sendChan }

// SetChan rebinds the logger to ch.
func (l *logger) SetChan(ch chan []byte) { l.sendChan = ch }

// Event returns the logger's event ID.
func (l *logger) Event() string { return l.event }

// CreateTime returns when the logger was created.
func (l *logger) CreateTime() time.Time { return l.createTime }
// Info sends message at level "info". info holds extra fields for the entry;
// it may be nil and is never modified.
func (l *logger) Info(message string, info map[string]string) {
	l.send(message, copyWithLevel(info, "info"))
}

// Error sends message at level "error". info holds extra fields for the
// entry; it may be nil and is never modified.
func (l *logger) Error(message string, info map[string]string) {
	l.send(message, copyWithLevel(info, "error"))
}

// Debug sends message at level "debug". info holds extra fields for the
// entry; it may be nil and is never modified.
func (l *logger) Debug(message string, info map[string]string) {
	l.send(message, copyWithLevel(info, "debug"))
}

// copyWithLevel returns a fresh copy of info with "level" set to level.
//
// send writes "event_id", "message" and "time" into the map it is given.
// Previously, Info/Error/Debug passed the caller's own map, after also
// setting "level" in it. Those fields therefore leaked back to the caller,
// and a map shared between goroutines could be written concurrently. The
// capacity leaves room for the four keys added here and in send.
func copyWithLevel(info map[string]string, level string) map[string]string {
	fields := make(map[string]string, len(info)+4)
	for k, v := range info {
		fields[k] = v
	}
	fields["level"] = level
	return fields
}
// send does the following:
//   - stamps info with the event ID, the message and the current time
//     (RFC 3339);
//   - JSON-encodes it with ffjson;
//   - hands the result to util.SendNoBlocking on the logger's channel.
//
// The entry is skipped when encoding fails or the logger has no channel.
// info is written to, so it must be a non-nil map the caller owns.
func (l *logger) send(message string, info map[string]string) {
	info["event_id"] = l.event
	info["message"] = message
	info["time"] = time.Now().Format(time.RFC3339)
	payload, err := ffjson.Marshal(info)
	if err != nil || l.sendChan == nil {
		return
	}
	util.SendNoBlocking(payload, l.sendChan)
}
// LoggerWriter is an io.Writer whose output becomes log entries. SetFormat
// supplies a format map; the loggerWriter implementation uses it to wrap each
// entry in a JSON object.
type LoggerWriter interface {
	SetFormat(map[string]interface{})
	io.Writer
}
// GetWriter returns a LoggerWriter whose completed writes are sent through
// this logger, tagged with step and level.
func (l *logger) GetWriter(step, level string) LoggerWriter {
	w := &loggerWriter{l: l}
	w.step, w.level = step, level
	return w
}
// loggerWriter adapts a logger to io.Writer. Output is buffered until a write
// ending in '\n' arrives, and is then sent as one log entry.
type loggerWriter struct {
	l     *logger
	step  string
	level string
	// fmt, when non-empty, is a template for each entry: values equal to
	// "%s" are replaced by the message and the result is JSON-encoded.
	fmt map[string]interface{}
	// tmp accumulates writes that did not end with a newline.
	tmp []byte
	// lastMessage is the previous message handled by this writer.
	lastMessage string
	// progressPending reports that lastMessage is a "Progress " line that
	// was suppressed and has not been sent yet. Only the "build-progress"
	// step uses it.
	progressPending bool
}

// SetFormat installs the per-entry format template (see the fmt field).
func (l *loggerWriter) SetFormat(f map[string]interface{}) {
	l.fmt = f
}

// Write implements io.Writer. It always reports len(b) bytes written and a
// nil error.
//
// Bytes are buffered until a write that ends in '\n' arrives. The buffered
// bytes plus that write are then sent as a single entry; that write may itself
// contain several lines. If a format template is set, it is applied before
// the progress check below, so that check sees the formatted message.
//
// For the "build-progress" step, consecutive lines starting with "Progress "
// are collapsed. The first line of a run is sent at once. The last suppressed
// line is flushed, with the writer's step and level, just before the next
// non-progress line.
//
// Previously the flush had its arguments swapped: it sent the new message
// with the stale progress text as its "step". It also re-sent a progress line
// that had already been delivered when the run had only one line.
//
// NOTE(review): tmp is unbounded. Output that never ends in a newline (e.g.
// '\r'-only progress bars) is held in memory until a newline arrives.
func (l *loggerWriter) Write(b []byte) (n int, err error) {
	if len(b) == 0 {
		return 0, nil
	}
	// Check the last byte directly instead of copying b into a string.
	if b[len(b)-1] != '\n' {
		l.tmp = append(l.tmp, b...)
		return len(b), nil
	}

	var message string
	if len(l.tmp) > 0 {
		message = string(append(l.tmp, b...))
		l.tmp = l.tmp[:0]
	} else {
		message = string(b)
	}
	if len(l.fmt) > 0 {
		message = l.format(message)
	}

	if l.step == "build-progress" {
		const progressPrefix = "Progress "
		isProgress := strings.HasPrefix(message, progressPrefix)
		if isProgress && strings.HasPrefix(l.lastMessage, progressPrefix) {
			// Collapse consecutive progress lines; remember only the latest.
			l.lastMessage = message
			l.progressPending = true
			return len(b), nil
		}
		if !isProgress && l.progressPending {
			// Flush the final suppressed progress line before this message.
			l.l.send(l.lastMessage, map[string]string{"step": l.step, "level": l.level})
		}
		l.progressPending = false
	}

	l.l.send(message, map[string]string{"step": l.step, "level": l.level})
	l.lastMessage = message
	return len(b), nil
}

// format applies the template in l.fmt to message and returns the result
// JSON-encoded. Template values equal to "%s" are replaced by message; other
// values are copied unchanged. An encoding error yields an empty message, as
// the original inline code did.
func (l *loggerWriter) format(message string) string {
	line := make(map[string]interface{}, len(l.fmt))
	for k, v := range l.fmt {
		if v == "%s" {
			line[k] = message
		} else {
			line[k] = v
		}
	}
	encoded, _ := ffjson.Marshal(line)
	return string(encoded)
}
// GetTestLogger returns a Logger that prints to stdout instead of sending to
// the event-log service. It is meant for tests.
func GetTestLogger() Logger {
	return new(testLogger)
}

// testLogger is a stdout-printing Logger with no channel.
type testLogger struct{}

func (l *testLogger) GetChan() chan []byte { return nil }

func (l *testLogger) SetChan(ch chan []byte) {}

func (l *testLogger) Event() string { return "test" }

func (l *testLogger) CreateTime() time.Time { return time.Now() }

// Info, Error and Debug print a level-prefixed message; info is ignored.
func (l *testLogger) Info(message string, info map[string]string) { fmt.Println("info:", message) }

func (l *testLogger) Error(message string, info map[string]string) { fmt.Println("error:", message) }

func (l *testLogger) Debug(message string, info map[string]string) { fmt.Println("debug:", message) }

// GetWriter returns a writer that copies everything to stdout.
func (l *testLogger) GetWriter(step, level string) LoggerWriter {
	return new(testLoggerWriter)
}

// testLoggerWriter is a LoggerWriter that writes straight to os.Stdout and
// ignores formatting.
type testLoggerWriter struct{}

func (l *testLoggerWriter) SetFormat(f map[string]interface{}) {}

func (l *testLoggerWriter) Write(b []byte) (n int, err error) {
	return os.Stdout.Write(b)
}