[3.1.1-prepare]cherry-pick [Improvement] remove log-server and server module #12206 (#12562)

This commit is contained in:
Kerwin 2022-10-27 12:54:38 +08:00 committed by GitHub
parent 541105a358
commit 58ef3cccd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 451 additions and 585 deletions

2
.github/CODEOWNERS vendored
View File

@ -26,14 +26,12 @@
/dolphinscheduler-dao/src/main/resources/sql/ @zhongjiajie
/dolphinscheduler-common/ @caishunfeng
/dolphinscheduler-standalone-server/ @kezhenxu94 @caishunfeng
/dolphinscheduler-log-server/ @caishunfeng
/dolphinscheduler-datasource-plugin/ @caishunfeng
/dolphinscheduler-dist/ @kezhenxu94 @caishunfeng
/dolphinscheduler-meter/ @caishunfeng @kezhenxu94 @ruanwenjun @EricGao888
/dolphinscheduler-scheduler-plugin/ @caishunfeng
/dolphinscheduler-master/ @caishunfeng @SbloodyS @ruanwenjun
/dolphinscheduler-worker/ @caishunfeng @SbloodyS @ruanwenjun
/dolphinscheduler-server/ @caishunfeng
/dolphinscheduler-service/ @caishunfeng
/dolphinscheduler-remote/ @caishunfeng
/dolphinscheduler-spi/ @caishunfeng

View File

@ -26,12 +26,10 @@ backend:
- 'dolphinscheduler-data-quality/**/*'
- 'dolphinscheduler-datasource-plugin/**/*'
- 'dolphinscheduler-dist/**/*'
- 'dolphinscheduler-log-server/**/*'
- 'dolphinscheduler-master/**/*'
- 'dolphinscheduler-registry/**/*'
- 'dolphinscheduler-remote/**/*'
- 'dolphinscheduler-scheduler-plugin/**/*'
- 'dolphinscheduler-server/**/*'
- 'dolphinscheduler-service/**/*'
- 'dolphinscheduler-spi/**/*'
- 'dolphinscheduler-standalone-server/**/*'

View File

@ -30,7 +30,6 @@ on:
- 'dolphinscheduler-common/**'
- 'dolphinscheduler-dao/**'
- 'dolphinscheduler-rpc/**'
- 'dolphinscheduler-server/**'
pull_request:
concurrency:

1
.gitignore vendored
View File

@ -43,7 +43,6 @@ dolphinscheduler-dao/src/main/resources/dao/data_source.properties
dolphinscheduler-alert/logs/
dolphinscheduler-alert/src/main/resources/alert.properties_bak
dolphinscheduler-alert/src/main/resources/logback.xml
dolphinscheduler-server/src/main/resources/logback.xml
dolphinscheduler-ui/dist
dolphinscheduler-ui/node
dolphinscheduler-common/sql

View File

@ -49,6 +49,10 @@ process fails and ends
### 2.Module introduction
- dolphinscheduler-master master module, provides workflow management and orchestration.
- dolphinscheduler-worker worker module, provides task execution management.
- dolphinscheduler-alert alarm module, providing AlertServer service.
- dolphinscheduler-api web application module, providing ApiServer service.
@ -59,8 +63,6 @@ process fails and ends
- dolphinscheduler-remote client and server based on netty
- dolphinscheduler-server MasterServer and WorkerServer services
- dolphinscheduler-service service module, including Quartz, Zookeeper, log client access service, easy to call server
module and api module

View File

