Merge pull request #1 from apache/dev

udpate
This commit is contained in:
samz406 2020-02-08 18:08:04 +08:00 committed by GitHub
commit 49b4d084b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
85 changed files with 3654 additions and 1600 deletions

View File

@ -64,7 +64,7 @@ Overload processing: Task queue mechanism, the number of schedulable tasks on a
- [**User manual**](https://dolphinscheduler.apache.org/en-us/docs/1.2.0/user_doc/system-manual.html?_blank "System manual")
- [**Upgrade document**](https://dolphinscheduler.apache.org/en-us/docs/1.2.0/release/upgrade.html?_blank "Upgrade document")
- [**Upgrade document**](https://dolphinscheduler.apache.org/en-us/docs/1.2.0/user_doc/upgrade.html?_blank "Upgrade document")
- <a href="http://106.75.43.194:8888" target="_blank">Online Demo</a>
@ -99,7 +99,7 @@ It is because of the shoulders of these open source projects that the birth of t
### Get Help
1. Submit an issue
1. Subscribe the mail list : https://dolphinscheduler.apache.org/en-us/docs/1.2.0/user_doc/subscribe.html. then send mail to dev@dolphinscheduler.apache.org
1. Subscribe the mail list : https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html. then send mail to dev@dolphinscheduler.apache.org
1. Contact WeChat group manager, ID 510570367. This is for Mandarin(CN) discussion.
### License

View File

@ -50,7 +50,7 @@ Dolphin Scheduler Official Website
- [**使用手册**](https://dolphinscheduler.apache.org/zh-cn/docs/1.2.0/user_doc/system-manual.html?_blank "系统使用手册")
- [**升级文档**](https://dolphinscheduler.apache.org/zh-cn/docs/1.2.0/release/upgrade.html?_blank "升级文档")
- [**升级文档**](https://dolphinscheduler.apache.org/zh-cn/docs/1.2.0/user_doc/upgrade.html?_blank "升级文档")
- <a href="http://106.75.43.194:8888" target="_blank">我要体验</a>
@ -83,13 +83,12 @@ dolphinscheduler-dist/dolphinscheduler-src/target/apache-dolphinscheduler-incuba
### 感谢
Dolphin Scheduler使用了很多优秀的开源项目比如google的guava、guice、grpcnettyali的bonecpquartz以及apache的众多开源项目等等
正是由于站在这些开源项目的肩膀上才有Dolphin Scheduler的诞生的可能。对此我们对使用的所有开源软件表示非常的感谢我们也希望自己不仅是开源的受益者也能成为开源的
贡献者,于是我们决定把易调度贡献出来,并承诺长期维护。也希望对开源有同样热情和信念的伙伴加入进来,一起为开源献出一份力!
正是由于站在这些开源项目的肩膀上才有Dolphin Scheduler的诞生的可能。对此我们对使用的所有开源软件表示非常的感谢我们也希望自己不仅是开源的受益者也能成为开源的贡献者,也希望对开源有同样热情和信念的伙伴加入进来,一起为开源献出一份力!
### 获得帮助
1. 提交issue
1. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/docs/1.2.0/user_doc/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org.
1. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/docs/development/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org.
1. 联系微信群助手(ID:dailidong66). 微信仅用于中国用户讨论.
### 版权

View File

@ -31,8 +31,8 @@
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="org.apache.dolphinscheduler.server.worker.log.TaskLogFilter"></filter>
<Discriminator class="org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator">
<filter class="org.apache.dolphinscheduler.common.log.TaskLogFilter"></filter>
<Discriminator class="org.apache.dolphinscheduler.common.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
@ -52,7 +52,7 @@
<appender name="COMBINEDLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-combined.log</file>
<filter class="org.apache.dolphinscheduler.server.worker.log.WorkerLogFilter">
<filter class="org.apache.dolphinscheduler.common.log.WorkerLogFilter">
<level>INFO</level>
</filter>

View File

@ -18,3 +18,4 @@
export PYTHON_HOME=/usr/bin/python
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$PYTHON_HOME:$JAVA_HOME/bin:$PATH
export DATAX_HOME=/opt/datax/bin/datax.py

View File

@ -47,7 +47,7 @@ org.quartz.jobStore.dataSource = myDs
#============================================================================
# Configure Datasources
#============================================================================
org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.server.quartz.DruidConnectionProvider
org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.dao.quartz.DruidConnectionProvider
org.quartz.dataSource.myDs.driver = org.postgresql.Driver
org.quartz.dataSource.myDs.URL=jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
org.quartz.dataSource.myDs.user=root

View File

@ -31,8 +31,8 @@
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="org.apache.dolphinscheduler.server.worker.log.TaskLogFilter"></filter>
<Discriminator class="org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator">
<filter class="org.apache.dolphinscheduler.common.log.TaskLogFilter"></filter>
<Discriminator class="org.apache.dolphinscheduler.common.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
@ -52,7 +52,7 @@
<appender name="WORKERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-worker.log</file>
<filter class="org.apache.dolphinscheduler.server.worker.log.WorkerLogFilter">
<filter class="org.apache.dolphinscheduler.common.log.WorkerLogFilter">
<level>INFO</level>
</filter>

View File

@ -101,6 +101,11 @@ public class AlertSender{
}else if (alert.getAlertType() == AlertType.SMS){
retMaps = emailManager.send(getReciversForSMS(users), alert.getTitle(), alert.getContent(),alert.getShowType());
alert.setInfo(retMaps);
} else {
logger.error("AlertType is not defined. code: {}, descp: {}",
alert.getAlertType().getCode(),
alert.getAlertType().getDescp());
return;
}
//send flag

View File

@ -1,49 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="120 seconds"> <!--debug="true" -->
<property name="log.base" value="logs" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="ALERTLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-alert.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>20</maxHistory>
<maxFileSize>64MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="ALERTLOGFILE"/>
</root>
</configuration>

View File

@ -31,33 +31,17 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-dao</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-server</artifactId>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>com.google</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<artifactId>leveldbjni-all</artifactId>
<groupId>org.fusesource.leveldbjni</groupId>
</exclusion>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
</exclusions>
<artifactId>dolphinscheduler-dao</artifactId>
</dependency>
<!--springboot-->
@ -92,8 +76,6 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
@ -193,13 +175,13 @@
</dependency>
<dependency>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</dependency>
<!-- just for test -->

View File

@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.server.utils.ScheduleUtils;
import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -512,8 +512,7 @@ public class ExecutorService extends BaseService{
List<Date> listDate = new LinkedList<>();
if(!CollectionUtils.isEmpty(schedules)){
for (Schedule item : schedules) {
List<Date> list = ScheduleUtils.getRecentTriggerTime(item.getCrontab(), start, end);
listDate.addAll(list);
listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
}
}
if(!CollectionUtils.isEmpty(listDate)){

View File

@ -35,8 +35,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
import org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.server.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.dao.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.dao.quartz.QuartzExecutors;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.quartz.CronExpression;
@ -167,6 +167,7 @@ public class SchedulerService extends BaseService {
processDefinitionMapper.updateById(processDefinition);
putMsg(result, Status.SUCCESS);
result.put("scheduleId", scheduleObj.getId());
return result;
}

View File

@ -59,23 +59,22 @@ public class FourLetterWordMain {
*/
public static String send4LetterWord(String host, int port, String cmd, int timeout)
throws IOException {
LOG.info("connecting to " + host + " " + port);
Socket sock = new Socket();
LOG.info("connecting to {} {}", host, port);
InetSocketAddress hostaddress= host != null ? new InetSocketAddress(host, port) :
new InetSocketAddress(InetAddress.getByName(null), port);
BufferedReader reader = null;
try {
try (Socket sock = new Socket();
OutputStream outstream = sock.getOutputStream();
BufferedReader reader =
new BufferedReader(
new InputStreamReader(sock.getInputStream()))) {
sock.setSoTimeout(timeout);
sock.connect(hostaddress, timeout);
OutputStream outstream = sock.getOutputStream();
outstream.write(cmd.getBytes());
outstream.flush();
// this replicates NC - close the output stream before reading
sock.shutdownOutput();
reader =
new BufferedReader(
new InputStreamReader(sock.getInputStream()));
StringBuilder sb = new StringBuilder();
String line;
while((line = reader.readLine()) != null) {
@ -84,11 +83,6 @@ public class FourLetterWordMain {
return sb.toString();
} catch (SocketTimeoutException e) {
throw new IOException("Exception while executing four letter word: " + cmd, e);
} finally {
sock.close();
if (reader != null) {
reader.close();
}
}
}
}

View File

@ -145,7 +145,7 @@ public class ZooKeeperState {
sendThread.setName("FourLetterCmd:" + cmd);
sendThread.start();
try {
sendThread.join(waitTimeout * 1000);
sendThread.join(waitTimeout * 1000L);
return sendThread.ret;
} catch (InterruptedException e) {
logger.error("send " + cmd + " to server " + host + ":" + port + " failed!", e);

View File

@ -1,60 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.hbase" level="WARN"/>
<logger name="org.apache.hadoop" level="WARN"/>
<property name="log.base" value="logs" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="APISERVERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Log level filter -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<file>${log.base}/dolphinscheduler-api-server.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>64MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="APISERVERLOGFILE" />
</root>
</configuration>

View File

@ -15,8 +15,6 @@
# limitations under the License.
#
logging.config=classpath:apiserver_logback.xml
# server port
server.port=12345

View File

@ -1,80 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="120 seconds">
<property name="log.base" value="logs"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
%highlight([%level]) %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{10}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="org.apache.dolphinscheduler.server.worker.log.TaskLogFilter"></filter>
<Discriminator class="org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>
<appender name="COMBINEDLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-combined.log</file>
<filter class="org.apache.dolphinscheduler.server.worker.log.WorkerLogFilter">
<level>INFO</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-combined.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>200MB</maxFileSize>
</rollingPolicy>
     
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
  
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="COMBINEDLOGFILE"/>
</root>
</configuration>

View File

@ -203,7 +203,7 @@ public class ExecutorService2Test {
"", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processDao, times(16)).createCommand(any(Command.class));
verify(processDao, times(15)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}

View File

@ -29,6 +29,7 @@
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<codehaus.janino.version>3.1.0</codehaus.janino.version>
</properties>
<dependencies>
<dependency>
@ -604,5 +605,11 @@
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>${codehaus.janino.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -33,6 +33,7 @@ public enum TaskType {
* 7 DEPENDENT
* 8 FLINK
* 9 HTTP
* 10 DATAX
*/
SHELL(0, "shell"),
SQL(1, "sql"),
@ -43,7 +44,8 @@ public enum TaskType {
PYTHON(6, "python"),
DEPENDENT(7, "dependent"),
FLINK(8, "flink"),
HTTP(9, "http");
HTTP(9, "http"),
DATAX(10, "datax");
TaskType(int code, String descp){
this.code = code;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.log;
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;

View File

@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.log;
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.pattern.MessageConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.SensitiveLogUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -77,7 +77,7 @@ public class SensitiveDataConverter extends MessageConverter {
String password = matcher.group();
String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password);
String maskPassword = SensitiveLogUtils.maskDataSourcePwd(password);
matcher.appendReplacement(sb, maskPassword);
}

View File

@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.log;
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.sift.AbstractDiscriminator;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
/**
* Task Log Discriminator

View File

@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.log;
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
/**
* task log filter

View File

@ -14,12 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.log;
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import java.util.Arrays;
/**
* worker log filter
@ -40,6 +43,7 @@ public class WorkerLogFilter extends Filter<ILoggingEvent> {
if (event.getThreadName().startsWith("Worker-")){
return FilterReply.ACCEPT;
}
return FilterReply.DENY;
}
public void setLevel(String level) {

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.common.model;
import java.util.Date;
import java.util.Objects;
/**
* date interval class
@ -35,12 +36,14 @@ public class DateInterval {
@Override
public boolean equals(Object obj) {
try{
DateInterval dateInterval = (DateInterval) obj;
return startTime.equals(dateInterval.getStartTime()) &&
endTime.equals(dateInterval.getEndTime());
}catch (Exception e){
if (obj == null || getClass() != obj.getClass()) {
return false;
} else if (this == obj) {
return true;
} else {
DateInterval that = (DateInterval) obj;
return startTime.equals(that.startTime) &&
endTime.equals(that.endTime);
}
}
@ -60,4 +63,8 @@ public class DateInterval {
this.endTime = endTime;
}
@Override
public int hashCode() {
return Objects.hash(startTime, endTime);
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.common.model;
import java.util.Objects;
public class TaskNodeRelation {
/**
@ -69,4 +71,9 @@ public class TaskNodeRelation {
", endNode='" + endNode + '\'' +
'}';
}
@Override
public int hashCode() {
return Objects.hash(startNode, endNode);
}
}

View File

@ -30,7 +30,7 @@ import java.util.*;
/**
* A singleton of a task queue implemented with zookeeper
* tasks queue implemention
* tasks queue implementation
*/
@Service
public class TaskQueueZkImpl implements ITaskQueue {
@ -72,7 +72,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
} catch (Exception e) {
logger.error("get all tasks from tasks queue exception",e);
}
return new ArrayList<>();
return Collections.emptyList();
}
/**
@ -196,11 +196,11 @@ public class TaskQueueZkImpl implements ITaskQueue {
}
}
List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet);
List<String> tasksList = getTasksListFromTreeSet(tasksNum, taskTreeSet);
logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size());
logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(tasksList.toArray()), size - tasksList.size());
return taskslist;
return tasksList;
}else{
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
@ -208,7 +208,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
}
return new ArrayList<String>();
return Collections.emptyList();
}
@ -221,15 +221,15 @@ public class TaskQueueZkImpl implements ITaskQueue {
public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
Iterator<String> iterator = taskTreeSet.iterator();
int j = 0;
List<String> taskslist = new ArrayList<>(tasksNum);
List<String> tasksList = new ArrayList<>(tasksNum);
while(iterator.hasNext()){
if(j++ >= tasksNum){
break;
}
String task = iterator.next();
taskslist.add(getOriginTaskFormat(task));
tasksList.add(getOriginTaskFormat(task));
}
return taskslist;
return tasksList;
}
/**
@ -330,22 +330,13 @@ public class TaskQueueZkImpl implements ITaskQueue {
*/
@Override
public Set<String> smembers(String key) {
Set<String> tasksSet = new HashSet<>();
try {
List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
for (String task : list) {
tasksSet.add(task);
}
return tasksSet;
return new HashSet<>(list);
} catch (Exception e) {
logger.error("get all tasks from tasks queue exception",e);
}
return tasksSet;
return Collections.emptySet();
}
/**

View File

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.datax;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
/**
* DataX parameter
*/
public class DataxParameters extends AbstractParameters {
/**
* data source typeeg MYSQL, POSTGRES ...
*/
private String dsType;
/**
* datasource id
*/
private int dataSource;
/**
* data target typeeg MYSQL, POSTGRES ...
*/
private String dtType;
/**
* datatarget id
*/
private int dataTarget;
/**
* sql
*/
private String sql;
/**
* target table
*/
private String targetTable;
/**
* Pre Statements
*/
private List<String> preStatements;
/**
* Post Statements
*/
private List<String> postStatements;
/**
* speed byte num
*/
private int jobSpeedByte;
/**
* speed record count
*/
private int jobSpeedRecord;
public String getDsType() {
return dsType;
}
public void setDsType(String dsType) {
this.dsType = dsType;
}
public int getDataSource() {
return dataSource;
}
public void setDataSource(int dataSource) {
this.dataSource = dataSource;
}
public String getDtType() {
return dtType;
}
public void setDtType(String dtType) {
this.dtType = dtType;
}
public int getDataTarget() {
return dataTarget;
}
public void setDataTarget(int dataTarget) {
this.dataTarget = dataTarget;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public String getTargetTable() {
return targetTable;
}
public void setTargetTable(String targetTable) {
this.targetTable = targetTable;
}
public List<String> getPreStatements() {
return preStatements;
}
public void setPreStatements(List<String> preStatements) {
this.preStatements = preStatements;
}
public List<String> getPostStatements() {
return postStatements;
}
public void setPostStatements(List<String> postStatements) {
this.postStatements = postStatements;
}
public int getJobSpeedByte() {
return jobSpeedByte;
}
public void setJobSpeedByte(int jobSpeedByte) {
this.jobSpeedByte = jobSpeedByte;
}
public int getJobSpeedRecord() {
return jobSpeedRecord;
}
public void setJobSpeedRecord(int jobSpeedRecord) {
this.jobSpeedRecord = jobSpeedRecord;
}
@Override
public boolean checkParameters() {
if (!(dataSource != 0
&& dataTarget != 0
&& StringUtils.isNotEmpty(sql)
&& StringUtils.isNotEmpty(targetTable))) {
return false;
}
return true;
}
@Override
public List<String> getResourceFilesList() {
return new ArrayList<>();
}
@Override
public String toString() {
return "DataxParameters{" +
"dsType='" + dsType + '\'' +
", dataSource=" + dataSource +
", dtType='" + dtType + '\'' +
", dataTarget=" + dataTarget +
", sql='" + sql + '\'' +
", targetTable='" + targetTable + '\'' +
", preStatements=" + preStatements +
", postStatements=" + postStatements +
", jobSpeedByte=" + jobSpeedByte +
", jobSpeedRecord=" + jobSpeedRecord +
'}';
}
}

View File

@ -119,7 +119,9 @@ public class HadoopUtils implements Closeable {
fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
}else{
logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS );
throw new RuntimeException("property:{} can not to be empty, please set!");
throw new RuntimeException(
String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
);
}
}else{
logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
@ -219,10 +221,12 @@ public class HadoopUtils implements Closeable {
return null;
}
FSDataInputStream in = fs.open(new Path(hdfsFilePath));
BufferedReader br = new BufferedReader(new InputStreamReader(in));
Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))){
BufferedReader br = new BufferedReader(new InputStreamReader(in));
Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
}
}
/**

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.slf4j.Logger;
@ -44,6 +44,11 @@ public class LoggerUtils {
*/
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
/**
* Task Logger Thread's name
*/
public static final String TASK_APPID_LOG_FORMAT = "[taskAppId=";
/**
* build job id
*
@ -58,7 +63,7 @@ public class LoggerUtils {
int processInstId,
int taskId){
// - [taskAppId=TASK_79_4084_15210]
return String.format(" - [taskAppId=%s-%s-%s-%s]",affix,
return String.format(" - %s%s-%s-%s-%s]",TASK_APPID_LOG_FORMAT,affix,
processDefId,
processInstId,
taskId);

View File

@ -16,13 +16,17 @@
*/
package org.apache.dolphinscheduler.common.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.Reader;
import java.sql.*;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Slightly modified version of the com.ibatis.common.jdbc.ScriptRunner class
@ -94,9 +98,7 @@ public class ScriptRunner {
} finally {
connection.setAutoCommit(originalAutoCommit);
}
} catch (IOException e) {
throw e;
} catch (SQLException e) {
} catch (IOException | SQLException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Error running script. Cause: " + e, e);
@ -114,9 +116,7 @@ public class ScriptRunner {
} finally {
connection.setAutoCommit(originalAutoCommit);
}
} catch (IOException e) {
throw e;
} catch (SQLException e) {
} catch (IOException | SQLException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Error running script. Cause: " + e, e);
@ -161,44 +161,34 @@ public class ScriptRunner {
|| fullLineDelimiter && trimmedLine.equals(getDelimiter())) {
command.append(line.substring(0, line.lastIndexOf(getDelimiter())));
command.append(" ");
Statement statement = conn.createStatement();
boolean hasResults = false;
logger.info("sql:"+command.toString());
if (stopOnError) {
hasResults = statement.execute(command.toString());
} else {
try {
statement.execute(command.toString());
} catch (SQLException e) {
logger.error(e.getMessage(),e);
throw e;
}
}
ResultSet rs = statement.getResultSet();
if (hasResults && rs != null) {
ResultSetMetaData md = rs.getMetaData();
int cols = md.getColumnCount();
for (int i = 0; i < cols; i++) {
String name = md.getColumnLabel(i);
logger.info(name + "\t");
}
logger.info("");
while (rs.next()) {
for (int i = 0; i < cols; i++) {
String value = rs.getString(i);
logger.info(value + "\t");
}
logger.info("");
}
}
logger.info("sql: {}", command);
try (Statement statement = conn.createStatement()) {
statement.execute(command.toString());
try (ResultSet rs = statement.getResultSet()) {
if (stopOnError && rs != null) {
ResultSetMetaData md = rs.getMetaData();
int cols = md.getColumnCount();
for (int i = 0; i < cols; i++) {
String name = md.getColumnLabel(i);
logger.info("{} \t", name);
}
logger.info("");
while (rs.next()) {
for (int i = 0; i < cols; i++) {
String value = rs.getString(i);
logger.info("{} \t", value);
}
logger.info("");
}
}
}
} catch (SQLException e) {
logger.error("SQLException", e);
throw e;
}
command = null;
try {
statement.close();
} catch (Exception e) {
// Ignore to workaround a bug in Jakarta DBCP
}
Thread.yield();
} else {
command.append(line);
@ -207,11 +197,11 @@ public class ScriptRunner {
}
} catch (SQLException e) {
logger.error("Error executing: " + command.toString());
logger.error("Error executing: {}", command);
throw e;
} catch (IOException e) {
e.fillInStackTrace();
logger.error("Error executing: " + command.toString());
logger.error("Error executing: {}", command);
throw e;
}
}
@ -243,46 +233,35 @@ public class ScriptRunner {
|| fullLineDelimiter && trimmedLine.equals(getDelimiter())) {
command.append(line.substring(0, line.lastIndexOf(getDelimiter())));
command.append(" ");
Statement statement = conn.createStatement();
sql = command.toString().replaceAll("\\{\\{APPDB\\}\\}", dbName);
boolean hasResults = false;
logger.info("sql : " + sql);
if (stopOnError) {
hasResults = statement.execute(sql);
} else {
try {
statement.execute(sql);
} catch (SQLException e) {
logger.error(e.getMessage(),e);
throw e;
}
}
ResultSet rs = statement.getResultSet();
if (hasResults && rs != null) {
ResultSetMetaData md = rs.getMetaData();
int cols = md.getColumnCount();
for (int i = 0; i < cols; i++) {
String name = md.getColumnLabel(i);
logger.info(name + "\t");
}
logger.info("");
while (rs.next()) {
for (int i = 0; i < cols; i++) {
String value = rs.getString(i);
logger.info(value + "\t");
}
logger.info("");
}
}
logger.info("sql : {}", sql);
try (Statement statement = conn.createStatement()) {
statement.execute(sql);
try (ResultSet rs = statement.getResultSet()) {
if (stopOnError && rs != null) {
ResultSetMetaData md = rs.getMetaData();
int cols = md.getColumnCount();
for (int i = 0; i < cols; i++) {
String name = md.getColumnLabel(i);
logger.info("{} \t", name);
}
logger.info("");
while (rs.next()) {
for (int i = 0; i < cols; i++) {
String value = rs.getString(i);
logger.info("{} \t", value);
}
logger.info("");
}
}
}
} catch (SQLException e) {
logger.error("SQLException", e);
throw e;
}
command = null;
try {
statement.close();
} catch (Exception e) {
// Ignore to workaround a bug in Jakarta DBCP
}
Thread.yield();
} else {
command.append(line);
@ -291,11 +270,10 @@ public class ScriptRunner {
}
} catch (SQLException e) {
logger.error("Error executing: " + sql);
throw e;
} catch (IOException e) {
e.fillInStackTrace();
logger.error("Error executing: " + sql);
logger.error("Error executing: {}", sql);
throw e;
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.common.utils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.Constants;
/**
* sensitive log Util
*/
public class SensitiveLogUtil {
public class SensitiveLogUtils {
/**
* @param dataSourcePwd data source password

View File

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
@ -68,6 +69,8 @@ public class TaskParametersUtils {
return JSONUtils.parseObject(parameter, FlinkParameters.class);
case HTTP:
return JSONUtils.parseObject(parameter, HttpParameters.class);
case DATAX:
return JSONUtils.parseObject(parameter, DataxParameters.class);
default:
return null;
}

View File

@ -0,0 +1,169 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="120 seconds"> <!--debug="true" -->
<property name="log.base" value="logs"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- master server logback config start -->
<appender name="MASTERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-master.log</file>
<!--<filter class="org.apache.dolphinscheduler.common.log.MasterLogFilter">
<level>INFO</level>
</filter>-->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-master.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>200MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- master server logback config end -->
<!-- worker server logback config start -->
<conversionRule conversionWord="messsage"
converterClass="org.apache.dolphinscheduler.common.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="org.apache.dolphinscheduler.common.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.common.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>
<appender name="WORKERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-worker.log</file>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="org.apache.dolphinscheduler.common.log.WorkerLogFilter"/>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>200MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- worker server logback config end -->
<!-- alert server logback config start -->
<appender name="ALERTLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-alert.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>20</maxHistory>
<maxFileSize>64MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- alert server logback config end -->
<!-- api server logback config start -->
<appender name="APILOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-api-server.log</file>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>64MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- api server logback config end -->
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.hbase" level="WARN"/>
<logger name="org.apache.hadoop" level="WARN"/>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<if condition='p("server").contains("master-server")'>
<then>
<appender-ref ref="MASTERLOGFILE"/>
</then>
</if>
<if condition='p("server").contains("worker-server")'>
<then>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="WORKERLOGFILE"/>
</then>
</if>
<if condition='p("server").contains("alert-server")'>
<then>
<appender-ref ref="ALERTLOGFILE"/>
</then>
</if>
<if condition='p("server").contains("api-server")'>
<then>
<appender-ref ref="APILOGFILE"/>
</then>
</if>
</root>
</configuration>

View File

@ -53,11 +53,12 @@ org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.clusterCheckinInterval = 5000
org.quartz.jobStore.acquireTriggersWithinLock=true
org.quartz.jobStore.dataSource = myDs
#============================================================================
# Configure Datasources
#============================================================================
org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.server.quartz.DruidConnectionProvider
org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.dao.quartz.DruidConnectionProvider
org.quartz.dataSource.myDs.maxConnections = 10
org.quartz.dataSource.myDs.validationQuery = select 1

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggerContextVO;
import ch.qos.logback.core.spi.FilterReply;
import org.apache.dolphinscheduler.common.Constants;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Marker;
import java.util.Map;
public class MasterLogFilterTest {
@Test
public void decide() {
MasterLogFilter masterLogFilter = new MasterLogFilter();
FilterReply filterReply = masterLogFilter.decide(new ILoggingEvent() {
@Override
public String getThreadName() {
return Constants.THREAD_NAME_MASTER_SERVER;
}
@Override
public Level getLevel() {
return Level.INFO;
}
@Override
public String getMessage() {
return "master insert into queue success, task : shell2";
// return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed";
}
@Override
public Object[] getArgumentArray() {
return new Object[0];
}
@Override
public String getFormattedMessage() {
return "master insert into queue success, task : shell2";
}
@Override
public String getLoggerName() {
return null;
}
@Override
public LoggerContextVO getLoggerContextVO() {
return null;
}
@Override
public IThrowableProxy getThrowableProxy() {
return null;
}
@Override
public StackTraceElement[] getCallerData() {
return new StackTraceElement[0];
}
@Override
public boolean hasCallerData() {
return false;
}
@Override
public Marker getMarker() {
return null;
}
@Override
public Map<String, String> getMDCPropertyMap() {
return null;
}
@Override
public Map<String, String> getMdc() {
return null;
}
@Override
public long getTimeStamp() {
return 0;
}
@Override
public void prepareForDeferredProcessing() {
}
});
Assert.assertEquals(FilterReply.ACCEPT, filterReply);
}
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggerContextVO;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.SensitiveLogUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SensitiveDataConverterTest {
private final Logger logger = LoggerFactory.getLogger(SensitiveDataConverterTest.class);
/**
* password pattern
*/
private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX);
private final String logMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," +
"\"database\":\"carbond\"," +
"\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," +
"\"user\":\"view\"," +
"\"password\":\"view1\"}";
private final String maskLogMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," +
"\"database\":\"carbond\"," +
"\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," +
"\"user\":\"view\"," +
"\"password\":\"******\"}";
@Test
public void convert() {
SensitiveDataConverter sensitiveDataConverter = new SensitiveDataConverter();
String result = sensitiveDataConverter.convert(new ILoggingEvent() {
@Override
public String getThreadName() {
return null;
}
@Override
public Level getLevel() {
return Level.INFO;
}
@Override
public String getMessage() {
return null;
}
@Override
public Object[] getArgumentArray() {
return new Object[0];
}
@Override
public String getFormattedMessage() {
return logMsg;
}
@Override
public String getLoggerName() {
return null;
}
@Override
public LoggerContextVO getLoggerContextVO() {
return null;
}
@Override
public IThrowableProxy getThrowableProxy() {
return null;
}
@Override
public StackTraceElement[] getCallerData() {
return new StackTraceElement[0];
}
@Override
public boolean hasCallerData() {
return false;
}
@Override
public Marker getMarker() {
return null;
}
@Override
public Map<String, String> getMDCPropertyMap() {
return null;
}
@Override
public Map<String, String> getMdc() {
return null;
}
@Override
public long getTimeStamp() {
return 0;
}
@Override
public void prepareForDeferredProcessing() {
}
});
Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg));
}
/**
* mask sensitive logMsg - sql task datasource password
*/
@Test
public void testPwdLogMsgConverter() {
logger.info("parameter : {}", logMsg);
logger.info("parameter : {}", passwordHandler(pwdPattern, logMsg));
Assert.assertNotEquals(logMsg, passwordHandler(pwdPattern, logMsg));
Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg));
}
/**
* password regex test
*
* @param logMsg original log
*/
private static String passwordHandler(Pattern pattern, String logMsg) {
Matcher matcher = pattern.matcher(logMsg);
StringBuffer sb = new StringBuffer(logMsg.length());
while (matcher.find()) {
String password = matcher.group();
String maskPassword = SensitiveLogUtils.maskDataSourcePwd(password);
matcher.appendReplacement(sb, maskPassword);
}
matcher.appendTail(sb);
return sb.toString();
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggerContextVO;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Marker;
import java.util.Map;
import static org.junit.Assert.*;
public class TaskLogDiscriminatorTest {
/**
* log base
*/
private String logBase = "logs";
TaskLogDiscriminator taskLogDiscriminator;
@Before
public void before(){
taskLogDiscriminator = new TaskLogDiscriminator();
taskLogDiscriminator.setLogBase("logs");
taskLogDiscriminator.setKey("123");
}
@Test
public void getDiscriminatingValue() {
String result = taskLogDiscriminator.getDiscriminatingValue(new ILoggingEvent() {
@Override
public String getThreadName() {
return null;
}
@Override
public Level getLevel() {
return null;
}
@Override
public String getMessage() {
return null;
}
@Override
public Object[] getArgumentArray() {
return new Object[0];
}
@Override
public String getFormattedMessage() {
return null;
}
@Override
public String getLoggerName() {
return "[taskAppId=TASK-1-1-1";
}
@Override
public LoggerContextVO getLoggerContextVO() {
return null;
}
@Override
public IThrowableProxy getThrowableProxy() {
return null;
}
@Override
public StackTraceElement[] getCallerData() {
return new StackTraceElement[0];
}
@Override
public boolean hasCallerData() {
return false;
}
@Override
public Marker getMarker() {
return null;
}
@Override
public Map<String, String> getMDCPropertyMap() {
return null;
}
@Override
public Map<String, String> getMdc() {
return null;
}
@Override
public long getTimeStamp() {
return 0;
}
@Override
public void prepareForDeferredProcessing() {
}
});
Assert.assertEquals("1/1/", result);
}
@Test
public void start() {
taskLogDiscriminator.start();
Assert.assertEquals(true, taskLogDiscriminator.isStarted());
}
@Test
public void getKey() {
Assert.assertEquals("123", taskLogDiscriminator.getKey());
}
@Test
public void setKey() {
taskLogDiscriminator.setKey("123");
}
@Test
public void getLogBase() {
Assert.assertEquals("logs", taskLogDiscriminator.getLogBase());
}
@Test
public void setLogBase() {
taskLogDiscriminator.setLogBase("logs");
}
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggerContextVO;
import ch.qos.logback.core.spi.FilterReply;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Marker;
import java.util.Map;
public class TaskLogFilterTest {
@Test
public void decide() {
TaskLogFilter taskLogFilter = new TaskLogFilter();
FilterReply filterReply = taskLogFilter.decide(new ILoggingEvent() {
@Override
public String getThreadName() {
return LoggerUtils.TASK_LOGGER_THREAD_NAME;
}
@Override
public Level getLevel() {
return Level.INFO;
}
@Override
public String getMessage() {
return "raw script : echo 222";
}
@Override
public Object[] getArgumentArray() {
return new Object[0];
}
@Override
public String getFormattedMessage() {
return "raw script : echo 222";
}
@Override
public String getLoggerName() {
return null;
}
@Override
public LoggerContextVO getLoggerContextVO() {
return null;
}
@Override
public IThrowableProxy getThrowableProxy() {
return null;
}
@Override
public StackTraceElement[] getCallerData() {
return new StackTraceElement[0];
}
@Override
public boolean hasCallerData() {
return false;
}
@Override
public Marker getMarker() {
return null;
}
@Override
public Map<String, String> getMDCPropertyMap() {
return null;
}
@Override
public Map<String, String> getMdc() {
return null;
}
@Override
public long getTimeStamp() {
return 0;
}
@Override
public void prepareForDeferredProcessing() {
}
});
Assert.assertEquals(FilterReply.ACCEPT, filterReply);
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.log;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggerContextVO;
import ch.qos.logback.core.spi.FilterReply;
import org.apache.dolphinscheduler.common.Constants;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Marker;
import java.util.Map;
public class WorkerLogFilterTest {
@Test
public void decide() {
WorkerLogFilter workerLogFilter = new WorkerLogFilter();
FilterReply filterReply = workerLogFilter.decide(new ILoggingEvent() {
@Override
public String getThreadName() {
return Constants.THREAD_NAME_WORKER_SERVER;
}
@Override
public Level getLevel() {
return Level.INFO;
}
@Override
public String getMessage() {
return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed";
}
@Override
public Object[] getArgumentArray() {
return new Object[0];
}
@Override
public String getFormattedMessage() {
return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed";
}
@Override
public String getLoggerName() {
return null;
}
@Override
public LoggerContextVO getLoggerContextVO() {
return null;
}
@Override
public IThrowableProxy getThrowableProxy() {
return null;
}
@Override
public StackTraceElement[] getCallerData() {
return new StackTraceElement[0];
}
@Override
public boolean hasCallerData() {
return false;
}
@Override
public Marker getMarker() {
return null;
}
@Override
public Map<String, String> getMDCPropertyMap() {
return null;
}
@Override
public Map<String, String> getMdc() {
return null;
}
@Override
public long getTimeStamp() {
return 0;
}
@Override
public void prepareForDeferredProcessing() {
}
});
Assert.assertEquals(FilterReply.ACCEPT, filterReply);
}
}

View File

@ -14,31 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.Date;
import static org.junit.Assert.assertEquals;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test ScheduleUtils
*/
public class ScheduleUtilsTest {
import java.util.List;
public class LoggerUtilsTest {
private Logger logger = LoggerFactory.getLogger(LoggerUtilsTest.class);
/**
* Test the getRecentTriggerTime method
*/
@Test
public void testGetRecentTriggerTime() {
Date from = DateUtils.stringToDate("2020-01-01 00:00:00");
Date to = DateUtils.stringToDate("2020-01-31 01:00:00");
// test date
assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", to, from).size());
// test error cron
assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * *", from, to).size());
// test cron
assertEquals(31, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", from, to).size());
public void buildTaskId() {
String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,79,4084,15210);
Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId);
}
@Test
public void getAppIds() {
List<String> appIdList = LoggerUtils.getAppIds("Running job: application_1_1",logger);
Assert.assertEquals("application_1_1", appIdList.get(0));
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
@ -22,7 +22,7 @@ import org.junit.Assert;
import org.junit.Test;
public class SensitiveLogUtilTest {
public class SensitiveLogUtilsTest {
@Test
public void testMaskDataSourcePwd() {
@ -30,8 +30,8 @@ public class SensitiveLogUtilTest {
String password = "123456";
String emptyPassword = "";
Assert.assertEquals(Constants.PASSWORD_DEFAULT, SensitiveLogUtil.maskDataSourcePwd(password));
Assert.assertEquals("", SensitiveLogUtil.maskDataSourcePwd(emptyPassword));
Assert.assertEquals(Constants.PASSWORD_DEFAULT, SensitiveLogUtils.maskDataSourcePwd(password));
Assert.assertEquals("", SensitiveLogUtils.maskDataSourcePwd(emptyPassword));
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.quartz;
package org.apache.dolphinscheduler.dao.quartz;
import com.alibaba.druid.pool.DruidDataSource;
import org.quartz.SchedulerException;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.quartz;
package org.apache.dolphinscheduler.dao.quartz;
import org.apache.dolphinscheduler.common.Constants;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.quartz;
package org.apache.dolphinscheduler.dao.quartz;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;

View File

@ -96,7 +96,13 @@ public class DagHelper {
for (String startNodeName : startNodeList) {
TaskNode startNode = findNodeByName(taskNodeList, startNodeName);
List<TaskNode> childNodeList = new ArrayList<>();
if (TaskDependType.TASK_POST == taskDependType) {
if (startNode == null) {
logger.error("start node name [{}] is not in task node list [{}] ",
startNodeName,
taskNodeList
);
continue;
} else if (TaskDependType.TASK_POST == taskDependType) {
childNodeList = getFlowNodeListPost(startNode, taskNodeList);
} else if (TaskDependType.TASK_PRE == taskDependType) {
childNodeList = getFlowNodeListPre(startNode, recoveryNodeNameList, taskNodeList);
@ -129,7 +135,6 @@ public class DagHelper {
if (null != depList && null != startNode && depList.contains(startNode.getName())) {
resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList));
}
}
resultList.add(startNode);
return resultList;

View File

@ -156,6 +156,23 @@ public class CronUtils {
return dateList;
}
/**
* gets all scheduled times for a period of time based on self dependency
* @param startTime startTime
* @param endTime endTime
* @param cron cron
* @return date list
*/
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, String cron) {
CronExpression cronExpression = null;
try {
cronExpression = parse2CronExpression(cron);
}catch (ParseException e){
logger.error(e.getMessage(), e);
return Collections.EMPTY_LIST;
}
return getSelfFireDateList(startTime, endTime, cronExpression);
}
/**
* get expiration time

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.cron;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
import com.cronutils.builder.CronBuilder;
import com.cronutils.model.Cron;
@ -31,10 +32,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.Date;
import static com.cronutils.model.field.expression.FieldExpressionFactory.*;
/**
* CronUtilsTest
*/
public class CronUtilsTest {
@ -55,8 +58,9 @@ public class CronUtilsTest {
.withSecond(on(0))
.instance();
// Obtain the string expression
String cronAsString = cron.asString(); // 0 */5 * * * ? * Every five minutes(once every 5 minutes)
String cronAsString = cron.asString();
// 0 */5 * * * ? * Every five minutes(once every 5 minutes)
Assert.assertEquals(cronAsString, "0 */5 * * * ? *");
}
@ -68,9 +72,6 @@ public class CronUtilsTest {
@Test
public void testCronParse() throws ParseException {
String strCrontab = "0 1 2 3 * ? *";
strCrontab = "0/50 0/59 * * * ? *";
strCrontab = "3/5 * 0/5 * * ? *";
strCrontab = "1/5 3/5 1/5 3/30 * ? *";
Cron depCron = CronUtils.parse2Cron(strCrontab);
Assert.assertEquals(depCron.retrieve(CronFieldName.SECOND).getExpression().asString(), "0");
@ -87,12 +88,14 @@ public class CronUtilsTest {
*/
@Test
public void testScheduleType() throws ParseException {
CycleEnum cycleEnum = CronUtils.getMaxCycle("0 */1 * * * ? *");
CycleEnum cycleEnum = CronUtils.getMaxCycle(CronUtils.parse2Cron("0 */1 * * * ? *"));
Assert.assertEquals(cycleEnum.name(), "MINUTE");
CycleEnum cycleEnum2 = CronUtils.getMaxCycle("0 * * * * ? *");
Assert.assertEquals(cycleEnum2.name(), "MINUTE");
CycleEnum cycleEnum3 = CronUtils.getMiniCycle(CronUtils.parse2Cron("0 * * * * ? *"));
Assert.assertEquals(cycleEnum3.name(), "MINUTE");
}
/**
@ -109,26 +112,9 @@ public class CronUtilsTest {
.withMinute(every(5))
.withSecond(on(0))
.instance();
String cronAsString = cron1.asString(); // 0 */5 * * * ? * once every 5 minutes
//logger.info(cronAsString);
// Obtain the string expression
//String minCrontab = "0 0 * * * ? *";
//String minCrontab = "0 0 10,14,16 * * ?";
//String minCrontab = "0 0-5 14 * * ? *";
//String minCrontab = "0 0 2 ? * SUN *";
//String minCrontab = "* 0,3 2 SUN * 1#1 *";
//String minCrontab = "* 0,3 * 1W * ? *";
//cron = CronUtils.parse2Cron("0 * * * * ? *");
// month cycle
/*String[] cronArayy = new String[]{"* 0,3 * 1W * ? *","* 0 0 1W * ? *",
"0 0 0 L 3/5 ? *","0 0 0 ? 3/5 2/2 *"};*/
// minute cycle
String[] cronArayy = new String[]{"* * * * * ? *","* 0 * * * ? *",
"* 5 * * 3/5 ? *","0 0 * * * ? *"};
// week cycle
/*String[] cronArayy = new String[]{"* * * ? * 2/1 *","0 *//*5 * ? * 2/1 *",
"* * *//*5 ? * 2/1 *"};*/
for(String minCrontab:cronArayy){
if (!org.quartz.CronExpression.isValidExpression(minCrontab)) {
throw new RuntimeException(minCrontab+" verify failure, cron expression not valid");
@ -171,7 +157,6 @@ public class CronUtilsTest {
logger.info("dayOfWeekField instanceof And:"+(dayOfWeekField.getExpression() instanceof And));
logger.info("dayOfWeekField instanceof QuestionMark:"+(dayOfWeekField.getExpression() instanceof QuestionMark));
CycleEnum cycleEnum = CronUtils.getMaxCycle(minCrontab);
if(cycleEnum !=null){
logger.info(cycleEnum.name());
@ -180,4 +165,34 @@ public class CronUtilsTest {
}
}
}
}
@Test
public void getSelfFireDateList() throws ParseException{
Date from = DateUtils.stringToDate("2020-01-01 00:00:00");
Date to = DateUtils.stringToDate("2020-01-31 00:00:00");
// test date
Assert.assertEquals(0, CronUtils.getSelfFireDateList(to, from, "0 0 0 * * ? ").size());
// test error cron
Assert.assertEquals(0, CronUtils.getSelfFireDateList(from, to, "0 0 0 * *").size());
// test cron
Assert.assertEquals(29, CronUtils.getSelfFireDateList(from, to, "0 0 0 * * ? ").size());
// test other
Assert.assertEquals(30, CronUtils.getFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? ")).size());
Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? "), 5).size());
}
@Test
public void getExpirationTime(){
Date startTime = DateUtils.stringToDate("2020-02-07 18:30:00");
Date expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.HOUR);
Assert.assertEquals("2020-02-07 19:30:00", DateUtils.dateToString(expirationTime));
expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.DAY);
Assert.assertEquals("2020-02-07 23:59:59", DateUtils.dateToString(expirationTime));
expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.WEEK);
Assert.assertEquals("2020-02-07 23:59:59", DateUtils.dateToString(expirationTime));
expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.MONTH);
Assert.assertEquals("2020-02-07 23:59:59", DateUtils.dateToString(expirationTime));
expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.YEAR);
Assert.assertEquals("2020-02-07 18:30:00", DateUtils.dateToString(expirationTime));
}
}

View File

@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.server.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.dao.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.dao.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.quartz.SchedulerException;
import org.slf4j.Logger;

View File

@ -33,9 +33,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.server.utils.ScheduleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -187,7 +187,7 @@ public class MasterExecThread implements Runnable {
/**
* execute process
* @throws Exception excpetion
* @throws Exception exception
*/
private void executeProcess() throws Exception {
prepareProcess();
@ -197,7 +197,7 @@ public class MasterExecThread implements Runnable {
/**
* execute complement process
* @throws Exception excpetion
* @throws Exception exception
*/
private void executeComplementProcess() throws Exception {
@ -213,8 +213,7 @@ public class MasterExecThread implements Runnable {
List<Date> listDate = Lists.newLinkedList();
if(!CollectionUtils.isEmpty(schedules)){
for (Schedule schedule : schedules) {
List<Date> list = ScheduleUtils.getRecentTriggerTime(schedule.getCrontab(), startDate, endDate);
listDate.addAll(list);
listDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedule.getCrontab()));
}
}
// get first fire date
@ -247,7 +246,7 @@ public class MasterExecThread implements Runnable {
// execute process ,waiting for end
runProcess();
// process instace failure no more complements
// process instance failure no more complements
if(!processInstance.getState().typeIsSuccess()){
logger.info("process {} state {}, complement not completely!",
processInstance.getId(), processInstance.getState());
@ -304,7 +303,7 @@ public class MasterExecThread implements Runnable {
/**
* prepare process parameter
* @throws Exception excpetion
* @throws Exception exception
*/
private void prepareProcess() throws Exception {
// init task queue
@ -332,7 +331,7 @@ public class MasterExecThread implements Runnable {
/**
* generate process dag
* @throws Exception excpetion
* @throws Exception exception
*/
private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
@ -609,7 +608,7 @@ public class MasterExecThread implements Runnable {
/**
* query task instance by complete state
* @param state state
* @return task isntance list
* @return task instance list
*/
private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state){
List<TaskInstance> resultList = new ArrayList<>();
@ -804,7 +803,7 @@ public class MasterExecThread implements Runnable {
}
/**
* add task to standy list
* add task to standby list
* @param taskInstance task instance
*/
private void addTaskToStandByList(TaskInstance taskInstance){

View File

@ -122,7 +122,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
break;
}
if(checkTimeout){
long remainTime = getRemaintime(taskTimeoutParameter.getInterval()*60);
long remainTime = getRemaintime(taskTimeoutParameter.getInterval() * 60L);
if (remainTime < 0) {
logger.warn("task id: {} execution time out",taskInstance.getId());
// process define

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.DbType;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
import com.alibaba.druid.sql.parser.SQLStatementParser;
public class DataxUtils {
public static final String DATAX_READER_PLUGIN_MYSQL = "mysqlreader";
public static final String DATAX_READER_PLUGIN_POSTGRESQL = "postgresqlreader";
public static final String DATAX_READER_PLUGIN_ORACLE = "oraclereader";
public static final String DATAX_READER_PLUGIN_SQLSERVER = "sqlserverreader";
public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter";
public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter";
public static final String DATAX_WRITER_PLUGIN_ORACLE = "oraclewriter";
public static final String DATAX_WRITER_PLUGIN_SQLSERVER = "sqlserverwriter";
public static String getReaderPluginName(DbType dbType) {
switch (dbType) {
case MYSQL:
return DATAX_READER_PLUGIN_MYSQL;
case POSTGRESQL:
return DATAX_READER_PLUGIN_POSTGRESQL;
case ORACLE:
return DATAX_READER_PLUGIN_ORACLE;
case SQLSERVER:
return DATAX_READER_PLUGIN_SQLSERVER;
default:
return null;
}
}
public static String getWriterPluginName(DbType dbType) {
switch (dbType) {
case MYSQL:
return DATAX_WRITER_PLUGIN_MYSQL;
case POSTGRESQL:
return DATAX_WRITER_PLUGIN_POSTGRESQL;
case ORACLE:
return DATAX_WRITER_PLUGIN_ORACLE;
case SQLSERVER:
return DATAX_WRITER_PLUGIN_SQLSERVER;
default:
return null;
}
}
public static SQLStatementParser getSqlStatementParser(DbType dbType, String sql) {
switch (dbType) {
case MYSQL:
return new MySqlStatementParser(sql);
case POSTGRESQL:
return new PGSQLStatementParser(sql);
case ORACLE:
return new OracleStatementParser(sql);
case SQLSERVER:
return new SQLServerStatementParser(sql);
default:
return null;
}
}
public static String[] convertKeywordsColumns(DbType dbType, String[] columns) {
if (columns == null) {
return null;
}
String[] toColumns = new String[columns.length];
for (int i = 0; i < columns.length; i++ ) {
toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]);
}
return toColumns;
}
public static String doConvertKeywordsColumn(DbType dbType, String column) {
if (column == null) {
return column;
}
column = column.trim();
column = column.replace("`", "");
column = column.replace("\"", "");
column = column.replace("'", "");
switch (dbType) {
case MYSQL:
return String.format("`%s`", column);
case POSTGRESQL:
return String.format("\"%s\"", column);
case ORACLE:
return String.format("\"%s\"", column);
case SQLSERVER:
return String.format("`%s`", column);
default:
return column;
}
}
}

View File

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;

View File

@ -1,79 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
/**
* ScheduleUtils
*/
public class ScheduleUtils {
private static final Logger logger = LoggerFactory.getLogger(ScheduleUtils.class);
/**
* Get the execution time of the time interval
* @param cron
* @param from
* @param to
* @return
*/
public static List<Date> getRecentTriggerTime(String cron, Date from, Date to) {
return getRecentTriggerTime(cron, Integer.MAX_VALUE, from, to);
}
/**
* Get the execution time of the time interval
* @param cron
* @param size
* @param from
* @param to
* @return
*/
public static List<Date> getRecentTriggerTime(String cron, int size, Date from, Date to) {
List list = new LinkedList<Date>();
if(to.before(from)){
logger.error("schedule date from:{} must before date to:{}!", from, to);
return list;
}
try {
CronTriggerImpl trigger = new CronTriggerImpl();
trigger.setCronExpression(cron);
trigger.setStartTime(from);
trigger.setEndTime(to);
trigger.computeFirstFireTime(null);
for (int i = 0; i < size; i++) {
Date schedule = trigger.getNextFireTime();
if(null == schedule){
break;
}
list.add(schedule);
trigger.triggered(null);
}
} catch (ParseException e) {
logger.error("cron:{} error:{}", cron, e.getMessage());
}
return java.util.Collections.unmodifiableList(list);
}
}

View File

@ -35,8 +35,8 @@ import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.permission.PermissionCheck;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;

View File

@ -23,17 +23,14 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.slf4j.Logger;
import java.io.*;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -150,9 +147,6 @@ public abstract class AbstractCommandExecutor {
// get process id
int pid = getProcessId(process);
// task instance id
int taskInstId = Integer.parseInt(taskAppId.split("_")[2]);
processDao.updatePidByTaskInstId(taskInstId, pid, "");
logger.info("process start, process id is: {}", pid);
@ -207,7 +201,14 @@ public abstract class AbstractCommandExecutor {
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
// setting up user to run commands
processBuilder.command("sudo", "-u", tenantCode, commandInterpreter(), commandFile);
List<String> command = new LinkedList<>();
command.add("sudo");
command.add("-u");
command.add(tenantCode);
command.add(commandInterpreter());
command.addAll(commandOptions());
command.add(commandFile);
processBuilder.command(command);
process = processBuilder.start();
@ -559,7 +560,9 @@ public abstract class AbstractCommandExecutor {
}
}
protected List<String> commandOptions() {
return Collections.emptyList();
}
protected abstract String buildCommandFilePath();
protected abstract String commandInterpreter();
protected abstract boolean checkFindApp(String line);

View File

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
@ -194,6 +195,9 @@ public abstract class AbstractTask {
case PYTHON:
paramsClass = PythonParameters.class;
break;
case DATAX:
paramsClass = DataxParameters.class;
break;
default:
logger.error("not support this task type: {}", taskType);
throw new IllegalArgumentException("not support this task type");

View File

@ -26,6 +26,7 @@ import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
@ -108,6 +109,16 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
}
}
/**
* get command options
* @return command options list
*/
@Override
protected List<String> commandOptions() {
// unbuffered binary stdout and stderr
return Collections.singletonList("-u");
}
/**
* get python home
* @return python home

View File

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask;
import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
import org.apache.dolphinscheduler.server.worker.task.mr.MapReduceTask;
@ -65,6 +66,8 @@ public class TaskManager {
return new DependentTask(props, logger);
case HTTP:
return new HttpTask(props, logger);
case DATAX:
return new DataxTask(props, logger);
default:
logger.error("unsupport task type: {}", taskType);
throw new IllegalArgumentException("not support task type");

View File

@ -0,0 +1,522 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.datax;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.job.db.BaseDataSource;
import org.apache.dolphinscheduler.common.job.db.DataSourceFactory;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.alibaba.fastjson.JSONObject;
/**
* DataX task
*/
public class DataxTask extends AbstractTask {
/**
* python process(datax only supports version 2.7 by default)
*/
private static final String DATAX_PYTHON = "python2.7";
/**
* datax home path
*/
private static final String DATAX_HOME_EVN = "${DATAX_HOME}";
/**
* datax channel count
*/
private static final int DATAX_CHANNEL_COUNT = 1;
/**
* datax parameters
*/
private DataxParameters dataXParameters;
/**
* task dir
*/
private String taskDir;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* process database access
*/
private ProcessDao processDao;
/**
* constructor
*
* @param props
* props
* @param logger
* logger
*/
public DataxTask(TaskProps props, Logger logger) {
super(props, logger);
this.taskDir = props.getTaskDir();
logger.info("task dir : {}", taskDir);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, props.getTaskDir(), props.getTaskAppId(),
props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
props.getTaskTimeout(), logger);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
/**
* init DataX config
*/
@Override
public void init() {
logger.info("datax task params {}", taskProps.getTaskParams());
dataXParameters = JSONUtils.parseObject(taskProps.getTaskParams(), DataxParameters.class);
if (!dataXParameters.checkParameters()) {
throw new RuntimeException("datax task params is not valid");
}
}
/**
* run DataX process
*
* @throws Exception
*/
@Override
public void handle()
throws Exception {
try {
// set the name of the current thread
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskProps.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
// run datax process
String jsonFilePath = buildDataxJsonFile();
String shellCommandFilePath = buildShellCommandFile(jsonFilePath);
exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, processDao);
}
catch (Exception e) {
exitStatusCode = -1;
throw e;
}
}
/**
* cancel DataX process
*
* @param cancelApplication
* @throws Exception
*/
@Override
public void cancelApplication(boolean cancelApplication)
throws Exception {
// cancel process
shellCommandExecutor.cancelApplication();
}
/**
* build datax configuration file
*
* @return
* @throws Exception
*/
private String buildDataxJsonFile()
throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json", taskDir, taskProps.getTaskAppId());
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
JSONObject job = new JSONObject();
job.put("content", buildDataxJobContentJson());
job.put("setting", buildDataxJobSettingJson());
JSONObject root = new JSONObject();
root.put("job", job);
root.put("core", buildDataxCoreJson());
logger.debug("datax job json : {}", root.toString());
// create datax json file
FileUtils.writeStringToFile(new File(fileName), root.toString(), Charset.forName("UTF-8"));
return fileName;
}
/**
* build datax job config
*
* @return
* @throws SQLException
*/
private List<JSONObject> buildDataxJobContentJson()
throws SQLException {
DataSource dataSource = processDao.findDataSourceById(dataXParameters.getDataSource());
BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
DataSource dataTarget = processDao.findDataSourceById(dataXParameters.getDataTarget());
BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(),
dataTarget.getConnectionParams());
List<JSONObject> readerConnArr = new ArrayList<>();
JSONObject readerConn = new JSONObject();
readerConn.put("querySql", new String[] {dataXParameters.getSql()});
readerConn.put("jdbcUrl", new String[] {dataSourceCfg.getJdbcUrl()});
readerConnArr.add(readerConn);
JSONObject readerParam = new JSONObject();
readerParam.put("username", dataSourceCfg.getUser());
readerParam.put("password", dataSourceCfg.getPassword());
readerParam.put("connection", readerConnArr);
JSONObject reader = new JSONObject();
reader.put("name", DataxUtils.getReaderPluginName(dataSource.getType()));
reader.put("parameter", readerParam);
List<JSONObject> writerConnArr = new ArrayList<>();
JSONObject writerConn = new JSONObject();
writerConn.put("table", new String[] {dataXParameters.getTargetTable()});
writerConn.put("jdbcUrl", dataTargetCfg.getJdbcUrl());
writerConnArr.add(writerConn);
JSONObject writerParam = new JSONObject();
writerParam.put("username", dataTargetCfg.getUser());
writerParam.put("password", dataTargetCfg.getPassword());
writerParam.put("column",
parsingSqlColumnNames(dataSource.getType(), dataTarget.getType(), dataSourceCfg, dataXParameters.getSql()));
writerParam.put("connection", writerConnArr);
if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) {
writerParam.put("preSql", dataXParameters.getPreStatements());
}
if (CollectionUtils.isNotEmpty(dataXParameters.getPostStatements())) {
writerParam.put("postSql", dataXParameters.getPostStatements());
}
JSONObject writer = new JSONObject();
writer.put("name", DataxUtils.getWriterPluginName(dataTarget.getType()));
writer.put("parameter", writerParam);
List<JSONObject> contentList = new ArrayList<>();
JSONObject content = new JSONObject();
content.put("reader", reader);
content.put("writer", writer);
contentList.add(content);
return contentList;
}
/**
* build datax setting config
*
* @return
*/
private JSONObject buildDataxJobSettingJson() {
JSONObject speed = new JSONObject();
speed.put("channel", DATAX_CHANNEL_COUNT);
if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
}
if (dataXParameters.getJobSpeedRecord() > 0) {
speed.put("record", dataXParameters.getJobSpeedRecord());
}
JSONObject errorLimit = new JSONObject();
errorLimit.put("record", 0);
errorLimit.put("percentage", 0);
JSONObject setting = new JSONObject();
setting.put("speed", speed);
setting.put("errorLimit", errorLimit);
return setting;
}
private JSONObject buildDataxCoreJson() {
JSONObject speed = new JSONObject();
speed.put("channel", DATAX_CHANNEL_COUNT);
if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
}
if (dataXParameters.getJobSpeedRecord() > 0) {
speed.put("record", dataXParameters.getJobSpeedRecord());
}
JSONObject channel = new JSONObject();
channel.put("speed", speed);
JSONObject transport = new JSONObject();
transport.put("channel", channel);
JSONObject core = new JSONObject();
core.put("transport", transport);
return core;
}
/**
* create command
*
* @return
* @throws Exception
*/
private String buildShellCommandFile(String jobConfigFilePath)
throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId());
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
// datax python command
StringBuilder sbr = new StringBuilder();
sbr.append(DATAX_PYTHON);
sbr.append(" ");
sbr.append(DATAX_HOME_EVN);
sbr.append(" ");
sbr.append(jobConfigFilePath);
String dataxCommand = sbr.toString();
// find process instance by task id
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(), dataXParameters.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(), processInstance.getScheduleTime());
if (paramsMap != null) {
dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap));
}
logger.debug("raw script : {}", dataxCommand);
// create shell command file
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
Files.createFile(path, attr);
Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND);
return fileName;
}
/**
* parsing synchronized column names in SQL statements
*
* @param dsType
* the database type of the data source
* @param dtType
* the database type of the data target
* @param dataSourceCfg
* the database connection parameters of the data source
* @param sql
* sql for data synchronization
* @return
*/
private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) {
String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(dsType, sql);
if (columnNames == null || columnNames.length == 0) {
logger.info("try to execute sql analysis query column name");
columnNames = tryExecuteSqlResolveColumnNames(dataSourceCfg, sql);
}
notNull(columnNames, String.format("parsing sql columns failed : %s", sql));
return DataxUtils.convertKeywordsColumns(dtType, columnNames);
}
/**
* try grammatical parsing column
*
* @param dbType
* database type
* @param sql
* sql for data synchronization
* @return column name array
* @throws RuntimeException
*/
private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) {
String[] columnNames;
try {
SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql);
notNull(parser, String.format("database driver [%s] is not support", dbType.toString()));
SQLStatement sqlStatement = parser.parseStatement();
SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement;
SQLSelect sqlSelect = sqlSelectStatement.getSelect();
List<SQLSelectItem> selectItemList = null;
if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) {
SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery();
selectItemList = block.getSelectList();
} else if (sqlSelect.getQuery() instanceof SQLUnionQuery) {
SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery();
SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight();
selectItemList = block.getSelectList();
}
notNull(selectItemList,
String.format("select query type [%s] is not support", sqlSelect.getQuery().toString()));
columnNames = new String[selectItemList.size()];
for (int i = 0; i < selectItemList.size(); i++ ) {
SQLSelectItem item = selectItemList.get(i);
String columnName = null;
if (item.getAlias() != null) {
columnName = item.getAlias();
} else if (item.getExpr() != null) {
if (item.getExpr() instanceof SQLPropertyExpr) {
SQLPropertyExpr expr = (SQLPropertyExpr)item.getExpr();
columnName = expr.getName();
} else if (item.getExpr() instanceof SQLIdentifierExpr) {
SQLIdentifierExpr expr = (SQLIdentifierExpr)item.getExpr();
columnName = expr.getName();
}
} else {
throw new RuntimeException(
String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
}
if (columnName == null) {
throw new RuntimeException(
String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
}
columnNames[i] = columnName;
}
}
catch (Exception e) {
logger.warn(e.getMessage(), e);
return null;
}
return columnNames;
}
/**
* try to execute sql to resolve column names
*
* @param baseDataSource
* the database connection parameters
* @param sql
* sql for data synchronization
* @return column name array
*/
public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) {
String[] columnNames;
sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql);
sql = sql.replace(";", "");
try (
Connection connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), baseDataSource.getUser(),
baseDataSource.getPassword());
PreparedStatement stmt = connection.prepareStatement(sql);
ResultSet resultSet = stmt.executeQuery()) {
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
columnNames = new String[num];
for (int i = 1; i <= num; i++ ) {
columnNames[i - 1] = md.getColumnName(i);
}
}
catch (SQLException e) {
logger.warn(e.getMessage(), e);
return null;
}
return columnNames;
}
@Override
public AbstractParameters getParameters() {
return dataXParameters;
}
private void notNull(Object obj, String message) {
if (obj == null) {
throw new RuntimeException(message);
}
}
}

View File

@ -105,7 +105,7 @@ public class SqlTask extends AbstractTask {
// set the name of the current thread
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
logger.info(sqlParameters.toString());
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}",
sqlParameters.getType(),
sqlParameters.getDatasource(),
@ -123,6 +123,14 @@ public class SqlTask extends AbstractTask {
}
dataSource= processDao.findDataSourceById(sqlParameters.getDatasource());
// data source is null
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
@ -130,12 +138,6 @@ public class SqlTask extends AbstractTask {
dataSource.getUserId(),
dataSource.getConnectionParams());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
Connection con = null;
List<String> createFuncs = null;
try {
@ -289,12 +291,12 @@ public class SqlTask extends AbstractTask {
}
}
try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds)) {
try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds);
ResultSet resultSet = stmt.executeQuery()) {
// decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
JSONArray resultJSONArray = new JSONArray();
ResultSet resultSet = stmt.executeQuery();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
@ -305,11 +307,10 @@ public class SqlTask extends AbstractTask {
}
resultJSONArray.add(mapOfColValues);
}
resultSet.close();
logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
// if there is a result set
if (resultJSONArray.size() > 0) {
if ( !resultJSONArray.isEmpty() ) {
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
sendAttachment(sqlParameters.getTitle(),
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
@ -337,6 +338,12 @@ public class SqlTask extends AbstractTask {
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
} finally {
try {
connection.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return connection;
}
@ -349,22 +356,23 @@ public class SqlTask extends AbstractTask {
* @throws Exception
*/
private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
// is the timeout set
boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED ||
taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
if(timeoutFlag){
stmt.setQueryTimeout(taskProps.getTaskTimeout());
}
Map<Integer, Property> params = sqlBinds.getParamsMap();
if(params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) {
Property prop = entry.getValue();
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) {
if(timeoutFlag){
stmt.setQueryTimeout(taskProps.getTaskTimeout());
}
Map<Integer, Property> params = sqlBinds.getParamsMap();
if(params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) {
Property prop = entry.getValue();
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
}
}
logger.info("prepare statement replace sql : {} ", stmt);
return stmt;
}
logger.info("prepare statement replace sql : {} ",stmt.toString());
return stmt;
}
/**
@ -452,7 +460,7 @@ public class SqlTask extends AbstractTask {
for(int i=1;i<=sqlParamsMap.size();i++){
logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
}
logger.info(logPrint.toString());
logger.info("Sql Params are {}", logPrint);
}
/**

View File

@ -1,52 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="120 seconds"> <!--debug="true" -->
<property name="log.base" value="logs" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="MASTERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-master.log</file>
<filter class="org.apache.dolphinscheduler.server.master.log.MasterLogFilter">
<level>INFO</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-master.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>200MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="MASTERLOGFILE"/>
</root>
</configuration>

View File

@ -1,81 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="120 seconds">
<conversionRule conversionWord="msg"
converterClass="org.apache.dolphinscheduler.server.worker.log.SensitiveDataConverter"/>
<property name="log.base" value="logs"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="org.apache.dolphinscheduler.server.worker.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>
<appender name="WORKERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-worker.log</file>
<filter class="org.apache.dolphinscheduler.server.worker.log.WorkerLogFilter">
<level>INFO</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>200MB</maxFileSize>
</rollingPolicy>
     
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
  
</appender>
<root level="INFO">
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="WORKERLOGFILE"/>
</root>
</configuration>

View File

@ -134,7 +134,7 @@ public class MasterExecThreadTest {
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save
verify(processDao, times(16)).saveProcessInstance(processInstance);
verify(processDao, times(15)).saveProcessInstance(processInstance);
}catch (Exception e){
Assert.assertTrue(false);
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* DataxUtils Tester.
*/
public class DataxUtilsTest {
/**
*
* Method: getReaderPluginName(DbType dbType)
*
*/
@Test
public void testGetReaderPluginName() {
assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, DataxUtils.getReaderPluginName(DbType.MYSQL));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL, DataxUtils.getReaderPluginName(DbType.POSTGRESQL));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER, DataxUtils.getReaderPluginName(DbType.SQLSERVER));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE, DataxUtils.getReaderPluginName(DbType.ORACLE));
assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null);
}
/**
*
* Method: getWriterPluginName(DbType dbType)
*
*/
@Test
public void testGetWriterPluginName() {
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, DataxUtils.getWriterPluginName(DbType.MYSQL));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL, DataxUtils.getWriterPluginName(DbType.POSTGRESQL));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER, DataxUtils.getWriterPluginName(DbType.SQLSERVER));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE, DataxUtils.getWriterPluginName(DbType.ORACLE));
assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null);
}
/**
*
* Method: getSqlStatementParser(DbType dbType, String sql)
*
*/
@Test
public void testGetSqlStatementParser() throws Exception {
assertTrue(DataxUtils.getSqlStatementParser(DbType.MYSQL, "select 1") instanceof MySqlStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.POSTGRESQL, "select 1") instanceof PGSQLStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.ORACLE, "select 1") instanceof OracleStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.SQLSERVER, "select 1") instanceof SQLServerStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.DB2, "select 1") == null);
}
/**
*
* Method: convertKeywordsColumns(DbType dbType, String[] columns)
*
*/
@Test
public void testConvertKeywordsColumns() throws Exception {
String[] fromColumns = new String[]{"`select`", "from", "\"where\"", " table "};
String[] targetColumns = new String[]{"`select`", "`from`", "`where`", "`table`"};
String[] toColumns = DataxUtils.convertKeywordsColumns(DbType.MYSQL, fromColumns);
assertTrue(fromColumns.length == toColumns.length);
for (int i = 0; i < toColumns.length; i++) {
assertEquals(targetColumns[i], toColumns[i]);
}
}
/**
*
* Method: doConvertKeywordsColumn(DbType dbType, String column)
*
*/
@Test
public void testDoConvertKeywordsColumn() throws Exception {
assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" "));
assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" "));
assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" "));
assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" "));
assertEquals("select", DataxUtils.doConvertKeywordsColumn(DbType.DB2, " \"`select`\" "));
}
}

