mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-29 18:58:05 +08:00
[Fix] [Worker] Fix worker will hang if fails to start (#10342)
* Fix worker will hang if fails to start * Add .run to ignore
This commit is contained in:
parent
da3c25dc67
commit
516757cc74
1
.gitignore
vendored
1
.gitignore
vendored
@ -6,6 +6,7 @@
|
||||
.DS_Store
|
||||
.target
|
||||
.idea/
|
||||
.run/
|
||||
target/
|
||||
dist/
|
||||
all-dependencies.txt
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.hive;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
|
||||
@ -60,7 +61,8 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
|
||||
@Override
|
||||
protected void preInit() {
|
||||
logger.info("PreInit in {}", getClass().getName());
|
||||
this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor();
|
||||
this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor(
|
||||
new ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
|
||||
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.Constants;
|
||||
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
@ -54,6 +55,8 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* logger request process logic
|
||||
*/
|
||||
@ -65,7 +68,8 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
|
||||
private final ExecutorService executor;
|
||||
|
||||
public LoggerRequestProcessor() {
|
||||
this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1);
|
||||
this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1,
|
||||
new NamedThreadFactory("Log-Request-Process-Thread"));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -80,7 +84,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
|
||||
command.getBody(), GetLogBytesRequestCommand.class);
|
||||
String path = getLogRequest.getPath();
|
||||
if (!checkPathSecurity(path)) {
|
||||
throw new IllegalArgumentException("Illegal path");
|
||||
throw new IllegalArgumentException("Illegal path: " + path);
|
||||
}
|
||||
byte[] bytes = getFileContentBytes(path);
|
||||
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
|
||||
@ -91,7 +95,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
|
||||
command.getBody(), ViewLogRequestCommand.class);
|
||||
String viewLogPath = viewLogRequest.getPath();
|
||||
if (!checkPathSecurity(viewLogPath)) {
|
||||
throw new IllegalArgumentException("Illegal path");
|
||||
throw new IllegalArgumentException("Illegal path: " + viewLogPath);
|
||||
}
|
||||
String msg = LoggerUtils.readWholeFileContent(viewLogPath);
|
||||
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
|
||||
@ -103,7 +107,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
|
||||
|
||||
String rollViewLogPath = rollViewLogRequest.getPath();
|
||||
if (!checkPathSecurity(rollViewLogPath)) {
|
||||
throw new IllegalArgumentException("Illegal path");
|
||||
throw new IllegalArgumentException("Illegal path: " + rollViewLogPath);
|
||||
}
|
||||
|
||||
List<String> lines = readPartFileContent(rollViewLogPath,
|
||||
@ -121,7 +125,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
|
||||
|
||||
String taskLogPath = removeTaskLogRequest.getPath();
|
||||
if (!checkPathSecurity(taskLogPath)) {
|
||||
throw new IllegalArgumentException("Illegal path");
|
||||
throw new IllegalArgumentException("Illegal path: " + taskLogPath);
|
||||
}
|
||||
File taskLogFile = new File(taskLogPath);
|
||||
boolean status = true;
|
||||
@ -137,7 +141,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
|
||||
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("unknown commandType");
|
||||
throw new IllegalArgumentException("unknown commandType: " + commandType);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,8 +103,8 @@ public class NettyRemotingServer {
|
||||
*/
|
||||
public NettyRemotingServer(final NettyServerConfig serverConfig) {
|
||||
this.serverConfig = serverConfig;
|
||||
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerBossThread_%s").build();
|
||||
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerWorkerThread_%s").build();
|
||||
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerBossThread_%s").build();
|
||||
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerWorkerThread_%s").build();
|
||||
if (Epoll.isAvailable()) {
|
||||
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
|
||||
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
|
||||
|
Loading…
Reference in New Issue
Block a user