[REV] change event log save to file

This commit is contained in:
barnettZQG 2018-08-31 12:19:27 +08:00
parent 6740a85cc1
commit 77ff96fb5c
7 changed files with 289 additions and 63 deletions

View File

@ -26,8 +26,6 @@ import (
"io"
"io/ioutil"
"os"
"strings"
"time"
"github.com/goodrain/rainbond/util"
@ -36,22 +34,25 @@ import (
"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/client"
"github.com/pquerna/ffjson/ffjson"
"os/exec"
"github.com/goodrain/rainbond/db"
eventdb "github.com/goodrain/rainbond/eventlog/db"
)
//LogAction log action struct
type LogAction struct {
EtcdEndpoints []string
eventdb *eventdb.EventFilePlugin
}
//CreateLogManager get log manager
func CreateLogManager(etcdEndpoint []string) *LogAction {
return &LogAction{
EtcdEndpoints: etcdEndpoint,
eventdb: &eventdb.EventFilePlugin{
HomePath: "/grdata/logs/",
},
}
}
@ -131,49 +132,13 @@ func (l *LogAction) GetLogInstance(serviceID string) (string, error) {
//GetLevelLog 获取指定操作的操作日志
func (l *LogAction) GetLevelLog(eventID string, level string) (*api_model.DataLog, error) {
messages, err := db.GetManager().EventLogDao().GetEventLogMessages(eventID)
messageList, err := l.eventdb.GetMessages(eventID, level)
if err != nil {
return nil, err
}
var d []api_model.MessageData
timeLayout := "2006-01-02T15:04:05+08:00"
loc, _ := time.LoadLocation("Local")
for _, v := range messages {
log, err := decompress(v.Message)
if err != nil {
return nil, err
}
var mlogs []api_model.MsgStruct
if err := ffjson.Unmarshal(log, &mlogs); err != nil {
return nil, err
}
for _, msg := range mlogs {
if checkLevel(level, msg.Level) {
//兼容 {system worker worker已收到异步任务。 info 2017-09-29T10:02:44.703640}"
if strings.Contains(msg.Time, ".") {
//msg.Time = strings.Split(msg.Time, ".")[0]
timeLayout = "2006-01-02T15:04:05"
} else {
timeLayout = "2006-01-02T15:04:05+08:00"
}
utime, err := time.ParseInLocation(timeLayout, msg.Time, loc)
if err != nil {
return nil, err
}
md := api_model.MessageData{
Message: msg.Message,
Time: msg.Time,
Unixtime: utime.Unix(),
}
d = append(d, md)
}
}
}
//耗时
d = bubSort(d)
return &api_model.DataLog{
Status: "success",
Data: d,
Data: messageList,
}, nil
}

View File

@ -18,6 +18,10 @@
package model
import (
eventdb "github.com/goodrain/rainbond/eventlog/db"
)
//LogData log data
type LogData struct {
num int
@ -34,7 +38,7 @@ type MessageData struct {
//DataLog 获取指定操作的操作日志
type DataLog struct {
Status string
Data []MessageData
Data eventdb.MessageDataList
}
//LogByLevelStruct GetLogByLevelStruct

View File

@ -0,0 +1,172 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package db
import (
"bufio"
"fmt"
"io"
"os"
"path"
"sort"
"strconv"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/util"
)
type EventFilePlugin struct {
HomePath string
}
func (m *EventFilePlugin) SaveMessage(events []*EventLogMessage) error {
if len(events) == 0 {
return nil
}
key := events[0].EventID
dirpath := path.Join(m.HomePath, "eventlog")
if err := util.CheckAndCreateDir(dirpath); err != nil {
return err
}
apath := path.Join(m.HomePath, "eventlog", key+".log")
writeFile, err := os.OpenFile(apath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0755)
if err != nil {
return err
}
defer writeFile.Close()
var lastTime int64
for _, e := range events {
writeFile.Write(GetLevelFlag(e.Level))
logtime := GetTimeUnix(e.Time)
if logtime != 0 {
lastTime = logtime
}
writeFile.Write([]byte(fmt.Sprintf("%d ", lastTime)))
writeFile.Write([]byte(e.Message))
writeFile.Write([]byte("\n"))
}
return err
}
//MessageData message data 获取指定操作的操作日志
type MessageData struct {
Message string `json:"message"`
Time string `json:"time"`
Unixtime int64 `json:"utime"`
}
//MessageDataList MessageDataList
type MessageDataList []MessageData
func (a MessageDataList) Len() int { return len(a) }
func (a MessageDataList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a MessageDataList) Less(i, j int) bool { return a[i].Unixtime < a[j].Unixtime }
func (m *EventFilePlugin) GetMessages(eventID, level string) (MessageDataList, error) {
apath := path.Join(m.HomePath, "eventlog", eventID+".log")
eventFile, err := os.Open(apath)
if err != nil {
return nil, err
}
defer eventFile.Close()
reader := bufio.NewReader(eventFile)
var message MessageDataList
for {
line, _, err := reader.ReadLine()
if err != nil {
if err != io.EOF {
logrus.Error("read event log file error:", err.Error())
}
break
}
if len(line) > 2 {
flag := line[0]
if CheckLevel(string(flag), level) {
info := strings.SplitN(string(line), " ", 3)
fmt.Println(info)
if len(info) == 3 {
timeunix := info[1]
unix, _ := strconv.ParseInt(timeunix, 10, 64)
tm := time.Unix(unix, 0)
md := MessageData{
Message: info[2],
Unixtime: unix,
Time: tm.Format(time.RFC3339),
}
message = append(message, md)
}
}
}
}
sort.Sort(sort.Reverse(message))
return message, nil
}
//CheckLevel check log level
func CheckLevel(flag, level string) bool {
switch flag {
case "0":
return true
case "1":
if level != "error" {
return true
}
case "2":
if level == "debug" {
return true
}
}
return false
}
//GetTimeUnix get specified time unix
func GetTimeUnix(timeStr string) int64 {
var timeLayout string
if strings.Contains(timeStr, ".") {
timeLayout = "2006-01-02T15:04:05"
} else {
timeLayout = "2006-01-02T15:04:05+08:00"
}
loc, _ := time.LoadLocation("Local")
utime, err := time.ParseInLocation(timeLayout, timeStr, loc)
if err != nil {
logrus.Errorf("Parse log time error %s", err.Error())
return 0
}
return utime.Unix()
}
//GetLevelFlag get log level flag
func GetLevelFlag(level string) []byte {
switch level {
case "error":
return []byte("0 ")
case "info":
return []byte("1 ")
case "debug":
return []byte("2 ")
default:
return []byte("0 ")
}
}
func (m *EventFilePlugin) Close() error {
return nil
}

View File

@ -0,0 +1,75 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package db
import (
"testing"
"time"
)
func TestEventFileSaveMessage(t *testing.T) {
eventFilePlugin := EventFilePlugin{
homePath: "/tmp",
}
if err := eventFilePlugin.SaveMessage([]*EventLogMessage{
&EventLogMessage{
EventID: "eventidsadasd",
Level: "info",
Message: "1ajdsnadskfasndjn afnasdfnln asdfjnajksndfjk",
Time: time.Now().Format(time.RFC3339),
},
&EventLogMessage{
EventID: "eventidsadasd",
Level: "debug",
Message: "2ajdsnadskfasndjn afnasdfnln asdfjnajksndfjk",
Time: time.Now().Format(time.RFC3339),
},
&EventLogMessage{
EventID: "eventidsadasd",
Level: "error",
Message: "3ajdsnadskfasndjn afnasdfnln asdfjnajksndfjk",
Time: time.Now().Format(time.RFC3339),
},
&EventLogMessage{
EventID: "eventidsadasd",
Level: "debug",
Message: "4ajdsnadskfasndjn afnasdfnln asdfjnajksndfjk",
Time: time.Now().Add(time.Hour).Format(time.RFC3339),
},
&EventLogMessage{
EventID: "eventidsadasd",
Level: "info",
Message: "5ajdsnadskfasndjn afnasdfnln asdfjnajksndfjk",
Time: time.Now().Format(time.RFC3339),
},
}); err != nil {
t.Fatal(err)
}
}
func TestGetMessage(t *testing.T) {
eventFilePlugin := eventFilePlugin{
homePath: "/tmp",
}
list, err := eventFilePlugin.GetMessages("eventidsadasd", "debug")
if err != nil {
t.Fatal(err)
}
t.Log(list)
}

View File

@ -43,6 +43,10 @@ func NewManager(conf conf.DBConf, log *logrus.Entry) (Manager, error) {
return nil, err
}
return cdb, nil
case "eventfile":
return &EventFilePlugin{
HomePath: conf.HomePath,
}, nil
default:
my := &mysqlPlugin{
conf: conf,

View File

@ -140,12 +140,12 @@ func (r *readEventBarrel) insertMessage(message *db.EventLogMessage) {
func (r *readEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
for _, m := range r.barrel {
select {
case ch <- m:
default:
}
}
// for _, m := range r.barrel {
// select {
// case ch <- m:
// default:
// }
// }
r.subSocketChan[subID] = ch
}
@ -221,12 +221,13 @@ func (r *dockerLogEventBarrel) insertMessage(message *db.EventLogMessage) {
func (r *dockerLogEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
for _, m := range r.barrel {
select {
case ch <- m:
default:
}
}
// send cache log will cause user illusion
// for _, m := range r.barrel {
// select {
// case ch <- m:
// default:
// }
// }
r.subSocketChan[subID] = ch
}
@ -309,12 +310,12 @@ func (r *monitorMessageBarrel) insertMessage(message *db.EventLogMessage) {
func (r *monitorMessageBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
for _, m := range r.barrel {
select {
case ch <- m:
default:
}
}
// for _, m := range r.barrel {
// select {
// case ch <- m:
// default:
// }
// }
r.subSocketChan[subID] = ch
}

View File

@ -68,8 +68,12 @@ type Manager interface {
//NewManager 存储管理器
func NewManager(conf conf.EventStoreConf, log *logrus.Entry) (Manager, error) {
ctx, cancel := context.WithCancel(context.Background())
// event log do not save in db,will save in file
// dbPlugin, err := db.NewManager(conf.DB, log)
// if err != nil {
// return nil, err
// }
conf.DB.Type = "eventfile"
dbPlugin, err := db.NewManager(conf.DB, log)
if err != nil {
return nil, err
@ -79,6 +83,7 @@ func NewManager(conf conf.EventStoreConf, log *logrus.Entry) (Manager, error) {
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
storeManager := &storeManager{
cancel: cancel,
context: ctx,