View File

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.log;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SensitiveDataConverterTest {
private final Logger logger = LoggerFactory.getLogger(SensitiveDataConverterTest.class);
/**
* password pattern
*/
private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX);
/**
* mask sensitive logMsg - sql task datasource password
*/
@Test
public void testPwdLogMsgConverter() {
String logMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," +
"\"database\":\"carbond\"," +
"\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," +
"\"user\":\"view\"," +
"\"password\":\"view1\"}";
String maskLogMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," +
"\"database\":\"carbond\"," +
"\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," +
"\"user\":\"view\"," +
"\"password\":\"******\"}";
logger.info("parameter : {}", logMsg);
logger.info("parameter : {}", passwordHandler(pwdPattern, logMsg));
Assert.assertNotEquals(logMsg, passwordHandler(pwdPattern, logMsg));
Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg));
}
/**
* password regex test
*
* @param logMsg original log
*/
private static String passwordHandler(Pattern pattern, String logMsg) {
Matcher matcher = pattern.matcher(logMsg);
StringBuffer sb = new StringBuffer(logMsg.length());
while (matcher.find()) {
String password = matcher.group();
String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password);
matcher.appendReplacement(sb, maskPassword);
}
matcher.appendTail(sb);
return sb.toString();
}
}

View File

@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;

