[REV] Add online and offline node by command.

This commit is contained in:
Zhang Jiajun 2018-07-25 16:31:43 +08:00
parent 0388635b63
commit 195ca60633
7 changed files with 51 additions and 31 deletions

View File

@ -58,8 +58,7 @@ func Run(c *option.Conf) error {
defer kubecli.Stop() defer kubecli.Stop()
// init etcd client // init etcd client
if err = store.NewClient(c); err != nil { if err = store.NewClient(c); err != nil {
return fmt.Errorf("Connect to ETCD %s failed: %s", return fmt.Errorf("Connect to ETCD %s failed: %s", c.Etcd.Endpoints, err)
c.Etcd.Endpoints, err)
} }
nodemanager, err := nodem.NewNodeManager(c) nodemanager, err := nodem.NewNodeManager(c)
if err != nil { if err != nil {
@ -85,7 +84,7 @@ func Run(c *option.Conf) error {
defer ms.Stop(nil) defer ms.Stop(nil)
} }
//create api manager //create api manager
apiManager := api.NewManager(*c, nodemanager.GetCurrentNode(), nodemanager.GetController(), ms, kubecli) apiManager := api.NewManager(*c, nodemanager.GetCurrentNode(), ms, kubecli)
if err := apiManager.Start(errChan); err != nil { if err := apiManager.Start(errChan); err != nil {
return err return err
} }

View File

@ -26,7 +26,6 @@ import (
"github.com/goodrain/rainbond/node/core/service" "github.com/goodrain/rainbond/node/core/service"
"github.com/goodrain/rainbond/node/kubecache" "github.com/goodrain/rainbond/node/kubecache"
"github.com/goodrain/rainbond/node/masterserver" "github.com/goodrain/rainbond/node/masterserver"
"github.com/goodrain/rainbond/node/nodem/controller"
) )
var datacenterConfig *config.DataCenterConfig var datacenterConfig *config.DataCenterConfig
@ -40,14 +39,14 @@ var discoverService *service.DiscoverAction
var kubecli kubecache.KubeClient var kubecli kubecache.KubeClient
//Init 初始化 //Init 初始化
func Init(c *option.Conf, ms *masterserver.MasterServer, kube kubecache.KubeClient, controller controller.Manager) { func Init(c *option.Conf, ms *masterserver.MasterServer, kube kubecache.KubeClient) {
if ms != nil { if ms != nil {
prometheusService = service.CreatePrometheusService(c, ms) prometheusService = service.CreatePrometheusService(c, ms)
taskService = service.CreateTaskService(c, ms) taskService = service.CreateTaskService(c, ms)
taskTempService = service.CreateTaskTempService(c) taskTempService = service.CreateTaskTempService(c)
taskGroupService = service.CreateTaskGroupService(c, ms) taskGroupService = service.CreateTaskGroupService(c, ms)
datacenterConfig = config.GetDataCenterConfig() datacenterConfig = config.GetDataCenterConfig()
nodeService = service.CreateNodeService(c, ms.Cluster, kube, controller) nodeService = service.CreateNodeService(c, ms.Cluster, kube)
} }
appService = service.CreateAppService(c) appService = service.CreateAppService(c)
discoverService = service.CreateDiscoverActionManager(c, kube) discoverService = service.CreateDiscoverActionManager(c, kube)

View File

@ -265,6 +265,7 @@ func PutLabel(w http.ResponseWriter, r *http.Request) {
//DownNode 节点下线,计算节点操作 //DownNode 节点下线,计算节点操作
func DownNode(w http.ResponseWriter, r *http.Request) { func DownNode(w http.ResponseWriter, r *http.Request) {
logrus.Info("Node down")
nodeUID := strings.TrimSpace(chi.URLParam(r, "node_id")) nodeUID := strings.TrimSpace(chi.URLParam(r, "node_id"))
node, err := nodeService.DownNode(nodeUID) node, err := nodeService.DownNode(nodeUID)
if err != nil { if err != nil {
@ -276,6 +277,7 @@ func DownNode(w http.ResponseWriter, r *http.Request) {
//UpNode 节点上线,计算节点操作 //UpNode 节点上线,计算节点操作
func UpNode(w http.ResponseWriter, r *http.Request) { func UpNode(w http.ResponseWriter, r *http.Request) {
logrus.Info("Node up")
nodeUID := strings.TrimSpace(chi.URLParam(r, "node_id")) nodeUID := strings.TrimSpace(chi.URLParam(r, "node_id"))
node, err := nodeService.UpNode(nodeUID) node, err := nodeService.UpNode(nodeUID)
if err != nil { if err != nil {

View File

@ -36,7 +36,6 @@ import (
"github.com/goodrain/rainbond/cmd/node/option" "github.com/goodrain/rainbond/cmd/node/option"
nodeclient "github.com/goodrain/rainbond/node/nodem/client" nodeclient "github.com/goodrain/rainbond/node/nodem/client"
ncontroller "github.com/goodrain/rainbond/node/nodem/controller"
_ "net/http/pprof" _ "net/http/pprof"
@ -59,10 +58,10 @@ type Manager struct {
} }
//NewManager api manager //NewManager api manager
func NewManager(c option.Conf, node *nodeclient.HostNode, ncontroller ncontroller.Manager, ms *masterserver.MasterServer, kubecli kubecache.KubeClient) *Manager { func NewManager(c option.Conf, node *nodeclient.HostNode, ms *masterserver.MasterServer, kubecli kubecache.KubeClient) *Manager {
r := router.Routers(c.RunMode) r := router.Routers(c.RunMode)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
controller.Init(&c, ms, kubecli, ncontroller) controller.Init(&c, ms, kubecli)
m := &Manager{ m := &Manager{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,

View File

@ -31,7 +31,6 @@ import (
"github.com/goodrain/rainbond/node/kubecache" "github.com/goodrain/rainbond/node/kubecache"
"github.com/goodrain/rainbond/node/masterserver/node" "github.com/goodrain/rainbond/node/masterserver/node"
"github.com/goodrain/rainbond/node/nodem/client" "github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/node/nodem/controller"
"github.com/goodrain/rainbond/node/utils" "github.com/goodrain/rainbond/node/utils"
"github.com/twinj/uuid" "github.com/twinj/uuid"
) )
@ -41,16 +40,14 @@ type NodeService struct {
c *option.Conf c *option.Conf
nodecluster *node.Cluster nodecluster *node.Cluster
kubecli kubecache.KubeClient kubecli kubecache.KubeClient
controller controller.Manager
} }
//CreateNodeService create //CreateNodeService create
func CreateNodeService(c *option.Conf, nodecluster *node.Cluster, kubecli kubecache.KubeClient, controller controller.Manager) *NodeService { func CreateNodeService(c *option.Conf, nodecluster *node.Cluster, kubecli kubecache.KubeClient) *NodeService {
return &NodeService{ return &NodeService{
c: c, c: c,
nodecluster: nodecluster, nodecluster: nodecluster,
kubecli: kubecli, kubecli: kubecli,
controller: controller,
} }
} }
@ -187,10 +184,6 @@ func (n *NodeService) DownNode(nodeID string) (*client.HostNode, *utils.APIHandl
if err != nil { if err != nil {
return nil, utils.CreateAPIHandleError(500, fmt.Errorf("k8s node down error,%s", err.Error())) return nil, utils.CreateAPIHandleError(500, fmt.Errorf("k8s node down error,%s", err.Error()))
} }
err = n.controller.Offline()
if err != nil {
return nil, utils.CreateAPIHandleError(501, fmt.Errorf("node offine error,%s", err.Error()))
}
hostNode.Status = "offline" hostNode.Status = "offline"
hostNode.NodeStatus.Status = "offline" hostNode.NodeStatus.Status = "offline"
n.nodecluster.UpdateNode(hostNode) n.nodecluster.UpdateNode(hostNode)
@ -207,10 +200,6 @@ func (n *NodeService) UpNode(nodeID string) (*client.HostNode, *utils.APIHandleE
if !hostNode.Role.HasRule(client.ComputeNode) { if !hostNode.Role.HasRule(client.ComputeNode) {
return nil, utils.CreateAPIHandleError(400, fmt.Errorf("node is not compute node")) return nil, utils.CreateAPIHandleError(400, fmt.Errorf("node is not compute node"))
} }
err := n.controller.Online()
if err != nil {
return nil, utils.CreateAPIHandleError(501, fmt.Errorf("node online error,%s", err.Error()))
}
if k8snode, _ := n.kubecli.GetNode(hostNode.ID); k8snode != nil { if k8snode, _ := n.kubecli.GetNode(hostNode.ID); k8snode != nil {
return nil, utils.CreateAPIHandleError(400, fmt.Errorf("node is not compute node or it not down")) return nil, utils.CreateAPIHandleError(400, fmt.Errorf("node is not compute node or it not down"))
} }

View File

@ -27,6 +27,7 @@ import (
"github.com/goodrain/rainbond/node/nodem/client" "github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/node/nodem/healthy" "github.com/goodrain/rainbond/node/nodem/healthy"
"github.com/goodrain/rainbond/node/nodem/service" "github.com/goodrain/rainbond/node/nodem/service"
"github.com/goodrain/rainbond/util/watch"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"io/ioutil" "io/ioutil"
"os/exec" "os/exec"
@ -42,6 +43,8 @@ type ManagerService struct {
cluster client.ClusterClient cluster client.ClusterClient
healthyManager healthy.Manager healthyManager healthy.Manager
services []*service.Service services []*service.Service
etcdcli *clientv3.Client
watchChan watch.Interface
} }
func (m *ManagerService) GetAllService() ([]*service.Service, error) { func (m *ManagerService) GetAllService() ([]*service.Service, error) {
@ -52,8 +55,7 @@ func (m *ManagerService) GetAllService() ([]*service.Service, error) {
func (m *ManagerService) Start() error { func (m *ManagerService) Start() error {
logrus.Info("Starting node controller manager.") logrus.Info("Starting node controller manager.")
err := m.Online() if err := m.SyncNodeStatus(); err != nil {
if err != nil {
return err return err
} }
@ -62,10 +64,43 @@ func (m *ManagerService) Start() error {
// stop manager // stop manager
func (m *ManagerService) Stop() error { func (m *ManagerService) Stop() error {
m.watchChan.Stop()
m.cancel() m.cancel()
return nil return nil
} }
func (m *ManagerService) SyncNodeStatus() error {
watcher := watch.New(m.etcdcli, "")
watchChan, err := watcher.WatchList(m.ctx, m.conf.NodePath, "")
if err != nil {
logrus.Error("Failed to Watch list for key ", m.conf.NodePath)
return err
}
m.watchChan = watchChan
for event := range m.watchChan.ResultChan() {
logrus.Debug("watch node status: ", event.Type)
switch event.Type {
case watch.Added:
case watch.Modified:
var node *client.HostNode
if err := node.Decode(event.GetValue()); err != nil {
logrus.Error("Failed to decode node from sync node event: ", err)
}
if node.Status == "offline" { // TODO const
m.Offline()
} else if node.Status == "running" {
m.Online()
}
case watch.Deleted:
default:
logrus.Error("watch node event error: ", event.Error)
}
}
logrus.Info("Stop sync node status from node cluster client.")
}
// start all service of on the node // start all service of on the node
func (m *ManagerService) Online() error { func (m *ManagerService) Online() error {
logrus.Info("Node online") logrus.Info("Node online")
@ -102,7 +137,7 @@ func (m *ManagerService) Online() error {
m.ctr.StartList(m.services) m.ctr.StartList(m.services)
m.StartSync() m.StartSyncService()
return nil return nil
} }
@ -124,7 +159,7 @@ func (m *ManagerService) Offline() error {
} }
} }
m.StopSync() m.StopSyncService()
if err := m.ctr.StopList(m.services); err != nil { if err := m.ctr.StopList(m.services); err != nil {
return err return err
@ -134,7 +169,7 @@ func (m *ManagerService) Offline() error {
} }
// synchronize all service status to as we expect // synchronize all service status to as we expect
func (m *ManagerService) StartSync() { func (m *ManagerService) StartSyncService() {
for _, s := range m.services { for _, s := range m.services {
name := s.Name name := s.Name
logrus.Error("Start watch the service status ", name) logrus.Error("Start watch the service status ", name)
@ -187,7 +222,7 @@ func (m *ManagerService) StartSync() {
} }
} }
func (m *ManagerService) StopSync() { func (m *ManagerService) StopSyncService() {
m.syncCancel() m.syncCancel()
} }
@ -330,6 +365,7 @@ func NewManagerService(conf *option.Conf, healthyManager healthy.Manager) (*Mana
cluster: cluster, cluster: cluster,
ctr: NewControllerSystemd(conf, cluster), ctr: NewControllerSystemd(conf, cluster),
healthyManager: healthyManager, healthyManager: healthyManager,
etcdcli: etcdcli,
} }
return manager, etcdcli, cluster return manager, etcdcli, cluster

View File

@ -249,7 +249,3 @@ func CreateNode(nodeID, ip string) client.HostNode {
} }
return HostNode return HostNode
} }
func (n *NodeManager) GetController() controller.Manager {
return n.controller
}