feat(接口测试): 增加 k8s 执行调度逻辑

This commit is contained in:
fit2-zhao 2024-10-08 15:07:06 +08:00 committed by Craftsman
parent 336e845a61
commit f899fc8bf9
9 changed files with 212 additions and 87 deletions

View File

@ -94,6 +94,13 @@
</exclusion>
</exclusions>
</dependency>
<!-- k8s client -->
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -1,9 +0,0 @@
package io.metersphere.api.engine;
public interface ApiEngine {
void start();
void stop();
}

View File

@ -1,32 +0,0 @@
package io.metersphere.api.engine;
import io.metersphere.sdk.dto.api.task.TaskRequestDTO;
import io.metersphere.sdk.util.LogUtils;
import org.apache.commons.beanutils.ConstructorUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.reflections.Reflections;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.lang.reflect.InvocationTargetException;
import java.util.Set;
@Service
@Transactional(rollbackFor = Exception.class)
public class EngineFactory {
private static Class<? extends ApiEngine> apiEngine = null;
static {
Set<Class<? extends ApiEngine>> subTypes = new Reflections("io.metersphere.xpack.engine.api").getSubTypesOf(ApiEngine.class);
if (CollectionUtils.isNotEmpty(subTypes)) {
apiEngine = subTypes.stream().findFirst().get();
}
}
public static ApiEngine createApiEngine(TaskRequestDTO request)
throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException {
LogUtils.info("创建K8s client");
return ConstructorUtils.invokeConstructor(apiEngine, request);
}
}

View File

@ -0,0 +1,55 @@
package io.metersphere.api.engine;
import io.metersphere.engine.ApiEngine;
import io.metersphere.sdk.dto.api.task.TaskBatchRequestDTO;
import io.metersphere.sdk.dto.api.task.TaskRequestDTO;
import io.metersphere.sdk.util.LogUtils;
import io.metersphere.system.dto.pool.TestResourceDTO;
import java.util.List;
public class KubernetesExecEngine implements ApiEngine {
/**
* 任务请求参数 @LINK TaskRequestDTO or TaskBatchRequestDTO or List<String>
*/
private final Object request;
private final TestResourceDTO resource;
public KubernetesExecEngine(TaskRequestDTO request, TestResourceDTO resource) {
this.request = request;
this.resource = resource;
}
public KubernetesExecEngine(TaskBatchRequestDTO batchRequestDTO, TestResourceDTO resource) {
this.resource = resource;
this.request = batchRequestDTO;
}
public KubernetesExecEngine(List<String> reportIds, TestResourceDTO resource) {
this.resource = resource;
this.request = reportIds;
}
@Override
public void execute(String command) {
// 初始化任务
LogUtils.info("K8s 开始执行: {}", command);
this.runApi(command);
}
private void runApi(String command) {
try {
KubernetesProvider.exec(resource, request, command);
} catch (Exception e) {
LogUtils.error("K8S 执行异常:", e);
rollbackOnFailure(); // 错误处理逻辑
}
}
// 错误回滚处理
private void rollbackOnFailure() {
// TODO: 实现回滚处理逻辑
LogUtils.info("执行失败,回滚操作启动。");
}
}

View File

@ -0,0 +1,91 @@
package io.metersphere.api.engine;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.metersphere.sdk.exception.MSException;
import io.metersphere.sdk.util.JSON;
import io.metersphere.sdk.util.LogUtils;
import io.metersphere.system.dto.pool.TestResourceDTO;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class KubernetesProvider {
private static final String RUNNING_PHASE = "Running";
private static final String SHELL_COMMAND = "sh";
public static KubernetesClient getKubernetesClient(TestResourceDTO credential) {
ConfigBuilder configBuilder = new ConfigBuilder()
.withMasterUrl(credential.getIp())
.withOauthToken(credential.getToken())
.withTrustCerts(true)
.withNamespace(credential.getNamespace());
return new KubernetesClientBuilder()
.withConfig(configBuilder.build())
.build();
}
public static Pod getExecPod(KubernetesClient client, TestResourceDTO credential) {
List<Pod> pods = client.pods().inNamespace(credential.getNamespace()).list().getItems();
if (CollectionUtils.isEmpty(pods)) {
throw new MSException("Execution node not found");
}
List<Pod> nodePods = pods.stream()
.filter(s -> RUNNING_PHASE.equals(s.getStatus().getPhase())
&& StringUtils.startsWith(s.getMetadata().getGenerateName(), credential.getDeployName()))
.toList();
if (CollectionUtils.isEmpty(nodePods)) {
throw new MSException("Execution node not found");
}
return nodePods.get(ThreadLocalRandom.current().nextInt(nodePods.size()));
}
public static void exec(TestResourceDTO resource, Object runRequest, String command) {
try (KubernetesClient client = getKubernetesClient(resource)) {
Pod pod = getExecPod(client, resource);
LogUtils.info("CURL 命令:【 " + command + "");
client.pods().inNamespace(client.getNamespace()).withName(pod.getMetadata().getName())
.redirectingInput()
.writingOutput(System.out)
.writingError(System.err)
.withTTY()
.usingListener(new SimpleListener(runRequest))
.exec(SHELL_COMMAND, "-c", command);
} catch (Exception e) {
throw new MSException("Error during Kubernetes execution: " + e.getMessage(), e);
}
}
private record SimpleListener(Object runRequest) implements ExecListener {
@Override
public void onOpen() {
LogUtils.info("K8s 开启监听");
}
@Override
public void onFailure(Throwable t, Response response) {
LogUtils.error("K8s 监听失败", t);
if (runRequest != null) {
LogUtils.info("请求参数:{}", JSON.toJSONString(runRequest));
// TODO: Add proper error handling based on response or task request details
} else {
throw new MSException("K8S 节点执行错误:" + t.getMessage(), t);
}
}
@Override
public void onClose(int code, String reason) {
LogUtils.info("K8s 监听关闭code=" + code + ", reason=" + reason);
// No additional actions needed for now
}
}
}