View File

@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;

View File

@ -0,0 +1,352 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.datax;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.job.db.BaseDataSource;
import org.apache.dolphinscheduler.common.job.db.DataSourceFactory;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
/**
* DataxTask Tester.
*/
public class DataxTaskTest {
private static final Logger logger = LoggerFactory.getLogger(DataxTaskTest.class);
private DataxTask dataxTask;
private ProcessDao processDao;
private ShellCommandExecutor shellCommandExecutor;
private ApplicationContext applicationContext;
@Before
public void before()
throws Exception {
processDao = Mockito.mock(ProcessDao.class);
shellCommandExecutor = Mockito.mock(ShellCommandExecutor.class);
applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessDao.class)).thenReturn(processDao);
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams(
"{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}");
dataxTask = PowerMockito.spy(new DataxTask(props, logger));
dataxTask.init();
Mockito.when(processDao.findDataSourceById(1)).thenReturn(getDataSource());
Mockito.when(processDao.findDataSourceById(2)).thenReturn(getDataSource());
Mockito.when(processDao.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
String fileName = String.format("%s/%s_node.sh", props.getTaskDir(), props.getTaskAppId());
Mockito.when(shellCommandExecutor.run(fileName, processDao)).thenReturn(0);
}
private DataSource getDataSource() {
DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL);
dataSource.setConnectionParams(
"{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}");
dataSource.setUserId(1);
return dataSource;
}
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setCommandType(CommandType.START_PROCESS);
processInstance.setScheduleTime(new Date());
return processInstance;
}
@After
public void after()
throws Exception {}
/**
* Method: DataxTask()
*/
@Test
public void testDataxTask()
throws Exception {
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
Assert.assertNotNull(new DataxTask(props, logger));
}
/**
* Method: init
*/
@Test
public void testInit()
throws Exception {
try {
dataxTask.init();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: handle()
*/
@Test
public void testHandle()
throws Exception {
try {
dataxTask.handle();
} catch (RuntimeException e) {
if (e.getMessage().indexOf("process error . exitCode is : -1") < 0) {
Assert.fail();
}
}
}
/**
* Method: cancelApplication()
*/
@Test
public void testCancelApplication()
throws Exception {
try {
dataxTask.cancelApplication(true);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource
* dataSourceCfg, String sql)
*/
@Test
public void testParsingSqlColumnNames()
throws Exception {
try {
BaseDataSource dataSource = DataSourceFactory.getDatasource(getDataSource().getType(),
getDataSource().getConnectionParams());
Method method = DataxTask.class.getDeclaredMethod("parsingSqlColumnNames", DbType.class, DbType.class, BaseDataSource.class, String.class);
method.setAccessible(true);
String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, DbType.MYSQL, dataSource, "select 1 as a, 2 as `table` from dual");
Assert.assertNotNull(columns);
Assert.assertTrue(columns.length == 2);
Assert.assertEquals("[`a`, `table`]", Arrays.toString(columns));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: tryGrammaticalParsingSqlColumnNames(DbType dbType, String sql)
*/
@Test
public void testTryGrammaticalAnalysisSqlColumnNames()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("tryGrammaticalAnalysisSqlColumnNames", DbType.class, String.class);
method.setAccessible(true);
String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, "select t1.a, t1.b from test t1 union all select a, t2.b from (select a, b from test) t2");
Assert.assertNotNull(columns);
Assert.assertTrue(columns.length == 2);
Assert.assertEquals("[a, b]", Arrays.toString(columns));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource,
* String sql)
*/
@Test
public void testTryExecuteSqlResolveColumnNames()
throws Exception {
// TODO: Test goes here...
}
/**
* Method: buildDataxJsonFile()
*/
@Test
public void testBuildDataxJsonFile()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
method.setAccessible(true);
String filePath = (String) method.invoke(dataxTask, null);
Assert.assertNotNull(filePath);
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: buildDataxJobContentJson()
*/
@Test
public void testBuildDataxJobContentJson()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJobContentJson");
method.setAccessible(true);
List<JSONObject> contentList = (List<JSONObject>) method.invoke(dataxTask, null);
Assert.assertNotNull(contentList);
JSONObject content = contentList.get(0);
JSONObject reader = (JSONObject) content.get("reader");
Assert.assertNotNull(reader);
String readerPluginName = (String) reader.get("name");
Assert.assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, readerPluginName);
JSONObject writer = (JSONObject) content.get("writer");
Assert.assertNotNull(writer);
String writerPluginName = (String) writer.get("name");
Assert.assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, writerPluginName);
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: buildDataxJobSettingJson()
*/
@Test
public void testBuildDataxJobSettingJson()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJobSettingJson");
method.setAccessible(true);
JSONObject setting = (JSONObject) method.invoke(dataxTask, null);
Assert.assertNotNull(setting);
Assert.assertNotNull(setting.get("speed"));
Assert.assertNotNull(setting.get("errorLimit"));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: buildDataxCoreJson()
*/
@Test
public void testBuildDataxCoreJson()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxCoreJson");
method.setAccessible(true);
JSONObject coreConfig = (JSONObject) method.invoke(dataxTask, null);
Assert.assertNotNull(coreConfig);
Assert.assertNotNull(coreConfig.get("transport"));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: buildShellCommandFile(String jobConfigFilePath)
*/
@Test
public void testBuildShellCommandFile()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class);
method.setAccessible(true);
Assert.assertNotNull(method.invoke(dataxTask, "test.json"));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: getParameters
*/
@Test
public void testGetParameters()
throws Exception {
Assert.assertTrue(dataxTask.getParameters() != null);
}
/**
* Method: notNull(Object obj, String message)
*/
@Test
public void testNotNull()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("notNull", Object.class, String.class);
method.setAccessible(true);
method.invoke(dataxTask, "abc", "test throw RuntimeException");
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}

