mirror of
https://gitee.com/rainbond/Rainbond.git
synced 2024-12-01 03:07:51 +08:00
[FIX] do not delete unhealthy endpoints
This commit is contained in:
parent
dfda2b8e03
commit
7bc40f65bd
@ -46,6 +46,7 @@ type ClusterClient interface {
|
||||
GetOptions() *option.Conf
|
||||
GetEndpoints(key string) []string
|
||||
SetEndpoints(key string, value []string)
|
||||
DelEndpoints(key string)
|
||||
}
|
||||
|
||||
//NewClusterClient new cluster client
|
||||
@ -124,7 +125,6 @@ func (e *etcdClusterClient) GetEndpoints(key string) (result []string) {
|
||||
|
||||
func (e *etcdClusterClient) SetEndpoints(key string, value []string) {
|
||||
key = "/rainbond/endpoint/" + key
|
||||
logrus.Infof("Put endpoints %s => %v", key, value)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@ -141,6 +141,19 @@ func (e *etcdClusterClient) SetEndpoints(key string, value []string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *etcdClusterClient) DelEndpoints(key string) {
|
||||
key = "/rainbond/endpoint/" + key
|
||||
logrus.Infof("Delete endpoints: %s", key)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
_, err := e.conf.EtcdCli.Delete(ctx, key)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to put endpoint for %s: %v", key, Error)
|
||||
}
|
||||
}
|
||||
|
||||
//ErrorNotFound node not found.
|
||||
var ErrorNotFound = fmt.Errorf("node not found")
|
||||
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
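// ArgsReg matches placeholders of the form ${NAME} or ${NAME|c}: group 1 is the word-character name, group 2 is at most one character before the closing brace.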
|
||||
ArgsReg = regexp.MustCompile(`\$\{(\w+)\|{0,1}(.{0,1})\}`)
|
||||
)
|
||||
|
||||
@ -103,6 +104,7 @@ func (m *ManagerService) Stop() error {
|
||||
func (m *ManagerService) Online() error {
|
||||
logrus.Info("Doing node online by node controller manager")
|
||||
if ok := m.ctr.CheckBeforeStart(); !ok {
|
||||
logrus.Debug("check before starting: false")
|
||||
return nil
|
||||
}
|
||||
go m.StartServices()
|
||||
@ -158,16 +160,22 @@ func (m *ManagerService) Offline() error {
|
||||
|
||||
//DownOneServiceEndpoint down service endpoint
|
||||
func (m *ManagerService) DownOneServiceEndpoint(s *service.Service) {
|
||||
HostIP := m.cluster.GetOptions().HostIP
|
||||
hostIP := m.cluster.GetOptions().HostIP
|
||||
for _, end := range s.Endpoints {
|
||||
logrus.Debug("Anti-registry endpoint: ", end.Name)
|
||||
endpoint := toEndpoint(end, HostIP)
|
||||
oldEndpoints := m.cluster.GetEndpoints(end.Name)
|
||||
key := end.Name + "/" + hostIP
|
||||
endpoint := toEndpoint(end, hostIP)
|
||||
oldEndpoints := m.cluster.GetEndpoints(key)
|
||||
if exist := isExistEndpoint(oldEndpoints, endpoint); exist {
|
||||
m.cluster.SetEndpoints(end.Name, rmEndpointFrom(oldEndpoints, endpoint))
|
||||
endpoints := rmEndpointFrom(oldEndpoints, endpoint)
|
||||
if len(endpoints) > 0 {
|
||||
m.cluster.SetEndpoints(key, endpoints)
|
||||
continue
|
||||
}
|
||||
m.cluster.DelEndpoints(key)
|
||||
}
|
||||
}
|
||||
logrus.Infof("node %s down service %s endpoints", HostIP, s.Name)
|
||||
logrus.Infof("node %s down service %s endpoints", hostIP, s.Name)
|
||||
}
|
||||
|
||||
//UpOneServiceEndpoint up service endpoint
|
||||
@ -186,11 +194,11 @@ func (m *ManagerService) UpOneServiceEndpoint(s *service.Service) {
|
||||
endpoint := toEndpoint(end, hostIP)
|
||||
m.cluster.SetEndpoints(key, []string{endpoint})
|
||||
}
|
||||
logrus.Infof("node %s up service %s endpoints", hostIP, s.Name)
|
||||
}
|
||||
|
||||
//SyncServiceStatusController synchronize all service status to as we expect
|
||||
func (m *ManagerService) SyncServiceStatusController() {
|
||||
logrus.Debug("run SyncServiceStatusController")
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
if m.autoStatusController != nil && len(m.autoStatusController) > 0 {
|
||||
@ -263,6 +271,7 @@ type statusController struct {
|
||||
}
|
||||
|
||||
func (s *statusController) Run() {
|
||||
logrus.Info("run status controller")
|
||||
s.healthyManager.EnableWatcher(s.service.Name, s.watcher.GetID())
|
||||
defer s.watcher.Close()
|
||||
defer s.healthyManager.DisableWatcher(s.service.Name, s.watcher.GetID())
|
||||
@ -295,6 +304,7 @@ func (s *statusController) Stop() {
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
// StopSyncService calls syncCancel when a sync context has been set.
|
||||
func (m *ManagerService) StopSyncService() {
|
||||
if m.syncCtx != nil {
|
||||
m.syncCancel()
|
||||
@ -323,6 +333,7 @@ func (m *ManagerService) WaitStart(name string, duration time.Duration) bool {
|
||||
}
|
||||
}
|
||||
|
||||
// ReLoadServices -
|
||||
/*
|
||||
1. reload services info from local file system
|
||||
2. regenerate systemd config file and restart with config changes
|
||||
|
@ -236,7 +236,9 @@ func (n *NodeManager) heartbeat() {
|
||||
if err := n.cluster.UpdateStatus(n.currentNode, n.getInitLable(n.currentNode)); err != nil {
|
||||
logrus.Errorf("update node status error %s", err.Error())
|
||||
}
|
||||
logrus.Infof("Send node %s heartbeat to master:%s ", n.currentNode.ID, n.currentNode.NodeStatus.Status)
|
||||
if n.currentNode.NodeStatus.Status != "running" {
|
||||
logrus.Infof("Send node %s heartbeat to master:%s ", n.currentNode.ID, n.currentNode.NodeStatus.Status)
|
||||
}
|
||||
return nil
|
||||
}, time.Second*time.Duration(n.cfg.TTL))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user