From d3f4f399b3a0c1aec3b141aa24908da06cd52f28 Mon Sep 17 00:00:00 2001 From: goodrain Date: Thu, 16 Nov 2017 16:32:27 +0800 Subject: [PATCH] [FIX] Ignore the error output from task --- pkg/node/core/service/node_service.go | 4 ++-- pkg/node/masterserver/nodecluster.go | 11 ++++++----- pkg/node/masterserver/task_engine.go | 11 ++++++++++- pkg/node/nodeserver/server.go | 6 ++++-- test/host_id.conf | 2 +- 5 files changed, 23 insertions(+), 11 deletions(-) diff --git a/pkg/node/core/service/node_service.go b/pkg/node/core/service/node_service.go index a9e2e6c3e..57977c830 100644 --- a/pkg/node/core/service/node_service.go +++ b/pkg/node/core/service/node_service.go @@ -126,7 +126,7 @@ func (n *NodeService) CordonNode(nodeID string, unschedulable bool) *utils.APIHa //k8s节点存在 if hostNode.NodeStatus != nil { //true表示drain,不可调度 - node, err := k8s.CordonOrUnCordon(hostNode.ID, true) + node, err := k8s.CordonOrUnCordon(hostNode.ID, unschedulable) if err != nil { return utils.CreateAPIHandleError(500, fmt.Errorf("set node schedulable info error,%s", err.Error())) } @@ -179,7 +179,7 @@ func (n *NodeService) UpNode(nodeID string) (*model.HostNode, *utils.APIHandleEr return nil, apierr } if !hostNode.Role.HasRule(model.ComputeNode) || hostNode.NodeStatus != nil { - return nil, utils.CreateAPIHandleError(400, fmt.Errorf("node is not k8s node or it not up")) + return nil, utils.CreateAPIHandleError(400, fmt.Errorf("node is not k8s node or it not down")) } node, err := k8s.CreatK8sNodeFromRainbonNode(hostNode) if err != nil { diff --git a/pkg/node/masterserver/nodecluster.go b/pkg/node/masterserver/nodecluster.go index 03449b7e8..f3f3091fd 100644 --- a/pkg/node/masterserver/nodecluster.go +++ b/pkg/node/masterserver/nodecluster.go @@ -91,7 +91,7 @@ func (n *NodeCluster) loadNodes() error { } for _, kv := range res.Kvs { if node := n.getNodeFromKV(kv); node != nil { - n.AddNode(node) + n.CacheNode(node) } } //加载rainbond节点在线信息 @@ -149,6 +149,7 @@ func (n *NodeCluster) UpdateNode(node *model.HostNode) { n.lock.Lock() defer n.lock.Unlock() n.nodes[node.ID] = node + logrus.Debugf("Update node %s info to etcd", node.ID) n.client.Put(option.Config.NodePath+"/"+node.ID, node.String()) } func (n *NodeCluster) getNodeFromKV(kv *mvccpb.KeyValue) *model.HostNode { @@ -189,7 +190,7 @@ func (n *NodeCluster) watchNodes() { switch { case ev.IsCreate(), ev.IsModify(): if node := n.getNodeFromKV(ev.Kv); node != nil { - n.AddNode(node) + n.CacheNode(node) } case ev.Type == client.EventTypeDelete: if node := n.getNodeFromKey(string(ev.Kv.Key)); node != nil { @@ -295,7 +296,7 @@ func (n *NodeCluster) checkNodeInstall(node *model.HostNode) { } var stdout bytes.Buffer var stderr bytes.Buffer - client := util.NewSSHClient(node.InternalIP, "root", node.RootPass, "/usr/bin/whoami", 22, &stdout, &stderr) + client := util.NewSSHClient(node.InternalIP, "root", node.RootPass, "", 22, &stdout, &stderr) if err := client.Connection(); err != nil { logrus.Error("init endpoint node error:", err.Error()) errorCondition("SSH登陆初始化目标节点失败", err) @@ -316,8 +317,8 @@ func (n *NodeCluster) GetAllNode() (nodes []*model.HostNode) { return } -//AddNode 添加节点到缓存 -func (n *NodeCluster) AddNode(node *model.HostNode) { +//CacheNode 添加节点到缓存 +func (n *NodeCluster) CacheNode(node *model.HostNode) { n.lock.Lock() defer n.lock.Unlock() logrus.Debugf("add or update a rainbon node id:%s hostname:%s ip:%s", node.ID, node.HostName, node.InternalIP) diff --git a/pkg/node/masterserver/task_engine.go b/pkg/node/masterserver/task_engine.go index 81211a0d1..d1631e162 100644 --- a/pkg/node/masterserver/task_engine.go +++ b/pkg/node/masterserver/task_engine.go @@ -303,6 +303,9 @@ func (t *TaskEngine) AddTask(task *model.Task) error { task.CreateTime = time.Now() if task.Scheduler.Mode == "" { task.Scheduler.Mode = "Passive" + } + if task.Temp == nil { + } t.CacheTask(task) _, err := store.DefalutClient.Put("/store/tasks/"+task.ID, task.String()) @@ -364,7 +367,12 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) { CompleStatus: "", } if er.Output != "" { - output, err := model.ParseTaskOutPut(er.Output) + index := strings.Index(er.Output, "{") + jsonOutPut := er.Output + if index > -1 { + jsonOutPut = er.Output[index:] + } + output, err := model.ParseTaskOutPut(jsonOutPut) if err != nil { taskStatus.Status = "Parse task output error" logrus.Warning("parse task output error:", err.Error()) @@ -399,6 +407,7 @@ func (t *TaskEngine) handleJobRecord(er *job.ExecutionRecord) { for _, taskID := range status.NextTask { task := t.GetTask(taskID) if task == nil { + logrus.Warningf("task(%s) request exec task(%s) not found", task.ID, taskID) continue } //由哪个节点发起的执行请求,当前task只在此节点执行 diff --git a/pkg/node/nodeserver/server.go b/pkg/node/nodeserver/server.go index d4c978575..f63eead25 100644 --- a/pkg/node/nodeserver/server.go +++ b/pkg/node/nodeserver/server.go @@ -69,7 +69,8 @@ type NodeServer struct { *conf.Conf } -func (n *NodeServer) set() error { +//Regist 节点注册 +func (n *NodeServer) Regist() error { resp, err := n.Client.Grant(n.ttl + 2) if err != nil { return err @@ -86,6 +87,7 @@ func (n *NodeServer) set() error { //Run 启动 func (n *NodeServer) Run() (err error) { + n.Regist() go n.keepAlive() defer func() { if err != nil { @@ -339,7 +341,7 @@ func (n *NodeServer) keepAlive() { logrus.Warnf("%s lid[%x] keepAlive err: %s, try to reset...", n.HostName, n.lID, err.Error()) n.lID = 0 } - if err := n.set(); err != nil { + if err := n.Regist(); err != nil { logrus.Warnf("%s set lid err: %s, try to reset after %d seconds...", n.HostName, err.Error(), n.ttl) } else { logrus.Infof("%s set lid[%x] success", n.HostName, n.lID) diff --git a/test/host_id.conf b/test/host_id.conf index 75f40de1c..685a1b0a2 100755 --- a/test/host_id.conf +++ b/test/host_id.conf @@ -1 +1 @@ -host_uuid=6bc5042f-8c25-4f30-8131-1f61df4c00e8 \ No newline at end of file +host_uuid=10.0.55.73 \ No newline at end of file