Merge branch 'V3.7' of https://github.com/goodrain/rainbond into V3.7

This commit is contained in:
barnettZQG 2018-07-20 13:32:49 +08:00
commit 203a063228
13 changed files with 500 additions and 357 deletions

View File

@ -23,15 +23,13 @@ import (
)
type Controller interface {
WriteAllConfig() error
RemoveAllConfig() error
EnableAll() error
DisableAll() error
StartService(name string) error
StopService(name string) error
StartList(list []*service.Service) error
StopList(list []*service.Service) error
WriteConfig(s *service.Service) error
RemoveConfig(name string) error
EnableService(name string) error
DisableService(name string) error
CheckBeforeStart() bool
StartAll() error
StartByName(serviceName string) error
StopAll() error
StopByName(serviceName string) error
ReLoadServices() error
GetAllService() []*service.Service
}

View File

@ -25,7 +25,6 @@ import (
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/node/nodem/service"
"gopkg.in/yaml.v2"
"io/ioutil"
"os"
"os/exec"
@ -36,12 +35,9 @@ type ControllerSystemd struct {
SysConfigDir string
NodeType string
StartType string
// all services
services []*service.Service
conf *option.Conf
cluster client.ClusterClient
regBlock *regexp.Regexp
conf *option.Conf
cluster client.ClusterClient
regBlock *regexp.Regexp
}
// At the stage you want to load the configurations of all rainbond components
@ -53,73 +49,31 @@ func NewControllerSystemd(conf *option.Conf, cluster client.ClusterClient) *Cont
}
}
// for all rainbond components generate config file of systemd
func (m *ControllerSystemd) WriteAllConfig() error {
logrus.Info("Write all service config to systemd.")
for _, v := range m.services {
fileName := fmt.Sprintf("%s/%s.service", m.SysConfigDir, v.Name)
content := service.ToConfig(v, m.cluster)
if content == nil {
logrus.Error("can not generate config for service ", v.Name)
continue
}
if err := ioutil.WriteFile(fileName, content, 0644); err != nil {
logrus.Warnf("Generate config file %s: %v, has been ignored.", fileName, err)
}
}
err := exec.Command("/usr/bin/systemctl", "daemon-reload").Run()
if err != nil {
logrus.Errorf("reload all services %s: %v", err)
}
return nil
}
func (m *ControllerSystemd) RemoveAllConfig() error {
logrus.Info("Remote all service config to systemd.")
for _, v := range m.services {
fileName := fmt.Sprintf("%s/%s.service", m.SysConfigDir, v.Name)
_, err := os.Stat(fileName)
if err == nil {
os.Remove(fileName)
}
}
return nil
}
func (m *ControllerSystemd) EnableAll() error {
logrus.Info("Enable all services.")
for _, s := range m.services {
err := exec.Command("/usr/bin/systemctl", "enable", s.Name).Run()
if err != nil {
logrus.Errorf("Enable service %s: %v", s.Name, err)
}
}
return nil
}
func (m *ControllerSystemd) DisableAll() error {
logrus.Info("Disable all service config to systemd.")
for _, s := range m.services {
err := exec.Command("/usr/bin/systemctl", "disable", s.Name).Run()
if err != nil {
logrus.Errorf("Disable service %s: %v", s.Name, err)
}
}
return nil
}
func (m *ControllerSystemd) CheckBeforeStart() bool {
logrus.Info("Checking environments.")
return true
}
func (m *ControllerSystemd) StartAll() error {
func (m *ControllerSystemd) StartService(serviceName string) error {
err := exec.Command("/usr/bin/systemctl", "start", serviceName).Run()
if err != nil {
logrus.Errorf("Start service %s: %v", serviceName, err)
return err
}
return nil
}
func (m *ControllerSystemd) StopService(serviceName string) error {
err := exec.Command("/usr/bin/systemctl", "stop", serviceName).Run()
if err != nil {
logrus.Errorf("Stop service %s: %v", serviceName, err)
return err
}
return nil
}
func (m *ControllerSystemd) StartList(list []*service.Service) error {
logrus.Info("Starting all services.")
err := exec.Command("/usr/bin/systemctl", "start", "multi-user.target").Run()
@ -131,18 +85,9 @@ func (m *ControllerSystemd) StartAll() error {
return nil
}
func (m *ControllerSystemd) StartByName(serviceName string) error {
err := exec.Command("/usr/bin/systemctl", "start", serviceName).Run()
if err != nil {
logrus.Errorf("Start service %s: %v", serviceName, err)
return err
}
return nil
}
func (m *ControllerSystemd) StopAll() error {
func (m *ControllerSystemd) StopList(list []*service.Service) error {
logrus.Info("Stop all services.")
for _, s := range m.services {
for _, s := range list {
err := exec.Command("/usr/bin/systemctl", "stop", s.Name).Run()
if err != nil {
logrus.Errorf("Enable service %s: %v", s.Name, err)
@ -152,104 +97,50 @@ func (m *ControllerSystemd) StopAll() error {
return nil
}
func (m *ControllerSystemd) StopByName(serviceName string) error {
err := exec.Command("/usr/bin/systemctl", "stop", serviceName).Run()
func (m *ControllerSystemd) EnableService(name string) error {
logrus.Info("Enable service config by systemd.")
err := exec.Command("/usr/bin/systemctl", "enable", name).Run()
if err != nil {
logrus.Errorf("Stop service %s: %v", serviceName, err)
return err
logrus.Errorf("Enable service %s: %v", name, err)
}
return nil
}
func LoadServices(defaultConfigFile, serviceListFile string) ([]*service.Service, error) {
logrus.Info("Loading all services.")
services, err := loadServicesFromLocal(defaultConfigFile, serviceListFile)
if err != nil {
return nil, err
}
return services, nil
}
func (m *ControllerSystemd) GetAllService() []*service.Service {
return m.services
}
/*
1. reload services config from local file system
2. regenerate systemd config
3. start all services of status is not running
*/
func (m *ControllerSystemd) ReLoadServices() error {
services, err := LoadServices(m.conf.DefaultConfigFile, m.conf.ServiceListFile)
if err != nil {
logrus.Error("Failed to load all services: ", err)
return err
}
m.services = services
if err := m.WriteAllConfig(); err != nil {
return err
}
m.DisableAll()
if err := m.EnableAll(); err != nil {
return err
}
if ok := m.CheckBeforeStart(); !ok {
return fmt.Errorf("check environments is not passed")
}
m.StartAll()
return nil
}
func loadServicesFromLocal(defaultConfigFile, serviceListFile string) ([]*service.Service, error) {
logrus.Info("Loading all services from local.")
// load default-configs.yaml
content, err := ioutil.ReadFile(defaultConfigFile)
func (m *ControllerSystemd) DisableService(name string) error {
logrus.Info("Disable service config by systemd.")
err := exec.Command("/usr/bin/systemctl", "disable", name).Run()
if err != nil {
logrus.Error("Failed to read default configs file: ", err)
return nil, err
}
var defaultConfigs service.Services
err = yaml.Unmarshal(content, &defaultConfigs)
if err != nil {
logrus.Error("Failed to parse default configs yaml file: ", err)
return nil, err
}
// to map, reduce time complexity
defaultConfigsMap := make(map[string]*service.Service, len(defaultConfigs.Services))
for _, v := range defaultConfigs.Services {
defaultConfigsMap[v.Name] = v
logrus.Errorf("Disable service %s: %v", name, err)
}
// load type-service.yaml, e.g. manager-service.yaml
content, err = ioutil.ReadFile(serviceListFile)
if err != nil {
logrus.Error("Failed to read service list file: ", err)
return nil, err
}
var serviceList service.ServiceList
err = yaml.Unmarshal(content, &serviceList)
if err != nil {
logrus.Error("Failed to parse service list yaml file: ", err)
return nil, err
}
// parse services with the node type
services := make([]*service.Service, 0, len(defaultConfigs.Services))
for _, item := range serviceList.Services {
if s, ok := defaultConfigsMap[item.Name]; ok {
services = append(services, s)
} else {
logrus.Warn("Not found the service %s in default config list, ignore it.", item.Name)
}
}
return services, nil
return nil
}
func (m *ControllerSystemd) WriteConfig(s *service.Service) error {
fileName := fmt.Sprintf("%s/%s.service", m.SysConfigDir, s.Name)
content := service.ToConfig(s, m.cluster)
if content == nil {
err := fmt.Errorf("can not generate config for service %s", s.Name)
logrus.Error(err)
return err
}
if err := ioutil.WriteFile(fileName, content, 0644); err != nil {
logrus.Errorf("Generate config file %s: %v", fileName, err)
return err
}
return nil
}
func (m *ControllerSystemd) RemoveConfig(name string) error {
logrus.Info("Remote service config by systemd.")
fileName := fmt.Sprintf("%s/%s.service", m.SysConfigDir, name)
_, err := os.Stat(fileName)
if err == nil {
os.Remove(fileName)
}
return nil
}

View File

@ -23,8 +23,9 @@ import "github.com/goodrain/rainbond/node/nodem/service"
//Manager Manager
type Manager interface {
Start() error
GetAllService() ([]*service.Service, error)
Stop() error
GetAllService() ([]*service.Service, error)
Online() error
Offline() error
ReLoadServices() error
}

View File

@ -19,30 +19,51 @@
package controller
import (
"context"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/node/nodem/healthy"
"github.com/goodrain/rainbond/node/nodem/service"
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
)
type ManagerService struct {
controller Controller
cluster client.ClusterClient
ctx context.Context
cancel context.CancelFunc
conf *option.Conf
ctr Controller
cluster client.ClusterClient
healthyManager healthy.Manager
services []*service.Service
}
func (m *ManagerService) GetAllService() ([]*service.Service, error) {
return m.controller.GetAllService(), nil
return m.services, nil
}
// start manager
// start and monitor all service
func (m *ManagerService) Start() error {
logrus.Info("Starting node controller manager.")
return m.Online()
err := m.Online()
if err != nil {
return err
}
for _, s := range m.services {
serviceName := s.Name
go m.SyncService(serviceName)
}
return nil
}
// stop manager
func (m *ManagerService) Stop() error {
m.cancel()
return nil
}
@ -62,7 +83,7 @@ func (m *ManagerService) Online() error {
}
}
if err := m.controller.ReLoadServices(); err != nil {
if err := m.ReLoadServices(); err != nil {
return err
}
@ -84,13 +105,135 @@ func (m *ManagerService) Offline() error {
}
}
if err := m.controller.StopAll(); err != nil {
if err := m.ctr.StopList(m.services); err != nil {
return err
}
return nil
}
// synchronize all service status to as we expect
func (m *ManagerService) SyncService(name string) {
logrus.Error("Start watch the service status ", name)
w := m.healthyManager.WatchServiceHealthy(name)
if w == nil {
logrus.Error("Not found watcher of the service ", name)
return
}
unhealthyNum := 0
maxUnhealthyNum := 2
for {
select {
case event := <-w.Watch():
switch event.Status {
case service.Stat_healthy:
logrus.Debugf("The %s service is %s.", event.Name, event.Status)
case service.Stat_unhealthy:
if unhealthyNum > maxUnhealthyNum {
logrus.Infof("The %s service is %s and will be restart.", event.Name, event.Status)
m.ctr.StopService(event.Name)
m.ctr.StartService(event.Name)
unhealthyNum = 0
}
unhealthyNum++
case service.Stat_death:
logrus.Infof("The %s service is %s and will be restart.", event.Name, event.Status)
m.ctr.StartService(event.Name)
}
case <-m.ctx.Done():
return
}
}
}
/*
1. reload services config from local file system
2. regenerate systemd config for all service
3. start all services of status is not running
*/
func (m *ManagerService) ReLoadServices() error {
services, err := loadServicesFromLocal(m.conf.DefaultConfigFile, m.conf.ServiceListFile)
if err != nil {
logrus.Error("Failed to load all services: ", err)
return err
}
m.services = services
for _, s := range m.services {
err := m.ctr.WriteConfig(s)
if err != nil {
return err
}
}
for _, s := range m.services {
m.ctr.DisableService(s.Name)
err := m.ctr.EnableService(s.Name)
if err != nil {
return err
}
}
if ok := m.ctr.CheckBeforeStart(); !ok {
return fmt.Errorf("check environments is not passed")
}
m.ctr.StartList(m.services)
return nil
}
func loadServicesFromLocal(defaultConfigFile, serviceListFile string) ([]*service.Service, error) {
logrus.Info("Loading all services from local.")
// load default-configs.yaml
content, err := ioutil.ReadFile(defaultConfigFile)
if err != nil {
logrus.Error("Failed to read default configs file: ", err)
return nil, err
}
var defaultConfigs service.Services
err = yaml.Unmarshal(content, &defaultConfigs)
if err != nil {
logrus.Error("Failed to parse default configs yaml file: ", err)
return nil, err
}
// to map, reduce time complexity
defaultConfigsMap := make(map[string]*service.Service, len(defaultConfigs.Services))
for _, v := range defaultConfigs.Services {
defaultConfigsMap[v.Name] = v
}
// load type-service.yaml, e.g. manager-service.yaml
content, err = ioutil.ReadFile(serviceListFile)
if err != nil {
logrus.Error("Failed to read service list file: ", err)
return nil, err
}
var serviceList service.ServiceList
err = yaml.Unmarshal(content, &serviceList)
if err != nil {
logrus.Error("Failed to parse service list yaml file: ", err)
return nil, err
}
// parse services with the node type
services := make([]*service.Service, 0, len(defaultConfigs.Services))
for _, item := range serviceList.Services {
if s, ok := defaultConfigsMap[item.Name]; ok {
services = append(services, s)
logrus.Info("Load service ", s.Name)
} else {
logrus.Warn("Not found the service %s in default config list, ignore it.", item.Name)
}
}
return services, nil
}
func isExistEndpoint(etcdEndPoints []string, end string) bool {
for _, v := range etcdEndPoints {
if v == end {
@ -117,9 +260,14 @@ func toEndpoint(reg *service.Endpoint, ip string) string {
return fmt.Sprintf("%s://%s:%s", reg.Protocol, ip, reg.Port)
}
func NewManagerService(conf *option.Conf, cluster client.ClusterClient) *ManagerService {
func NewManagerService(conf *option.Conf, cluster client.ClusterClient, healthyManager healthy.Manager) *ManagerService {
ctx, cancel := context.WithCancel(context.Background())
return &ManagerService{
NewControllerSystemd(conf, cluster),
cluster,
ctx: ctx,
cancel: cancel,
conf: conf,
cluster: cluster,
ctr: NewControllerSystemd(conf, cluster),
healthyManager: healthyManager,
}
}

View File

@ -28,7 +28,7 @@ import (
//Manager Manager
type Manager interface {
GetServiceHealthy(serviceName string) *service.HealthStatus
GetServiceHealthy(serviceName string) (*service.HealthStatus, bool)
WatchServiceHealthy(serviceName string) Watcher
CloseWatch(serviceName string, id string) error
Start() error
@ -36,32 +36,51 @@ type Manager interface {
Stop() error
}
type Watcher interface {
Watch() <-chan *service.HealthStatus
Close() error
}
type watcher struct {
manager Manager
statusChan chan *service.HealthStatus
id string
serviceName string
}
type probeManager struct {
services []*service.Service
status map[string]*service.HealthStatus
ctx context.Context
cancel context.CancelFunc
watches map[string]map[string]*watcher
statusChan chan *service.HealthStatus
errorNum map[string]int
errorTime map[string]time.Time
errorFlag map[string]bool
}
func CreateManager() Manager {
ctx, cancel := context.WithCancel(context.Background())
statusChan := make(chan *service.HealthStatus, 100)
status := make(map[string]*service.HealthStatus)
watches := make(map[string]map[string]*watcher)
errorNum := make(map[string]int)
errorTime := make(map[string]time.Time)
errorFlag := make(map[string]bool)
m := &probeManager{
ctx: ctx,
cancel: cancel,
statusChan: statusChan,
status: status,
watches: watches,
errorNum: errorNum,
errorTime: errorTime,
errorFlag: errorFlag,
}
return m
}
type probeManager struct {
services []*service.Service
status map[string]*service.HealthStatus
ctx context.Context
cancel context.CancelFunc
watches map[string]map[string]*watcher
//lock sync.Mutex
statusChan chan *service.HealthStatus
}
func (p *probeManager) AddServices(inner []*service.Service) error {
p.services = inner
return nil
@ -70,49 +89,73 @@ func (p *probeManager) AddServices(inner []*service.Service) error {
func (p *probeManager) Start() (error) {
logrus.Info("health mode start")
go p.HandleStatus()
for _, v := range p.services {
if v.ServiceHealth.Model == "http" {
h := &HttpProbe{
name: v.ServiceHealth.Name,
address: v.ServiceHealth.Address,
ctx: p.ctx,
cancel: p.cancel,
resultsChan: p.statusChan,
TimeInterval: v.ServiceHealth.TimeInterval,
MaxErrorNumber: v.ServiceHealth.MaxErrorNumber,
name: v.ServiceHealth.Name,
address: v.ServiceHealth.Address,
ctx: p.ctx,
cancel: p.cancel,
resultsChan: p.statusChan,
TimeInterval: v.ServiceHealth.TimeInterval,
}
go h.Check()
}
if v.ServiceHealth.Model == "tcp"{
h := &TcpProbe{
name: v.ServiceHealth.Name,
address: v.ServiceHealth.Address,
ctx: p.ctx,
cancel: p.cancel,
resultsChan: p.statusChan,
TimeInterval: v.ServiceHealth.TimeInterval,
}
go h.TcpCheck()
}
}
go p.processResult()
time.Sleep(time.Second*5)
go p.SubscriptionPush()
return nil
}
func (p *probeManager) processResult() {
func (p *probeManager) updateServiceStatus(status *service.HealthStatus) {
for {
result := <-p.statusChan
p.status[result.Name] = result
if status.Status != service.Stat_healthy {
number := p.errorNum[status.Name] + 1
p.errorNum[status.Name] = number
status.ErrorNumber = number
if !p.errorFlag[status.Name] {
p.errorTime[status.Name] = time.Now()
p.errorFlag[status.Name] = true
}
status.ErrorTime = time.Now().Sub(p.errorTime[status.Name])
p.status[status.Name] = status
} else {
p.errorNum[status.Name] = 0
status.ErrorNumber = 0
p.errorFlag[status.Name] = false
status.ErrorTime = 0
p.status[status.Name] = status
}
}
func (p *probeManager) SubscriptionPush() {
func (p *probeManager) HandleStatus() {
for {
for _, service := range p.services {
if watcherMap, ok := p.watches[service.Name]; ok {
for _, watcher := range watcherMap {
watcher.statusChan <- p.status[service.Name]
select {
case status := <-p.statusChan:
p.updateServiceStatus(status)
if watcherMap, ok := p.watches[status.Name]; ok {
for _, watcher := range watcherMap {
watcher.statusChan <- status
}
}
case <-p.ctx.Done():
return
}
}
}}
}
func (p *probeManager) Stop() error {
p.cancel()
@ -123,35 +166,14 @@ func (p *probeManager) CloseWatch(serviceName string, id string) error {
close(channel)
return nil
}
func (p *probeManager) GetServiceHealthy(serviceName string) *service.HealthStatus {
for _, v := range p.services {
if v.Name == serviceName {
if v.ServiceHealth.Model == "http" {
healthMap := GetHttpHealth(v.ServiceHealth.Address)
func (p *probeManager) GetServiceHealthy(serviceName string) (*service.HealthStatus, bool) {
v, ok := p.status[serviceName]
return v, ok
return &service.HealthStatus{
Status: healthMap["status"],
Info: healthMap["info"],
}
}
}
}
return nil
}
type Watcher interface {
Watch() *service.HealthStatus
Close() error
}
type watcher struct {
manager Manager
statusChan chan *service.HealthStatus
id string
serviceName string
}
func (w *watcher) Watch() *service.HealthStatus {
return <-w.statusChan
func (w *watcher) Watch() <-chan *service.HealthStatus {
return w.statusChan
}
func (w *watcher) Close() error {
return w.manager.CloseWatch(w.serviceName, w.id)

View File

@ -15,10 +15,9 @@ func TestProbeManager_Start(t *testing.T) {
Name: "builder",
ServiceHealth: &service.Health{
Name: "builder",
Model: "http",
Address: "127.0.0.1:6369/worker/health",
Model: "tcp",
Address: "127.0.0.1:3228",
TimeInterval: 3,
MaxErrorNumber:3,
},
}
h2 := &service.Service{
@ -28,8 +27,6 @@ func TestProbeManager_Start(t *testing.T) {
Model: "http",
Address: "127.0.0.1:6369/worker/health",
TimeInterval: 3,
MaxErrorNumber:3,
},
}
h3 := &service.Service{
@ -39,7 +36,6 @@ func TestProbeManager_Start(t *testing.T) {
Model: "http",
Address: "127.0.0.1:7171/health",
TimeInterval: 3,
MaxErrorNumber:3,
},
}
@ -47,40 +43,80 @@ func TestProbeManager_Start(t *testing.T) {
serviceList = append(serviceList, h2)
serviceList = append(serviceList, h3)
m.AddServices(serviceList)
watcher := m.WatchServiceHealthy("webcli")
watcher1 := m.WatchServiceHealthy("webcli")
watcher2 := m.WatchServiceHealthy("worker")
watcher3 := m.WatchServiceHealthy("builder")
m.Start()
for {
v := watcher.Watch()
fmt.Println("----",v.Name, v.Status, v.Info)
for {
v := <-watcher1.Watch()
if v!=nil{
fmt.Println("----",v.Name, v.Status, v.Info,v.ErrorNumber,v.ErrorTime.Seconds())
}else{
t.Log("nil nil nil")
}
v2 := <-watcher2.Watch()
fmt.Println("===",v2.Name, v2.Status, v2.Info,v2.ErrorNumber,v2.ErrorTime.Seconds())
v3 := <-watcher3.Watch()
fmt.Println("vvvv",v3.Name, v3.Status, v3.Info,v3.ErrorNumber,v3.ErrorTime.Seconds())
}
}
//func TestGetHttpHealth(t *testing.T) {
// ctx, cancel := context.WithCancel(context.Background())
// m := CreateManager()
// serviceList := make([]*service.Service, 0, 10)
//
// h := &service.Service{
// Name: "builder",
// ServiceHealth: &service.Health{
// Name: "builder",
// Model: "http",
// Address: "127.0.0.1:3228",
// Path: "/v2/builder/health",
// Name: "builder",
// Model: "tcp",
// Address: "127.0.0.1:3228",
// TimeInterval: 3,
// },
// }
// h2 := &service.Service{
// Name: "worker",
// ServiceHealth: &service.Health{
// Name: "worker",
// Model: "http",
// Address: "127.0.0.1:6369/worker/health",
// TimeInterval: 3,
// },
// }
// h3 := &service.Service{
// Name: "webcli",
// ServiceHealth: &service.Health{
// Name: "webcli",
// Model: "http",
// Address: "127.0.0.1:7171/health",
// TimeInterval: 3,
//
// },
// }
// serviceList = append(serviceList, h)
// v := ProbeManager{
// ctx: ctx,
// cancel: cancel,
// services: serviceList,
// serviceList = append(serviceList, h2)
// serviceList = append(serviceList, h3)
// m.AddServices(serviceList)
// m.Start()
//
// for {
//
// time.Sleep(time.Second*1)
// info, ok := m.GetServiceHealthy("builder")
// if !ok {
// fmt.Println("cuowu")
// } else {
// fmt.Println(info.Name, info.Status, info.Info, info.ErrorNumber, info.ErrorTime)
//
// }
// result := v.GetServiceHealthy("builder")
// fmt.Println(result.Name, result.Status, result.Info)
//
//}
//func TestProbeManager_Start(t *testing.T) {

View File

@ -0,0 +1,54 @@
package healthy
import (
"context"
"github.com/goodrain/rainbond/node/nodem/service"
"github.com/goodrain/rainbond/util"
"net/http"
"time"
)
type Probe interface {
Check()
}
type HttpProbe struct {
name string
address string
resultsChan chan *service.HealthStatus
ctx context.Context
cancel context.CancelFunc
TimeInterval int
}
func (h *HttpProbe) Check() {
util.Exec(h.ctx, func() error {
HealthMap := GetHttpHealth(h.address)
result := &service.HealthStatus{
Name: h.name,
Status: HealthMap["status"],
Info: HealthMap["info"],
}
h.resultsChan <- result
return nil
}, time.Second*time.Duration(h.TimeInterval))
}
func GetHttpHealth(address string) map[string]string {
resp, err := http.Get("http://" + address)
if err != nil {
return map[string]string{"status": service.Stat_death, "info": "Request service is unreachable"}
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return map[string]string{"status": service.Stat_unhealthy, "info": "Service unhealthy"}
}
return map[string]string{"status": service.Stat_healthy, "info": "service health"}
}

View File

@ -1,68 +0,0 @@
package healthy
import (
"context"
"github.com/goodrain/rainbond/node/nodem/service"
"github.com/goodrain/rainbond/util"
"net/http"
"time"
)
var errorNum int = 0
type Probe interface {
Check() map[string]string
}
type HttpProbe struct {
name string
address string
resultsChan chan *service.HealthStatus
ctx context.Context
cancel context.CancelFunc
TimeInterval int
MaxErrorNumber int
}
func (h *HttpProbe) Check() {
util.Exec(h.ctx, func() error {
HealthMap := GetHttpHealth(h.address)
if HealthMap["status"] != "health" {
errorNum += 1
} else {
errorNum = 0
}
if errorNum >= h.MaxErrorNumber {
result := &service.HealthStatus{
Name: h.name,
Status: "death",
Info: "More than the maximum number of errors, needs to be restarted",
}
h.resultsChan <- result
} else {
result := &service.HealthStatus{
Name: h.name,
Status: HealthMap["status"],
Info: HealthMap["info"],
}
h.resultsChan <- result
}
return nil
}, time.Second*time.Duration(h.TimeInterval))
}
func GetHttpHealth(address string) map[string]string {
resp, err := http.Get("http://" + address)
if err != nil {
return map[string]string{"status": "disconnect", "info": "Request service is unreachable"}
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return map[string]string{"status": "unhealthy", "info": "Service unhealthy"}
}
return map[string]string{"status": "health", "info": "service health"}
}

View File

@ -0,0 +1,47 @@
package healthy
import (
"context"
"github.com/goodrain/rainbond/node/nodem/service"
"github.com/goodrain/rainbond/util"
"time"
"net"
)
type TCPProbe interface {
TcpCheck()
}
type TcpProbe struct {
name string
address string
resultsChan chan *service.HealthStatus
ctx context.Context
cancel context.CancelFunc
TimeInterval int
}
func (h *TcpProbe) TcpCheck() {
util.Exec(h.ctx, func() error {
HealthMap := GetTcpHealth(h.address)
result := &service.HealthStatus{
Name: h.name,
Status: HealthMap["status"],
Info: HealthMap["info"],
}
h.resultsChan <- result
return nil
}, time.Second*time.Duration(h.TimeInterval))
}
func GetTcpHealth(address string) map[string]string {
conn, err := net.Dial("tcp",address)
if err != nil {
return map[string]string{"status": service.Stat_death, "info": "Tcp connection error"}
}
defer conn.Close()
return map[string]string{"status": service.Stat_healthy, "info": "service health"}
}

View File

@ -65,8 +65,9 @@ func NewNodeManager(conf *option.Conf) (*NodeManager, error) {
if err != nil {
return nil, err
}
healthyManager := healthy.CreateManager()
cluster := client.NewClusterClient(conf, etcdcli)
controller := controller.NewManagerService(conf, cluster)
controller := controller.NewManagerService(conf, cluster, healthyManager)
monitor, err := monitor.CreateManager(conf)
if err != nil {
return nil, err
@ -80,6 +81,7 @@ func NewNodeManager(conf *option.Conf) (*NodeManager, error) {
taskrun: taskrun,
cluster: cluster,
monitor: monitor,
healthy: healthyManager,
}
return nodem, nil
}
@ -102,8 +104,7 @@ func (n *NodeManager) Start(errchan chan error) error {
if err != nil {
return fmt.Errorf("get all services error,%s", err.Error())
}
if err := n.healthy.AddServices(services); err!=nil{
if err := n.healthy.AddServices(services); err != nil {
return fmt.Errorf("get all services error,%s", err.Error())
}

View File

@ -43,8 +43,11 @@ func ToConfig(svc *Service, cluster client.ClusterClient) []byte {
s := Lines{"[Unit]"}
s.Add("Description", svc.Name)
for _, d := range svc.Dependences {
for _, d := range svc.After {
s.Add("After", d+".service")
}
for _, d := range svc.Requires {
s.Add("Requires", d+".service")
}
@ -53,9 +56,9 @@ func ToConfig(svc *Service, cluster client.ClusterClient) []byte {
s.Add("Type", svc.Type)
s.Add("RemainAfterExit", "yes")
}
s.Add("ExecStartPre", fmt.Sprintf(`-/bin/bash -c "%s"`, svc.PreStart))
s.Add("ExecStart", fmt.Sprintf(`/bin/bash -c "%s"`, svc.Start))
s.Add("ExecStop", fmt.Sprintf(`/bin/bash -c "%s"`, svc.Stop))
s.Add("ExecStartPre", fmt.Sprintf(`-/bin/bash -c '%s'`, svc.PreStart))
s.Add("ExecStart", fmt.Sprintf(`/bin/bash -c '%s'`, svc.Start))
s.Add("ExecStop", fmt.Sprintf(`/bin/bash -c '%s'`, svc.Stop))
s.Add("Restart", svc.RestartPolicy)
s.Add("RestartSec", svc.RestartSec)

View File

@ -18,12 +18,21 @@
package service
import "time"
const (
Stat_healthy string = "healthy" //健康
Stat_unhealthy string = "unhealthy" //出现异常
Stat_death string = "death" //请求不通
)
//Service Service
type Service struct {
Name string `yaml:"name"`
Endpoints []*Endpoint `yaml:"endpoints,omitempty"`
ServiceHealth *Health `yaml:"health"`
Dependences []string `yaml:"dependences"`
After []string `yaml:"after"`
Requires []string `yaml:"requires"`
Type string `yaml:"type,omitempty"`
PreStart string `yaml:"pre_start,omitempty"`
Start string `yaml:"start"`
@ -40,7 +49,7 @@ type Services struct {
// service list of the node
type ServiceList struct {
Version string `yaml:"version"`
Version string `yaml:"version"`
Services []struct {
Name string `yaml:"name"`
} `yaml:"services"`
@ -57,12 +66,13 @@ type Health struct {
Name string `yaml:"name"`
Model string `yaml:"model"`
Address string `yaml:"address"`
TimeInterval int `yaml:"time_interval"`
MaxErrorNumber int `yaml:"max_error_number"`
TimeInterval int `yaml:"time_interval"`
}
type HealthStatus struct {
Name string `yaml:"name"`
Status string `yaml:"status"`
Info string `yaml:"info"`
Name string
Status string
ErrorNumber int
ErrorTime time.Duration
Info string
}