Merge remote-tracking branch 'upstream/dev' into task-end

# Conflicts:
#	.asf.yaml
#	dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/FuncUtils.java
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
#	dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AlertGroupControllerTest.java
#	dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
#	dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
#	dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
#	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
#	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
#	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
#	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/ConditionsTaskExecThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DependentTaskExecThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/MasterBaseTaskExecThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/MasterTaskExecThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/SubProcessTaskExecThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
#	dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
#	dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
#	dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
#	dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
#	dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutorTest.java
#	dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTaskTest.java
#	dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
#	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
#	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
#	dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
#	dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java
#	dolphinscheduler-standalone-server/pom.xml
#	dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
#	dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
#	dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannelFactory.java
#	dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
#	dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
#	dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
#	dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
#	dolphinscheduler-ui/package.json
#	dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss
#	dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
#	dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
#	sql/dolphinscheduler_h2.sql
This commit is contained in:
CalvinKirs 2021-09-08 17:11:32 +08:00
parent dbb4adf6c3
commit 2dd5279388
9 changed files with 50 additions and 173 deletions

View File

@ -205,6 +205,7 @@ public class WorkflowExecuteThread implements Runnable {
* @param processInstance processInstance * @param processInstance processInstance
* @param processService processService * @param processService processService
* @param nettyExecutorManager nettyExecutorManager * @param nettyExecutorManager nettyExecutorManager
* @param taskTimeoutCheckList
*/ */
public WorkflowExecuteThread(ProcessInstance processInstance public WorkflowExecuteThread(ProcessInstance processInstance
, ProcessService processService , ProcessService processService
@ -568,12 +569,10 @@ public class WorkflowExecuteThread implements Runnable {
startDate = endDate; startDate = endDate;
endDate = tmp; endDate = tmp;
} }
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
processInstance.getProcessDefinitionVersion());
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinition.getId());
complementListDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedules)); complementListDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedules));
logger.info(" process definition id:{} complement data: {}", logger.info(" process definition code:{} complement data: {}",
processDefinition.getId(), complementListDate.toString()); processInstance.getProcessDefinitionCode(), complementListDate.toString());
} }
} }
@ -998,6 +997,7 @@ public class WorkflowExecuteThread implements Runnable {
/** /**
* generate the latest process instance status by the tasks state * generate the latest process instance status by the tasks state
* *
* @param instance
* @return process instance execution status * @return process instance execution status
*/ */
private ExecutionStatus getProcessInstanceState(ProcessInstance instance) { private ExecutionStatus getProcessInstanceState(ProcessInstance instance) {

View File

@ -35,8 +35,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.apache.logging.log4j.util.Strings;
import java.util.Date; import java.util.Date;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -92,8 +90,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
/** /**
* common task cannot be paused * common task cannot be paused
*
* @return
*/ */
@Override @Override
protected boolean pauseTask() { protected boolean pauseTask() {
@ -151,7 +147,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
if (taskInstance.getState().typeIsFinished()) { if (taskInstance.getState().typeIsFinished()) {
return true; return true;
} }
if (Strings.isBlank(taskInstance.getHost())) { if (null == taskInstance.getHost() || taskInstance.getHost().isEmpty()) {
taskInstance.setState(ExecutionStatus.KILL); taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date()); taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance); processService.updateTaskInstance(taskInstance);

View File

@ -1,155 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
/**
* dependent parameters
*/
private DependentParameters dependentParameters;
/**
* complete task map
*/
private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
/**
* condition result
*/
private DependResult conditionResult;
/**
* constructor of MasterBaseTaskExecThread
*
* @param taskInstance task instance
*/
public ConditionsTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
taskInstance.setStartTime(new Date());
}
@Override
public Boolean submitWaitComplete() {
try {
this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
logger.info("dependent task start");
waitTaskQuit();
updateTaskState();
} catch (Exception e) {
logger.error("conditions task run exception", e);
}
return true;
}
private void waitTaskQuit() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getName(), task.getState());
}
List<DependResult> modelResultList = new ArrayList<>();
for (DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()) {
List<DependResult> itemDependResult = new ArrayList<>();
for (DependentItem item : dependentTaskModel.getDependItemList()) {
itemDependResult.add(getDependResultForItem(item));
}
DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
modelResultList.add(modelResult);
}
conditionResult = DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), modelResultList);
logger.info("the conditions task depend result : {}", conditionResult);
}
/**
*
*/
private void updateTaskState() {
ExecutionStatus status;
if (this.cancel) {
status = ExecutionStatus.KILL;
} else {
status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
}
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
}
private void initTaskParameters() {
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.dependentParameters = taskInstance.getDependency();
}
/**
* depend result for depend item
*/
private DependResult getDependResultForItem(DependentItem item) {
DependResult dependResult = DependResult.SUCCESS;
if (!completeTaskList.containsKey(item.getDepTasks())) {
logger.info("depend item: {} have not completed yet.", item.getDepTasks());
dependResult = DependResult.FAILED;
return dependResult;
}
ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks());
if (executionStatus != item.getStatus()) {
logger.info("depend item : {} expect status: {}, actual status: {}", item.getDepTasks(), item.getStatus(), executionStatus);
dependResult = DependResult.FAILED;
}
logger.info("dependent item complete {} {},{}",
Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult);
return dependResult;
}
}

View File

@ -123,7 +123,7 @@ public class WorkflowExecuteThreadTest {
@Test @Test
public void testParallelWithOutSchedule() throws ParseException { public void testParallelWithOutSchedule() throws ParseException {
try { try {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionId)).thenReturn(zeroSchedulerList());
Method method = WorkflowExecuteThread.class.getDeclaredMethod("executeComplementProcess"); Method method = WorkflowExecuteThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true); method.setAccessible(true);
method.invoke(workflowExecuteThread); method.invoke(workflowExecuteThread);
@ -141,7 +141,7 @@ public class WorkflowExecuteThreadTest {
@Test @Test
public void testParallelWithSchedule() { public void testParallelWithSchedule() {
try { try {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionId)).thenReturn(oneSchedulerList());
Method method = WorkflowExecuteThread.class.getDeclaredMethod("executeComplementProcess"); Method method = WorkflowExecuteThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true); method.setAccessible(true);
method.invoke(workflowExecuteThread); method.invoke(workflowExecuteThread);

View File

@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import java.io.File; import java.io.File;

View File

@ -28,8 +28,6 @@ import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.MapUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;

View File

@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.Charsets; import org.apache.commons.io.Charsets;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.ParseException; import org.apache.http.ParseException;

View File

@ -28,8 +28,6 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections.MapUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.procedure;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.List;
public class ProcedureTaskChannelFactory implements TaskChannelFactory {
@Override
public String getName() {
return "PROCEDURE";
}
@Override
public List<PluginParams> getParams() {
return null;
}
@Override
public TaskChannel create() {
return new ProcedureTaskChannel();
}
}