Merge remote-tracking branch 'origin/master'

This commit is contained in:
bay1ts 2017-11-28 16:33:20 +08:00
commit 55b1788129
2 changed files with 29 additions and 18 deletions

View File

@ -126,22 +126,18 @@ func (a *Conf) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.NodePath, "nodePath", "/rainbond/nodes/", "the path of node in etcd")
fs.StringVar(&a.HostIDFile, "nodeid-file", "/etc/goodrain/host_uuid.conf", "the unique ID for this node. Just specify, don't modify")
fs.StringVar(&a.OnlineNodePath, "onlineNodePath", "/rainbond/onlinenodes/", "the path of master node in etcd")
fs.StringVar(&a.Proc, "procPath", "/rainbond/proc/", "the path of proc in etcd")
fs.StringVar(&a.Proc, "procPath", "/rainbond/task/proc/", "the path of proc in etcd")
fs.StringVar(&a.HostIP, "hostIP", "", "the host ip you can define. default get ip from eth0")
fs.StringVar(&a.ExecutionRecordPath, "execRecordPath", "/acp_node/exec_record", "the path of job exec record")
fs.StringVar(&a.ExecutionRecordPath, "execRecordPath", "/rainbond/exec_record", "the path of job exec record")
fs.StringSliceVar(&a.EventLogServer, "event-log-server", []string{"127.0.0.1:6367"}, "host:port slice of event log server")
fs.StringVar(&a.K8SNode, "k8sNode", "/store/nodes/", "the path of k8s node")
fs.StringVar(&a.InstalledMarker, "installed-marker", "/etc/acp_node/check/install/success", "the path of a file for check node is installed")
fs.StringVar(&a.BuildIn, "build-in-jobs", "/store/buildin/", "the path of build-in job")
fs.StringVar(&a.CompJobStatus, "jobStatus", "/store/jobStatus/", "the path of tree node install status")
fs.StringVar(&a.BuildInExec, "build-in-exec", "/acp_node/exec_buildin/", "the path of build-in job to watch")
fs.StringVar(&a.ConfigStoragePath, "config-path", "/rainbond/acp_configs", "the path of config to store(new)")
fs.StringVar(&a.InitStatus, "init-status", "/acp_node/init_status/", "the path of init status to store")
fs.StringVar(&a.InitStatus, "init-status", "/rainbond/init_status/", "the path of init status to store")
fs.StringVar(&a.Service, "servicePath", "/traefik/backends", "the path of service info to store")
fs.StringVar(&a.Cmd, "cmdPath", "/acp_node/cmd/", "the path of cmd in etcd")
fs.StringVar(&a.Once, "oncePath", "/acp_node/once/", "the path of once in etcd")
fs.StringVar(&a.Lock, "lockPath", "/acp_node/lock/", "the path of lock in etcd")
fs.StringVar(&a.Group, "groupPath", "/acp_node/group/", "the path of group in etcd")
fs.StringVar(&a.Cmd, "cmdPath", "/rainbond/cmd/", "the path of cmd in etcd")
fs.StringVar(&a.Once, "oncePath", "/rainbond/once/", "the path of once in etcd")
fs.StringVar(&a.Lock, "lockPath", "/rainbond/lock/", "the path of lock in etcd")
fs.IntVar(&a.FailTime, "failTime", 3, "the fail time of healthy check")
fs.IntVar(&a.CheckIntervalSec, "checkInterval-second", 5, "the interval time of healthy check")
fs.StringSliceVar(&a.Etcd.Endpoints, "etcd", []string{"http://127.0.0.1:2379"}, "the path of node in etcd")

View File

@ -46,14 +46,16 @@ import (
// 处理任务的执行,结果处理,任务自动调度
// TODO:执行记录清理工作
type TaskEngine struct {
ctx context.Context
cancel context.CancelFunc
config *option.Conf
tasks map[string]*model.Task
tasksLock sync.Mutex
dataCenterConfig *config.DataCenterConfig
nodeCluster *NodeCluster
currentNode *model.HostNode
ctx context.Context
cancel context.CancelFunc
config *option.Conf
tasks map[string]*model.Task
tasksLock sync.Mutex
dataCenterConfig *config.DataCenterConfig
nodeCluster *NodeCluster
currentNode *model.HostNode
schedulerCache map[string]bool
schedulerCacheLock sync.Mutex
}
//TaskSchedulerInfo 请求调度信息
@ -91,6 +93,7 @@ func (t TaskSchedulerInfo) Post() {
body, err := ffjson.Marshal(t)
if err == nil {
store.DefalutClient.Post("/rainbond-node/scheduler/taskshcedulers/"+t.TaskID+"/"+t.Node, string(body))
logrus.Infof("put a scheduler info %s:%s", t.TaskID, t.Node)
}
}
@ -118,6 +121,7 @@ func CreateTaskEngine(nodeCluster *NodeCluster, node *model.HostNode) *TaskEngin
dataCenterConfig: config.GetDataCenterConfig(),
nodeCluster: nodeCluster,
currentNode: node,
schedulerCache: make(map[string]bool),
}
return task
}
@ -206,6 +210,7 @@ func (t *TaskEngine) watcheScheduler() {
//PutSchedul 发布请求调度信息
//同样请求将被拒绝,在上一次请求完成之前
//目前单节点调度,本地保证不重复调度
func (t *TaskEngine) PutSchedul(taskID string, node string) error {
if node == "" {
//执行节点确定
@ -235,10 +240,17 @@ func (t *TaskEngine) PutSchedul(taskID string, node string) error {
}
}
} else {
//保证同时只调度一次
t.schedulerCacheLock.Lock()
defer t.schedulerCacheLock.Unlock()
if _, ok := t.schedulerCache[taskID+node]; ok {
return nil
}
if n := t.nodeCluster.GetNode(node); n != nil {
info := NewTaskSchedulerInfo(taskID, node)
info.Post()
}
t.schedulerCache[taskID+node] = true
}
return nil
}
@ -575,6 +587,9 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) {
} else { //如果是一次性任务,执行记录已经被删除,无需更新
er.CompleteHandle()
}
t.schedulerCacheLock.Lock()
defer t.schedulerCacheLock.Unlock()
delete(t.schedulerCache, task.ID+er.Node)
}
//waitScheduleTask 等待调度条件成熟