diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiExecuteService.java b/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiExecuteService.java index 3825127b4a..58bedf5a18 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiExecuteService.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/service/ApiExecuteService.java @@ -176,9 +176,13 @@ public class ApiExecuteService { throw new MSException(RESOURCE_POOL_EXECUTE_ERROR, e.getMessage()); } catch (MSException e) { handleDoExecuteException(scriptRedisKey, e); + // 集合报告对应的资源池集合移除 + removeCollectionReport(taskRequest); throw e; } catch (Exception e) { handleDoExecuteException(scriptRedisKey, e); + // 集合报告对应的资源池集合移除 + removeCollectionReport(taskRequest); throw new MSException(RESOURCE_POOL_EXECUTE_ERROR, e.getMessage()); } } @@ -189,6 +193,14 @@ public class ApiExecuteService { stringRedisTemplate.delete(scriptRedisKey); } + private void removeCollectionReport(TaskRequestDTO taskRequest) { + // 集合报告对应的资源池集合移除 + if (taskRequest.getRunModeConfig().getIntegratedReport()) { + String SET_PREFIX = "set:" + taskRequest.getRunModeConfig().getCollectionReport().getReportId(); + stringRedisTemplate.opsForSet().remove(SET_PREFIX, taskRequest.getResourceId()); + } + } + private GlobalParams getGlobalParam(String projectId) { GlobalParamsDTO globalParamsDTO = globalParamsService.get(projectId); if (globalParamsDTO != null) { @@ -205,7 +217,11 @@ public class ApiExecuteService { private TaskRequestDTO doExecute(TaskRequestDTO taskRequest) throws Exception { // 获取资源池 TestResourcePoolReturnDTO testResourcePoolDTO = getGetResourcePoolNodeDTO(taskRequest.getRunModeConfig(), taskRequest.getProjectId()); + if (testResourcePoolDTO == null || CollectionUtils.isEmpty(testResourcePoolDTO.getTestResourceReturnDTO().getNodesList())) { + throw new MSException(ApiResultCode.EXECUTE_RESOURCE_POOL_NOT_CONFIG); + } TestResourceNodeDTO testResourceNodeDTO = getProjectExecuteNode(testResourcePoolDTO); + if (StringUtils.isNotBlank(testResourcePoolDTO.getServerUrl())) { // 如果资源池配置了当前站点,则使用资源池的 taskRequest.setMsUrl(testResourcePoolDTO.getServerUrl()); @@ -231,14 +247,18 @@ public class ApiExecuteService { private TestResourceNodeDTO getProjectExecuteNode(TestResourcePoolReturnDTO resourcePoolDTO) { roundRobinService.initializeNodes(resourcePoolDTO.getId(), resourcePoolDTO.getTestResourceReturnDTO().getNodesList()); try { - return roundRobinService.getNextNode(resourcePoolDTO.getId()); + TestResourceNodeDTO node = roundRobinService.getNextNode(resourcePoolDTO.getId()); + if (node == null) { + node = resourcePoolDTO.getTestResourceReturnDTO().getNodesList().getFirst(); + } + return node; } catch (Exception e) { LogUtils.error(e); throw new MSException("get execute node error", e); } } - private TestResourcePoolReturnDTO getGetResourcePoolNodeDTO(ApiRunModeConfigDTO runModeConfig, String projectId) { + public TestResourcePoolReturnDTO getGetResourcePoolNodeDTO(ApiRunModeConfigDTO runModeConfig, String projectId) { String poolId = runModeConfig.getPoolId(); if (StringUtils.isBlank(poolId)) { poolId = getProjectApiResourcePoolId(projectId); diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/service/scenario/ApiScenarioBatchRunService.java b/backend/services/api-test/src/main/java/io/metersphere/api/service/scenario/ApiScenarioBatchRunService.java index 4f432d080e..3abcc34cef 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/service/scenario/ApiScenarioBatchRunService.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/service/scenario/ApiScenarioBatchRunService.java @@ -1,6 +1,9 @@ package io.metersphere.api.service.scenario; -import io.metersphere.api.domain.*; +import io.metersphere.api.domain.ApiScenario; +import io.metersphere.api.domain.ApiScenarioRecord; +import io.metersphere.api.domain.ApiScenarioReport; +import io.metersphere.api.domain.ApiScenarioReportStep; import io.metersphere.api.dto.ApiScenarioParamConfig; import io.metersphere.api.dto.ApiScenarioParseTmpParam; import io.metersphere.api.dto.debug.ApiResourceRunRequest; @@ -9,11 +12,14 @@ import io.metersphere.api.dto.scenario.ApiScenarioBatchRunRequest; import io.metersphere.api.dto.scenario.ApiScenarioDetail; import io.metersphere.api.dto.scenario.ApiScenarioParseParam; import io.metersphere.api.dto.scenario.ApiScenarioStepDTO; -import io.metersphere.api.mapper.*; +import io.metersphere.api.mapper.ApiScenarioReportMapper; +import io.metersphere.api.mapper.ExtApiScenarioMapper; import io.metersphere.api.service.ApiBatchRunBaseService; import io.metersphere.api.service.ApiExecuteService; import io.metersphere.api.service.queue.ApiExecutionQueueService; import io.metersphere.api.service.queue.ApiExecutionSetService; +import io.metersphere.api.utils.ExecTask; +import io.metersphere.api.utils.TaskRunnerUtils; import io.metersphere.sdk.constants.*; import io.metersphere.sdk.dto.api.task.ApiRunModeConfigDTO; import io.metersphere.sdk.dto.api.task.CollectionReportDTO; @@ -24,11 +30,13 @@ import io.metersphere.sdk.util.BeanUtils; import io.metersphere.sdk.util.DateUtils; import io.metersphere.sdk.util.LogUtils; import io.metersphere.sdk.util.SubListUtils; +import io.metersphere.system.dto.pool.TestResourcePoolReturnDTO; import io.metersphere.system.uid.IDGenerator; import jakarta.annotation.Resource; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -61,6 +69,9 @@ public class ApiScenarioBatchRunService { @Resource private ExtApiScenarioMapper extApiScenarioMapper; + @Value("${spring.datasource.hikari.maximum-pool-size}") + private int maximumPoolSize; + /** * 异步批量执行 * @@ -137,39 +148,29 @@ public class ApiScenarioBatchRunService { // 集成报告,执行前先设置成 RUNNING setRunningIntegrateReport(runModeConfig); - AtomicInteger errorCount = new AtomicInteger(); - // 这里ID顺序和队列的ID顺序保持一致 - for (String id : ids) { + TestResourcePoolReturnDTO testResourcePoolDTO = apiExecuteService.getGetResourcePoolNodeDTO(runModeConfig, request.getProjectId()); + List apiScenarioDetails = apiScenarioService.getForRuns(ids); - String reportId = null; - try { - ApiScenarioDetail apiScenarioDetail = apiScenarioService.getForRun(id); - if (apiScenarioDetail == null) { - if (runModeConfig.isIntegratedReport()) { - // 用例不存在,则在执行集合中删除 - apiExecutionSetService.removeItem(runModeConfig.getCollectionReport().getReportId(), id); - } - LogUtils.info("当前执行任务的用例已删除 {}", id); - continue; - } + if (StringUtils.isNotBlank(testResourcePoolDTO.getServerUrl())) { + // 独立部署执行专属服务,线程池执行 + TaskRunnerUtils.setThreadPoolSize(maximumPoolSize / 2 - 10); + apiScenarioDetails.forEach(apiScenarioDetail -> { + ExecTask execTask = new ExecTask(this, apiScenarioDetail, scenarioReportMap, runModeConfig); + TaskRunnerUtils.executeThreadPool(execTask); + }); + } else { + // 未独立部署执行专属引用则使用默认循环分发任务 + apiScenarioDetails.forEach(apiScenarioDetail -> execute(apiScenarioDetail, scenarioReportMap, runModeConfig)); + } + } - if (runModeConfig.isIntegratedReport()) { - // 集成报告生成虚拟的报告ID - reportId = IDGenerator.nextStr(); - } else { - reportId = scenarioReportMap.get(id); - } - - TaskRequestDTO taskRequest = getTaskRequestDTO(reportId, apiScenarioDetail, runModeConfig); - execute(taskRequest, apiScenarioDetail); - } catch (Exception e) { - LogUtils.error("执行用例失败 {}-{}", reportId, id); - LogUtils.error(e); - if (errorCount.getAndIncrement() > 10) { - LogUtils.error("批量执行用例失败,错误次数超过10次,停止执行"); - return; - } - } + public void execute(ApiScenarioDetail apiScenarioDetail, Map scenarioReportMap, ApiRunModeConfigDTO runModeConfig) { + try { + String reportId = runModeConfig.isIntegratedReport() ? IDGenerator.nextStr() : scenarioReportMap.get(apiScenarioDetail.getId()); + TaskRequestDTO taskRequest = getTaskRequestDTO(reportId, apiScenarioDetail, runModeConfig); + execute(taskRequest, apiScenarioDetail); + } catch (Exception e) { + LogUtils.error("执行用例失败 {}", apiScenarioDetail.getId(), e); } } @@ -422,7 +423,7 @@ public class ApiScenarioBatchRunService { if (queueDetail == null) { return; } - Long requestCount = 0L; + long requestCount = 0L; while (queueDetail != null) { ApiScenarioDetail apiScenarioDetail = apiScenarioService.getForRun(queueDetail.getResourceId()); if (apiScenarioDetail == null) { diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/service/scenario/ApiScenarioService.java b/backend/services/api-test/src/main/java/io/metersphere/api/service/scenario/ApiScenarioService.java index 652942d242..a2a25f2d37 100644 --- a/backend/services/api-test/src/main/java/io/metersphere/api/service/scenario/ApiScenarioService.java +++ b/backend/services/api-test/src/main/java/io/metersphere/api/service/scenario/ApiScenarioService.java @@ -561,7 +561,7 @@ public class ApiScenarioService extends MoveNodeService { } private List getCsvVariables(ScenarioConfig scenarioConfig) { - if (scenarioConfig == null ||scenarioConfig.getVariable() == null || scenarioConfig.getVariable().getCsvVariables() == null) { + if (scenarioConfig == null || scenarioConfig.getVariable() == null || scenarioConfig.getVariable().getCsvVariables() == null) { return List.of(); } return scenarioConfig.getVariable().getCsvVariables(); @@ -2187,6 +2187,12 @@ public class ApiScenarioService extends MoveNodeService { return apiScenarioDetail; } + public List getForRuns(List scenarioIds) { + List apiScenarioDetails = list(scenarioIds); + apiScenarioDetails.forEach(apiScenarioDetail -> apiScenarioDetail.setSteps(filerDisableSteps(apiScenarioDetail.getSteps()))); + return apiScenarioDetails; + } + /** * 过滤掉禁用的步骤 */ @@ -2258,9 +2264,80 @@ public class ApiScenarioService extends MoveNodeService { return apiScenarioDetail; } + + public List list(List scenarioIds) { + List list = new LinkedList<>(); + + ApiScenarioExample example = new ApiScenarioExample(); + example.createCriteria().andIdIn(scenarioIds).andDeletedEqualTo(false); + List apiScenarios = apiScenarioMapper.selectByExample(example); + + + ApiScenarioBlobExample blobExample = new ApiScenarioBlobExample(); + blobExample.createCriteria().andIdIn(scenarioIds); + List apiScenarioBlobs = apiScenarioBlobMapper.selectByExampleWithBLOBs(blobExample); + Map scenarioMap = apiScenarioBlobs.stream() + .collect(Collectors.toMap(ApiScenarioBlob::getId, item -> item)); + + apiScenarios.forEach(apiScenario -> { + ApiScenarioDetail apiScenarioDetail = BeanUtils.copyBean(new ApiScenarioDetail(), apiScenario); + apiScenarioDetail.setSteps(List.of()); + ApiScenarioBlob apiScenarioBlob = scenarioMap.get(apiScenario.getId()); + + if (apiScenarioBlob != null) { + apiScenarioDetail.setScenarioConfig(JSON.parseObject(new String(apiScenarioBlob.getConfig()), ScenarioConfig.class)); + } + + //存放csv变量 + apiScenarioDetail.getScenarioConfig().getVariable().setCsvVariables(getCsvVariables(apiScenario.getId())); + + // 获取所有步骤 + List allSteps = getAllStepsByScenarioIds(List.of(apiScenario.getId())) + .stream() + .distinct() // 这里可能存在多次引用相同场景,步骤可能会重复,去重 + .collect(Collectors.toList()); + + // 设置步骤的 csvIds + setStepCsvIds(apiScenario.getId(), allSteps); + + // 构造 map,key 为场景ID,value 为步骤列表 + Map> scenarioStepMap = allSteps.stream() + .collect(Collectors.groupingBy(step -> Optional.ofNullable(step.getScenarioId()).orElse(StringUtils.EMPTY))); + + // key 为父步骤ID,value 为子步骤列表 + if (MapUtils.isEmpty(scenarioStepMap)) { + list.add(apiScenarioDetail); + return; + } + + Map> currentScenarioParentStepMap = scenarioStepMap.get(apiScenario.getId()) + .stream() + .collect(Collectors.groupingBy(step -> { + if (StringUtils.equals(step.getParentId(), "NONE")) { + step.setParentId(StringUtils.EMPTY); + } + return Optional.ofNullable(step.getParentId()).orElse(StringUtils.EMPTY); + })); + + List steps = buildStepTree(currentScenarioParentStepMap.get(StringUtils.EMPTY), currentScenarioParentStepMap, scenarioStepMap, new HashSet<>()); + + // 查询步骤详情 + Map stepDetailMap = getPartialRefStepDetailMap(allSteps); + + // 设置部分引用的步骤的启用状态 + setPartialRefStepsEnable(steps, stepDetailMap); + + apiScenarioDetail.setSteps(steps); + + list.add(apiScenarioDetail); + }); + + return list; + } + private void setStepCsvIds(String scenarioId, List allSteps) { List refScenarioIds = allSteps.stream() - .filter(step -> isRefOrPartialScenario(step)) + .filter(this::isRefOrPartialScenario) .map(ApiScenarioStepCommonDTO::getResourceId) .collect(Collectors.toList()); refScenarioIds.add(scenarioId); diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/utils/ExecTask.java b/backend/services/api-test/src/main/java/io/metersphere/api/utils/ExecTask.java new file mode 100644 index 0000000000..74bbe888d0 --- /dev/null +++ b/backend/services/api-test/src/main/java/io/metersphere/api/utils/ExecTask.java @@ -0,0 +1,23 @@ +package io.metersphere.api.utils; + +import io.metersphere.api.dto.scenario.ApiScenarioDetail; +import io.metersphere.api.service.scenario.ApiScenarioBatchRunService; +import io.metersphere.sdk.dto.api.task.ApiRunModeConfigDTO; +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.util.Map; + +@Data +@AllArgsConstructor +public class ExecTask implements Runnable { + private ApiScenarioBatchRunService apiScenarioBatchRunService; + private ApiScenarioDetail detail; + private Map scenarioReportMap; + private ApiRunModeConfigDTO runModeConfig; + + @Override + public void run() { + apiScenarioBatchRunService.execute(detail, scenarioReportMap, runModeConfig); + } +} diff --git a/backend/services/api-test/src/main/java/io/metersphere/api/utils/TaskRunnerUtils.java b/backend/services/api-test/src/main/java/io/metersphere/api/utils/TaskRunnerUtils.java new file mode 100644 index 0000000000..b13fc1c1e0 --- /dev/null +++ b/backend/services/api-test/src/main/java/io/metersphere/api/utils/TaskRunnerUtils.java @@ -0,0 +1,52 @@ +package io.metersphere.api.utils; + +import io.metersphere.sdk.util.LogUtils; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class TaskRunnerUtils { + + // 线程池维护线程的最大数量 + private final static int MAX_POOL_SIZE = 10; + // 线程池维护线程所允许的空闲时间 + private final static int KEEP_ALIVE_TIME = 1; + // 线程池所使用的缓冲队列大小 + private final static int WORK_QUEUE_SIZE = 50000; + + private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( + MAX_POOL_SIZE, + MAX_POOL_SIZE, + KEEP_ALIVE_TIME, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(WORK_QUEUE_SIZE)); + + public static void executeThreadPool(ExecTask task) { + try { + // 开始执行任务 + threadPool.execute(task); + + LogUtils.info("当前线程池活跃线程数量:{},当前线程池线程数量:{},当前线程池队列数量:{}", + threadPool.getActiveCount(), + threadPool.getPoolSize(), + threadPool.getQueue().size()); + } catch (Exception e) { + LogUtils.error("KAFKA消费失败:", e); + } + } + + public static void setThreadPoolSize(int poolSize) { + try { + if (poolSize > 10 && poolSize < 500 && poolSize != threadPool.getMaximumPoolSize()) { + threadPool.setMaximumPoolSize(poolSize); + threadPool.setCorePoolSize(poolSize); + threadPool.allowCoreThreadTimeOut(true); + LogUtils.info("Set successfully: " + threadPool.prestartAllCoreThreads()); + } + LogUtils.info("Invalid thread pool size: " + poolSize); + } catch (Exception e) { + LogUtils.error("设置线程参数异常", e); + } + } +}