delete class App, let spring manage connectionFactory (#1233)

* move updateTaskState into try/catch block in case of exception

* fix NPE

* using conf.getInt instead of getString

* for AbstractZKClient, remove the log, for it will print the same log message in createZNodePath.
for AlertDao, correct the spelling.

* duplicate

* refactor getTaskWorkerGroupId

* add friendly log

* update hearbeat thread num = 1

* fix the bug when worker execute task using queue. and remove checking Tenant user anymore in TaskScheduleThread

* 1. move verifyTaskInstanceIsNull after taskInstance
2. keep verifyTenantIsNull/verifyTaskInstanceIsNull clean and readable

* fix the message

* delete before check to avoid KeeperException$NoNodeException

* fix the message

* check processInstance state before delete tenant

* check processInstance state before delete worker group

* refactor

* merge api constants into common constatns

* update the resource perm

* update the dataSource perm

* fix CheckUtils.checkUserParams method

* update AlertGroupService, extends from BaseService, remove duplicate methods

* refactor

* modify method name

* add hasProjectAndPerm method

* using checkProject instead of getResultStatus

* delete checkAuth method, using hasProjectAndPerm instead.

* correct spelling

* add transactional for deleteWorkerGroupById

* add Transactional for deleteProcessInstanceById method

* change sqlSessionTemplate singleton

* change sqlSessionTemplate singleton and reformat code

* fix unsuitable error message

* update shutdownhook methods

* fix worker log bug

* fix api server debug mode bug

* upgrade zk version

* delete this line ,for zkClient.close() will do the whole thing

* fix master server shutdown error

* degrade zk version and add FourLetterWordMain class

* fix PathChildrenCache not close

* add Transactional for createSession method

* add more message for java-doc

* delete App, let spring manage connectionFactory

* add license

* add class Application for test support
This commit is contained in:
Tboy 2019-11-14 16:34:57 +08:00 committed by bao liang
parent 3fd9e5c21e
commit d962dee67a
15 changed files with 95 additions and 73 deletions

View File

@ -16,6 +16,10 @@
*/
package org.apache.dolphinscheduler.dao;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.model.Cron;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.DateInterval;
@ -28,13 +32,9 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.model.Cron;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,13 +46,12 @@ import java.util.*;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.*;
import static org.apache.dolphinscheduler.dao.datasource.ConnectionFactory.getMapper;
/**
* process relative dao that some mappers in this.
*/
@Component
public class ProcessDao extends AbstractBaseDao {
public class ProcessDao {
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -106,34 +105,7 @@ public class ProcessDao extends AbstractBaseDao {
/**
* task queue impl
*/
protected ITaskQueue taskQueue;
public ProcessDao(){
init();
}
/**
* init
*/
@Override
protected void init() {
taskQueue = TaskQueueFactory.getTaskQueueInstance();
userMapper = getMapper(UserMapper.class);
processDefineMapper = getMapper(ProcessDefinitionMapper.class);
processInstanceMapper = getMapper(ProcessInstanceMapper.class);
dataSourceMapper = getMapper(DataSourceMapper.class);
processInstanceMapMapper = getMapper(ProcessInstanceMapMapper.class);
taskInstanceMapper = getMapper(TaskInstanceMapper.class);
commandMapper = getMapper(CommandMapper.class);
scheduleMapper = getMapper(ScheduleMapper.class);
udfFuncMapper = getMapper(UdfFuncMapper.class);
resourceMapper = getMapper(ResourceMapper.class);
workerGroupMapper = getMapper(WorkerGroupMapper.class);
taskQueue = TaskQueueFactory.getTaskQueueInstance();
tenantMapper = getMapper(TenantMapper.class);
}
protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();
/**

View File

@ -30,6 +30,8 @@ import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
@ -37,6 +39,7 @@ import javax.sql.DataSource;
/**
* data source connection factory
*/
@Service
public class ConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);
@ -68,6 +71,7 @@ public class ConnectionFactory {
* get the data source
* @return druid dataSource
*/
@Bean
public static DruidDataSource getDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
@ -104,6 +108,7 @@ public class ConnectionFactory {
* @return sqlSessionFactory
* @throws Exception sqlSessionFactory exception
*/
@Bean
public static SqlSessionFactory getSqlSessionFactory() throws Exception {
if (sqlSessionFactory == null) {
synchronized (ConnectionFactory.class) {
@ -136,6 +141,7 @@ public class ConnectionFactory {
* get sql session
* @return sqlSession
*/
@Bean
public static SqlSession getSqlSession() {
if (sqlSessionTemplate == null) {
synchronized (ConnectionFactory.class) {

View File

@ -14,14 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao;
package org.apache.dolphinscheduler.dao.mapper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
/**
* @Author: Tboy
*/
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class);
@ComponentScan("org.apache.dolphinscheduler.dao")
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringApplication implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringApplication.applicationContext = applicationContext;
}
public static <T> T getBean(Class<T> requiredType){
return applicationContext.getBean(requiredType);
}
}

View File

@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.AbstractServer;
@ -76,7 +75,8 @@ public class WorkerServer extends AbstractServer {
/**
* alert database access
*/
private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);;
@Autowired
private AlertDao alertDao;
/**
* heartbeat thread pool

View File

@ -16,14 +16,12 @@
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.slf4j.Logger;
import java.io.IOException;
/**
* abstract yarn task
*/
@ -50,7 +48,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
*/
public AbstractYarnTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
this.processDao = SpringApplication.getBean(ProcessDao.class);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskProps.getTaskDir(),
taskProps.getTaskAppId(),

View File

@ -23,10 +23,10 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,7 +39,7 @@ public class DependentExecute {
/**
* process dao
*/
private static final ProcessDao processDao = DaoFactory.getDaoInstance(ProcessDao.class);
private static final ProcessDao processDao = SpringApplication.getBean(ProcessDao.class);
/**
* depend item list

View File

@ -25,9 +25,9 @@ import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
@ -88,7 +88,7 @@ public class DependentTask extends AbstractTask {
taskModel.getDependItemList(), taskModel.getRelation()));
}
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
this.processDao = SpringApplication.getBean(ProcessDao.class);
if(taskProps.getScheduleTime() != null){
this.dependentDate = taskProps.getScheduleTime();

View File

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.server.worker.task.http;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.enums.HttpParametersType;
@ -27,15 +30,12 @@ import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.utils.Bytes;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
import org.apache.http.client.config.RequestConfig;
@ -92,7 +92,7 @@ public class HttpTask extends AbstractTask {
*/
public HttpTask(TaskProps props, Logger logger) {
super(props, logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
this.processDao = SpringApplication.getBean(ProcessDao.class);
}
@Override

View File

@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.server.worker.task.processdure;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.utils.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
@ -27,14 +29,12 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.utils.StringUtils;
import org.slf4j.Logger;
import java.sql.*;
@ -82,7 +82,7 @@ public class ProcedureTask extends AbstractTask {
throw new RuntimeException("procedure task params is not valid");
}
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
this.processDao = SpringApplication.getBean(ProcessDao.class);
}
@Override

View File

@ -22,15 +22,14 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
import java.util.Map;
/**
@ -77,7 +76,7 @@ public class PythonTask extends AbstractTask {
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
this.processDao = SpringApplication.getBean(ProcessDao.class);
}
@Override

View File

@ -23,9 +23,9 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
@ -84,7 +84,7 @@ public class ShellTask extends AbstractTask {
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
this.processDao = SpringApplication.getBean(ProcessDao.class);
}
@Override

View File

@ -37,13 +37,13 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
@ -97,8 +97,8 @@ public class SqlTask extends AbstractTask {
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
this.processDao = SpringApplication.getBean(ProcessDao.class);
this.alertDao = SpringApplication.getBean(AlertDao.class);
}
@Override

View File

@ -16,17 +16,17 @@
*/
package org.apache.dolphinscheduler.server.worker.shell;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import com.alibaba.fastjson.JSONObject;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -47,7 +47,7 @@ public class ShellCommandExecutorTest {
@Before
public void before(){
processDao = DaoFactory.getDaoInstance(ProcessDao.class);
processDao = SpringApplication.getBean(ProcessDao.class);
}
@Test

View File

@ -16,18 +16,18 @@
*/
package org.apache.dolphinscheduler.server.worker.sql;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplication;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import com.alibaba.fastjson.JSONObject;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -48,7 +48,7 @@ public class SqlExecutorTest {
@Before
public void before(){
processDao = DaoFactory.getDaoInstance(ProcessDao.class);
processDao = SpringApplication.getBean(ProcessDao.class);
}
@Test