// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package masterserver
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
2017-11-16 11:31:09 +08:00
|
|
|
|
"fmt"
|
2017-11-13 21:54:11 +08:00
|
|
|
|
"io/ioutil"
|
|
|
|
|
"os"
|
|
|
|
|
"path"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
2017-11-14 16:21:59 +08:00
|
|
|
|
"time"
|
2017-11-13 21:54:11 +08:00
|
|
|
|
|
2017-11-14 16:21:59 +08:00
|
|
|
|
"github.com/goodrain/rainbond/pkg/node/core/config"
|
2017-11-13 21:54:11 +08:00
|
|
|
|
"github.com/goodrain/rainbond/pkg/node/core/store"
|
|
|
|
|
|
|
|
|
|
"github.com/pquerna/ffjson/ffjson"
|
|
|
|
|
|
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
|
|
|
|
|
|
|
|
client "github.com/coreos/etcd/clientv3"
|
2017-11-16 11:31:09 +08:00
|
|
|
|
"github.com/coreos/etcd/mvcc/mvccpb"
|
2017-11-13 21:54:11 +08:00
|
|
|
|
"github.com/goodrain/rainbond/cmd/node/option"
|
|
|
|
|
"github.com/goodrain/rainbond/pkg/node/api/model"
|
|
|
|
|
"github.com/goodrain/rainbond/pkg/node/core/job"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// TaskEngine is the task engine.
// It handles task execution, result processing and automatic task scheduling.
// TODO: clean up execution records.
type TaskEngine struct {
	// ctx is cancelled by Stop; the background watch loops exit when it is done.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	config *option.Conf
	// tasks is the in-memory task cache, keyed by task ID.
	tasks map[string]*model.Task
	// tasksLock guards tasks.
	tasksLock        sync.Mutex
	dataCenterConfig *config.DataCenterConfig
	nodeCluster      *NodeCluster
	currentNode      *model.HostNode
}
|
|
|
|
|
|
|
|
|
|
// TaskSchedulerInfo is a scheduling request.
// It asks for a given task to be executed on a given node.
// The record is removed from the cluster once execution has finished.
// Storage key: taskid+nodeid.
type TaskSchedulerInfo struct {
	TaskID              string    `json:"taskID"`
	Node                string    `json:"node"`
	JobID               string    `json:"jobID"`
	CreateTime          time.Time `json:"create_time"`
	SchedulerMasterNode string    `json:"create_master_node"`
	Status              model.SchedulerStatus
}
|
|
|
|
|
|
|
|
|
|
//NewTaskSchedulerInfo 创建请求调度信息
|
|
|
|
|
func NewTaskSchedulerInfo(taskID, nodeID string) *TaskSchedulerInfo {
|
|
|
|
|
return &TaskSchedulerInfo{
|
|
|
|
|
TaskID: taskID,
|
|
|
|
|
Node: nodeID,
|
|
|
|
|
CreateTime: time.Now(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
func getTaskSchedulerInfoFromKV(kv *mvccpb.KeyValue) *TaskSchedulerInfo {
|
|
|
|
|
var taskinfo TaskSchedulerInfo
|
|
|
|
|
if err := ffjson.Unmarshal(kv.Value, &taskinfo); err != nil {
|
|
|
|
|
logrus.Error("parse task scheduler info error:", err.Error())
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return &taskinfo
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//Post 发布
|
|
|
|
|
func (t TaskSchedulerInfo) Post() {
|
|
|
|
|
body, err := ffjson.Marshal(t)
|
|
|
|
|
if err == nil {
|
|
|
|
|
store.DefalutClient.Post("/rainbond-node/scheduler/taskshcedulers/"+t.TaskID+"/"+t.Node, string(body))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//Update 更新数据
|
|
|
|
|
func (t TaskSchedulerInfo) Update() {
|
|
|
|
|
body, err := ffjson.Marshal(t)
|
|
|
|
|
if err == nil {
|
|
|
|
|
store.DefalutClient.Put("/rainbond-node/scheduler/taskshcedulers/"+t.TaskID+"/"+t.Node, string(body))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//Delete 删除数据
|
|
|
|
|
func (t TaskSchedulerInfo) Delete() {
|
|
|
|
|
store.DefalutClient.Delete("/rainbond-node/scheduler/taskshcedulers/" + t.TaskID + "/" + t.Node)
|
2017-11-13 21:54:11 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//CreateTaskEngine 创建task管理引擎
|
2017-11-17 18:19:54 +08:00
|
|
|
|
func CreateTaskEngine(nodeCluster *NodeCluster, node *model.HostNode) *TaskEngine {
|
2017-11-13 21:54:11 +08:00
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
2017-11-16 11:31:09 +08:00
|
|
|
|
task := &TaskEngine{
|
2017-11-14 16:21:59 +08:00
|
|
|
|
ctx: ctx,
|
|
|
|
|
cancel: cancel,
|
2017-11-16 11:31:09 +08:00
|
|
|
|
tasks: make(map[string]*model.Task),
|
2017-11-14 16:21:59 +08:00
|
|
|
|
config: option.Config,
|
|
|
|
|
dataCenterConfig: config.GetDataCenterConfig(),
|
|
|
|
|
nodeCluster: nodeCluster,
|
2017-11-17 18:19:54 +08:00
|
|
|
|
currentNode: node,
|
2017-11-13 21:54:11 +08:00
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
return task
|
2017-11-13 21:54:11 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//Start 启动
|
|
|
|
|
func (t *TaskEngine) Start() {
|
|
|
|
|
logrus.Info("task engine start")
|
2017-11-17 18:19:54 +08:00
|
|
|
|
t.loadTask()
|
2017-11-16 11:31:09 +08:00
|
|
|
|
go t.watchTasks()
|
2017-11-17 18:19:54 +08:00
|
|
|
|
go t.HandleJobRecord()
|
|
|
|
|
go t.watcheScheduler()
|
|
|
|
|
t.LoadStaticTask()
|
2017-11-13 21:54:11 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Stop stops the engine by cancelling its context; the background loops
// that select on that context then return.
func (t *TaskEngine) Stop() {
	t.cancel()
}
|
|
|
|
|
|
2017-11-16 11:31:09 +08:00
|
|
|
|
//loadTask 加载所有task
|
|
|
|
|
func (t *TaskEngine) loadTask() error {
|
|
|
|
|
//加载节点信息
|
|
|
|
|
res, err := store.DefalutClient.Get("/store/tasks/", client.WithPrefix())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("load tasks error:%s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
for _, kv := range res.Kvs {
|
|
|
|
|
if task := t.getTaskFromKV(kv); task != nil {
|
|
|
|
|
t.CacheTask(task)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//watchTasks watchTasks
|
|
|
|
|
func (t *TaskEngine) watchTasks() {
|
|
|
|
|
ch := store.DefalutClient.Watch("/store/tasks/", client.WithPrefix())
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-t.ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case event := <-ch:
|
|
|
|
|
for _, ev := range event.Events {
|
|
|
|
|
switch {
|
|
|
|
|
case ev.IsCreate(), ev.IsModify():
|
|
|
|
|
if task := t.getTaskFromKV(ev.Kv); task != nil {
|
|
|
|
|
t.CacheTask(task)
|
|
|
|
|
}
|
|
|
|
|
case ev.Type == client.EventTypeDelete:
|
|
|
|
|
if task := t.getTaskFromKey(string(ev.Kv.Key)); task != nil {
|
|
|
|
|
t.RemoveTask(task)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
func (t *TaskEngine) watcheScheduler() {
|
|
|
|
|
load, _ := store.DefalutClient.Get("/rainbond-node/scheduler/taskshcedulers/", client.WithPrefix())
|
|
|
|
|
if load != nil && load.Count > 0 {
|
|
|
|
|
for _, kv := range load.Kvs {
|
|
|
|
|
logrus.Debugf("watch a scheduler task %s", kv.Key)
|
|
|
|
|
if taskinfo := getTaskSchedulerInfoFromKV(kv); taskinfo != nil {
|
|
|
|
|
t.prepareScheduleTask(taskinfo)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ch := store.DefalutClient.Watch("/rainbond-node/scheduler/taskshcedulers/", client.WithPrefix())
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-t.ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case event := <-ch:
|
|
|
|
|
for _, ev := range event.Events {
|
|
|
|
|
switch {
|
|
|
|
|
case ev.IsCreate(), ev.IsModify():
|
|
|
|
|
logrus.Debugf("watch a scheduler task %s", ev.Kv.Key)
|
|
|
|
|
if taskinfo := getTaskSchedulerInfoFromKV(ev.Kv); taskinfo != nil {
|
|
|
|
|
t.prepareScheduleTask(taskinfo)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//PutSchedul 发布请求调度信息
|
|
|
|
|
//同样请求将被拒绝,在上一次请求完成之前
|
|
|
|
|
func (t *TaskEngine) PutSchedul(taskID string, node string) error {
|
|
|
|
|
if node == "" {
|
|
|
|
|
//执行节点确定
|
|
|
|
|
task := t.GetTask(taskID)
|
|
|
|
|
if task == nil {
|
|
|
|
|
return fmt.Errorf("task (%s) can not be found", taskID)
|
|
|
|
|
}
|
|
|
|
|
if task.Temp == nil {
|
|
|
|
|
return fmt.Errorf("task (%s) temp can not be found", taskID)
|
|
|
|
|
}
|
|
|
|
|
//task定义了执行节点
|
|
|
|
|
if task.Nodes != nil && len(task.Nodes) > 0 {
|
|
|
|
|
for _, node := range task.Nodes {
|
|
|
|
|
if n := t.nodeCluster.GetNode(node); n != nil {
|
|
|
|
|
info := NewTaskSchedulerInfo(taskID, node)
|
|
|
|
|
info.Post()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
} else { //从lables决定执行节点
|
|
|
|
|
nodes := t.nodeCluster.GetLabelsNode(task.Temp.Labels)
|
|
|
|
|
for _, node := range nodes {
|
|
|
|
|
if n := t.nodeCluster.GetNode(node); n != nil {
|
|
|
|
|
info := NewTaskSchedulerInfo(taskID, node)
|
|
|
|
|
info.Post()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if n := t.nodeCluster.GetNode(node); n != nil {
|
|
|
|
|
info := NewTaskSchedulerInfo(taskID, node)
|
|
|
|
|
info.Post()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-16 11:31:09 +08:00
|
|
|
|
func (t *TaskEngine) getTaskFromKey(key string) *model.Task {
|
|
|
|
|
index := strings.LastIndex(key, "/")
|
|
|
|
|
if index < 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
id := key[index+1:]
|
|
|
|
|
return t.GetTask(id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//RemoveTask 从缓存移除task
|
|
|
|
|
func (t *TaskEngine) RemoveTask(task *model.Task) {
|
|
|
|
|
t.tasksLock.Lock()
|
|
|
|
|
defer t.tasksLock.Unlock()
|
|
|
|
|
if _, ok := t.tasks[task.ID]; ok {
|
|
|
|
|
delete(t.tasks, task.ID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *TaskEngine) getTaskFromKV(kv *mvccpb.KeyValue) *model.Task {
|
|
|
|
|
var task model.Task
|
|
|
|
|
if err := ffjson.Unmarshal(kv.Value, &task); err != nil {
|
|
|
|
|
logrus.Error("parse task info error:", err.Error())
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return &task
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-13 21:54:11 +08:00
|
|
|
|
//LoadStaticTask 从文件加载task
|
|
|
|
|
//TODO:动态加载
|
|
|
|
|
func (t *TaskEngine) LoadStaticTask() {
|
|
|
|
|
logrus.Infof("start load static task form path:%s", t.config.StaticTaskPath)
|
|
|
|
|
file, err := os.Stat(t.config.StaticTaskPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("load static task error %s", err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if file.IsDir() {
|
|
|
|
|
files, err := ioutil.ReadDir(t.config.StaticTaskPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("load static task error %s", err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
for _, file := range files {
|
|
|
|
|
if file.IsDir() {
|
|
|
|
|
continue
|
|
|
|
|
} else if strings.HasSuffix(file.Name(), ".json") {
|
|
|
|
|
t.loadFile(path.Join(t.config.StaticTaskPath, file.Name()))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
t.loadFile(t.config.StaticTaskPath)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
func (t *TaskEngine) loadFile(path string) {
|
|
|
|
|
taskBody, err := ioutil.ReadFile(path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("read static task file %s error.%s", path, err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
var filename string
|
|
|
|
|
index := strings.LastIndex(path, "/")
|
|
|
|
|
if index < 0 {
|
|
|
|
|
filename = path
|
2017-11-13 21:54:11 +08:00
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
filename = path[index+1:]
|
|
|
|
|
if strings.Contains(filename, "group") {
|
|
|
|
|
var group model.TaskGroup
|
|
|
|
|
if err := ffjson.Unmarshal(taskBody, &group); err != nil {
|
|
|
|
|
logrus.Errorf("unmarshal static task file %s error.%s", path, err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if group.ID == "" {
|
|
|
|
|
group.ID = group.Name
|
|
|
|
|
}
|
|
|
|
|
if group.Name == "" {
|
|
|
|
|
logrus.Errorf("task group name can not be empty. file %s", path)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if group.Tasks == nil {
|
|
|
|
|
logrus.Errorf("task group tasks can not be empty. file %s", path)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
t.ScheduleGroup(nil, &group)
|
|
|
|
|
logrus.Infof("Load a static group %s.", group.Name)
|
2017-11-13 21:54:11 +08:00
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
if strings.Contains(filename, "task") {
|
|
|
|
|
var task model.Task
|
|
|
|
|
if err := ffjson.Unmarshal(taskBody, &task); err != nil {
|
|
|
|
|
logrus.Errorf("unmarshal static task file %s error.%s", path, err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if task.ID == "" {
|
|
|
|
|
task.ID = task.Name
|
|
|
|
|
}
|
|
|
|
|
if task.Name == "" {
|
|
|
|
|
logrus.Errorf("task name can not be empty. file %s", path)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if task.Temp == nil {
|
|
|
|
|
logrus.Errorf("task [%s] temp can not be empty.", task.Name)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if task.Temp.ID == "" {
|
|
|
|
|
task.Temp.ID = task.Temp.Name
|
|
|
|
|
}
|
|
|
|
|
t.AddTask(&task)
|
|
|
|
|
logrus.Infof("Load a static task %s.", task.Name)
|
2017-11-13 21:54:11 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//GetTask gettask
|
|
|
|
|
func (t *TaskEngine) GetTask(taskID string) *model.Task {
|
2017-11-16 11:31:09 +08:00
|
|
|
|
t.tasksLock.Lock()
|
|
|
|
|
defer t.tasksLock.Unlock()
|
|
|
|
|
if task, ok := t.tasks[taskID]; ok {
|
|
|
|
|
return task
|
|
|
|
|
}
|
2017-11-13 21:54:11 +08:00
|
|
|
|
res, err := store.DefalutClient.Get("/store/tasks/" + taskID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if res.Count < 1 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
var task model.Task
|
|
|
|
|
if err := ffjson.Unmarshal(res.Kvs[0].Value, &task); err != nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return &task
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-14 16:21:59 +08:00
|
|
|
|
//StopTask 停止任务,即删除任务对应的JOB
|
2017-11-17 18:19:54 +08:00
|
|
|
|
func (t *TaskEngine) StopTask(task *model.Task, node string) {
|
|
|
|
|
if status, ok := task.Status[node]; ok {
|
|
|
|
|
if status.JobID != "" {
|
|
|
|
|
if task.IsOnce {
|
|
|
|
|
_, err := store.DefalutClient.Delete(t.config.Once + "/" + status.JobID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("stop task %s error.%s", task.Name, err.Error())
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
_, err := store.DefalutClient.Delete(t.config.Cmd + "/" + status.JobID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("stop task %s error.%s", task.Name, err.Error())
|
|
|
|
|
}
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
_, err := store.DefalutClient.Delete(t.config.ExecutionRecordPath+"/"+status.JobID, client.WithPrefix())
|
2017-11-14 16:21:59 +08:00
|
|
|
|
if err != nil {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
logrus.Errorf("delete execution record for task %s error.%s", task.Name, err.Error())
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
store.DefalutClient.Delete("/rainbond-node/scheduler/taskshcedulers/" + task.ID + "/" + node)
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
|
|
|
|
|
2017-11-16 11:31:09 +08:00
|
|
|
|
// CacheTask stores task in the in-memory cache under its ID, replacing any
// entry already cached for that ID. The store is not touched.
func (t *TaskEngine) CacheTask(task *model.Task) {
	t.tasksLock.Lock()
	defer t.tasksLock.Unlock()
	t.tasks[task.ID] = task
}
|
|
|
|
|
|
2017-11-14 16:21:59 +08:00
|
|
|
|
//AddTask 添加task
|
|
|
|
|
func (t *TaskEngine) AddTask(task *model.Task) error {
|
|
|
|
|
oldTask := t.GetTask(task.ID)
|
|
|
|
|
if oldTask != nil {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
task.Status = oldTask.Status
|
|
|
|
|
task.OutPut = oldTask.OutPut
|
|
|
|
|
task.EventID = oldTask.EventID
|
|
|
|
|
task.CreateTime = oldTask.CreateTime
|
|
|
|
|
task.ExecCount = oldTask.ExecCount
|
|
|
|
|
task.StartTime = oldTask.StartTime
|
|
|
|
|
if task.Scheduler.Status != nil {
|
|
|
|
|
task.Scheduler.Status = oldTask.Scheduler.Status
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
if task.Temp == nil {
|
|
|
|
|
return fmt.Errorf("task temp can not be nil")
|
|
|
|
|
}
|
2017-11-14 18:57:56 +08:00
|
|
|
|
if task.EventID == "" {
|
|
|
|
|
task.EventID = task.ID
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
if len(task.Nodes) == 0 {
|
|
|
|
|
task.Nodes = t.nodeCluster.GetLabelsNode(task.Temp.Labels)
|
|
|
|
|
}
|
|
|
|
|
if task.Status == nil {
|
|
|
|
|
task.Status = map[string]model.TaskStatus{}
|
|
|
|
|
for _, n := range task.Nodes {
|
|
|
|
|
task.Status[n] = model.TaskStatus{
|
|
|
|
|
Status: "create",
|
|
|
|
|
}
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
if task.CreateTime.IsZero() {
|
|
|
|
|
task.CreateTime = time.Now()
|
|
|
|
|
}
|
2017-11-14 16:21:59 +08:00
|
|
|
|
if task.Scheduler.Mode == "" {
|
|
|
|
|
task.Scheduler.Mode = "Passive"
|
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
t.CacheTask(task)
|
2017-11-14 16:21:59 +08:00
|
|
|
|
_, err := store.DefalutClient.Put("/store/tasks/"+task.ID, task.String())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if task.Scheduler.Mode == "Intime" {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
t.PutSchedul(task.ID, "")
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//UpdateTask 更新task
|
|
|
|
|
func (t *TaskEngine) UpdateTask(task *model.Task) {
|
2017-11-16 11:31:09 +08:00
|
|
|
|
t.tasksLock.Lock()
|
|
|
|
|
defer t.tasksLock.Unlock()
|
|
|
|
|
t.tasks[task.ID] = task
|
2017-11-14 16:21:59 +08:00
|
|
|
|
_, err := store.DefalutClient.Put("/store/tasks/"+task.ID, task.String())
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("update task error,%s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-16 11:31:09 +08:00
|
|
|
|
//UpdateGroup 更新taskgroup
|
|
|
|
|
func (t *TaskEngine) UpdateGroup(group *model.TaskGroup) {
|
|
|
|
|
_, err := store.DefalutClient.Put("/store/taskgroups/"+group.ID, group.String())
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("update taskgroup error,%s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-14 16:21:59 +08:00
|
|
|
|
//GetTaskGroup 获取taskgroup
|
|
|
|
|
func (t *TaskEngine) GetTaskGroup(taskGroupID string) *model.TaskGroup {
|
|
|
|
|
res, err := store.DefalutClient.Get("/store/taskgroups/" + taskGroupID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if res.Count < 1 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
var group model.TaskGroup
|
|
|
|
|
if err := ffjson.Unmarshal(res.Kvs[0].Value, &group); err != nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return &group
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-13 21:54:11 +08:00
|
|
|
|
//handleJobRecord 处理
|
|
|
|
|
func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) {
|
|
|
|
|
task := t.GetTask(er.TaskID)
|
|
|
|
|
if task == nil {
|
2017-11-14 16:21:59 +08:00
|
|
|
|
return
|
2017-11-13 21:54:11 +08:00
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
//更新task信息
|
|
|
|
|
defer t.UpdateTask(task)
|
2017-11-14 18:57:56 +08:00
|
|
|
|
taskStatus := model.TaskStatus{
|
2017-11-17 18:19:54 +08:00
|
|
|
|
JobID: er.JobID,
|
2017-11-14 18:57:56 +08:00
|
|
|
|
StartTime: er.BeginTime,
|
|
|
|
|
EndTime: er.EndTime,
|
2017-11-16 18:55:48 +08:00
|
|
|
|
CompleStatus: "Failure",
|
2017-11-14 18:57:56 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
if status, ok := task.Status[er.Node]; ok {
|
|
|
|
|
taskStatus = status
|
|
|
|
|
taskStatus.EndTime = time.Now()
|
|
|
|
|
taskStatus.Status = "complete"
|
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
if er.Output != "" {
|
2017-11-16 16:32:27 +08:00
|
|
|
|
index := strings.Index(er.Output, "{")
|
|
|
|
|
jsonOutPut := er.Output
|
|
|
|
|
if index > -1 {
|
|
|
|
|
jsonOutPut = er.Output[index:]
|
|
|
|
|
}
|
|
|
|
|
output, err := model.ParseTaskOutPut(jsonOutPut)
|
2017-11-17 18:19:54 +08:00
|
|
|
|
output.JobID = er.JobID
|
2017-11-16 11:31:09 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
taskStatus.Status = "Parse task output error"
|
2017-11-20 15:32:57 +08:00
|
|
|
|
taskStatus.CompleStatus = "Unknow"
|
2017-11-16 11:31:09 +08:00
|
|
|
|
logrus.Warning("parse task output error:", err.Error())
|
|
|
|
|
output.NodeID = er.Node
|
|
|
|
|
} else {
|
|
|
|
|
output.NodeID = er.Node
|
|
|
|
|
if output.Global != nil && len(output.Global) > 0 {
|
|
|
|
|
for k, v := range output.Global {
|
|
|
|
|
err := t.dataCenterConfig.PutConfig(&model.ConfigUnit{
|
|
|
|
|
Name: strings.ToUpper(k),
|
|
|
|
|
Value: v,
|
|
|
|
|
ValueType: "string",
|
|
|
|
|
IsConfigurable: false,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Errorf("save datacenter config %s=%s error.%s", k, v, err.Error())
|
|
|
|
|
}
|
2017-11-14 18:57:56 +08:00
|
|
|
|
}
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
//groupID不为空,处理group连环操作
|
|
|
|
|
if output.Inner != nil && len(output.Inner) > 0 && task.GroupID != "" {
|
|
|
|
|
t.AddGroupConfig(task.GroupID, output.Inner)
|
|
|
|
|
}
|
|
|
|
|
for _, status := range output.Status {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
if status.ConditionType != "" && status.ConditionStatus != "" {
|
|
|
|
|
t.nodeCluster.UpdateNodeCondition(er.Node, status.ConditionType, status.ConditionStatus)
|
|
|
|
|
}
|
|
|
|
|
if status.NextTask != nil && len(status.NextTask) > 0 {
|
|
|
|
|
for _, taskID := range status.NextTask {
|
|
|
|
|
//由哪个节点发起的执行请求,当前task只在此节点执行
|
|
|
|
|
t.PutSchedul(taskID, output.NodeID)
|
2017-11-16 11:31:09 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
}
|
|
|
|
|
if status.NextGroups != nil && len(status.NextGroups) > 0 {
|
|
|
|
|
for _, groupID := range status.NextGroups {
|
|
|
|
|
group := t.GetTaskGroup(groupID)
|
|
|
|
|
if group == nil {
|
|
|
|
|
continue
|
2017-11-16 11:31:09 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
t.ScheduleGroup([]string{output.NodeID}, group)
|
2017-11-16 11:31:09 +08:00
|
|
|
|
}
|
2017-11-14 18:57:56 +08:00
|
|
|
|
}
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
2017-11-16 18:55:48 +08:00
|
|
|
|
taskStatus.CompleStatus = output.ExecStatus
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
task.UpdataOutPut(output)
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
task.ExecCount++
|
2017-11-14 18:57:56 +08:00
|
|
|
|
task.Status[er.Node] = taskStatus
|
2017-11-16 11:31:09 +08:00
|
|
|
|
//如果是is_once的任务,处理完成后删除job
|
|
|
|
|
if task.IsOnce {
|
|
|
|
|
task.CompleteTime = time.Now()
|
2017-11-17 18:19:54 +08:00
|
|
|
|
t.StopTask(task, er.Node)
|
2017-11-16 11:31:09 +08:00
|
|
|
|
} else { //如果是一次性任务,执行记录已经被删除,无需更新
|
|
|
|
|
er.CompleteHandle()
|
|
|
|
|
}
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
|
|
|
|
|
//waitScheduleTask 等待调度条件成熟
|
|
|
|
|
func (t *TaskEngine) waitScheduleTask(taskSchedulerInfo *TaskSchedulerInfo, task *model.Task) {
|
|
|
|
|
//continueScheduler 是否继续调度,如果调度条件无法满足,停止调度
|
|
|
|
|
var continueScheduler = true
|
2017-11-16 11:31:09 +08:00
|
|
|
|
canRun := func() bool {
|
|
|
|
|
defer t.UpdateTask(task)
|
|
|
|
|
if task.Temp.Depends != nil && len(task.Temp.Depends) > 0 {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
var result = make([]bool, len(task.Temp.Depends))
|
|
|
|
|
for i, dep := range task.Temp.Depends {
|
2017-11-16 11:31:09 +08:00
|
|
|
|
if depTask := t.GetTask(dep.DependTaskID); depTask != nil {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
//判断依赖任务调度情况
|
|
|
|
|
if depTask.Scheduler.Mode == "Passive" {
|
|
|
|
|
var needScheduler bool
|
|
|
|
|
if depTask.Scheduler.Status == nil {
|
|
|
|
|
needScheduler = true
|
|
|
|
|
}
|
|
|
|
|
//当前节点未调度且依赖策略为当前节点必须执行,则调度
|
|
|
|
|
if _, ok := depTask.Scheduler.Status[taskSchedulerInfo.Node]; !ok && dep.DetermineStrategy == model.SameNodeStrategy {
|
|
|
|
|
needScheduler = true
|
|
|
|
|
}
|
|
|
|
|
if needScheduler {
|
|
|
|
|
//依赖任务i未就绪
|
|
|
|
|
result[i] = false
|
|
|
|
|
//发出依赖任务的调度请求
|
|
|
|
|
if dep.DetermineStrategy == model.SameNodeStrategy {
|
|
|
|
|
t.PutSchedul(depTask.ID, taskSchedulerInfo.Node)
|
|
|
|
|
} else if dep.DetermineStrategy == model.AtLeastOnceStrategy {
|
|
|
|
|
nodes := t.nodeCluster.GetLabelsNode(depTask.Temp.Labels)
|
|
|
|
|
if len(nodes) > 0 {
|
|
|
|
|
t.PutSchedul(depTask.ID, nodes[0])
|
|
|
|
|
} else {
|
|
|
|
|
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s can not found exec node", depTask.ID)
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Failure"
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
continueScheduler = false
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Waiting"
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
continue
|
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
//判断依赖任务的执行情况
|
|
|
|
|
//依赖策略为任务全局只要执行一次
|
2017-11-16 11:31:09 +08:00
|
|
|
|
if dep.DetermineStrategy == model.AtLeastOnceStrategy {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
if depTask.Status == nil || len(depTask.Status) < 1 {
|
|
|
|
|
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Waiting"
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
var access bool
|
|
|
|
|
var faiiureSize int
|
2017-11-16 11:31:09 +08:00
|
|
|
|
if len(depTask.Status) > 0 {
|
2017-11-16 17:46:57 +08:00
|
|
|
|
for _, status := range depTask.Status {
|
|
|
|
|
if status.CompleStatus == "Success" {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
logrus.Debugf("dep task %s ready", depTask.ID)
|
|
|
|
|
access = true
|
|
|
|
|
} else {
|
|
|
|
|
faiiureSize++
|
2017-11-16 17:46:57 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
//如果依赖的某个服务全部执行记录失败,条件不可能满足,本次调度结束
|
|
|
|
|
if faiiureSize != 0 && faiiureSize >= len(depTask.Scheduler.Status) {
|
|
|
|
|
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s Condition cannot be satisfied", depTask.ID)
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Failure"
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
continueScheduler = false
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
result[i] = access
|
2017-11-16 11:31:09 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
//依赖任务相同节点执行成功
|
2017-11-16 11:31:09 +08:00
|
|
|
|
if dep.DetermineStrategy == model.SameNodeStrategy {
|
|
|
|
|
if depTask.Status == nil || len(depTask.Status) < 1 {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Waiting"
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
2017-11-16 11:31:09 +08:00
|
|
|
|
return false
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
if nodestatus, ok := depTask.Status[taskSchedulerInfo.Node]; ok && nodestatus.CompleStatus == "Success" {
|
|
|
|
|
result[i] = true
|
|
|
|
|
continue
|
2017-11-20 15:24:24 +08:00
|
|
|
|
} else if ok && nodestatus.CompleStatus != "" {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s Condition cannot be satisfied", depTask.ID)
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Failure"
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
continueScheduler = false
|
|
|
|
|
return false
|
2017-11-16 11:31:09 +08:00
|
|
|
|
} else {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Waiting"
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
return false
|
2017-11-16 11:31:09 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
2017-11-21 15:06:03 +08:00
|
|
|
|
taskSchedulerInfo.Status.Message = fmt.Sprintf("depend task %s is not found", dep.DependTaskID)
|
2017-11-17 18:19:54 +08:00
|
|
|
|
taskSchedulerInfo.Status.Status = "Failure"
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
result[i] = false
|
|
|
|
|
continueScheduler = false
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for _, ok := range result {
|
|
|
|
|
if !ok {
|
2017-11-16 11:31:09 +08:00
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
for continueScheduler {
|
2017-11-16 11:31:09 +08:00
|
|
|
|
if canRun() {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
t.scheduler(taskSchedulerInfo, task)
|
2017-11-16 11:31:09 +08:00
|
|
|
|
return
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
logrus.Infof("task %s can not be run .waiting depend tasks complete", task.Name)
|
2017-11-16 11:31:09 +08:00
|
|
|
|
time.Sleep(2 * time.Second)
|
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
//调度失败,删除任务
|
|
|
|
|
taskSchedulerInfo.Delete()
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
2017-11-13 21:54:11 +08:00
|
|
|
|
|
2017-11-14 16:21:59 +08:00
|
|
|
|
//ScheduleTask 调度执行指定task
|
2017-11-17 18:19:54 +08:00
|
|
|
|
//单节点或不确定节点
|
|
|
|
|
func (t *TaskEngine) prepareScheduleTask(taskSchedulerInfo *TaskSchedulerInfo) {
|
|
|
|
|
if task := t.GetTask(taskSchedulerInfo.TaskID); task != nil {
|
2017-11-14 16:21:59 +08:00
|
|
|
|
if task == nil {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
return
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
//已经调度且没有完成
|
|
|
|
|
if status, ok := task.Status[taskSchedulerInfo.Node]; ok && status.Status == "start" {
|
|
|
|
|
logrus.Warningf("prepare scheduler task(%s) error,it already scheduler", taskSchedulerInfo.TaskID)
|
|
|
|
|
return
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
2017-11-16 11:31:09 +08:00
|
|
|
|
if task.Temp == nil {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
logrus.Warningf("prepare scheduler task(%s) temp can not be nil", taskSchedulerInfo.TaskID)
|
|
|
|
|
return
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
2017-11-17 18:19:54 +08:00
|
|
|
|
if task.Scheduler.Status == nil {
|
|
|
|
|
task.Scheduler.Status = make(map[string]model.SchedulerStatus)
|
2017-11-16 11:31:09 +08:00
|
|
|
|
}
|
|
|
|
|
if task.Temp.Depends != nil && len(task.Temp.Depends) > 0 {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
go t.waitScheduleTask(taskSchedulerInfo, task)
|
2017-11-14 16:21:59 +08:00
|
|
|
|
} else {
|
2017-11-17 18:19:54 +08:00
|
|
|
|
//真正调度
|
|
|
|
|
t.scheduler(taskSchedulerInfo, task)
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
|
|
|
|
t.UpdateTask(task)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-17 18:19:54 +08:00
|
|
|
|
//scheduler 调度一个Task到一个节点执行
|
|
|
|
|
func (t *TaskEngine) scheduler(taskSchedulerInfo *TaskSchedulerInfo, task *model.Task) {
|
|
|
|
|
j, err := job.CreateJobFromTask(task, nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Failure"
|
|
|
|
|
taskSchedulerInfo.Status.Message = err.Error()
|
|
|
|
|
//更新调度状态
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
t.UpdateTask(task)
|
|
|
|
|
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
//如果指定nodes
|
|
|
|
|
if taskSchedulerInfo.Node != "" {
|
|
|
|
|
for _, rule := range j.Rules {
|
|
|
|
|
rule.NodeIDs = []string{taskSchedulerInfo.Node}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if j.IsOnce {
|
|
|
|
|
if err := job.PutOnce(j); err != nil {
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Failure"
|
|
|
|
|
taskSchedulerInfo.Status.Message = err.Error()
|
|
|
|
|
//更新调度状态
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
t.UpdateTask(task)
|
|
|
|
|
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if err := job.AddJob(j); err != nil {
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Failure"
|
|
|
|
|
taskSchedulerInfo.Status.Message = err.Error()
|
|
|
|
|
//更新调度状态
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
t.UpdateTask(task)
|
|
|
|
|
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
task.StartTime = time.Now()
|
|
|
|
|
taskSchedulerInfo.Status.Status = "Success"
|
|
|
|
|
taskSchedulerInfo.Status.Message = "scheduler success"
|
|
|
|
|
taskSchedulerInfo.Status.SchedulerTime = time.Now()
|
|
|
|
|
taskSchedulerInfo.Status.SchedulerMaster = t.currentNode.ID
|
|
|
|
|
//更新调度状态
|
|
|
|
|
task.Scheduler.Status[taskSchedulerInfo.Node] = taskSchedulerInfo.Status
|
|
|
|
|
if task.Status == nil {
|
|
|
|
|
task.Status = make(map[string]model.TaskStatus)
|
|
|
|
|
}
|
|
|
|
|
task.Status[t.currentNode.ID] = model.TaskStatus{
|
|
|
|
|
JobID: j.ID,
|
|
|
|
|
StartTime: time.Now(),
|
|
|
|
|
Status: "Start",
|
|
|
|
|
}
|
|
|
|
|
t.UpdateTask(task)
|
|
|
|
|
logrus.Infof("success scheduler a task %s to node %s", task.Name, taskSchedulerInfo.Node)
|
|
|
|
|
}
|
|
|
|
|
|
2017-11-14 16:21:59 +08:00
|
|
|
|
//ScheduleGroup 调度执行指定task
|
2017-11-16 11:31:09 +08:00
|
|
|
|
func (t *TaskEngine) ScheduleGroup(nodes []string, nextGroups ...*model.TaskGroup) {
|
|
|
|
|
for _, group := range nextGroups {
|
|
|
|
|
if group.Tasks == nil || len(group.Tasks) < 1 {
|
|
|
|
|
group.Status = &model.TaskGroupStatus{
|
|
|
|
|
StartTime: time.Now(),
|
|
|
|
|
EndTime: time.Now(),
|
|
|
|
|
Status: "NotDefineTask",
|
|
|
|
|
}
|
|
|
|
|
t.UpdateGroup(group)
|
|
|
|
|
}
|
|
|
|
|
for _, task := range group.Tasks {
|
|
|
|
|
task.GroupID = group.ID
|
|
|
|
|
t.AddTask(task)
|
|
|
|
|
}
|
|
|
|
|
group.Status = &model.TaskGroupStatus{
|
|
|
|
|
StartTime: time.Now(),
|
|
|
|
|
Status: "Start",
|
|
|
|
|
}
|
|
|
|
|
t.UpdateGroup(group)
|
2017-11-14 16:21:59 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//AddGroupConfig 添加组会话配置
|
|
|
|
|
func (t *TaskEngine) AddGroupConfig(groupID string, configs map[string]string) {
|
2017-11-16 11:31:09 +08:00
|
|
|
|
ctx := config.NewGroupContext(groupID)
|
|
|
|
|
for k, v := range configs {
|
|
|
|
|
ctx.Add(k, v)
|
2017-11-13 21:54:11 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//HandleJobRecord 处理task执行记录
|
|
|
|
|
func (t *TaskEngine) HandleJobRecord() {
|
|
|
|
|
jobRecord := t.loadJobRecord()
|
|
|
|
|
if jobRecord != nil {
|
|
|
|
|
for _, er := range jobRecord {
|
|
|
|
|
if !er.IsHandle {
|
|
|
|
|
t.handleJobRecord(er)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ch := store.DefalutClient.Watch(t.config.ExecutionRecordPath, client.WithPrefix())
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-t.ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case event := <-ch:
|
|
|
|
|
for _, ev := range event.Events {
|
|
|
|
|
switch {
|
|
|
|
|
case ev.IsCreate():
|
|
|
|
|
var er job.ExecutionRecord
|
|
|
|
|
if err := ffjson.Unmarshal(ev.Kv.Value, &er); err == nil {
|
|
|
|
|
if !er.IsHandle {
|
|
|
|
|
t.handleJobRecord(&er)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
func (t *TaskEngine) loadJobRecord() (ers []*job.ExecutionRecord) {
|
|
|
|
|
res, err := store.DefalutClient.Get(t.config.ExecutionRecordPath, client.WithPrefix())
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Error("load job execution record error.", err.Error())
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
for _, re := range res.Kvs {
|
|
|
|
|
var er job.ExecutionRecord
|
|
|
|
|
if err := ffjson.Unmarshal(re.Value, &er); err == nil {
|
|
|
|
|
ers = append(ers, &er)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|