View File

@ -279,6 +279,10 @@ let tasksType = {
'HTTP': {
desc: 'HTTP',
color: '#E46F13'
},
'DATAX': {
desc: 'DataX',
color: '#1fc747'
}
}

View File

@ -101,6 +101,9 @@
.icos-HTTP {
background: url("../img/toobar_HTTP.png") no-repeat 50% 50%;
}
.icos-DATAX {
background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%;
}
.toolbar {
width: 60px;
height: 100%;

View File

@ -191,7 +191,13 @@
ref="HTTP"
:backfill-item="backfillItem">
</m-http>
<m-datax
v-if="taskType === 'DATAX'"
@on-params="_onParams"
@on-cache-params="_onCacheParams"
ref="DATAX"
:backfill-item="backfillItem">
</m-datax>
</div>
</div>
<div class="bottom-box">
@ -216,6 +222,7 @@
import mProcedure from './tasks/procedure'
import mDependent from './tasks/dependent'
import mHttp from './tasks/http'
import mDatax from './tasks/datax'
import mSubProcess from './tasks/sub_process'
import mSelectInput from './_source/selectInput'
import mTimeoutAlarm from './_source/timeoutAlarm'
@ -565,6 +572,7 @@
mPython,
mDependent,
mHttp,
mDatax,
mSelectInput,
mTimeoutAlarm,
mPriority,

View File

@ -0,0 +1,292 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
<div class="datax-model">
<m-list-box>
<div slot="text">{{$t('Datasource')}}</div>
<div slot="content">
<m-datasource
ref="refDs"
@on-dsData="_onDsData"
:supportType="['MYSQL','POSTGRESQL', 'ORACLE', 'SQLSERVER']"
:data="{ type:dsType,datasource:datasource }">
</m-datasource>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('SQL Statement')}}</div>
<div slot="content">
<div class="from-mirror">
<textarea
id="code-sql-mirror"
name="code-sql-mirror"
style="opacity: 0;">
</textarea>
</div>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('TargetDataBase')}}</div>
<div slot="content">
<m-datasource
ref="refDt"
@on-dsData="_onDtData"
:supportType="['MYSQL','POSTGRESQL', 'ORACLE', 'SQLSERVER']"
:data="{ type:dtType,datasource:datatarget }">
</m-datasource>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('TargetTable')}}</div>
<div slot="content">
<x-input
type="input"
v-model="targetTable"
:placeholder="$t('Please enter the table of target')"
autocomplete="off">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('TargetDataBase')}}{{$t('Pre Statement')}}</div>
<div slot="content">
<m-statement-list
ref="refPreStatements"
@on-statement-list="_onPreStatements"
:statement-list="preStatements">
</m-statement-list>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('TargetDataBase')}}{{$t('Post Statement')}}</div>
<div slot="content">
<m-statement-list
ref="refPostStatements"
@on-statement-list="_onPostStatements"
:statement-list="postStatements">
</m-statement-list>
</div>
</m-list-box>
<m-list-box>
<div slot="text">
<span>{{$t('SpeedByte')}}</span>
</div>
<div slot="content">
<m-select-input v-model="jobSpeedByte" :list="[0,1,10,50,100,512]">
</m-select-input>
<span>({{$t('0 means unlimited by byte')}})</span>
</div>
</m-list-box>
<m-list-box>
<div slot="text">
<span>{{$t('SpeedRecord')}}</span>
</div>
<div slot="content">
<m-select-input v-model="jobSpeedRecord" :list="[0,500,1000,1500,2000,2500,3000]">
</m-select-input>
<span>({{$t('0 means unlimited by count')}})</span>
</div>
</m-list-box>
</div>
</template>
<script>
import _ from 'lodash'
import i18n from '@/module/i18n'
import mListBox from './_source/listBox'
import mDatasource from './_source/datasource'
import mLocalParams from './_source/localParams'
import mStatementList from './_source/statementList'
import disabledState from '@/module/mixin/disabledState'
import mSelectInput from '../_source/selectInput'
import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror'
let editor
export default {
name: 'datax',
data () {
return {
// Data source type
dsType: '',
// data source
datasource: '',
// Data source type
dtType: '',
// data source
datatarget: '',
// Return to the selected data source
rtDatasource: '',
// Return to the selected data target
rtDatatarget: '',
// Sql statement
sql: '',
// target table
targetTable: '',
// Pre statements
preStatements: [],
// Post statements
postStatements: [],
// speed byte
jobSpeedByte: 0,
// speed record
jobSpeedRecord: 1000,
}
},
mixins: [disabledState],
props: {
backfillItem: Object,
createNodeId: Number
},
methods: {
/**
* return data source
*/
_onDsData (o) {
this.dsType = o.type
this.rtDatasource = o.datasource
},
/**
* return data target
*/
_onDtData (o) {
this.dtType = o.type
this.rtDatatarget = o.datasource
},
/**
* return pre statements
*/
_onPreStatements (a) {
this.preStatements = a
},
/**
* return post statements
*/
_onPostStatements (a) {
this.postStatements = a
},
/**
* verification
*/
_verification () {
if (!editor.getValue()) {
this.$message.warning(`${i18n.$t('Please enter a SQL Statement(required)')}`)
return false
}
// datasource Subcomponent verification
if (!this.$refs.refDs._verifDatasource()) {
return false
}
// datasource Subcomponent verification
if (!this.$refs.refDt._verifDatasource()) {
return false
}
if (!this.targetTable) {
this.$message.warning(`${i18n.$t('Please enter a Target Table(required)')}`)
return false
}
// preStatements Subcomponent verification
if (!this.$refs.refPreStatements._verifProp()) {
return false
}
// postStatements Subcomponent verification
if (!this.$refs.refPostStatements._verifProp()) {
return false
}
// storage
this.$emit('on-params', {
dsType: this.dsType,
dataSource: this.rtDatasource,
dtType: this.dtType,
dataTarget: this.rtDatatarget,
sql: editor.getValue(),
targetTable: this.targetTable,
jobSpeedByte: this.jobSpeedByte * 1024,
jobSpeedRecord: this.jobSpeedRecord,
preStatements: this.preStatements,
postStatements: this.postStatements
})
return true
},
/**
* Processing code highlighting
*/
_handlerEditor () {
// editor
editor = codemirror('code-sql-mirror', {
mode: 'sql',
readOnly: this.isDetails
})
this.keypress = () => {
if (!editor.getOption('readOnly')) {
editor.showHint({
completeSingle: false
})
}
}
// Monitor keyboard
editor.on('keypress', this.keypress)
editor.setValue(this.sql)
return editor
}
},
created () {
let o = this.backfillItem
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
// backfill
this.dsType = o.params.dsType || ''
this.datasource = o.params.dataSource || ''
this.dtType = o.params.dtType || ''
this.datatarget = o.params.dataTarget || ''
this.sql = o.params.sql || ''
this.targetTable = o.params.targetTable || ''
this.jobSpeedByte = o.params.jobSpeedByte / 1024 || 0
this.jobSpeedRecord = o.params.jobSpeedRecord || 0
this.preStatements = o.params.preStatements || []
this.postStatements = o.params.postStatements || []
}
},
mounted () {
setTimeout(() => {
this._handlerEditor()
}, 200)
},
destroyed () {
/**
* Destroy the editor instance
*/
if (editor) {
editor.toTextArea() // Uninstall
editor.off($('.code-sql-mirror'), 'keypress', this.keypress)
}
},
computed: {},
components: { mListBox, mDatasource, mLocalParams, mStatementList, mSelectInput }
}
</script>

