fix: error conn event log (#1856)

Signed-off-by: 逆流而上 <1666888816@qq.com>
This commit is contained in:
逆流而上 2024-01-19 10:57:45 +08:00 committed by GitHub
parent e5722ab076
commit 3e3fa434f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -20,6 +20,7 @@ package event
import ( import (
"fmt" "fmt"
"github.com/goodrain/rainbond/pkg/gogo"
"io" "io"
"os" "os"
"strings" "strings"
@ -63,13 +64,7 @@ type manager struct {
var defaultManager Manager var defaultManager Manager
const ( const buffersize = 1000
//REQUESTTIMEOUT time out
REQUESTTIMEOUT = 1000 * time.Millisecond
//MAXRETRIES 重试
MAXRETRIES = 3 // Before we abandon
buffersize = 1000
)
// NewManager 创建manager // NewManager 创建manager
func NewManager(conf EventConfig) error { func NewManager(conf EventConfig) error {
@ -107,64 +102,43 @@ func CloseManager() {
func (m *manager) Start() error { func (m *manager) Start() error {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
for i := 0; i < len(m.eventServer); i++ { if len(m.eventServer) == 0 {
h := handle{ logrus.Errorf("event log server is empty , plase set it in config file.")
cacheChan: make(chan []byte, buffersize), return nil
stop: make(chan struct{}),
server: m.eventServer[i],
manager: m,
ctx: m.ctx,
}
m.handles[m.eventServer[i]] = h
go h.HandleLog()
} }
//if m.dis != nil { defaultServer := m.eventServer[0]
// m.dis.AddProject("event_log_event_grpc", m)
//} err := gogo.Go(func(ctx context.Context) error {
for {
h := handle{
cacheChan: make(chan []byte, buffersize),
stop: make(chan struct{}),
server: defaultServer,
manager: m,
ctx: m.ctx,
}
m.handles[defaultServer] = h
err := h.HandleLog()
if err != nil {
time.Sleep(time.Second * 10)
logrus.Warnf("event log server %s connect error: %v. auto retry after 10 seconds ", defaultServer, err)
continue
}
return nil
}
})
if err != nil {
logrus.Errorf("event log server %s connect error, %v", defaultServer, err)
return err
}
go m.GC() go m.GC()
return nil return nil
} }
// UpdateEndpoints - // UpdateEndpoints - 不需要去更新节点信息
func (m *manager) UpdateEndpoints(endpoints ...*config.Endpoint) { func (m *manager) UpdateEndpoints(endpoints ...*config.Endpoint) {
m.lock.Lock()
defer m.lock.Unlock()
if endpoints == nil || len(endpoints) < 1 {
return
}
//清空不可用节点信息,以服务发现为主
m.abnormalServer = make(map[string]string)
//增加新节点
var new = make(map[string]string)
for _, end := range endpoints {
new[end.URL] = end.URL
if _, ok := m.handles[end.URL]; !ok {
h := handle{
cacheChan: make(chan []byte, buffersize),
stop: make(chan struct{}),
server: end.URL,
manager: m,
ctx: m.ctx,
}
m.handles[end.URL] = h
logrus.Infof("Add event server endpoint,%s", end.URL)
go h.HandleLog()
}
}
//删除旧节点
for k := range m.handles {
if _, ok := new[k]; !ok {
delete(m.handles, k)
logrus.Infof("Remove event server endpoint,%s", k)
}
}
var eventServer []string
for k := range new {
eventServer = append(eventServer, k)
}
m.eventServer = eventServer
m.config.EventLogServers = eventServer
logrus.Debugf("update event handle core success,handle core count:%d, event server count:%d", len(m.handles), len(m.eventServer))
} }
// Error - // Error -