Merge remote-tracking branch 'origin/master'

This commit is contained in:
bay1ts 2017-11-16 17:04:17 +08:00
commit e0b9982331
5 changed files with 23 additions and 11 deletions

View File

@ -126,7 +126,7 @@ func (n *NodeService) CordonNode(nodeID string, unschedulable bool) *utils.APIHa
//k8s节点存在
if hostNode.NodeStatus != nil {
//true表示drain不可调度
node, err := k8s.CordonOrUnCordon(hostNode.ID, true)
node, err := k8s.CordonOrUnCordon(hostNode.ID, unschedulable)
if err != nil {
return utils.CreateAPIHandleError(500, fmt.Errorf("set node schedulable info error,%s", err.Error()))
}
@ -179,7 +179,7 @@ func (n *NodeService) UpNode(nodeID string) (*model.HostNode, *utils.APIHandleEr
return nil, apierr
}
if !hostNode.Role.HasRule(model.ComputeNode) || hostNode.NodeStatus != nil {
return nil, utils.CreateAPIHandleError(400, fmt.Errorf("node is not k8s node or it not up"))
return nil, utils.CreateAPIHandleError(400, fmt.Errorf("node is not k8s node or it not down"))
}
node, err := k8s.CreatK8sNodeFromRainbonNode(hostNode)
if err != nil {

View File

@ -91,7 +91,7 @@ func (n *NodeCluster) loadNodes() error {
}
for _, kv := range res.Kvs {
if node := n.getNodeFromKV(kv); node != nil {
n.AddNode(node)
n.CacheNode(node)
}
}
//加载rainbond节点在线信息
@ -149,6 +149,7 @@ func (n *NodeCluster) UpdateNode(node *model.HostNode) {
n.lock.Lock()
defer n.lock.Unlock()
n.nodes[node.ID] = node
logrus.Debugf("Update node %s info to etcd", node.ID)
n.client.Put(option.Config.NodePath+"/"+node.ID, node.String())
}
func (n *NodeCluster) getNodeFromKV(kv *mvccpb.KeyValue) *model.HostNode {
@ -189,7 +190,7 @@ func (n *NodeCluster) watchNodes() {
switch {
case ev.IsCreate(), ev.IsModify():
if node := n.getNodeFromKV(ev.Kv); node != nil {
n.AddNode(node)
n.CacheNode(node)
}
case ev.Type == client.EventTypeDelete:
if node := n.getNodeFromKey(string(ev.Kv.Key)); node != nil {
@ -295,7 +296,7 @@ func (n *NodeCluster) checkNodeInstall(node *model.HostNode) {
}
var stdout bytes.Buffer
var stderr bytes.Buffer
client := util.NewSSHClient(node.InternalIP, "root", node.RootPass, "/usr/bin/whoami", 22, &stdout, &stderr)
client := util.NewSSHClient(node.InternalIP, "root", node.RootPass, "", 22, &stdout, &stderr)
if err := client.Connection(); err != nil {
logrus.Error("init endpoint node error:", err.Error())
errorCondition("SSH登陆初始化目标节点失败", err)
@ -316,8 +317,8 @@ func (n *NodeCluster) GetAllNode() (nodes []*model.HostNode) {
return
}
//AddNode 添加节点到缓存
func (n *NodeCluster) AddNode(node *model.HostNode) {
//CacheNode 添加节点到缓存
func (n *NodeCluster) CacheNode(node *model.HostNode) {
n.lock.Lock()
defer n.lock.Unlock()
logrus.Debugf("add or update a rainbon node id:%s hostname:%s ip:%s", node.ID, node.HostName, node.InternalIP)

View File

@ -303,6 +303,9 @@ func (t *TaskEngine) AddTask(task *model.Task) error {
task.CreateTime = time.Now()
if task.Scheduler.Mode == "" {
task.Scheduler.Mode = "Passive"
}
if task.Temp == nil {
}
t.CacheTask(task)
_, err := store.DefalutClient.Put("/store/tasks/"+task.ID, task.String())
@ -364,7 +367,12 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) {
CompleStatus: "",
}
if er.Output != "" {
output, err := model.ParseTaskOutPut(er.Output)
index := strings.Index(er.Output, "{")
jsonOutPut := er.Output
if index > -1 {
jsonOutPut = er.Output[index:]
}
output, err := model.ParseTaskOutPut(jsonOutPut)
if err != nil {
taskStatus.Status = "Parse task output error"
logrus.Warning("parse task output error:", err.Error())
@ -399,6 +407,7 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) {
for _, taskID := range status.NextTask {
task := t.GetTask(taskID)
if task == nil {
logrus.Warningf("task(%s) request exec task(%s) not found", task.ID, taskID)
continue
}
//由哪个节点发起的执行请求当前task只在此节点执行

View File

@ -69,7 +69,8 @@ type NodeServer struct {
*conf.Conf
}
func (n *NodeServer) set() error {
//Regist 节点注册
func (n *NodeServer) Regist() error {
resp, err := n.Client.Grant(n.ttl + 2)
if err != nil {
return err
@ -86,6 +87,7 @@ func (n *NodeServer) set() error {
//Run 启动
func (n *NodeServer) Run() (err error) {
n.Regist()
go n.keepAlive()
defer func() {
if err != nil {
@ -339,7 +341,7 @@ func (n *NodeServer) keepAlive() {
logrus.Warnf("%s lid[%x] keepAlive err: %s, try to reset...", n.HostName, n.lID, err.Error())
n.lID = 0
}
if err := n.set(); err != nil {
if err := n.Regist(); err != nil {
logrus.Warnf("%s set lid err: %s, try to reset after %d seconds...", n.HostName, err.Error(), n.ttl)
} else {
logrus.Infof("%s set lid[%x] success", n.HostName, n.lID)

View File

@ -1 +1 @@
host_uuid=6bc5042f-8c25-4f30-8131-1f61df4c00e8
host_uuid=10.0.55.73