[FIX] Complete the basic v2 task scheduling test

This commit is contained in:
goodrain 2017-12-19 18:46:49 +08:00
parent a0ed39a6af
commit 0a3f2747c2
27 changed files with 1143 additions and 3686 deletions

View File

@ -65,9 +65,6 @@ func Init() error {
for n := range nc.Collectors {
logrus.Infof(" - %s", n)
}
// if err := Config.watch(); err != nil {
// return err
// }
initialized = true
return nil
}
@ -87,8 +84,7 @@ type Conf struct {
OnlineNodePath string //上线节点信息存储路径
Proc string // 当前节点正在执行任务存储路径
StaticTaskPath string // 配置静态task文件宿主机路径
Cmd string // 节点执行任务保存路径
Once string // 马上执行任务路径//立即执行任务保存地址
JobPath string // 节点执行任务保存路径
Lock string // job lock 路径
Group string // 节点分组
Noticer string // 通知
@ -135,8 +131,7 @@ func (a *Conf) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.ConfigStoragePath, "config-path", "/rainbond/acp_configs", "the path of config to store(new)")
fs.StringVar(&a.InitStatus, "init-status", "/rainbond/init_status", "the path of init status to store")
fs.StringVar(&a.Service, "servicePath", "/traefik/backends", "the path of service info to store")
fs.StringVar(&a.Cmd, "cmdPath", "/rainbond/cmd", "the path of cmd in etcd")
fs.StringVar(&a.Once, "oncePath", "/rainbond/once", "the path of once in etcd")
fs.StringVar(&a.JobPath, "jobPath", "/rainbond/jobs", "the path of job in etcd")
fs.StringVar(&a.Lock, "lockPath", "/rainbond/lock", "the path of lock in etcd")
fs.IntVar(&a.FailTime, "failTime", 3, "the fail time of healthy check")
fs.IntVar(&a.CheckIntervalSec, "checkInterval-second", 5, "the interval time of healthy check")
@ -213,8 +208,7 @@ func (c *Conf) parse() error {
c.NodePath = cleanKeyPrefix(c.NodePath)
c.Proc = cleanKeyPrefix(c.Proc)
c.Cmd = cleanKeyPrefix(c.Cmd)
c.Once = cleanKeyPrefix(c.Once)
c.JobPath = cleanKeyPrefix(c.JobPath)
c.Lock = cleanKeyPrefix(c.Lock)
c.Group = cleanKeyPrefix(c.Group)
c.Noticer = cleanKeyPrefix(c.Noticer)
@ -222,42 +216,6 @@ func (c *Conf) parse() error {
return nil
}
// func (c *Conf) watch() error {
// var err error
// watcher, err = fsnotify.NewWatcher()
// if err != nil {
// return err
// }
// go func() {
// duration := 3 * time.Second
// timer, update := time.NewTimer(duration), false
// for {
// select {
// case <-exitChan:
// return
// case event := <-watcher.Events:
// // 保存文件时会产生多个事件
// if event.Op&(fsnotify.Write|fsnotify.Chmod) > 0 {
// update = true
// }
// timer.Reset(duration)
// case <-timer.C:
// if update {
// c.reload()
// event.Emit(event.WAIT, nil)
// update = false
// }
// timer.Reset(duration)
// case err := <-watcher.Errors:
// logrus.Warnf("config watcher err: %v", err)
// }
// }
// }()
// return watcher.Add(*confFile)
// }
func Exit(i interface{}) {
close(exitChan)
if watcher != nil {

View File

@ -1,17 +1,22 @@
package region
import (
"github.com/goodrain/rainbond/pkg/node/api/model"
"bytes"
"net/http"
"io/ioutil"
"encoding/json"
"github.com/bitly/go-simplejson"
"github.com/Sirupsen/logrus"
"fmt"
"io/ioutil"
"net/http"
"github.com/Sirupsen/logrus"
"github.com/bitly/go-simplejson"
"github.com/goodrain/rainbond/pkg/node/api/model"
"github.com/pquerna/ffjson/ffjson"
//"github.com/goodrain/rainbond/pkg/grctl/cmd"
"errors"
utilhttp "github.com/goodrain/rainbond/pkg/util/http"
)
var nodeServer *RNodeServer
func NewNode(nodeAPI string) {
@ -24,6 +29,7 @@ func NewNode(nodeAPI string) {
func GetNode() *RNodeServer {
return nodeServer
}
type RNodeServer struct {
NodeAPI string
}
@ -34,6 +40,7 @@ func (r *RNodeServer)Tasks() TaskInterface {
func (r *RNodeServer) Nodes() NodeInterface {
return &Node{}
}
type Task struct {
TaskID string `json:"task_id"`
Task *model.Task
@ -44,10 +51,11 @@ type Node struct {
}
type TaskInterface interface {
Get(name string) (*Task, error)
Add(task *model.Task) (error)
AddGroup(group *model.TaskGroup) (error)
Exec(nodes []string ) error
Add(task *model.Task) error
AddGroup(group *model.TaskGroup) error
Exec(name string, nodes []string) error
List() ([]*model.Task, error)
Refresh() error
}
type NodeInterface interface {
Add(node *model.APIHostNode)
@ -62,8 +70,6 @@ type NodeInterface interface {
Label(label map[string]string)
}
func (t *Node) Label(label map[string]string) {
body, _ := json.Marshal(label)
_, _, err := nodeServer.Request("/nodes/"+t.Id+"/labels", "PUT", body)
@ -207,6 +213,8 @@ func (t *Task)Get(id string) (*Task,error) {
t.Task = &task
return t, nil
}
//List list all task
func (t *Task) List() ([]*model.Task, error) {
url := "/tasks"
resp, _, err := nodeServer.Request(url, "GET", nil)
@ -214,44 +222,41 @@ func (t *Task)List() ([]*model.Task,error) {
logrus.Errorf("error request url %s,details %s", url, err.Error())
return nil, err
}
jsonTop,err:=simplejson.NewJson(resp)
if err!=nil {
logrus.Errorf("error get json from url %s",err.Error())
var rb utilhttp.ResponseBody
var tasks = new([]*model.Task)
rb.List = tasks
if err := ffjson.Unmarshal(resp, &rb); err != nil {
return nil, err
}
nodeArr,err:=jsonTop.Get("list").Array()
if err != nil {
logrus.Infof("error occurd,details %s",err.Error())
return nil,err
if rb.List == nil {
return nil, nil
}
jsonA, _ := json.Marshal(nodeArr)
tasks := []*model.Task{}
err=json.Unmarshal(jsonA, &tasks)
if err != nil {
logrus.Infof("error occurd,details %s",err.Error())
return nil,err
list, ok := rb.List.(*[]*model.Task)
if ok {
return *list, nil
}
return nil, fmt.Errorf("unmarshal tasks data error")
}
return tasks,nil
}
func (t *Task)Exec(nodes []string ) error {
taskId:=t.TaskID
//Exec 执行任务
func (t *Task) Exec(taskID string, nodes []string) error {
var nodesBody struct {
Nodes []string `json:"nodes"`
}
nodesBody.Nodes = nodes
body, _ := json.Marshal(nodesBody)
url:="/tasks/"+taskId+"/exec"
url := "/tasks/" + taskID + "/exec"
resp, code, err := nodeServer.Request(url, "POST", body)
if code != 200 {
fmt.Println("executing failed:" + string(resp))
return fmt.Errorf("exec failure," + string(resp))
}
if err != nil {
return err
}
return err
}
func (t *Task)Add(task *model.Task) (error) {
func (t *Task) Add(task *model.Task) error {
body, _ := json.Marshal(task)
url := "/tasks"
@ -264,7 +269,7 @@ func (t *Task)Add(task *model.Task) (error) {
}
return nil
}
func (t *Task) AddGroup(group *model.TaskGroup) (error){
func (t *Task) AddGroup(group *model.TaskGroup) error {
body, _ := json.Marshal(group)
url := "/taskgroups"
resp, code, err := nodeServer.Request(url, "POST", body)
@ -276,9 +281,25 @@ func (t *Task) AddGroup(group *model.TaskGroup) (error){
}
return nil
}
//Refresh 刷新静态配置
func (t *Task) Refresh() error {
url := "/tasks/taskreload"
resp, code, err := nodeServer.Request(url, "PUT", nil)
if code != 200 {
fmt.Println("executing failed:" + string(resp))
return fmt.Errorf("refresh error code,%d", code)
}
if err != nil {
return err
}
return nil
}
type TaskStatus struct {
Status map[string]model.TaskStatus `json:"status,omitempty"`
}
func (t *Task) Status() (*TaskStatus, error) {
taskId := t.TaskID

View File

@ -19,11 +19,10 @@
package clients
import (
"github.com/goodrain/rainbond/pkg/api/region"
"github.com/goodrain/rainbond/cmd/grctl/option"
"github.com/goodrain/rainbond/pkg/api/region"
)
var RegionClient *region.Region
var NodeClient *region.RNodeServer

View File

@ -41,14 +41,14 @@ func GetCmds() []cli.Command {
cmds = append(cmds, NewCmdGet())
cmds = append(cmds, NewCmdInit())
cmds = append(cmds, NewCmdShow())
cmds = append(cmds, NewCmdTask())
//task相关命令
cmds = append(cmds, NewCmdTasks())
//cmds = append(cmds, NewCmdAddNode())
//cmds = append(cmds, NewCmdComputeGroup())
cmds = append(cmds, NewCmdInstall())
//cmds = append(cmds, NewCmdInstallStatus())
cmds = append(cmds, NewCmdAddTask())
//cmds = append(cmds, NewCmdStatus())
cmds = append(cmds, NewCmdDomain())

View File

@ -17,18 +17,21 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"github.com/urfave/cli"
"github.com/Sirupsen/logrus"
"os/exec"
"net/http"
"io/ioutil"
"strings"
"bytes"
"io/ioutil"
"net/http"
"os/exec"
"strings"
"github.com/Sirupsen/logrus"
"github.com/urfave/cli"
//"github.com/goodrain/rainbond/pkg/grctl/clients"
"fmt"
"time"
"github.com/goodrain/rainbond/pkg/grctl/clients"
)
@ -72,7 +75,6 @@ func NewCmdInstallStatus() cli.Command {
Name: "taskID",
Usage: "install_k8s,空则自动寻找",
},
},
Usage: "获取task执行状态。grctl install_status",
Action: func(c *cli.Context) error {
@ -101,9 +103,6 @@ func NewCmdInstallStatus() cli.Command {
return c
}
func initCluster(c *cli.Context) error {
resp, err := http.Get("http://repo.goodrain.com/gaops/jobs/install/prepare/init.sh")
@ -142,7 +141,10 @@ func initCluster(c *cli.Context) error {
fmt.Println(jsonStr)
return nil
}
//TODO:
//检查node是否启动
time.Sleep(5 * time.Second)
//获取当前节点ID
Task(c, "check_manage_base_services", false)
Task(c, "check_manage_services", false)
@ -153,4 +155,3 @@ func initCluster(c *cli.Context) error {
fmt.Println(" up compute node--grctl node up -h")
return nil
}

View File

@ -17,14 +17,16 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"github.com/urfave/cli"
"fmt"
"os"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/pkg/grctl/clients"
"time"
"fmt"
"strings"
"os"
"github.com/urfave/cli"
)
func GetCommand(status bool) []cli.Command {
@ -64,7 +66,6 @@ func GetCommand(status bool)[]cli.Command {
},
},
},
},
{
Name: "manage_base",
@ -92,7 +93,6 @@ func GetCommand(status bool)[]cli.Command {
Action: func(c *cli.Context) error {
return Task(c, "install_db", status)
},
},
{
Name: "base_plugins",
@ -100,7 +100,6 @@ func GetCommand(status bool)[]cli.Command {
Action: func(c *cli.Context) error {
return Task(c, "install_base_plugins", status)
},
},
{
Name: "acp_plugins",
@ -108,7 +107,6 @@ func GetCommand(status bool)[]cli.Command {
Action: func(c *cli.Context) error {
return Task(c, "install_acp_plugins", status)
},
},
},
},
@ -128,7 +126,6 @@ func GetCommand(status bool)[]cli.Command {
Action: func(c *cli.Context) error {
return Task(c, "install_storage", status)
},
},
{
Name: "k8s",
@ -136,7 +133,6 @@ func GetCommand(status bool)[]cli.Command {
Action: func(c *cli.Context) error {
return Task(c, "install_k8s", status)
},
},
{
Name: "network",
@ -144,7 +140,6 @@ func GetCommand(status bool)[]cli.Command {
Action: func(c *cli.Context) error {
return Task(c, "install_network", status)
},
},
{
Name: "plugins",
@ -152,7 +147,6 @@ func GetCommand(status bool)[]cli.Command {
Action: func(c *cli.Context) error {
return Task(c, "install_plugins", status)
},
},
},
Action: func(c *cli.Context) error {
@ -163,7 +157,6 @@ func GetCommand(status bool)[]cli.Command {
return c
}
func NewCmdInstall() cli.Command {
c := cli.Command{
Name: "install",
@ -178,6 +171,7 @@ func NewCmdInstall() cli.Command {
}
return c
}
//func NewCmdStatus() cli.Command {
// c:=cli.Command{
// Name: "status",
@ -272,22 +266,15 @@ func Status(task string,nodes []string) {
}
func Task(c *cli.Context, task string, status bool) error {
nodes := c.StringSlice("nodes")
taskEntity,err:=clients.NodeClient.Tasks().Get(task)
if taskEntity==nil||err!=nil {
logrus.Errorf("error get task entity from server,please check server api")
return nil
if len(nodes) == 0 {
return fmt.Errorf("nodes can not be empty")
}
err=taskEntity.Exec(nodes)
err := clients.NodeClient.Tasks().Exec(task, nodes)
if err != nil {
logrus.Errorf("error exec task:%s,details %s", task, err.Error())
return err
}
if nodes==nil||len(nodes)==0 {
nodes=taskEntity.Task.Nodes
}
Status(task, nodes)
return nil
}

View File

@ -1,157 +0,0 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"github.com/urfave/cli"
"github.com/Sirupsen/logrus"
"strings"
"github.com/pquerna/ffjson/ffjson"
"io/ioutil"
"github.com/goodrain/rainbond/pkg/node/api/model"
"time"
"github.com/goodrain/rainbond/pkg/grctl/clients"
)
func NewCmdAddTask() cli.Command {
c:=cli.Command{
Name: "add_task",
Flags: []cli.Flag{
cli.StringFlag{
Name: "taskfile",
Usage: "file path",
},
},
Usage: "添加task。grctl add_task",
Action: func(c *cli.Context) error {
file:=c.String("filepath")
if file!="" {
err:=loadFile(file)
if err != nil {
logrus.Errorf("error add task from file,details %s",err.Error())
return err
}
}else {
logrus.Errorf("error get task from path")
}
return nil
},
}
return c
}
func ScheduleGroup(nodes []string, nextGroups ...*model.TaskGroup) error{
for _, group := range nextGroups {
if group.Tasks == nil || len(group.Tasks) < 1 {
group.Status = &model.TaskGroupStatus{
StartTime: time.Now(),
EndTime: time.Now(),
Status: "NotDefineTask",
}
//create group
err:=clients.NodeClient.Tasks().AddGroup(group)
if err!=nil{
logrus.Errorf("error add group,details %s",err.Error())
return err
}
}
for _, task := range group.Tasks {
task.GroupID = group.ID
err:=clients.NodeClient.Tasks().Add(task)
if err!=nil{
logrus.Errorf("error add group,details %s",err.Error())
return err
}
}
group.Status = &model.TaskGroupStatus{
StartTime: time.Now(),
Status: "Start",
}
//create group
err:=clients.NodeClient.Tasks().AddGroup(group)
if err!=nil{
logrus.Errorf("error add group,details %s",err.Error())
return err
}
}
return nil
}
func loadFile(path string) error{
taskBody, err := ioutil.ReadFile(path)
if err != nil {
logrus.Errorf("read static task file %s error.%s", path, err.Error())
return nil
}
var filename string
index := strings.LastIndex(path, "/")
if index < 0 {
filename = path
}
filename = path[index+1:]
if strings.Contains(filename, "group") {
var group model.TaskGroup
if err := ffjson.Unmarshal(taskBody, &group); err != nil {
logrus.Errorf("unmarshal static task file %s error.%s", path, err.Error())
return nil
}
if group.ID == "" {
group.ID = group.Name
}
if group.Name == "" {
logrus.Errorf("task group name can not be empty. file %s", path)
return nil
}
if group.Tasks == nil {
logrus.Errorf("task group tasks can not be empty. file %s", path)
return nil
}
ScheduleGroup(nil, &group)
logrus.Infof("Load a static group %s.", group.Name)
}
if strings.Contains(filename, "task") {
var task model.Task
if err := ffjson.Unmarshal(taskBody, &task); err != nil {
logrus.Errorf("unmarshal static task file %s error.%s", path, err.Error())
return err
}
if task.ID == "" {
task.ID = task.Name
}
if task.Name == "" {
logrus.Errorf("task name can not be empty. file %s", path)
return err
}
if task.Temp == nil {
logrus.Errorf("task [%s] temp can not be empty.", task.Name)
return err
}
if task.Temp.ID == "" {
task.Temp.ID = task.Temp.Name
}
err:=clients.NodeClient.Tasks().Add(&task)
if err!=nil{
logrus.Errorf("error add task,details %s",err.Error())
return err
}
logrus.Infof("Load a static group %s.", task.Name)
return nil
}
return nil
}

165
pkg/grctl/cmd/tasks.go Normal file
View File

@ -0,0 +1,165 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"encoding/json"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/apcera/termtables"
"github.com/goodrain/rainbond/pkg/grctl/clients"
"github.com/goodrain/rainbond/pkg/node/api/model"
"github.com/urfave/cli"
)
//NewCmdTasks 任务相关命令
func NewCmdTasks() cli.Command {
c := cli.Command{
Name: "tasks",
Usage: "系统任务相关命令grctl tasks -h",
Subcommands: []cli.Command{
cli.Command{
Name: "static-refresh",
Usage: "Refresh static task config",
Action: func(c *cli.Context) error {
if err := clients.NodeClient.Tasks().Refresh(); err != nil {
return err
}
return nil
},
},
cli.Command{
Name: "list",
Usage: "List all task",
Action: func(c *cli.Context) error {
tasks, err := clients.NodeClient.Tasks().List()
if err != nil {
logrus.Println("list all task error,", err.Error())
return err
}
if len(tasks) > 0 {
taskTable := termtables.CreateTable()
taskTable.AddHeaders("ID", "GroupID", "DepTask", "Status", "Scheduler")
for _, v := range tasks {
var depstr string
for _, dep := range v.Temp.Depends {
depstr += fmt.Sprintf("%s(%s);", dep.DependTaskID, dep.DetermineStrategy)
}
var status string
for k, v := range v.Status {
status += fmt.Sprintf("%s:%s(%s);", k, v.Status, v.CompleStatus)
}
var scheduler = v.Scheduler.Mode + ";"
if len(v.Scheduler.Status) == 0 {
scheduler += "暂未调度"
} else {
for k, v := range v.Scheduler.Status {
scheduler += fmt.Sprintf("%s:%s(%s);", k, v.Status, v.SchedulerTime)
}
}
taskTable.AddRow(v.ID, v.GroupID, depstr, status, scheduler)
}
fmt.Println(taskTable.Render())
return nil
}
fmt.Println("not found tasks")
return nil
},
},
cli.Command{
Name: "get",
Usage: "Displays the specified task details",
Action: func(c *cli.Context) error {
taskID := c.Args().First()
if taskID == "" {
fmt.Println("Please specified task id")
}
task, err := clients.NodeClient.Tasks().Get(taskID)
if err != nil {
return fmt.Errorf("get task error,%s", err.Error())
}
taskStr, _ := json.MarshalIndent(&task, "", "\t")
fmt.Println(string(taskStr))
return nil
},
},
cli.Command{
Name: "exec",
Usage: "Exec the specified task",
Flags: []cli.Flag{
cli.StringFlag{
Name: "node",
Usage: "exec task nodeid",
},
cli.StringFlag{
Name: "f",
Usage: "exec task nodeid and return status",
},
},
Action: func(c *cli.Context) error {
taskID := c.Args().First()
if taskID == "" {
fmt.Println("Please specified task id")
}
nodeID := c.String("node")
if nodeID == "" {
fmt.Println("Please specified nodeid use `-node`")
}
err := clients.NodeClient.Tasks().Exec(taskID, []string{nodeID})
return err
},
},
},
}
return c
}
func getDependTask(task *model.Task, path string) {
if task == nil || task.Temp == nil {
fmt.Println("wrong task")
return
}
depends := task.Temp.Depends
for k, v := range depends {
tid := v.DependTaskID
taskD, err := clients.NodeClient.Tasks().Get(tid)
if err != nil {
logrus.Errorf("error get task,details %s", err.Error())
return
}
//fmt.Print("task %s depend %s",task.ID,taskD.Task.ID)
if k == 0 {
fmt.Print("-->" + taskD.Task.ID)
} else {
fmt.Println()
for i := 0; i < len(path); i++ {
fmt.Print(" ")
}
fmt.Print("-->" + taskD.Task.ID)
//path+="-->"+taskD.Task.ID
}
getDependTask(taskD.Task, path+"-->"+taskD.Task.ID)
}
}

View File

@ -1,83 +0,0 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"github.com/urfave/cli"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/pkg/grctl/clients"
"github.com/goodrain/rainbond/pkg/node/api/model"
"fmt"
)
func NewCmdTask() cli.Command {
c:=cli.Command{
Name: "tasks",
Usage: "tasks",
Action: func(c *cli.Context) error {
tasks,_:=clients.NodeClient.Tasks().List()
//var total [][]string
for _,v:=range tasks {
fmt.Printf("%s",v.ID)
path:=v.ID
getDependTask(v,path)
fmt.Println()
}
return nil
},
}
return c
}
func getDependTask(task *model.Task,path string) {
if task==nil||task.Temp==nil {
fmt.Println("wrong task")
return
}
depends:=task.Temp.Depends
for k,v:=range depends{
tid:=v.DependTaskID
taskD,err:=clients.NodeClient.Tasks().Get(tid)
if err != nil {
logrus.Errorf("error get task,details %s",err.Error())
return
}
//fmt.Print("task %s depend %s",task.ID,taskD.Task.ID)
if k==0 {
fmt.Print("-->"+taskD.Task.ID)
}else{
fmt.Println()
for i:=0;i<len(path);i++{
fmt.Print(" ")
}
fmt.Print("-->"+taskD.Task.ID)
//path+="-->"+taskD.Task.ID
}
getDependTask(taskD.Task,path+"-->"+taskD.Task.ID)
}
}

View File

@ -1,697 +0,0 @@
// // RAINBOND, Application Management Platform
// // Copyright (C) 2014-2017 Goodrain Co., Ltd.
// // This program is free software: you can redistribute it and/or modify
// // it under the terms of the GNU General Public License as published by
// // the Free Software Foundation, either version 3 of the License, or
// // (at your option) any later version. For any non-GPL usage of Rainbond,
// // one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// // must be obtained first.
// // This program is distributed in the hope that it will be useful,
// // but WITHOUT ANY WARRANTY; without even the implied warranty of
// // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// // GNU General Public License for more details.
// // You should have received a copy of the GNU General Public License
// // along with this program. If not, see <http://www.gnu.org/licenses/>.
package controller
import (
"net/http"
"strconv"
"time"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/pkg/event"
"github.com/goodrain/rainbond/pkg/node/core/job"
)
// import (
// "encoding/json"
// "net/http"
// "sort"
// "strings"
// "time"
// conf "github.com/goodrain/rainbond/cmd/node/option"
// "github.com/goodrain/rainbond/pkg/node/core"
// corenode "github.com/goodrain/rainbond/pkg/node/core/node"
// "github.com/goodrain/rainbond/pkg/node/core/store"
// "github.com/goodrain/rainbond/pkg/node/utils"
// "github.com/twinj/uuid"
// "github.com/Sirupsen/logrus"
// "github.com/coreos/etcd/clientv3"
// "github.com/go-chi/chi"
// //"github.com/gorilla/websocket"
// //"github.com/goodrain/rainbond/pkg/event"
// "strconv"
// "github.com/goodrain/rainbond/pkg/event"
// )
// type ProcFetchOptions struct {
// Groups []string
// NodeIds []string
// JobIds []string
// }
// func SubtractStringArray(a, b []string) (c []string) {
// c = []string{}
// for _, _a := range a {
// if !InStringArray(_a, b) {
// c = append(c, _a)
// }
// }
// return
// }
// func UniqueStringArray(a []string) []string {
// al := len(a)
// if al == 0 {
// return a
// }
// ret := make([]string, al)
// index := 0
// loopa:
// for i := 0; i < al; i++ {
// for j := 0; j < index; j++ {
// if a[i] == ret[j] {
// continue loopa
// }
// }
// ret[index] = a[i]
// index++
// }
// return ret[:index]
// }
// func getStringArrayFromQuery(name, sep string, r *http.Request) (arr []string) {
// val := strings.TrimSpace(r.FormValue(name))
// if len(val) == 0 {
// return
// }
// return strings.Split(val, sep)
// }
// //func NewComputeNodeToInstall(w http.ResponseWriter, r *http.Request) {
// // nodeIP := strings.TrimSpace(chi.URLParam(r, "ip"))
// // j,err:=core.NewComputeNodeToInstall(nodeIP)
// // if err != nil {
// // outRespDetails(w,500,"reg jobs to node failed,details :"+err.Error(),"为内置任务注册新增节点失败",nil,nil)
// // return
// // }
// // outRespSuccess(w,j,nil)
// //}
func writer(eventId string, nodeIp string, doneAll chan *job.JobList, doneOne chan *job.BuildInJob) {
//defer func() {
// ws.Close()
//}()
done := make(chan int, 1)
//核心逻辑写在这有新的执行完了就给一个channel add一个
//eventId:=""
go func(done chan int) {
logrus.Infof("starting ping")
for {
select {
case <-done:
logrus.Infof("heart beat stopping")
return
default:
{
time.Sleep(5 * time.Second)
if eventId == "" {
logrus.Warnf("heart beat msg failed,because event id is null,caused by no job executed")
continue
}
logrus.Infof("sending ping")
logger := event.GetManager().GetLogger(eventId)
logger.Info("ping", nil)
event.GetManager().ReleaseLogger(logger)
}
}
}
}(done)
for {
select {
case job := <-doneOne:
logrus.Infof("job %s execute done", job.JobName)
logger := event.GetManager().GetLogger(job.JobSEQ)
logger.Info("one job done", map[string]string{"jobId": job.JobId, "status": strconv.Itoa(job.JobResult)})
eventId = job.JobSEQ
event.GetManager().ReleaseLogger(logger)
case result := <-doneAll:
logrus.Infof("job execute done")
logger := event.GetManager().GetLogger(eventId)
time.Sleep(2 * time.Second)
done <- 1
logrus.Infof("stopping heart beat")
logrus.Infof("send final message ,using eventID:%s", eventId)
logger.Info("all job done", map[string]string{"step": "final", "status": strconv.FormatBool(result.Result)})
event.GetManager().ReleaseLogger(logger)
}
}
}
func Ping(w http.ResponseWriter, r *http.Request) {
outSuccess(w)
}
// func GetALLGroup(w http.ResponseWriter, r *http.Request) {
// // swagger:operation GET /v2/job/group v2 GetAllGroup
// //
// // 获取所有job的group
// //
// // get all groups
// //
// // ---
// // produces:
// // - application/json
// //
// // Responses:
// // '200':
// // description: '["group1",...]'
// resp, err := store.DefalutClient.Get(conf.Config.Cmd, clientv3.WithPrefix(), clientv3.WithKeysOnly())
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// var cmdKeyLen = len(conf.Config.Cmd)
// var groupMap = make(map[string]bool, 8)
// for i := range resp.Kvs {
// ss := strings.Split(string(resp.Kvs[i].Key)[cmdKeyLen:], "/")
// groupMap[ss[0]] = true
// }
// var groupList = make([]string, 0, len(groupMap))
// for k := range groupMap {
// groupList = append(groupList, k)
// }
// sort.Strings(groupList)
// outJSON(w, groupList)
// }
// func JobExecute(w http.ResponseWriter, r *http.Request) {
// // swagger:operation PUT /v2/job/{group}-{id}/execute/{name} v2 JobExecute
// //
// // 立即在 node上 执行一次指定group/id 的job
// //
// // execute job
// //
// // ---
// // produces:
// // - application/json
// // parameters:
// // - name: group
// // in: path
// // description: group name
// // required: true
// // type: string
// // format: string
// // - name: id
// // in: path
// // description: job id
// // required: true
// // type: string
// // format: string
// // - name: name
// // in: path
// // description: node name
// // required: true
// // type: string
// // format: string
// //
// // Responses:
// // '200':
// // description: '{"ok":true}'
// group := strings.TrimSpace(chi.URLParam(r, "group"))
// id := strings.TrimSpace(chi.URLParam(r, "id"))
// if len(group) == 0 || len(id) == 0 {
// outJSONWithCode(w, http.StatusBadRequest, "Invalid job id or group.")
// return
// }
// //node := getStringVal("node", ctx.R)
// //node :=r.FormValue("node")
// node := chi.URLParam(r, "name")
// err := core.PutOnce(group, id, node)
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// outSuccess(w)
// //outJSONWithCode(w, http.StatusNoContent, nil)
// }
// func GetJobNodes(w http.ResponseWriter, r *http.Request) {
// // swagger:operation GET /v2/job/{group}-{id}/nodes v2 GetJobNodes
// //
// // 获取job的可执行节点
// //
// // get job runnable nodes
// //
// // ---
// // produces:
// // - application/json
// // parameters:
// // - name: group
// // in: path
// // description: group name
// // required: true
// // type: string
// // format: string
// // - name: id
// // in: path
// // description: job id
// // required: true
// // type: string
// // format: string
// //
// // Responses:
// // '200':
// // description: '["10.1.1.2",...]'
// job, err := core.GetJob(chi.URLParam(r, "group"), chi.URLParam(r, "id"))
// var statusCode int
// if err != nil {
// if err == utils.ErrNotFound {
// statusCode = http.StatusNotFound
// } else {
// statusCode = http.StatusInternalServerError
// }
// outJSONWithCode(w, statusCode, err.Error())
// return
// }
// var nodes []string
// var exNodes []string
// groups, err := core.GetGroups("")
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// for i := range job.Rules {
// inNodes := append(nodes, job.Rules[i].NodeIDs...)
// for _, gid := range job.Rules[i].GroupIDs {
// if g, ok := groups[gid]; ok {
// inNodes = append(inNodes, g.NodeIDs...)
// }
// }
// exNodes = append(exNodes, job.Rules[i].ExcludeNodeIDs...)
// inNodes = SubtractStringArray(inNodes, exNodes)
// nodes = append(nodes, inNodes...)
// }
// outJSON(w, UniqueStringArray(nodes))
// }
// func DeleteJob(w http.ResponseWriter, r *http.Request) {
// // swagger:operation DELETE /v2/job/{group}-{id} v2 DeleteJob
// //
// // 删除 job
// //
// // delete job by group and id
// //
// // ---
// // produces:
// // - application/json
// // parameters:
// // - name: group
// // in: path
// // description: group name
// // required: true
// // type: string
// // format: string
// // - name: id
// // in: path
// // description: job id
// // required: true
// // type: string
// // format: string
// //
// // Responses:
// // '200':
// // description: '{"ok":true}'
// _, err := core.DeleteJob(chi.URLParam(r, "group"), chi.URLParam(r, "id"))
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// outSuccess(w)
// //outJSONWithCode(w, http.StatusNoContent, nil)
// }
// func GetJob(w http.ResponseWriter, r *http.Request) {
// // swagger:operation GET /v2/job/{group}-{id} v2 GetJob
// //
// // 获取 job
// //
// // get job by group and id
// //
// // ---
// // produces:
// // - application/json
// // parameters:
// // - name: group
// // in: path
// // description: group name
// // required: true
// // type: string
// // format: string
// // - name: id
// // in: path
// // description: job id
// // required: true
// // type: string
// // format: string
// //
// // Responses:
// // '200':
// // description: '{"id":"","kind":0,"name":"aac","group":"default","user":"","cmd":"echo \"hello \">/tmp/aac.txt","pause":true,"parallels":0,"timeout":0,"interval":0,"retry":0,"rules":[{"id":"NEW0.5930536330436825","nids":["172.16.0.118"],"timer":"* 5 * * * *","exclude_nids":["172.16.0.131"]}],"fail_notify":false,"to":[]}'
// job, err := core.GetJob(chi.URLParam(r, "group"), chi.URLParam(r, "id"))
// var statusCode int
// if err != nil {
// if err == utils.ErrNotFound {
// statusCode = http.StatusNotFound
// } else {
// statusCode = http.StatusInternalServerError
// }
// outJSONWithCode(w, statusCode, err.Error())
// return
// }
// outJSON(w, job)
// }
// func ChangeJobStatus(w http.ResponseWriter, r *http.Request) {
// // swagger:operation POST /v2/job/{group}-{id} v2 ChangeJobStatus
// //
// // 更改 job 状态
// //
// // change job status
// //
// // ---
// // produces:
// // - application/json
// // parameters:
// // - name: group
// // in: path
// // description: group name
// // required: true
// // type: string
// // format: string
// // - name: id
// // in: path
// // description: job id
// // required: true
// // type: string
// // format: string
// // - name:
// // in: body
// // description: '{"id":"","kind":0,"name":"aac","group":"default","user":"","cmd":"echo \"hello \">/tmp/aac.txt","pause":true,"parallels":0,"timeout":0,"interval":0,"retry":0,"rules":[{"id":"NEW0.5930536330436825","nids":["172.16.0.118"],"timer":"* 5 * * * *","exclude_nids":["172.16.0.131"]}],"fail_notify":false,"to":[]}'
// // required: true
// // type: string
// // format: string
// //
// // Responses:
// // '200':
// // description: '{"id":"","kind":0,"name":"aac","group":"default","user":"","cmd":"echo \"hello \">/tmp/aac.txt","pause":true,"parallels":0,"timeout":0,"interval":0,"retry":0,"rules":[{"id":"NEW0.5930536330436825","nids":["172.16.0.118"],"timer":"* 5 * * * *","exclude_nids":["172.16.0.131"]}],"fail_notify":false,"to":[]}'
// job := &core.Job{}
// decoder := json.NewDecoder(r.Body)
// err := decoder.Decode(&job)
// if err != nil {
// outJSONWithCode(w, http.StatusBadRequest, err.Error())
// return
// }
// defer r.Body.Close()
// originJob, rev, err := core.GetJobAndRev(chi.URLParam(r, "group"), chi.URLParam(r, "id"))
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// originJob.Pause = job.Pause
// b, err := json.Marshal(originJob)
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// _, err = store.DefalutClient.PutWithModRev(originJob.Key(), string(b), rev)
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// outJSON(w, originJob)
// }
// func GetExecutingJob(w http.ResponseWriter, r *http.Request) {
// opt := &ProcFetchOptions{
// Groups: getStringArrayFromQuery("groups", ",", r),
// NodeIds: getStringArrayFromQuery("nodes", ",", r),
// JobIds: getStringArrayFromQuery("jobs", ",", r),
// }
// gresp, err := store.DefalutClient.Get(conf.Config.Proc, clientv3.WithPrefix())
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// var list = make([]*core.Process, 0, 8)
// for i := range gresp.Kvs {
// proc, err := core.GetProcFromKey(string(gresp.Kvs[i].Key))
// if err != nil {
// logrus.Errorf("Failed to unmarshal Proc from key: %s", err.Error())
// continue
// }
// if !opt.Match(proc) {
// continue
// }
// proc.Time, _ = time.Parse(time.RFC3339, string(gresp.Kvs[i].Value))
// list = append(list, proc)
// }
// sort.Sort(ByProcTime(list))
// outJSON(w, list)
// }
// func InStringArray(k string, ss []string) bool {
// for i := range ss {
// if ss[i] == k {
// return true
// }
// }
// return false
// }
// func (opt *ProcFetchOptions) Match(proc *core.Process) bool {
// if len(opt.Groups) > 0 && !InStringArray(proc.Group, opt.Groups) {
// return false
// }
// if len(opt.JobIds) > 0 && !InStringArray(proc.JobID, opt.JobIds) {
// return false
// }
// if len(opt.NodeIds) > 0 && !InStringArray(proc.NodeID, opt.NodeIds) {
// return false
// }
// return true
// }
// func UpdateJob(w http.ResponseWriter, r *http.Request) {
// // swagger:operation PUT /v2/job v2 UpdateJob
// //
// // 添加或者更新job
// //
// // add or update job
// //
// // ---
// // produces:
// // - application/json
// // parameters:
// // - name: job
// // in: body
// // description: '{"id":"","kind":0,"name":"aac","oldGroup":"","group":"default","user":"","cmd":"echo \"hello \">/tmp/aac.txt","pause":true,"parallels":0,"timeout":0,"interval":0,"retry":0,"rules":[{"id":"NEW0.5930536330436825","nids":["172.16.0.118"],"timer":"* 5 * * * *","exclude_nids":["172.16.0.131"]}],"fail_notify":false,"to":[]}'
// // required: true
// // type: json
// // format: string
// //
// // Responses:
// // '200':
// // description: '{"ok":true}'
// var job = &struct {
// *core.Job
// OldGroup string `json:"oldGroup"`
// }{}
// decoder := json.NewDecoder(r.Body)
// defer r.Body.Close()
// err := decoder.Decode(&job)
// if err != nil {
// outJSONWithCode(w, http.StatusBadRequest, err.Error())
// return
// }
// r.Body.Close()
// if err = job.Check(); err != nil {
// outJSONWithCode(w, http.StatusBadRequest, err.Error())
// return
// }
// var deleteOldKey string
// if len(job.ID) == 0 {
// job.ID = uuid.NewV4().String()
// } else {
// job.OldGroup = strings.TrimSpace(job.OldGroup)
// if job.OldGroup != job.Group {
// deleteOldKey = core.JobKey(job.OldGroup, job.ID)
// }
// }
// b, err := json.Marshal(job)
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// // remove old key
// // it should be before the put method
// if len(deleteOldKey) > 0 {
// if _, err = store.DefalutClient.Delete(deleteOldKey); err != nil {
// logrus.Errorf("failed to remove old job key[%s], err: %s.", deleteOldKey, err.Error())
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// }
// _, err = store.DefalutClient.Put(job.Key(), string(b))
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// outSuccess(w)
// }
// //todo response
// func JobList(w http.ResponseWriter, r *http.Request) {
// // swagger:operation GET /v2/job v2 JobList
// //
// // 获取job列表
// //
// // get job list
// //
// // ---
// // produces:
// // - application/json
// // parameters:
// // - name: node
// // in: form
// // description: node name
// // required: false
// // type: string
// // format: string
// // - name: group
// // in: form
// // description: group name
// // required: false
// // type: string
// // format: string
// // Responses:
// // '200':
// // description: '{"ok":true}'
// node := r.FormValue("node")
// group := r.FormValue("group")
// var prefix = conf.Config.Cmd
// if len(group) != 0 {
// prefix += group
// }
// type jobStatus struct {
// *core.Job
// LatestStatus *core.JobLatestLog `json:"latestStatus"`
// }
// resp, err := store.DefalutClient.Get(prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// var nodeGroupMap map[string]*core.Group
// if len(node) > 0 {
// nodeGrouplist, err := corenode.GetNodeGroups()
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// nodeGroupMap = map[string]*core.Group{}
// for i := range nodeGrouplist {
// nodeGroupMap[nodeGrouplist[i].ID] = nodeGrouplist[i]
// }
// }
// var jobIds []string
// var jobList = make([]*jobStatus, 0, resp.Count)
// for i := range resp.Kvs {
// job := core.Job{}
// err = json.Unmarshal(resp.Kvs[i].Value, &job)
// if err != nil {
// outJSONWithCode(w, http.StatusInternalServerError, err.Error())
// return
// }
// if len(node) > 0 && !job.IsRunOn(node, nodeGroupMap) {
// continue
// }
// jobList = append(jobList, &jobStatus{Job: &job})
// jobIds = append(jobIds, job.ID)
// }
// m, err := core.GetJobLatestLogListByJobIds(jobIds)
// if err != nil {
// logrus.Errorf("GetJobLatestLogListByJobIds error: %s", err.Error())
// } else {
// for i := range jobList {
// jobList[i].LatestStatus = m[jobList[i].ID]
// }
// }
// outJSON(w, jobList)
// }
// type ByProcTime []*core.Process
// func (a ByProcTime) Len() int { return len(a) }
// func (a ByProcTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// func (a ByProcTime) Less(i, j int) bool { return a[i].Time.After(a[j].Time) }

View File

@ -20,9 +20,7 @@ package controller
import (
conf "github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/pkg/node/core/job"
"github.com/goodrain/rainbond/pkg/node/core/k8s"
"github.com/goodrain/rainbond/pkg/node/core/store"
"encoding/json"
"fmt"
@ -42,245 +40,8 @@ import (
"strconv"
"github.com/goodrain/rainbond/pkg/node/api/model"
"github.com/coreos/etcd/clientv3"
"bytes"
"github.com/goodrain/rainbond/pkg/util"
)
func LoginCompute(w http.ResponseWriter, r *http.Request) {
loginInfo := new(model.Login)
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
err := decoder.Decode(loginInfo)
_, err = job.UnifiedLogin(loginInfo)
if err != nil {
logrus.Errorf("login remote host failed,details %s", err.Error())
api.ReturnError(r, w, http.StatusBadRequest, err.Error())
return
}
//check instation
nodeIP := strings.Split(loginInfo.HostPort, ":")[0]
logrus.Infof("target hostport is %s,node ip is %s", loginInfo.HostPort, nodeIP)
mayExist, err := k8s.GetSource(conf.Config.K8SNode + nodeIP)
if err == nil || mayExist != nil {
//if err != nil {
// logrus.Warnf("error wile test node exist,details %s",err.Error())
//}
logrus.Infof("already installed")
api.ReturnError(r, w, 400, "already installed")
return
}
cli2, err := job.UnifiedLogin(loginInfo)
if err != nil {
logrus.Errorf("login remote host failed,details %s", err.Error())
api.ReturnError(r, w, http.StatusBadRequest, err.Error())
return
}
sess, err := cli2.NewSession()
if err != nil {
logrus.Errorf("get remote host ssh session failed,details %s", err.Error())
api.ReturnError(r, w, http.StatusBadRequest, err.Error())
return
}
defer sess.Close()
buf := bytes.NewBuffer(nil)
sess.Stdout = buf
err = sess.Run("cat " + conf.Config.InstalledMarker)
if err == nil {
logrus.Infof("already installed,checked by installed marker file,details %s", err.Error())
api.ReturnError(r, w, 400, "already installed")
return
}
installedType := buf.String()
if strings.Contains(installedType, "\n") {
installedType = strings.Replace(installedType, "\n", "", -1)
}
if installedType == loginInfo.HostType {
logrus.Infof("already installed,checked by installed marker file,details %s", err.Error())
api.ReturnError(r, w, 400, "already installed")
return
} else {
//可以安装
logrus.Infof("installing new role to a node,whose installed role is %s,instaling %s", installedType, loginInfo.HostType)
}
_, err = newComputeNodeToInstall(nodeIP)
if err != nil {
logrus.Warnf("reg node %s to build-in jobs failed,details: %s", nodeIP, err.Error())
}
//todo 在这里给全局channel<-
logrus.Infof("prepare add item to channel canRunJob")
//core.CanRunJob<-nodeIP
store.DefalutClient.NewRunnable("/acp_node/runnable/"+nodeIP, nodeIP)
logrus.Infof("add runnable to node ip %s", nodeIP)
result := new(model.LoginResult)
result.HostPort = loginInfo.HostPort
result.LoginType = loginInfo.LoginType
result.Result = "success"
//添加一条记录,保存信息
//sess.Run()
cnode := &model.HostNode{
ID: nodeIP,
HostName: nodeIP,
InternalIP: nodeIP,
ExternalIP: nodeIP,
Role: []string{loginInfo.HostType},
Status: "installing",
Unschedulable: false,
Labels: nil,
AvailableCPU: 0,
AvailableMemory: 0,
}
err = k8s.AddSource(conf.Config.K8SNode+nodeIP, cnode)
if err != nil {
logrus.Errorf("error add source ,details %s", err.Error())
api.ReturnError(r, w, 500, err.Error())
return
}
//k8s.AddSource(conf.Config.K8SNode+node.UUID, node)
b, _ := json.Marshal(loginInfo)
store.DefalutClient.Put(conf.Config.ConfigStoragePath+"login/"+strings.Split(loginInfo.HostPort, ":")[0], string(b))
api.ReturnSuccess(r, w, result)
}
func newComputeNodeToInstall(node string) (*job.JobList, error) {
//这里改成所有
// jobs, err := job.GetBuildinJobs() //状态为未安装
// if err != nil {
// return nil, err
// }
// logrus.Infof("added new node %s to jobs", node)
// err = job.AddNewNodeToJobs(jobs, node)
// if err != nil {
// return nil, err
// }
return nil, nil
}
func NodeInit(w http.ResponseWriter, r *http.Request) {
// nodeIP := strings.TrimSpace(chi.URLParam(r, "ip"))
// logrus.Infof("init node whose ip is %s", nodeIP)
// loginInfo := new(model.Login)
// resp, err := store.DefalutClient.Get(conf.Config.ConfigPath + "login/" + nodeIP)
// if err != nil {
// logrus.Errorf("prepare stage failed,get login info failed,details %s", err.Error())
// api.ReturnError(r, w, http.StatusBadRequest, err.Error())
// return
// }
// if resp.Count > 0 {
// err := json.Unmarshal(resp.Kvs[0].Value, loginInfo)
// if err != nil {
// logrus.Errorf("decode request failed,details %s", err.Error())
// api.ReturnError(r, w, http.StatusBadRequest, err.Error())
// return
// }
// } else {
// logrus.Errorf("prepare stage failed,get login info failed,details %s", err.Error())
// api.ReturnError(r, w, http.StatusBadRequest, err.Error())
// return
// }
// logrus.Infof("starting new goruntine to init")
// go asyncInit(loginInfo, nodeIP)
api.ReturnSuccess(r, w, nil)
}
func CheckInitStatus(w http.ResponseWriter, r *http.Request) {
nodeIP := strings.TrimSpace(chi.URLParam(r, "ip"))
var result InitStatus
logrus.Infof("geting init status by key %s", conf.Config.InitStatus+nodeIP)
resp, err := store.DefalutClient.Get(conf.Config.InitStatus+nodeIP, clientv3.WithPrefix())
if err != nil {
logrus.Warnf("error getting resp from etcd with given key %s,details %s", conf.Config.InitStatus+nodeIP, err.Error())
api.ReturnError(r, w, 500, err.Error())
return
}
if resp.Count > 0 {
status := string(resp.Kvs[0].Value)
result.Status = status
if strings.HasPrefix(status, "failed") {
result.Status = "failed"
logrus.Infof("init failed")
errmsg := strings.Split(status, "|")[1]
result.Msg = errmsg
}
} else {
logrus.Infof("get nothing from etcd")
result.Status = "uninit"
}
api.ReturnSuccess(r, w, &result)
}
type InitStatus struct {
Status string `json:"status"`
Msg string `json:"msg"`
}
func asyncInit(login *model.Login, nodeIp string) {
//save initing to etcd
// store.DefalutClient.Put(conf.Config.InitStatus+nodeIp, "initing")
// logrus.Infof("changing init stauts to initing ")
// _, err := job.PrepareState(login)
// if err != nil {
// logrus.Errorf("async prepare stage failed,details %s", err.Error())
// //save error to etcd
// store.DefalutClient.Put(conf.Config.InitStatus+nodeIp, "failed|"+err.Error())
// //api.ReturnError(r,w,http.StatusBadRequest,err.Error())
// return
// }
// //save init success to etcd
// logrus.Infof("changing init stauts to success ")
store.DefalutClient.Put(conf.Config.InitStatus+nodeIp, "success")
}
func CheckJobGetStatus(w http.ResponseWriter, r *http.Request) {
nodeIP := strings.TrimSpace(chi.URLParam(r, "ip"))
jl, err := job.GetJobStatusByNodeIP(nodeIP)
if err != nil {
logrus.Warnf("get job status failed")
api.ReturnError(r, w, http.StatusInternalServerError, err.Error())
}
api.ReturnSuccess(r, w, jl)
}
func StartBuildInJobs(w http.ResponseWriter, r *http.Request) {
nodeIP := strings.TrimSpace(chi.URLParam(r, "ip"))
logrus.Infof("node start install %s", nodeIP)
done := make(chan *job.JobList)
doneOne := make(chan *job.BuildInJob)
store.DefalutClient.NewRunnable("/acp_node/runnable/"+nodeIP, nodeIP)
logrus.Infof("adding install runnable to node ip %s", nodeIP)
jl, err := job.GetJobStatusByNodeIP(nodeIP)
if err != nil {
logrus.Warnf("get job status failed")
api.ReturnError(r, w, http.StatusInternalServerError, err.Error())
}
jl.SEQ = util.NewUUID()
for _, v := range jl.List {
v.JobSEQ = jl.SEQ
}
go writer(jl.SEQ, nodeIP, done, doneOne)
for _, v := range jl.List {
v.JobSEQ = jl.SEQ
}
job.UpdateNodeJobStatus(nodeIP, jl.List)
//core.CanRunJob<-nodeIP
go job.RunBuildJobs(nodeIP, done, doneOne)
api.ReturnSuccess(r, w, jl)
}
func GetNodeDetails(w http.ResponseWriter, r *http.Request) {
nodeUID := strings.TrimSpace(chi.URLParam(r, "node"))
hostNode, err := k8s.GetSource(conf.Config.K8SNode + nodeUID)

View File

@ -19,6 +19,8 @@
package controller
import (
"net/http"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/pkg/node/core/config"
"github.com/goodrain/rainbond/pkg/node/core/service"
@ -52,3 +54,8 @@ func Exist(i interface{}) {
datacenterConfig.Stop()
}
}
//Ping Ping
func Ping(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}

View File

@ -92,7 +92,7 @@ type Task struct {
CompleteTime time.Time `json:"complete_time"`
ResultPath string `json:"result_path"`
EventID string `json:"event_id"`
IsOnce bool `json:"is_once"`
RunMode string `json:"run_mode"`
OutPut []*TaskOutPut `json:"out_put"`
}

View File

@ -67,15 +67,8 @@ func Routers(mode string) *chi.Mux {
r.Put("/{node_id}/labels", controller.PutLabel)
r.Post("/{node_id}/down", controller.DownNode) //节点下线
r.Post("/{node_id}/up", controller.UpNode) //节点上线
r.Get("/{node}/details", controller.GetNodeDetails) //节点详情
//历史API
r.Get("/{node}/details", controller.GetNodeDetails)
r.Put("/login", controller.LoginCompute)
//此处会安装
r.Put("/{ip}/init", controller.NodeInit)
r.Get("/{ip}/init/status", controller.CheckInitStatus)
r.Get("/{ip}/install/status", controller.CheckJobGetStatus)
r.Put("/{ip}/install", controller.StartBuildInJobs)
})
//TODO:
@ -104,12 +97,9 @@ func Routers(mode string) *chi.Mux {
r.Post("/{group_id}/exec", controller.ExecTaskGroup)
r.Get("/{group_id}/status", controller.GetTaskGroupStatus)
})
r.Put("/tasks/taskreload", controller.ReloadStaticTasks)
}
})
//重新加载task文件
if mode == "master" {
r.Put("/-/taskreload", controller.ReloadStaticTasks)
}
//节点监控
r.Get("/metrics", controller.NodeExporter)
return r

View File

@ -40,6 +40,8 @@ type DataCenterConfig struct {
options *option.Conf
ctx context.Context
cancel context.CancelFunc
//group config 不持久化
groupConfigs map[string]*GroupContext
}
var dataCenterConfig *DataCenterConfig
@ -62,6 +64,7 @@ func CreateDataCenterConfig() *DataCenterConfig {
config: &model.GlobalConfig{
Configs: make(map[string]*model.ConfigUnit),
},
groupConfigs: make(map[string]*GroupContext),
}
res, err := store.DefalutClient.Get(dataCenterConfig.options.ConfigStoragePath+"/global", client.WithPrefix())
if err != nil {
@ -162,3 +165,13 @@ func (d *DataCenterConfig) PutConfigKV(kv *mvccpb.KeyValue) {
func (d *DataCenterConfig) DeleteConfig(name string) {
d.config.Delete(name)
}
//GetGroupConfig get group config
func (d *DataCenterConfig) GetGroupConfig(groupID string) *GroupContext {
if c, ok := d.groupConfigs[groupID]; ok {
return c
}
c := NewGroupContext(groupID)
d.groupConfigs[groupID] = c
return c
}

File diff suppressed because it is too large Load Diff

View File

@ -67,7 +67,8 @@ type Job struct {
Stdin string `json:"stdin"`
Envs []string `json:"envs"`
User string `json:"user"`
Rules []*JobRule `json:"rules"`
//rules 为nil 即当前任务是一次任务
Rules *Rule `json:"rule"`
Pause bool `json:"pause"` // 可手工控制的状态
Timeout int64 `json:"timeout"` // 任务执行时间超时设置,大于 0 时有效
// 执行任务失败重试次数
@ -86,18 +87,48 @@ type Job struct {
cmd []string
// 控制同时执行任务数
Count *int64 `json:"-"`
Scheduler *Scheduler
RunStatus *RunStatus
}
//JobRule 任务规则
type JobRule struct {
//Scheduler 调度信息
type Scheduler struct {
NodeID string `json:"node_id"`
SchedulerTime time.Time `json:"scheduler_time"`
CanRun bool `json:"can_run"`
Message string `json:"message"`
SchedulerStatus string `json:"scheduler_status"`
}
//RunStatus job run status
type RunStatus struct {
Status string `json:"status"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
RecordID string `json:"record_id"`
}
//Rule 任务规则
type Rule struct {
ID string `json:"id"`
Mode RuleMode `json:"mode"` //once,
Timer string `json:"timer"`
NodeIDs []string `json:"nids"`
ExcludeNodeIDs []string `json:"exclude_nids"`
Labels map[string]string `json:"labels"`
Schedule cron.Schedule `json:"-"`
}
//RuleMode RuleMode
type RuleMode string
//OnlyOnce 只能一次
var OnlyOnce RuleMode = "onlyonce"
//ManyOnce 多次运行
var ManyOnce RuleMode = "manyonce"
//Cycle 循环运行
var Cycle RuleMode = "cycle"
// 任务锁
type locker struct {
kind int
@ -140,12 +171,12 @@ func (l *locker) unlock() {
//Cmd 可执行任务
type Cmd struct {
*Job
*JobRule
*Rule
}
//GetID GetID
func (c *Cmd) GetID() string {
return c.Job.ID + c.JobRule.ID
return c.Job.ID + c.Rule.ID
}
//Run 执行
@ -169,8 +200,8 @@ func (c *Cmd) Run() {
//lockTTL
func (c *Cmd) lockTTL() int64 {
now := time.Now()
prev := c.JobRule.Schedule.Next(now)
ttl := int64(c.JobRule.Schedule.Next(prev).Sub(prev) / time.Second)
prev := c.Rule.Schedule.Next(now)
ttl := int64(c.Rule.Schedule.Next(prev).Sub(prev) / time.Second)
if ttl == 0 {
return 0
}
@ -247,13 +278,18 @@ func (c *Cmd) lock() *locker {
}
//Valid 验证 timer 字段,创建Schedule
func (j *JobRule) Valid() error {
func (j *Rule) Valid() error {
// 注意 interface nil 的比较
if j.Schedule != nil {
return nil
}
if len(j.Timer) > 0 {
if j.Mode != OnlyOnce && j.Mode != ManyOnce && j.Mode != Cycle {
return fmt.Errorf("job rule mode(%s) can not be support", j.Mode)
}
if j.Mode == Cycle && len(j.Timer) <= 0 {
return fmt.Errorf("job rule mode(%s) timer can not be empty", Cycle)
}
if j.Mode == Cycle && len(j.Timer) > 0 {
sch, err := cron.Parse(j.Timer)
if err != nil {
return fmt.Errorf("invalid JobRule[%s], parse err: %s", j.Timer, err.Error())
@ -263,33 +299,6 @@ func (j *JobRule) Valid() error {
return nil
}
//included 当前节点是否符合规则
func (j *JobRule) included(node *model.HostNode) bool {
//是否属于排除节点
for _, excludeID := range j.ExcludeNodeIDs {
if excludeID == node.ID {
return false
}
}
if j.NodeIDs != nil && len(j.NodeIDs) > 0 {
//是否属于允许节点
for _, id := range j.NodeIDs {
if id == node.ID {
return true
}
}
} else {
//是否匹配label
for k, v := range j.Labels {
if nodev := node.Labels[k]; nodev != v {
return false
}
}
return true
}
return false
}
//GetJob get job
func GetJob(id string) (job *Job, err error) {
job, _, err = GetJobAndRev(id)
@ -317,17 +326,15 @@ func GetJobAndRev(id string) (job *Job, rev int64, err error) {
return
}
//DeleteJob 删除job
func DeleteJob(id string) (resp *client.DeleteResponse, err error) {
return store.DefalutClient.Delete(CreateJobKey(id))
}
//GetJobs 获取jobs
func GetJobs() (jobs map[string]*Job, err error) {
if conf.Config.Cmd == "" {
//GetJobs 获取当前节点jobs
func GetJobs(node *model.HostNode) (jobs map[string]*Job, err error) {
if conf.Config.JobPath == "" {
return nil, fmt.Errorf("job save path can not be empty")
}
resp, err := store.DefalutClient.Get(conf.Config.Cmd, client.WithPrefix())
if node == nil {
return nil, fmt.Errorf("current node can not be nil")
}
resp, err := store.DefalutClient.Get(conf.Config.JobPath, client.WithPrefix())
if err != nil {
return
}
@ -346,6 +353,9 @@ func GetJobs() (jobs map[string]*Job, err error) {
logrus.Warnf("job[%s] is invalid: %s", string(j.Key), err.Error())
continue
}
if !job.IsRunOn(node) {
continue
}
jobs[job.ID] = job
}
return
@ -353,12 +363,21 @@ func GetJobs() (jobs map[string]*Job, err error) {
//WatchJobs watch jobs
func WatchJobs() client.WatchChan {
return store.DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix())
return store.DefalutClient.Watch(conf.Config.JobPath, client.WithPrefix())
}
//AddJob 添加job
func AddJob(j *Job) error {
_, err := store.DefalutClient.Put(conf.Config.Cmd+"/"+j.ID, j.String())
//PutJob 添加获取更新job
func PutJob(j *Job) error {
_, err := store.DefalutClient.Put(conf.Config.JobPath+"/"+j.Hash, j.String())
if err != nil {
return err
}
return nil
}
//DeleteJob delete job
func DeleteJob(hash string) error {
_, err := store.DefalutClient.Delete(conf.Config.JobPath + "/" + hash)
if err != nil {
return err
}
@ -454,7 +473,7 @@ func (j *Job) Run(nid string) bool {
if err := cmd.Start(); err != nil {
logrus.Warnf("job exec failed,details :%s", err.Error())
j.Fail(start, fmt.Sprintf("%s\n%s", b.String(), err.Error()))
j.Fail(start, err.Error()+"\n"+b.String())
return false
}
proc = &Process{
@ -467,7 +486,7 @@ func (j *Job) Run(nid string) bool {
defer proc.Stop()
if err := cmd.Wait(); err != nil {
j.Fail(start, fmt.Sprintf("%s\n%s", b.String(), err.Error()))
j.Fail(start, err.Error()+"\n"+b.String())
return false
}
j.Success(start, b.String())
@ -517,7 +536,7 @@ func GetIDFromKey(key string) string {
//CreateJobKey JobKey
func CreateJobKey(id string) string {
return conf.Config.Cmd + "/" + id
return conf.Config.JobPath + "/" + id
}
//Key Key
@ -539,10 +558,10 @@ func (j *Job) Check() error {
j.User = strings.TrimSpace(j.User)
for i := range j.Rules {
id := strings.TrimSpace(j.Rules[i].ID)
if j.Rules != nil {
id := strings.TrimSpace(j.Rules.ID)
if id == "" || strings.HasPrefix(id, "NEW") {
j.Rules[i].ID = uuid.NewV4().String()
j.Rules.ID = uuid.NewV4().String()
}
}
@ -582,32 +601,33 @@ func (j *Job) Cmds(node *model.HostNode) (cmds map[string]*Cmd) {
if j.Pause {
return
}
for _, r := range j.Rules {
if r.included(node) {
if j.Rules != nil {
cmd := &Cmd{
Job: j,
JobRule: r,
Rule: j.Rules,
}
cmds[cmd.GetID()] = cmd
}
}
return
}
//IsRunOn 是否在本节点执行
//只要有一个rule满足条件即可
func (j Job) IsRunOn(node *model.HostNode) bool {
if j.Rules == nil || len(j.Rules) == 0 {
if j.Scheduler == nil {
return false
}
if j.Scheduler.NodeID != node.ID {
return false
}
if !j.Scheduler.CanRun {
return false
}
//已有执行状态
if j.RunStatus != nil {
return false
}
for _, r := range j.Rules {
if r.included(node) {
return true
}
}
return false
}
//Valid 安全选项验证
func (j *Job) Valid() error {
@ -664,10 +684,11 @@ func (j *Job) genReal() {
//ValidRules ValidRules
func (j *Job) ValidRules() error {
for _, r := range j.Rules {
if err := r.Valid(); err != nil {
return err
if j.Rules == nil {
return fmt.Errorf("job rule can not be nil")
}
if err := j.Rules.Valid(); err != nil {
return err
}
return nil
}

View File

@ -107,7 +107,7 @@ func CreateExecutionRecord(j *Job, t time.Time, rs string, success bool) {
TaskID: j.TaskID,
User: j.User,
Name: j.Name,
Node: j.runOn,
Node: j.NodeID,
Command: j.Command,
Output: rs,
Success: success,
@ -118,16 +118,15 @@ func CreateExecutionRecord(j *Job, t time.Time, rs string, success bool) {
if err != nil {
logrus.Error("put exec record to etcd error.", err.Error())
}
//单次执行job,更新job rule将已执行节点加入到排除节点范围
if j.IsOnce {
if j.Rules != nil {
for _, rule := range j.Rules {
rule.ExcludeNodeIDs = append(rule.ExcludeNodeIDs, j.runOn)
status := "Success"
if !success {
status = "Failure"
}
} else {
j.Rules = append(j.Rules, &JobRule{ExcludeNodeIDs: []string{j.runOn}})
}
//更新job
PutOnce(j)
j.RunStatus = &RunStatus{
Status: status,
StartTime: t,
EndTime: time.Now(),
RecordID: record.ID,
}
PutJob(j)
}

View File

@ -1,38 +0,0 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package job
import (
client "github.com/coreos/etcd/clientv3"
conf "github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/pkg/node/core/store"
)
//PutOnce 添加立即执行的任务,只执行一次,执行完成后删除
//也可以更新job状态。node节点不监听更改事件
func PutOnce(j *Job) error {
_, err := store.DefalutClient.Put(conf.Config.Once+"/"+j.ID, j.String())
return err
}
//WatchOnce 监听任务
func WatchOnce() client.WatchChan {
return store.DefalutClient.Watch(conf.Config.Once, client.WithPrefix())
}

View File

@ -48,40 +48,45 @@ func CreateJobFromTask(task *model.Task, groupCtx *config.GroupContext) (*Job, e
for k, v := range envMaps {
envs = append(envs, fmt.Sprintf("%s=%s", k, v))
}
var rules []*JobRule
//如果任务不是一次任务
if !task.IsOnce {
if task.Timer == "" {
return nil, fmt.Errorf("timer can not be empty")
}
rule := &JobRule{
Labels: task.Temp.Labels,
NodeIDs: task.Nodes,
ID: uuid.NewV4().String(),
Timer: task.Timer,
}
rules = append(rules, rule)
} else {
rule := &JobRule{
Labels: task.Temp.Labels,
NodeIDs: task.Nodes,
ID: uuid.NewV4().String(),
}
rules = append(rules, rule)
}
job := &Job{
ID: uuid.NewV4().String(),
TaskID: task.ID,
EventID: task.EventID,
IsOnce: task.IsOnce,
Name: task.Name,
Command: strings.Join(command, " "),
Stdin: stdin,
Envs: envs,
Rules: rules,
Timeout: task.TimeOut,
Retry: task.Retry,
Interval: task.Interval,
}
//如果任务不是一次任务
if task.RunMode == string(Cycle) {
if task.Timer == "" {
return nil, fmt.Errorf("timer can not be empty")
}
rule := &Rule{
Labels: task.Temp.Labels,
Mode: Cycle,
ID: uuid.NewV4().String(),
Timer: task.Timer,
}
job.Rules = rule
} else if task.RunMode == string(OnlyOnce) {
rule := &Rule{
Labels: task.Temp.Labels,
Mode: OnlyOnce,
ID: uuid.NewV4().String(),
}
job.Rules = rule
} else if task.RunMode == string(ManyOnce) {
rule := &Rule{
Labels: task.Temp.Labels,
Mode: ManyOnce,
ID: uuid.NewV4().String(),
}
job.Rules = rule
}
return job, nil
}

View File

@ -49,7 +49,7 @@ var taskService *TaskService
func CreateTaskService(c *option.Conf, ms *masterserver.MasterServer) *TaskService {
if taskService == nil {
taskService = &TaskService{
SavePath: "/store/tasks",
SavePath: "/rainbond/store/tasks",
conf: c,
ms: ms,
}
@ -87,7 +87,6 @@ func (ts *TaskService) AddTask(t *model.Task) *utils.APIHandleError {
}
t.CreateTime = time.Now()
err := ts.ms.TaskEngine.AddTask(t)
if err != nil {
return utils.CreateAPIHandleErrorFromDBError("save task", err)
@ -179,17 +178,23 @@ func (ts *TaskService) ExecTask(taskID string, nodes []string) *utils.APIHandleE
}
}
if nodes == nil || len(nodes) == 0 {
ts.ms.TaskEngine.PutSchedul(taskID, "")
} else {
return utils.CreateAPIHandleError(400, fmt.Errorf("exec node can not be empty"))
}
for _, node := range nodes {
if n := ts.ms.Cluster.GetNode(node); n == nil {
return utils.CreateAPIHandleError(400, fmt.Errorf(" exec node %s not found", node))
}
}
var er error
for _, node := range nodes {
ts.ms.TaskEngine.PutSchedul(taskID, node)
er = ts.ms.TaskEngine.PutSchedul(taskID, node)
if er != nil {
logrus.Error("create task scheduler info error,", er.Error())
}
}
if er != nil {
return utils.CreateAPIHandleError(400, fmt.Errorf("exec task encounters an error"))
}
return nil
}
@ -369,12 +374,28 @@ func (ts *TaskGroupService) DeleteTaskGroup(taskGroupID string) *utils.APIHandle
}
//ExecTaskGroup 执行组任务API处理
func (ts *TaskGroupService) ExecTaskGroup(taskGroupID string) *utils.APIHandleError {
func (ts *TaskGroupService) ExecTaskGroup(taskGroupID string, nodes []string) *utils.APIHandleError {
t, err := ts.GetTaskGroup(taskGroupID)
if err != nil {
return err
}
//TODO:增加执行判断
ts.ms.TaskEngine.ScheduleGroup(nil, t)
if nodes == nil || len(nodes) == 0 {
return utils.CreateAPIHandleError(400, fmt.Errorf("exec node can not be empty"))
}
for _, node := range nodes {
if n := ts.ms.Cluster.GetNode(node); n == nil {
return utils.CreateAPIHandleError(400, fmt.Errorf(" exec node %s not found", node))
}
}
var er error
for _, node := range nodes {
er = ts.ms.TaskEngine.ScheduleGroup(t, node)
if er != nil {
logrus.Error("create task scheduler info error,", err.Error())
}
}
if er != nil {
return utils.CreateAPIHandleError(400, fmt.Errorf("exec task encounters an error"))
}
return nil
}

View File

@ -34,8 +34,10 @@ import (
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/pkg/node/api/model"
"github.com/goodrain/rainbond/pkg/node/core/config"
"github.com/goodrain/rainbond/pkg/node/core/job"
"github.com/goodrain/rainbond/pkg/node/core/store"
"github.com/goodrain/rainbond/pkg/node/masterserver/node"
"github.com/goodrain/rainbond/pkg/node/nodeserver"
"github.com/pquerna/ffjson/ffjson"
)
@ -47,14 +49,14 @@ type TaskEngine struct {
cancel context.CancelFunc
config *option.Conf
tasks map[string]*model.Task
tasksLock sync.Mutex
jobs nodeserver.Jobs
tasksLock, jobsLock sync.Mutex
dataCenterConfig *config.DataCenterConfig
nodeCluster *node.NodeCluster
currentNode *model.HostNode
schedulerCache map[string]bool
schedulerCacheLock sync.Mutex
down chan struct{}
masterID client.LeaseID
scheduler *Scheduler
}
//CreateTaskEngine 创建task管理引擎
@ -64,13 +66,15 @@ func CreateTaskEngine(nodeCluster *node.NodeCluster, node *model.HostNode) *Task
ctx: ctx,
cancel: cancel,
tasks: make(map[string]*model.Task),
jobs: make(nodeserver.Jobs),
config: option.Config,
dataCenterConfig: config.GetDataCenterConfig(),
nodeCluster: nodeCluster,
currentNode: node,
schedulerCache: make(map[string]bool),
down: make(chan struct{}),
}
scheduler := createScheduler(task)
task.scheduler = scheduler
return task
}
@ -187,7 +191,7 @@ func (t *TaskEngine) Stop() {
//watchTasks watchTasks
func (t *TaskEngine) loadAndWatchTasks() error {
//加载节点信息
res, err := store.DefalutClient.Get("/store/tasks/", client.WithPrefix())
res, err := store.DefalutClient.Get("/rainbond/store/tasks/", client.WithPrefix())
if err != nil {
return fmt.Errorf("load tasks error:%s", err.Error())
}
@ -197,7 +201,7 @@ func (t *TaskEngine) loadAndWatchTasks() error {
}
}
go func() {
ch := store.DefalutClient.Watch("/store/tasks/", client.WithPrefix(), client.WithRev(res.Header.Revision))
ch := store.DefalutClient.Watch("/rainbond/store/tasks/", client.WithPrefix(), client.WithRev(res.Header.Revision))
for {
select {
case <-t.ctx.Done():
@ -274,6 +278,7 @@ func (t *TaskEngine) LoadStaticTask() {
t.loadFile(t.config.StaticTaskPath)
}
}
func (t *TaskEngine) loadFile(path string) {
taskBody, err := ioutil.ReadFile(path)
if err != nil {
@ -292,18 +297,36 @@ func (t *TaskEngine) loadFile(path string) {
logrus.Errorf("unmarshal static task file %s error.%s", path, err.Error())
return
}
if group.ID == "" {
group.ID = group.Name
}
if group.Name == "" {
logrus.Errorf("task group name can not be empty. file %s", path)
return
}
if group.ID == "" {
group.ID = group.Name
}
if group.Tasks == nil {
logrus.Errorf("task group tasks can not be empty. file %s", path)
return
}
t.ScheduleGroup(nil, &group)
for _, task := range group.Tasks {
task.GroupID = group.ID
if task.Name == "" {
logrus.Errorf("task name can not be empty. file %s", path)
return
}
if task.ID == "" {
task.ID = task.Name
}
if task.Temp == nil {
logrus.Errorf("task [%s] temp can not be empty.", task.Name)
return
}
if task.Temp.ID == "" {
task.Temp.ID = task.Temp.Name
}
t.AddTask(task)
}
t.UpdateGroup(&group)
logrus.Infof("Load a static group %s.", group.Name)
}
if strings.Contains(filename, "task") {
@ -312,13 +335,13 @@ func (t *TaskEngine) loadFile(path string) {
logrus.Errorf("unmarshal static task file %s error.%s", path, err.Error())
return
}
if task.ID == "" {
task.ID = task.Name
}
if task.Name == "" {
logrus.Errorf("task name can not be empty. file %s", path)
return
}
if task.ID == "" {
task.ID = task.Name
}
if task.Temp == nil {
logrus.Errorf("task [%s] temp can not be empty.", task.Name)
return
@ -338,7 +361,7 @@ func (t *TaskEngine) GetTask(taskID string) *model.Task {
if task, ok := t.tasks[taskID]; ok {
return task
}
res, err := store.DefalutClient.Get("/store/tasks/" + taskID)
res, err := store.DefalutClient.Get("/rainbond/store/tasks/" + taskID)
if err != nil {
return nil
}
@ -390,13 +413,18 @@ func (t *TaskEngine) AddTask(task *model.Task) error {
if task.Scheduler.Mode == "" {
task.Scheduler.Mode = "Passive"
}
if task.RunMode == "" {
task.RunMode = string(job.OnlyOnce)
}
t.CacheTask(task)
_, err := store.DefalutClient.Put("/store/tasks/"+task.ID, task.String())
_, err := store.DefalutClient.Put("/rainbond/store/tasks/"+task.ID, task.String())
if err != nil {
return err
}
if task.Scheduler.Mode == "Intime" {
t.PutSchedul(task.ID, "")
for _, node := range t.nodeCluster.GetLabelsNode(task.Temp.Labels) {
t.PutSchedul(task.ID, node)
}
}
return nil
}
@ -406,7 +434,7 @@ func (t *TaskEngine) UpdateTask(task *model.Task) {
t.tasksLock.Lock()
defer t.tasksLock.Unlock()
t.tasks[task.ID] = task
_, err := store.DefalutClient.Put("/store/tasks/"+task.ID, task.String())
_, err := store.DefalutClient.Put("/rainbond/store/tasks/"+task.ID, task.String())
if err != nil {
logrus.Errorf("update task error,%s", err.Error())
}
@ -414,7 +442,8 @@ func (t *TaskEngine) UpdateTask(task *model.Task) {
//UpdateGroup 更新taskgroup
func (t *TaskEngine) UpdateGroup(group *model.TaskGroup) {
_, err := store.DefalutClient.Put("/store/taskgroups/"+group.ID, group.String())
group.Tasks = nil
_, err := store.DefalutClient.Put("/rainbond/store/taskgroups/"+group.ID, group.String())
if err != nil {
logrus.Errorf("update taskgroup error,%s", err.Error())
}
@ -422,7 +451,7 @@ func (t *TaskEngine) UpdateGroup(group *model.TaskGroup) {
//GetTaskGroup 获取taskgroup
func (t *TaskEngine) GetTaskGroup(taskGroupID string) *model.TaskGroup {
res, err := store.DefalutClient.Get("/store/taskgroups/" + taskGroupID)
res, err := store.DefalutClient.Get("/rainbond/store/taskgroups/" + taskGroupID)
if err != nil {
return nil
}
@ -445,7 +474,7 @@ func (t *TaskEngine) CacheTask(task *model.Task) {
//AddGroupConfig 添加组会话配置
func (t *TaskEngine) AddGroupConfig(groupID string, configs map[string]string) {
ctx := config.NewGroupContext(groupID)
ctx := t.dataCenterConfig.GetGroupConfig(groupID)
for k, v := range configs {
ctx.Add(k, v)
}

View File

@ -47,7 +47,9 @@ func (t *TaskEngine) startHandleJobRecord() {
return
case event := <-ch:
if err := event.Err(); err != nil {
logrus.Error("watch job recoder error,", err.Error())
time.Sleep(time.Second * 3)
continue
}
for _, ev := range event.Events {
switch {
@ -91,6 +93,7 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) {
}
//更新task信息
defer t.UpdateTask(task)
defer er.CompleteHandle()
taskStatus := model.TaskStatus{
JobID: er.JobID,
StartTime: er.BeginTime,
@ -151,7 +154,7 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) {
if group == nil {
continue
}
t.ScheduleGroup([]string{output.NodeID}, group)
t.ScheduleGroup(group, output.NodeID)
}
}
}
@ -164,14 +167,4 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) {
task.Status = make(map[string]model.TaskStatus)
}
task.Status[er.Node] = taskStatus
//如果是is_once的任务处理完成后删除job
if task.IsOnce {
task.CompleteTime = time.Now()
t.StopTask(task, er.Node)
} else { //如果是一次性任务,执行记录已经被删除,无需更新
er.CompleteHandle()
}
t.schedulerCacheLock.Lock()
defer t.schedulerCacheLock.Unlock()
delete(t.schedulerCache, task.ID+er.Node)
}

View File

@ -19,106 +19,213 @@
package task
import (
"context"
"crypto/sha1"
"fmt"
"time"
"github.com/Sirupsen/logrus"
client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/goodrain/rainbond/pkg/node/api/model"
"github.com/goodrain/rainbond/pkg/node/core/job"
"github.com/goodrain/rainbond/pkg/node/core/store"
"github.com/pquerna/ffjson/ffjson"
)
//Scheduler 调度器
type Scheduler struct {
taskEngine *TaskEngine
cache chan *job.Job
ctx context.Context
cancel context.CancelFunc
}
func createScheduler(taskEngine *TaskEngine) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
taskEngine: taskEngine,
cache: make(chan *job.Job, 100),
ctx: ctx,
cancel: cancel,
}
}
func (s *Scheduler) putSchedulerChan(jb *job.Job, duration time.Duration) {
go func() {
time.Sleep(duration)
s.cache <- jb
}()
}
//Next 下一个调度对象
func (s *Scheduler) Next() (*job.Job, error) {
ctx, cancel := context.WithTimeout(s.ctx, time.Second*5)
defer cancel()
select {
case job := <-s.cache:
return job, nil
case <-s.ctx.Done():
return nil, fmt.Errorf("context cancel")
case <-ctx.Done():
return nil, fmt.Errorf("time out")
}
}
//Stop 停止
func (s *Scheduler) Stop() {
s.cancel()
}
//StartScheduler 开始调度
func (t *TaskEngine) startScheduler() {
t.loadAndWatchJobs()
logrus.Info("Start scheduler worke")
for {
next, err := t.scheduler.Next()
if err != nil {
if err.Error() == "time out" {
continue
}
if err.Error() == "context cancel" {
return
}
continue
}
logrus.Infof("Start schedule job %s to node %s", next.Hash, next.NodeID)
task := t.GetTask(next.TaskID)
if task == nil {
logrus.Errorf("job %s task %s not found when scheduler", next.ID, next.TaskID)
continue
}
vas := t.GetValidationCriteria(task)
for i, va := range vas {
ok, err := va(next.NodeID, task)
if err != nil {
task.Scheduler.Status[next.NodeID] = model.SchedulerStatus{
Status: "Failure",
Message: err.Error(),
SchedulerMaster: t.currentNode.ID,
SchedulerTime: time.Now(),
}
t.UpdateTask(task)
next.Scheduler = &job.Scheduler{
NodeID: next.NodeID,
SchedulerTime: time.Now(),
SchedulerStatus: "Failure",
Message: err.Error(),
}
t.UpdateJob(next)
logrus.Infof("Failure schedule job %s to node %s", next.Hash, next.NodeID)
break
}
if !ok {
task.Scheduler.Status[next.NodeID] = model.SchedulerStatus{
Status: "Waiting",
Message: "waiting validation criteria",
SchedulerMaster: t.currentNode.ID,
SchedulerTime: time.Now(),
}
t.UpdateTask(task)
t.scheduler.putSchedulerChan(next, 3*time.Second)
break
}
//全部条件满足
if i == len(vas)-1 {
if task.Scheduler.Status == nil {
task.Scheduler.Status = make(map[string]model.SchedulerStatus)
}
task.Scheduler.Status[next.NodeID] = model.SchedulerStatus{
Status: "Success",
Message: "Success",
SchedulerMaster: t.currentNode.ID,
SchedulerTime: time.Now(),
}
task.Status[next.NodeID] = model.TaskStatus{
JobID: next.ID,
Status: "Start",
}
t.UpdateTask(task)
next.Scheduler = &job.Scheduler{
NodeID: next.NodeID,
SchedulerTime: time.Now(),
SchedulerStatus: "Success",
CanRun: true,
}
t.UpdateJob(next)
logrus.Infof("Success schedule job %s to node %s", next.Hash, next.NodeID)
}
}
}
}
func (t *TaskEngine) stopScheduler() {
t.scheduler.Stop()
}
//TaskSchedulerInfo 请求调度信息
//指定任务到指定节点执行
//执行完成后该数据从集群中删除
//存储key: taskid+nodeid
type TaskSchedulerInfo struct {
TaskID string `json:"taskID"`
Node string `json:"node"`
JobID string `json:"jobID"`
CreateTime time.Time `json:"create_time"`
SchedulerMasterNode string `json:"create_master_node"`
Status model.SchedulerStatus
}
//NewTaskSchedulerInfo 创建请求调度信息
func NewTaskSchedulerInfo(taskID, nodeID string) *TaskSchedulerInfo {
return &TaskSchedulerInfo{
TaskID: taskID,
Node: nodeID,
CreateTime: time.Now(),
}
}
func getTaskSchedulerInfoFromKV(kv *mvccpb.KeyValue) *TaskSchedulerInfo {
var taskinfo TaskSchedulerInfo
if err := ffjson.Unmarshal(kv.Value, &taskinfo); err != nil {
logrus.Error("parse task scheduler info error:", err.Error())
return nil
}
return &taskinfo
}
//Post 发布
func (t TaskSchedulerInfo) Post() {
body, err := ffjson.Marshal(t)
if err == nil {
store.DefalutClient.Post("/rainbond-node/scheduler/taskshcedulers/"+t.TaskID+"/"+t.Node, string(body))
logrus.Infof("put a scheduler info %s:%s", t.TaskID, t.Node)
}
}
//Update 更新数据
func (t TaskSchedulerInfo) Update() {
body, err := ffjson.Marshal(t)
if err == nil {
store.DefalutClient.Put("/rainbond-node/scheduler/taskshcedulers/"+t.TaskID+"/"+t.Node, string(body))
}
}
//Delete 删除数据
func (t TaskSchedulerInfo) Delete() {
store.DefalutClient.Delete("/rainbond-node/scheduler/taskshcedulers/" + t.TaskID + "/" + t.Node)
}
func (t *TaskEngine) watcheScheduler() {
load, _ := store.DefalutClient.Get("/rainbond-node/scheduler/taskshcedulers/", client.WithPrefix())
func (t *TaskEngine) loadAndWatchJobs() {
load, _ := store.DefalutClient.Get(t.config.JobPath, client.WithPrefix())
if load != nil && load.Count > 0 {
for _, kv := range load.Kvs {
logrus.Debugf("watch a scheduler task %s", kv.Key)
if taskinfo := getTaskSchedulerInfoFromKV(kv); taskinfo != nil {
t.prepareScheduleTask(taskinfo)
jb, err := job.GetJobFromKv(kv)
if err != nil {
logrus.Errorf("load job(%s) error,%s", kv.Key, err.Error())
continue
}
t.andOrUpdateJob(jb)
}
}
}
ch := store.DefalutClient.Watch("/rainbond-node/scheduler/taskshcedulers/", client.WithPrefix())
logrus.Infof("load exist job success,count %d", len(t.jobs))
go func() {
ch := store.DefalutClient.Watch(t.config.JobPath, client.WithPrefix())
for {
select {
case <-t.ctx.Done():
return
case event := <-ch:
if err := event.Err(); err != nil {
logrus.Error("watch job error,", err.Error())
time.Sleep(time.Second * 3)
continue
}
for _, ev := range event.Events {
switch {
case ev.IsCreate(), ev.IsModify():
logrus.Debugf("watch a scheduler task %s", ev.Kv.Key)
if taskinfo := getTaskSchedulerInfoFromKV(ev.Kv); taskinfo != nil {
t.prepareScheduleTask(taskinfo)
jb, err := job.GetJobFromKv(ev.Kv)
if err != nil {
logrus.Errorf("load job(%s) error,%s", ev.Kv.Key, err.Error())
continue
}
t.andOrUpdateJob(jb)
case ev.Type == client.EventTypeDelete:
t.deleteJob(job.GetIDFromKey(string(ev.Kv.Key)))
}
}
}
}
}()
}
func (t *TaskEngine) andOrUpdateJob(jb *job.Job) {
t.jobsLock.Lock()
defer t.jobsLock.Unlock()
t.jobs[jb.Hash] = jb
if jb.Scheduler == nil {
t.scheduler.putSchedulerChan(jb, 0)
logrus.Infof("cache a job and put scheduler")
}
}
//UpdateJob 持久化增加or更新job
func (t *TaskEngine) UpdateJob(jb *job.Job) {
t.jobsLock.Lock()
defer t.jobsLock.Unlock()
t.jobs[jb.Hash] = jb
job.PutJob(jb)
}
func (t *TaskEngine) deleteJob(jbHash string) {
t.jobsLock.Lock()
defer t.jobsLock.Unlock()
if _, ok := t.jobs[jbHash]; ok {
delete(t.jobs, jbHash)
}
}
@ -138,7 +245,15 @@ func (t *TaskEngine) PutSchedul(taskID string, nodeID string) (err error) {
return fmt.Errorf("node %s not found", nodeID)
}
hash := getHash(taskID, nodeID)
logrus.Infof("scheduler hash %s", hash)
logrus.Infof("put scheduler hash %s", hash)
//初步判断任务是否能被创建
if oldjob := t.GetJob(hash); oldjob != nil {
if task.RunMode == string(job.OnlyOnce) {
if oldjob.Scheduler != nil && oldjob.Scheduler.SchedulerStatus == "Success" {
return fmt.Errorf("task %s run on node %s job only run mode %s", taskID, nodeID, job.OnlyOnce)
}
}
}
var jb *job.Job
if task.GroupID == "" {
jb, err = job.CreateJobFromTask(task, nil)
@ -146,9 +261,23 @@ func (t *TaskEngine) PutSchedul(taskID string, nodeID string) (err error) {
return fmt.Errorf("create job error,%s", err.Error())
}
} else {
ctx := t.dataCenterConfig.GetGroupConfig(task.GroupID)
jb, err = job.CreateJobFromTask(task, ctx)
if err != nil {
return fmt.Errorf("create job error,%s", err.Error())
}
}
jb.NodeID = nodeID
jb.Hash = hash
jb.Scheduler = nil
return job.PutJob(jb)
}
//GetJob 获取已经存在的job
func (t *TaskEngine) GetJob(hash string) *job.Job {
if j, ok := t.jobs[hash]; ok {
return j
}
return nil
}
@ -160,268 +289,24 @@ func getHash(source ...string) string {
return fmt.Sprintf("%x", h.Sum(nil))
}
//waitScheduleTask 等待调度条件成熟
func (t *TaskEngine) waitScheduleTask(taskSchedulerInfo *TaskSchedulerInfo, task *model.Task) {
//continueScheduler 是否继续调度,如果调度条件无法满足,停止调度
var continueScheduler = true
canRun := func() bool {
defer t.UpdateTask(task)
if task.Temp.Depends != nil && len(task.Temp.Depends) > 0 {
var result = make([]bool, len(task.Temp.Depends))
for i, dep := range task.Temp.Depends {
if depTask := t.GetTask(dep.DependTaskID); depTask != nil {
//判断依赖任务调度情况
if depTask.Scheduler.Mode == "Passive" {
var needScheduler bool
if depTask.Scheduler.Status == nil {
needScheduler = true
}
//当前节点未调度且依赖策略为当前节点必须执行,则调度
if _, ok := depTask.Scheduler.Status[taskSchedulerInfo.Node]; !ok && dep.DetermineStrategy == model.SameNodeStrategy {
needScheduler = true
}
if needScheduler {
//依赖任务i未就绪
result[i] = false
//发出依赖任务的调度请求
if dep.DetermineStrategy == model.SameNodeStrategy {
t.PutSchedul(depTask.ID, taskSchedulerInfo.Node)
} else if dep.DetermineStrategy == model.AtLeastOnceStrategy {
nodes := t.nodeCluster.GetLabelsNode(depTask.Temp.Labels)
if len(nodes) > 0 {
t.PutSchedul(depTask.ID, nodes[0])
} else {
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s can not found exec node", depTask.ID)
taskSchedulerInfo.Status.Status = "Failure"
taskSchedulerInfo.Status.SchedulerTime = time.Now()
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
continueScheduler = false
continue
}
}
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
taskSchedulerInfo.Status.Status = "Waiting"
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
continue
}
}
//判断依赖任务的执行情况
//依赖策略为任务全局只要执行一次
if dep.DetermineStrategy == model.AtLeastOnceStrategy {
if depTask.Status == nil || len(depTask.Status) < 1 {
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
taskSchedulerInfo.Status.Status = "Waiting"
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
return false
}
var access bool
var faiiureSize int
if len(depTask.Status) > 0 {
for _, status := range depTask.Status {
if status.CompleStatus == "Success" {
logrus.Debugf("dep task %s ready", depTask.ID)
access = true
} else {
faiiureSize++
}
}
}
//如果依赖的某个服务全部执行记录失败,条件不可能满足,本次调度结束
if faiiureSize != 0 && faiiureSize >= len(depTask.Scheduler.Status) {
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s Condition cannot be satisfied", depTask.ID)
taskSchedulerInfo.Status.Status = "Failure"
taskSchedulerInfo.Status.SchedulerTime = time.Now()
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
continueScheduler = false
return false
}
result[i] = access
}
//依赖任务相同节点执行成功
if dep.DetermineStrategy == model.SameNodeStrategy {
if depTask.Status == nil || len(depTask.Status) < 1 {
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
taskSchedulerInfo.Status.Status = "Waiting"
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
return false
}
if nodestatus, ok := depTask.Status[taskSchedulerInfo.Node]; ok && nodestatus.CompleStatus == "Success" {
result[i] = true
continue
} else if ok && nodestatus.CompleStatus != "" {
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s(%s) Condition cannot be satisfied", depTask.ID, nodestatus.CompleStatus)
taskSchedulerInfo.Status.Status = "Failure"
taskSchedulerInfo.Status.SchedulerTime = time.Now()
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
continueScheduler = false
return false
} else {
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
taskSchedulerInfo.Status.Status = "Waiting"
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
return false
}
}
} else {
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not found", dep.DependTaskID)
taskSchedulerInfo.Status.Status = "Failure"
taskSchedulerInfo.Status.SchedulerTime = time.Now()
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
result[i] = false
continueScheduler = false
return false
}
}
for _, ok := range result {
if !ok {
return false
}
}
}
return true
}
for continueScheduler {
if canRun() {
t.scheduler(taskSchedulerInfo, task)
return
}
logrus.Infof("task %s can not be run .waiting depend tasks complete", task.Name)
time.Sleep(2 * time.Second)
}
//调度失败,删除任务
taskSchedulerInfo.Delete()
}
//ScheduleTask 调度执行指定task
//单节点或不确定节点
func (t *TaskEngine) prepareScheduleTask(taskSchedulerInfo *TaskSchedulerInfo) {
if task := t.GetTask(taskSchedulerInfo.TaskID); task != nil {
if task == nil {
return
}
//已经调度且没有完成
if status, ok := task.Status[taskSchedulerInfo.Node]; ok && status.Status == "start" {
logrus.Warningf("prepare scheduler task(%s) error,it already scheduler", taskSchedulerInfo.TaskID)
return
}
if task.Temp == nil {
logrus.Warningf("prepare scheduler task(%s) temp can not be nil", taskSchedulerInfo.TaskID)
return
}
if task.Scheduler.Status == nil {
task.Scheduler.Status = make(map[string]model.SchedulerStatus)
}
if task.Temp.Depends != nil && len(task.Temp.Depends) > 0 {
go t.waitScheduleTask(taskSchedulerInfo, task)
} else {
//真正调度
t.scheduler(taskSchedulerInfo, task)
}
t.UpdateTask(task)
}
}
//scheduler 调度一个Task到一个节点执行
func (t *TaskEngine) scheduler(taskSchedulerInfo *TaskSchedulerInfo, task *model.Task) {
j, err := job.CreateJobFromTask(task, nil)
if err != nil {
taskSchedulerInfo.Status.Status = "Failure"
taskSchedulerInfo.Status.Message = err.Error()
//更新调度状态
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
t.UpdateTask(task)
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
return
}
//如果指定nodes
if taskSchedulerInfo.Node != "" {
for _, rule := range j.Rules {
rule.NodeIDs = []string{taskSchedulerInfo.Node}
}
}
if j.IsOnce {
if err := job.PutOnce(j); err != nil {
taskSchedulerInfo.Status.Status = "Failure"
taskSchedulerInfo.Status.Message = err.Error()
//更新调度状态
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
t.UpdateTask(task)
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
return
}
} else {
if err := job.AddJob(j); err != nil {
taskSchedulerInfo.Status.Status = "Failure"
taskSchedulerInfo.Status.Message = err.Error()
//更新调度状态
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
t.UpdateTask(task)
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
return
}
}
task.StartTime = time.Now()
taskSchedulerInfo.Status.Status = "Success"
taskSchedulerInfo.Status.Message = "scheduler success"
taskSchedulerInfo.Status.SchedulerTime = time.Now()
taskSchedulerInfo.Status.SchedulerMaster = t.currentNode.ID
//更新调度状态
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
if task.Status == nil {
task.Status = make(map[string]model.TaskStatus)
}
task.Status[t.currentNode.ID] = model.TaskStatus{
JobID: j.ID,
StartTime: time.Now(),
Status: "Start",
}
t.UpdateTask(task)
logrus.Infof("success scheduler a task %s to node %s", task.Name, taskSchedulerInfo.Node)
}
//ScheduleGroup 调度执行指定task
func (t *TaskEngine) ScheduleGroup(nodes []string, nextGroups ...*model.TaskGroup) {
for _, group := range nextGroups {
if group.Tasks == nil || len(group.Tasks) < 1 {
group.Status = &model.TaskGroupStatus{
StartTime: time.Now(),
EndTime: time.Now(),
Status: "NotDefineTask",
}
t.UpdateGroup(group)
}
for _, task := range group.Tasks {
task.GroupID = group.ID
t.AddTask(task)
}
group.Status = &model.TaskGroupStatus{
StartTime: time.Now(),
Status: "Start",
}
t.UpdateGroup(group)
}
func (t *TaskEngine) ScheduleGroup(nextGroups *model.TaskGroup, node string) error {
//TODO:调度组任务
return nil
}
//StopTask 停止任务即删除任务对应的JOB
func (t *TaskEngine) StopTask(task *model.Task, node string) {
if status, ok := task.Status[node]; ok {
if status.JobID != "" {
if task.IsOnce {
_, err := store.DefalutClient.Delete(t.config.Once + "/" + status.JobID)
_, err := store.DefalutClient.Delete(t.config.JobPath + "/" + status.JobID)
if err != nil {
logrus.Errorf("stop task %s error.%s", task.Name, err.Error())
}
} else {
_, err := store.DefalutClient.Delete(t.config.Cmd + "/" + status.JobID)
if err != nil {
logrus.Errorf("stop task %s error.%s", task.Name, err.Error())
}
}
_, err := store.DefalutClient.Delete(t.config.ExecutionRecordPath+"/"+status.JobID, client.WithPrefix())
_, err = store.DefalutClient.Delete(t.config.ExecutionRecordPath+"/"+status.JobID, client.WithPrefix())
if err != nil {
logrus.Errorf("delete execution record for task %s error.%s", task.Name, err.Error())
}
}
}
store.DefalutClient.Delete("/rainbond-node/scheduler/taskshcedulers/" + task.ID + "/" + node)
}

View File

@ -0,0 +1,125 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package task
import (
"fmt"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/pkg/node/api/model"
)
//ValidationCriteria 在某个节点执行任务,行不行
type ValidationCriteria func(string, *model.Task) (bool, error)
//AllCouldRun 可以执行
var AllCouldRun ValidationCriteria = func(string, *model.Task) (bool, error) {
return true, nil
}
//ModeRun 验证任务执行策略
var ModeRun ValidationCriteria = func(node string, task *model.Task) (bool, error) {
if task.RunMode == "OnlyOnce" {
if status, ok := task.Status[node]; ok {
if status.CompleStatus == "Success" {
return false, fmt.Errorf("this job In violation of the task runmode")
}
}
}
return true, nil
}
//DependRun 验证依赖任务执行情况
func (t *TaskEngine) DependRun(node string, task, depTask *model.Task, Strategy string) (bool, error) {
if depTask != nil {
//判断依赖任务调度情况
if depTask.Scheduler.Mode == "Passive" {
var needScheduler bool
//当前节点未调度且依赖策略为当前节点必须执行,则调度
if Strategy == model.SameNodeStrategy {
if job := t.GetJob(getHash(depTask.ID, node)); job == nil {
needScheduler = true
}
}
if needScheduler {
//发出依赖任务的调度请求
t.PutSchedul(depTask.ID, node)
return false, nil
}
}
//判断依赖任务的执行情况
//依赖策略为任务全局只要执行一次
if Strategy == model.AtLeastOnceStrategy {
if depTask.Status == nil || len(depTask.Status) < 1 {
return false, nil
}
var faiiureSize int
if len(depTask.Status) > 0 {
for _, status := range depTask.Status {
if status.CompleStatus == "Success" {
logrus.Debugf("dep task %s ready", depTask.ID)
return true, nil
}
faiiureSize++
}
}
// if faiiureSize > 0 {
// return false, fmt.Errorf("dep task run error count %d", faiiureSize)
// }
return false, nil
}
//依赖任务相同节点执行成功
if Strategy == model.SameNodeStrategy {
if depTask.Status == nil || len(depTask.Status) < 1 {
return false, nil
}
if nodestatus, ok := depTask.Status[node]; ok && nodestatus.CompleStatus == "Success" {
return true, nil
} else if ok && nodestatus.CompleStatus != "" {
return false, fmt.Errorf("depend task %s(%s) Condition cannot be satisfied", depTask.ID, nodestatus.CompleStatus)
} else {
return false, nil
}
}
} else {
return false, fmt.Errorf("task (%s) dep task is nil", task.ID)
}
return false, nil
}
//GetValidationCriteria 获取调度必要条件
func (t *TaskEngine) GetValidationCriteria(task *model.Task) (vas []ValidationCriteria) {
vas = append(vas, ModeRun)
vas = append(vas, t.DependsRun)
return
}
//DependRun DependRun
func (t *TaskEngine) DependsRun(node string, task *model.Task) (bool, error) {
for _, dep := range task.Temp.Depends {
depTask := t.GetTask(dep.DependTaskID)
if depTask != nil {
ok, err := t.DependRun(node, task, depTask, dep.DetermineStrategy)
if !ok {
return ok, err
}
}
}
return true, nil
}

View File

@ -1,24 +0,0 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package nodeserver
import "github.com/goodrain/rainbond/pkg/node/core/job"
//Jobs jobs
type Jobs map[string]*job.Job

View File

@ -28,7 +28,7 @@ import (
conf "github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/pkg/node/api/model"
"github.com/goodrain/rainbond/pkg/node/core/job"
corejob "github.com/goodrain/rainbond/pkg/node/core/job"
"github.com/goodrain/rainbond/pkg/node/core/store"
"github.com/goodrain/rainbond/pkg/util"
"github.com/robfig/cron"
@ -50,6 +50,9 @@ type Config struct {
TTL int64
}
//Jobs jobs
type Jobs map[string]*corejob.Job
//NodeServer node manager server
type NodeServer struct {
*store.Client
@ -58,7 +61,7 @@ type NodeServer struct {
jobs Jobs // 和结点相关的任务
onceJobs Jobs //记录执行的单任务
jobLock sync.Mutex
cmds map[string]*job.Cmd
cmds map[string]*corejob.Cmd
// 删除的 job id用于 group 更新
delIDs map[string]bool
ttl int64
@ -99,15 +102,14 @@ func (n *NodeServer) Run() (err error) {
}
n.Cron.Start()
go n.watchJobs()
go n.watchOnce()
logrus.Info("node registe success")
if err := job.StartProc(); err != nil {
if err := corejob.StartProc(); err != nil {
logrus.Warnf("[process key will not timeout]proc lease id set err: %s", err.Error())
}
return
}
func (n *NodeServer) loadJobs() (err error) {
jobs, err := job.GetJobs()
jobs, err := corejob.GetJobs(n.HostNode)
if err != nil {
return err
}
@ -115,44 +117,49 @@ func (n *NodeServer) loadJobs() (err error) {
return
}
for _, job := range jobs {
job.Init(n.ID)
n.addJob(job)
}
return
}
func (n *NodeServer) watchJobs() {
rch := job.WatchJobs()
rch := corejob.WatchJobs()
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.IsCreate():
j, err := job.GetJobFromKv(ev.Kv)
j, err := corejob.GetJobFromKv(ev.Kv)
if err != nil {
logrus.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
continue
}
j.Init(n.ID)
n.addJob(j)
case ev.IsModify():
j, err := job.GetJobFromKv(ev.Kv)
j, err := corejob.GetJobFromKv(ev.Kv)
if err != nil {
logrus.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
continue
}
j.Init(n.ID)
n.modJob(j)
case ev.Type == client.EventTypeDelete:
n.delJob(job.GetIDFromKey(string(ev.Kv.Key)))
n.delJob(corejob.GetIDFromKey(string(ev.Kv.Key)))
default:
logrus.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key))
}
}
}
}
func (n *NodeServer) addJob(j *job.Job) {
//添加job缓存
func (n *NodeServer) addJob(j *corejob.Job) {
if !j.IsRunOn(n.HostNode) {
return
}
//一次性任务
if j.Rules.Mode != corejob.Cycle {
n.runOnceJob(j)
return
}
n.jobLock.Lock()
defer n.jobLock.Unlock()
n.jobs[j.ID] = j
@ -186,7 +193,15 @@ func (n *NodeServer) delJob(id string) {
return
}
func (n *NodeServer) modJob(job *job.Job) {
func (n *NodeServer) modJob(job *corejob.Job) {
if !job.IsRunOn(n.HostNode) {
return
}
//一次性任务
if job.Rules.Mode != corejob.Cycle {
n.runOnceJob(job)
return
}
oJob, ok := n.jobs[job.ID]
// 之前此任务没有在当前结点执行,直接增加任务
if !ok {
@ -207,113 +222,39 @@ func (n *NodeServer) modJob(job *job.Job) {
}
}
func (n *NodeServer) addCmd(cmd *job.Cmd) {
n.Cron.Schedule(cmd.JobRule.Schedule, cmd)
func (n *NodeServer) addCmd(cmd *corejob.Cmd) {
n.Cron.Schedule(cmd.Rule.Schedule, cmd)
n.cmds[cmd.GetID()] = cmd
logrus.Infof("job[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.JobRule.ID, cmd.JobRule.Timer)
logrus.Infof("job[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer)
return
}
func (n *NodeServer) modCmd(cmd *job.Cmd) {
func (n *NodeServer) modCmd(cmd *corejob.Cmd) {
c, ok := n.cmds[cmd.GetID()]
if !ok {
n.addCmd(cmd)
return
}
sch := c.JobRule.Timer
sch := c.Rule.Timer
*c = *cmd
// 节点执行时间改变,更新 cron
// 否则不用更新 cron
if c.JobRule.Timer != sch {
n.Cron.Schedule(c.JobRule.Schedule, c)
if c.Rule.Timer != sch {
n.Cron.Schedule(c.Rule.Schedule, c)
}
logrus.Infof("job[%s] rule[%s] timer[%s] has updated", c.Job.ID, c.JobRule.ID, c.JobRule.Timer)
logrus.Infof("job[%s] rule[%s] timer[%s] has updated", c.Job.ID, c.Rule.ID, c.Rule.Timer)
}
func (n *NodeServer) delCmd(cmd *job.Cmd) {
func (n *NodeServer) delCmd(cmd *corejob.Cmd) {
delete(n.cmds, cmd.GetID())
n.Cron.DelJob(cmd)
logrus.Infof("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.JobRule.ID, cmd.JobRule.Timer)
}
func (n *NodeServer) watchOnce() {
rch := job.WatchOnce()
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.IsCreate():
j, err := job.GetJobFromKv(ev.Kv)
if err != nil {
logrus.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
continue
}
j.Init(n.ID)
if j.Rules != nil {
if !j.IsRunOn(n.HostNode) {
continue
}
}
if !j.IsOnce {
continue
logrus.Infof("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.Rule.ID, cmd.Rule.Timer)
}
//job must be schedulered
func (n *NodeServer) runOnceJob(j *corejob.Job) {
go j.RunWithRecovery()
}
}
}
}
func (n *NodeServer) watchBuildIn() {
//todo 在这里给<-channel,如果没有,立刻返回,可以用无循环switchdefault实现
// rch := job.WatchBuildIn()
// for wresp := range rch {
// for _, ev := range wresp.Events {
// switch {
// case ev.IsCreate() || ev.IsModify():
// canRun := store.DefalutClient.IsRunnable("/acp_node/runnable/" + n.ID)
// if !canRun {
// logrus.Infof("job can't run on node %s,skip", n.ID)
// continue
// }
// logrus.Infof("new build-in job to run ,key is %s,local ip is %s", ev.Kv.Key, n.ID)
// job := &job.Job{}
// k := string(ev.Kv.Key)
// paths := strings.Split(k, "/")
// ps := strings.Split(paths[len(paths)-1], "-")
// buildInJobId := ps[0]
// jobResp, err := store.DefalutClient.Get(conf.Config.BuildIn + buildInJobId)
// if err != nil {
// logrus.Warnf("get build-in job failed")
// }
// json.Unmarshal(jobResp.Kvs[0].Value, job)
// job.Init(n.ID)
// //job.Check()
// err = job.ResolveShell()
// if err != nil {
// logrus.Infof("resolve shell to runnable failed , details %s", err.Error())
// }
// n.addJob(job)
// //logrus.Infof("is ok? %v and is job runing on %v",ok,job.IsRunOn(n.ID, n.groups))
// ////if !ok || !job.IsRunOn(n.ID, n.groups) {
// //// continue
// ////}
// for _, v := range job.Rules {
// for _, v2 := range v.NodeIDs {
// if v2 == n.ID {
// logrus.Infof("prepare run new build-in job")
// go job.RunBuildInWithRecovery(n.ID)
// go n.watchBuildIn()
// return
// }
// }
// }
// }
// }
// }
}
//Stop 停止服务
func (n *NodeServer) Stop(i interface{}) {
@ -367,7 +308,7 @@ func NewNodeServer(cfg *conf.Conf) (*NodeServer, error) {
Cron: cron.New(),
jobs: make(Jobs, 8),
onceJobs: make(Jobs, 8),
cmds: make(map[string]*job.Cmd),
cmds: make(map[string]*corejob.Cmd),
delIDs: make(map[string]bool, 8),
Conf: cfg,
ttl: cfg.TTL,