Merge pull request #81 from qiaozhanwei/dev-20190415

dev merge
This commit is contained in:
乔占卫 2019-04-18 16:53:36 +08:00 committed by GitHub
commit 267df899c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 137 additions and 71 deletions

View File

@ -4,7 +4,7 @@
<parent>
<groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.1-SNAPSHOT</version>
</parent>
<artifactId>escheduler-alert</artifactId>
<packaging>jar</packaging>

View File

@ -3,7 +3,7 @@
<parent>
<groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.1-SNAPSHOT</version>
</parent>
<artifactId>escheduler-api</artifactId>
<packaging>jar</packaging>

View File

@ -125,7 +125,7 @@ public class ProcessScheduleJob implements Job {
}
Command command = new Command();
command.setCommandType(CommandType.START_PROCESS);
command.setCommandType(CommandType.SCHEDULER);
command.setExecutorId(schedule.getUserId());
command.setFailureStrategy(schedule.getFailureStrategy());
command.setProcessDefinitionId(schedule.getProcessDefinitionId());

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.1-SNAPSHOT</version>
</parent>
<artifactId>escheduler-common</artifactId>
<name>escheduler-common</name>

View File

@ -70,25 +70,6 @@ public final class Constants {
*/
public static final String YARN_APPLICATION_STATUS_ADDRESS = "yarn.application.status.address";
/**
* spring.redis.maxIdle
*/
public static final String SPRING_REDIS_MAXIDLE = "spring.redis.maxIdle";
/**
* spring.redis.maxTotal
*/
public static final String SPRING_REDIS_MAXTOTAL = "spring.redis.maxTotal";
/**
* spring.redis.host
*/
public static final String SPRING_REDIS_HOST = "spring.redis.host";
/**
* spring.redis.port
*/
public static final String SPRING_REDIS_PORT = "spring.redis.port";
/**
* hdfs configuration
@ -117,9 +98,14 @@ public final class Constants {
public static final String ESCHEDULER_ENV_PATH = "escheduler.env.path";
/**
* escheduler.env.py
* escheduler.env.sh
*/
public static final String ESCHEDULER_ENV_PY = "escheduler.env.py";
public static final String ESCHEDULER_ENV_SH = ".escheduler_env.sh";
/**
* python home
*/
public static final String PYTHON_HOME="PYTHON_HOME";
/**
* resource.view.suffixs
@ -255,8 +241,6 @@ public final class Constants {
public static final String SCHEDULER_QUEUE_IMPL = "escheduler.queue.impl";
public static final String SCHEDULER_QUEUE_REDIS_IMPL = "redis";
/**
* date format of yyyy-MM-dd HH:mm:ss

View File

@ -46,13 +46,6 @@ public class CommonUtils {
return envPath;
}
/**
* @return get the path of Python system environment variables
*/
public static String getPythonSystemEnvPath() {
return getString(ESCHEDULER_ENV_PY);
}
/**
* @return get queue implementation name
*/

View File

@ -18,7 +18,6 @@ hdfs.startup.state=true
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
escheduler.env.path=/opt/.escheduler_env.sh
escheduler.env.py=/opt/escheduler_env.py
#resource.view.suffixs
resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml

View File

@ -4,7 +4,7 @@
<parent>
<groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.1-SNAPSHOT</version>
</parent>
<artifactId>escheduler-dao</artifactId>
<name>escheduler-dao</name>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent>
<artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.1-SNAPSHOT</version>
</parent>
<artifactId>escheduler-server</artifactId>
<name>escheduler-server</name>

View File

@ -172,7 +172,7 @@ public class FetchTaskThread implements Runnable{
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
processInstance.getTenantCode(), logger);
logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
}

View File

