Merge branch 'master' of code.goodrain.com:goodrain/rainbond

This commit is contained in:
ysicing 2017-11-16 13:38:23 +08:00
commit fdbe94a7ec
14 changed files with 576 additions and 161 deletions

View File

@ -116,7 +116,7 @@ func (h *HostNode) UpdataCondition(conditions ...NodeCondition) {
h.Conditions[i] = newcon
update = true
}
if con.Type == NodeReady {
if con.Type.Compare(NodeReady) {
con.Status = ready
con.LastTransitionTime = time.Now()
con.LastHeartbeatTime = time.Now()

View File

@ -34,18 +34,30 @@ type TaskTemp struct {
Name string `json:"name" validate:"name|required"`
ID string `json:"id" validate:"id|uuid"`
Shell Shell `json:"shell"`
Envs map[string]string `json:"envs"`
Input string `json:"input"`
Args []string `json:"args"`
Depends []string `json:"depends"`
Timeout int `json:"timeout|required|numeric"`
Envs map[string]string `json:"envs,omitempty"`
Input string `json:"input,omitempty"`
Args []string `json:"args,omitempty"`
Depends []DependStrategy `json:"depends,omitempty"`
Timeout int `json:"timeout" validate:"timeout|required|numeric"`
//OutPutChan
//结果输出通道错误输出OR标准输出
OutPutChan string `json:"out_put_chan" validate:"out_put_chan|required|in:stdout,stderr"`
CreateTime time.Time `json:"create_time"`
Labels map[string]string `json:"labels"`
Labels map[string]string `json:"labels,omitempty"`
}
// DependStrategy describes one dependency of a task: the task it
// depends on and the strategy used to decide when that dependency
// is considered satisfied.
type DependStrategy struct {
	DependTaskID      string `json:"depend_task_id"` // ID of the task this one depends on
	DetermineStrategy string `json:"strategy"`       // AtLeastOnceStrategy or SameNodeStrategy
}
// AtLeastOnceStrategy: the dependency is satisfied once the depend
// task has been executed at least once on any node.
var AtLeastOnceStrategy = "AtLeastOnce"

// SameNodeStrategy: the dependency is satisfied only when the depend
// task has completed on the same node(s) this task targets.
var SameNodeStrategy = "SameNode"
func (t TaskTemp) String() string {
res, _ := ffjson.Marshal(&t)
return string(res)
@ -88,6 +100,17 @@ func (t Task) String() string {
return string(res)
}
// UpdataOutPut records the latest output of the task for a node.
// An existing entry for the same NodeID is overwritten in place;
// otherwise the output is appended as a new entry.
// (Name kept as-is for API compatibility; "Updata" is a historical typo.)
func (t *Task) UpdataOutPut(output TaskOutPut) {
	for _, oldOut := range t.OutPut {
		if oldOut.NodeID == output.NodeID {
			// Same node reported again: replace the old output in place.
			*oldOut = output
			return
		}
	}
	// First output from this node: append a pointer to a copy of output.
	t.OutPut = append(t.OutPut, &output)
}
//CanBeDelete 能否被删除
func (t Task) CanBeDelete() bool {
if t.Status == nil || len(t.Status) == 0 {
@ -116,10 +139,12 @@ type TaskOutPut struct {
//返回数据类型,检测结果类(check) 执行安装类 (install) 普通类 (common)
Type string `json:"type"`
Status []TaskOutPutStatus `json:"status"`
Body string `json:"body"`
}
// ParseTaskOutPut decodes a TaskOutPut from its JSON representation.
// The raw body is always preserved in t.Body, even when decoding fails.
func ParseTaskOutPut(body string) (t TaskOutPut, err error) {
	t.Body = body
	err = ffjson.Unmarshal([]byte(body), &t)
	return
}
@ -129,8 +154,8 @@ type TaskOutPutStatus struct {
Name string `json:"name"`
ConditionType string `json:"condition_type"`
ConditionStatus string `json:"condition_status"`
NextTask []string `json:"next_tasks"`
NextGroups []string `json:"next_groups"`
NextTask []string `json:"next_tasks,omitempty"`
NextGroups []string `json:"next_groups,omitempty"`
}
//TaskStatus 任务状态

View File

@ -20,40 +20,57 @@ package config
import (
"context"
"fmt"
"regexp"
"strings"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/pkg/node/core/store"
)
// GroupContext is a group-task session: configuration values shared by
// the tasks of one task group, kept in-process and persisted to etcd.
type GroupContext struct {
	configs map[interface{}]interface{} // local cache of values added via Add
	ctx     context.Context             // carries the same values for in-process lookup
	groupID string                      // ID of the task group this context belongs to
}
//NewGroupContext 创建组配置会话
func NewGroupContext() *GroupContext {
func NewGroupContext(groupID string) *GroupContext {
return &GroupContext{
configs: make(map[interface{}]interface{}),
ctx: context.Background(),
groupID: groupID,
}
}
// Add stores a configuration item in the group context and persists it
// to etcd under <ConfigStoragePath>/group/<groupID>/<key>.
// NOTE(review): v is type-asserted to string and will panic for
// non-string values — confirm all callers pass strings.
func (g *GroupContext) Add(k, v interface{}) {
	g.ctx = context.WithValue(g.ctx, k, v)
	g.configs[k] = v
	store.DefalutClient.Put(fmt.Sprintf("%s/group/%s/%s", option.Config.ConfigStoragePath, g.groupID, k), v.(string))
}
//Get get
func (g *GroupContext) Get(k interface{}) interface{} {
return g.ctx.Value(k)
if v := g.ctx.Value(k); v != nil {
return v
}
res, _ := store.DefalutClient.Get(fmt.Sprintf("%s/group/%s/%s", option.Config.ConfigStoragePath, g.groupID, k))
if res.Count > 0 {
return string(res.Kvs[0].Value)
}
return ""
}
//GetString get
func (g *GroupContext) GetString(k interface{}) string {
return g.ctx.Value(k).(string)
if v := g.ctx.Value(k); v != nil {
return v.(string)
}
res, _ := store.DefalutClient.Get(fmt.Sprintf("%s/group/%s/%s", option.Config.ConfigStoragePath, g.groupID, k))
if res.Count > 0 {
return string(res.Kvs[0].Value)
}
return ""
}
var reg = regexp.MustCompile(`(?U)\$\{.*\}`)

View File

@ -307,14 +307,13 @@ func (j *JobRule) Valid() error {
return nil
}
if len(j.Timer) == 0 {
return utils.ErrNilRule
if len(j.Timer) > 0 {
sch, err := cron.Parse(j.Timer)
if err != nil {
return fmt.Errorf("invalid JobRule[%s], parse err: %s", j.Timer, err.Error())
}
j.Schedule = sch
}
sch, err := cron.Parse(j.Timer)
if err != nil {
return fmt.Errorf("invalid JobRule[%s], parse err: %s", j.Timer, err.Error())
}
j.Schedule = sch
return nil
}
@ -326,19 +325,23 @@ func (j *JobRule) included(node *model.HostNode) bool {
return false
}
}
//是否属于允许节点
for _, id := range j.NodeIDs {
if id == node.ID {
return true
if j.NodeIDs != nil && len(j.NodeIDs) > 0 {
//是否属于允许节点
for _, id := range j.NodeIDs {
if id == node.ID {
return true
}
}
}
//是否匹配label
for k, v := range j.Labels {
if nodev := node.Labels[k]; nodev != v {
return false
} else {
//是否匹配label
for k, v := range j.Labels {
if nodev := node.Labels[k]; nodev != v {
return false
}
}
return true
}
return true
return false
}
//GetJob get job

View File

@ -1,19 +1,18 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
@ -21,18 +20,16 @@ package k8s
import (
"github.com/goodrain/rainbond/cmd/node/option"
"k8s.io/client-go/kubernetes"
//"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd"
)
var (
K8S *K8sClient
K8S *Client
)
type K8sClient struct {
//Client k8sclient
type Client struct {
*kubernetes.Clientset
}
@ -46,7 +43,7 @@ func NewK8sClient(cfg *option.Conf) error {
if err != nil {
return err
}
client := K8sClient{
client := Client{
Clientset: cli,
}
K8S = &client

View File

@ -170,7 +170,7 @@ func (ts *TaskService) ExecTask(taskID string) *utils.APIHandleError {
// }
return utils.CreateAPIHandleError(400, fmt.Errorf("Single task exec can not have depend task"))
}
ts.ms.TaskEngine.ScheduleTask(t.ID)
ts.ms.TaskEngine.ScheduleTask(nil, t)
return nil
}
@ -351,6 +351,6 @@ func (ts *TaskGroupService) ExecTaskGroup(taskGroupID string) *utils.APIHandleEr
return err
}
//TODO:增加执行判断
ts.ms.TaskEngine.ScheduleGroup(t.ID)
ts.ms.TaskEngine.ScheduleGroup(nil, t)
return nil
}

View File

@ -108,19 +108,26 @@ func (n *NodeCluster) loadNodes() error {
}
}
//加载k8s节点信息
list, err := n.k8sClient.Core().Nodes().List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("load k8s nodes from k8s api error:%s", err.Error())
}
for _, node := range list.Items {
if cn, ok := n.nodes[node.Name]; ok {
cn.NodeStatus = &node.Status
cn.UpdataK8sCondition(node.Status.Conditions)
n.UpdateNode(cn)
} else {
logrus.Warningf("k8s node %s can not exist in rainbond cluster.", node.Name)
go func() {
for {
list, err := n.k8sClient.Core().Nodes().List(metav1.ListOptions{})
if err != nil {
logrus.Warnf("load k8s nodes from k8s api error:%s", err.Error())
time.Sleep(time.Second * 3)
continue
}
for _, node := range list.Items {
if cn, ok := n.nodes[node.Name]; ok {
cn.NodeStatus = &node.Status
cn.UpdataK8sCondition(node.Status.Conditions)
n.UpdateNode(cn)
} else {
logrus.Warningf("k8s node %s can not exist in rainbond cluster.", node.Name)
}
}
return
}
}
}()
return nil
}
@ -139,6 +146,9 @@ func (n *NodeCluster) worker() {
//UpdateNode 更新节点信息
func (n *NodeCluster) UpdateNode(node *model.HostNode) {
n.lock.Lock()
defer n.lock.Unlock()
n.nodes[node.ID] = node
n.client.Put(option.Config.NodePath+"/"+node.ID, node.String())
}
func (n *NodeCluster) getNodeFromKV(kv *mvccpb.KeyValue) *model.HostNode {
@ -160,6 +170,8 @@ func (n *NodeCluster) getNodeFromKey(key string) *model.HostNode {
//GetNode 从缓存获取节点信息
func (n *NodeCluster) GetNode(id string) *model.HostNode {
n.lock.Lock()
defer n.lock.Unlock()
if node, ok := n.nodes[id]; ok {
return node
}
@ -210,10 +222,15 @@ func (n *NodeCluster) watchK8sNodes() {
for {
wc, err := n.k8sClient.Core().Nodes().Watch(metav1.ListOptions{})
if err != nil {
logrus.Error("watch k8s node error.", err.Error())
logrus.Warningf("watch k8s node error.", err.Error())
time.Sleep(time.Second * 5)
continue
}
defer wc.Stop()
defer func() {
if wc != nil {
wc.Stop()
}
}()
loop:
for {
select {
@ -225,6 +242,7 @@ func (n *NodeCluster) watchK8sNodes() {
switch {
case event.Type == watch.Added, event.Type == watch.Modified:
if node, ok := event.Object.(*v1.Node); ok {
//k8s node name is rainbond node id
if rbnode := n.GetNode(node.Name); rbnode != nil {
rbnode.NodeStatus = &node.Status
rbnode.UpdataK8sCondition(node.Status.Conditions)
@ -329,4 +347,5 @@ func (n *NodeCluster) UpdateNodeCondition(nodeID, ctype, cvalue string) {
Message: "",
Reason: "",
})
n.UpdateNode(node)
}

View File

@ -20,6 +20,7 @@ package masterserver
import (
"context"
"fmt"
"io/ioutil"
"os"
"path"
@ -35,6 +36,7 @@ import (
"github.com/Sirupsen/logrus"
client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/pkg/node/api/model"
"github.com/goodrain/rainbond/pkg/node/core/job"
@ -47,35 +49,33 @@ type TaskEngine struct {
ctx context.Context
cancel context.CancelFunc
config *option.Conf
statics map[string]*model.Task
staticLock sync.Mutex
tasks map[string]*model.Task
tasksLock sync.Mutex
dataCenterConfig *config.DataCenterConfig
groups map[string]*model.TaskGroup
groupContexts map[string]*config.GroupContext
groupLock sync.Mutex
nodeCluster *NodeCluster
}
//CreateTaskEngine 创建task管理引擎
func CreateTaskEngine(nodeCluster *NodeCluster) *TaskEngine {
ctx, cancel := context.WithCancel(context.Background())
return &TaskEngine{
task := &TaskEngine{
ctx: ctx,
cancel: cancel,
statics: make(map[string]*model.Task),
tasks: make(map[string]*model.Task),
config: option.Config,
dataCenterConfig: config.GetDataCenterConfig(),
groups: make(map[string]*model.TaskGroup),
groupContexts: make(map[string]*config.GroupContext),
nodeCluster: nodeCluster,
}
task.loadTask()
task.LoadStaticTask()
return task
}
//Start 启动
func (t *TaskEngine) Start() {
logrus.Info("task engine start")
go t.LoadStaticTask()
go t.HandleJobRecord()
go t.watchTasks()
}
//Stop 启动
@ -83,6 +83,71 @@ func (t *TaskEngine) Stop() {
t.cancel()
}
// loadTask loads every persisted task from etcd (prefix /store/tasks/)
// into the in-memory task cache. Entries that fail to decode are
// skipped (getTaskFromKV logs and returns nil).
func (t *TaskEngine) loadTask() error {
	// Fetch all task records under the tasks prefix.
	res, err := store.DefalutClient.Get("/store/tasks/", client.WithPrefix())
	if err != nil {
		return fmt.Errorf("load tasks error:%s", err.Error())
	}
	for _, kv := range res.Kvs {
		if task := t.getTaskFromKV(kv); task != nil {
			t.CacheTask(task)
		}
	}
	return nil
}
// watchTasks watches the /store/tasks/ prefix in etcd and keeps the
// in-memory task cache in sync until the engine context is cancelled.
func (t *TaskEngine) watchTasks() {
	ch := store.DefalutClient.Watch("/store/tasks/", client.WithPrefix())
	for {
		select {
		case <-t.ctx.Done():
			// Engine is shutting down.
			return
		case event := <-ch:
			for _, ev := range event.Events {
				switch {
				case ev.IsCreate(), ev.IsModify():
					// Task created or updated: refresh the cache entry.
					if task := t.getTaskFromKV(ev.Kv); task != nil {
						t.CacheTask(task)
					}
				case ev.Type == client.EventTypeDelete:
					// Task deleted: drop it from the cache.
					if task := t.getTaskFromKey(string(ev.Kv.Key)); task != nil {
						t.RemoveTask(task)
					}
				}
			}
		}
	}
}
// getTaskFromKey extracts the task ID from an etcd key (the segment
// after the last '/') and looks the task up via GetTask.
// Returns nil when the key contains no '/' separator.
func (t *TaskEngine) getTaskFromKey(key string) *model.Task {
	index := strings.LastIndex(key, "/")
	if index < 0 {
		return nil
	}
	id := key[index+1:]
	return t.GetTask(id)
}
// RemoveTask removes a task from the in-memory cache, guarded by
// tasksLock. It does not delete the persisted record in etcd.
func (t *TaskEngine) RemoveTask(task *model.Task) {
	t.tasksLock.Lock()
	defer t.tasksLock.Unlock()
	if _, ok := t.tasks[task.ID]; ok {
		delete(t.tasks, task.ID)
	}
}
// getTaskFromKV decodes a task from an etcd key-value pair.
// Returns nil (after logging) when the value is not valid task JSON.
func (t *TaskEngine) getTaskFromKV(kv *mvccpb.KeyValue) *model.Task {
	var task model.Task
	if err := ffjson.Unmarshal(kv.Value, &task); err != nil {
		logrus.Error("parse task info error:", err.Error())
		return nil
	}
	return &task
}
//LoadStaticTask 从文件加载task
//TODO:动态加载
func (t *TaskEngine) LoadStaticTask() {
@ -115,34 +180,64 @@ func (t *TaskEngine) loadFile(path string) {
logrus.Errorf("read static task file %s error.%s", path, err.Error())
return
}
var task model.Task
if err := ffjson.Unmarshal(taskBody, &task); err != nil {
logrus.Errorf("unmarshal static task file %s error.%s", path, err.Error())
return
var filename string
index := strings.LastIndex(path, "/")
if index < 0 {
filename = path
}
if task.ID == "" {
task.ID = task.Name
filename = path[index+1:]
if strings.Contains(filename, "group") {
var group model.TaskGroup
if err := ffjson.Unmarshal(taskBody, &group); err != nil {
logrus.Errorf("unmarshal static task file %s error.%s", path, err.Error())
return
}
if group.ID == "" {
group.ID = group.Name
}
if group.Name == "" {
logrus.Errorf("task group name can not be empty. file %s", path)
return
}
if group.Tasks == nil {
logrus.Errorf("task group tasks can not be empty. file %s", path)
return
}
t.ScheduleGroup(nil, &group)
logrus.Infof("Load a static group %s.", group.Name)
}
if task.Name == "" {
logrus.Errorf("task name can not be empty. file %s", path)
return
if strings.Contains(filename, "task") {
var task model.Task
if err := ffjson.Unmarshal(taskBody, &task); err != nil {
logrus.Errorf("unmarshal static task file %s error.%s", path, err.Error())
return
}
if task.ID == "" {
task.ID = task.Name
}
if task.Name == "" {
logrus.Errorf("task name can not be empty. file %s", path)
return
}
if task.Temp == nil {
logrus.Errorf("task [%s] temp can not be empty.", task.Name)
return
}
if task.Temp.ID == "" {
task.Temp.ID = task.Temp.Name
}
t.AddTask(&task)
logrus.Infof("Load a static task %s.", task.Name)
}
if task.Temp == nil {
logrus.Errorf("task [%s] temp can not be empty.", task.Name)
return
}
if task.Temp.ID == "" {
task.Temp.ID = task.Temp.Name
}
t.staticLock.Lock()
defer t.staticLock.Unlock()
t.statics[task.Name] = &task
t.AddTask(&task)
logrus.Infof("Load a static task %s.", task.Name)
}
//GetTask gettask
func (t *TaskEngine) GetTask(taskID string) *model.Task {
t.tasksLock.Lock()
defer t.tasksLock.Unlock()
if task, ok := t.tasks[taskID]; ok {
return task
}
res, err := store.DefalutClient.Get("/store/tasks/" + taskID)
if err != nil {
return nil
@ -171,9 +266,20 @@ func (t *TaskEngine) StopTask(task *model.Task) {
logrus.Errorf("stop task %s error.%s", task.Name, err.Error())
}
}
_, err := store.DefalutClient.Delete(t.config.ExecutionRecordPath+"/"+task.JobID, client.WithPrefix())
if err != nil {
logrus.Errorf("delete execution record for task %s error.%s", task.Name, err.Error())
}
}
}
// CacheTask stores the task in the in-memory cache, guarded by
// tasksLock. It does not persist the task to etcd (see AddTask).
func (t *TaskEngine) CacheTask(task *model.Task) {
	t.tasksLock.Lock()
	defer t.tasksLock.Unlock()
	t.tasks[task.ID] = task
}
//AddTask 添加task
func (t *TaskEngine) AddTask(task *model.Task) error {
oldTask := t.GetTask(task.ID)
@ -198,24 +304,36 @@ func (t *TaskEngine) AddTask(task *model.Task) error {
if task.Scheduler.Mode == "" {
task.Scheduler.Mode = "Passive"
}
t.CacheTask(task)
_, err := store.DefalutClient.Put("/store/tasks/"+task.ID, task.String())
if err != nil {
return err
}
if task.Scheduler.Mode == "Intime" {
t.ScheduleTask(task.ID)
t.ScheduleTask(nil, task)
}
return nil
}
//UpdateTask 更新task
func (t *TaskEngine) UpdateTask(task *model.Task) {
t.tasksLock.Lock()
defer t.tasksLock.Unlock()
t.tasks[task.ID] = task
_, err := store.DefalutClient.Put("/store/tasks/"+task.ID, task.String())
if err != nil {
logrus.Errorf("update task error,%s", err.Error())
}
}
// UpdateGroup persists the task group to etcd under
// /store/taskgroups/<group.ID>. Failures are logged, not returned.
func (t *TaskEngine) UpdateGroup(group *model.TaskGroup) {
	_, err := store.DefalutClient.Put("/store/taskgroups/"+group.ID, group.String())
	if err != nil {
		logrus.Errorf("update taskgroup error,%s", err.Error())
	}
}
//GetTaskGroup 获取taskgroup
func (t *TaskEngine) GetTaskGroup(taskGroupID string) *model.TaskGroup {
res, err := store.DefalutClient.Get("/store/taskgroups/" + taskGroupID)
@ -238,46 +356,68 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) {
if task == nil {
return
}
//更新task信息
defer t.UpdateTask(task)
taskStatus := model.TaskStatus{
StartTime: er.BeginTime,
EndTime: er.EndTime,
CompleStatus: "",
}
output, err := model.ParseTaskOutPut(er.Output)
if err != nil {
taskStatus.Status = "Parse task output error"
logrus.Warning("parse task output error:", err.Error())
} else {
if output.Global != nil && len(output.Global) > 0 {
for k, v := range output.Global {
err := t.dataCenterConfig.PutConfig(&model.ConfigUnit{
Name: strings.ToUpper(k),
Value: v,
ValueType: "string",
IsConfigurable: false,
})
if err != nil {
logrus.Errorf("save datacenter config %s=%s error.%s", k, v, err.Error())
if er.Output != "" {
output, err := model.ParseTaskOutPut(er.Output)
if err != nil {
taskStatus.Status = "Parse task output error"
logrus.Warning("parse task output error:", err.Error())
output.NodeID = er.Node
} else {
output.NodeID = er.Node
if output.Global != nil && len(output.Global) > 0 {
for k, v := range output.Global {
err := t.dataCenterConfig.PutConfig(&model.ConfigUnit{
Name: strings.ToUpper(k),
Value: v,
ValueType: "string",
IsConfigurable: false,
})
if err != nil {
logrus.Errorf("save datacenter config %s=%s error.%s", k, v, err.Error())
}
}
}
//groupID不为空处理group连环操作
if output.Inner != nil && len(output.Inner) > 0 && task.GroupID != "" {
t.AddGroupConfig(task.GroupID, output.Inner)
}
for _, status := range output.Status {
//install or check类型结果写入节点
if output.Type == "install" || output.Type == "check" {
if status.ConditionType != "" && status.ConditionStatus != "" {
t.nodeCluster.UpdateNodeCondition(er.Node, status.ConditionType, status.ConditionStatus)
}
if status.NextTask != nil && len(status.NextTask) > 0 {
for _, taskID := range status.NextTask {
task := t.GetTask(taskID)
if task == nil {
continue
}
//由哪个节点发起的执行请求当前task只在此节点执行
t.ScheduleTask([]string{output.NodeID}, task)
}
}
if status.NextGroups != nil && len(status.NextGroups) > 0 {
for _, groupID := range status.NextGroups {
group := t.GetTaskGroup(groupID)
if group == nil {
continue
}
t.ScheduleGroup([]string{output.NodeID}, group)
}
}
}
}
}
//groupID不为空处理group连环操作
if output.Inner != nil && len(output.Inner) > 0 && task.GroupID != "" {
t.AddGroupConfig(task.GroupID, output.Inner)
}
for _, status := range output.Status {
//install or check类型结果写入节点
if output.Type == "install" || output.Type == "check" {
t.nodeCluster.UpdateNodeCondition(er.Node, status.ConditionType, status.ConditionStatus)
if status.NextTask != nil && len(status.NextTask) > 0 {
t.ScheduleTask(status.NextTask...)
}
if status.NextGroups != nil && len(status.NextGroups) > 0 {
t.ScheduleGroup(status.NextGroups...)
}
}
}
task.OutPut = append(task.OutPut, &output)
task.UpdataOutPut(output)
}
if er.Success {
taskStatus.CompleStatus = "Success"
@ -288,71 +428,193 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) {
task.Status = make(map[string]model.TaskStatus)
}
task.Status[er.Node] = taskStatus
t.UpdateTask(task)
er.CompleteHandle()
//如果是is_once的任务处理完成后删除job
if task.IsOnce {
task.CompleteTime = time.Now()
t.StopTask(task)
} else { //如果是一次性任务,执行记录已经被删除,无需更新
er.CompleteHandle()
}
}
// waitScheduleTask blocks until every dependency of task is satisfied,
// then builds and submits the job for it, polling every 2 seconds.
// nodes, when non-nil, restricts execution to those node IDs.
// It is intended to run in its own goroutine (see ScheduleTask).
// NOTE(review): there is no cancellation path — if a dependency never
// completes this goroutine waits forever; consider honoring t.ctx.
func (t *TaskEngine) waitScheduleTask(nodes []string, task *model.Task) {
	// canRun reports whether all depend tasks are complete according to
	// each dependency's strategy. It updates task.Scheduler.Status/Message
	// as a side effect and persists the task via the deferred UpdateTask.
	canRun := func() bool {
		defer t.UpdateTask(task)
		if task.Temp.Depends != nil && len(task.Temp.Depends) > 0 {
			for _, dep := range task.Temp.Depends {
				depTask := t.GetTask(dep.DependTaskID)
				if depTask == nil {
					// BUGFIX: the original formatted depTask.ID here, which
					// dereferences a nil pointer; use dep.DependTaskID instead.
					task.Scheduler.Message = fmt.Sprintf("depend task %s is not found", dep.DependTaskID)
					task.Scheduler.Status = "Failure"
					return false
				}
				// Kick off passive dependencies that were never scheduled.
				if depTask.Scheduler.Mode == "Passive" && depTask.Scheduler.Status == "" {
					t.ScheduleTask(nodes, depTask)
				}
				switch dep.DetermineStrategy {
				case model.AtLeastOnceStrategy:
					// BUGFIX: the original never blocked when the depend task
					// had no status yet, so AtLeastOnce was always satisfied.
					if len(depTask.Status) < 1 {
						task.Scheduler.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
						task.Scheduler.Status = "Waiting"
						return false
					}
				case model.SameNodeStrategy:
					if depTask.Status == nil || len(depTask.Status) < 1 {
						task.Scheduler.Message = fmt.Sprintf("depend task %s is not complete", depTask.ID)
						task.Scheduler.Status = "Waiting"
						return false
					}
					if nodes == nil {
						// No node restriction: any completed status satisfies
						// this dependency.
						// BUGFIX: the original returned true here, skipping the
						// check of any remaining dependencies; continue instead.
						continue
					}
					// Every requested node must have finished the dependency.
					for _, node := range nodes {
						if nodestatus, ok := depTask.Status[node]; !ok || nodestatus.EndTime.IsZero() {
							task.Scheduler.Message = fmt.Sprintf("depend task %s is not complete in node %s", depTask.ID, node)
							task.Scheduler.Status = "Waiting"
							return false
						}
					}
				}
			}
		}
		return true
	}
	for {
		if canRun() {
			j, err := job.CreateJobFromTask(task, nil)
			if err != nil {
				task.Scheduler.Status = "Failure"
				task.Scheduler.Message = err.Error()
				t.UpdateTask(task)
				logrus.Errorf("run task %s error.%s", task.Name, err.Error())
				return
			}
			// If nodes are specified, pin every job rule to them.
			if nodes != nil {
				for _, rule := range j.Rules {
					rule.NodeIDs = nodes
				}
			}
			if j.IsOnce {
				if err := job.PutOnce(j); err != nil {
					task.Scheduler.Status = "Failure"
					task.Scheduler.Message = err.Error()
					t.UpdateTask(task)
					logrus.Errorf("run task %s error.%s", task.Name, err.Error())
					return
				}
			} else {
				if err := job.AddJob(j); err != nil {
					task.Scheduler.Status = "Failure"
					task.Scheduler.Message = err.Error()
					t.UpdateTask(task)
					logrus.Errorf("run task %s error.%s", task.Name, err.Error())
					return
				}
			}
			task.JobID = j.ID
			task.StartTime = time.Now()
			task.Scheduler.Status = "Success"
			task.Scheduler.Message = "scheduler success"
			t.UpdateTask(task)
			return
		}
		// Dependencies not yet satisfied: log once per poll cycle and retry.
		// (The original logged this even when the task could run.)
		logrus.Infof("task %s can not be run .waiting depend tasks complete", task.Name)
		time.Sleep(2 * time.Second)
	}
}
//ScheduleTask 调度执行指定task
func (t *TaskEngine) ScheduleTask(nextTask ...string) {
for _, taskID := range nextTask {
task := t.GetTask(taskID)
func (t *TaskEngine) ScheduleTask(nodes []string, nextTask ...*model.Task) {
for _, task := range nextTask {
if task == nil {
continue
}
if task.JobID != "" {
t.StopTask(task)
}
j, err := job.CreateJobFromTask(task, nil)
if err != nil {
task.Scheduler.Status = "Failure"
task.Scheduler.Message = err.Error()
t.UpdateTask(task)
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
return
if task.Temp == nil {
continue
}
if j.IsOnce {
if err := job.PutOnce(j); err != nil {
task.Scheduler.Status = "Failure"
task.Scheduler.Message = err.Error()
t.UpdateTask(task)
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
return
}
if nodes == nil {
nodes = task.Nodes
}
if task.Temp.Depends != nil && len(task.Temp.Depends) > 0 {
go t.waitScheduleTask(nodes, task)
task.Scheduler.Status = "Waiting"
} else {
if err := job.AddJob(j); err != nil {
logrus.Infof("scheduler a task %s", task.Name)
j, err := job.CreateJobFromTask(task, nil)
if err != nil {
task.Scheduler.Status = "Failure"
task.Scheduler.Message = err.Error()
t.UpdateTask(task)
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
return
}
//如果指定nodes
if nodes != nil {
for _, rule := range j.Rules {
rule.NodeIDs = nodes
}
}
if j.IsOnce {
if err := job.PutOnce(j); err != nil {
task.Scheduler.Status = "Failure"
task.Scheduler.Message = err.Error()
t.UpdateTask(task)
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
return
}
} else {
if err := job.AddJob(j); err != nil {
task.Scheduler.Status = "Failure"
task.Scheduler.Message = err.Error()
t.UpdateTask(task)
logrus.Errorf("run task %s error.%s", task.Name, err.Error())
return
}
}
task.JobID = j.ID
task.StartTime = time.Now()
task.Scheduler.Status = "Success"
task.Scheduler.Message = "scheduler success"
}
task.JobID = j.ID
t.UpdateTask(task)
}
}
//ScheduleGroup 调度执行指定task
func (t *TaskEngine) ScheduleGroup(nextGroups ...string) {
for _, groupID := range nextGroups {
_ = t.GetTaskGroup(groupID)
func (t *TaskEngine) ScheduleGroup(nodes []string, nextGroups ...*model.TaskGroup) {
for _, group := range nextGroups {
if group.Tasks == nil || len(group.Tasks) < 1 {
group.Status = &model.TaskGroupStatus{
StartTime: time.Now(),
EndTime: time.Now(),
Status: "NotDefineTask",
}
t.UpdateGroup(group)
}
for _, task := range group.Tasks {
task.GroupID = group.ID
t.AddTask(task)
}
group.Status = &model.TaskGroupStatus{
StartTime: time.Now(),
Status: "Start",
}
t.UpdateGroup(group)
}
}
//AddGroupConfig 添加组会话配置
func (t *TaskEngine) AddGroupConfig(groupID string, configs map[string]string) {
t.groupLock.Lock()
defer t.groupLock.Unlock()
if ctx, ok := t.groupContexts[groupID]; ok {
for k, v := range configs {
ctx.Add(k, v)
}
} else {
ctx := config.NewGroupContext()
for k, v := range configs {
ctx.Add(k, v)
}
t.groupContexts[groupID] = ctx
ctx := config.NewGroupContext(groupID)
for k, v := range configs {
ctx.Add(k, v)
}
}

View File

@ -0,0 +1,41 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package masterserver
import (
"testing"
"github.com/goodrain/rainbond/pkg/node/api/model"
)
// TestGroupWorker exercises ScheduleGroup with a small in-memory group:
// tasks 1-3 depend on task 5 (empty strategy), task 4 has an empty
// depends list, tasks 5-7 have no dependencies.
// NOTE(review): CreateTaskEngine reads tasks via store.DefalutClient, so
// this test appears to require a reachable etcd instance — confirm.
func TestGroupWorker(t *testing.T) {
	taskEngine := CreateTaskEngine(nil)
	group := &model.TaskGroup{
		Tasks: []*model.Task{
			&model.Task{ID: "1", Temp: &model.TaskTemp{Depends: []model.DependStrategy{model.DependStrategy{DependTaskID: "5"}}}},
			&model.Task{ID: "2", Temp: &model.TaskTemp{Depends: []model.DependStrategy{model.DependStrategy{DependTaskID: "5"}}}},
			&model.Task{ID: "3", Temp: &model.TaskTemp{Depends: []model.DependStrategy{model.DependStrategy{DependTaskID: "5"}}}},
			&model.Task{ID: "4", Temp: &model.TaskTemp{Depends: []model.DependStrategy{}}},
			&model.Task{ID: "5", Temp: &model.TaskTemp{}},
			&model.Task{ID: "6", Temp: &model.TaskTemp{}},
			&model.Task{ID: "7", Temp: &model.TaskTemp{}},
		},
	}
	taskEngine.ScheduleGroup(nil, group)
}

1
test/shell/echo1.sh Normal file
View File

@ -0,0 +1 @@
echo "{\"inner\":{\"HOME_PATH\":\"/home\"},\"type\":\"check\",\"status\":[{\"next_tasks\":[\"echo2\"]}]}" >&2

1
test/shell/echo2.sh Normal file
View File

@ -0,0 +1 @@
echo "{\"inner\":{\"HOME_PATH\":\"/home\"},\"type\":\"install\",\"status\":[{\"condition_type\":\"INIT_ECHO\",\"condition_status\":\"True\"}]}" >&2

View File

@ -4,7 +4,7 @@
"temp": {
"name": "echo-temp",
"shell": {
"cmd": ["echo", "\"mysql host is:${MYSQL_HOST}\""]
"cmd": ["echo", "\"mysql host is:${MYSQL_HOST} dependsss\""]
}
},
"event_id": "xxx",

View File

@ -5,7 +5,11 @@
"name": "echo-once-temp",
"shell": {
"cmd": ["echo", "\"mysql host is:${MYSQL_HOST}\""]
}
},
"depends": [{
"depend_task_id": "echo",
"strategy": "AtLeastOnce"
}]
},
"event_id": "xyxyxyxyxy",
"is_once": true,

View File

@ -0,0 +1,45 @@
{
"name": "echo-group",
"id": "echo-group",
"tasks": [{
"name": "echo2",
"id": "echo2",
"temp": {
"name": "echo-temp",
"shell": {
"cmd": ["sh", "/Users/qingguo/gopath/src/github.com/goodrain/rainbond/test/shell/echo2.sh"]
}
},
"event_id": "xxx",
"is_once": true
}, {
"name": "echo1",
"id": "echo1",
"temp": {
"name": "echo-temp",
"shell": {
"cmd": ["sh", "/Users/qingguo/gopath/src/github.com/goodrain/rainbond/test/shell/echo1.sh"]
}
},
"event_id": "xxx",
"is_once": true
}, {
"name": "echo-once",
"id": "echo-once",
"temp": {
"name": "echo-once-temp",
"shell": {
"cmd": ["echo", "XXXXX"]
},
"depends": [{
"depend_task_id": "echo1",
"strategy": "AtLeastOnce"
}]
},
"event_id": "xyxyxyxyxy",
"is_once": true,
"scheduler": {
"mode": "Intime"
}
}]
}