@ -197,10 +197,10 @@ In the early schedule design, if there is no priority design and use the fair sc
- For details, please refer to the logback configuration of Master and Worker, as shown in the following example:
```xml
<conversionRule conversionWord="messsage" converterClass="org.apache.dolphinscheduler.server.log.SensitiveDataConverter"/>
<conversionRule conversionWord="messsage" converterClass="org.apache.dolphinscheduler.service.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>

View File

@ -33,6 +33,10 @@
### 2.模块介绍
- dolphinscheduler-master master模块提供工作流管理和编排服务。
- dolphinscheduler-worker worker模块提供任务执行管理服务。
- dolphinscheduler-alert 告警模块,提供 AlertServer 服务。
- dolphinscheduler-api web应用模块提供 ApiServer 服务。
@ -43,8 +47,6 @@
- dolphinscheduler-remote 基于 netty 的客户端、服务端
- dolphinscheduler-server MasterServer 和 WorkerServer 服务
- dolphinscheduler-service service模块包含Quartz、Zookeeper、日志客户端访问服务便于server模块和api模块调用
- dolphinscheduler-ui 前端模块

View File

@ -195,10 +195,10 @@
- 详情可参考Master和Worker的logback配置如下示例
```xml
<conversionRule conversionWord="messsage" converterClass="org.apache.dolphinscheduler.server.log.SensitiveDataConverter"/>
<conversionRule conversionWord="messsage" converterClass="org.apache.dolphinscheduler.service.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>

View File

@ -42,10 +42,6 @@
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>

View File

@ -1,61 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>3.1.1-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-log-server</artifactId>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -53,10 +53,6 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-all</artifactId>
@ -266,10 +262,6 @@
<scope>test</scope>
<!-- master should never depend on worker, this is only for tests -->
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-log-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>

View File

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
@ -31,8 +30,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteStartProce
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
import javax.annotation.PostConstruct;
import org.apache.dolphinscheduler.service.log.LoggerRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -96,7 +94,8 @@ public class MasterRPCServer implements AutoCloseable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST,
workflowExecutingDataRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START, taskExecuteStartProcessor);
// logger server

View File

@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;

View File

@ -19,16 +19,16 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;
@ -143,9 +143,8 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
}
private void setConditionResult() {
List<TaskInstance> taskInstances =
processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
List<TaskInstance> taskInstances = processService
.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
}

View File

