[Improvement][API] simplify with stream (#3764)

* simplify with stream

* add distinct

* compatible tasks is null

* add unit test
This commit is contained in:
孙继峰 2020-09-26 15:45:24 +08:00 committed by GitHub
parent 3b581455fc
commit eb597e67e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 97 additions and 20 deletions

View File

@ -48,7 +48,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
@ -77,6 +77,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -84,6 +85,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@ -232,25 +234,16 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
* @return resource ids
*/
private String getResourceIds(ProcessData processData) {
List<TaskNode> tasks = processData.getTasks();
Set<Integer> resourceIds = new HashSet<>();
for (TaskNode taskNode : tasks) {
String taskParameter = taskNode.getParams();
AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(), taskParameter);
if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
Set<Integer> tempSet = params.getResourceFilesList().stream().map(t -> t.getId()).collect(Collectors.toSet());
resourceIds.addAll(tempSet);
}
}
StringBuilder sb = new StringBuilder();
for (int i : resourceIds) {
if (sb.length() > 0) {
sb.append(",");
}
sb.append(i);
}
return sb.toString();
return Optional.ofNullable(processData.getTasks())
.orElse(Collections.emptyList())
.stream()
.map(taskNode -> TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()))
.filter(Objects::nonNull)
.flatMap(parameters -> parameters.getResourceFilesList().stream())
.map(ResourceInfo::getId)
.distinct()
.map(Objects::toString)
.collect(Collectors.joining(","));
}
/**

View File

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.service;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
@ -28,6 +30,9 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -51,8 +56,11 @@ import org.apache.http.entity.ContentType;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -66,6 +74,7 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
@ -984,6 +993,81 @@ public class ProcessDefinitionServiceTest {
loginUser, projectName, "1", null);
}
@Test
public void testGetResourceIds() throws Exception {
// set up
Method testMethod = ReflectionUtils.findMethod(ProcessDefinitionServiceImpl.class, "getResourceIds", ProcessData.class);
assertThat(testMethod).isNotNull();
testMethod.setAccessible(true);
// when processData has empty task, then return empty string
ProcessData input1 = new ProcessData();
input1.setTasks(Collections.emptyList());
String output1 = (String) testMethod.invoke(processDefinitionService, input1);
assertThat(output1).isEmpty();
// when task is null, then return empty string
ProcessData input2 = new ProcessData();
input2.setTasks(null);
String output2 = (String) testMethod.invoke(processDefinitionService, input2);
assertThat(output2).isEmpty();
// when task type is incorrect mapping, then return empty string
ProcessData input3 = new ProcessData();
TaskNode taskNode3 = new TaskNode();
taskNode3.setType("notExistType");
input3.setTasks(Collections.singletonList(taskNode3));
String output3 = (String) testMethod.invoke(processDefinitionService, input3);
assertThat(output3).isEmpty();
// when task parameter list is null, then return empty string
ProcessData input4 = new ProcessData();
TaskNode taskNode4 = new TaskNode();
taskNode4.setType("SHELL");
taskNode4.setParams(null);
input4.setTasks(Collections.singletonList(taskNode4));
String output4 = (String) testMethod.invoke(processDefinitionService, input4);
assertThat(output4).isEmpty();
// when resource id list is 0 1, then return 0,1
ProcessData input5 = new ProcessData();
TaskNode taskNode5 = new TaskNode();
taskNode5.setType("SHELL");
ShellParameters shellParameters5 = new ShellParameters();
ResourceInfo resourceInfo5A = new ResourceInfo();
resourceInfo5A.setId(0);
ResourceInfo resourceInfo5B = new ResourceInfo();
resourceInfo5B.setId(1);
shellParameters5.setResourceList(Arrays.asList(resourceInfo5A, resourceInfo5B));
taskNode5.setParams(JSONUtils.toJsonString(shellParameters5));
input5.setTasks(Collections.singletonList(taskNode5));
String output5 = (String) testMethod.invoke(processDefinitionService, input5);
assertThat(output5.split(",")).hasSize(2)
.containsExactlyInAnyOrder("0", "1");
// when resource id list is 0 1 1 2, then return 0,1,2
ProcessData input6 = new ProcessData();
TaskNode taskNode6 = new TaskNode();
taskNode6.setType("SHELL");
ShellParameters shellParameters6 = new ShellParameters();
ResourceInfo resourceInfo6A = new ResourceInfo();
resourceInfo6A.setId(0);
ResourceInfo resourceInfo6B = new ResourceInfo();
resourceInfo6B.setId(1);
ResourceInfo resourceInfo6C = new ResourceInfo();
resourceInfo6C.setId(1);
ResourceInfo resourceInfo6D = new ResourceInfo();
resourceInfo6D.setId(2);
shellParameters6.setResourceList(Arrays.asList(resourceInfo6A, resourceInfo6B, resourceInfo6C, resourceInfo6D));
taskNode6.setParams(JSONUtils.toJsonString(shellParameters6));
input6.setTasks(Collections.singletonList(taskNode6));
String output6 = (String) testMethod.invoke(processDefinitionService, input6);
assertThat(output6.split(",")).hasSize(3)
.containsExactlyInAnyOrder("0", "1", "2");
}
/**
* get mock datasource
*