diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index ffb6febd66..a45fbfff6f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -457,6 +457,11 @@ public class ProcessDao { if(tenantId >= 0){ tenant = tenantMapper.queryById(tenantId); } + + if (userId == 0){ + return null; + } + if(null == tenant){ User user = userMapper.selectById(userId); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java index 4899f4b2b5..221ad069bb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.queue.ITaskQueue; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -139,6 +140,7 @@ public class FetchTaskThread implements Runnable{ logger.info("worker start fetch tasks..."); while (Stopper.isRunning()){ InterProcessMutex mutex = null; + String currentTaskQueueStr = null; try { ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; //check memory and cpu usage and threads @@ -165,6 +167,9 @@ public class FetchTaskThread implements Runnable{ List taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum); for(String taskQueueStr : taskQueueStrArr){ + + currentTaskQueueStr = taskQueueStr; + if (StringUtils.isEmpty(taskQueueStr)) { continue; } @@ -184,7 +189,7 @@ public class FetchTaskThread implements Runnable{ // verify task instance is null if (verifyTaskInstanceIsNull(taskInstance)) { logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr); - removeNodeFromTaskQueue(taskQueueStr); + processErrorTask(taskQueueStr); continue; } @@ -192,13 +197,17 @@ public class FetchTaskThread implements Runnable{ continue; } - Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), - taskInstance.getProcessDefine().getUserId()); + // if process definition is null ,process definition already deleted + int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); + + Tenant tenant = processDao.getTenantForProcess( + taskInstance.getProcessInstance().getTenantId(), + userId); // verify tenant is null if (verifyTenantIsNull(tenant)) { logger.warn("remove task queue : {} due to tenant is null", taskQueueStr); - removeNodeFromTaskQueue(taskQueueStr); + processErrorTask(taskQueueStr); continue; } @@ -232,6 +241,7 @@ public class FetchTaskThread implements Runnable{ } }catch (Exception e){ + processErrorTask(currentTaskQueueStr); logger.error("fetch task thread failure" ,e); }finally { AbstractZKClient.releaseMutex(mutex); @@ -239,6 +249,26 @@ public class FetchTaskThread implements Runnable{ } } + /** + * process error task + * + * @param taskQueueStr task queue str + */ + private void processErrorTask(String taskQueueStr){ + // remove from zk + removeNodeFromTaskQueue(taskQueueStr); + + if (taskInstance != null){ + processDao.changeTaskState(ExecutionStatus.FAILURE, + taskInstance.getStartTime(), + taskInstance.getHost(), + null, + null, + taskInstId); + } + + } + /** * remove node from task queue * @@ -269,8 +299,7 @@ public class FetchTaskThread implements Runnable{ */ private boolean verifyTenantIsNull(Tenant tenant) { if(tenant == null){ - logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}", - taskInstance.getProcessDefine().getId(), + logger.error("tenant not exists,process instance id : {},task instance id : {}", taskInstance.getProcessInstance().getId(), taskInstance.getId()); return true;