mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-05 05:38:30 +08:00
Merge pull request #498 from lenboo/dev-1.1.0
pull request from bl dev-1.1.0
This commit is contained in:
commit
c873bb6551
@ -128,6 +128,7 @@ public class ProcessDefinitionService extends BaseDAGService {
|
||||
processDefine.setLocations(locations);
|
||||
processDefine.setConnects(connects);
|
||||
processDefine.setTimeout(processData.getTimeout());
|
||||
processDefine.setTenantId(processData.getTenantId());
|
||||
|
||||
//custom global params
|
||||
List<Property> globalParamsList = processData.getGlobalParams();
|
||||
@ -292,6 +293,7 @@ public class ProcessDefinitionService extends BaseDAGService {
|
||||
processDefine.setLocations(locations);
|
||||
processDefine.setConnects(connects);
|
||||
processDefine.setTimeout(processData.getTimeout());
|
||||
processDefine.setTenantId(processData.getTenantId());
|
||||
|
||||
//custom global params
|
||||
List<Property> globalParamsList = new ArrayList<>();
|
||||
|
@ -364,6 +364,7 @@ public class ProcessInstanceService extends BaseDAGService {
|
||||
String globalParams = null;
|
||||
String originDefParams = null;
|
||||
int timeout = processInstance.getTimeout();
|
||||
ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
|
||||
if (StringUtils.isNotEmpty(processInstanceJson)) {
|
||||
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
|
||||
//check workflow json is valid
|
||||
@ -379,6 +380,11 @@ public class ProcessInstanceService extends BaseDAGService {
|
||||
processInstance.getCmdTypeIfComplement(), schedule);
|
||||
timeout = processData.getTimeout();
|
||||
processInstance.setTimeout(timeout);
|
||||
Tenant tenant = processDao.getTenantForProcess(processData.getTenantId(),
|
||||
processDefinition.getUserId());
|
||||
if(tenant != null){
|
||||
processInstance.setTenantCode(tenant.getTenantCode());
|
||||
}
|
||||
processInstance.setProcessInstanceJson(processInstanceJson);
|
||||
processInstance.setGlobalParams(globalParams);
|
||||
}
|
||||
@ -387,7 +393,6 @@ public class ProcessInstanceService extends BaseDAGService {
|
||||
int update = processDao.updateProcessInstance(processInstance);
|
||||
int updateDefine = 1;
|
||||
if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
|
||||
ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
|
||||
processDefinition.setProcessDefinitionJson(processInstanceJson);
|
||||
processDefinition.setGlobalParams(originDefParams);
|
||||
processDefinition.setLocations(locations);
|
||||
|
@ -125,9 +125,9 @@ public class TenantService extends BaseService{
|
||||
public Map<String,Object> queryTenantList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
|
||||
|
||||
Map<String, Object> result = new HashMap<>(5);
|
||||
if (checkAdmin(loginUser, result)) {
|
||||
return result;
|
||||
}
|
||||
// if (checkAdmin(loginUser, result)) {
|
||||
// return result;
|
||||
// }
|
||||
|
||||
Integer count = tenantMapper.countTenantPaging(searchVal);
|
||||
|
||||
|
@ -0,0 +1,15 @@
|
||||
package cn.escheduler.common.enums;
|
||||
|
||||
/**
|
||||
* zk node type
|
||||
*/
|
||||
public enum ZKNodeType {
|
||||
|
||||
/**
|
||||
* 0 do not send warning;
|
||||
* 1 send if process success;
|
||||
* 2 send if process failed;
|
||||
* 3 send if process ending;
|
||||
*/
|
||||
MASTER, WORKER, DEAD_SERVER, TASK_QUEUE;
|
||||
}
|
@ -58,6 +58,7 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
|
||||
ExecutionStatus.RUNNING_EXEUTION.ordinal(),
|
||||
ExecutionStatus.READY_PAUSE.ordinal(),
|
||||
// ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
|
||||
ExecutionStatus.READY_STOP.ordinal()};
|
||||
|
||||
@Autowired
|
||||
@ -96,6 +97,12 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
@Autowired
|
||||
private ErrorCommandMapper errorCommandMapper;
|
||||
|
||||
@Autowired
|
||||
private WorkerServerMapper workerServerMapper;
|
||||
|
||||
@Autowired
|
||||
private TenantMapper tenantMapper;
|
||||
|
||||
/**
|
||||
* task queue impl
|
||||
*/
|
||||
@ -121,7 +128,9 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
udfFuncMapper = getMapper(UdfFuncMapper.class);
|
||||
resourceMapper = getMapper(ResourceMapper.class);
|
||||
workerGroupMapper = getMapper(WorkerGroupMapper.class);
|
||||
workerServerMapper = getMapper(WorkerServerMapper.class);
|
||||
taskQueue = TaskQueueFactory.getTaskQueueInstance();
|
||||
tenantMapper = getMapper(TenantMapper.class);
|
||||
}
|
||||
|
||||
|
||||
@ -485,9 +494,30 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
|
||||
processInstance.setWorkerGroupId(command.getWorkerGroupId());
|
||||
processInstance.setTimeout(processDefinition.getTimeout());
|
||||
processInstance.setTenantId(processDefinition.getTenantId());
|
||||
return processInstance;
|
||||
}
|
||||
|
||||
/**
|
||||
* get process tenant
|
||||
* there is tenant id in definition, use the tenant of the definition.
|
||||
* if there is not tenant id in the definiton or the tenant not exist
|
||||
* use definition creator's tenant.
|
||||
* @param tenantId
|
||||
* @param userId
|
||||
* @return
|
||||
*/
|
||||
public Tenant getTenantForProcess(int tenantId, int userId){
|
||||
Tenant tenant = null;
|
||||
if(tenantId >= 0){
|
||||
tenant = tenantMapper.queryById(tenantId);
|
||||
}
|
||||
if(tenant == null){
|
||||
User user = userMapper.queryById(userId);
|
||||
tenant = tenantMapper.queryById(user.getTenantId());
|
||||
}
|
||||
return tenant;
|
||||
}
|
||||
|
||||
/**
|
||||
* check command parameters is valid
|
||||
@ -581,6 +611,8 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
processInstance.setScheduleTime(command.getScheduleTime());
|
||||
}
|
||||
processInstance.setHost(host);
|
||||
|
||||
ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXEUTION;
|
||||
int runTime = processInstance.getRunTimes();
|
||||
switch (commandType){
|
||||
case START_PROCESS:
|
||||
@ -621,6 +653,7 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
case RECOVER_TOLERANCE_FAULT_PROCESS:
|
||||
// recover tolerance fault process
|
||||
processInstance.setRecovery(Flag.YES);
|
||||
runStatus = processInstance.getState();
|
||||
break;
|
||||
case COMPLEMENT_DATA:
|
||||
// delete all the valid tasks when complement data
|
||||
@ -652,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
default:
|
||||
break;
|
||||
}
|
||||
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
|
||||
processInstance.setState(runStatus);
|
||||
return processInstance;
|
||||
}
|
||||
|
||||
@ -1566,7 +1599,6 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
for (ProcessInstance processInstance:processInstanceList){
|
||||
processNeedFailoverProcessInstances(processInstance);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Transactional(value = "TransactionManager",rollbackFor = Exception.class)
|
||||
@ -1633,6 +1665,17 @@ public class ProcessDao extends AbstractBaseDao {
|
||||
return workerGroupMapper.queryById(workerGroupId);
|
||||
}
|
||||
|
||||
/**
|
||||
* query worker server by host
|
||||
* @param host
|
||||
* @return
|
||||
*/
|
||||
public List<WorkerServer> queryWorkerServerByHost(String host){
|
||||
|
||||
return workerServerMapper.queryWorkerByHost(host);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
@ -95,6 +95,7 @@ public interface ProcessDefinitionMapper {
|
||||
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "receivers", column = "receivers", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "receiversCc", column = "receivers_cc", javaType = String.class, jdbcType = JdbcType.VARCHAR)
|
||||
|
||||
@ -123,6 +124,7 @@ public interface ProcessDefinitionMapper {
|
||||
@Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
|
||||
})
|
||||
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryByDefineName")
|
||||
@ -160,6 +162,7 @@ public interface ProcessDefinitionMapper {
|
||||
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
|
||||
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
|
||||
})
|
||||
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryAllDefinitionList")
|
||||
@ -187,6 +190,7 @@ public interface ProcessDefinitionMapper {
|
||||
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "scheduleReleaseState", column = "schedule_release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
|
||||
})
|
||||
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefineListPaging")
|
||||
@ -216,6 +220,7 @@ public interface ProcessDefinitionMapper {
|
||||
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
|
||||
})
|
||||
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefinitionListByIdList")
|
||||
|
@ -56,6 +56,7 @@ public class ProcessDefinitionMapperProvider {
|
||||
VALUES("`create_time`", "#{processDefinition.createTime}");
|
||||
VALUES("`update_time`", "#{processDefinition.updateTime}");
|
||||
VALUES("`timeout`", "#{processDefinition.timeout}");
|
||||
VALUES("`tenant_id`", "#{processDefinition.tenantId}");
|
||||
VALUES("`flag`", EnumFieldUtil.genFieldStr("processDefinition.flag", ReleaseState.class));
|
||||
VALUES("`user_id`", "#{processDefinition.userId}");
|
||||
|
||||
@ -102,6 +103,7 @@ public class ProcessDefinitionMapperProvider {
|
||||
SET("`create_time`=#{processDefinition.createTime}");
|
||||
SET("`update_time`=#{processDefinition.updateTime}");
|
||||
SET("`timeout`=#{processDefinition.timeout}");
|
||||
SET("`tenant_id`=#{processDefinition.tenantId}");
|
||||
SET("`flag`="+EnumFieldUtil.genFieldStr("processDefinition.flag", Flag.class));
|
||||
SET("`user_id`=#{processDefinition.userId}");
|
||||
|
||||
|
@ -97,6 +97,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "queue", column = "queue", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
})
|
||||
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryDetailById")
|
||||
@ -136,6 +137,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
})
|
||||
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryById")
|
||||
@ -175,6 +177,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
|
||||
})
|
||||
@ -214,6 +217,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
|
||||
})
|
||||
@ -262,6 +266,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
|
||||
})
|
||||
@ -359,6 +364,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
|
||||
})
|
||||
@ -452,6 +458,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
|
||||
})
|
||||
@ -497,6 +504,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
|
||||
})
|
||||
@ -542,6 +550,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
|
||||
})
|
||||
@ -585,6 +594,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
})
|
||||
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastRunningProcess")
|
||||
@ -628,6 +638,7 @@ public interface ProcessInstanceMapper {
|
||||
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
|
||||
})
|
||||
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastManualProcess")
|
||||
|
@ -69,6 +69,7 @@ public class ProcessInstanceMapperProvider {
|
||||
VALUES("`executor_id`", "#{processInstance.executorId}");
|
||||
VALUES("`worker_group_id`", "#{processInstance.workerGroupId}");
|
||||
VALUES("`timeout`", "#{processInstance.timeout}");
|
||||
VALUES("`tenant_id`", "#{processInstance.tenantId}");
|
||||
VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("processInstance.processInstancePriority", Priority.class));
|
||||
}
|
||||
}.toString();
|
||||
@ -141,6 +142,7 @@ public class ProcessInstanceMapperProvider {
|
||||
SET("`dependence_schedule_times`=#{processInstance.dependenceScheduleTimes}");
|
||||
SET("`is_sub_process`="+EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class));
|
||||
SET("`executor_id`=#{processInstance.executorId}");
|
||||
SET("`tenant_id`=#{processInstance.tenantId}");
|
||||
SET("`worker_group_id`=#{processInstance.workerGroupId}");
|
||||
SET("`timeout`=#{processInstance.timeout}");
|
||||
|
||||
@ -220,11 +222,11 @@ public class ProcessInstanceMapperProvider {
|
||||
public String queryDetailById(Map<String, Object> parameter) {
|
||||
return new SQL() {
|
||||
{
|
||||
SELECT("inst.*,q.queue_name as queue,t.tenant_code,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
|
||||
SELECT("inst.*,q.queue_name as queue,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
|
||||
|
||||
FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_tenant t,t_escheduler_queue q");
|
||||
FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_queue q");
|
||||
|
||||
WHERE("inst.executor_id = u.id AND u.tenant_id = t.id AND t.queue_id = q.id AND inst.id = #{processId}");
|
||||
WHERE("inst.executor_id = u.id AND t.queue_id = q.id AND inst.id = #{processId}");
|
||||
}
|
||||
}.toString();
|
||||
}
|
||||
@ -402,7 +404,12 @@ public class ProcessInstanceMapperProvider {
|
||||
|
||||
FROM(TABLE_NAME);
|
||||
|
||||
WHERE("`host` = #{host} and `state` in (" + strStates.toString() +")");
|
||||
Object host = parameter.get("host");
|
||||
if(host != null && StringUtils.isNotEmpty(host.toString())){
|
||||
|
||||
WHERE("`host` = #{host} ");
|
||||
}
|
||||
WHERE("`state` in (" + strStates.toString() +")");
|
||||
ORDER_BY("`id` asc");
|
||||
|
||||
|
||||
|
@ -228,7 +228,12 @@ public class TaskInstanceMapperProvider {
|
||||
SELECT("*, UNIX_TIMESTAMP(end_time)-UNIX_TIMESTAMP(start_time) as duration");
|
||||
FROM(TABLE_NAME);
|
||||
|
||||
WHERE("`host` = #{host} and `state` in (" + strStates.toString() +")");
|
||||
Object host = parameter.get("host");
|
||||
if(host != null && StringUtils.isNotEmpty(host.toString())){
|
||||
|
||||
WHERE("`host` = #{host} ");
|
||||
}
|
||||
WHERE("`state` in (" + strStates.toString() +")");
|
||||
ORDER_BY("`id` asc");
|
||||
}
|
||||
}.toString();
|
||||
|
@ -42,6 +42,23 @@ public interface WorkerServerMapper {
|
||||
@SelectProvider(type = WorkerServerMapperProvider.class, method = "queryAllWorker")
|
||||
List<WorkerServer> queryAllWorker();
|
||||
|
||||
/**
|
||||
* query worker list
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@Results(value = {
|
||||
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "host", column = "host", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "port", column = "port", javaType = int.class, jdbcType = JdbcType.INTEGER),
|
||||
@Result(property = "zkDirectory", column = "zk_directory", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "resInfo", column = "res_info", javaType = String.class, jdbcType = JdbcType.VARCHAR),
|
||||
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
|
||||
@Result(property = "lastHeartbeatTime", column = "last_heartbeat_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP)
|
||||
})
|
||||
@SelectProvider(type = WorkerServerMapperProvider.class, method = "queryWorkerByHost")
|
||||
List<WorkerServer> queryWorkerByHost(@Param("host") String host);
|
||||
|
||||
/**
|
||||
* insert worker server
|
||||
*
|
||||
|
@ -37,6 +37,21 @@ public class WorkerServerMapperProvider {
|
||||
}}.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* query worker list
|
||||
* @return
|
||||
*/
|
||||
public String queryWorkerByHost(Map<String, Object> parameter) {
|
||||
return new SQL() {{
|
||||
SELECT("*");
|
||||
|
||||
FROM(TABLE_NAME);
|
||||
|
||||
WHERE("host = #{host}");
|
||||
}}.toString();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* insert worker server
|
||||
* @param parameter
|
||||
|
@ -39,6 +39,8 @@ public class ProcessData {
|
||||
|
||||
private int timeout;
|
||||
|
||||
private int tenantId;
|
||||
|
||||
|
||||
public ProcessData() {
|
||||
}
|
||||
@ -92,4 +94,12 @@ public class ProcessData {
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public int getTenantId() {
|
||||
return tenantId;
|
||||
}
|
||||
|
||||
public void setTenantId(int tenantId) {
|
||||
this.tenantId = tenantId;
|
||||
}
|
||||
}
|
||||
|
@ -141,6 +141,11 @@ public class ProcessDefinition {
|
||||
*/
|
||||
private int timeout;
|
||||
|
||||
/**
|
||||
* tenant id
|
||||
*/
|
||||
private int tenantId;
|
||||
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
@ -354,7 +359,15 @@ public class ProcessDefinition {
|
||||
", receiversCc='" + receiversCc + '\'' +
|
||||
", scheduleReleaseState=" + scheduleReleaseState +
|
||||
", timeout=" + timeout +
|
||||
", tenantId=" + tenantId +
|
||||
'}';
|
||||
}
|
||||
|
||||
public int getTenantId() {
|
||||
return tenantId;
|
||||
}
|
||||
|
||||
public void setTenantId(int tenantId) {
|
||||
this.tenantId = tenantId;
|
||||
}
|
||||
}
|
||||
|
@ -188,6 +188,12 @@ public class ProcessInstance {
|
||||
*/
|
||||
private int timeout;
|
||||
|
||||
|
||||
/**
|
||||
* tenant id
|
||||
*/
|
||||
private int tenantId;
|
||||
|
||||
public ProcessInstance(){
|
||||
|
||||
}
|
||||
@ -534,6 +540,7 @@ public class ProcessInstance {
|
||||
", processInstanceJson='" + processInstanceJson + '\'' +
|
||||
", executorId=" + executorId +
|
||||
", tenantCode='" + tenantCode + '\'' +
|
||||
", tenantId='" + tenantId + '\'' +
|
||||
", queue='" + queue + '\'' +
|
||||
", isSubProcess=" + isSubProcess +
|
||||
", locations='" + locations + '\'' +
|
||||
@ -546,4 +553,11 @@ public class ProcessInstance {
|
||||
'}';
|
||||
}
|
||||
|
||||
public void setTenantId(int tenantId) {
|
||||
this.tenantId = tenantId;
|
||||
}
|
||||
|
||||
public int getTenantId() {
|
||||
return this.tenantId ;
|
||||
}
|
||||
}
|
||||
|
@ -18,16 +18,20 @@ package cn.escheduler.dao.utils;
|
||||
|
||||
|
||||
import cn.escheduler.common.enums.TaskDependType;
|
||||
import cn.escheduler.common.graph.DAG;
|
||||
import cn.escheduler.common.model.TaskNode;
|
||||
import cn.escheduler.common.model.TaskNodeRelation;
|
||||
import cn.escheduler.common.process.ProcessDag;
|
||||
import cn.escheduler.common.utils.JSONUtils;
|
||||
import cn.escheduler.dao.model.ProcessData;
|
||||
import cn.escheduler.dao.model.TaskInstance;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* dag tools
|
||||
@ -105,8 +109,7 @@ public class DagHelper {
|
||||
}
|
||||
|
||||
for (TaskNode taskNode : tmpTaskNodeList) {
|
||||
if ( !taskNode.isForbidden()
|
||||
&& null == findNodeByName(destTaskNodeList, taskNode.getName())) {
|
||||
if (null == findNodeByName(destTaskNodeList, taskNode.getName())) {
|
||||
destTaskNodeList.add(taskNode);
|
||||
}
|
||||
}
|
||||
@ -193,6 +196,24 @@ public class DagHelper {
|
||||
return processDag;
|
||||
}
|
||||
|
||||
/**
|
||||
* parse the forbidden task nodes in process definition.
|
||||
* @param processDefinitionJson
|
||||
* @return
|
||||
*/
|
||||
public static Map<String, TaskNode> getForbiddenTaskNodeMaps(String processDefinitionJson){
|
||||
Map<String, TaskNode> forbidTaskNodeMap = new ConcurrentHashMap<>();
|
||||
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
|
||||
|
||||
List<TaskNode> taskNodeList = processData.getTasks();
|
||||
for(TaskNode node : taskNodeList){
|
||||
if(node.isForbidden()){
|
||||
forbidTaskNodeMap.putIfAbsent(node.getName(), node);
|
||||
}
|
||||
}
|
||||
return forbidTaskNodeMap;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* find node by node name
|
||||
@ -210,4 +231,100 @@ public class DagHelper {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get start vertex in one dag
|
||||
* it would find the post node if the start vertex is forbidden running
|
||||
* @param parentNodeName the previous node
|
||||
* @param dag
|
||||
* @param completeTaskList
|
||||
* @return
|
||||
*/
|
||||
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag,
|
||||
Map<String, TaskInstance> completeTaskList){
|
||||
|
||||
if(completeTaskList == null){
|
||||
completeTaskList = new HashMap<>();
|
||||
}
|
||||
Collection<String> startVertexs = null;
|
||||
if(StringUtils.isNotEmpty(parentNodeName)){
|
||||
startVertexs = dag.getSubsequentNodes(parentNodeName);
|
||||
}else{
|
||||
startVertexs = dag.getBeginNode();
|
||||
}
|
||||
|
||||
List<String> tmpStartVertexs = new ArrayList<>();
|
||||
if(startVertexs!= null){
|
||||
tmpStartVertexs.addAll(startVertexs);
|
||||
}
|
||||
|
||||
for(String start : startVertexs){
|
||||
TaskNode startNode = dag.getNode(start);
|
||||
if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){
|
||||
continue;
|
||||
}
|
||||
Collection<String> postNodes = getStartVertex(start, dag, completeTaskList);
|
||||
|
||||
for(String post : postNodes){
|
||||
if(checkForbiddenPostCanSubmit(post, dag)){
|
||||
tmpStartVertexs.add(post);
|
||||
}
|
||||
}
|
||||
tmpStartVertexs.remove(start);
|
||||
}
|
||||
|
||||
return tmpStartVertexs;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param postNodeName
|
||||
* @param dag
|
||||
* @return
|
||||
*/
|
||||
private static boolean checkForbiddenPostCanSubmit(String postNodeName, DAG<String, TaskNode, TaskNodeRelation> dag){
|
||||
|
||||
TaskNode postNode = dag.getNode(postNodeName);
|
||||
List<String> dependList = postNode.getDepList();
|
||||
|
||||
for(String dependNodeName : dependList){
|
||||
TaskNode dependNode = dag.getNode(dependNodeName);
|
||||
if(!dependNode.isForbidden()){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/***
|
||||
* generate dag graph
|
||||
* @param processDag
|
||||
* @return
|
||||
*/
|
||||
public static DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
|
||||
|
||||
DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
|
||||
|
||||
/**
|
||||
* add vertex
|
||||
*/
|
||||
if (CollectionUtils.isNotEmpty(processDag.getNodes())){
|
||||
for (TaskNode node : processDag.getNodes()){
|
||||
dag.addNode(node.getName(),node);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* add edge
|
||||
*/
|
||||
if (CollectionUtils.isNotEmpty(processDag.getEdges())){
|
||||
for (TaskNodeRelation edge : processDag.getEdges()){
|
||||
dag.addEdge(edge.getStartNode(),edge.getEndNode());
|
||||
}
|
||||
}
|
||||
return dag;
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,9 @@
|
||||
# base spring data source configuration
|
||||
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
|
||||
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
|
||||
spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/escheduler?characterEncoding=UTF-8
|
||||
spring.datasource.username=xx
|
||||
spring.datasource.password=xx
|
||||
spring.datasource.url=jdbc:mysql://192.168.220.188:3306/escheduler_new?characterEncoding=UTF-8
|
||||
spring.datasource.username=root
|
||||
spring.datasource.password=root@123
|
||||
|
||||
# connection configuration
|
||||
spring.datasource.initialSize=5
|
||||
|
@ -79,6 +79,7 @@ public class MasterExecThread implements Runnable {
|
||||
private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
|
||||
private Map<String, TaskInstance> readyToSubmitTaskList = new ConcurrentHashMap<>();
|
||||
private Map<String, TaskInstance> dependFailedTask = new ConcurrentHashMap<>();
|
||||
private Map<String, TaskNode> forbiddenTaskList = new ConcurrentHashMap<>();
|
||||
private List<TaskInstance> recoverToleranceFaultTaskList = new ArrayList<>();
|
||||
|
||||
private AlertManager alertManager = new AlertManager();
|
||||
@ -269,6 +270,7 @@ public class MasterExecThread implements Runnable {
|
||||
private void buildFlowDag() throws Exception {
|
||||
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
|
||||
|
||||
forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
|
||||
// generate process to get DAG info
|
||||
List<String> recoveryNameList = getRecoveryNodeNameList();
|
||||
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
|
||||
@ -279,7 +281,8 @@ public class MasterExecThread implements Runnable {
|
||||
return;
|
||||
}
|
||||
// generate process dag
|
||||
dag = buildDagGraph(processDag);
|
||||
dag = DagHelper.buildDagGraph(processDag);
|
||||
|
||||
}
|
||||
|
||||
private void initTaskQueue(){
|
||||
@ -411,6 +414,8 @@ public class MasterExecThread implements Runnable {
|
||||
return taskInstance;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* get post task instance by node
|
||||
*
|
||||
@ -421,14 +426,12 @@ public class MasterExecThread implements Runnable {
|
||||
private List<TaskInstance> getPostTaskInstanceByNode(DAG<String, TaskNode, TaskNodeRelation> dag, String parentNodeName){
|
||||
|
||||
List<TaskInstance> postTaskList = new ArrayList<>();
|
||||
Collection<String> startVertex = null;
|
||||
if(StringUtils.isNotEmpty(parentNodeName)){
|
||||
startVertex = dag.getSubsequentNodes(parentNodeName);
|
||||
}else{
|
||||
startVertex = dag.getBeginNode();
|
||||
Collection<String> startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList);
|
||||
if(startVertex == null){
|
||||
return postTaskList;
|
||||
}
|
||||
for (String nodeName : startVertex){
|
||||
|
||||
for (String nodeName : startVertex){
|
||||
// encapsulation task instance
|
||||
TaskInstance taskInstance = createTaskInstance(processInstance, nodeName ,
|
||||
dag.getNode(nodeName),parentNodeName);
|
||||
@ -517,7 +520,10 @@ public class MasterExecThread implements Runnable {
|
||||
List<String> depsNameList = taskNode.getDepList();
|
||||
for(String depsNode : depsNameList ){
|
||||
|
||||
// dependencies must be all complete
|
||||
if(forbiddenTaskList.containsKey(depsNode)){
|
||||
continue;
|
||||
}
|
||||
// dependencies must be fully completed
|
||||
if(!completeTaskList.containsKey(depsNode)){
|
||||
return DependResult.WAITING;
|
||||
}
|
||||
@ -904,35 +910,6 @@ public class MasterExecThread implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
/***
|
||||
* generate dag graph
|
||||
* @param processDag
|
||||
* @return
|
||||
*/
|
||||
public DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
|
||||
|
||||
DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
|
||||
|
||||
/**
|
||||
* add vertex
|
||||
*/
|
||||
if (CollectionUtils.isNotEmpty(processDag.getNodes())){
|
||||
for (TaskNode node : processDag.getNodes()){
|
||||
dag.addNode(node.getName(),node);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* add edge
|
||||
*/
|
||||
if (CollectionUtils.isNotEmpty(processDag.getEdges())){
|
||||
for (TaskNodeRelation edge : processDag.getEdges()){
|
||||
dag.addEdge(edge.getStartNode(),edge.getEndNode());
|
||||
}
|
||||
}
|
||||
return dag;
|
||||
}
|
||||
|
||||
/**
|
||||
* whether the retry interval is timed out
|
||||
* @param taskInstance
|
||||
|
@ -294,9 +294,8 @@ public class ProcessUtils {
|
||||
/**
|
||||
* find logs and kill yarn tasks
|
||||
* @param taskInstance
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void killYarnJob(TaskInstance taskInstance) throws Exception {
|
||||
public static void killYarnJob(TaskInstance taskInstance) {
|
||||
try {
|
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
|
||||
LogClient logClient = new LogClient(taskInstance.getHost(), Constants.RPC_PORT);
|
||||
@ -316,7 +315,7 @@ public class ProcessUtils {
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("kill yarn job failed : " + e.getMessage(),e);
|
||||
throw new RuntimeException("kill yarn job fail");
|
||||
// throw new RuntimeException("kill yarn job fail");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,10 +23,7 @@ import cn.escheduler.common.thread.ThreadUtils;
|
||||
import cn.escheduler.common.utils.FileUtils;
|
||||
import cn.escheduler.common.utils.OSUtils;
|
||||
import cn.escheduler.dao.ProcessDao;
|
||||
import cn.escheduler.dao.model.ProcessDefinition;
|
||||
import cn.escheduler.dao.model.ProcessInstance;
|
||||
import cn.escheduler.dao.model.TaskInstance;
|
||||
import cn.escheduler.dao.model.WorkerGroup;
|
||||
import cn.escheduler.dao.model.*;
|
||||
import cn.escheduler.server.zk.ZKWorkerClient;
|
||||
import com.cronutils.utils.StringUtils;
|
||||
import org.apache.commons.configuration.Configuration;
|
||||
@ -194,9 +191,16 @@ public class FetchTaskThread implements Runnable{
|
||||
// get process instance
|
||||
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
|
||||
|
||||
|
||||
// get process define
|
||||
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
|
||||
|
||||
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
|
||||
processDefine.getUserId());
|
||||
|
||||
if(tenant != null){
|
||||
processInstance.setTenantCode(tenant.getTenantCode());
|
||||
}
|
||||
|
||||
taskInstance.setProcessInstance(processInstance);
|
||||
taskInstance.setProcessDefine(processDefine);
|
||||
|
@ -18,6 +18,7 @@ package cn.escheduler.server.zk;
|
||||
|
||||
import cn.escheduler.common.Constants;
|
||||
import cn.escheduler.common.enums.ExecutionStatus;
|
||||
import cn.escheduler.common.enums.ZKNodeType;
|
||||
import cn.escheduler.common.utils.CollectionUtils;
|
||||
import cn.escheduler.common.utils.DateUtils;
|
||||
import cn.escheduler.common.utils.OSUtils;
|
||||
@ -28,10 +29,11 @@ import cn.escheduler.dao.ProcessDao;
|
||||
import cn.escheduler.dao.ServerDao;
|
||||
import cn.escheduler.dao.model.ProcessInstance;
|
||||
import cn.escheduler.dao.model.TaskInstance;
|
||||
import cn.escheduler.dao.model.WorkerServer;
|
||||
import cn.escheduler.server.ResInfo;
|
||||
import cn.escheduler.server.utils.ProcessUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.imps.CuratorFrameworkState;
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
|
||||
@ -134,7 +136,9 @@ public class ZKMasterClient extends AbstractZKClient {
|
||||
|
||||
// check if fault tolerance is required,failure and tolerance
|
||||
if (getActiveMasterNum() == 1) {
|
||||
processDao.masterStartupFaultTolerant();
|
||||
failoverWorker(null, true);
|
||||
// processDao.masterStartupFaultTolerant();
|
||||
failoverMaster(null);
|
||||
}
|
||||
|
||||
}catch (Exception e){
|
||||
@ -190,31 +194,20 @@ public class ZKMasterClient extends AbstractZKClient {
|
||||
Date now = new Date();
|
||||
createTime = now ;
|
||||
try {
|
||||
String osHost = OSUtils.getHost();
|
||||
|
||||
// encapsulation master znnode
|
||||
masterZNode = masterZNodeParentPath + "/" + OSUtils.getHost() + "_";
|
||||
List<String> masterZNodeList = zkClient.getChildren().forPath(masterZNodeParentPath);
|
||||
|
||||
if (CollectionUtils.isNotEmpty(masterZNodeList)){
|
||||
boolean flag = false;
|
||||
for (String masterZNode : masterZNodeList){
|
||||
if (masterZNode.startsWith(OSUtils.getHost())){
|
||||
flag = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (flag){
|
||||
logger.error("register failure , master already started on host : {}" , OSUtils.getHost());
|
||||
// exit system
|
||||
System.exit(-1);
|
||||
}
|
||||
// zookeeper node exists, cannot start a new one.
|
||||
if(checkZKNodeExists(osHost, ZKNodeType.MASTER)){
|
||||
logger.error("register failure , master already started on host : {}" , osHost);
|
||||
// exit system
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
// specify the format of stored data in ZK nodes
|
||||
String heartbeatZKInfo = getOsInfo(now);
|
||||
// create temporary sequence nodes for master znode
|
||||
masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(masterZNode, heartbeatZKInfo.getBytes());
|
||||
masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
|
||||
masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes());
|
||||
|
||||
logger.info("register master node {} success" , masterZNode);
|
||||
|
||||
@ -238,6 +231,46 @@ public class ZKMasterClient extends AbstractZKClient {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* check the zookeeper node already exists
|
||||
* @param host
|
||||
* @param zkNodeType
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
private boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception {
|
||||
|
||||
String path = null;
|
||||
switch (zkNodeType){
|
||||
case MASTER:
|
||||
path = masterZNodeParentPath;
|
||||
break;
|
||||
case WORKER:
|
||||
path = workerZNodeParentPath;
|
||||
break;
|
||||
case DEAD_SERVER:
|
||||
path = deadServerZNodeParentPath;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if(StringUtils.isEmpty(path)){
|
||||
logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString());
|
||||
return false;
|
||||
}
|
||||
|
||||
List<String> masterZNodeList = null;
|
||||
masterZNodeList = zkClient.getChildren().forPath(path);
|
||||
if (CollectionUtils.isNotEmpty(masterZNodeList)){
|
||||
for (String masterZNode : masterZNodeList){
|
||||
if (masterZNode.startsWith(host)){
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* monitor master
|
||||
*/
|
||||
@ -279,17 +312,9 @@ public class ZKMasterClient extends AbstractZKClient {
|
||||
for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
|
||||
alertDao.sendServerStopedAlert(1, masterHost, "Master-Server");
|
||||
}
|
||||
|
||||
logger.info("start master failover ...");
|
||||
|
||||
List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);
|
||||
|
||||
//updateProcessInstance host is null and insert into command
|
||||
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
|
||||
processDao.processNeedFailoverProcessInstances(processInstance);
|
||||
if(StringUtils.isNotEmpty(masterHost)){
|
||||
failoverMaster(masterHost);
|
||||
}
|
||||
|
||||
logger.info("master failover end");
|
||||
}catch (Exception e){
|
||||
logger.error("master failover failed : " + e.getMessage(),e);
|
||||
}finally {
|
||||
@ -331,6 +356,8 @@ public class ZKMasterClient extends AbstractZKClient {
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* monitor worker
|
||||
*/
|
||||
@ -369,23 +396,9 @@ public class ZKMasterClient extends AbstractZKClient {
|
||||
alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server");
|
||||
}
|
||||
|
||||
logger.info("start worker failover ...");
|
||||
|
||||
|
||||
List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
|
||||
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
|
||||
ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
|
||||
if(instance!=null){
|
||||
taskInstance.setProcessInstance(instance);
|
||||
}
|
||||
// only kill yarn job if exists , the local thread has exited
|
||||
ProcessUtils.killYarnJob(taskInstance);
|
||||
}
|
||||
|
||||
//updateProcessInstance state value is NEED_FAULT_TOLERANCE
|
||||
processDao.updateNeedFailoverTaskInstances(workerHost);
|
||||
|
||||
logger.info("worker failover end");
|
||||
if(StringUtils.isNotEmpty(workerHost)){
|
||||
failoverWorker(workerHost, true);
|
||||
}
|
||||
}catch (Exception e){
|
||||
logger.error("worker failover failed : " + e.getMessage(),e);
|
||||
}
|
||||
@ -476,6 +489,95 @@ public class ZKMasterClient extends AbstractZKClient {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* task needs failover if task start before worker starts
|
||||
*
|
||||
* @param taskInstance
|
||||
* @return
|
||||
*/
|
||||
private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
|
||||
|
||||
boolean taskNeedFailover = true;
|
||||
|
||||
// if the worker node exists in zookeeper, we must check the task starts after the worker
|
||||
if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
|
||||
//if task start after worker starts, there is no need to failover the task.
|
||||
if(checkTaskAfterWorkerStart(taskInstance)){
|
||||
taskNeedFailover = false;
|
||||
}
|
||||
}
|
||||
return taskNeedFailover;
|
||||
}
|
||||
|
||||
/**
|
||||
* check task start after the worker server starts.
|
||||
* @param taskInstance
|
||||
* @return
|
||||
*/
|
||||
private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
|
||||
Date workerServerStartDate = null;
|
||||
List<WorkerServer> workerServers = processDao.queryWorkerServerByHost(taskInstance.getHost());
|
||||
if(workerServers.size() > 0){
|
||||
workerServerStartDate = workerServers.get(0).getCreateTime();
|
||||
}
|
||||
|
||||
if(workerServerStartDate != null){
|
||||
return taskInstance.getStartTime().after(workerServerStartDate);
|
||||
|
||||
}else{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* failover worker tasks
|
||||
* 1. kill yarn job if there are yarn jobs in tasks.
|
||||
* 2. change task state from running to need failover.
|
||||
* @param workerHost
|
||||
*/
|
||||
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
|
||||
logger.info("start worker[{}] failover ...", workerHost);
|
||||
|
||||
List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
|
||||
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
|
||||
if(needCheckWorkerAlive){
|
||||
if(!checkTaskInstanceNeedFailover(taskInstance)){
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
|
||||
if(instance!=null){
|
||||
taskInstance.setProcessInstance(instance);
|
||||
}
|
||||
// only kill yarn job if exists , the local thread has exited
|
||||
ProcessUtils.killYarnJob(taskInstance);
|
||||
|
||||
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
|
||||
processDao.saveTaskInstance(taskInstance);
|
||||
}
|
||||
|
||||
//update task Instance state value is NEED_FAULT_TOLERANCE
|
||||
// processDao.updateNeedFailoverTaskInstances(workerHost);
|
||||
logger.info("end worker[{}] failover ...", workerHost);
|
||||
}
|
||||
|
||||
/**
|
||||
* failover master tasks
|
||||
* @param masterHost
|
||||
*/
|
||||
private void failoverMaster(String masterHost) {
|
||||
logger.info("start master failover ...");
|
||||
|
||||
List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);
|
||||
|
||||
//updateProcessInstance host is null and insert into command
|
||||
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
|
||||
processDao.processNeedFailoverProcessInstances(processInstance);
|
||||
}
|
||||
|
||||
logger.info("master failover end");
|
||||
}
|
||||
|
||||
/**
|
||||
* get host ip
|
||||
@ -488,6 +590,7 @@ public class ZKMasterClient extends AbstractZKClient {
|
||||
|
||||
if(startIndex >= endIndex){
|
||||
logger.error("parse ip error");
|
||||
return "";
|
||||
}
|
||||
return path.substring(startIndex, endIndex);
|
||||
}
|
||||
|
@ -18,15 +18,27 @@ package cn.escheduler.server.master;
|
||||
|
||||
import cn.escheduler.common.enums.CommandType;
|
||||
import cn.escheduler.common.enums.FailureStrategy;
|
||||
import cn.escheduler.common.enums.TaskDependType;
|
||||
import cn.escheduler.common.enums.WarningType;
|
||||
import cn.escheduler.common.graph.DAG;
|
||||
import cn.escheduler.common.model.TaskNode;
|
||||
import cn.escheduler.common.model.TaskNodeRelation;
|
||||
import cn.escheduler.common.process.ProcessDag;
|
||||
import cn.escheduler.dao.datasource.ConnectionFactory;
|
||||
import cn.escheduler.dao.mapper.CommandMapper;
|
||||
import cn.escheduler.dao.mapper.ProcessDefinitionMapper;
|
||||
import cn.escheduler.dao.model.Command;
|
||||
import cn.escheduler.dao.model.ProcessDefinition;
|
||||
import cn.escheduler.dao.utils.DagHelper;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* master test
|
||||
*/
|
||||
@ -36,9 +48,14 @@ public class MasterCommandTest {
|
||||
|
||||
private CommandMapper commandMapper;
|
||||
|
||||
private ProcessDefinitionMapper processDefinitionMapper;
|
||||
|
||||
|
||||
@Before
|
||||
public void before(){
|
||||
|
||||
commandMapper = ConnectionFactory.getSqlSession().getMapper(CommandMapper.class);
|
||||
processDefinitionMapper = ConnectionFactory.getSqlSession().getMapper(ProcessDefinitionMapper.class);
|
||||
}
|
||||
|
||||
|
||||
@ -104,4 +121,29 @@ public class MasterCommandTest {
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testDagHelper(){
|
||||
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(19);
|
||||
|
||||
try {
|
||||
ProcessDag processDag = DagHelper.generateFlowDag(processDefinition.getProcessDefinitionJson(),
|
||||
new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST);
|
||||
|
||||
DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
|
||||
Collection<String> start = DagHelper.getStartVertex("1", dag, null);
|
||||
|
||||
System.out.println(start.toString());
|
||||
|
||||
Map<String, TaskNode> forbidden = DagHelper.getForbiddenTaskNodeMaps(processDefinition.getProcessDefinitionJson());
|
||||
System.out.println(forbidden);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
@ -26,6 +26,10 @@
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<div class="title" style="padding-top: 6px;">
|
||||
<span class="text-b">{{$t('select tenant')}}</span>
|
||||
<form-tenant v-model="tenantId"></form-tenant>
|
||||
</div>
|
||||
<div class="title" style="padding-top: 6px;">
|
||||
<span>超时告警</span>
|
||||
<span style="padding-left: 6px;">
|
||||
@ -73,6 +77,7 @@
|
||||
import mLocalParams from '../formModel/tasks/_source/localParams'
|
||||
import disabledState from '@/module/mixin/disabledState'
|
||||
import Affirm from '../jumpAffirm'
|
||||
import FormTenant from "./_source/selectTenant";
|
||||
|
||||
export default {
|
||||
name: 'udp',
|
||||
@ -90,6 +95,8 @@
|
||||
syncDefine: true,
|
||||
// Timeout alarm
|
||||
timeout: 0,
|
||||
|
||||
tenantId: -1,
|
||||
// checked Timeout alarm
|
||||
checkedTimeout: true
|
||||
}
|
||||
@ -116,6 +123,7 @@
|
||||
this.store.commit('dag/setGlobalParams', _.cloneDeep(this.udpList))
|
||||
this.store.commit('dag/setName', _.cloneDeep(this.name))
|
||||
this.store.commit('dag/setTimeout', _.cloneDeep(this.timeout))
|
||||
this.store.commit('dag/setTenantId', _.cloneDeep(this.tenantId))
|
||||
this.store.commit('dag/setDesc', _.cloneDeep(this.desc))
|
||||
this.store.commit('dag/setSyncDefine', this.syncDefine)
|
||||
},
|
||||
@ -181,9 +189,10 @@
|
||||
this.syncDefine = dag.syncDefine
|
||||
this.timeout = dag.timeout || 0
|
||||
this.checkedTimeout = this.timeout !== 0
|
||||
this.tenantId = dag.tenantId || -1
|
||||
},
|
||||
mounted () {},
|
||||
components: { mLocalParams }
|
||||
components: {FormTenant, mLocalParams }
|
||||
}
|
||||
</script>
|
||||
|
||||
|
@ -26,7 +26,7 @@
|
||||
methods: {
|
||||
...mapMutations('dag', ['resetParams', 'setIsDetails']),
|
||||
...mapActions('dag', ['getProcessList', 'getResourcesList', 'getProcessDetails']),
|
||||
...mapActions('security', ['getWorkerGroupsAll']),
|
||||
...mapActions('security', ['getTenantList','getWorkerGroupsAll']),
|
||||
/**
|
||||
* init
|
||||
*/
|
||||
@ -43,7 +43,8 @@
|
||||
// get resource
|
||||
this.getResourcesList(),
|
||||
// get worker group list
|
||||
this.getWorkerGroupsAll()
|
||||
this.getWorkerGroupsAll(),
|
||||
this.getTenantList()
|
||||
]).then((data) => {
|
||||
let item = data[0]
|
||||
this.setIsDetails(item.releaseState === 'ONLINE')
|
||||
|
@ -25,7 +25,7 @@
|
||||
methods: {
|
||||
...mapMutations('dag', ['resetParams']),
|
||||
...mapActions('dag', ['getProcessList', 'getResourcesList']),
|
||||
...mapActions('security', ['getWorkerGroupsAll']),
|
||||
...mapActions('security', ['getTenantList','getWorkerGroupsAll']),
|
||||
/**
|
||||
* init
|
||||
*/
|
||||
@ -40,7 +40,8 @@
|
||||
// get resource
|
||||
this.getResourcesList(),
|
||||
// get worker group list
|
||||
this.getWorkerGroupsAll()
|
||||
this.getWorkerGroupsAll(),
|
||||
this.getTenantList()
|
||||
]).then((data) => {
|
||||
this.isLoading = false
|
||||
// Whether to pop up the box?
|
||||
@ -65,4 +66,4 @@
|
||||
},
|
||||
components: { mDag, mSpin }
|
||||
}
|
||||
</script>
|
||||
</script>
|
||||
|
@ -26,7 +26,7 @@
|
||||
methods: {
|
||||
...mapMutations('dag', ['setIsDetails', 'resetParams']),
|
||||
...mapActions('dag', ['getProcessList', 'getResourcesList', 'getInstancedetail']),
|
||||
...mapActions('security', ['getWorkerGroupsAll']),
|
||||
...mapActions('security', ['getTenantList','getWorkerGroupsAll']),
|
||||
/**
|
||||
* init
|
||||
*/
|
||||
@ -43,7 +43,8 @@
|
||||
// get resources
|
||||
this.getResourcesList(),
|
||||
// get worker group list
|
||||
this.getWorkerGroupsAll()
|
||||
this.getWorkerGroupsAll(),
|
||||
this.getTenantList()
|
||||
]).then((data) => {
|
||||
let item = data[0]
|
||||
let flag = false
|
||||
@ -92,4 +93,4 @@
|
||||
},
|
||||
components: { mDag, mSpin, mVariable }
|
||||
}
|
||||
</script>
|
||||
</script>
|
||||
|
@ -0,0 +1,115 @@
|
||||
<template>
|
||||
<m-list-construction :title="$t('statistics') + $t('Manage')">
|
||||
<template slot="content">
|
||||
<div class="servers-wrapper mysql-model" v-show="2">
|
||||
<div class="row">
|
||||
<div class="col-md-3">
|
||||
<div class="text-num-model text">
|
||||
<div class="title">
|
||||
<span >{{$t('process number of waiting for running')}}</span>
|
||||
</div>
|
||||
<div class="value-p">
|
||||
<b :style="{color:color[0]}"> {{commandCountData.normalCount}}</b>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col-md-3">
|
||||
<div class="text-num-model text">
|
||||
<div class="title">
|
||||
<span >{{$t('failure command number')}}}</span>
|
||||
</div>
|
||||
<div class="value-p">
|
||||
<b :style="{color:color[1]}"> {{commandCountData.errorCount}}</b>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col-md-3">
|
||||
<div class="text-num-model text">
|
||||
<div class="title">
|
||||
<span >{{$t('tasks number of waiting running')}}</span>
|
||||
</div>
|
||||
<div class="value-p">
|
||||
<b :style="{color:color[0]}"> {{queueCount.taskQueue}}</b>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col-md-3">
|
||||
<div class="text-num-model text">
|
||||
<div class="title">
|
||||
<span >{{$t('task number of ready to kill')}}</span>
|
||||
</div>
|
||||
<div class="value-p">
|
||||
<b :style="{color:color[1]}">{{queueCount.taskKill}}</b>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<m-spin :is-spin="isLoading" ></m-spin>
|
||||
</template>
|
||||
</m-list-construction>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import { mapActions } from 'vuex'
|
||||
import mSpin from '@/module/components/spin/spin'
|
||||
import mNoData from '@/module/components/noData/noData'
|
||||
import themeData from '@/module/echarts/themeData.json'
|
||||
import mListConstruction from '@/module/components/listConstruction/listConstruction'
|
||||
|
||||
export default {
|
||||
name: 'statistics',
|
||||
data () {
|
||||
return {
|
||||
isLoading: false,
|
||||
queueCount: {},
|
||||
commandCountData: {},
|
||||
color: themeData.color
|
||||
}
|
||||
},
|
||||
props:{},
|
||||
methods: {
|
||||
//...mapActions('monitor', ['getDatabaseData'])
|
||||
// ...mapActions('projects', ['getCommandStateCount']),
|
||||
...mapActions('projects', ['getQueueCount']),
|
||||
...mapActions('projects', ['getCommandStateCount']),
|
||||
},
|
||||
watch: {},
|
||||
created () {
|
||||
this.isLoading = true
|
||||
this.getQueueCount().then(res => {
|
||||
this.queueCount = res.data
|
||||
this.isLoading = false
|
||||
}).catch(() => {
|
||||
this.isLoading = false
|
||||
})
|
||||
|
||||
this.getCommandStateCount().then(res => {
|
||||
let normal = 0
|
||||
let error = 0
|
||||
_.forEach(res.data, (v, i) => {
|
||||
let key = _.keys(v)
|
||||
if(key[0] == 'errorCount') {
|
||||
error = error + v.errorCount
|
||||
}
|
||||
if(key[1] == 'normalCount'){
|
||||
normal = normal + v.normalCount
|
||||
}
|
||||
}
|
||||
)
|
||||
this.commandCountData = {
|
||||
'normalCount': normal,
|
||||
'errorCount' : error
|
||||
}
|
||||
}).catch( () => {
|
||||
})
|
||||
},
|
||||
mounted () {
|
||||
},
|
||||
components: { mListConstruction, mSpin, mNoData }
|
||||
}
|
||||
|
||||
</script>
|
||||
<style lang="scss" rel="stylesheet/scss">
|
||||
@import "./servers";
|
||||
</style>
|
@ -33,30 +33,6 @@
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row" style="padding-top: 20px;">
|
||||
<div class="col-md-6">
|
||||
</div>
|
||||
<div class="col-md-6">
|
||||
<div class="chart-title">
|
||||
<span>{{$t('Queue statistics')}}</span>
|
||||
</div>
|
||||
<div class="row">
|
||||
<m-queue-count :search-params="searchParams">
|
||||
</m-queue-count>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-md-12">
|
||||
<div class="chart-title" style="margin-bottom: 20px;margin-top: 30px">
|
||||
<span>{{$t('Command status statistics')}}</span>
|
||||
</div>
|
||||
<div>
|
||||
<m-command-state-count :search-params="searchParams">
|
||||
</m-command-state-count>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-md-12">
|
||||
<div class="chart-title" style="margin-bottom: -20px;margin-top: 30px">
|
||||
|
@ -439,6 +439,14 @@ const router = new Router({
|
||||
meta: {
|
||||
title: `Mysql`
|
||||
}
|
||||
},
|
||||
{
|
||||
path: '/monitor/servers/statistics',
|
||||
name: 'statistics',
|
||||
component: resolve => require(['../pages/monitor/pages/servers/statistics'], resolve),
|
||||
meta: {
|
||||
title: `statistics`
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -115,6 +115,7 @@ export default {
|
||||
// timeout
|
||||
state.timeout = processDefinitionJson.timeout
|
||||
|
||||
state.tenantId = processDefinitionJson.tenantId
|
||||
resolve(res.data)
|
||||
}).catch(res => {
|
||||
reject(res)
|
||||
@ -146,6 +147,8 @@ export default {
|
||||
// timeout
|
||||
state.timeout = processInstanceJson.timeout
|
||||
|
||||
state.tenantId = processInstanceJson.tenantId
|
||||
|
||||
resolve(res.data)
|
||||
}).catch(res => {
|
||||
reject(res)
|
||||
@ -160,6 +163,7 @@ export default {
|
||||
let data = {
|
||||
globalParams: state.globalParams,
|
||||
tasks: state.tasks,
|
||||
tenantId: state.tenantId,
|
||||
timeout: state.timeout
|
||||
}
|
||||
io.post(`projects/${state.projectName}/process/save`, {
|
||||
@ -183,6 +187,7 @@ export default {
|
||||
let data = {
|
||||
globalParams: state.globalParams,
|
||||
tasks: state.tasks,
|
||||
tenantId: state.tenantId,
|
||||
timeout: state.timeout
|
||||
}
|
||||
io.post(`projects/${state.projectName}/process/update`, {
|
||||
@ -207,6 +212,7 @@ export default {
|
||||
let data = {
|
||||
globalParams: state.globalParams,
|
||||
tasks: state.tasks,
|
||||
tenantId: state.tenantId,
|
||||
timeout: state.timeout
|
||||
}
|
||||
io.post(`projects/${state.projectName}/instance/update`, {
|
||||
|
@ -58,6 +58,12 @@ export default {
|
||||
setTimeout (state, payload) {
|
||||
state.timeout = payload
|
||||
},
|
||||
/**
|
||||
* set tenantId
|
||||
*/
|
||||
setTenantId (state, payload) {
|
||||
state.tenantId = payload
|
||||
},
|
||||
/**
|
||||
* set global params
|
||||
*/
|
||||
@ -100,6 +106,7 @@ export default {
|
||||
state.name = payload && payload.name || ''
|
||||
state.desc = payload && payload.desc || ''
|
||||
state.timeout = payload && payload.timeout || 0
|
||||
state.tenantId = payload && payload.tenantId || -1
|
||||
state.processListS = payload && payload.processListS || []
|
||||
state.resourcesListS = payload && payload.resourcesListS || []
|
||||
state.isDetails = payload && payload.isDetails || false
|
||||
|
@ -31,6 +31,8 @@ export default {
|
||||
tasks: [],
|
||||
// Timeout alarm
|
||||
timeout: 0,
|
||||
// tenant id
|
||||
tenantId:-1,
|
||||
// Node location information
|
||||
locations: {},
|
||||
// Node-to-node connection
|
||||
|
@ -240,7 +240,13 @@ export default {
|
||||
getTenantList ({ state }, payload) {
|
||||
return new Promise((resolve, reject) => {
|
||||
io.get(`tenant/list`, payload, res => {
|
||||
resolve(res.data)
|
||||
let list=res.data
|
||||
list.unshift({
|
||||
id: -1,
|
||||
tenantName: 'Default'
|
||||
})
|
||||
state.tenantAllList = list
|
||||
resolve(list)
|
||||
}).catch(e => {
|
||||
reject(e)
|
||||
})
|
||||
|
@ -15,5 +15,6 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
export default {
|
||||
workerGroupsListAll: []
|
||||
workerGroupsListAll: [],
|
||||
tenantAllList : []
|
||||
}
|
||||
|
@ -193,7 +193,7 @@ let menu = {
|
||||
monitor: [
|
||||
{
|
||||
name: `${i18n.$t('Servers manage')}`,
|
||||
id: 0,
|
||||
id: 1,
|
||||
path: '',
|
||||
isOpen: true,
|
||||
disabled: true,
|
||||
@ -242,6 +242,22 @@ let menu = {
|
||||
disabled: true
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
name: `${i18n.$t('Statistics manage')}`,
|
||||
id: 0,
|
||||
path: '',
|
||||
isOpen: true,
|
||||
disabled: true,
|
||||
icon: 'fa-server',
|
||||
children: [
|
||||
{
|
||||
name: "Statistics",
|
||||
path: 'statistics',
|
||||
id: 0,
|
||||
disabled: true
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -459,5 +459,13 @@ export default {
|
||||
'Statement cannot be empty': 'Statement cannot be empty',
|
||||
'Process Define Count': 'Process Define Count',
|
||||
'Process Instance Running Count': 'Process Instance Running Count',
|
||||
'process number of waiting for running': 'process number of waiting for running',
|
||||
'failure command number': 'failure command number',
|
||||
'tasks number of waiting running': 'tasks number of waiting running',
|
||||
'task number of ready to kill': '待杀死任务数',
|
||||
'Statistics manage': 'Statistics manage',
|
||||
'statistics': 'statistics',
|
||||
'select tenant':'select tenant',
|
||||
'Process Instance Running Count': 'Process Instance Running Count',
|
||||
'Please enter Principal':'Please enter Principal'
|
||||
}
|
||||
|
@ -460,5 +460,12 @@ export default {
|
||||
'Process Define Count': '流程定义个数',
|
||||
'Process Instance Running Count': '运行流程实例个数',
|
||||
'Please select a queue': '请选择队列',
|
||||
'process number of waiting for running': '待执行的流程数',
|
||||
'failure command number': '执行失败的命令数',
|
||||
'tasks number of waiting running': '待运行任务数',
|
||||
'task number of ready to kill': '待杀死任务数',
|
||||
'Statistics manage': '统计管理',
|
||||
'statistics': '统计',
|
||||
'select tenant':'选择租户',
|
||||
'Please enter Principal':'请输入Principal'
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user