@ -17,7 +17,8 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -34,8 +35,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.master.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;
@ -47,7 +48,7 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import com.google.auto.service.AutoService;
/**
* dependent task processor
@ -57,9 +58,11 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters;
private final ProcessDefinitionMapper processDefinitionMapper = SpringApplicationContext.getBean(ProcessDefinitionMapper.class);
private final ProcessDefinitionMapper processDefinitionMapper =
SpringApplicationContext.getBean(ProcessDefinitionMapper.class);
private final TaskDefinitionMapper taskDefinitionMapper = SpringApplicationContext.getBean(TaskDefinitionMapper.class);
private final TaskDefinitionMapper taskDefinitionMapper =
SpringApplicationContext.getBean(TaskDefinitionMapper.class);
private final ProjectMapper projectMapper = SpringApplicationContext.getBean(ProjectMapper.class);
@ -174,9 +177,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
taskDefinitionCodes.add(dependentItem.getDepTaskCode());
});
});
projectCodeMap = projectMapper.queryByCodes(projectCodes).stream().collect(Collectors.toMap(Project::getCode, Function.identity()));
processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream().collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream().collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
projectCodeMap = projectMapper.queryByCodes(projectCodes).stream()
.collect(Collectors.toMap(Project::getCode, Function.identity()));
processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream()
.collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream()
.collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
logger.info("Add sub dependent check tasks, dependent relation: {}", taskModel.getRelation());
@ -184,25 +190,31 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
Project project = projectCodeMap.get(dependentItem.getProjectCode());
if (project == null) {
logger.error("The dependent task's project is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's project is not exist, dependentItem: " + dependentItem);
throw new RuntimeException(
"The dependent task's project is not exist, dependentItem: " + dependentItem);
}
ProcessDefinition processDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode());
if (processDefinition == null) {
logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem);
throw new RuntimeException(
"The dependent task's workflow is not exist, dependentItem: " + dependentItem);
}
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: ALL, dependentKey: {}",
logger.info(
"Add dependent task: projectName: {}, workflowName: {}, taskName: ALL, dependentKey: {}",
project.getName(), processDefinition.getName(), dependentItem.getKey());
} else {
TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode());
if (taskDefinition == null) {
logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem);
logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}",
dependentItem);
throw new RuntimeException(
"The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem);
}
logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}",
project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey());
project.getName(), processDefinition.getName(), taskDefinition.getName(),
dependentItem.getKey());
}
}
this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
@ -237,7 +249,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
if (!dependResultMap.containsKey(entry.getKey())) {
dependResultMap.put(entry.getKey(), entry.getValue());
// save depend result to log
logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate);
logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}",
entry.getKey(), entry.getValue(), dependentDate);
}
}
if (!dependentExecute.finish(dependentDate)) {

View File

@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.utils.LogUtils;
import org.apache.commons.lang3.StringUtils;

View File

@ -17,9 +17,8 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -29,7 +28,10 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.server.master.utils.SwitchTaskUtils;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.utils.LogUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Date;
import java.util.HashMap;
@ -39,7 +41,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
import com.google.auto.service.AutoService;
/**
* switch task processor

View File

@ -36,10 +36,10 @@ import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutor
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.service.utils.ProcessUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
@ -163,7 +163,8 @@ public class MasterFailoverService {
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(processInstanceId);
for (TaskInstance taskInstance : taskInstanceList) {
try {
LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());

View File

@ -17,10 +17,6 @@
package org.apache.dolphinscheduler.server.master.service;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
@ -39,15 +35,15 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.apache.dolphinscheduler.service.utils.ProcessUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import javax.annotation.Nullable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -56,6 +52,14 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
public class WorkerFailoverService {

View File

@ -28,10 +28,10 @@
</appender>
<conversionRule conversionWord="messsage"
converterClass="org.apache.dolphinscheduler.server.log.SensitiveDataConverter"/>
converterClass="org.apache.dolphinscheduler.service.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>

View File

@ -1,224 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>3.1.1-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-server</artifactId>
<packaging>jar</packaging>
<name>dolphinscheduler-server</name>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- dolphinscheduler -->
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>xmlenc</groupId>
<artifactId>xmlenc</artifactId>
</exclusion>
<exclusion>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-noop-htrace</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
<exclusion>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<excludes>
<exclude>config/</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,43 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# master execute thread num
#master.exec.threads=100
# master execute task number in parallel
#master.exec.task.num=20
# master dispatch task number
master.dispatch.task.num=6
# master heartbeat interval
#master.heartbeat.interval=10
# master commit task retry times
#master.task.commit.retryTimes=5
# master commit task interval
#master.task.commit.interval=1000
# only less than cpu avg load, master server can work. default value -1 : the number of cpu cores * 2
#master.max.cpuload.avg=-1
# only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G.
#master.reserved.memory=0.3
# master listen port
#master.listen.port=5678

View File

@ -62,6 +62,20 @@
<artifactId>dolphinscheduler-task-api</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.12.4</version>
<!-- TODO: move this dependency to root pom after removing powermock in the whole project -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
@ -73,5 +87,153 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>xmlenc</groupId>
<artifactId>xmlenc</artifactId>
</exclusion>
<exclusion>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-noop-htrace</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
<exclusion>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
@ -25,6 +25,7 @@ import ch.qos.logback.core.spi.FilterReply;
* master log filter
*/
public class MasterLogFilter extends Filter<ILoggingEvent> {
/**
* log level
*/
@ -37,7 +38,7 @@ public class MasterLogFilter extends Filter<ILoggingEvent> {
*/
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith("Master-") ){
if (event.getThreadName().startsWith("Master-")) {
return FilterReply.ACCEPT;
}
return FilterReply.DENY;
@ -46,4 +47,4 @@ public class MasterLogFilter extends Filter<ILoggingEvent> {
public void setLevel(String level) {
this.level = Level.toLevel(level);
}
}
}

View File

@ -15,16 +15,18 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.Constants;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.base.Strings;
import ch.qos.logback.classic.pattern.MessageConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.google.common.base.Strings;
/**
* sensitive data log converter
*/
@ -35,7 +37,6 @@ public class SensitiveDataConverter extends MessageConverter {
*/
private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX);
@Override
public String convert(ILoggingEvent event) {
@ -85,5 +86,4 @@ public class SensitiveDataConverter extends MessageConverter {
return sb.toString();
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@ -52,7 +52,8 @@ public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
if (event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) {
String threadName = event.getThreadName();
if (threadName.endsWith(TaskConstants.GET_OUTPUT_LOG_SERVICE)) {
threadName = threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length());
threadName =
threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length());
}
String part1 = threadName.split(Constants.EQUAL_SIGN)[1];
String prefix = TaskConstants.TASK_LOGGER_INFO_PREFIX + "-";
@ -60,7 +61,8 @@ public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
key = part1.substring(prefix.length()).replaceFirst("-", "/");
}
}
logger.debug("task log discriminator end, key is:{}, thread name:{}, loggerName:{}", key, event.getThreadName(), event.getLoggerName());
logger.debug("task log discriminator end, key is:{}, thread name:{}, loggerName:{}", key, event.getThreadName(),
event.getLoggerName());
return key;
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@ -57,7 +57,8 @@ public class TaskLogFilter extends Filter<ILoggingEvent> {
|| event.getLevel().isGreaterOrEqual(level)) {
filterReply = FilterReply.ACCEPT;
}
logger.debug("task log filter, thread name:{}, loggerName:{}, filterReply:{}, level:{}", event.getThreadName(), event.getLoggerName(), filterReply.name(), level);
logger.debug("task log filter, thread name:{}, loggerName:{}, filterReply:{}, level:{}", event.getThreadName(),
event.getLoggerName(), filterReply.name(), level);
return filterReply;
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
@ -25,6 +25,7 @@ import ch.qos.logback.core.spi.FilterReply;
* worker log filter
*/
public class WorkerLogFilter extends Filter<ILoggingEvent> {
/**
* level
*/
@ -37,7 +38,7 @@ public class WorkerLogFilter extends Filter<ILoggingEvent> {
*/
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith("Worker-")){
if (event.getThreadName().startsWith("Worker-")) {
return FilterReply.ACCEPT;
}
@ -46,4 +47,4 @@ public class WorkerLogFilter extends Filter<ILoggingEvent> {
public void setLevel(String level) {
this.level = Level.toLevel(level);
}
}
}

View File

@ -15,12 +15,12 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.service.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.service.log.TaskLogDiscriminator;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -44,39 +44,40 @@ public class LogUtils {
/**
* get task log path
*/
public static String getTaskLogPath(Date firstSubmitTime, Long processDefineCode, int processDefineVersion, int processInstanceId, int taskInstanceId) {
public static String getTaskLogPath(Date firstSubmitTime, Long processDefineCode, int processDefineVersion,
int processInstanceId, int taskInstanceId) {
// format /logs/YYYYMMDD/defintion-code_defintion_version-processInstanceId-taskInstanceId.log
final String taskLogFileName = new StringBuilder(String.valueOf(processDefineCode))
.append(Constants.UNDERLINE)
.append(processDefineVersion)
.append(Constants.SUBTRACT_CHAR)
.append(processInstanceId)
.append(Constants.SUBTRACT_CHAR)
.append(taskInstanceId)
.append(LOG_TAILFIX)
.toString();
.append(Constants.UNDERLINE)
.append(processDefineVersion)
.append(Constants.SUBTRACT_CHAR)
.append(processInstanceId)
.append(Constants.SUBTRACT_CHAR)
.append(taskInstanceId)
.append(LOG_TAILFIX)
.toString();
// Optional.map will be skipped if null
return Optional.of(LoggerFactory.getILoggerFactory())
.map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
.map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
.map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
.map(TaskLogDiscriminator::getLogBase)
.map(e -> Paths.get(e)
.toAbsolutePath()
.resolve(DateUtils.format(firstSubmitTime,Constants.YYYYMMDD, null))
.resolve(taskLogFileName))
.map(Path::toString)
.orElse("");
.map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
.map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
.map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
.map(TaskLogDiscriminator::getLogBase)
.map(e -> Paths.get(e)
.toAbsolutePath()
.resolve(DateUtils.format(firstSubmitTime, Constants.YYYYMMDD, null))
.resolve(taskLogFileName))
.map(Path::toString)
.orElse("");
}
/**
* get task log path by TaskExecutionContext
*/
public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
return getTaskLogPath(taskExecutionContext.getFirstSubmitTime(),taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
return getTaskLogPath(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.service.utils;
import lombok.NonNull;
import org.apache.commons.collections.CollectionUtils;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@ -23,26 +23,37 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.mockito.junit.MockitoJUnitRunner;
import io.netty.channel.Channel;
@RunWith(PowerMockRunner.class)
@PrepareForTest({LoggerUtils.class})
@RunWith(MockitoJUnitRunner.class)
public class LoggerRequestProcessorTest {
private MockedStatic<LoggerUtils> mockedStaticLoggerUtils;
@Before
public void setUp() {
mockedStaticLoggerUtils = Mockito.mockStatic(LoggerUtils.class);
}
@After
public void after() {
mockedStaticLoggerUtils.close();
}
@Test
public void testProcessViewWholeLogRequest() {
System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir"));
Channel channel = PowerMockito.mock(Channel.class);
PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null);
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null);
Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
String userDir = System.getProperty("user.dir");
ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand(userDir + "/log/path/a.log");
@ -57,10 +68,8 @@ public class LoggerRequestProcessorTest {
@Test(expected = IllegalArgumentException.class)
public void testProcessViewWholeLogRequestError() {
System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir"));
Channel channel = PowerMockito.mock(Channel.class);
PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null);
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
Channel channel = Mockito.mock(Channel.class);
Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
String userDir = System.getProperty("user.dir");
ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand(userDir + "/log/path/a");
@ -75,10 +84,8 @@ public class LoggerRequestProcessorTest {
@Test(expected = IllegalArgumentException.class)
public void testProcessViewWholeLogRequestErrorRelativePath() {
System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir"));
Channel channel = PowerMockito.mock(Channel.class);
PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null);
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
Channel channel = Mockito.mock(Channel.class);
Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
String userDir = System.getProperty("user.dir");
ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand(userDir + "/log/../../a.log");
@ -93,10 +100,8 @@ public class LoggerRequestProcessorTest {
@Test(expected = IllegalArgumentException.class)
public void testProcessViewWholeLogRequestErrorStartWith() {
System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir"));
Channel channel = PowerMockito.mock(Channel.class);
PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null);
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
Channel channel = Mockito.mock(Channel.class);
Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand("/log/a.log");
Command command = new Command();

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.Constants;

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import static org.apache.dolphinscheduler.server.log.SensitiveDataConverter.passwordHandler;
import static org.apache.dolphinscheduler.service.log.SensitiveDataConverter.passwordHandler;
import org.apache.dolphinscheduler.common.Constants;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.Constants;

View File

@ -15,12 +15,12 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.service.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.service.log.TaskLogDiscriminator;
import java.nio.file.Path;
import java.nio.file.Paths;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.service.utils;
import static org.powermock.api.mockito.PowerMockito.when;
import org.apache.commons.lang3.SystemUtils;

View File

@ -55,11 +55,6 @@
<artifactId>dolphinscheduler-alert-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-log-server</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>

View File

@ -51,10 +51,10 @@
<logger name="org.apache.hadoop" level="WARN"/>
<conversionRule conversionWord="messsage"
converterClass="org.apache.dolphinscheduler.server.log.SensitiveDataConverter"/>
converterClass="org.apache.dolphinscheduler.service.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>

View File

@ -41,6 +41,10 @@
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
@ -49,10 +53,6 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
@ -99,10 +99,6 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-log-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>

View File

@ -17,10 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.google.common.base.Preconditions;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -32,7 +28,6 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
@ -41,11 +36,19 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnableFactoryBuilder;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.service.utils.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
/**
* Used to handle {@link CommandType#TASK_DISPATCH_REQUEST}
*/
@ -104,7 +107,8 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
return;
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
// set cache, it will be used when kill task
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
@ -112,14 +116,17 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress, CommandType.TASK_EXECUTE_RESULT);
workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress,
CommandType.TASK_EXECUTE_RESULT);
}
WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder.createWorkerDelayTaskExecuteRunnableFactory(
WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder
.createWorkerDelayTaskExecuteRunnableFactory(
taskExecutionContext,
workerConfig,
workflowMasterAddress,
@ -133,8 +140,9 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
if (!offer) {
logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
} else
} else {
logger.info("Submit task to wait queue success, current queue size is {}", workerManager.getWaitSubmitQueueSize());
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}

View File

@ -17,14 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.micrometer.core.lang.NonNull;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
@ -40,19 +32,29 @@ import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.utils.ProcessUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.micrometer.core.lang.NonNull;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
/**
* task kill processor
@ -80,7 +82,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
if (killCommand == null) {
logger.error("task kill request command is null");
@ -98,25 +100,25 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return;
}
int processId = taskExecutionContext.getProcessId();
if (processId == 0) {
this.cancelApplication(taskInstanceId);
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
sendTaskKillResponseCommand(channel, taskExecutionContext);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return;
}
int processId = taskExecutionContext.getProcessId();
if (processId == 0) {
this.cancelApplication(taskInstanceId);
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
sendTaskKillResponseCommand(channel, taskExecutionContext);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return;
}
// if processId > 0, it should call cancelApplication to cancel remote application too.
this.cancelApplication(taskInstanceId);
Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(
result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
sendTaskKillResponseCommand(channel, taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(
result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
sendTaskKillResponseCommand(channel, taskExecutionContext);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
@ -131,7 +133,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus());
if (taskExecutionContext.getAppIds() != null) {
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
taskKillResponseCommand
.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
}
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
@ -158,9 +161,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
// find log and kill yarn job
Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()),
taskExecutionContext.getLogPath(),
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTenantCode());
taskExecutionContext.getLogPath(),
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTenantCode());
return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight());
}
@ -226,8 +229,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
String executePath,
String tenantCode) {
if (logPath == null || executePath == null || tenantCode == null) {
logger.error("Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}",
host, logPath, executePath, tenantCode);
logger.error(
"Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}",
host, logPath, executePath, tenantCode);
return Pair.of(false, Collections.emptyList());
}
try {
@ -244,9 +248,10 @@ public class TaskKillProcessor implements NettyRequestProcessor {
Thread.currentThread().interrupt();
logger.error("kill yarn job error, the current thread has been interrtpted", e);
} catch (Exception e) {
logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath, executePath, tenantCode, e);
logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath,
executePath, tenantCode, e);
}
return Pair.of(false, Collections.emptyList());
}
}
}

View File

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
@ -29,6 +28,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAck
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskSavePointProcessor;
import org.apache.dolphinscheduler.service.log.LoggerRequestProcessor;
import java.io.Closeable;
@ -78,7 +78,8 @@ public class WorkerRpcServer implements Closeable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);

View File

@ -17,9 +17,8 @@
package org.apache.dolphinscheduler.server.worker.runner;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Strings;
import lombok.NonNull;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
@ -38,27 +37,33 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.dolphinscheduler.service.utils.ProcessUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Date;
import java.util.List;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Strings;
public abstract class WorkerTaskExecuteRunnable implements Runnable {
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class));
protected final Logger logger = LoggerFactory
.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class));
protected final TaskExecutionContext taskExecutionContext;
protected final WorkerConfig workerConfig;
@ -71,13 +76,13 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
protected @Nullable AbstractTask task;
protected WorkerTaskExecuteRunnable(
@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String masterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String masterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.masterAddress = masterAddress;
@ -115,7 +120,9 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
logger.info("Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", TaskExecutionStatus.FAILURE);
logger.info(
"Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}",
TaskExecutionStatus.FAILURE);
}
public void cancelTask() {
@ -125,10 +132,13 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
task.cancel();
List<String> appIds = LogUtils.getAppIdsFromLogFile(taskExecutionContext.getLogPath());
if (CollectionUtils.isNotEmpty(appIds)) {
ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(),
taskExecutionContext.getExecutePath());
}
} catch (Exception e) {
logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
logger.error(
"Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual",
e);
}
}
}
@ -139,7 +149,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
// set the thread name to make sure the log be written to the task log file
Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
logger.info("Begin to pulling task");
initializeTask();
@ -148,14 +159,17 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
logger.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress,
CommandType.TASK_EXECUTE_RESULT);
logger.info(
"The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
return;
}
beforeExecute();
TaskCallBack taskCallBack = TaskCallbackImpl.builder().workerMessageSender(workerMessageSender).masterAddress(masterAddress).build();
TaskCallBack taskCallBack = TaskCallbackImpl.builder().workerMessageSender(workerMessageSender)
.masterAddress(masterAddress).build();
executeTask(taskCallBack);
afterExecute();
@ -179,7 +193,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
taskExecutionContext.setEnvFile(systemEnvPath);
logger.info("Set task envFile: {}", systemEnvPath);
String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setTaskAppId(taskAppId);
logger.info("Set task appId: {}", taskAppId);
@ -202,11 +217,13 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
if (null == taskChannel) {
throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType()));
throw new TaskPluginException(String.format("%s task plugin not found, please check config file.",
taskExecutionContext.getTaskType()));
}
task = taskChannel.createTask(taskExecutionContext);
if (task == null) {
throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType()));
throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct",
taskExecutionContext.getTaskType()));
}
logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType());
@ -225,8 +242,10 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
logger.info("The current task need to send alert, begin to send alert");
TaskExecutionStatus status = task.getExitStatus();
TaskAlertInfo taskAlertInfo = task.getTaskAlertInfo();
int strategy = status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), strategy);
int strategy =
status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(),
taskAlertInfo.getContent(), strategy);
logger.info("Success send alert");
}
@ -238,14 +257,16 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
logger.info("Send task execute result to master, the current task status: {}", taskExecutionContext.getCurrentExecutionStatus());
logger.info("Send task execute result to master, the current task status: {}",
taskExecutionContext.getCurrentExecutionStatus());
}
protected void clearTaskExecPathIfNeeded() {
String execLocalPath = taskExecutionContext.getExecutePath();
if (!CommonUtils.isDevelopMode()) {
logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", execLocalPath);
logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}",
execLocalPath);
// get exec dir
if (Strings.isNullOrEmpty(execLocalPath)) {
logger.warn("The task execute file is {} no need to clear", taskExecutionContext.getTaskName());
@ -264,11 +285,14 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
if (e instanceof NoSuchFileException) {
// this is expected
} else {
logger.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", execLocalPath, e);
logger.error(
"Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually",
execLocalPath, e);
}
}
} else {
logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", execLocalPath);
logger.info("The current execute mode is develop mode, will not clear the task execute file: {}",
execLocalPath);
}
}

View File

@ -29,10 +29,10 @@
</appender>
<conversionRule conversionWord="messsage"
converterClass="org.apache.dolphinscheduler.server.log.SensitiveDataConverter"/>
converterClass="org.apache.dolphinscheduler.service.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>

12
pom.xml
View File

@ -37,7 +37,6 @@
<module>dolphinscheduler-spi</module>
<module>dolphinscheduler-registry</module>
<module>dolphinscheduler-task-plugin</module>
<module>dolphinscheduler-server</module>
<module>dolphinscheduler-common</module>
<module>dolphinscheduler-api</module>
<module>dolphinscheduler-dao</module>
@ -52,7 +51,6 @@
<module>dolphinscheduler-meter</module>
<module>dolphinscheduler-master</module>
<module>dolphinscheduler-worker</module>
<module>dolphinscheduler-log-server</module>
<module>dolphinscheduler-tools</module>
<module>dolphinscheduler-ui</module>
<module>dolphinscheduler-scheduler-plugin</module>
@ -98,11 +96,6 @@
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-master</artifactId>
@ -113,11 +106,6 @@
<artifactId>dolphinscheduler-worker</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-log-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-standalone-server</artifactId>