Binary file not shown.

After

Width:  |  Height:  |  Size: 571 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

View File

@ -46,11 +46,11 @@
<span>{{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}}</span>
</td>
<td>
<span class="ellipsis">
<span class="ellipsis" v-tooltip.large.top.start.light="{text: item.alias, maxWidth: '500px'}">
<a href="javascript:" class="links" @click="_go(item)">{{item.alias}}</a>
</span>
</td>
<td><span class="ellipsis">{{item.fileName}}</span></td>
<td><span class="ellipsis" v-tooltip.large.top.start.light="{text: item.fileName, maxWidth: '500px'}">{{item.fileName}}</span></td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>

View File

@ -76,7 +76,8 @@
filterable
v-model="resourceId"
:disabled="isUpdate"
style="width: 200px">
:add-title="true"
style="width: 261px">
<x-option
v-for="city in udfResourceList"
:key="city.id"

View File

@ -55,7 +55,7 @@ v-ps<template>
<span>{{$index + 1}}</span>
</td>
<td>
<span class="ellipsis">
<span class="ellipsis" v-tooltip.large.top.start.light="{text: item.funcName, maxWidth: '500px'}">
<a href="javascript:" class="links">{{item.funcName}}</a>
</span>
</td>
@ -142,7 +142,7 @@ v-ps<template>
id: item.id
}).then(res => {
this.$refs[`poptip-${i}`][0].doClose()
this.$emit('_updateList')
this.$emit('on-update')
this.$message.success(res.msg)
}).catch(e => {
this.$refs[`poptip-${i}`][0].doClose()
@ -156,6 +156,7 @@ v-ps<template>
showMask: true,
escClose: true,
className: 'v-modal-custom',
width: '800px',
transitionName: 'opacityp',
render (h) {
return h(mCreateUdf, {

View File

@ -49,11 +49,11 @@
<span>{{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}}</span>
</td>
<td>
<span class="ellipsis">
<span class="ellipsis" v-tooltip.large.top.start.light="{text: item.alias, maxWidth: '500px'}">
<a href="javascript:" class="links" >{{item.alias}}</a>
</span>
</td>
<td><span class="ellipsis">{{item.fileName}}</span></td>
<td><span class="ellipsis" v-tooltip.large.top.start.light="{text: item.fileName, maxWidth: '500px'}">{{item.fileName}}</span></td>
<td>
<span>{{_rtSize(item.size)}}</span>
</td>

View File

@ -17,7 +17,7 @@
<template>
<m-popup :ok-text="$t('Submit')" :nameText="type.name + $t('Authorize')" @ok="_ok" ref="popup">
<template slot="content">
<div class="clearfix transfer-model">
<div class="clearfix transfer-model" style="width: 660px">
<div>
<x-button-group v-model="checkedValue" size="small">
<x-button type="ghost" value="fileResource" @click="_ckFile">{{$t('File resources')}}</x-button>
@ -195,7 +195,7 @@
.transfer-model {
padding: 0 20px;
.select-list-box {
width: 220px;
width: 300px;
float: left;
border: 1px solid #dcdee2;
border-radius: 3px;
@ -237,7 +237,7 @@
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
width: 210px;
width: 290px;
display: inline-block;
}
&:hover {

View File

@ -509,5 +509,13 @@ export default {
'IP address cannot be empty': 'IP address cannot be empty',
'Please enter the correct IP': 'Please enter the correct IP',
'Please generate token': 'Please generate token',
'Spark Version': 'Spark Version'
'Spark Version': 'Spark Version',
'TargetDataBase': 'target database',
'TargetTable': 'target table',
'Please enter the table of target': 'Please enter the table of target',
'Please enter a Target Table(required)': 'Please enter a Target Table(required)',
'SpeedByte': 'speed(byte count)',
'SpeedRecord': 'speed(record count)',
'0 means unlimited by byte': '0 means unlimited',
'0 means unlimited by count': '0 means unlimited',
}

View File

@ -509,5 +509,13 @@ export default {
'IP address cannot be empty': 'IP地址不能为空',
'Please enter the correct IP': '请输入正确的IP',
'Please generate token': '请生成Token',
'Spark Version': 'Spark版本'
'Spark Version': 'Spark版本',
'TargetDataBase': '目标库',
'TargetTable': '目标表',
'Please enter the table of target': '请输入目标表名',
'Please enter a Target Table(required)': '请输入目标表(必填)',
'SpeedByte': '限流(字节数)',
'SpeedRecord': '限流(记录数)',
'0 means unlimited by byte': 'KB0代表不限制',
'0 means unlimited by count': '0代表不限制',
}

1576
pom.xml

File diff suppressed because it is too large Load Diff

View File

@ -57,16 +57,16 @@ pid=$DOLPHINSCHEDULER_LOG_DIR/dolphinscheduler-$command.pid
cd $DOLPHINSCHEDULER_HOME
if [ "$command" = "api-server" ]; then
LOG_FILE="-Dlogging.config=classpath:apiserver_logback.xml -Dspring.profiles.active=api"
LOG_FILE="-Dserver=api-server -Dspring.profiles.active=api"
CLASS=org.apache.dolphinscheduler.api.ApiApplicationServer
elif [ "$command" = "master-server" ]; then
LOG_FILE="-Dlogging.config=classpath:master_logback.xml -Ddruid.mysql.usePingMethod=false"
LOG_FILE="-Dserver=master-server -Ddruid.mysql.usePingMethod=false"
CLASS=org.apache.dolphinscheduler.server.master.MasterServer
elif [ "$command" = "worker-server" ]; then
LOG_FILE="-Dlogging.config=classpath:worker_logback.xml -Ddruid.mysql.usePingMethod=false"
LOG_FILE="-Dserver=worker-server -Ddruid.mysql.usePingMethod=false"
CLASS=org.apache.dolphinscheduler.server.worker.WorkerServer
elif [ "$command" = "alert-server" ]; then
LOG_FILE="-Dlogback.configurationFile=conf/alert_logback.xml"
LOG_FILE="-Dserver=alert-server"
CLASS=org.apache.dolphinscheduler.alert.AlertServer
elif [ "$command" = "logger-server" ]; then
CLASS=org.apache.dolphinscheduler.server.rpc.LoggerServer
@ -93,8 +93,8 @@ case $startStop in
exec_command="$LOG_FILE $DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS"
echo "nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 < /dev/null &"
nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 < /dev/null &
echo "nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 > /dev/null &"
nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 > /dev/null &
echo $! > $pid
;;

View File

@ -20,6 +20,6 @@ UPDATE QRTZ_CRON_TRIGGERS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='Ea
UPDATE QRTZ_TRIGGERS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler';
UPDATE QRTZ_FIRED_TRIGGERS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler';
UPDATE QRTZ_JOB_DETAILS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler';
UPDATE QRTZ_JOB_DETAILS SET JOB_CLASS_NAME='org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob' WHERE JOB_CLASS_NAME='cn.escheduler.server.quartz.ProcessScheduleJob';
UPDATE QRTZ_JOB_DETAILS SET JOB_CLASS_NAME='org.apache.dolphinscheduler.dao.quartz.ProcessScheduleJob' WHERE JOB_CLASS_NAME='cn.escheduler.server.quartz.ProcessScheduleJob';
UPDATE QRTZ_LOCKS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler';
UPDATE QRTZ_SCHEDULER_STATE SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler';