diff --git a/cmd/node/option/conf.go b/cmd/node/option/conf.go index e231168d2..03efc7160 100644 --- a/cmd/node/option/conf.go +++ b/cmd/node/option/conf.go @@ -25,7 +25,6 @@ import ( "time" "github.com/goodrain/rainbond/node/utils" - "github.com/prometheus/node_exporter/collector" "github.com/Sirupsen/logrus" client "github.com/coreos/etcd/clientv3" @@ -56,15 +55,6 @@ func Init() error { if err := Config.parse(); err != nil { return err } - // This instance is only used to check collector creation and logging. - nc, err := collector.NewNodeCollector() - if err != nil { - logrus.Fatalf("Couldn't create collector: %s", err) - } - logrus.Infof("Enabled collectors:") - for n := range nc.Collectors { - logrus.Infof(" - %s", n) - } initialized = true return nil } @@ -242,10 +232,3 @@ func (c *Conf) parse() error { c.HostIDFile = "/opt/rainbond/etc/node/node_host_uuid.conf" return nil } - -func Exit(i interface{}) { - close(exitChan) - if watcher != nil { - watcher.Close() - } -} diff --git a/cmd/node/server/server.go b/cmd/node/server/server.go index 012f9ce6e..b45b349de 100644 --- a/cmd/node/server/server.go +++ b/cmd/node/server/server.go @@ -24,23 +24,18 @@ import ( "syscall" "github.com/goodrain/rainbond/cmd/node/option" + "github.com/goodrain/rainbond/node/api" "github.com/goodrain/rainbond/node/api/controller" - "github.com/goodrain/rainbond/node/core/job" "github.com/goodrain/rainbond/node/core/store" "github.com/goodrain/rainbond/node/kubecache" "github.com/goodrain/rainbond/node/masterserver" - "github.com/goodrain/rainbond/node/monitormessage" "github.com/goodrain/rainbond/node/nodem" - "github.com/goodrain/rainbond/node/statsd" - "github.com/prometheus/client_golang/prometheus" "github.com/Sirupsen/logrus" eventLog "github.com/goodrain/rainbond/event" "os/signal" - - "github.com/goodrain/rainbond/node/api" ) //Run start run @@ -61,14 +56,15 @@ func Run(c *option.Conf) error { return err } defer kubecli.Stop() - // init etcd client if err = store.NewClient(c); err != nil { return fmt.Errorf("Connect to ETCD %s failed: %s", c.Etcd.Endpoints, err) } - - nodemanager := nodem.NewNodeManager(c) + nodemanager, err := nodem.NewNodeManager(c) + if err != nil { + return fmt.Errorf("create node manager failed: %s", err) + } if err := nodemanager.Start(errChan); err != nil { return fmt.Errorf("start node manager failed: %s", err) } @@ -87,27 +83,17 @@ func Run(c *option.Conf) error { } defer ms.Stop(nil) } - //statsd exporter - registry := prometheus.NewRegistry() - exporter := statsd.CreateExporter(c.StatsdConfig, registry) - if err := exporter.Start(); err != nil { - logrus.Errorf("start statsd exporter server error,%s", err.Error()) - return err - } - meserver := monitormessage.CreateUDPServer("0.0.0.0", 6666, c.Etcd.Endpoints) - if err := meserver.Start(); err != nil { - return err - } - //启动API服务 - apiManager := api.NewManager(*c, nodemanager.GetCurrentNode(), ms, exporter, kubecli) + //create api manager + apiManager := api.NewManager(*c, nodemanager.GetCurrentNode(), ms, kubecli) if err := apiManager.Start(errChan); err != nil { return err } + if err := nodemanager.AddAPIManager(apiManager); err != nil { + return err + } defer apiManager.Stop() - defer job.Exit(nil) defer controller.Exist(nil) - defer option.Exit(nil) //step finally: listen Signal term := make(chan os.Signal) signal.Notify(term, os.Interrupt, syscall.SIGTERM) diff --git a/node/api/controller/node_controller.go b/node/api/controller/node_controller.go index ea11df08a..4da92bc19 100644 --- 
a/node/api/controller/node_controller.go +++ b/node/api/controller/node_controller.go @@ -29,9 +29,7 @@ import ( "github.com/goodrain/rainbond/node/nodem/client" "github.com/goodrain/rainbond/node/utils" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/version" - "github.com/prometheus/node_exporter/collector" "github.com/goodrain/rainbond/node/api/model" @@ -372,39 +370,6 @@ func Instances(w http.ResponseWriter, r *http.Request) { httputil.ReturnSuccess(r, w, pods) } -//NodeExporter 节点监控 -func NodeExporter(w http.ResponseWriter, r *http.Request) { - // filters := r.URL.Query()["collect[]"] - // logrus.Debugln("collect query:", filters) - filters := []string{"cpu", "diskstats", "filesystem", "ipvs", "loadavg", "meminfo", "netdev", "netstat", "uname", "mountstats", "nfs"} - nc, err := collector.NewNodeCollector(filters...) - if err != nil { - logrus.Warnln("Couldn't create", err) - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(fmt.Sprintf("Couldn't create %s", err))) - return - } - registry := prometheus.NewRegistry() - err = registry.Register(nc) - if err != nil { - logrus.Errorln("Couldn't register collector:", err) - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(fmt.Sprintf("Couldn't register collector: %s", err))) - return - } - gatherers := prometheus.Gatherers{ - prometheus.DefaultGatherer, - registry, - } - // Delegate http serving to Prometheus client library, which will call collector.Collect. - h := promhttp.HandlerFor(gatherers, - promhttp.HandlerOpts{ - ErrorLog: logrus.StandardLogger(), - ErrorHandling: promhttp.ContinueOnError, - }) - h.ServeHTTP(w, r) -} - -//temporarily exists func outJSON(w http.ResponseWriter, data interface{}) { outJSONWithCode(w, http.StatusOK, data) diff --git a/node/api/manager.go b/node/api/manager.go index 15bbfd8a4..6758af2a2 100644 --- a/node/api/manager.go +++ b/node/api/manager.go @@ -27,8 +27,6 @@ import ( "github.com/goodrain/rainbond/node/kubecache" "github.com/goodrain/rainbond/node/masterserver" "github.com/goodrain/rainbond/node/statsd" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/goodrain/rainbond/node/api/controller" "github.com/goodrain/rainbond/node/api/router" @@ -60,50 +58,21 @@ type Manager struct { } //NewManager api manager -func NewManager(c option.Conf, node *nodeclient.HostNode, ms *masterserver.MasterServer, exporter *statsd.Exporter, kubecli kubecache.KubeClient) *Manager { +func NewManager(c option.Conf, node *nodeclient.HostNode, ms *masterserver.MasterServer, kubecli kubecache.KubeClient) *Manager { r := router.Routers(c.RunMode) ctx, cancel := context.WithCancel(context.Background()) controller.Init(&c, ms, kubecli) m := &Manager{ - ctx: ctx, - cancel: cancel, - conf: c, - router: r, - node: node, - ms: ms, - exporter: exporter, + ctx: ctx, + cancel: cancel, + conf: c, + router: r, + node: node, + ms: ms, } - m.router.Get("/app/metrics", m.HandleStatsd) - m.router.Get("/-/statsdreload", m.ReloadStatsdMappConfig) return m } -//ReloadStatsdMappConfig ReloadStatsdMappConfig -func (m *Manager) ReloadStatsdMappConfig(w http.ResponseWriter, r *http.Request) { - if err := m.exporter.ReloadConfig(); err != nil { - w.Write([]byte(err.Error())) - w.WriteHeader(500) - } else { - w.Write([]byte("Success reload")) - w.WriteHeader(200) - } -} - -//HandleStatsd statsd handle -func (m *Manager) HandleStatsd(w http.ResponseWriter, r *http.Request) { - gatherers :=
prometheus.Gatherers{ - prometheus.DefaultGatherer, - m.exporter.GetRegister(), - } - // Delegate http serving to Prometheus client library, which will call collector.Collect. - h := promhttp.HandlerFor(gatherers, - promhttp.HandlerOpts{ - ErrorLog: logrus.StandardLogger(), - ErrorHandling: promhttp.ContinueOnError, - }) - h.ServeHTTP(w, r) -} - //Start start func (m *Manager) Start(errChan chan error) error { logrus.Infof("api server start listening on %s", m.conf.APIAddr) @@ -154,11 +123,7 @@ func (m *Manager) Stop() error { return nil } -func (m *Manager) prometheus() { - //prometheus.MustRegister(version.NewCollector("acp_node")) - // exporter := monitor.NewExporter(m.coreManager) - // prometheus.MustRegister(exporter) - - //todo 我注释的 - //m.container.Handle(m.conf.PrometheusMetricPath, promhttp.Handler()) +//GetRouter GetRouter +func (m *Manager) GetRouter() *chi.Mux { + return m.router } diff --git a/node/api/router/router.go b/node/api/router/router.go index 6d73af2f3..51fe46e94 100644 --- a/node/api/router/router.go +++ b/node/api/router/router.go @@ -116,7 +116,5 @@ func Routers(mode string) *chi.Mux { r.Put("/tasks/taskreload", controller.ReloadStaticTasks) } }) - //节点监控 - r.Get("/node/metrics", controller.NodeExporter) return r } diff --git a/node/core/job/proc.go b/node/core/job/proc.go index 38eb323b3..790cdcb14 100644 --- a/node/core/job/proc.go +++ b/node/core/job/proc.go @@ -26,121 +26,10 @@ import ( "time" conf "github.com/goodrain/rainbond/cmd/node/option" - "github.com/goodrain/rainbond/node/core/store" "github.com/Sirupsen/logrus" - client "github.com/coreos/etcd/clientv3" ) -var ( - lID *leaseID -) - -// 维持 lease id 服务 -func StartProc() error { - lID = &leaseID{ - ttl: conf.Config.ProcTTL, - lk: new(sync.RWMutex), - done: make(chan struct{}), - } - - if lID.ttl == 0 { - return nil - } - - err := lID.set() - go lID.keepAlive() - return err -} - -func Reload(i interface{}) { - if lID.ttl == conf.Config.ProcTTL { - return - } - - close(lID.done) - lID.done, lID.ttl = make(chan struct{}), conf.Config.ProcTTL - if conf.Config.ProcTTL == 0 { - return - } - - if err := lID.set(); err != nil { - logrus.Warnf("proc lease id set err: %s", err.Error()) - } - go lID.keepAlive() -} - -func Exit(i interface{}) { - if lID.done != nil { - close(lID.done) - } -} - -type leaseID struct { - ttl int64 - ID client.LeaseID - lk *sync.RWMutex - - done chan struct{} -} - -func (l *leaseID) get() client.LeaseID { - if l.ttl == 0 { - return -1 - } - - l.lk.RLock() - id := l.ID - l.lk.RUnlock() - return id -} - -func (l *leaseID) set() error { - id := client.LeaseID(-1) - resp, err := store.DefalutClient.Grant(l.ttl + 2) - if err == nil { - id = resp.ID - } - - l.lk.Lock() - l.ID = id - l.lk.Unlock() - return err -} - -func (l *leaseID) keepAlive() { - duration := time.Duration(l.ttl) * time.Second - timer := time.NewTimer(duration) - for { - select { - case <-l.done: - return - case <-timer.C: - if l.ttl == 0 { - return - } - - id := l.get() - if id > 0 { - _, err := store.DefalutClient.KeepAliveOnce(l.ID) - if err == nil { - timer.Reset(duration) - continue - } - - logrus.Warnf("proc lease id[%x] keepAlive err: %s, try to reset...", id, err.Error()) - } - - if err := l.set(); err != nil { - logrus.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl) - } else { - logrus.Infof("proc set lease id[%x] success", l.get()) - } - timer.Reset(duration) - } - } -} - // information about the currently executing job // key: /cronsun/proc/node/group/jobId/pid // value: execution start time @@ -187,32 +76,34 @@ func (p
*Process) Val() string { // for some reason, the put command may already have been sent to the etcd server // currently this is known to happen when a deadline is exceeded func (p *Process) put() (err error) { - if atomic.LoadInt32(&p.running) != 1 { - return - } + // if atomic.LoadInt32(&p.running) != 1 { + // return + // } - if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) { - return - } + // if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) { + // return + // } - id := lID.get() - if id < 0 { - if _, err = store.DefalutClient.Put(p.Key(), p.Val()); err != nil { - return - } - } + // id := lID.get() + // if id < 0 { + // if _, err = store.DefalutClient.Put(p.Key(), p.Val()); err != nil { + // return + // } + // } - _, err = store.DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id)) + // _, err = store.DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id)) + // return return } func (p *Process) del() error { - if atomic.LoadInt32(&p.hasPut) != 1 { - return nil - } + // if atomic.LoadInt32(&p.hasPut) != 1 { + // return nil - } + // } - _, err := store.DefalutClient.Delete(p.Key()) - return err + // _, err := store.DefalutClient.Delete(p.Key()) + // return err + return nil } func (p *Process) Start() { diff --git a/node/masterserver/node/cluster.go b/node/masterserver/node/cluster.go index 9409e7760..ba5003067 100644 --- a/node/masterserver/node/cluster.go +++ b/node/masterserver/node/cluster.go @@ -151,25 +151,22 @@ func (n *Cluster) GetNode(id string) *client.HostNode { } func (n *Cluster) handleNodeStatus(v *client.HostNode) { if v.Role.HasRule("compute") { - if v.NodeStatus != nil { - if v.Unschedulable { + k8sNode, err := n.kubecli.GetNode(v.ID) + if err != nil { + logrus.Errorf("get k8s node error:%s", err.Error()) + v.Status = "error" + return + } + if k8sNode != nil { + if v.Unschedulable || k8sNode.Spec.Unschedulable { v.Status = "unschedulable" return } - // if v.AvailableCPU == 0 { - // v.AvailableCPU = v.NodeStatus.Allocatable.Cpu().Value() - // } - // if v.AvailableMemory == 0 { - // v.AvailableMemory = v.NodeStatus.Allocatable.Memory().Value() - // } var haveready bool - for _, condiction := range v.NodeStatus.Conditions { + for _, condiction := range k8sNode.Status.Conditions { if condiction.Status == "True" && (condiction.Type == "OutOfDisk" || condiction.Type == "MemoryPressure" || condiction.Type == "DiskPressure") { v.Status = "error" return - } - if v.Status == "unschedulable" || v.Status == "init" || v.Status == "init_success" || v.Status == "init_failed" || v.Status == "installing" || v.Status == "install_success" || v.Status == "install_failed" { - } if condiction.Type == "Ready" { haveready = true @@ -193,10 +190,13 @@ func (n *Cluster) handleNodeStatus(v *client.HostNode) { } if v.Alived { for _, condition := range v.NodeStatus.Conditions { - if condition.Type == "NodeInit" && condition.Status == "True" { + if condition.Type == "Ready" && condition.Status == "True" { v.Status = "running" + return } } + } else { + v.Status = "down" } } } diff --git a/node/nodem/client/cluster_client.go b/node/nodem/client/cluster_client.go index 2c9378acd..7e605c4a7 100644 --- a/node/nodem/client/cluster_client.go +++ b/node/nodem/client/cluster_client.go @@ -19,7 +19,11 @@ package client import ( + "context" + "time" + "github.com/coreos/etcd/clientv3" + "github.com/goodrain/rainbond/cmd/node/option" "github.com/goodrain/rainbond/node/core/config" "github.com/goodrain/rainbond/node/core/job" ) @@ -27,6 +31,7 @@ import ( //ClusterClient ClusterClient type ClusterClient interface { UpdateStatus(*HostNode) error + DownNode(*HostNode) error GetMasters() ([]*HostNode,
error) GetNode(nodeID string) (*HostNode, error) GetDataCenterConfig() (*config.DataCenterConfig, error) @@ -37,17 +42,28 @@ type ClusterClient interface { } //NewClusterClient new cluster client -func NewClusterClient(etcdClient *clientv3.Client) ClusterClient { +func NewClusterClient(conf *option.Conf, etcdClient *clientv3.Client) ClusterClient { return &etcdClusterClient{ etcdClient: etcdClient, + conf: conf, } + } type etcdClusterClient struct { etcdClient *clientv3.Client + conf *option.Conf + onlineLes clientv3.LeaseID } -func (e *etcdClusterClient) UpdateStatus(*HostNode) error { +func (e *etcdClusterClient) UpdateStatus(n *HostNode) error { + n.UpTime = time.Now() + if err := e.Update(n); err != nil { + return err + } + if err := e.nodeOnlinePut(n); err != nil { + return err + } return nil } @@ -66,3 +82,43 @@ func (e *etcdClusterClient) WatchJobs() <-chan *job.Event { func (e *etcdClusterClient) GetNode(nodeID string) (*HostNode, error) { return nil, nil } + +//nodeOnlinePut node online status update +func (e *etcdClusterClient) nodeOnlinePut(h *HostNode) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + if e.onlineLes != 0 { + if _, err := e.etcdClient.KeepAlive(ctx, e.onlineLes); err == nil { + return nil + } + e.onlineLes = 0 + } + les, err := e.etcdClient.Grant(ctx, 30) + if err != nil { + return err + } + e.onlineLes = les.ID + _, err = e.etcdClient.Put(ctx, e.conf.OnlineNodePath+"/"+h.ID, h.PID, clientv3.WithLease(les.ID)) + if err != nil { + return err + } + return nil +} + +//Update update node info +func (e *etcdClusterClient) Update(h *HostNode) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + _, err := e.etcdClient.Put(ctx, e.conf.NodePath+"/"+h.ID, h.String()) + return err +} + +//DownNode down node +func (e *etcdClusterClient) DownNode(h *HostNode) error { + h.Alived, h.DownTime = false, time.Now() + e.Update(h) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + _, err := e.etcdClient.Delete(ctx, e.conf.OnlineNodePath+"/"+h.ID) + return err +} diff --git a/node/nodem/monitor/monitor_manager.go b/node/nodem/monitor/monitor_manager.go index e62937027..1eb1711c5 100644 --- a/node/nodem/monitor/monitor_manager.go +++ b/node/nodem/monitor/monitor_manager.go @@ -19,19 +19,129 @@ package monitor import ( + "net/http" + + "github.com/Sirupsen/logrus" + "github.com/goodrain/rainbond/cmd/node/option" + "github.com/goodrain/rainbond/node/api" + "github.com/goodrain/rainbond/node/monitormessage" "github.com/goodrain/rainbond/node/statsd" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/node_exporter/collector" ) //Manager Manager type Manager interface { - Start(errchan chan error) + Start(errchan chan error) error Stop() error + SetAPIRoute(apim *api.Manager) error } type manager struct { - statsdExporter *statsd.Exporter - nodeExporter *nodeExporter + statsdExporter *statsd.Exporter + statsdRegistry *prometheus.Registry + nodeExporterRegistry *prometheus.Registry + meserver *monitormessage.UDPServer } -type nodeExporter struct { +func createNodeExporterRegistry() (*prometheus.Registry, error) { + registry := prometheus.NewRegistry() + filters := []string{"cpu", "diskstats", "filesystem", "ipvs", "loadavg", "meminfo", "netdev", "netstat", "uname", "mountstats", "nfs"} + nc, err := collector.NewNodeCollector(filters...)
+ if err != nil { + return nil, err + } + for n := range nc.Collectors { + logrus.Infof("node collector - %s", n) + } + err = registry.Register(nc) + if err != nil { + return nil, err + } + return registry, nil +} + +//CreateManager CreateManager +func CreateManager(c *option.Conf) (Manager, error) { + //statsd exporter + statsdRegistry := prometheus.NewRegistry() + exporter := statsd.CreateExporter(c.StatsdConfig, statsdRegistry) + meserver := monitormessage.CreateUDPServer("0.0.0.0", 6666, c.Etcd.Endpoints) + nodeExporterRegistry, err := createNodeExporterRegistry() + if err != nil { + return nil, err + } + manage := &manager{ + statsdExporter: exporter, + statsdRegistry: statsdRegistry, + nodeExporterRegistry: nodeExporterRegistry, + meserver: meserver, + } + return manage, nil +} + +func (m *manager) Start(errchan chan error) error { + if err := m.statsdExporter.Start(); err != nil { + logrus.Errorf("start statsd exporter server error,%s", err.Error()) + return err + } + if err := m.meserver.Start(); err != nil { + return err + } + + return nil +} + +func (m *manager) Stop() error { + return nil +} + +//ReloadStatsdMappConfig ReloadStatsdMappConfig +func (m *manager) ReloadStatsdMappConfig(w http.ResponseWriter, r *http.Request) { + if err := m.statsdExporter.ReloadConfig(); err != nil { + w.WriteHeader(500) + w.Write([]byte(err.Error())) + } else { + w.WriteHeader(200) + w.Write([]byte("Success reload")) + } +} + +//HandleStatsd statsd handle +func (m *manager) HandleStatsd(w http.ResponseWriter, r *http.Request) { + gatherers := prometheus.Gatherers{ + prometheus.DefaultGatherer, + m.statsdRegistry, + } + // Delegate http serving to Prometheus client library, which will call collector.Collect. + h := promhttp.HandlerFor(gatherers, + promhttp.HandlerOpts{ + ErrorLog: logrus.StandardLogger(), + ErrorHandling: promhttp.ContinueOnError, + }) + h.ServeHTTP(w, r) +} + +//NodeExporter node exporter +func (m *manager) NodeExporter(w http.ResponseWriter, r *http.Request) { + gatherers := prometheus.Gatherers{ + prometheus.DefaultGatherer, + m.nodeExporterRegistry, + } + // Delegate http serving to Prometheus client library, which will call collector.Collect.
+ h := promhttp.HandlerFor(gatherers, + promhttp.HandlerOpts{ + ErrorLog: logrus.StandardLogger(), + ErrorHandling: promhttp.ContinueOnError, + }) + h.ServeHTTP(w, r) +} + +//SetAPIRoute set api route rule +func (m *manager) SetAPIRoute(apim *api.Manager) error { + apim.GetRouter().Get("/app/metrics", m.HandleStatsd) + apim.GetRouter().Get("/-/statsdreload", m.ReloadStatsdMappConfig) + apim.GetRouter().Get("/node/metrics", m.NodeExporter) + return nil } diff --git a/node/nodem/node_manager.go b/node/nodem/node_manager.go index f8accec24..78206f77e 100644 --- a/node/nodem/node_manager.go +++ b/node/nodem/node_manager.go @@ -28,7 +28,10 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/version" "github.com/goodrain/rainbond/cmd/node/option" + "github.com/goodrain/rainbond/node/api" "github.com/goodrain/rainbond/node/nodem/client" "github.com/goodrain/rainbond/node/nodem/controller" "github.com/goodrain/rainbond/node/nodem/healthy" @@ -49,17 +52,40 @@ type NodeManager struct { controller controller.Manager taskrun taskrun.Manager cfg *option.Conf + apim *api.Manager } //NewNodeManager new a node manager -func NewNodeManager(conf *option.Conf) *NodeManager { +func NewNodeManager(conf *option.Conf) (*NodeManager, error) { + etcdcli, err := clientv3.New(conf.Etcd) + if err != nil { + return nil, err + } + taskrun, err := taskrun.Newmanager(conf, etcdcli) + if err != nil { + return nil, err + } + cluster := client.NewClusterClient(conf, etcdcli) + monitor, err := monitor.CreateManager(conf) + if err != nil { + return nil, err + } ctx, cancel := context.WithCancel(context.Background()) nodem := &NodeManager{ - cfg: conf, - ctx: ctx, - cancel: cancel, + cfg: conf, + ctx: ctx, + cancel: cancel, + taskrun: taskrun, + cluster: cluster, + monitor: monitor, } - return nodem + return nodem, nil +} + +//AddAPIManager AddAPIManager +func (n *NodeManager) AddAPIManager(apim *api.Manager) error { + n.apim = apim + return n.monitor.SetAPIRoute(apim) } //Start start @@ -67,19 +93,19 @@ func (n *NodeManager) Start(errchan chan error) error { if err := n.init(); err != nil { return err } - if err := n.controller.Start(); err != nil { - return fmt.Errorf("start node controller error,%s", err.Error()) - } - services, err := n.controller.GetAllService() - if err != nil { - return fmt.Errorf("get all services error,%s", err.Error()) - } - if err := n.healthy.AddServices(services); err != nil { - return fmt.Errorf("get all services error,%s", err.Error()) - } - if err := n.healthy.Start(); err != nil { - return fmt.Errorf("node healty start error,%s", err.Error()) - } + // if err := n.controller.Start(); err != nil { + // return fmt.Errorf("start node controller error,%s", err.Error()) + // } + // services, err := n.controller.GetAllService() + // if err != nil { + // return fmt.Errorf("get all services error,%s", err.Error()) + // } + // if err := n.healthy.AddServices(services); err != nil { + // return fmt.Errorf("get all services error,%s", err.Error()) + // } + // if err := n.healthy.Start(); err != nil { + // return fmt.Errorf("node healthy start error,%s", err.Error()) + // } go n.monitor.Start(errchan) go n.taskrun.Start(errchan) go n.heartbeat() @@ -89,6 +115,7 @@ //Stop Stop func (n *NodeManager) Stop() { n.cancel() + n.cluster.DownNode(&n.HostNode) if n.taskrun != nil { n.taskrun.Stop() } @@ -165,6 +192,7 @@ if node.AvailableCPU == 0 { node.AvailableCPU =
int64(runtime.NumCPU()) } + node.Version = version.Version return nil } diff --git a/node/nodem/taskrun/manage.go b/node/nodem/taskrun/manage.go index e70b421ad..f9c697e31 100644 --- a/node/nodem/taskrun/manage.go +++ b/node/nodem/taskrun/manage.go @@ -19,7 +19,16 @@ package taskrun import ( + "context" + "sync" + + "github.com/Sirupsen/logrus" + clientv3 "github.com/coreos/etcd/clientv3" + "github.com/goodrain/rainbond/cmd/node/option" "github.com/goodrain/rainbond/node/core/job" + "github.com/goodrain/rainbond/node/nodem/client" + "github.com/goodrain/rainbond/util/watch" + "github.com/robfig/cron" ) //Manager Manager @@ -28,208 +37,198 @@ type Manager interface { Stop() error } -// //Config config server -// type Config struct { -// EtcdEndPoints []string -// EtcdTimeout int -// EtcdPrefix string -// ClusterName string -// APIAddr string -// K8SConfPath string -// EventServerAddress []string -// PrometheusMetricPath string -// TTL int64 -// } - //Jobs jobs type Jobs map[string]*job.Job -// //manager node manager server -// type manager struct { -// cluster client.ClusterClient -// *cron.Cron -// ctx context.Context -// jobs Jobs // 和结点相关的任务 -// onceJobs Jobs //记录执行的单任务 -// jobLock sync.Mutex -// cmds map[string]*corejob.Cmd -// delIDs map[string]bool -// ttl int64 -// } +//manager node manager server +type manager struct { + cluster client.ClusterClient + etcdcli *clientv3.Client + HostNode *client.HostNode + *cron.Cron + ctx context.Context + cancel context.CancelFunc + Conf option.Conf + jobs Jobs // jobs bound to this node + onceJobs Jobs //records executed one-off jobs + jobLock sync.Mutex + cmds map[string]*job.Cmd + delIDs map[string]bool + ttl int64 +} -// //Run taskrun start -// func (n *manager) Start(errchan chan error) { -// go n.watchJobs(errchan) -// n.Cron.Start() -// if err := corejob.StartProc(); err != nil { -// logrus.Warnf("[process key will not timeout]proc lease id set err: %s", err.Error()) -// } -// return -// } +//Start taskrun start +func (n *manager) Start(errchan chan error) { + go n.watchJobs(errchan) + n.Cron.Start() + return +} -// func (n *manager) watchJobs(errChan chan error) error { -// watcher := watch.New(store.DefalutClient.Client, "") -// watchChan, err := watcher.WatchList(n.ctx, n.Conf.JobPath, "") -// if err != nil { -// errChan <- err -// return err -// } -// defer watchChan.Stop() -// for event := range watchChan.ResultChan() { -// switch event.Type { -// case watch.Added: -// j := new(job.Job) -// err := j.Decode(event.GetValue()) -// if err != nil { -// logrus.Errorf("decode job error :%s", err) -// continue -// } -// n.addJob(j) -// case watch.Modified: -// j := new(job.Job) -// err := j.Decode(event.GetValue()) -// if err != nil { -// logrus.Errorf("decode job error :%s", err) -// continue -// } -// n.modJob(j) -// case watch.Deleted: -// n.delJob(event.GetKey()) -// default: -// logrus.Errorf("watch job error:%v", event.Error) -// errChan <- event.Error -// } -// } -// return nil -// } +func (n *manager) watchJobs(errChan chan error) error { + watcher := watch.New(n.etcdcli, "") + watchChan, err := watcher.WatchList(n.ctx, n.Conf.JobPath, "") + if err != nil { + errChan <- err + return err + } + defer watchChan.Stop() + for event := range watchChan.ResultChan() { + switch event.Type { + case watch.Added: + j := new(job.Job) + err := j.Decode(event.GetValue()) + if err != nil { + logrus.Errorf("decode job error: %s", err) + continue + } + n.addJob(j) + case watch.Modified: + j := new(job.Job) + err := j.Decode(event.GetValue()) + if err != nil { + logrus.Errorf("decode job
error :%s", err) + continue + } + n.modJob(j) + case watch.Deleted: + n.delJob(event.GetKey()) + default: + logrus.Errorf("watch job error:%v", event.Error) + errChan <- event.Error + } + } + return nil +} -// //添加job缓存 -// func (n *manager) addJob(j *corejob.Job) { -// if !j.IsRunOn(n.HostNode) { -// return -// } -// //一次性任务 -// if j.Rules.Mode != corejob.Cycle { -// n.runOnceJob(j) -// return -// } -// n.jobLock.Lock() -// defer n.jobLock.Unlock() -// n.jobs[j.ID] = j -// cmds := j.Cmds(n.HostNode) -// if len(cmds) == 0 { -// return -// } -// for _, cmd := range cmds { -// n.addCmd(cmd) -// } -// return -// } +//添加job缓存 +func (n *manager) addJob(j *job.Job) { + if !j.IsRunOn(n.HostNode.ID) { + return + } + //一次性任务 + if j.Rules.Mode != job.Cycle { + n.runOnceJob(j) + return + } + n.jobLock.Lock() + defer n.jobLock.Unlock() + n.jobs[j.ID] = j + cmds := j.Cmds() + if len(cmds) == 0 { + return + } + for _, cmd := range cmds { + n.addCmd(cmd) + } + return +} -// func (n *manager) delJob(id string) { -// n.jobLock.Lock() -// defer n.jobLock.Unlock() -// n.delIDs[id] = true -// job, ok := n.jobs[id] -// // 之前此任务没有在当前结点执行 -// if !ok { -// return -// } -// cmds := job.Cmds(n.HostNode) -// if len(cmds) == 0 { -// return -// } -// for _, cmd := range cmds { -// n.delCmd(cmd) -// } -// delete(n.jobs, id) -// return -// } +func (n *manager) delJob(id string) { + n.jobLock.Lock() + defer n.jobLock.Unlock() + n.delIDs[id] = true + job, ok := n.jobs[id] + // 之前此任务没有在当前结点执行 + if !ok { + return + } + cmds := job.Cmds() + if len(cmds) == 0 { + return + } + for _, cmd := range cmds { + n.delCmd(cmd) + } + delete(n.jobs, id) + return +} -// func (n *manager) modJob(job *corejob.Job) { -// if !job.IsRunOn(n.HostNode) { -// return -// } -// //一次性任务 -// if job.Rules.Mode != corejob.Cycle { -// n.runOnceJob(job) -// return -// } -// oJob, ok := n.jobs[job.ID] -// // 之前此任务没有在当前结点执行,直接增加任务 -// if !ok { -// n.addJob(job) -// return -// } -// prevCmds := oJob.Cmds(n.HostNode) +func (n *manager) modJob(jobb *job.Job) { + if !jobb.IsRunOn(n.HostNode.ID) { + return + } + //一次性任务 + if jobb.Rules.Mode != job.Cycle { + n.runOnceJob(jobb) + return + } + oJob, ok := n.jobs[jobb.ID] + // 之前此任务没有在当前结点执行,直接增加任务 + if !ok { + n.addJob(jobb) + return + } + prevCmds := oJob.Cmds() -// job.Count = oJob.Count -// *oJob = *job -// cmds := oJob.Cmds(n.HostNode) -// for id, cmd := range cmds { -// n.modCmd(cmd) -// delete(prevCmds, id) -// } -// for _, cmd := range prevCmds { -// n.delCmd(cmd) -// } -// } + jobb.Count = oJob.Count + *oJob = *jobb + cmds := oJob.Cmds() + for id, cmd := range cmds { + n.modCmd(cmd) + delete(prevCmds, id) + } + for _, cmd := range prevCmds { + n.delCmd(cmd) + } +} -// func (n *manager) addCmd(cmd *corejob.Cmd) { -// n.Cron.Schedule(cmd.Rule.Schedule, cmd) -// n.cmds[cmd.GetID()] = cmd -// logrus.Infof("job[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer) -// return -// } +func (n *manager) addCmd(cmd *job.Cmd) { + n.Cron.Schedule(cmd.Rule.Schedule, cmd) + n.cmds[cmd.GetID()] = cmd + logrus.Infof("job[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer) + return +} -// func (n *manager) modCmd(cmd *corejob.Cmd) { -// c, ok := n.cmds[cmd.GetID()] -// if !ok { -// n.addCmd(cmd) -// return -// } -// sch := c.Rule.Timer -// *c = *cmd -// // 节点执行时间改变,更新 cron -// // 否则不用更新 cron -// if c.Rule.Timer != sch { -// n.Cron.Schedule(c.Rule.Schedule, c) -// } -// logrus.Infof("job[%s] rule[%s] timer[%s] has updated", c.Job.ID, c.Rule.ID, c.Rule.Timer) -// } +func (n 
*manager) modCmd(cmd *job.Cmd) { + c, ok := n.cmds[cmd.GetID()] + if !ok { + n.addCmd(cmd) + return + } + sch := c.Rule.Timer + *c = *cmd + // the execution schedule on this node changed, update cron + // otherwise the cron entry does not need updating + if c.Rule.Timer != sch { + n.Cron.Schedule(c.Rule.Schedule, c) + } + logrus.Infof("job[%s] rule[%s] timer[%s] has updated", c.Job.ID, c.Rule.ID, c.Rule.Timer) +} -// func (n *manager) delCmd(cmd *corejob.Cmd) { -// delete(n.cmds, cmd.GetID()) -// n.Cron.DelJob(cmd) -// logrus.Infof("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer) -// } +func (n *manager) delCmd(cmd *job.Cmd) { + delete(n.cmds, cmd.GetID()) + n.Cron.DelJob(cmd) + logrus.Infof("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer) +} -// //job must be schedulered -// func (n *manager) runOnceJob(j *corejob.Job) { -// go j.RunWithRecovery() -// } +//job must be scheduled +func (n *manager) runOnceJob(j *job.Job) { + go j.RunWithRecovery() +} -// //Stop 停止服务 -// func (n *manager) Stop(i interface{}) { -// n.Cron.Stop() -// } +//Stop stop the service +func (n *manager) Stop() error { + n.cancel() + n.Cron.Stop() + return nil +} -// //Newmanager new server -// func Newmanager(cfg *conf.Conf) (*manager, error) { -// currentNode, err := GetCurrentNode(cfg) -// if err != nil { -// return nil, err -// } -// if cfg.TTL == 0 { -// cfg.TTL = 10 -// } -// n := &manager{ -// Cron: cron.New(), -// jobs: make(Jobs, 8), -// onceJobs: make(Jobs, 8), -// cmds: make(map[string]*corejob.Cmd), -// delIDs: make(map[string]bool, 8), -// ttl: cfg.TTL, -// } -// return n, nil -// } +//Newmanager new server +func Newmanager(cfg *option.Conf, etcdCli *clientv3.Client) (Manager, error) { + if cfg.TTL == 0 { + cfg.TTL = 10 + } + ctx, cancel := context.WithCancel(context.Background()) + n := &manager{ + ctx: ctx, + cancel: cancel, + Cron: cron.New(), + jobs: make(Jobs, 8), + onceJobs: make(Jobs, 8), + cmds: make(map[string]*job.Cmd), + delIDs: make(map[string]bool, 8), + ttl: cfg.TTL, + etcdcli: etcdCli, + } + return n, nil +} diff --git a/node/nodeserver/server.go b/node/nodeserver/server.go deleted file mode 100644 index 07d6f89a3..000000000 --- a/node/nodeserver/server.go +++ /dev/null @@ -1,309 +0,0 @@ -// Copyright (C) 2014-2018 Goodrain Co., Ltd. -// RAINBOND, Application Management Platform - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. For any non-GPL usage of Rainbond, -// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. -// must be obtained first. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>.
- -package nodeserver - -import ( - "context" - "sync" - "time" - - "github.com/goodrain/rainbond/node/core/job" - - "github.com/goodrain/rainbond/util/watch" - - conf "github.com/goodrain/rainbond/cmd/node/option" - corejob "github.com/goodrain/rainbond/node/core/job" - "github.com/goodrain/rainbond/node/core/store" - nodeclient "github.com/goodrain/rainbond/node/nodem/client" - "github.com/robfig/cron" - - "github.com/Sirupsen/logrus" - client "github.com/coreos/etcd/clientv3" -) - -//Config config server -type Config struct { - EtcdEndPoints []string - EtcdTimeout int - EtcdPrefix string - ClusterName string - APIAddr string - K8SConfPath string - EventServerAddress []string - PrometheusMetricPath string - TTL int64 -} - -//Jobs jobs -type Jobs map[string]*corejob.Job - -//NodeServer node manager server -type NodeServer struct { - *store.Client - *nodeclient.HostNode - *cron.Cron - ctx context.Context - cancel context.CancelFunc - jobs Jobs // 和结点相关的任务 - onceJobs Jobs //记录执行的单任务 - jobLock sync.Mutex - cmds map[string]*corejob.Cmd - // 删除的 job id,用于 group 更新 - delIDs map[string]bool - ttl int64 - lID client.LeaseID // lease id - regLeaseID client.LeaseID // lease id - done chan struct{} - //Config - *conf.Conf -} - -//Regist 节点注册 -func (n *NodeServer) Regist() error { - resp, err := n.Client.Grant(n.ttl + 2) - if err != nil { - return err - } - if _, err = n.HostNode.Update(); err != nil { - return err - } - if _, err = n.HostNode.Put(client.WithLease(resp.ID)); err != nil { - return err - } - n.lID = resp.ID - logrus.Infof("node(%s) registe success", n.HostName) - return nil -} - -//Run 启动 -func (n *NodeServer) Run(errchan chan error) (err error) { - n.ctx, n.cancel = context.WithCancel(context.Background()) - n.Regist() - go n.keepAlive() - go n.watchJobs(errchan) - n.Cron.Start() - if err := corejob.StartProc(); err != nil { - logrus.Warnf("[process key will not timeout]proc lease id set err: %s", err.Error()) - } - return -} - -func (n *NodeServer) watchJobs(errChan chan error) error { - watcher := watch.New(store.DefalutClient.Client, "") - watchChan, err := watcher.WatchList(n.ctx, n.Conf.JobPath, "") - if err != nil { - errChan <- err - return err - } - defer watchChan.Stop() - for event := range watchChan.ResultChan() { - switch event.Type { - case watch.Added: - j := new(job.Job) - err := j.Decode(event.GetValue()) - if err != nil { - logrus.Errorf("decode job error :%s", err) - continue - } - n.addJob(j) - case watch.Modified: - j := new(job.Job) - err := j.Decode(event.GetValue()) - if err != nil { - logrus.Errorf("decode job error :%s", err) - continue - } - n.modJob(j) - case watch.Deleted: - n.delJob(event.GetKey()) - default: - logrus.Errorf("watch job error:%v", event.Error) - errChan <- event.Error - } - } - return nil -} - -//添加job缓存 -func (n *NodeServer) addJob(j *corejob.Job) { - if !j.IsRunOn(n.HostNode.ID) { - return - } - //一次性任务 - if j.Rules.Mode != corejob.Cycle { - n.runOnceJob(j) - return - } - n.jobLock.Lock() - defer n.jobLock.Unlock() - n.jobs[j.ID] = j - cmds := j.Cmds() - if len(cmds) == 0 { - return - } - for _, cmd := range cmds { - n.addCmd(cmd) - } - return -} - -func (n *NodeServer) delJob(id string) { - n.jobLock.Lock() - defer n.jobLock.Unlock() - n.delIDs[id] = true - job, ok := n.jobs[id] - // 之前此任务没有在当前结点执行 - if !ok { - return - } - cmds := job.Cmds() - if len(cmds) == 0 { - return - } - for _, cmd := range cmds { - n.delCmd(cmd) - } - delete(n.jobs, id) - return -} - -func (n *NodeServer) modJob(job *corejob.Job) { - if 
!job.IsRunOn(n.HostNode.ID) { - return - } - //一次性任务 - if job.Rules.Mode != corejob.Cycle { - n.runOnceJob(job) - return - } - oJob, ok := n.jobs[job.ID] - // 之前此任务没有在当前结点执行,直接增加任务 - if !ok { - n.addJob(job) - return - } - prevCmds := oJob.Cmds() - - job.Count = oJob.Count - *oJob = *job - cmds := oJob.Cmds() - for id, cmd := range cmds { - n.modCmd(cmd) - delete(prevCmds, id) - } - for _, cmd := range prevCmds { - n.delCmd(cmd) - } -} - -func (n *NodeServer) addCmd(cmd *corejob.Cmd) { - n.Cron.Schedule(cmd.Rule.Schedule, cmd) - n.cmds[cmd.GetID()] = cmd - logrus.Infof("job[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer) - return -} - -func (n *NodeServer) modCmd(cmd *corejob.Cmd) { - c, ok := n.cmds[cmd.GetID()] - if !ok { - n.addCmd(cmd) - return - } - sch := c.Rule.Timer - *c = *cmd - // 节点执行时间改变,更新 cron - // 否则不用更新 cron - if c.Rule.Timer != sch { - n.Cron.Schedule(c.Rule.Schedule, c) - } - logrus.Infof("job[%s] rule[%s] timer[%s] has updated", c.Job.ID, c.Rule.ID, c.Rule.Timer) -} - -func (n *NodeServer) delCmd(cmd *corejob.Cmd) { - delete(n.cmds, cmd.GetID()) - n.Cron.DelJob(cmd) - logrus.Infof("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer) -} - -//job must be schedulered -func (n *NodeServer) runOnceJob(j *corejob.Job) { - go j.RunWithRecovery() -} - -//Stop 停止服务 -func (n *NodeServer) Stop(i interface{}) { - n.cancel() - n.HostNode.Down() - close(n.done) - n.HostNode.Del() - n.Client.Close() - n.Cron.Stop() -} - -func (n *NodeServer) keepAlive() { - duration := time.Duration(n.ttl) * time.Second - timer := time.NewTimer(duration) - for { - select { - case <-n.done: - timer.Stop() - return - case <-timer.C: - if n.lID > 0 { - _, err := n.Client.KeepAliveOnce(n.lID) - if err == nil { - timer.Reset(duration) - continue - } - logrus.Warnf("%s lid[%x] keepAlive err: %s, try to reset...", n.HostName, n.lID, err.Error()) - n.lID = 0 - } - if err := n.Regist(); err != nil { - logrus.Warnf("%s set lid err: %s, try to reset after %d seconds...", n.HostName, err.Error(), n.ttl) - } else { - logrus.Infof("%s set lid[%x] success", n.HostName, n.lID) - } - timer.Reset(duration) - } - } -} - -//NewNodeServer new server -func NewNodeServer(cfg *conf.Conf) (*NodeServer, error) { - //currentNode, err := GetCurrentNode(cfg) - // if err != nil { - // return nil, err - // } - if cfg.TTL == 0 { - cfg.TTL = 10 - } - n := &NodeServer{ - Client: store.DefalutClient, - //HostNode: currentNode, - Cron: cron.New(), - jobs: make(Jobs, 8), - onceJobs: make(Jobs, 8), - cmds: make(map[string]*corejob.Cmd), - delIDs: make(map[string]bool, 8), - Conf: cfg, - ttl: cfg.TTL, - done: make(chan struct{}), - } - return n, nil -} diff --git a/vendor/github.com/prometheus/node_exporter/collector/collector.go b/vendor/github.com/prometheus/node_exporter/collector/collector.go index e8840cc07..eb5c0698c 100644 --- a/vendor/github.com/prometheus/node_exporter/collector/collector.go +++ b/vendor/github.com/prometheus/node_exporter/collector/collector.go @@ -72,12 +72,12 @@ func registerCollector(collector string, isDefaultEnabled bool, factory func() ( } // NodeCollector implements the prometheus.Collector interface. 
-type nodeCollector struct { +type NodeCollector struct { Collectors map[string]Collector } // NewNodeCollector creates a new NodeCollector -func NewNodeCollector(filters ...string) (*nodeCollector, error) { +func NewNodeCollector(filters ...string) (*NodeCollector, error) { f := make(map[string]bool) for _, filter := range filters { enabled, exist := collectorState[filter] @@ -103,17 +103,17 @@ func NewNodeCollector(filters ...string) (*nodeCollector, error) { } } } - return &nodeCollector{Collectors: collectors}, nil + return &NodeCollector{Collectors: collectors}, nil } // Describe implements the prometheus.Collector interface. -func (n nodeCollector) Describe(ch chan<- *prometheus.Desc) { +func (n NodeCollector) Describe(ch chan<- *prometheus.Desc) { ch <- scrapeDurationDesc ch <- scrapeSuccessDesc } // Collect implements the prometheus.Collector interface. -func (n nodeCollector) Collect(ch chan<- prometheus.Metric) { +func (n NodeCollector) Collect(ch chan<- prometheus.Metric) { wg := sync.WaitGroup{} wg.Add(len(n.Collectors)) for name, c := range n.Collectors {