View File

@ -1,10 +1,5 @@
package io.metersphere.api.controller;
import io.metersphere.api.engine.ApiEngine;
import io.metersphere.api.engine.EngineFactory;
import io.metersphere.sdk.constants.ResourcePoolTypeEnum;
import io.metersphere.sdk.dto.api.task.ApiRunModeConfigDTO;
import io.metersphere.sdk.dto.api.task.TaskRequestDTO;
import io.metersphere.sdk.util.BeanUtils;
import io.metersphere.sdk.util.CommonBeanFactory;
import io.metersphere.sdk.util.JSON;
@ -22,8 +17,6 @@ import io.metersphere.system.utils.SessionUtils;
import jakarta.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
@ -158,17 +151,4 @@ public class KubernetesEngineTests extends BaseTest {
}
testResourcePool.setDeleted(false);
}
@Test
@Order(0)
public void pluginSubTypeTest() throws Exception {
String id = this.addPool(ResourcePoolTypeEnum.K8S.name());
TaskRequestDTO request = new TaskRequestDTO();
ApiRunModeConfigDTO runModeConfig = new ApiRunModeConfigDTO();
runModeConfig.setPoolId(id);
request.getTaskInfo().setRunModeConfig(runModeConfig);
final ApiEngine engine = EngineFactory.createApiEngine(request);
engine.start();
}
}

View File

@ -0,0 +1,57 @@
package io.metersphere.api.k8s;
import io.metersphere.engine.EngineFactory;
import io.metersphere.sdk.dto.api.task.TaskBatchRequestDTO;
import io.metersphere.sdk.dto.api.task.TaskRequestDTO;
import io.metersphere.system.base.BaseTest;
import io.metersphere.system.dto.pool.TestResourceDTO;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.ArrayList;
import java.util.List;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@AutoConfigureMockMvc
public class KubernetesExecTests extends BaseTest {
@Test
@Order(0)
public void debugApi() throws Exception {
TaskRequestDTO request = new TaskRequestDTO();
TestResourceDTO resource = new TestResourceDTO();
EngineFactory.debugApi(request, resource);
}
@Test
@Order(1)
public void runApi() throws Exception {
TaskRequestDTO request = new TaskRequestDTO();
TestResourceDTO resource = new TestResourceDTO();
EngineFactory.runApi(request, resource);
}
@Test
@Order(2)
public void batchRunApi() throws Exception {
TaskBatchRequestDTO request = new TaskBatchRequestDTO();
TestResourceDTO resource = new TestResourceDTO();
EngineFactory.batchRunApi(request, resource);
}
@Test
@Order(3)
public void stop() throws Exception {
List<String> request = new ArrayList<>();
TestResourceDTO resource = new TestResourceDTO();
EngineFactory.stopApi(request, resource);
}
}

View File

@ -1,24 +0,0 @@
package io.metersphere.xpack.engine.api;
import io.metersphere.api.engine.ApiEngine;
import io.metersphere.sdk.dto.api.task.TaskRequestDTO;
import io.metersphere.sdk.util.LogUtils;
public class KubernetesApiEngin implements ApiEngine {
// 初始化API调用
public KubernetesApiEngin(TaskRequestDTO request) {
LogUtils.info("init k8s client");
}
@Override
public void start() {
LogUtils.info("k8s执行START");
}
@Override
public void stop() {
LogUtils.info("K8S执行STOP");
}
}

View File

@ -62,7 +62,7 @@
<dingtalk-client.version>2.0.77</dingtalk-client.version>
<minio.version>8.5.9</minio.version>
<commons-collections4.version>4.4</commons-collections4.version>
<kubernetes-client.version>6.8.0</kubernetes-client.version>
<kubernetes-client.version>6.13.4</kubernetes-client.version>
<jgit.version>6.8.0.202311291450-r</jgit.version>
<embedded.version>3.1.6</embedded.version>
<otp-java.version>2.0.1</otp-java.version>
@ -84,7 +84,7 @@
<commons-jexl.version>2.1.1</commons-jexl.version>
<commons-jexl3.version>3.3</commons-jexl3.version>
<revision>3.x</revision>
<monitoring-engine.revision>3.1</monitoring-engine.revision>
<monitoring-engine.revision>3.2</monitoring-engine.revision>
<swagger-core-jakarta.revision>2.2.20</swagger-core-jakarta.revision>
</properties>