Fix task log file might not be clear (#13102)

This commit is contained in:
Wenjun Ruan 2022-12-06 11:23:20 +08:00 committed by GitHub
parent 602971c724
commit 8a152aebc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 34 additions and 22 deletions

View File

@ -814,18 +814,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
}
try {
processService.removeTaskLogFile(processInstanceId);
} catch (Exception ex) {
// ignore
logger.warn("Remove task log file exception, processInstanceId:{}.", processInstanceId, ex);
}
// delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
processService.deleteWorkProcessMapByParentId(processInstanceId);
// We need to remove the task log file before deleting the task instance
// because the task log file is query from task instance.
// When delete task instance error, the task log file will also be deleted, this may cause data inconsistency.
processService.removeTaskLogFile(processInstanceId);
taskInstanceDao.deleteByWorkflowInstanceId(processInstanceId);
if (delete > 0) {

View File

@ -155,4 +155,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("status") int status);
void deleteByWorkflowInstanceId(@Param("workflowInstanceId") int workflowInstanceId);
List<TaskInstance> findByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
}

View File

@ -89,4 +89,5 @@ public interface TaskInstanceDao {
void deleteByWorkflowInstanceId(int workflowInstanceId);
List<TaskInstance> findTaskInstanceByWorkflowInstanceId(Integer processInstanceId);
}

View File

@ -172,4 +172,9 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
taskInstanceMapper.deleteByWorkflowInstanceId(workflowInstanceId);
}
@Override
public List<TaskInstance> findTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) {
return taskInstanceMapper.findByWorkflowInstanceId(workflowInstanceId);
}
}

View File

@ -57,6 +57,12 @@
and test_flag=#{testFlag}
order by start_time desc
</select>
<select id="findByWorkflowInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
WHERE process_instance_id = #{workflowInstanceId}
</select>
<select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>

View File

@ -176,17 +176,15 @@ public class LogClient implements AutoCloseable {
* remove task log
*
* @param host host
* @param port port
* @param path path
* @return remove task status
*/
public Boolean removeTaskLog(String host, int port, String path) {
logger.info("Remove task log from host: {}, port: {}, logPath {}", host, port, path);
public Boolean removeTaskLog(@NonNull Host host, String path) {
logger.info("Remove task log from host: {} logPath {}", host, path);
RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
Command response = this.client.sendSync(host, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
RemoveTaskLogResponseCommand taskLogResponse =
JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class);
@ -196,11 +194,11 @@ public class LogClient implements AutoCloseable {
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error(
"Remove task log from host: {}, port: {} logPath: {} error, the current thread has been interrupted",
host, port, path, ex);
"Remove task log from host: {}, logPath: {} error, the current thread has been interrupted",
host, path, ex);
return false;
} catch (Exception e) {
logger.error("Remove task log from host: {}, port: {} logPath: {} error", host, port, path, e);
logger.error("Remove task log from host: {}, logPath: {} error", host, path, e);
return false;
}
}

View File

@ -510,9 +510,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public void removeTaskLogFile(Integer processInstanceId) {
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
List<TaskInstance> taskInstanceList =
taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
List<TaskInstance> taskInstanceList = taskInstanceDao.findTaskInstanceByWorkflowInstanceId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@ -521,9 +519,14 @@ public class ProcessServiceImpl implements ProcessService {
if (Strings.isNullOrEmpty(taskInstance.getHost())) {
continue;
}
Host host = Host.of(taskInstance.getHost());
// remove task log from loggerserver
logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
try {
Host host = Host.of(taskInstance.getHost());
logClient.removeTaskLog(host, taskLogPath);
} catch (Exception e) {
logger.error(
"Remove task log error, meet an unknown exception, taskInstanceId: {}, host: {}, logPath: {}",
taskInstance.getId(), taskInstance.getHost(), taskInstance.getLogPath(), e);
}
}
}

View File

@ -151,7 +151,7 @@ public class LogClientTest {
.thenReturn(command);
LogClient logClient = new LogClient();
Boolean status = logClient.removeTaskLog("localhost", 1234, "/log/path");
Boolean status = logClient.removeTaskLog(Host.of("localhost:1234"), "/log/path");
Assertions.assertTrue(status);
}
}