diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java index 80ee36b9d1..0b23905059 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java @@ -28,9 +28,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -103,12 +101,8 @@ public abstract class AbstractTask { return null; } - /** - * task handle - * - * @throws Exception exception - */ - public abstract void handle() throws Exception; + public abstract void handle() throws TaskException; + /** * cancel application diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java index 4f1913ba3d..c9a9c4851c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java @@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.api; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; -import java.util.List; -import java.util.regex.Matcher; import java.util.regex.Pattern; /** @@ -51,7 +49,7 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { // SHELL task exit code TaskResponse response = shellCommandExecutor.run(buildCommand()); @@ -59,10 +57,15 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor { // set appIds setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(response.getProcessId()); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + logger.info("The current yarn task has been interrupted", ex); + setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + throw new TaskException("The current yarn task has been interrupted", ex); } catch (Exception e) { logger.error("yarn process failure", e); exitStatusCode = -1; - throw e; + throw new TaskException("Execute task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java index 8d7f7eac7f..a82e5f6bbd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java @@ -39,7 +39,7 @@ public abstract class AbstractK8sTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { TaskResponse response = abstractK8sTaskExecutor.run(buildCommand()); setExitStatusCode(response.getExitStatusCode()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java index a416d0c02c..46fcf2ec2f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java @@ -17,12 +17,12 @@ package org.apache.dolphinscheduler.plugin.task.chunjun; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; - +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.SystemUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; @@ -32,9 +32,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.spi.enums.Flag; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.SystemUtils; - import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -48,6 +45,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; + /** * chunjun task */ @@ -101,10 +101,10 @@ public class ChunJunTask extends AbstractTaskExecutor { /** * run chunjun process * - * @throws Exception exception + * @throws TaskException exception */ @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { Map paramsMap = taskExecutionContext.getPrepareParamsMap(); @@ -115,10 +115,15 @@ public class ChunJunTask extends AbstractTaskExecutor { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(commandExecuteResult.getProcessId()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("The current ChunJun Task has been interrupted", e); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException("The current ChunJun Task has been interrupted", e); } catch (Exception e) { logger.error("chunjun task failed.", e); setExitStatusCode(EXIT_CODE_FAILURE); - throw e; + throw new TaskException("Execute chunjun task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 08a8c8acc8..873d8f7424 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -17,32 +17,38 @@ package org.apache.dolphinscheduler.plugin.task.datax; -import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; - +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; +import com.alibaba.druid.sql.ast.statement.SQLSelect; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.SystemUtils; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; -import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.Flag; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.SystemUtils; - import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -58,7 +64,6 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -66,17 +71,9 @@ import java.util.concurrent.ExecutionException; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.alibaba.druid.sql.ast.SQLStatement; -import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; -import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; -import com.alibaba.druid.sql.ast.statement.SQLSelect; -import com.alibaba.druid.sql.ast.statement.SQLSelectItem; -import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; -import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; -import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; -import com.alibaba.druid.sql.parser.SQLStatementParser; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; +import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; public class DataxTask extends AbstractTaskExecutor { /** @@ -150,7 +147,7 @@ public class DataxTask extends AbstractTaskExecutor { * @throws Exception if error throws Exception */ @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { // replace placeholder,and combine local and global parameters Map paramsMap = taskExecutionContext.getPrepareParamsMap(); @@ -163,10 +160,15 @@ public class DataxTask extends AbstractTaskExecutor { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(commandExecuteResult.getProcessId()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("The current DataX task has been interrupted", e); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException("The current DataX task has been interrupted", e); } catch (Exception e) { logger.error("datax task error", e); setExitStatusCode(EXIT_CODE_FAILURE); - throw e; + throw new TaskException("Execute DataX task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java index 622a674780..987126b0a7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java @@ -17,13 +17,17 @@ package org.apache.dolphinscheduler.plugin.task.dinky; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.MissingNode; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; - import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; @@ -39,10 +43,7 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.MissingNode; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; public class DinkyTask extends AbstractTaskExecutor { @@ -77,47 +78,55 @@ public class DinkyTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { - String address = this.dinkyParameters.getAddress(); - String taskId = this.dinkyParameters.getTaskId(); - boolean isOnline = this.dinkyParameters.isOnline(); - JsonNode result; - if (isOnline) { - // Online dinky task, and only one job is allowed to execute - result = onlineTask(address, taskId); - } else { - // Submit dinky task - result = submitTask(address, taskId); - } - if (checkResult(result)) { - boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean(); - String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText(); - boolean finishFlag = false; - while (!finishFlag) { - JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId); - if (!checkResult(jobInstanceInfoResult)) { - break; - } - String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText(); - switch (jobInstanceStatus) { - case DinkyTaskConstants.STATUS_FINISHED: - final int exitStatusCode = mapStatusToExitCode(status); - // Use address-taskId as app id - setAppIds(String.format("%s-%s", address, taskId)); - setExitStatusCode(exitStatusCode); - logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS)); - finishFlag = true; + public void handle() throws TaskException { + try { + + String address = this.dinkyParameters.getAddress(); + String taskId = this.dinkyParameters.getTaskId(); + boolean isOnline = this.dinkyParameters.isOnline(); + JsonNode result; + if (isOnline) { + // Online dinky task, and only one job is allowed to execute + result = onlineTask(address, taskId); + } else { + // Submit dinky task + result = submitTask(address, taskId); + } + if (checkResult(result)) { + boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean(); + String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText(); + boolean finishFlag = false; + while (!finishFlag) { + JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId); + if (!checkResult(jobInstanceInfoResult)) { break; - case DinkyTaskConstants.STATUS_FAILED: - case DinkyTaskConstants.STATUS_CANCELED: - case DinkyTaskConstants.STATUS_UNKNOWN: - errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText()); - finishFlag = true; - break; - default: - Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS); + } + String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText(); + switch (jobInstanceStatus) { + case DinkyTaskConstants.STATUS_FINISHED: + final int exitStatusCode = mapStatusToExitCode(status); + // Use address-taskId as app id + setAppIds(String.format("%s-%s", address, taskId)); + setExitStatusCode(exitStatusCode); + logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS)); + finishFlag = true; + break; + case DinkyTaskConstants.STATUS_FAILED: + case DinkyTaskConstants.STATUS_CANCELED: + case DinkyTaskConstants.STATUS_UNKNOWN: + errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText()); + finishFlag = true; + break; + default: + Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS); + } } } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + logger.error("Execute dinkyTask failed", ex); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException("Execute dinkyTask failed", ex); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java index 8176999e4c..2e3bece309 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java @@ -17,11 +17,10 @@ package org.apache.dolphinscheduler.plugin.task.dvc; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; - import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; @@ -30,6 +29,8 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils; import java.util.ArrayList; import java.util.List; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; + /** * shell task */ @@ -74,7 +75,7 @@ public class DvcTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { // construct process String command = buildCommand(); @@ -83,10 +84,15 @@ public class DvcTask extends AbstractTaskExecutor { setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(commandExecuteResult.getProcessId()); parameters.dealOutParam(shellCommandExecutor.getVarPool()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("The current DvcTask has been interrupted", e); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException("The current DvcTask has been interrupted", e); } catch (Exception e) { logger.error("dvc task error", e); setExitStatusCode(EXIT_CODE_FAILURE); - throw e; + throw new TaskException("Execute dvc task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java index d747577b71..6f4fc95e48 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java @@ -17,12 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.emr; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; - -import java.util.HashSet; -import java.util.concurrent.TimeUnit; - import com.amazonaws.SdkBaseException; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; @@ -36,6 +30,12 @@ import com.amazonaws.services.elasticmapreduce.model.StepState; import com.amazonaws.services.elasticmapreduce.model.StepStatus; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Sets; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; + +import java.util.HashSet; +import java.util.concurrent.TimeUnit; /** * AddJobFlowSteps task executor @@ -62,7 +62,7 @@ public class EmrAddStepsTask extends AbstractEmrTask { } @Override - public void handle() throws InterruptedException { + public void handle() throws TaskException { StepStatus stepStatus = null; try { AddJobFlowStepsRequest addJobFlowStepsRequest = createAddJobFlowStepsRequest(); @@ -84,6 +84,9 @@ public class EmrAddStepsTask extends AbstractEmrTask { } catch (EmrTaskException | SdkBaseException e) { logger.error("emr task submit failed with error", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TaskException("Execute emr task failed", e); } finally { final int exitStatusCode = calculateExitStatusCode(stepStatus); setExitStatusCode(exitStatusCode); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java index ed42de2831..bc361f6538 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.plugin.task.emr; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import java.util.HashSet; @@ -55,7 +56,7 @@ public class EmrJobFlowTask extends AbstractEmrTask { } @Override - public void handle() throws InterruptedException { + public void handle() throws TaskException { ClusterStatus clusterStatus = null; try { RunJobFlowRequest runJobFlowRequest = createRunJobFlowRequest(); @@ -76,6 +77,9 @@ public class EmrJobFlowTask extends AbstractEmrTask { } catch (EmrTaskException | SdkBaseException e) { logger.error("emr task submit failed with error", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TaskException("Execute emr task failed", e); } finally { final int exitStatusCode = calculateExitStatusCode(clusterStatus); setExitStatusCode(exitStatusCode); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java index 47bb7e9b21..df17725cf2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.http; import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; @@ -89,7 +90,7 @@ public class HttpTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { long startTime = System.currentTimeMillis(); String formatTimeStamp = DateUtils.formatTimeStamp(startTime); String statusCode = null; @@ -108,7 +109,7 @@ public class HttpTask extends AbstractTaskExecutor { appendMessage(e.toString()); exitStatusCode = -1; logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e); - throw e; + throw new TaskException("Execute http task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java index 17f4946e19..ab135ac99e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java @@ -17,28 +17,27 @@ package org.apache.dolphinscheduler.plugin.task.jupyter; -import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.dolphinscheduler.spi.utils.PropertyUtils; -import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; -import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; - +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; +import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.spi.utils.DateUtils; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.utils.PropertyUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import com.fasterxml.jackson.databind.ObjectMapper; - public class JupyterTask extends AbstractTaskExecutor { /** @@ -78,17 +77,22 @@ public class JupyterTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { // SHELL task exit code TaskResponse response = shellCommandExecutor.run(buildCommand()); setExitStatusCode(response.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(response.getProcessId()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("The current Jupyter task has been interrupted", e); + setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + throw new TaskException("The current Jupyter task has been interrupted", e); } catch (Exception e) { logger.error("jupyter task execution failure", e); exitStatusCode = -1; - throw e; + throw new TaskException("Execute jupyter task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java index 6e49da322e..0c42a5c09b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java @@ -17,12 +17,11 @@ package org.apache.dolphinscheduler.plugin.task.mlflow; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; - import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; @@ -36,6 +35,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; + /** * shell task */ @@ -80,7 +81,7 @@ public class MlflowTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { // construct process String command = buildCommand(); @@ -95,10 +96,15 @@ public class MlflowTask extends AbstractTaskExecutor { setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(commandExecuteResult.getProcessId()); mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool()); - } catch (Exception e) { - logger.error("shell task error", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("The current Mlflow task has been interrupted", e); setExitStatusCode(EXIT_CODE_FAILURE); - throw e; + throw new TaskException("The current Mlflow task has been interrupted", e); + } catch (Exception e) { + logger.error("Mlflow task error", e); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException("Execute Mlflow task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java index 028eddb1c1..e5771d588d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java @@ -17,14 +17,14 @@ package org.apache.dolphinscheduler.plugin.task.pigeon; +import org.apache.commons.collections4.CollectionUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import org.apache.commons.collections4.CollectionUtils; import org.apache.http.HttpEntity; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; @@ -33,6 +33,8 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; import java.net.HttpURLConnection; import java.net.URI; @@ -42,9 +44,6 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; - /** * TIS DataX Task **/ @@ -74,7 +73,7 @@ public class PigeonTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { // Trigger PIGEON DataX pipeline logger.info("start execute PIGEON task"); long startTime = System.currentTimeMillis(); @@ -150,6 +149,7 @@ public class PigeonTask extends AbstractTaskExecutor { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } + throw new TaskException("Execute pigeon task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index fc886983e5..02e960ebdb 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -17,19 +17,16 @@ package org.apache.dolphinscheduler.plugin.task.procedure; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; - import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; -import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -44,6 +41,9 @@ import java.sql.Types; import java.util.HashMap; import java.util.Map; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; + /** * procedure task */ @@ -84,7 +84,7 @@ public class ProcedureTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}", procedureParameters.getType(), procedureParameters.getDatasource(), @@ -123,7 +123,7 @@ public class ProcedureTask extends AbstractTaskExecutor { } catch (Exception e) { setExitStatusCode(EXIT_CODE_FAILURE); logger.error("procedure task error", e); - throw e; + throw new TaskException("Execute procedure task failed", e); } finally { close(stmt, connection); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index 75dae471e4..6c18d206d9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -100,7 +100,7 @@ public class PythonTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { // generate the content of this python script String pythonScriptContent = buildPythonScriptContent(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java index b738194d4e..3cd7c7c4f8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java @@ -65,7 +65,7 @@ public class PytorchTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { String command = buildPythonExecuteCommand(); TaskResponse taskResponse = shellCommandExecutor.run(command); @@ -73,9 +73,14 @@ public class PytorchTask extends AbstractTaskExecutor { setAppIds(taskResponse.getAppIds()); setProcessId(taskResponse.getProcessId()); setVarPool(shellCommandExecutor.getVarPool()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("The current Pytorch task has been interrupted", e); + setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + throw new TaskException("The current Pytorch task has been interrupted", e); } catch (Exception e) { setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); - throw e; + throw new TaskException("Pytorch task execute failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java index 1431da1e92..2543e60614 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java @@ -24,6 +24,7 @@ import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_G import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; @@ -80,13 +81,13 @@ public class SagemakerTask extends AbstractTaskExecutor { } @Override - public void handle() throws SagemakerTaskException { + public void handle() throws TaskException { try { int exitStatusCode = handleStartPipeline(); setExitStatusCode(exitStatusCode); } catch (Exception e) { setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); - throw new SagemakerTaskException("SageMaker task error", e); + throw new TaskException("SageMaker task error", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index b44bc6eeac..cd3d8ebb22 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -17,12 +17,12 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS; - +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.BooleanUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; @@ -30,9 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.BooleanUtils; - import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -42,6 +39,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS; + /** * seatunnel task */ @@ -85,7 +85,7 @@ public class SeatunnelTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { // construct process String command = buildCommand(); @@ -94,10 +94,15 @@ public class SeatunnelTask extends AbstractTaskExecutor { setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(commandExecuteResult.getProcessId()); seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("The current SeaTunnel task has been interrupted", e); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException("The current SeaTunnel task has been interrupted", e); } catch (Exception e) { logger.error("SeaTunnel task error", e); setExitStatusCode(EXIT_CODE_FAILURE); - throw e; + throw new TaskException("Execute Seatunnel task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java index f333e617b5..7e20bed634 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java @@ -17,12 +17,11 @@ package org.apache.dolphinscheduler.plugin.task.shell; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; - +import org.apache.commons.lang3.SystemUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; @@ -31,8 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.commons.lang3.SystemUtils; - import java.io.File; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; @@ -44,6 +41,9 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.Map; import java.util.Set; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; + /** * shell task */ @@ -90,7 +90,7 @@ public class ShellTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { // construct process String command = buildCommand(); @@ -99,10 +99,15 @@ public class ShellTask extends AbstractTaskExecutor { setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(commandExecuteResult.getProcessId()); shellParameters.dealOutParam(shellCommandExecutor.getVarPool()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("The current Shell task has been interrupted", e); + setExitStatusCode(EXIT_CODE_FAILURE); + throw new TaskException("The current Shell task has been interrupted", e); } catch (Exception e) { logger.error("shell task error", e); setExitStatusCode(EXIT_CODE_FAILURE); - throw e; + throw new TaskException("Execute shell task error", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 6e4e6f14c9..f884f46e5e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.plugin.task.sql; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.collections4.CollectionUtils; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; @@ -39,8 +42,7 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import org.apache.commons.collections4.CollectionUtils; +import org.slf4j.Logger; import java.sql.Connection; import java.sql.PreparedStatement; @@ -58,11 +60,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.slf4j.Logger; - -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - public class SqlTask extends AbstractTaskExecutor { /** @@ -117,7 +114,7 @@ public class SqlTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { logger.info("Full sql parameters: {}", sqlParameters); logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}", sqlParameters.getType(), @@ -163,7 +160,7 @@ public class SqlTask extends AbstractTaskExecutor { } catch (Exception e) { setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); logger.error("sql task error", e); - throw e; + throw new TaskException("Execute sql task failed", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java index f46b7ce56c..f51b352d9c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java @@ -17,13 +17,15 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin; +import com.fasterxml.jackson.databind.ObjectMapper; +import kong.unirest.Unirest; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.spi.utils.DateUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; - import org.apache.zeppelin.client.ClientConfig; import org.apache.zeppelin.client.NoteResult; import org.apache.zeppelin.client.ParagraphResult; @@ -34,10 +36,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import kong.unirest.Unirest; - -import com.fasterxml.jackson.databind.ObjectMapper; - public class ZeppelinTask extends AbstractTaskExecutor { /** @@ -77,7 +75,7 @@ public class ZeppelinTask extends AbstractTaskExecutor { } @Override - public void handle() throws Exception { + public void handle() throws TaskException { try { final String paragraphId = this.zeppelinParameters.getParagraphId(); final String productionNoteDirectory = this.zeppelinParameters.getProductionNoteDirectory(); @@ -142,6 +140,7 @@ public class ZeppelinTask extends AbstractTaskExecutor { } catch (Exception e) { setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); logger.error("zeppelin task submit failed with error", e); + throw new TaskException("Execute ZeppelinTask exception"); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java index 397d68347e..ff70ba5b45 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java @@ -28,6 +28,7 @@ import static org.powermock.api.mockito.PowerMockito.spy; import static org.powermock.api.mockito.PowerMockito.when; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.spi.utils.DateUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; @@ -127,10 +128,10 @@ public class ZeppelinTaskTest { Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode()); } - @Test + @Test(expected = TaskException.class) public void testHandleWithParagraphExecutionException() throws Exception { when(this.zClient.executeParagraph(any(), any(), any(Map.class))). - thenThrow(new Exception("Something wrong happens from zeppelin side")); + thenThrow(new TaskException("Something wrong happens from zeppelin side")); // when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR); this.zeppelinTask.handle(); Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java index 17af3e9a47..4bb0c92124 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java @@ -17,14 +17,14 @@ package org.apache.dolphinscheduler.server.worker.processor; -import org.apache.dolphinscheduler.common.Constants; +import com.google.common.base.Preconditions; +import io.micrometer.core.annotation.Counted; +import io.micrometer.core.annotation.Timed; +import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.storage.StorageOperate; -import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -36,26 +36,16 @@ import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; -import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnableFactoryBuilder; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; - -import org.apache.commons.lang.SystemUtils; - -import java.util.Date; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.google.common.base.Preconditions; - -import io.micrometer.core.annotation.Counted; -import io.micrometer.core.annotation.Timed; -import io.netty.channel.Channel; - /** * Used to handle {@link CommandType#TASK_DISPATCH_REQUEST} */ @@ -104,7 +94,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { logger.error("task execute request command content is null"); return; } - final String masterAddress = taskDispatchCommand.getMessageSenderAddress(); + final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress(); logger.info("task execute request message: {}", taskDispatchCommand); TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext(); @@ -114,111 +104,39 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { return; } try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType()); - // set cache, it will be used when kill task TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); - - // todo custom logger - taskExecutionContext.setHost(workerConfig.getWorkerAddress()); taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); - if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { - boolean osUserExistFlag; - // if Using distributed is true and Currently supported systems are linux,Should not let it - // automatically - // create tenants,so TenantAutoCreate has no effect - if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) { - // use the id command to judge in linux - osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode()); - } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { - // if not exists this user, then create - OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); - osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode()); - } else { - osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode()); - } - - // check if the OS user exists - if (!osUserExistFlag) { - logger.error("tenantCode: {} does not exist, taskInstanceId: {}", - taskExecutionContext.getTenantCode(), - taskExecutionContext.getTaskInstanceId()); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE); - taskExecutionContext.setEndTime(new Date()); - workerMessageSender.sendMessageWithRetry(taskExecutionContext, - masterAddress, - CommandType.TASK_EXECUTE_RESULT); - return; - } - - // local execute path - String execLocalPath = getExecLocalPath(taskExecutionContext); - logger.info("task instance local execute path : {}", execLocalPath); - taskExecutionContext.setExecutePath(execLocalPath); - - try { - FileUtils.createWorkDirIfAbsent(execLocalPath); - } catch (Throwable ex) { - logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", - execLocalPath, - taskExecutionContext.getTaskInstanceId(), - ex); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE); - workerMessageSender.sendMessageWithRetry(taskExecutionContext, - masterAddress, - CommandType.TASK_EXECUTE_RESULT); - return; - } - } - // delay task process - long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), - taskExecutionContext.getDelayTime() * 60L); + long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L); if (remainTime > 0) { - logger.info("delay the execution of task instance {}, delay time: {} s", - taskExecutionContext.getTaskInstanceId(), - remainTime); + logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime); taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); - taskExecutionContext.setStartTime(null); - workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT); + workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress, CommandType.TASK_EXECUTE_RESULT); } + WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder.createWorkerDelayTaskExecuteRunnableFactory( + taskExecutionContext, + workerConfig, + workflowMasterAddress, + workerMessageSender, + alertClientService, + taskPluginManager, + storageOperate) + .createWorkerTaskExecuteRunnable(); // submit task to manager - boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, - masterAddress, - workerMessageSender, - alertClientService, - taskPluginManager, - storageOperate)); + boolean offer = workerManager.offer(workerTaskExecuteRunnable); if (!offer) { - logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}", - workerManager.getWaitSubmitQueueSize(), - taskExecutionContext.getTaskInstanceId()); - workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT); + logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize()); + workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT); } } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } - /** - * get execute local path - * - * @param taskExecutionContext taskExecutionContext - * @return execute local path - */ - private String getExecLocalPath(TaskExecutionContext taskExecutionContext) { - return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index f4f85277df..fd7e1d726c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -17,6 +17,11 @@ package org.apache.dolphinscheduler.server.worker.processor; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; @@ -34,25 +39,17 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; -import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; import org.apache.dolphinscheduler.service.log.LogClientService; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; /** * task kill processor @@ -161,12 +158,12 @@ public class TaskKillProcessor implements NettyRequestProcessor { * @param taskInstanceId */ protected void cancelApplication(int taskInstanceId) { - TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId); - if (taskExecuteThread == null) { + WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId); + if (workerTaskExecuteRunnable == null) { logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId); return; } - AbstractTask task = taskExecuteThread.getTask(); + AbstractTask task = workerTaskExecuteRunnable.getTask(); if (task == null) { logger.warn("task not found, taskInstanceId:{}", taskInstanceId); return; diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java index 5401c66859..899ac7d9b7 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java @@ -17,6 +17,10 @@ package org.apache.dolphinscheduler.server.worker.processor; +import com.google.common.base.Preconditions; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -27,20 +31,13 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskSavePointResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; - +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.google.common.base.Preconditions; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; - /** * task save point processor */ @@ -98,12 +95,12 @@ public class TaskSavePointProcessor implements NettyRequestProcessor { } protected void doSavePoint(int taskInstanceId) { - TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId); - if (taskExecuteThread == null) { + WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId); + if (workerTaskExecuteRunnable == null) { logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId); return; } - AbstractTask task = taskExecuteThread.getTask(); + AbstractTask task = workerTaskExecuteRunnable.getTask(); if (task == null) { logger.warn("task not found, taskInstanceId:{}", taskInstanceId); return; diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java new file mode 100644 index 0000000000..1b63b063e8 --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import lombok.NonNull; +import org.apache.dolphinscheduler.common.storage.StorageOperate; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.task.TaskPluginManager; + +import javax.annotation.Nullable; + +public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteRunnable { + + public DefaultWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull String workflowMaster, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull AlertClientService alertClientService, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate) { + super(taskExecutionContext, workerConfig, workflowMaster, workerMessageSender, alertClientService, taskPluginManager, storageOperate); + } + + @Override + public void executeTask() throws TaskException { + if (task == null) { + throw new TaskException("The task plugin instance is not initialized"); + } + task.handle(); + } + + @Override + protected void afterExecute() { + super.afterExecute(); + } + + @Override + protected void afterThrowing(Throwable throwable) throws TaskException { + super.afterThrowing(throwable); + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java new file mode 100644 index 0000000000..53de57ec68 --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import lombok.NonNull; +import org.apache.dolphinscheduler.common.storage.StorageOperate; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.task.TaskPluginManager; + +import javax.annotation.Nullable; + +public class DefaultWorkerDelayTaskExecuteRunnableFactory extends WorkerDelayTaskExecuteRunnableFactory { + + protected DefaultWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull String workflowMasterAddress, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull AlertClientService alertClientService, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate) { + super(taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate); + } + + @Override + public DefaultWorkerDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable() { + return new DefaultWorkerDelayTaskExecuteRunnable( + taskExecutionContext, + workerConfig, + workflowMasterAddress, + workerMessageSender, + alertClientService, + taskPluginManager, + storageOperate); + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java deleted file mode 100644 index 9b7e8c7e85..0000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import com.google.common.base.Strings; -import lombok.NonNull; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException; -import org.apache.dolphinscheduler.common.storage.StorageOperate; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; -import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; -import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.server.utils.ProcessUtils; -import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; -import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; -import org.apache.dolphinscheduler.service.alert.AlertClientService; -import org.apache.dolphinscheduler.service.exceptions.ServiceException; -import org.apache.dolphinscheduler.service.task.TaskPluginManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Paths; -import java.util.*; -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; - -import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; - -/** - * task scheduler thread - */ -public class TaskExecuteThread implements Runnable, Delayed { - - /** - * logger - */ - private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class); - - /** - * task instance - */ - private final TaskExecutionContext taskExecutionContext; - - private final String masterAddress; - - private final StorageOperate storageOperate; - - /** - * abstract task - */ - private AbstractTask task; - - /** - * task callback service - */ - private final WorkerMessageSender workerMessageSender; - - /** - * alert client server - */ - private final AlertClientService alertClientService; - - private TaskPluginManager taskPluginManager; - - /** - * constructor - * - * @param taskExecutionContext taskExecutionContext - * @param workerMessageSender used for worker send message to master - */ - public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull String masterAddress, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull AlertClientService alertClientService, - StorageOperate storageOperate) { - this.taskExecutionContext = taskExecutionContext; - this.masterAddress = masterAddress; - this.workerMessageSender = workerMessageSender; - this.alertClientService = alertClientService; - this.storageOperate = storageOperate; - } - - public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull String masterAddress, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull AlertClientService alertClientService, - @NonNull TaskPluginManager taskPluginManager, - StorageOperate storageOperate) { - this.taskExecutionContext = taskExecutionContext; - this.masterAddress = masterAddress; - this.workerMessageSender = workerMessageSender; - this.alertClientService = alertClientService; - this.taskPluginManager = taskPluginManager; - this.storageOperate = storageOperate; - } - - @Override - public void run() { - try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); - taskExecutionContext.setStartTime(new Date()); - taskExecutionContext.setEndTime(new Date()); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - workerMessageSender.sendMessageWithRetry(taskExecutionContext, - masterAddress, - CommandType.TASK_EXECUTE_RESULT); - logger.info("Task dry run success"); - return; - } - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - logger.info("script path : {}", taskExecutionContext.getExecutePath()); - if (taskExecutionContext.getStartTime() == null) { - taskExecutionContext.setStartTime(new Date()); - } - logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId()); - - // callback task execute running - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION); - workerMessageSender.sendMessageWithRetry(taskExecutionContext, - masterAddress, - CommandType.TASK_EXECUTE_RUNNING); - - // copy hdfs/minio file to local - List> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), - taskExecutionContext.getResources()); - if (!fileDownloads.isEmpty()) { - downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads); - } - - taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); - - taskExecutionContext.setTaskAppId(String.format("%s_%s", - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId())); - - TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); - if (null == taskChannel) { - throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", - taskExecutionContext.getTaskType())); - } - String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setTaskLogName(taskLogName); - - // set the name of the current thread - Thread.currentThread().setName(taskLogName); - - task = taskChannel.createTask(taskExecutionContext); - - // task init - this.task.init(); - - // init varPool - this.task.getParameters().setVarPool(taskExecutionContext.getVarPool()); - - // task handle - this.task.handle(); - - // task result process - if (this.task.getNeedAlert()) { - sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus()); - } - - taskExecutionContext.setCurrentExecutionStatus(this.task.getExitStatus()); - taskExecutionContext.setEndTime(DateUtils.getCurrentDate()); - taskExecutionContext.setProcessId(this.task.getProcessId()); - taskExecutionContext.setAppIds(this.task.getAppIds()); - taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool())); - logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), - this.task.getExitStatus()); - } catch (Throwable e) { - logger.error("task scheduler failure", e); - kill(); - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE); - taskExecutionContext.setEndTime(DateUtils.getCurrentDate()); - taskExecutionContext.setProcessId(this.task.getProcessId()); - taskExecutionContext.setAppIds(this.task.getAppIds()); - } finally { - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - workerMessageSender.sendMessageWithRetry(taskExecutionContext, - masterAddress, - CommandType.TASK_EXECUTE_RESULT); - clearTaskExecPath(); - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - } - - private void sendAlert(TaskAlertInfo taskAlertInfo, TaskExecutionStatus status) { - int strategy = - status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode(); - alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), - taskAlertInfo.getContent(), strategy); - } - - /** - * when task finish, clear execute path. - */ - private void clearTaskExecPath() { - logger.info("develop mode is: {}", CommonUtils.isDevelopMode()); - - if (!CommonUtils.isDevelopMode()) { - // get exec dir - String execLocalPath = taskExecutionContext.getExecutePath(); - - if (Strings.isNullOrEmpty(execLocalPath)) { - logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName()); - return; - } - - if (SINGLE_SLASH.equals(execLocalPath)) { - logger.warn("task: {} exec local path is '/', direct deletion is not allowed", - taskExecutionContext.getTaskName()); - return; - } - - try { - org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath)); - logger.info("exec local path: {} cleared.", execLocalPath); - } catch (IOException e) { - if (e instanceof NoSuchFileException) { - // this is expected - } else { - logger.error("Delete exec dir failed.", e); - } - } - } - } - - /** - * kill task - */ - public void kill() { - if (task != null) { - try { - task.cancelApplication(true); - ProcessUtils.killYarnJob(taskExecutionContext); - } catch (Exception e) { - logger.error("Kill task failed", e); - } - } - } - - /** - * download resource file - * - * @param execLocalPath execLocalPath - * @param fileDownloads projectRes - * @param logger logger - */ - public void downloadResource(String execLocalPath, Logger logger, List> fileDownloads) { - for (Pair fileDownload : fileDownloads) { - try { - // query the tenant code of the resource according to the name of the resource - String fullName = fileDownload.getLeft(); - String tenantCode = fileDownload.getRight(); - String resPath = storageOperate.getResourceFileName(tenantCode, fullName); - logger.info("get resource file from path:{}", resPath); - long resourceDownloadStartTime = System.currentTimeMillis(); - storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false, true); - WorkerServerMetrics - .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime); - WorkerServerMetrics.recordWorkerResourceDownloadSize( - Files.size(Paths.get(execLocalPath, fullName))); - WorkerServerMetrics.incWorkerResourceDownloadSuccessCount(); - } catch (Exception e) { - WorkerServerMetrics.incWorkerResourceDownloadFailureCount(); - logger.error(e.getMessage(), e); - throw new ServiceException(e.getMessage()); - } - } - } - - /** - * download resource check - * - * @param execLocalPath - * @param projectRes - * @return - */ - public List> downloadCheck(String execLocalPath, Map projectRes) { - if (MapUtils.isEmpty(projectRes)) { - return Collections.emptyList(); - } - List> downloadFile = new ArrayList<>(); - projectRes.forEach((key, value) -> { - File resFile = new File(execLocalPath, key); - boolean notExist = !resFile.exists(); - if (notExist) { - downloadFile.add(Pair.of(key, value)); - } else { - logger.info("file : {} exists ", resFile.getName()); - } - }); - if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()) { - throw new StorageOperateNoConfiguredException("Storage service config does not exist!"); - } - return downloadFile; - } - - /** - * get current TaskExecutionContext - * - * @return TaskExecutionContext - */ - public TaskExecutionContext getTaskExecutionContext() { - return this.taskExecutionContext; - } - - @Override - public long getDelay(TimeUnit unit) { - return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), - taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (o == null) { - return 1; - } - return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); - } - - public AbstractTask getTask() { - return task; - } -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java new file mode 100644 index 0000000000..73e14c5132 --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import lombok.NonNull; +import org.apache.dolphinscheduler.common.storage.StorageOperate; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.task.TaskPluginManager; + +import javax.annotation.Nullable; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRunnable implements Delayed { + + protected WorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull String masterAddress, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull AlertClientService alertClientService, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate) { + super(taskExecutionContext, workerConfig, masterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate); + } + + @Override + public long getDelay(TimeUnit unit) { + TaskExecutionContext taskExecutionContext = getTaskExecutionContext(); + return unit.convert( + DateUtils.getRemainTime( + taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (o == null) { + return 1; + } + return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } + +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java new file mode 100644 index 0000000000..44bb8878d4 --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import lombok.NonNull; +import org.apache.dolphinscheduler.common.storage.StorageOperate; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.task.TaskPluginManager; + +import javax.annotation.Nullable; + +public abstract class WorkerDelayTaskExecuteRunnableFactory implements WorkerTaskExecuteRunnableFactory { + + protected final @NonNull TaskExecutionContext taskExecutionContext; + protected final @NonNull WorkerConfig workerConfig; + protected final @NonNull String workflowMasterAddress; + protected final @NonNull WorkerMessageSender workerMessageSender; + protected final @NonNull AlertClientService alertClientService; + protected final @NonNull TaskPluginManager taskPluginManager; + protected final @Nullable StorageOperate storageOperate; + + protected WorkerDelayTaskExecuteRunnableFactory( + @NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull String workflowMasterAddress, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull AlertClientService alertClientService, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate) { + this.taskExecutionContext = taskExecutionContext; + this.workerConfig = workerConfig; + this.workflowMasterAddress = workflowMasterAddress; + this.workerMessageSender = workerMessageSender; + this.alertClientService = alertClientService; + this.taskPluginManager = taskPluginManager; + this.storageOperate = storageOperate; + } + + + public abstract T createWorkerTaskExecuteRunnable(); +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java index 46ecfd0d5b..3bdf9534a1 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java @@ -50,17 +50,17 @@ public class WorkerExecService { /** * running task */ - private final ConcurrentHashMap taskExecuteThreadMap; + private final ConcurrentHashMap taskExecuteThreadMap; public WorkerExecService(ExecutorService execService, - ConcurrentHashMap taskExecuteThreadMap) { + ConcurrentHashMap taskExecuteThreadMap) { this.execService = execService; this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); this.taskExecuteThreadMap = taskExecuteThreadMap; WorkerServerMetrics.registerWorkerRunningTaskGauge(taskExecuteThreadMap::size); } - public void submit(TaskExecuteThread taskExecuteThread) { + public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) { taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread); ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread); FutureCallback futureCallback = new FutureCallback() { @@ -91,7 +91,7 @@ public class WorkerExecService { return ((ThreadPoolExecutor) this.execService).getQueue().size(); } - public Map getTaskExecuteThreadMap() { + public Map getTaskExecuteThreadMap() { return taskExecuteThreadMap; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 2466700da1..41824a9a44 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -19,17 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; -import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; -import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.concurrent.BlockingQueue; +import javax.annotation.Nullable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; @@ -41,31 +39,16 @@ public class WorkerManagerThread implements Runnable { private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class); - /** - * task queue - */ - private final BlockingQueue waitSubmitQueue; + private final DelayQueue waitSubmitQueue; - @Autowired(required = false) - private StorageOperate storageOperate; - - /** - * thread executor service - */ private final WorkerExecService workerExecService; - /** - * task callback service - */ - @Autowired - private WorkerMessageSender workerMessageSender; - - private volatile int workerExecThreads; + private final int workerExecThreads; /** * running task */ - private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); public WorkerManagerThread(WorkerConfig workerConfig) { workerExecThreads = workerConfig.getExecThreads(); @@ -75,8 +58,8 @@ public class WorkerManagerThread implements Runnable { taskExecuteThreadMap); } - public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) { - return this.taskExecuteThreadMap.get(taskInstanceId); + public @Nullable WorkerTaskExecuteRunnable getTaskExecuteThread(Integer taskInstanceId) { + return taskExecuteThreadMap.get(taskInstanceId); } /** @@ -94,7 +77,7 @@ public class WorkerManagerThread implements Runnable { * @return queue size */ public int getThreadPoolQueueSize() { - return this.workerExecService.getThreadPoolQueueSize(); + return workerExecService.getThreadPoolQueueSize(); } /** @@ -108,13 +91,7 @@ public class WorkerManagerThread implements Runnable { .forEach(waitSubmitQueue::remove); } - /** - * submit task - * - * @param taskExecuteThread taskExecuteThread - * @return submit result - */ - public boolean offer(TaskExecuteThread taskExecuteThread) { + public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) { if (waitSubmitQueue.size() > workerExecThreads) { WorkerServerMetrics.incWorkerSubmitQueueIsFullCount(); // if waitSubmitQueue is full, it will wait 1s, then try add @@ -123,7 +100,7 @@ public class WorkerManagerThread implements Runnable { return false; } } - return waitSubmitQueue.offer(taskExecuteThread); + return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable); } public void start() { @@ -137,15 +114,14 @@ public class WorkerManagerThread implements Runnable { @Override public void run() { Thread.currentThread().setName("Worker-Execute-Manager-Thread"); - TaskExecuteThread taskExecuteThread; while (!ServerLifeCycleManager.isStopped()) { try { if (!ServerLifeCycleManager.isRunning()) { Thread.sleep(Constants.SLEEP_TIME_MILLIS); } if (this.getThreadPoolQueueSize() <= workerExecThreads) { - taskExecuteThread = waitSubmitQueue.take(); - workerExecService.submit(taskExecuteThread); + final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take(); + workerExecService.submit(workerDelayTaskExecuteRunnable); } else { WorkerServerMetrics.incWorkerOverloadCount(); logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}", @@ -161,7 +137,17 @@ public class WorkerManagerThread implements Runnable { public void clearTask() { waitSubmitQueue.clear(); - workerExecService.getTaskExecuteThreadMap().values().forEach(TaskExecuteThread::kill); + workerExecService.getTaskExecuteThreadMap().values().forEach(workerTaskExecuteRunnable -> { + int taskInstanceId = workerTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(); + try { + workerTaskExecuteRunnable.cancelTask(); + logger.info("Cancel the taskInstance in worker {}", taskInstanceId); + } catch (Exception ex) { + logger.error("Cancel the taskInstance error {}", taskInstanceId, ex); + } finally { + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); + } + }); workerExecService.getTaskExecuteThreadMap().clear(); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java new file mode 100644 index 0000000000..bd8beafd6b --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import com.google.common.base.Strings; +import lombok.NonNull; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.storage.StorageOperate; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; +import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.task.TaskPluginManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Date; + +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; + +public abstract class WorkerTaskExecuteRunnable implements Runnable { + + protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class)); + + protected final TaskExecutionContext taskExecutionContext; + protected final WorkerConfig workerConfig; + protected final String masterAddress; + protected final WorkerMessageSender workerMessageSender; + protected final AlertClientService alertClientService; + protected final TaskPluginManager taskPluginManager; + protected final @Nullable StorageOperate storageOperate; + + protected @Nullable AbstractTask task; + + protected WorkerTaskExecuteRunnable( + @NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull String masterAddress, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull AlertClientService alertClientService, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate) { + this.taskExecutionContext = taskExecutionContext; + this.workerConfig = workerConfig; + this.masterAddress = masterAddress; + this.workerMessageSender = workerMessageSender; + this.alertClientService = alertClientService; + this.taskPluginManager = taskPluginManager; + this.storageOperate = storageOperate; + String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setTaskLogName(taskLogName); + logger.info("Set task logger name: {}", taskLogName); + } + + protected abstract void executeTask(); + + protected void afterExecute() throws TaskException { + if (task == null) { + throw new TaskException("The current task instance is null"); + } + sendAlertIfNeeded(); + + sendTaskResult(); + + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + logger.info("Remove the current task execute context from worker cache"); + clearTaskExecPathIfNeeded(); + } + + protected void afterThrowing(Throwable throwable) throws TaskException { + cancelTask(); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE); + taskExecutionContext.setEndTime(new Date()); + workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT); + logger.info("Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", TaskExecutionStatus.FAILURE); + } + + public void cancelTask() { + // cancel the task + if (task != null) { + try { + task.cancelApplication(true); + ProcessUtils.killYarnJob(taskExecutionContext); + } catch (Exception e) { + logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e); + } + } + } + + @Override + public void run() { + try { + // set the thread name to make sure the log be written to the task log file + Thread.currentThread().setName(taskExecutionContext.getTaskLogName()); + + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); + logger.info("Begin to pulling task"); + + initializeTask(); + + if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { + taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); + taskExecutionContext.setEndTime(new Date()); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT); + logger.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success"); + return; + } + + beforeExecute(); + + executeTask(); + + afterExecute(); + + } catch (Throwable ex) { + logger.error("Task execute failed, due to meet an exception", ex); + afterThrowing(ex); + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + } + + protected void initializeTask() { + logger.info("Begin to initialize task"); + + Date taskStartTime = new Date(); + taskExecutionContext.setStartTime(taskStartTime); + logger.info("Set task startTime: {}", taskStartTime); + + String systemEnvPath = CommonUtils.getSystemEnvPath(); + taskExecutionContext.setEnvFile(systemEnvPath); + logger.info("Set task envFile: {}", systemEnvPath); + + String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setTaskAppId(taskAppId); + logger.info("Set task appId: {}", taskAppId); + + logger.info("End initialize task"); + } + + protected void beforeExecute() { + taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION); + workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING); + logger.info("Set task status to {}", TaskExecutionStatus.RUNNING_EXECUTION); + + TaskExecutionCheckerUtils.checkTenantExist(workerConfig, taskExecutionContext); + logger.info("TenantCode:{} check success", taskExecutionContext.getTenantCode()); + + TaskExecutionCheckerUtils.createProcessLocalPathIfAbsent(taskExecutionContext); + logger.info("ProcessExecDir:{} check success", taskExecutionContext.getExecutePath()); + + TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger); + logger.info("Resources:{} check success", taskExecutionContext.getResources()); + + TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); + if (null == taskChannel) { + throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType())); + } + task = taskChannel.createTask(taskExecutionContext); + if (task == null) { + throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType())); + } + logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType()); + + task.init(); + logger.info("Success initialized task plugin instance success"); + + task.getParameters().setVarPool(taskExecutionContext.getVarPool()); + logger.info("Success set taskVarPool: {}", taskExecutionContext.getVarPool()); + + } + + protected void sendAlertIfNeeded() { + if (!task.getNeedAlert()) { + return; + } + logger.info("The current task need to send alert, begin to send alert"); + TaskExecutionStatus status = task.getExitStatus(); + TaskAlertInfo taskAlertInfo = task.getTaskAlertInfo(); + int strategy = status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode(); + alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), strategy); + logger.info("Success send alert"); + } + + protected void sendTaskResult() { + taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus()); + taskExecutionContext.setEndTime(new Date()); + taskExecutionContext.setProcessId(task.getProcessId()); + taskExecutionContext.setAppIds(task.getAppIds()); + taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool())); + workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT); + + logger.info("Send task execute result to master, the current task status: {}", taskExecutionContext.getCurrentExecutionStatus()); + } + + protected void clearTaskExecPathIfNeeded() { + + String execLocalPath = taskExecutionContext.getExecutePath(); + if (!CommonUtils.isDevelopMode()) { + logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", execLocalPath); + // get exec dir + if (Strings.isNullOrEmpty(execLocalPath)) { + logger.warn("The task execute file is {} no need to clear", taskExecutionContext.getTaskName()); + return; + } + + if (SINGLE_SLASH.equals(execLocalPath)) { + logger.warn("The task execute file is '/', direct deletion is not allowed"); + return; + } + + try { + org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath)); + logger.info("Success clear the task execute file: {}", execLocalPath); + } catch (IOException e) { + if (e instanceof NoSuchFileException) { + // this is expected + } else { + logger.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", execLocalPath, e); + } + } + } else { + logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", execLocalPath); + } + } + + public @NonNull TaskExecutionContext getTaskExecutionContext() { + return taskExecutionContext; + } + + public @Nullable AbstractTask getTask() { + return task; + } + +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java new file mode 100644 index 0000000000..441662f4bc --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +public interface WorkerTaskExecuteRunnableFactory { + + T createWorkerTaskExecuteRunnable(); +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java new file mode 100644 index 0000000000..f3edfd4c1d --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import lombok.NonNull; +import lombok.experimental.UtilityClass; +import org.apache.dolphinscheduler.common.storage.StorageOperate; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.task.TaskPluginManager; + +import javax.annotation.Nullable; + +@UtilityClass +public class WorkerTaskExecuteRunnableFactoryBuilder { + + public static WorkerDelayTaskExecuteRunnableFactory createWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull String workflowMasterAddress, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull AlertClientService alertClientService, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate) { + return new DefaultWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext, + workerConfig, + workflowMasterAddress, + workerMessageSender, + alertClientService, + taskPluginManager, + storageOperate); + } + +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java new file mode 100644 index 0000000000..50ac13f331 --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.utils; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.SystemUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException; +import org.apache.dolphinscheduler.common.storage.StorageOperate; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; +import org.slf4j.Logger; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TaskExecutionCheckerUtils { + + public static void checkTenantExist(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) { + try { + boolean osUserExistFlag; + // if Using distributed is true and Currently supported systems are linux,Should not let it + // automatically + // create tenants,so TenantAutoCreate has no effect + if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) { + // use the id command to judge in linux + osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode()); + } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { + // if not exists this user, then create + OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); + osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode()); + } else { + osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode()); + } + if (!osUserExistFlag) { + throw new TaskException(String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode())); + } + } catch (TaskException ex) { + throw ex; + } catch (Exception ex) { + throw new TaskException(String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode())); + } + } + + public static void createProcessLocalPathIfAbsent(TaskExecutionContext taskExecutionContext) throws TaskException { + try { + // local execute path + String execLocalPath = FileUtils.getProcessExecDir( + taskExecutionContext.getProjectCode(), + taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setExecutePath(execLocalPath); + FileUtils.createWorkDirIfAbsent(execLocalPath); + } catch (Throwable ex) { + throw new TaskException("Cannot create process execute dir", ex); + } + } + + public static void downloadResourcesIfNeeded(StorageOperate storageOperate, TaskExecutionContext taskExecutionContext, Logger logger) { + String execLocalPath = taskExecutionContext.getExecutePath(); + Map projectRes = taskExecutionContext.getResources(); + if (MapUtils.isEmpty(projectRes)) { + return; + } + List> downloadFiles = new ArrayList<>(); + projectRes.forEach((key, value) -> { + File resFile = new File(execLocalPath, key); + boolean notExist = !resFile.exists(); + if (notExist) { + downloadFiles.add(Pair.of(key, value)); + } else { + logger.info("file : {} exists ", resFile.getName()); + } + }); + if (!downloadFiles.isEmpty() && !PropertyUtils.getResUploadStartupState()) { + throw new StorageOperateNoConfiguredException("Storage service config does not exist!"); + } + + if (CollectionUtils.isNotEmpty(downloadFiles)) { + for (Pair fileDownload : downloadFiles) { + try { + // query the tenant code of the resource according to the name of the resource + String fullName = fileDownload.getLeft(); + String tenantCode = fileDownload.getRight(); + String resPath = storageOperate.getResourceFileName(tenantCode, fullName); + logger.info("get resource file from path:{}", resPath); + long resourceDownloadStartTime = System.currentTimeMillis(); + storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false, true); + WorkerServerMetrics + .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime); + WorkerServerMetrics.recordWorkerResourceDownloadSize( + Files.size(Paths.get(execLocalPath, fullName))); + WorkerServerMetrics.incWorkerResourceDownloadSuccessCount(); + } catch (Exception e) { + WorkerServerMetrics.incWorkerResourceDownloadFailureCount(); + throw new TaskException(String.format("Download resource file: %s error", fileDownload), e); + } + } + } + } +} diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java index e66563c0b4..0bf8a26c45 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java @@ -17,156 +17,76 @@ package org.apache.dolphinscheduler.server.worker.processor; +import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.storage.StorageOperate; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.FileUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand; -import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; -import org.apache.dolphinscheduler.remote.utils.ChannelUtils; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; -import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; - -import java.util.Date; -import java.util.concurrent.ExecutorService; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; +import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.util.Date; /** * test task execute processor */ @RunWith(PowerMockRunner.class) -@PrepareForTest({SpringApplicationContext.class, WorkerConfig.class, FileUtils.class, JsonSerializer.class, - JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class}) -@Ignore public class TaskDispatchProcessorTest { - private TaskExecutionContext taskExecutionContext; - - private WorkerMessageSender workerMessageSender; - - private ExecutorService workerExecService; - - private StorageOperate storageOperate; + @InjectMocks + private TaskDispatchProcessor taskDispatchProcessor; + @Mock private WorkerConfig workerConfig; - private Command command; - - private Command ackCommand; - - private TaskDispatchCommand taskRequestCommand; + @Mock + private WorkerMessageSender workerMessageSender; + @Mock private AlertClientService alertClientService; - private WorkerManagerThread workerManager; + @Mock + private TaskPluginManager taskPluginManager; - @Before - public void before() throws Exception { - // init task execution context - taskExecutionContext = getTaskExecutionContext(); - workerConfig = new WorkerConfig(); - workerConfig.setExecThreads(1); - workerConfig.setListenPort(1234); - command = new Command(); - command.setType(CommandType.TASK_DISPATCH_REQUEST); - ackCommand = new TaskExecuteRunningCommand("127.0.0.1:1234", - "127.0.0.1:5678", - System.currentTimeMillis()).convert2Command(); - taskRequestCommand = new TaskDispatchCommand(taskExecutionContext, - "127.0.0.1:5678", - "127.0.0.1:1234", - System.currentTimeMillis()); - alertClientService = PowerMockito.mock(AlertClientService.class); - workerExecService = PowerMockito.mock(ExecutorService.class); - PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))).thenReturn(null); + @Mock + private WorkerManagerThread workerManagerThread; - PowerMockito.mockStatic(ChannelUtils.class); - PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null); - - workerMessageSender = PowerMockito.mock(WorkerMessageSender.class); - - PowerMockito.mockStatic(SpringApplicationContext.class); - PowerMockito.when(SpringApplicationContext.getBean(WorkerMessageSender.class)).thenReturn(workerMessageSender); - PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig); - - workerManager = PowerMockito.mock(WorkerManagerThread.class); - - storageOperate = PowerMockito.mock(StorageOperate.class); - PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, - "127.0.0.1:5678", - workerMessageSender, - alertClientService, - storageOperate))).thenReturn(Boolean.TRUE); - - PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager); - - PowerMockito.mockStatic(ThreadUtils.class); - PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", - workerConfig.getExecThreads())).thenReturn( - workerExecService); - - PowerMockito.mockStatic(JsonSerializer.class); - PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchCommand.class)).thenReturn( - taskRequestCommand); - - PowerMockito.mockStatic(JSONUtils.class); - PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class)).thenReturn( - taskRequestCommand); - - PowerMockito.mockStatic(FileUtils.class); - PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId())).thenReturn( - taskExecutionContext.getExecutePath()); - PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath()); - - SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(new TaskExecutionContext(), - workerMessageSender, - "127.0.0.1:5678", - LoggerFactory.getLogger( - TaskDispatchProcessorTest.class), - alertClientService, - storageOperate); - PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments().thenReturn(simpleTaskExecuteThread); - } + @Mock + private StorageOperate storageOperate; @Test - public void testNormalExecution() { - TaskDispatchProcessor processor = new TaskDispatchProcessor(); - processor.process(null, command); + public void process() { + Channel channel = Mockito.mock(Channel.class); + TaskChannel taskChannel = Mockito.mock(TaskChannel.class); + Mockito.when(taskPluginManager.getTaskChannel(Mockito.anyString())).thenReturn(taskChannel); - Assert.assertEquals(TaskExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus()); + TaskExecutionContext taskExecutionContext = getTaskExecutionContext(); + Command dispatchCommand = createDispatchCommand(taskExecutionContext); + taskDispatchProcessor.process(channel, dispatchCommand); + + Mockito.verify(workerManagerThread, Mockito.atMostOnce()).offer(Mockito.any()); + Mockito.verify(workerMessageSender, Mockito.never()).sendMessageWithRetry(taskExecutionContext, "localhost:5678", CommandType.TASK_REJECT); } - @Test - public void testDelayExecution() { - taskExecutionContext.setDelayTime(1); - TaskDispatchProcessor processor = new TaskDispatchProcessor(); - processor.process(null, command); - Assert.assertEquals(TaskExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus()); + public Command createDispatchCommand(TaskExecutionContext taskExecutionContext) { + return new TaskDispatchCommand( + taskExecutionContext, + "localhost:5678", + "localhost:1234", + System.currentTimeMillis() + ).convert2Command(); } public TaskExecutionContext getTaskExecutionContext() { @@ -184,21 +104,4 @@ public class TaskDispatchProcessorTest { taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4"); return taskExecutionContext; } - - private static class SimpleTaskExecuteThread extends TaskExecuteThread { - - public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, - WorkerMessageSender workerMessageSender, - String masterAddress, - Logger taskLogger, - AlertClientService alertClientService, - StorageOperate storageOperate) { - super(taskExecutionContext, masterAddress, workerMessageSender, alertClientService, storageOperate); - } - - @Override - public void run() { - // - } - } } diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java new file mode 100644 index 0000000000..39f03119ed --- /dev/null +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.storage.StorageOperate; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.task.TaskPluginManager; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.mockito.Mockito; + +import java.util.Date; + +public class DefaultWorkerDelayTaskExecuteRunnableTest { + + private TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + + private WorkerConfig workerConfig = Mockito.mock(WorkerConfig.class); + + private String masterAddress = "localhost:5678"; + + private WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class); + + private AlertClientService alertClientService = Mockito.mock(AlertClientService.class); + + private TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class); + + private StorageOperate storageOperate = Mockito.mock(StorageOperate.class); + + @Test + public void testDryRun() { + TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder() + .dryRun(Constants.DRY_RUN_FLAG_YES) + .taskInstanceId(0) + .processDefineId(0) + .firstSubmitTime(new Date()) + .taskLogName("TestLogName") + .build(); + WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable( + taskExecutionContext, + workerConfig, + masterAddress, + workerMessageSender, + alertClientService, + taskPluginManager, + storageOperate + ); + + Assertions.assertAll(workerTaskExecuteRunnable::run); + Assertions.assertEquals(TaskExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus()); + } + +} \ No newline at end of file diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java deleted file mode 100644 index bd492c7ff6..0000000000 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import org.apache.dolphinscheduler.common.storage.StorageOperate; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest; -import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; -import org.apache.dolphinscheduler.service.alert.AlertClientService; -import org.apache.dolphinscheduler.service.task.TaskPluginManager; - -import org.apache.commons.lang3.tuple.Pair; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.powermock.modules.junit4.PowerMockRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(PowerMockRunner.class) -public class TaskExecuteThreadTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class); - - @Mock - private TaskExecutionContext taskExecutionContext; - - @Mock - private WorkerMessageSender workerMessageSender; - - @Mock - private AlertClientService alertClientService; - - @Mock - private StorageOperate storageOperate; - - @Mock - private TaskPluginManager taskPluginManager; - - @Test - public void checkTest() { - TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, - "127.0.0.1:5678", - workerMessageSender, - alertClientService, - taskPluginManager, - storageOperate); - - String path = "/"; - Map projectRes = new HashMap<>(); - projectRes.put("shell", "shell.sh"); - List> downloads = new ArrayList<>(); - try { - downloads = taskExecuteThread.downloadCheck(path, projectRes); - } catch (Exception e) { - Assert.assertNotNull(e); - } - downloads.add(Pair.of("shell", "shell.sh")); - try{ - taskExecuteThread.downloadResource(path, LOGGER, downloads); - }catch (Exception e){ - - } - } -}