diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index 2aadab8039..b5b85427f7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; @@ -154,26 +155,21 @@ public class SeatunnelTask extends AbstractRemoteTask { protected List buildOptions() throws Exception { List args = new ArrayList<>(); + args.add(CONFIG_OPTIONS); + String scriptContent; if (BooleanUtils.isTrue(seatunnelParameters.getUseCustom())) { - args.add(CONFIG_OPTIONS); - args.add(buildCustomConfigCommand()); + scriptContent = buildCustomConfigContent(); } else { - seatunnelParameters.getResourceList().forEach(resourceInfo -> { - args.add(CONFIG_OPTIONS); - // TODO: Need further check for refactored resource center - // TODO Currently resourceName is `/xxx.sh`, it has more `/` and needs to be optimized - args.add(resourceInfo.getResourceName().replaceFirst(".*:", "")); - }); + String resourceFileName = seatunnelParameters.getResourceList().get(0).getResourceName(); + ResourceContext resourceContext = taskExecutionContext.getResourceContext(); + scriptContent = FileUtils.readFileToString( + new File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()), + StandardCharsets.UTF_8); } - return args; - } - - protected String buildCustomConfigCommand() throws Exception { - String config = buildCustomConfigContent(); String filePath = buildConfigFilePath(); - createConfigFileIfNotExists(config, filePath); - - return filePath; + createConfigFileIfNotExists(scriptContent, filePath); + args.add(filePath); + return args; } private String buildCustomConfigContent() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java similarity index 54% rename from dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java rename to dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java index 70abee3281..11fffedd80 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java @@ -14,35 +14,91 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.plugin.task.seatunnel; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.junit.Test; +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; + +import org.apache.commons.io.FileUtils; + +import java.util.Collections; + +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; public class SeatunnelTaskTest { - private static final String EXECUTE_PATH = "/home"; - private static final String TASK_APPID = "9527"; + + private static final String EXECUTE_PATH = "/tmp"; + private static final String RESOURCE_SCRIPT_PATH = "/tmp/demo.conf"; + + private MockedStatic mockedStaticFileUtils; + + @BeforeEach + public void setUp() { + mockedStaticFileUtils = Mockito.mockStatic(FileUtils.class); + } + + @AfterEach + public void after() { + mockedStaticFileUtils.close(); + } @Test - public void formatDetector() throws Exception{ + public void formatDetector() throws Exception { + String taskId = "1234"; SeatunnelParameters seatunnelParameters = new SeatunnelParameters(); + seatunnelParameters.setUseCustom(true); seatunnelParameters.setRawScript(RAW_SCRIPT); TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setExecutePath(EXECUTE_PATH); - taskExecutionContext.setTaskAppId(TASK_APPID); + taskExecutionContext.setTaskAppId(taskId); taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters)); SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext); seatunnelTask.setSeatunnelParameters(seatunnelParameters); - Assertions.assertEquals("/home/seatunnel_9527.conf", seatunnelTask.buildCustomConfigCommand()); + String command1 = String.join(" ", seatunnelTask.buildOptions()); + String expectedCommand1 = String.format("--config %s/seatunnel_%s.conf", EXECUTE_PATH, taskId); + Assertions.assertEquals(expectedCommand1, command1); seatunnelParameters.setRawScript(RAW_SCRIPT_2); seatunnelTask.setSeatunnelParameters(seatunnelParameters); - Assertions.assertEquals("/home/seatunnel_9527.json", seatunnelTask.buildCustomConfigCommand()); + String command2 = String.join(" ", seatunnelTask.buildOptions()); + String expectedCommand2 = String.format("--config %s/seatunnel_%s.json", EXECUTE_PATH, taskId); + Assertions.assertEquals(expectedCommand2, command2); } + + @Test + public void testReadConfigFromResourceCenter() throws Exception { + String taskId = "2345"; + SeatunnelParameters seatunnelParameters = new SeatunnelParameters(); + seatunnelParameters.setUseCustom(false); + ResourceInfo resourceInfo = new ResourceInfo(); + resourceInfo.setResourceName(RESOURCE_SCRIPT_PATH); + seatunnelParameters.setResourceList(Collections.singletonList(resourceInfo)); + + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setExecutePath(EXECUTE_PATH); + taskExecutionContext.setTaskAppId(taskId); + taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters)); + ResourceContext resourceContext = new ResourceContext(); + resourceContext.addResourceItem(new ResourceContext.ResourceItem(RESOURCE_SCRIPT_PATH, RESOURCE_SCRIPT_PATH)); + taskExecutionContext.setResourceContext(resourceContext); + + SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext); + seatunnelTask.setSeatunnelParameters(seatunnelParameters); + String command = String.join(" ", seatunnelTask.buildOptions()); + String expectedCommand = String.format("--config %s/seatunnel_%s.conf", EXECUTE_PATH, taskId); + Assertions.assertEquals(expectedCommand, command); + } + private static final String RAW_SCRIPT = "env {\n" + " execution.parallelism = 2\n" + " job.mode = \"BATCH\"\n" + diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index fc5a1d9466..006756f10d 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -205,7 +205,9 @@ export function formatParams(data: INodeData): { if (data.taskType === 'SEATUNNEL') { taskParams.startupScript = data.startupScript taskParams.useCustom = data.useCustom - taskParams.rawScript = data.rawScript + if (!data.useCustom) { + taskParams.rawScript = '' + } if (data.startupScript?.includes('flink')) { taskParams.runMode = data.runMode taskParams.others = data.others