@ -16,12 +16,13 @@
*/
package cn.escheduler.server.worker.task;
import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
@ -34,6 +35,8 @@ import java.util.function.Consumer;
*/
public class PythonCommandExecutor extends AbstractCommandExecutor {
private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class);
public static final String PYTHON = "python";
@ -63,27 +66,13 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
*/
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
logger.info("proxy user:{}, work dir:{}", tenantCode, taskDir);
logger.info("tenant :{}, work dir:{}", tenantCode, taskDir);
if (!Files.exists(Paths.get(commandFile))) {
logger.info("generate command file:{}", commandFile);
StringBuilder sb = new StringBuilder(200);
sb.append("#-*- encoding=utf8 -*-\n");
sb.append("import os,sys\n");
sb.append("BASEDIR = os.path.dirname(os.path.realpath(__file__))\n");
sb.append("os.chdir(BASEDIR)\n");
if (StringUtils.isNotEmpty(envFile)) {
String[] envArray = envFile.split("\\.");
if(envArray.length == 2){
String path = envArray[0];
logger.info("path:"+path);
int index = path.lastIndexOf("/");
sb.append(String.format("sys.path.append('%s')\n",path.substring(0,index)));
sb.append(String.format("import %s\n",path.substring(index+1)));
}
}
sb.append("\n\n");
sb.append(String.format("import py_%s_node\n",taskAppId));
@ -96,7 +85,14 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
@Override
protected String commandType() {
return PYTHON;
String envPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH + "conf"+
Constants.SINGLE_SLASH +"env" + Constants.SINGLE_SLASH + Constants.ESCHEDULER_ENV_SH;
String pythonHome = getPythonHome(envPath);
if (StringUtils.isEmpty(pythonHome)){
return PYTHON;
}
return pythonHome;
}
@Override
@ -109,4 +105,45 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
return true;
}
/**
* get python home
* @param envPath
* @return
*/
private static String getPythonHome(String envPath){
BufferedReader br = null;
String line = null;
StringBuilder sb = new StringBuilder();
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));
while ((line = br.readLine()) != null){
if (line.contains(Constants.PYTHON_HOME)){
sb.append(line);
break;
}
}
String result = sb.toString();
if (org.apache.commons.lang.StringUtils.isEmpty(result)){
return null;
}
String[] arrs = result.split("=");
if (arrs.length == 2){
return arrs[1];
}
}catch (IOException e){
logger.error("read file failed : " + e.getMessage(),e);
}finally {
try {
if (br != null){
br.close();
}
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
}
return null;
}
}

View File

@ -72,7 +72,7 @@ public class PythonTask extends AbstractTask {
this.pythonProcessTask = new PythonCommandExecutor(this::logHandle,
taskProps.getTaskDir(), taskProps.getTaskAppId(),
taskProps.getTenantCode(), CommonUtils.getPythonSystemEnvPath(), taskProps.getTaskStartTime(),
taskProps.getTenantCode(), null, taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(), logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}

View File

@ -0,0 +1,65 @@
package cn.escheduler.server.worker;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* Created by qiaozhanwei on 2019/4/15.
*/
public class EnvFileTest {
private static final Logger logger = LoggerFactory.getLogger(EnvFileTest.class);
public static void main(String[] args) {
String path = System.getProperty("user.dir")+"\\script\\env\\.escheduler_env.sh";
String pythonHome = getPythonHome(path);
logger.info(pythonHome);
}
/**
* get python home
* @param path
* @return
*/
private static String getPythonHome(String path){
BufferedReader br = null;
String line = null;
StringBuilder sb = new StringBuilder();
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
while ((line = br.readLine()) != null){
if (line.contains("PYTHON_HOME")){
sb.append(line);
break;
}
}
String result = sb.toString();
if (StringUtils.isEmpty(result)){
return null;
}
String[] arrs = result.split("=");
if (arrs.length == 2){
return arrs[1];
}
}catch (IOException e){
logger.error("read file failed : " + e.getMessage(),e);
}finally {
try {
if (br != null){
br.close();
}
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
}
return null;
}
}

View File

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>escheduler</name>
<url>http://maven.apache.org</url>

View File

@ -1,12 +0,0 @@
import os
HADOOP_HOME="/opt/soft/hadoop"
SPARK_HOME1="/opt/soft/spark1"
SPARK_HOME2="/opt/soft/spark2"
PYTHON_HOME="/opt/soft/python"
JAVA_HOME="/opt/soft/java"
HIVE_HOME="/opt/soft/hive"
PATH=os.environ['PATH']
PATH="%s/bin:%s/bin:%s/bin:%s/bin:%s/bin:%s/bin:%s"%(HIVE_HOME,HADOOP_HOME,SPARK_HOME1,SPARK_HOME2,JAVA_HOME,PYTHON_HOME,PATH)
os.putenv('PATH','%s'%PATH)