mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-03 20:58:11 +08:00
Merge remote-tracking branch 'upstream/dev-20190415' into dev-20190415
This commit is contained in:
commit
4d9d72fec9
@ -68,7 +68,7 @@ public class TaskRecordController extends BaseController{
|
||||
try{
|
||||
logger.info("query task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}",
|
||||
taskName, state, taskDate, startTime, endTime);
|
||||
Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize);
|
||||
Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(false, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize);
|
||||
return returnDataListPaging(result);
|
||||
}catch (Exception e){
|
||||
logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e);
|
||||
@ -77,4 +77,36 @@ public class TaskRecordController extends BaseController{
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* query history task record list paging
|
||||
*
|
||||
* @param loginUser
|
||||
* @return
|
||||
*/
|
||||
@GetMapping("/history-list-paging")
|
||||
@ResponseStatus(HttpStatus.OK)
|
||||
public Result queryHistoryTaskRecordListPaging(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
|
||||
@RequestParam(value = "taskName", required = false) String taskName,
|
||||
@RequestParam(value = "state", required = false) String state,
|
||||
@RequestParam(value = "sourceTable", required = false) String sourceTable,
|
||||
@RequestParam(value = "destTable", required = false) String destTable,
|
||||
@RequestParam(value = "taskDate", required = false) String taskDate,
|
||||
@RequestParam(value = "startDate", required = false) String startTime,
|
||||
@RequestParam(value = "endDate", required = false) String endTime,
|
||||
@RequestParam("pageNo") Integer pageNo,
|
||||
@RequestParam("pageSize") Integer pageSize
|
||||
){
|
||||
|
||||
try{
|
||||
logger.info("query hisotry task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}",
|
||||
taskName, state, taskDate, startTime, endTime);
|
||||
Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(true, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize);
|
||||
return returnDataListPaging(result);
|
||||
}catch (Exception e){
|
||||
logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e);
|
||||
return error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getCode(), QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -29,6 +29,8 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static cn.escheduler.common.Constants.*;
|
||||
|
||||
/**
|
||||
* task record service
|
||||
*/
|
||||
@ -51,7 +53,7 @@ public class TaskRecordService extends BaseService{
|
||||
* @param pageSize
|
||||
* @return
|
||||
*/
|
||||
public Map<String,Object> queryTaskRecordListPaging(String taskName, String startDate,
|
||||
public Map<String,Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate,
|
||||
String taskDate, String sourceTable,
|
||||
String destTable, String endDate,
|
||||
String state, Integer pageNo, Integer pageSize) {
|
||||
@ -69,8 +71,9 @@ public class TaskRecordService extends BaseService{
|
||||
map.put("offset", pageInfo.getStart().toString());
|
||||
map.put("pageSize", pageInfo.getPageSize().toString());
|
||||
|
||||
int count = TaskRecordDao.countTaskRecord(map);
|
||||
List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map);
|
||||
String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG;
|
||||
int count = TaskRecordDao.countTaskRecord(map, table);
|
||||
List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map, table);
|
||||
pageInfo.setTotalCount(count);
|
||||
pageInfo.setLists(recordList);
|
||||
result.put(Constants.DATA_LIST, pageInfo);
|
||||
|
@ -463,6 +463,10 @@ public final class Constants {
|
||||
|
||||
public static final String TASK_RECORD_PWD = "task.record.datasource.password";
|
||||
|
||||
public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd";
|
||||
|
||||
public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd";
|
||||
|
||||
public static final String STATUS = "status";
|
||||
|
||||
|
||||
@ -826,4 +830,9 @@ public final class Constants {
|
||||
public static final String CONTENT = "content";
|
||||
public static final String DEPENDENT_SPLIT = ":||";
|
||||
public static final String DEPENDENT_ALL = "ALL";
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
}
|
||||
|
@ -120,48 +120,64 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
* find one command from command queue, construct process instance
|
||||
* @param logger
|
||||
* @param host
|
||||
* @param vaildThreadNum
|
||||
* @param validThreadNum
|
||||
* @return
|
||||
*/
|
||||
@Transactional(value = "TransactionManager",rollbackFor = Exception.class)
|
||||
public ProcessInstance scanCommand(Logger logger, String host, int vaildThreadNum){
|
||||
public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){
|
||||
|
||||
ProcessInstance processInstance = null;
|
||||
Command command = findOneCommand();
|
||||
|
||||
if (command == null) {
|
||||
return null;
|
||||
}
|
||||
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
|
||||
|
||||
processInstance = constructProcessInstance(command, host);
|
||||
|
||||
//cannot construct process instance, return null;
|
||||
if(processInstance == null){
|
||||
logger.error("scan command, command parameter is error: %s", command.toString());
|
||||
}else{
|
||||
// check thread number enough for this command, if not, change state to waiting thread.
|
||||
int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId());
|
||||
if(vaildThreadNum < commandThreadCount){
|
||||
logger.info("there is not enough thread for this command: {}",command.toString() );
|
||||
processInstance.setState(ExecutionStatus.WAITTING_THREAD);
|
||||
if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){
|
||||
processInstance.addHistoryCmd(command.getCommandType());
|
||||
}
|
||||
saveProcessInstance(processInstance);
|
||||
this.setSubProcessParam(processInstance);
|
||||
createRecoveryWaitingThreadCommand(command, processInstance);
|
||||
try{
|
||||
processInstance = constructProcessInstance(command, host);
|
||||
//cannot construct process instance, return null;
|
||||
if(processInstance == null){
|
||||
logger.error("scan command, command parameter is error: %s", command.toString());
|
||||
delCommandByid(command.getId());
|
||||
return null;
|
||||
}else if(!checkThreadNum(command, validThreadNum)){
|
||||
logger.info("there is not enough thread for this command: {}",command.toString() );
|
||||
return setWaitingThreadProcess(command, processInstance);
|
||||
}else{
|
||||
processInstance.setCommandType(command.getCommandType());
|
||||
processInstance.addHistoryCmd(command.getCommandType());
|
||||
saveProcessInstance(processInstance);
|
||||
this.setSubProcessParam(processInstance);
|
||||
processInstance.setCommandType(command.getCommandType());
|
||||
processInstance.addHistoryCmd(command.getCommandType());
|
||||
saveProcessInstance(processInstance);
|
||||
this.setSubProcessParam(processInstance);
|
||||
delCommandByid(command.getId());
|
||||
return processInstance;
|
||||
}
|
||||
}catch (Exception e){
|
||||
logger.error("scan command error ", e);
|
||||
delCommandByid(command.getId());
|
||||
}
|
||||
// delete command
|
||||
delCommandByid(command.getId());
|
||||
return processInstance;
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* set process waiting thread
|
||||
* @param command
|
||||
* @param processInstance
|
||||
* @return
|
||||
*/
|
||||
private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) {
|
||||
processInstance.setState(ExecutionStatus.WAITTING_THREAD);
|
||||
if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){
|
||||
processInstance.addHistoryCmd(command.getCommandType());
|
||||
}
|
||||
saveProcessInstance(processInstance);
|
||||
this.setSubProcessParam(processInstance);
|
||||
createRecoveryWaitingThreadCommand(command, processInstance);
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean checkThreadNum(Command command, int validThreadNum) {
|
||||
int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId());
|
||||
return validThreadNum >= commandThreadCount;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -669,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId()));
|
||||
processInstance.setCommandParam(JSONUtils.toJson(paramMap));
|
||||
processInstance.setIsSubProcess(Flag.YES);
|
||||
this.updateProcessInstance(processInstance);
|
||||
this.saveProcessInstance(processInstance);
|
||||
}
|
||||
// copy parent instance user def params to sub process..
|
||||
String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
|
||||
@ -677,7 +693,7 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
|
||||
if(parentInstance != null){
|
||||
processInstance.setGlobalParams(parentInstance.getGlobalParams());
|
||||
this.updateProcessInstance(processInstance);
|
||||
this.saveProcessInstance(processInstance);
|
||||
}else{
|
||||
logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
|
||||
}
|
||||
|
@ -40,6 +40,8 @@ public class TaskRecordDao {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName());
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 加载配置文件
|
||||
*/
|
||||
@ -134,7 +136,7 @@ public class TaskRecordDao {
|
||||
* @param filterMap
|
||||
* @return
|
||||
*/
|
||||
public static int countTaskRecord(Map<String, String> filterMap){
|
||||
public static int countTaskRecord(Map<String, String> filterMap, String table){
|
||||
|
||||
int count = 0;
|
||||
Connection conn = null;
|
||||
@ -143,7 +145,7 @@ public class TaskRecordDao {
|
||||
if(conn == null){
|
||||
return count;
|
||||
}
|
||||
String sql = "select count(1) as count from eamp_hive_log_hd";
|
||||
String sql = String.format("select count(1) as count from %s", table);
|
||||
sql += getWhereString(filterMap);
|
||||
PreparedStatement pstmt;
|
||||
pstmt = conn.prepareStatement(sql);
|
||||
@ -171,9 +173,9 @@ public class TaskRecordDao {
|
||||
* @param filterMap
|
||||
* @return
|
||||
*/
|
||||
public static List<TaskRecord> queryAllTaskRecord(Map<String,String> filterMap ) {
|
||||
public static List<TaskRecord> queryAllTaskRecord(Map<String,String> filterMap , String table) {
|
||||
|
||||
String sql = "select * from eamp_hive_log_hd ";
|
||||
String sql = String.format("select * from %s", table);
|
||||
sql += getWhereString(filterMap);
|
||||
|
||||
int offset = Integer.parseInt(filterMap.get("offset"));
|
||||
|
Loading…
Reference in New Issue
Block a user