[REV] update node monitor code

This commit is contained in:
barnettZQG 2018-07-16 11:56:48 +08:00
parent f33a8836ff
commit a217cf54c2
13 changed files with 465 additions and 793 deletions

View File

@ -25,7 +25,6 @@ import (
"time"
"github.com/goodrain/rainbond/node/utils"
"github.com/prometheus/node_exporter/collector"
"github.com/Sirupsen/logrus"
client "github.com/coreos/etcd/clientv3"
@ -56,15 +55,6 @@ func Init() error {
if err := Config.parse(); err != nil {
return err
}
// This instance is only used to check collector creation and logging.
nc, err := collector.NewNodeCollector()
if err != nil {
logrus.Fatalf("Couldn't create collector: %s", err)
}
logrus.Infof("Enabled collectors:")
for n := range nc.Collectors {
logrus.Infof(" - %s", n)
}
initialized = true
return nil
}
@ -242,10 +232,3 @@ func (c *Conf) parse() error {
c.HostIDFile = "/opt/rainbond/etc/node/node_host_uuid.conf"
return nil
}
func Exit(i interface{}) {
close(exitChan)
if watcher != nil {
watcher.Close()
}
}

View File

@ -24,23 +24,18 @@ import (
"syscall"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/node/api"
"github.com/goodrain/rainbond/node/api/controller"
"github.com/goodrain/rainbond/node/core/job"
"github.com/goodrain/rainbond/node/core/store"
"github.com/goodrain/rainbond/node/kubecache"
"github.com/goodrain/rainbond/node/masterserver"
"github.com/goodrain/rainbond/node/monitormessage"
"github.com/goodrain/rainbond/node/nodem"
"github.com/goodrain/rainbond/node/statsd"
"github.com/prometheus/client_golang/prometheus"
"github.com/Sirupsen/logrus"
eventLog "github.com/goodrain/rainbond/event"
"os/signal"
"github.com/goodrain/rainbond/node/api"
)
//Run starts the node server
@ -61,14 +56,15 @@ func Run(c *option.Conf) error {
return err
}
defer kubecli.Stop()
// init etcd client
if err = store.NewClient(c); err != nil {
return fmt.Errorf("Connect to ETCD %s failed: %s",
c.Etcd.Endpoints, err)
}
nodemanager := nodem.NewNodeManager(c)
nodemanager, err := nodem.NewNodeManager(c)
if err != nil {
return fmt.Errorf("create node manager failed: %s", err)
}
if err := nodemanager.Start(errChan); err != nil {
return fmt.Errorf("start node manager failed: %s", err)
}
@ -87,27 +83,17 @@ func Run(c *option.Conf) error {
}
defer ms.Stop(nil)
}
//statsd exporter
registry := prometheus.NewRegistry()
exporter := statsd.CreateExporter(c.StatsdConfig, registry)
if err := exporter.Start(); err != nil {
logrus.Errorf("start statsd exporter server error,%s", err.Error())
return err
}
meserver := monitormessage.CreateUDPServer("0.0.0.0", 6666, c.Etcd.Endpoints)
if err := meserver.Start(); err != nil {
return err
}
//start the API service
apiManager := api.NewManager(*c, nodemanager.GetCurrentNode(), ms, exporter, kubecli)
//create api manager
apiManager := api.NewManager(*c, nodemanager.GetCurrentNode(), ms, kubecli)
if err := apiManager.Start(errChan); err != nil {
return err
}
if err := nodemanager.AddAPIManager(apiManager); err != nil {
return err
}
defer apiManager.Stop()
defer job.Exit(nil)
defer controller.Exist(nil)
defer option.Exit(nil)
//final step: listen for termination signals
term := make(chan os.Signal)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)

View File

@ -29,9 +29,7 @@ import (
"github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/node/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/prometheus/node_exporter/collector"
"github.com/goodrain/rainbond/node/api/model"
@ -372,39 +370,6 @@ func Instances(w http.ResponseWriter, r *http.Request) {
httputil.ReturnSuccess(r, w, pods)
}
//NodeExporter node monitoring
func NodeExporter(w http.ResponseWriter, r *http.Request) {
// filters := r.URL.Query()["collect[]"]
// logrus.Debugln("collect query:", filters)
filters := []string{"cpu", "diskstats", "filesystem", "ipvs", "loadavg", "meminfo", "netdev", "netstat", "uname", "mountstats", "nfs"}
nc, err := collector.NewNodeCollector(filters...)
if err != nil {
logrus.Warnln("Couldn't create", err)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("Couldn't create %s", err)))
return
}
registry := prometheus.NewRegistry()
err = registry.Register(nc)
if err != nil {
logrus.Errorln("Couldn't register collector:", err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("Couldn't register collector: %s", err)))
return
}
gatherers := prometheus.Gatherers{
prometheus.DefaultGatherer,
registry,
}
// Delegate http serving to Prometheus client library, which will call collector.Collect.
h := promhttp.HandlerFor(gatherers,
promhttp.HandlerOpts{
ErrorLog: logrus.StandardLogger(),
ErrorHandling: promhttp.ContinueOnError,
})
h.ServeHTTP(w, r)
}
//exists temporarily
func outJSON(w http.ResponseWriter, data interface{}) {
outJSONWithCode(w, http.StatusOK, data)

View File

@ -27,8 +27,6 @@ import (
"github.com/goodrain/rainbond/node/kubecache"
"github.com/goodrain/rainbond/node/masterserver"
"github.com/goodrain/rainbond/node/statsd"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/goodrain/rainbond/node/api/controller"
"github.com/goodrain/rainbond/node/api/router"
@ -60,50 +58,21 @@ type Manager struct {
}
//NewManager api manager
func NewManager(c option.Conf, node *nodeclient.HostNode, ms *masterserver.MasterServer, exporter *statsd.Exporter, kubecli kubecache.KubeClient) *Manager {
func NewManager(c option.Conf, node *nodeclient.HostNode, ms *masterserver.MasterServer, kubecli kubecache.KubeClient) *Manager {
r := router.Routers(c.RunMode)
ctx, cancel := context.WithCancel(context.Background())
controller.Init(&c, ms, kubecli)
m := &Manager{
ctx: ctx,
cancel: cancel,
conf: c,
router: r,
node: node,
ms: ms,
exporter: exporter,
ctx: ctx,
cancel: cancel,
conf: c,
router: r,
node: node,
ms: ms,
}
m.router.Get("/app/metrics", m.HandleStatsd)
m.router.Get("/-/statsdreload", m.ReloadStatsdMappConfig)
return m
}
//ReloadStatsdMappConfig ReloadStatsdMappConfig
func (m *Manager) ReloadStatsdMappConfig(w http.ResponseWriter, r *http.Request) {
if err := m.exporter.ReloadConfig(); err != nil {
w.Write([]byte(err.Error()))
w.WriteHeader(500)
} else {
w.Write([]byte("Success reload"))
w.WriteHeader(200)
}
}
//HandleStatsd statsd handle
func (m *Manager) HandleStatsd(w http.ResponseWriter, r *http.Request) {
gatherers := prometheus.Gatherers{
prometheus.DefaultGatherer,
m.exporter.GetRegister(),
}
// Delegate http serving to Prometheus client library, which will call collector.Collect.
h := promhttp.HandlerFor(gatherers,
promhttp.HandlerOpts{
ErrorLog: logrus.StandardLogger(),
ErrorHandling: promhttp.ContinueOnError,
})
h.ServeHTTP(w, r)
}
//Start starts the api server
func (m *Manager) Start(errChan chan error) error {
logrus.Infof("api server start listening on %s", m.conf.APIAddr)
@ -154,11 +123,7 @@ func (m *Manager) Stop() error {
return nil
}
func (m *Manager) prometheus() {
//prometheus.MustRegister(version.NewCollector("acp_node"))
// exporter := monitor.NewExporter(m.coreManager)
// prometheus.MustRegister(exporter)
//todo: commented out by me
//m.container.Handle(m.conf.PrometheusMetricPath, promhttp.Handler())
//GetRouter GetRouter
func (m *Manager) GetRouter() *chi.Mux {
return m.router
}

View File

@ -116,7 +116,5 @@ func Routers(mode string) *chi.Mux {
r.Put("/tasks/taskreload", controller.ReloadStaticTasks)
}
})
//node monitoring
r.Get("/node/metrics", controller.NodeExporter)
return r
}

View File

@ -26,121 +26,10 @@ import (
"time"
conf "github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/node/core/store"
"github.com/Sirupsen/logrus"
client "github.com/coreos/etcd/clientv3"
)
var (
lID *leaseID
)
// keeps the proc lease id alive
func StartProc() error {
lID = &leaseID{
ttl: conf.Config.ProcTTL,
lk: new(sync.RWMutex),
done: make(chan struct{}),
}
if lID.ttl == 0 {
return nil
}
err := lID.set()
go lID.keepAlive()
return err
}
func Reload(i interface{}) {
if lID.ttl == conf.Config.ProcTTL {
return
}
close(lID.done)
lID.done, lID.ttl = make(chan struct{}), conf.Config.ProcTTL
if conf.Config.ProcTTL == 0 {
return
}
if err := lID.set(); err != nil {
logrus.Warnf("proc lease id set err: %s", err.Error())
}
go lID.keepAlive()
}
func Exit(i interface{}) {
if lID.done != nil {
close(lID.done)
}
}
type leaseID struct {
ttl int64
ID client.LeaseID
lk *sync.RWMutex
done chan struct{}
}
func (l *leaseID) get() client.LeaseID {
if l.ttl == 0 {
return -1
}
l.lk.RLock()
id := l.ID
l.lk.RUnlock()
return id
}
func (l *leaseID) set() error {
id := client.LeaseID(-1)
resp, err := store.DefalutClient.Grant(l.ttl + 2)
if err == nil {
id = resp.ID
}
l.lk.Lock()
l.ID = id
l.lk.Unlock()
return err
}
func (l *leaseID) keepAlive() {
duration := time.Duration(l.ttl) * time.Second
timer := time.NewTimer(duration)
for {
select {
case <-l.done:
return
case <-timer.C:
if l.ttl == 0 {
return
}
id := l.get()
if id > 0 {
_, err := store.DefalutClient.KeepAliveOnce(l.ID)
if err == nil {
timer.Reset(duration)
continue
}
logrus.Warnf("proc lease id[%x] keepAlive err: %s, try to reset...", id, err.Error())
}
if err := l.set(); err != nil {
logrus.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl)
} else {
logrus.Infof("proc set lease id[%x] success", l.get())
}
timer.Reset(duration)
}
}
}
// information about the job currently being executed
// key: /cronsun/proc/node/group/jobId/pid
// value: execution start time
@ -187,32 +76,34 @@ func (p *Process) Val() string {
// the put command may already have been sent to the etcd server for some reason
// a known case where this happens is a request deadline being exceeded
func (p *Process) put() (err error) {
if atomic.LoadInt32(&p.running) != 1 {
return
}
// if atomic.LoadInt32(&p.running) != 1 {
// return
// }
if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) {
return
}
// if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) {
// return
// }
id := lID.get()
if id < 0 {
if _, err = store.DefalutClient.Put(p.Key(), p.Val()); err != nil {
return
}
}
// id := lID.get()
// if id < 0 {
// if _, err = store.DefalutClient.Put(p.Key(), p.Val()); err != nil {
// return
// }
// }
_, err = store.DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id))
// _, err = store.DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id))
// return
return
}
func (p *Process) del() error {
if atomic.LoadInt32(&p.hasPut) != 1 {
return nil
}
// if atomic.LoadInt32(&p.hasPut) != 1 {
// return nil
// }
_, err := store.DefalutClient.Delete(p.Key())
return err
// _, err := store.DefalutClient.Delete(p.Key())
// return err
return nil
}
func (p *Process) Start() {

View File

@ -151,25 +151,22 @@ func (n *Cluster) GetNode(id string) *client.HostNode {
}
func (n *Cluster) handleNodeStatus(v *client.HostNode) {
if v.Role.HasRule("compute") {
if v.NodeStatus != nil {
if v.Unschedulable {
k8sNode, err := n.kubecli.GetNode(v.ID)
if err != nil {
logrus.Errorf("get k8s node error:%s", err.Error())
v.Status = "error"
return
}
if k8sNode != nil {
if v.Unschedulable || k8sNode.Spec.Unschedulable {
v.Status = "unschedulable"
return
}
// if v.AvailableCPU == 0 {
// v.AvailableCPU = v.NodeStatus.Allocatable.Cpu().Value()
// }
// if v.AvailableMemory == 0 {
// v.AvailableMemory = v.NodeStatus.Allocatable.Memory().Value()
// }
var haveready bool
for _, condiction := range v.NodeStatus.Conditions {
for _, condiction := range k8sNode.Status.Conditions {
if condiction.Status == "True" && (condiction.Type == "OutOfDisk" || condiction.Type == "MemoryPressure" || condiction.Type == "DiskPressure") {
v.Status = "error"
return
}
if v.Status == "unschedulable" || v.Status == "init" || v.Status == "init_success" || v.Status == "init_failed" || v.Status == "installing" || v.Status == "install_success" || v.Status == "install_failed" {
}
if condiction.Type == "Ready" {
haveready = true
@ -193,10 +190,13 @@ func (n *Cluster) handleNodeStatus(v *client.HostNode) {
}
if v.Alived {
for _, condition := range v.NodeStatus.Conditions {
if condition.Type == "NodeInit" && condition.Status == "True" {
if condition.Type == "Ready" && condition.Status == "True" {
v.Status = "running"
return
}
}
} else {
v.Status = "down"
}
}
}

View File

@ -19,7 +19,11 @@
package client
import (
"context"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/node/core/config"
"github.com/goodrain/rainbond/node/core/job"
)
@ -27,6 +31,7 @@ import (
//ClusterClient ClusterClient
type ClusterClient interface {
UpdateStatus(*HostNode) error
DownNode(*HostNode) error
GetMasters() ([]*HostNode, error)
GetNode(nodeID string) (*HostNode, error)
GetDataCenterConfig() (*config.DataCenterConfig, error)
@ -37,17 +42,28 @@ type ClusterClient interface {
}
//NewClusterClient new cluster client
func NewClusterClient(etcdClient *clientv3.Client) ClusterClient {
func NewClusterClient(conf *option.Conf, etcdClient *clientv3.Client) ClusterClient {
return &etcdClusterClient{
etcdClient: etcdClient,
conf: conf,
}
}
type etcdClusterClient struct {
etcdClient *clientv3.Client
conf *option.Conf
onlineLes clientv3.LeaseID
}
func (e *etcdClusterClient) UpdateStatus(*HostNode) error {
func (e *etcdClusterClient) UpdateStatus(n *HostNode) error {
n.UpTime = time.Now()
if err := e.Update(n); err != nil {
return err
}
if err := e.nodeOnlinePut(n); err != nil {
return err
}
return nil
}
@ -66,3 +82,43 @@ func (e *etcdClusterClient) WatchJobs() <-chan *job.Event {
func (e *etcdClusterClient) GetNode(nodeID string) (*HostNode, error) {
return nil, nil
}
//nodeOnlinePut updates the node online status
func (e *etcdClusterClient) nodeOnlinePut(h *HostNode) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
if e.onlineLes != 0 {
if _, err := e.etcdClient.KeepAlive(ctx, e.onlineLes); err == nil {
return nil
}
e.onlineLes = 0
}
les, err := e.etcdClient.Grant(ctx, 30)
if err != nil {
return err
}
e.onlineLes = les.ID
_, err = e.etcdClient.Put(ctx, e.conf.OnlineNodePath+"/"+h.ID, h.PID, clientv3.WithLease(les.ID))
if err != nil {
return err
}
return nil
}
//Update update node info
func (e *etcdClusterClient) Update(h *HostNode) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := e.etcdClient.Put(ctx, e.conf.NodePath+"/"+h.ID, h.String())
return err
}
//DownNode marks the node as down
func (e *etcdClusterClient) DownNode(h *HostNode) error {
h.Alived, h.DownTime = false, time.Now()
e.Update(h)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := e.etcdClient.Delete(ctx, e.conf.OnlineNodePath+"/"+h.ID)
return err
}
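
The node-online record written by nodeOnlinePut is a lease-bound key under OnlineNodePath, so other components can discover live nodes simply by listing that prefix; once a node stops calling UpdateStatus, its 30-second lease expires and the key disappears. Below is a minimal consumer-side sketch of that read path, assuming a local etcd endpoint and an illustrative "/rainbond/onlinenodes" prefix (neither value comes from this commit):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

// listOnlineNodes lists the node IDs that currently hold a live online lease
// under onlinePrefix. The key layout <onlinePrefix>/<node id> mirrors nodeOnlinePut.
func listOnlineNodes(cli *clientv3.Client, onlinePrefix string) ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	resp, err := cli.Get(ctx, onlinePrefix+"/", clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}
	ids := make([]string, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		ids = append(ids, string(kv.Key[len(onlinePrefix)+1:])) // strip the prefix and slash
	}
	return ids, nil
}

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	ids, err := listOnlineNodes(cli, "/rainbond/onlinenodes") // prefix is an assumption
	if err != nil {
		panic(err)
	}
	fmt.Println("online nodes:", ids)
}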

View File

@ -19,19 +19,129 @@
package monitor
import (
"net/http"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/node/api"
"github.com/goodrain/rainbond/node/monitormessage"
"github.com/goodrain/rainbond/node/statsd"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/node_exporter/collector"
)
//Manager Manager
type Manager interface {
Start(errchan chan error)
Start(errchan chan error) error
Stop() error
SetAPIRoute(apim *api.Manager) error
}
type manager struct {
statsdExporter *statsd.Exporter
nodeExporter *nodeExporter
statsdExporter *statsd.Exporter
statsdRegistry *prometheus.Registry
nodeExporterRestry *prometheus.Registry
meserver *monitormessage.UDPServer
}
type nodeExporter struct {
func createNodeExporterRestry() (*prometheus.Registry, error) {
registry := prometheus.NewRegistry()
filters := []string{"cpu", "diskstats", "filesystem", "ipvs", "loadavg", "meminfo", "netdev", "netstat", "uname", "mountstats", "nfs"}
nc, err := collector.NewNodeCollector(filters...)
if err != nil {
return nil, err
}
for n := range nc.Collectors {
logrus.Infof("node collector - %s", n)
}
err = registry.Register(nc)
if err != nil {
return nil, err
}
return registry, nil
}
//CreateManager CreateManager
func CreateManager(c *option.Conf) (Manager, error) {
//statsd exporter
statsdRegistry := prometheus.NewRegistry()
exporter := statsd.CreateExporter(c.StatsdConfig, statsdRegistry)
meserver := monitormessage.CreateUDPServer("0.0.0.0", 6666, c.Etcd.Endpoints)
nodeExporterRestry, err := createNodeExporterRestry()
if err != nil {
return nil, err
}
manage := &manager{
statsdExporter: exporter,
statsdRegistry: statsdRegistry,
nodeExporterRestry: nodeExporterRestry,
meserver: meserver,
}
return manage, nil
}
func (m *manager) Start(errchan chan error) error {
if err := m.statsdExporter.Start(); err != nil {
logrus.Errorf("start statsd exporter server error,%s", err.Error())
return err
}
if err := m.meserver.Start(); err != nil {
return err
}
return nil
}
func (m *manager) Stop() error {
return nil
}
//ReloadStatsdMappConfig ReloadStatsdMappConfig
func (m *manager) ReloadStatsdMappConfig(w http.ResponseWriter, r *http.Request) {
if err := m.statsdExporter.ReloadConfig(); err != nil {
w.Write([]byte(err.Error()))
w.WriteHeader(500)
} else {
w.Write([]byte("Success reload"))
w.WriteHeader(200)
}
}
//HandleStatsd statsd handle
func (m *manager) HandleStatsd(w http.ResponseWriter, r *http.Request) {
gatherers := prometheus.Gatherers{
prometheus.DefaultGatherer,
m.statsdRegistry,
}
// Delegate http serving to Prometheus client library, which will call collector.Collect.
h := promhttp.HandlerFor(gatherers,
promhttp.HandlerOpts{
ErrorLog: logrus.StandardLogger(),
ErrorHandling: promhttp.ContinueOnError,
})
h.ServeHTTP(w, r)
}
//NodeExporter node exporter
func (m *manager) NodeExporter(w http.ResponseWriter, r *http.Request) {
gatherers := prometheus.Gatherers{
prometheus.DefaultGatherer,
m.nodeExporterRestry,
}
// Delegate http serving to Prometheus client library, which will call collector.Collect.
h := promhttp.HandlerFor(gatherers,
promhttp.HandlerOpts{
ErrorLog: logrus.StandardLogger(),
ErrorHandling: promhttp.ContinueOnError,
})
h.ServeHTTP(w, r)
}
//SetAPIRoute set api route rule
func (m *manager) SetAPIRoute(apim *api.Manager) error {
apim.GetRouter().Get("/app/metrics", m.HandleStatsd)
apim.GetRouter().Get("/-/statsdreload", m.ReloadStatsdMappConfig)
apim.GetRouter().Get("/node/metrics", m.NodeExporter)
return nil
}
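
Both HandleStatsd and NodeExporter reuse the same prometheus client-library pattern: keep custom collectors in a dedicated registry, merge it with prometheus.DefaultGatherer via prometheus.Gatherers, and let promhttp.HandlerFor serve the combined output so the default Go/process metrics and the custom ones share one endpoint. A self-contained sketch of that pattern outside Rainbond (the counter name, the /hit route and the port are made up for illustration):

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// dedicated registry, playing the role of statsdRegistry / nodeExporterRestry
	registry := prometheus.NewRegistry()
	requests := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_requests_total",
		Help: "Illustrative counter kept on the dedicated registry.",
	})
	registry.MustRegister(requests)

	// merge the dedicated registry with the default gatherer (Go runtime and process metrics)
	gatherers := prometheus.Gatherers{prometheus.DefaultGatherer, registry}

	// serve both through a single handler, as the monitor manager does for /node/metrics
	http.Handle("/metrics", promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{
		ErrorHandling: promhttp.ContinueOnError,
	}))
	http.HandleFunc("/hit", func(w http.ResponseWriter, r *http.Request) {
		requests.Inc()
		w.Write([]byte("ok"))
	})
	log.Fatal(http.ListenAndServe(":9099", nil))
}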

View File

@ -28,7 +28,10 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/version"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/node/api"
"github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/node/nodem/controller"
"github.com/goodrain/rainbond/node/nodem/healthy"
@ -49,17 +52,40 @@ type NodeManager struct {
controller controller.Manager
taskrun taskrun.Manager
cfg *option.Conf
apim *api.Manager
}
//NewNodeManager creates a new node manager
func NewNodeManager(conf *option.Conf) *NodeManager {
func NewNodeManager(conf *option.Conf) (*NodeManager, error) {
etcdcli, err := clientv3.New(conf.Etcd)
if err != nil {
return nil, err
}
taskrun, err := taskrun.Newmanager(conf, etcdcli)
if err != nil {
return nil, err
}
cluster := client.NewClusterClient(conf, etcdcli)
monitor, err := monitor.CreateManager(conf)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
nodem := &NodeManager{
cfg: conf,
ctx: ctx,
cancel: cancel,
cfg: conf,
ctx: ctx,
cancel: cancel,
taskrun: taskrun,
cluster: cluster,
monitor: monitor,
}
return nodem
return nodem, nil
}
//AddAPIManager sets the api manager and registers the monitor api routes
func (n *NodeManager) AddAPIManager(apim *api.Manager) error {
n.apim = apim
return n.monitor.SetAPIRoute(apim)
}
//Start start
@ -67,19 +93,19 @@ func (n *NodeManager) Start(errchan chan error) error {
if err := n.init(); err != nil {
return err
}
if err := n.controller.Start(); err != nil {
return fmt.Errorf("start node controller error,%s", err.Error())
}
services, err := n.controller.GetAllService()
if err != nil {
return fmt.Errorf("get all services error,%s", err.Error())
}
if err := n.healthy.AddServices(services); err != nil {
return fmt.Errorf("get all services error,%s", err.Error())
}
if err := n.healthy.Start(); err != nil {
return fmt.Errorf("node healty start error,%s", err.Error())
}
// if err := n.controller.Start(); err != nil {
// return fmt.Errorf("start node controller error,%s", err.Error())
// }
// services, err := n.controller.GetAllService()
// if err != nil {
// return fmt.Errorf("get all services error,%s", err.Error())
// }
// if err := n.healthy.AddServices(services); err != nil {
// return fmt.Errorf("get all services error,%s", err.Error())
// }
// if err := n.healthy.Start(); err != nil {
// return fmt.Errorf("node healty start error,%s", err.Error())
// }
go n.monitor.Start(errchan)
go n.taskrun.Start(errchan)
go n.heartbeat()
@ -89,6 +115,7 @@ func (n *NodeManager) Start(errchan chan error) error {
//Stop Stop
func (n *NodeManager) Stop() {
n.cancel()
n.cluster.DownNode(&n.HostNode)
if n.taskrun != nil {
n.taskrun.Stop()
}
@ -165,6 +192,7 @@ func (n *NodeManager) init() error {
if node.AvailableCPU == 0 {
node.AvailableCPU = int64(runtime.NumCPU())
}
node.Version = version.Version
return nil
}

View File

@ -19,7 +19,16 @@
package taskrun
import (
"context"
"sync"
"github.com/Sirupsen/logrus"
clientv3 "github.com/coreos/etcd/clientv3"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/node/core/job"
"github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/util/watch"
"github.com/robfig/cron"
)
//Manager Manager
@ -28,208 +37,198 @@ type Manager interface {
Stop() error
}
// //Config config server
// type Config struct {
// EtcdEndPoints []string
// EtcdTimeout int
// EtcdPrefix string
// ClusterName string
// APIAddr string
// K8SConfPath string
// EventServerAddress []string
// PrometheusMetricPath string
// TTL int64
// }
//Jobs jobs
type Jobs map[string]*job.Job
// //manager node manager server
// type manager struct {
// cluster client.ClusterClient
// *cron.Cron
// ctx context.Context
// jobs Jobs // jobs bound to this node
// onceJobs Jobs // records one-off jobs that have been run
// jobLock sync.Mutex
// cmds map[string]*corejob.Cmd
// delIDs map[string]bool
// ttl int64
// }
//manager taskrun manager server
type manager struct {
cluster client.ClusterClient
etcdcli *clientv3.Client
HostNode *client.HostNode
*cron.Cron
ctx context.Context
cancel context.CancelFunc
Conf option.Conf
jobs Jobs // jobs bound to this node
onceJobs Jobs // records one-off jobs that have been run
jobLock sync.Mutex
cmds map[string]*job.Cmd
delIDs map[string]bool
ttl int64
}
// //Run taskrun start
// func (n *manager) Start(errchan chan error) {
// go n.watchJobs(errchan)
// n.Cron.Start()
// if err := corejob.StartProc(); err != nil {
// logrus.Warnf("[process key will not timeout]proc lease id set err: %s", err.Error())
// }
// return
// }
//Start starts the taskrun manager
func (n *manager) Start(errchan chan error) {
go n.watchJobs(errchan)
n.Cron.Start()
return
}
// func (n *manager) watchJobs(errChan chan error) error {
// watcher := watch.New(store.DefalutClient.Client, "")
// watchChan, err := watcher.WatchList(n.ctx, n.Conf.JobPath, "")
// if err != nil {
// errChan <- err
// return err
// }
// defer watchChan.Stop()
// for event := range watchChan.ResultChan() {
// switch event.Type {
// case watch.Added:
// j := new(job.Job)
// err := j.Decode(event.GetValue())
// if err != nil {
// logrus.Errorf("decode job error :%s", err)
// continue
// }
// n.addJob(j)
// case watch.Modified:
// j := new(job.Job)
// err := j.Decode(event.GetValue())
// if err != nil {
// logrus.Errorf("decode job error :%s", err)
// continue
// }
// n.modJob(j)
// case watch.Deleted:
// n.delJob(event.GetKey())
// default:
// logrus.Errorf("watch job error:%v", event.Error)
// errChan <- event.Error
// }
// }
// return nil
// }
func (n *manager) watchJobs(errChan chan error) error {
watcher := watch.New(n.etcdcli, "")
watchChan, err := watcher.WatchList(n.ctx, n.Conf.JobPath, "")
if err != nil {
errChan <- err
return err
}
defer watchChan.Stop()
for event := range watchChan.ResultChan() {
switch event.Type {
case watch.Added:
j := new(job.Job)
err := j.Decode(event.GetValue())
if err != nil {
logrus.Errorf("decode job error :%s", err)
continue
}
n.addJob(j)
case watch.Modified:
j := new(job.Job)
err := j.Decode(event.GetValue())
if err != nil {
logrus.Errorf("decode job error :%s", err)
continue
}
n.modJob(j)
case watch.Deleted:
n.delJob(event.GetKey())
default:
logrus.Errorf("watch job error:%v", event.Error)
errChan <- event.Error
}
}
return nil
}
// //add the job to the local cache
// func (n *manager) addJob(j *corejob.Job) {
// if !j.IsRunOn(n.HostNode) {
// return
// }
// //one-off job
// if j.Rules.Mode != corejob.Cycle {
// n.runOnceJob(j)
// return
// }
// n.jobLock.Lock()
// defer n.jobLock.Unlock()
// n.jobs[j.ID] = j
// cmds := j.Cmds(n.HostNode)
// if len(cmds) == 0 {
// return
// }
// for _, cmd := range cmds {
// n.addCmd(cmd)
// }
// return
// }
//add the job to the local cache
func (n *manager) addJob(j *job.Job) {
if !j.IsRunOn(n.HostNode.ID) {
return
}
//one-off job
if j.Rules.Mode != job.Cycle {
n.runOnceJob(j)
return
}
n.jobLock.Lock()
defer n.jobLock.Unlock()
n.jobs[j.ID] = j
cmds := j.Cmds()
if len(cmds) == 0 {
return
}
for _, cmd := range cmds {
n.addCmd(cmd)
}
return
}
// func (n *manager) delJob(id string) {
// n.jobLock.Lock()
// defer n.jobLock.Unlock()
// n.delIDs[id] = true
// job, ok := n.jobs[id]
// // this job was not previously running on this node
// if !ok {
// return
// }
// cmds := job.Cmds(n.HostNode)
// if len(cmds) == 0 {
// return
// }
// for _, cmd := range cmds {
// n.delCmd(cmd)
// }
// delete(n.jobs, id)
// return
// }
func (n *manager) delJob(id string) {
n.jobLock.Lock()
defer n.jobLock.Unlock()
n.delIDs[id] = true
job, ok := n.jobs[id]
// this job was not previously running on this node
if !ok {
return
}
cmds := job.Cmds()
if len(cmds) == 0 {
return
}
for _, cmd := range cmds {
n.delCmd(cmd)
}
delete(n.jobs, id)
return
}
// func (n *manager) modJob(job *corejob.Job) {
// if !job.IsRunOn(n.HostNode) {
// return
// }
// //one-off job
// if job.Rules.Mode != corejob.Cycle {
// n.runOnceJob(job)
// return
// }
// oJob, ok := n.jobs[job.ID]
// // this job was not previously running on this node, add it directly
// if !ok {
// n.addJob(job)
// return
// }
// prevCmds := oJob.Cmds(n.HostNode)
func (n *manager) modJob(jobb *job.Job) {
if !jobb.IsRunOn(n.HostNode.ID) {
return
}
//one-off job
if jobb.Rules.Mode != job.Cycle {
n.runOnceJob(jobb)
return
}
oJob, ok := n.jobs[jobb.ID]
// this job was not previously running on this node, add it directly
if !ok {
n.addJob(jobb)
return
}
prevCmds := oJob.Cmds()
// job.Count = oJob.Count
// *oJob = *job
// cmds := oJob.Cmds(n.HostNode)
// for id, cmd := range cmds {
// n.modCmd(cmd)
// delete(prevCmds, id)
// }
// for _, cmd := range prevCmds {
// n.delCmd(cmd)
// }
// }
jobb.Count = oJob.Count
*oJob = *jobb
cmds := oJob.Cmds()
for id, cmd := range cmds {
n.modCmd(cmd)
delete(prevCmds, id)
}
for _, cmd := range prevCmds {
n.delCmd(cmd)
}
}
// func (n *manager) addCmd(cmd *corejob.Cmd) {
// n.Cron.Schedule(cmd.Rule.Schedule, cmd)
// n.cmds[cmd.GetID()] = cmd
// logrus.Infof("job[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer)
// return
// }
func (n *manager) addCmd(cmd *job.Cmd) {
n.Cron.Schedule(cmd.Rule.Schedule, cmd)
n.cmds[cmd.GetID()] = cmd
logrus.Infof("job[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer)
return
}
// func (n *manager) modCmd(cmd *corejob.Cmd) {
// c, ok := n.cmds[cmd.GetID()]
// if !ok {
// n.addCmd(cmd)
// return
// }
// sch := c.Rule.Timer
// *c = *cmd
// // if the execution schedule on this node changed, update cron
// // otherwise cron does not need to be updated
// if c.Rule.Timer != sch {
// n.Cron.Schedule(c.Rule.Schedule, c)
// }
// logrus.Infof("job[%s] rule[%s] timer[%s] has updated", c.Job.ID, c.Rule.ID, c.Rule.Timer)
// }
func (n *manager) modCmd(cmd *job.Cmd) {
c, ok := n.cmds[cmd.GetID()]
if !ok {
n.addCmd(cmd)
return
}
sch := c.Rule.Timer
*c = *cmd
// if the execution schedule on this node changed, update cron
// otherwise cron does not need to be updated
if c.Rule.Timer != sch {
n.Cron.Schedule(c.Rule.Schedule, c)
}
logrus.Infof("job[%s] rule[%s] timer[%s] has updated", c.Job.ID, c.Rule.ID, c.Rule.Timer)
}
// func (n *manager) delCmd(cmd *corejob.Cmd) {
// delete(n.cmds, cmd.GetID())
// n.Cron.DelJob(cmd)
// logrus.Infof("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer)
// }
func (n *manager) delCmd(cmd *job.Cmd) {
delete(n.cmds, cmd.GetID())
n.Cron.DelJob(cmd)
logrus.Infof("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer)
}
// //job must be scheduled
// func (n *manager) runOnceJob(j *corejob.Job) {
// go j.RunWithRecovery()
// }
//job must be scheduled
func (n *manager) runOnceJob(j *job.Job) {
go j.RunWithRecovery()
}
// //Stop stops the service
// func (n *manager) Stop(i interface{}) {
// n.Cron.Stop()
// }
//Stop stops the service
func (n *manager) Stop() error {
n.cancel()
n.Cron.Stop()
return nil
}
// //Newmanager new server
// func Newmanager(cfg *conf.Conf) (*manager, error) {
// currentNode, err := GetCurrentNode(cfg)
// if err != nil {
// return nil, err
// }
// if cfg.TTL == 0 {
// cfg.TTL = 10
// }
// n := &manager{
// Cron: cron.New(),
// jobs: make(Jobs, 8),
// onceJobs: make(Jobs, 8),
// cmds: make(map[string]*corejob.Cmd),
// delIDs: make(map[string]bool, 8),
// ttl: cfg.TTL,
// }
// return n, nil
// }
//Newmanager new server
func Newmanager(cfg *option.Conf, etcdCli *clientv3.Client) (Manager, error) {
if cfg.TTL == 0 {
cfg.TTL = 10
}
ctx, cancel := context.WithCancel(context.Background())
n := &manager{
ctx: ctx,
cancel: cancel,
Cron: cron.New(),
jobs: make(Jobs, 8),
onceJobs: make(Jobs, 8),
cmds: make(map[string]*job.Cmd),
delIDs: make(map[string]bool, 8),
ttl: cfg.TTL,
etcdcli: etcdCli,
}
return n, nil
}
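
The taskrun manager schedules work through robfig/cron: each job Cmd carries a pre-parsed Schedule and is handed to Cron.Schedule, which invokes the command's Run method on every tick (the DelJob call above comes from Rainbond's own cron variant). A standalone sketch of that scheduling contract, assuming the upstream robfig/cron API and an invented printJob type in place of job.Cmd:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

// printJob stands in for job.Cmd: anything with a Run() method satisfies cron.Job.
type printJob struct{ name string }

func (p printJob) Run() {
	fmt.Println("running", p.name, "at", time.Now().Format(time.RFC3339))
}

func main() {
	c := cron.New()
	// Parse a spec into a Schedule up front, as job rules are when they are decoded.
	sched, err := cron.Parse("@every 2s")
	if err != nil {
		panic(err)
	}
	c.Schedule(sched, printJob{name: "demo"})
	c.Start()
	defer c.Stop()

	time.Sleep(7 * time.Second) // let the job fire a few times
}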

View File

@ -1,309 +0,0 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package nodeserver
import (
"context"
"sync"
"time"
"github.com/goodrain/rainbond/node/core/job"
"github.com/goodrain/rainbond/util/watch"
conf "github.com/goodrain/rainbond/cmd/node/option"
corejob "github.com/goodrain/rainbond/node/core/job"
"github.com/goodrain/rainbond/node/core/store"
nodeclient "github.com/goodrain/rainbond/node/nodem/client"
"github.com/robfig/cron"
"github.com/Sirupsen/logrus"
client "github.com/coreos/etcd/clientv3"
)
//Config config server
type Config struct {
EtcdEndPoints []string
EtcdTimeout int
EtcdPrefix string
ClusterName string
APIAddr string
K8SConfPath string
EventServerAddress []string
PrometheusMetricPath string
TTL int64
}
//Jobs jobs
type Jobs map[string]*corejob.Job
//NodeServer node manager server
type NodeServer struct {
*store.Client
*nodeclient.HostNode
*cron.Cron
ctx context.Context
cancel context.CancelFunc
jobs Jobs // jobs bound to this node
onceJobs Jobs // records one-off jobs that have been run
jobLock sync.Mutex
cmds map[string]*corejob.Cmd
// deleted job IDs, used for group updates
delIDs map[string]bool
ttl int64
lID client.LeaseID // lease id
regLeaseID client.LeaseID // lease id
done chan struct{}
//Config
*conf.Conf
}
//Regist registers the node
func (n *NodeServer) Regist() error {
resp, err := n.Client.Grant(n.ttl + 2)
if err != nil {
return err
}
if _, err = n.HostNode.Update(); err != nil {
return err
}
if _, err = n.HostNode.Put(client.WithLease(resp.ID)); err != nil {
return err
}
n.lID = resp.ID
logrus.Infof("node(%s) registe success", n.HostName)
return nil
}
//Run starts the node server
func (n *NodeServer) Run(errchan chan error) (err error) {
n.ctx, n.cancel = context.WithCancel(context.Background())
n.Regist()
go n.keepAlive()
go n.watchJobs(errchan)
n.Cron.Start()
if err := corejob.StartProc(); err != nil {
logrus.Warnf("[process key will not timeout]proc lease id set err: %s", err.Error())
}
return
}
func (n *NodeServer) watchJobs(errChan chan error) error {
watcher := watch.New(store.DefalutClient.Client, "")
watchChan, err := watcher.WatchList(n.ctx, n.Conf.JobPath, "")
if err != nil {
errChan <- err
return err
}
defer watchChan.Stop()
for event := range watchChan.ResultChan() {
switch event.Type {
case watch.Added:
j := new(job.Job)
err := j.Decode(event.GetValue())
if err != nil {
logrus.Errorf("decode job error :%s", err)
continue
}
n.addJob(j)
case watch.Modified:
j := new(job.Job)
err := j.Decode(event.GetValue())
if err != nil {
logrus.Errorf("decode job error :%s", err)
continue
}
n.modJob(j)
case watch.Deleted:
n.delJob(event.GetKey())
default:
logrus.Errorf("watch job error:%v", event.Error)
errChan <- event.Error
}
}
return nil
}
//add the job to the local cache
func (n *NodeServer) addJob(j *corejob.Job) {
if !j.IsRunOn(n.HostNode.ID) {
return
}
//one-off job
if j.Rules.Mode != corejob.Cycle {
n.runOnceJob(j)
return
}
n.jobLock.Lock()
defer n.jobLock.Unlock()
n.jobs[j.ID] = j
cmds := j.Cmds()
if len(cmds) == 0 {
return
}
for _, cmd := range cmds {
n.addCmd(cmd)
}
return
}
func (n *NodeServer) delJob(id string) {
n.jobLock.Lock()
defer n.jobLock.Unlock()
n.delIDs[id] = true
job, ok := n.jobs[id]
// this job was not previously running on this node
if !ok {
return
}
cmds := job.Cmds()
if len(cmds) == 0 {
return
}
for _, cmd := range cmds {
n.delCmd(cmd)
}
delete(n.jobs, id)
return
}
func (n *NodeServer) modJob(job *corejob.Job) {
if !job.IsRunOn(n.HostNode.ID) {
return
}
//one-off job
if job.Rules.Mode != corejob.Cycle {
n.runOnceJob(job)
return
}
oJob, ok := n.jobs[job.ID]
// this job was not previously running on this node, add it directly
if !ok {
n.addJob(job)
return
}
prevCmds := oJob.Cmds()
job.Count = oJob.Count
*oJob = *job
cmds := oJob.Cmds()
for id, cmd := range cmds {
n.modCmd(cmd)
delete(prevCmds, id)
}
for _, cmd := range prevCmds {
n.delCmd(cmd)
}
}
func (n *NodeServer) addCmd(cmd *corejob.Cmd) {
n.Cron.Schedule(cmd.Rule.Schedule, cmd)
n.cmds[cmd.GetID()] = cmd
logrus.Infof("job[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer)
return
}
func (n *NodeServer) modCmd(cmd *corejob.Cmd) {
c, ok := n.cmds[cmd.GetID()]
if !ok {
n.addCmd(cmd)
return
}
sch := c.Rule.Timer
*c = *cmd
// if the execution schedule on this node changed, update cron
// otherwise cron does not need to be updated
if c.Rule.Timer != sch {
n.Cron.Schedule(c.Rule.Schedule, c)
}
logrus.Infof("job[%s] rule[%s] timer[%s] has updated", c.Job.ID, c.Rule.ID, c.Rule.Timer)
}
func (n *NodeServer) delCmd(cmd *corejob.Cmd) {
delete(n.cmds, cmd.GetID())
n.Cron.DelJob(cmd)
logrus.Infof("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer)
}
//job must be scheduled
func (n *NodeServer) runOnceJob(j *corejob.Job) {
go j.RunWithRecovery()
}
//Stop stops the service
func (n *NodeServer) Stop(i interface{}) {
n.cancel()
n.HostNode.Down()
close(n.done)
n.HostNode.Del()
n.Client.Close()
n.Cron.Stop()
}
func (n *NodeServer) keepAlive() {
duration := time.Duration(n.ttl) * time.Second
timer := time.NewTimer(duration)
for {
select {
case <-n.done:
timer.Stop()
return
case <-timer.C:
if n.lID > 0 {
_, err := n.Client.KeepAliveOnce(n.lID)
if err == nil {
timer.Reset(duration)
continue
}
logrus.Warnf("%s lid[%x] keepAlive err: %s, try to reset...", n.HostName, n.lID, err.Error())
n.lID = 0
}
if err := n.Regist(); err != nil {
logrus.Warnf("%s set lid err: %s, try to reset after %d seconds...", n.HostName, err.Error(), n.ttl)
} else {
logrus.Infof("%s set lid[%x] success", n.HostName, n.lID)
}
timer.Reset(duration)
}
}
}
//NewNodeServer new server
func NewNodeServer(cfg *conf.Conf) (*NodeServer, error) {
//currentNode, err := GetCurrentNode(cfg)
// if err != nil {
// return nil, err
// }
if cfg.TTL == 0 {
cfg.TTL = 10
}
n := &NodeServer{
Client: store.DefalutClient,
//HostNode: currentNode,
Cron: cron.New(),
jobs: make(Jobs, 8),
onceJobs: make(Jobs, 8),
cmds: make(map[string]*corejob.Cmd),
delIDs: make(map[string]bool, 8),
Conf: cfg,
ttl: cfg.TTL,
done: make(chan struct{}),
}
return n, nil
}

View File

@ -72,12 +72,12 @@ func registerCollector(collector string, isDefaultEnabled bool, factory func() (
}
// NodeCollector implements the prometheus.Collector interface.
type nodeCollector struct {
type NodeCollector struct {
Collectors map[string]Collector
}
// NewNodeCollector creates a new NodeCollector
func NewNodeCollector(filters ...string) (*nodeCollector, error) {
func NewNodeCollector(filters ...string) (*NodeCollector, error) {
f := make(map[string]bool)
for _, filter := range filters {
enabled, exist := collectorState[filter]
@ -103,17 +103,17 @@ func NewNodeCollector(filters ...string) (*nodeCollector, error) {
}
}
}
return &nodeCollector{Collectors: collectors}, nil
return &NodeCollector{Collectors: collectors}, nil
}
// Describe implements the prometheus.Collector interface.
func (n nodeCollector) Describe(ch chan<- *prometheus.Desc) {
func (n NodeCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- scrapeDurationDesc
ch <- scrapeSuccessDesc
}
// Collect implements the prometheus.Collector interface.
func (n nodeCollector) Collect(ch chan<- prometheus.Metric) {
func (n NodeCollector) Collect(ch chan<- prometheus.Metric) {
wg := sync.WaitGroup{}
wg.Add(len(n.Collectors))
for name, c := range n.Collectors {