diff --git a/event/manager.go b/event/manager.go index 179b392ab..92782c948 100644 --- a/event/manager.go +++ b/event/manager.go @@ -20,6 +20,7 @@ package event import ( "fmt" + "github.com/goodrain/rainbond/pkg/gogo" "io" "os" "strings" @@ -63,13 +64,7 @@ type manager struct { var defaultManager Manager -const ( - //REQUESTTIMEOUT time out - REQUESTTIMEOUT = 1000 * time.Millisecond - //MAXRETRIES 重试 - MAXRETRIES = 3 // Before we abandon - buffersize = 1000 -) +const buffersize = 1000 // NewManager 创建manager func NewManager(conf EventConfig) error { @@ -107,64 +102,43 @@ func CloseManager() { func (m *manager) Start() error { m.lock.Lock() defer m.lock.Unlock() - for i := 0; i < len(m.eventServer); i++ { - h := handle{ - cacheChan: make(chan []byte, buffersize), - stop: make(chan struct{}), - server: m.eventServer[i], - manager: m, - ctx: m.ctx, - } - m.handles[m.eventServer[i]] = h - go h.HandleLog() + if len(m.eventServer) == 0 { + logrus.Errorf("event log server is empty , plase set it in config file.") + return nil } - //if m.dis != nil { - // m.dis.AddProject("event_log_event_grpc", m) - //} + defaultServer := m.eventServer[0] + + err := gogo.Go(func(ctx context.Context) error { + for { + h := handle{ + cacheChan: make(chan []byte, buffersize), + stop: make(chan struct{}), + server: defaultServer, + manager: m, + ctx: m.ctx, + } + m.handles[defaultServer] = h + err := h.HandleLog() + if err != nil { + time.Sleep(time.Second * 10) + logrus.Warnf("event log server %s connect error: %v. auto retry after 10 seconds ", defaultServer, err) + continue + } + return nil + } + }) + + if err != nil { + logrus.Errorf("event log server %s connect error, %v", defaultServer, err) + return err + } + go m.GC() return nil } -// UpdateEndpoints - +// UpdateEndpoints - 不需要去更新节点信息 func (m *manager) UpdateEndpoints(endpoints ...*config.Endpoint) { - m.lock.Lock() - defer m.lock.Unlock() - if endpoints == nil || len(endpoints) < 1 { - return - } - //清空不可用节点信息,以服务发现为主 - m.abnormalServer = make(map[string]string) - //增加新节点 - var new = make(map[string]string) - for _, end := range endpoints { - new[end.URL] = end.URL - if _, ok := m.handles[end.URL]; !ok { - h := handle{ - cacheChan: make(chan []byte, buffersize), - stop: make(chan struct{}), - server: end.URL, - manager: m, - ctx: m.ctx, - } - m.handles[end.URL] = h - logrus.Infof("Add event server endpoint,%s", end.URL) - go h.HandleLog() - } - } - //删除旧节点 - for k := range m.handles { - if _, ok := new[k]; !ok { - delete(m.handles, k) - logrus.Infof("Remove event server endpoint,%s", k) - } - } - var eventServer []string - for k := range new { - eventServer = append(eventServer, k) - } - m.eventServer = eventServer - m.config.EventLogServers = eventServer - logrus.Debugf("update event handle core success,handle core count:%d, event server count:%d", len(m.handles), len(m.eventServer)) } // Error -