Merge remote-tracking branch 'remotes/upstream/1.3.2-release' into 1.3.2-release

This commit is contained in:
qiaozhanwei 2020-08-05 14:37:00 +08:00
commit ef8dd3c499
3 changed files with 36 additions and 15 deletions

View File

@ -59,7 +59,7 @@
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.group</name>
<name>worker.groups</name>
<value>default</value>
<description>default worker group</description>
<on-ambari-upgrade add="true"/>

View File

@ -216,7 +216,7 @@ public class SparkParameters extends AbstractParameters {
@Override
public boolean checkParameters() {
return mainJar != null && programType != null && sparkVersion != null;
return mainJar != null && programType != null;
}
@Override

View File

@ -663,37 +663,58 @@ public class MasterExecThread implements Runnable {
if(startNodes.contains(taskName)){
return DependResult.SUCCESS;
}
TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList();
for(String depsNode : depNameList ){
if(!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)){
|| forbiddenTaskList.containsKey(depsNode)){
continue;
}
// dependencies must be fully completed
if(skipTaskNodeList.containsKey(depsNode)){
return DependResult.FAILED;
}
// all the dependencies must be completed
if(!completeTaskList.containsKey(depsNode)){
return DependResult.WAITING;
}
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
// conditions task would not return failed.
if(depTaskState.typeIsFailure()
&& !DagHelper.haveConditionsAfterNode(depsNode, dag )
&& !dag.getNode(depsNode).isConditionsTask()){
return DependResult.FAILED;
}
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
return DependResult.WAITING;
}
// ignore task state if current task is condition
if(taskNode.isConditionsTask()){
continue;
}
if(!dependTaskSuccess(depsNode, taskName)){
return DependResult.FAILED;
}
}
logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray()));
return DependResult.SUCCESS;
}
/**
* depend node is completed, but here need check the condition task branch is the next node
* @param dependNodeName
* @param nextNodeName
* @return
*/
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){
if(dag.getNode(dependNodeName).isConditionsTask()){
//condition task need check the branch to run
List<String> nextTaskList = parseConditionTask(dependNodeName);
if(!nextTaskList.contains(nextNodeName)){
return false;
}
}else {
ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
if(depTaskState.typeIsFailure()){
return false;
}
}
return true;
}
/**
* query task instance by complete state