// Copyright (C) 2014-2018 Goodrain Co., Ltd. // RAINBOND, Application Management Platform // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. For any non-GPL usage of Rainbond, // one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. // must be obtained first. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with this program. If not, see . package store import ( "errors" "sync" "time" "github.com/goodrain/rainbond/eventlog/db" "github.com/Sirupsen/logrus" ) //EventBarrel 事件桶 //不能在此结构上起协程 type EventBarrel struct { eventID string barrel []*db.EventLogMessage persistenceBarrel []*db.EventLogMessage needPersistence bool persistencelock sync.Mutex barrelEvent chan []string maxNumber int64 cacheNumber int size int64 isCallback bool updateTime time.Time } //insert 插入消息 func (e *EventBarrel) insert(m *db.EventLogMessage) error { if e.size > e.maxNumber { return errors.New("received message number more than peer event max message number") } if m.Step == "progress" { //进度日志不存储 return nil } e.barrel = append(e.barrel, m) e.size++ //消息数据总数 e.analysis(m) //同步分析 if len(e.barrel) >= e.cacheNumber { e.persistence() } e.updateTime = time.Now() return nil } //值归零,放回对象池 func (e *EventBarrel) empty() { e.size = 0 e.eventID = "" e.barrel = e.barrel[:0] e.persistenceBarrel = e.persistenceBarrel[:0] e.needPersistence = false e.isCallback = false } //analysis 实时分析 func (e *EventBarrel) analysis(newMessage *db.EventLogMessage) { if newMessage.Step == "last" || newMessage.Step == "callback" { e.persistence() e.barrelEvent <- []string{"callback", e.eventID, newMessage.Status, newMessage.Message} } if newMessage.Step == "code-version" { e.barrelEvent <- []string{"code-version", e.eventID, newMessage.Message} } } //persistence 持久化 func (e *EventBarrel) persistence() { e.persistencelock.Lock() defer e.persistencelock.Unlock() e.needPersistence = true e.persistenceBarrel = append(e.persistenceBarrel, e.barrel...) //数据转到持久化等候队列 e.barrel = e.barrel[:0] //缓存队列清空 select { case e.barrelEvent <- []string{"persistence", e.eventID}: //发出持久化命令 default: logrus.Debug("event message log persistence delay") } } //调用者加锁 func (e *EventBarrel) gcPersistence() { e.needPersistence = true e.persistenceBarrel = append(e.persistenceBarrel, e.barrel...) //数据转到持久化等候队列 e.barrel = nil } type readEventBarrel struct { barrel []*db.EventLogMessage subSocketChan map[string]chan *db.EventLogMessage subLock sync.Mutex updateTime time.Time } func (r *readEventBarrel) empty() { r.subLock.Lock() defer r.subLock.Unlock() if r.barrel != nil { r.barrel = r.barrel[:0] } //关闭订阅chan for _, ch := range r.subSocketChan { close(ch) } r.subSocketChan = make(map[string]chan *db.EventLogMessage) } func (r *readEventBarrel) insertMessage(message *db.EventLogMessage) { r.barrel = append(r.barrel, message) r.updateTime = time.Now() r.subLock.Lock() defer r.subLock.Unlock() for _, v := range r.subSocketChan { //向订阅的通道发送消息 select { case v <- message: default: } } } func (r *readEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) { r.subLock.Lock() defer r.subLock.Unlock() r.subSocketChan[subID] = ch } //增加socket订阅 func (r *readEventBarrel) addSubChan(subID string) chan *db.EventLogMessage { r.subLock.Lock() defer r.subLock.Unlock() if sub, ok := r.subSocketChan[subID]; ok { return sub } ch := make(chan *db.EventLogMessage, 10) go r.pushCashMessage(ch, subID) return ch } //删除socket订阅 func (r *readEventBarrel) delSubChan(subID string) { r.subLock.Lock() defer r.subLock.Unlock() if ch, ok := r.subSocketChan[subID]; ok { close(ch) delete(r.subSocketChan, subID) } } type dockerLogEventBarrel struct { name string barrel []*db.EventLogMessage subSocketChan map[string]chan *db.EventLogMessage subLock sync.Mutex updateTime time.Time size int cacheSize int64 persistencelock sync.Mutex persistenceTime time.Time needPersistence bool persistenceBarrel []*db.EventLogMessage barrelEvent chan []string } func (r *dockerLogEventBarrel) empty() { r.subLock.Lock() defer r.subLock.Unlock() if r.barrel != nil { r.barrel = r.barrel[:0] } for _, ch := range r.subSocketChan { close(ch) } r.subSocketChan = make(map[string]chan *db.EventLogMessage) r.size = 0 r.name = "" r.persistenceBarrel = r.persistenceBarrel[:0] r.needPersistence = false } func (r *dockerLogEventBarrel) insertMessage(message *db.EventLogMessage) { r.subLock.Lock() defer r.subLock.Unlock() r.barrel = append(r.barrel, message) if r.name == "" { r.name = message.EventID } r.updateTime = time.Now() for _, v := range r.subSocketChan { //向订阅的通道发送消息 select { case v <- message: default: } } r.size++ if int64(len(r.barrel)) >= r.cacheSize { r.persistence() } } func (r *dockerLogEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) { r.subLock.Lock() defer r.subLock.Unlock() r.subSocketChan[subID] = ch } //增加socket订阅 func (r *dockerLogEventBarrel) addSubChan(subID string) chan *db.EventLogMessage { r.subLock.Lock() defer r.subLock.Unlock() if sub, ok := r.subSocketChan[subID]; ok { return sub } ch := make(chan *db.EventLogMessage, 100) go r.pushCashMessage(ch, subID) return ch } //删除socket订阅 func (r *dockerLogEventBarrel) delSubChan(subID string) { r.subLock.Lock() defer r.subLock.Unlock() if ch, ok := r.subSocketChan[subID]; ok { close(ch) delete(r.subSocketChan, subID) } } //persistence 持久化 func (r *dockerLogEventBarrel) persistence() { r.persistencelock.Lock() defer r.persistencelock.Unlock() r.needPersistence = true r.persistenceBarrel = append(r.persistenceBarrel, r.barrel...) //数据转到持久化等候队列 r.barrel = r.barrel[:0] //缓存队列清空 select { case r.barrelEvent <- []string{"persistence", r.name}: //发出持久化命令 r.persistenceTime = time.Now() default: logrus.Errorln("docker log persistence delay") } } //调用者加锁 func (r *dockerLogEventBarrel) gcPersistence() { r.needPersistence = true r.persistenceBarrel = append(r.persistenceBarrel, r.barrel...) //数据转到持久化等候队列 r.barrel = nil } func (r *dockerLogEventBarrel) GetSubChanLength() int { r.subLock.Lock() defer r.subLock.Unlock() return len(r.subSocketChan) } type monitorMessageBarrel struct { barrel []*db.EventLogMessage subSocketChan map[string]chan *db.EventLogMessage subLock sync.Mutex updateTime time.Time } func (r *monitorMessageBarrel) empty() { r.subLock.Lock() defer r.subLock.Unlock() if r.barrel != nil { r.barrel = r.barrel[:0] } for _, ch := range r.subSocketChan { close(ch) } r.subSocketChan = make(map[string]chan *db.EventLogMessage) } func (r *monitorMessageBarrel) insertMessage(message *db.EventLogMessage) { r.updateTime = time.Now() r.subLock.Lock() defer r.subLock.Unlock() for _, v := range r.subSocketChan { //向订阅的通道发送消息 select { case v <- message: default: } } } func (r *monitorMessageBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) { r.subLock.Lock() defer r.subLock.Unlock() r.subSocketChan[subID] = ch } //增加socket订阅 func (r *monitorMessageBarrel) addSubChan(subID string) chan *db.EventLogMessage { r.subLock.Lock() defer r.subLock.Unlock() if sub, ok := r.subSocketChan[subID]; ok { return sub } ch := make(chan *db.EventLogMessage, 10) go r.pushCashMessage(ch, subID) return ch } //删除socket订阅 func (r *monitorMessageBarrel) delSubChan(subID string) { r.subLock.Lock() defer r.subLock.Unlock() if ch, ok := r.subSocketChan[subID]; ok { close(ch) delete(r.subSocketChan, subID) } }