mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-02 04:08:31 +08:00
Add task executor threads full policy config in worker (#12510)
This commit is contained in:
parent
af031ef9f8
commit
75f6c416fb
@ -295,6 +295,7 @@ Location: `worker-server/conf/application.yaml`
|
|||||||
|worker.alert-listen-port|50052|the alert listen port of worker|
|
|worker.alert-listen-port|50052|the alert listen port of worker|
|
||||||
|worker.registry-disconnect-strategy.strategy|stop|Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting|
|
|worker.registry-disconnect-strategy.strategy|stop|Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting|
|
||||||
|worker.registry-disconnect-strategy.max-waiting-time|100s|Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will waitting infinitely |
|
|worker.registry-disconnect-strategy.max-waiting-time|100s|Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will waitting infinitely |
|
||||||
|
|worker.task-execute-threads-full-policy|REJECT|If REJECT, when the task waiting in the worker reaches exec-threads, it will reject the received task and the Master will redispatch it; If CONTINUE, it will put the task into the worker's execution queue and wait for a free thread to start execution|
|
||||||
|
|
||||||
### Alert Server related configuration
|
### Alert Server related configuration
|
||||||
|
|
||||||
|
@ -289,6 +289,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
|
|||||||
|worker.alert-listen-port|50052|alert监听端口|
|
|worker.alert-listen-port|50052|alert监听端口|
|
||||||
|worker.registry-disconnect-strategy.strategy|stop|当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|
|
|worker.registry-disconnect-strategy.strategy|stop|当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|
|
||||||
|worker.registry-disconnect-strategy.max-waiting-time|100s|当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Worker与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 |
|
|worker.registry-disconnect-strategy.max-waiting-time|100s|当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Worker与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 |
|
||||||
|
|worker.task-execute-threads-full-policy|REJECT|如果是 REJECT, 当Worker中等待队列中的任务数达到exec-threads时, Worker将会拒绝接下来新接收的任务,Master将会重新分发该任务; 如果是 CONTINUE, Worker将会接收任务,放入等待队列中等待空闲线程去执行该任务|
|
||||||
|
|
||||||
## Alert Server相关配置
|
## Alert Server相关配置
|
||||||
|
|
||||||
|
@ -175,6 +175,7 @@ worker:
|
|||||||
# alert server listen host
|
# alert server listen host
|
||||||
alert-listen-host: localhost
|
alert-listen-host: localhost
|
||||||
alert-listen-port: 50052
|
alert-listen-port: 50052
|
||||||
|
task-execute-threads-full-policy: REJECT
|
||||||
|
|
||||||
alert:
|
alert:
|
||||||
port: 50052
|
port: 50052
|
||||||
|
@ -0,0 +1,24 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.worker.config;
|
||||||
|
|
||||||
|
public enum TaskExecuteThreadsFullPolicy {
|
||||||
|
CONTINUE,
|
||||||
|
REJECT,
|
||||||
|
;
|
||||||
|
}
|
@ -60,6 +60,8 @@ public class WorkerConfig implements Validator {
|
|||||||
private String workerAddress;
|
private String workerAddress;
|
||||||
private String workerRegistryPath;
|
private String workerRegistryPath;
|
||||||
|
|
||||||
|
private TaskExecuteThreadsFullPolicy taskExecuteThreadsFullPolicy = TaskExecuteThreadsFullPolicy.REJECT;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean supports(Class<?> clazz) {
|
public boolean supports(Class<?> clazz) {
|
||||||
return WorkerConfig.class.isAssignableFrom(clazz);
|
return WorkerConfig.class.isAssignableFrom(clazz);
|
||||||
@ -97,5 +99,6 @@ public class WorkerConfig implements Validator {
|
|||||||
logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
|
logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
|
||||||
logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
|
logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
|
||||||
logger.info("Worker config: workerRegistryPath: {}", workerRegistryPath);
|
logger.info("Worker config: workerRegistryPath: {}", workerRegistryPath);
|
||||||
|
logger.info("Worker config: taskExecuteThreadsFullPolicy: {}", taskExecuteThreadsFullPolicy);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants;
|
|||||||
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
|
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
|
||||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
|
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
|
||||||
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
|
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
|
||||||
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
|
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -40,8 +41,8 @@ public class WorkerManagerThread implements Runnable {
|
|||||||
private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
|
private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
|
||||||
|
|
||||||
private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
|
private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
|
||||||
|
|
||||||
private final WorkerExecService workerExecService;
|
private final WorkerExecService workerExecService;
|
||||||
|
private final WorkerConfig workerConfig;
|
||||||
|
|
||||||
private final int workerExecThreads;
|
private final int workerExecThreads;
|
||||||
|
|
||||||
@ -51,6 +52,7 @@ public class WorkerManagerThread implements Runnable {
|
|||||||
private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public WorkerManagerThread(WorkerConfig workerConfig) {
|
public WorkerManagerThread(WorkerConfig workerConfig) {
|
||||||
|
this.workerConfig = workerConfig;
|
||||||
workerExecThreads = workerConfig.getExecThreads();
|
workerExecThreads = workerConfig.getExecThreads();
|
||||||
this.waitSubmitQueue = new DelayQueue<>();
|
this.waitSubmitQueue = new DelayQueue<>();
|
||||||
workerExecService = new WorkerExecService(
|
workerExecService = new WorkerExecService(
|
||||||
@ -92,6 +94,10 @@ public class WorkerManagerThread implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
|
public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
|
||||||
|
if (workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) {
|
||||||
|
return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
|
||||||
|
}
|
||||||
|
|
||||||
if (waitSubmitQueue.size() > workerExecThreads) {
|
if (waitSubmitQueue.size() > workerExecThreads) {
|
||||||
logger.warn("Wait submit queue is full, will retry submit task later");
|
logger.warn("Wait submit queue is full, will retry submit task later");
|
||||||
WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
|
WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
|
||||||
|
@ -76,6 +76,7 @@ worker:
|
|||||||
strategy: waiting
|
strategy: waiting
|
||||||
# The max waiting time to reconnect to registry if you set the strategy to waiting
|
# The max waiting time to reconnect to registry if you set the strategy to waiting
|
||||||
max-waiting-time: 100s
|
max-waiting-time: 100s
|
||||||
|
task-execute-threads-full-policy: REJECT
|
||||||
|
|
||||||
server:
|
server:
|
||||||
port: 1235
|
port: 1235
|
||||||
|
Loading…
Reference in New Issue
Block a user