From 852597c9bf3340b679be8e11b5fe33b3e34e03e7 Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Tue, 22 Mar 2022 14:02:07 +0800 Subject: [PATCH] [Bug-9070][Server]fix taskExecutionContext json convert error (#9072) * [Bug-9070][Server]fix taskExecutionContext json convert error * test * add license header Co-authored-by: caishunfeng <534328519@qq.com> --- .../consumer/TaskPriorityQueueConsumer.java | 2 +- .../resource/AbstractResourceParameters.java | 9 +++ .../resource/ResourceParametersHelper.java | 12 ++-- .../AbstractResourceParametersTest.java | 56 +++++++++++++++++++ 4 files changed, 74 insertions(+), 5 deletions(-) create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/AbstractResourceParametersTest.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 7c158add39..d7d549413d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -168,7 +168,7 @@ public class TaskPriorityQueueConsumer extends Thread { } result = dispatcher.dispatch(executionContext); - } catch (ExecuteException e) { + } catch (RuntimeException | ExecuteException e) { logger.error("dispatch error: {}", e.getMessage(), e); } return result; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/AbstractResourceParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/AbstractResourceParameters.java index 7f05b1238d..39193c8f2e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/AbstractResourceParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/AbstractResourceParameters.java @@ -17,6 +17,15 @@ package org.apache.dolphinscheduler.plugin.task.api.parameters.resource; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @Type(value = DataSourceParameters.class, name = "DATASOURCE"), + @Type(value = UdfFuncParameters.class, name = "UDF") +}) public abstract class AbstractResourceParameters { } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/ResourceParametersHelper.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/ResourceParametersHelper.java index 43c4aaf8be..14628c04ec 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/ResourceParametersHelper.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/ResourceParametersHelper.java @@ -25,23 +25,27 @@ import java.util.Objects; public class ResourceParametersHelper { - private Map> map = new HashMap<>(); + private Map> resourceMap = new HashMap<>(); public void put(ResourceType resourceType, Integer id) { put(resourceType, id, null); } public void put(ResourceType resourceType, Integer id, AbstractResourceParameters parameters) { - Map resourceParametersMap = map.get(resourceType); + Map resourceParametersMap = resourceMap.get(resourceType); if (Objects.isNull(resourceParametersMap)) { resourceParametersMap = new HashMap<>(); - map.put(resourceType, resourceParametersMap); + resourceMap.put(resourceType, resourceParametersMap); } resourceParametersMap.put(id, parameters); } + public void setResourceMap(Map> resourceMap) { + this.resourceMap = resourceMap; + } + public Map> getResourceMap() { - return map; + return resourceMap; } public Map getResourceMap(ResourceType resourceType) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/AbstractResourceParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/AbstractResourceParametersTest.java new file mode 100644 index 0000000000..9f96294038 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/resource/AbstractResourceParametersTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.parameters.resource; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; +import org.apache.dolphinscheduler.spi.enums.DbType; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; + +import org.junit.Assert; +import org.junit.Test; + +public class AbstractResourceParametersTest { + + @Test + public void testDataSource() { + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + String taskParam = "{\"localParams\":[],\"resourceList\":[],\"type\":\"MYSQL\",\"datasource\":\"1\",\"sql\":\"select now();\",\"sqlType\":\"0\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"null\",\"dependence\":\"null\",\"switchResult\":\"null\",\"waitStartTimeout\":null}"; + + ResourceParametersHelper resourceParametersHelper = JSONUtils.parseObject(taskParam, SqlParameters.class).getResources(); + + resourceParametersHelper.getResourceMap().forEach((type, map) -> { + map.forEach((code, parameters) -> { + DataSourceParameters dataSourceParameters = new DataSourceParameters(); + dataSourceParameters.setType(DbType.MYSQL); + dataSourceParameters.setConnectionParams("127.0.0.1:3306"); + map.put(code, dataSourceParameters); + }); + }); + + taskExecutionContext.setResourceParametersHelper(resourceParametersHelper); + + String json = JSONUtils.toJsonString(taskExecutionContext); + + taskExecutionContext = JSONUtils.parseObject(json, TaskExecutionContext.class); + + Assert.assertNotNull(taskExecutionContext); + } +} + +