[Fix-7146][server]Fix the task processor thread is not safe. (#7488)

* Fix the task processor thread is not safe.

* fix code style
This commit is contained in:
Kerwin 2021-12-20 17:58:57 +08:00 committed by GitHub
parent 8c2dd4447b
commit b23f756c22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 238 additions and 72 deletions

View File

@ -64,12 +64,6 @@ public class MasterSchedulerService extends Thread {
@Autowired
private ProcessService processService;
/**
* task processor factory
*/
@Autowired
private TaskProcessorFactory taskProcessorFactory;
/**
* master config
*/
@ -176,8 +170,7 @@ public class MasterSchedulerService extends Thread {
, nettyExecutorManager
, processAlertManager
, masterConfig
, stateWheelExecuteThread
, taskProcessorFactory);
, stateWheelExecuteThread);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {

View File

@ -133,11 +133,6 @@ public class WorkflowExecuteThread {
*/
private ProcessDefinition processDefinition;
/**
* task processor
*/
private TaskProcessorFactory taskProcessorFactory;
/**
* the object of DAG
*/
@ -227,22 +222,19 @@ public class WorkflowExecuteThread {
* @param processAlertManager processAlertManager
* @param masterConfig masterConfig
* @param stateWheelExecuteThread stateWheelExecuteThread
* @param taskProcessorFactory taskProcessorFactory
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
, StateWheelExecuteThread stateWheelExecuteThread
, TaskProcessorFactory taskProcessorFactory) {
, StateWheelExecuteThread stateWheelExecuteThread) {
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.taskProcessorFactory = taskProcessorFactory;
}
/**
@ -805,7 +797,7 @@ public class WorkflowExecuteThread {
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
try {
ITaskProcessor taskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);

View File

@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
@ -61,7 +62,6 @@ import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.base.Enums;
import com.google.common.base.Strings;
@ -80,8 +80,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessInstance processInstance;
@Autowired
protected ProcessService processService;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);;
/**
* pause task, common tasks donot need this.

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class CommonTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return Constants.COMMON_TASK_TYPE;
}
@Override
public ITaskProcessor create() {
return new CommonTaskProcessor();
}
}

View File

@ -38,20 +38,14 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* common task processor
*/
@Service
public class CommonTaskProcessor extends BaseTaskProcessor {
@Autowired
private TaskPriorityQueue taskUpdateQueue;
@Autowired
NettyExecutorManager nettyExecutorManager;
private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) {

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class ConditionTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.CONDITIONS.getDesc();
}
@Override
public ITaskProcessor create() {
return new ConditionTaskProcessor();
}
}

View File

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@ -39,13 +40,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* condition task processor
*/
@Service
public class ConditionTaskProcessor extends BaseTaskProcessor {
/**
@ -65,8 +62,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
*/
private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
@Autowired
private MasterConfig masterConfig;
private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
private TaskDefinition taskDefinition;

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class DependentTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.DEPENDENT.getDesc();
}
@Override
public ITaskProcessor create() {
return new DependentTaskProcessor();
}
}

View File

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@ -40,15 +41,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* dependent task processor
*/
@Service
public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters;
@ -75,8 +72,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
ProcessInstance processInstance;
TaskDefinition taskDefinition;
@Autowired
private MasterConfig masterConfig;
private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
boolean allDependentItemFinished;

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
public interface ITaskProcessFactory {
String type();
ITaskProcessor create();
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class SubTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.SUB_PROCESS.getDesc();
}
@Override
public ITaskProcessor create() {
return new SubTaskProcessor();
}
}

View File

@ -25,18 +25,15 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
*
* subtask processor
*/
@Service
public class SubTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
@ -49,8 +46,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
*/
private final Lock runLock = new ReentrantLock();
@Autowired
private StateEventCallbackService stateEventCallbackService;
private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class SwitchTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.SWITCH.getDesc();
}
@Override
public ITaskProcessor create() {
return new SwitchTaskProcessor();
}
}

View File

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@ -46,7 +47,9 @@ import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
/**
* switch task processor
*/
public class SwitchTaskProcessor extends BaseTaskProcessor {
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
@ -56,8 +59,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
TaskDefinition taskDefinition;
@Autowired
private MasterConfig masterConfig;
private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
/**
* switch result

View File

@ -21,39 +21,35 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
/**
* the factory to create task processor
*/
@Service
public class TaskProcessorFactory {
public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
private Map<String, ITaskProcessor> taskProcessorMap;
@Autowired
public TaskProcessorFactory(List<ITaskProcessor> taskProcessors) {
taskProcessorMap = taskProcessors.stream().collect(Collectors.toMap(ITaskProcessor::getType, Function.identity(), (v1, v2) -> v2));
static {
for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) {
PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor);
}
}
public ITaskProcessor getTaskProcessor(String key) {
if (StringUtils.isEmpty(key)) {
key = DEFAULT_PROCESSOR;
public static ITaskProcessor getTaskProcessor(String type) {
if (StringUtils.isEmpty(type)) {
type = DEFAULT_PROCESSOR;
}
ITaskProcessor taskProcessor = taskProcessorMap.get(key);
if (Objects.isNull(taskProcessor)) {
taskProcessor = taskProcessorMap.get(DEFAULT_PROCESSOR);
ITaskProcessFactory taskProcessFactory = PROCESS_FACTORY_MAP.get(type);
if (Objects.isNull(taskProcessFactory)) {
taskProcessFactory = PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR);
}
return taskProcessor;
return taskProcessFactory.create();
}
}

View File

@ -111,7 +111,7 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread, taskProcessorFactory));
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);

View File

@ -26,14 +26,13 @@ import org.junit.Test;
@Ignore
public class TaskProcessorFactoryTest {
private TaskProcessorFactory taskProcessorFactory;
@Test
public void testFactory() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");
ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
ITaskProcessor iTaskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
Assert.assertNotNull(iTaskProcessor);
}