[Feature-14421][K8S Task] Configurable image pull policy (#14426)

This commit is contained in:
Aaron Wang 2023-07-05 09:17:47 +08:00 committed by GitHub
parent d8cf0f8b85
commit 65b6a4b097
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 82 additions and 29 deletions

View File

@ -16,17 +16,18 @@ K8S task type used to execute a batch task. In this task, the worker submits the
- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
| **Parameter** | **Description** |
|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Namespace | The namespace for running k8s task. |
| Min CPU | Minimum CPU requirement for running k8s task. |
| Min Memory | Minimum memory requirement for running k8s task. |
| Image | The registry url for image. |
| Command | The container execution command (yaml-style array), for example: ["printenv"] |
| Args | The args of execution command (yaml-style array), for example: ["HOSTNAME", "KUBERNETES_PORT"] |
| Custom label | The customized labels for k8s Job. |
| Node selector | The label selectors for running k8s pod. Different value in value set should be seperated by comma, for example: `value1,value2`. You can refer to https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/node-selector-requirement/ for configuration of different operators. |
| Custom parameter | It is a local user-defined parameter for K8S task, these params will pass to container as environment variables. |
| **Parameter** | **Description** |
|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Namespace | The namespace for running k8s task. |
| Min CPU | Minimum CPU requirement for running k8s task. |
| Min Memory | Minimum memory requirement for running k8s task. |
| Image | The registry url for image. |
| Image Pull Policy | The image pull policy for image. |
| Command | The container execution command (yaml-style array), for example: ["printenv"] |
| Args | The args of execution command (yaml-style array), for example: ["HOSTNAME", "KUBERNETES_PORT"] |
| Custom label | The customized labels for k8s Job. |
| Node selector | The label selectors for running k8s pod. Different value in value set should be seperated by comma, for example: `value1,value2`. You can refer to https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/node-selector-requirement/ for configuration of different operators. |
| Custom parameter | It is a local user-defined parameter for K8S task, these params will pass to container as environment variables. |
## Task Example

View File

@ -22,6 +22,7 @@ kubernetes任务类型用于在kubernetes上执行一个短时和批处理的
| 最小CPU | 任务在kubernetes上运行所需的最小CPU |
| 最小内存 | 任务在kubernetes上运行所需的最小内存 |
| 镜像 | 镜像地址 |
| 镜像拉取策略 | 镜像的拉取策略 |
| 容器执行命令 | 容器执行命令yaml格式数组例如["printenv"] |
| 执行命令参数 | 执行命令参数yaml格式数组例如["HOSTNAME", "KUBERNETES_PORT"] |
| 自定义标签 | 作业自定义标签 |

View File

@ -34,6 +34,7 @@ public class K8sTaskMainParameters {
private String args;
private String namespaceName;
private String clusterName;
private String imagePullPolicy;
private double minCpuCores;
private double minMemorySpace;
private Map<String, String> paramsMap;

View File

@ -22,7 +22,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.IMAGE_PULL_POLICY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL_VALUE;
@ -56,6 +55,8 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.AffinityBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
import io.fabric8.kubernetes.api.model.Quantity;
@ -82,6 +83,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
String image = k8STaskMainParameters.getImage();
String namespaceName = k8STaskMainParameters.getNamespaceName();
String imagePullPolicy = k8STaskMainParameters.getImagePullPolicy();
Map<String, String> otherParams = k8STaskMainParameters.getParamsMap();
Double podMem = k8STaskMainParameters.getMinMemorySpace();
Double podCpu = k8STaskMainParameters.getMinCpuCores();
@ -129,7 +131,16 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
NodeSelectorTerm nodeSelectorTerm = new NodeSelectorTerm();
nodeSelectorTerm.setMatchExpressions(k8STaskMainParameters.getNodeSelectorRequirements());
return new JobBuilder()
Affinity affinity = k8STaskMainParameters.getNodeSelectorRequirements().size() == 0 ? null
: new AffinityBuilder()
.withNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTermLike(nodeSelectorTerm)
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity().build();
JobBuilder jobBuilder = new JobBuilder()
.withApiVersion(API_VERSION)
.withNewMetadata()
.withName(k8sJobName)
@ -145,24 +156,18 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
.withImage(image)
.withCommand(commands.size() == 0 ? null : commands)
.withArgs(args.size() == 0 ? null : args)
.withImagePullPolicy(IMAGE_PULL_POLICY)
.withImagePullPolicy(imagePullPolicy)
.withResources(new ResourceRequirements(limitRes, reqRes))
.withEnv(envVars)
.endContainer()
.withRestartPolicy(RESTART_POLICY)
.withNewAffinity()
.withNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTermLike(nodeSelectorTerm)
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity()
.endAffinity()
.withAffinity(affinity)
.endSpec()
.endTemplate()
.withBackoffLimit(retryNum)
.endSpec()
.build();
.endSpec();
return jobBuilder.build();
}
public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse,

View File

@ -37,11 +37,12 @@ public class K8sTaskParameters extends AbstractParameters {
private String image;
private String namespace;
private String command;
private List<Label> customizedLabels;
private List<NodeSelectorExpression> nodeSelectors;
private String args;
private String imagePullPolicy;
private double minCpuCores;
private double minMemorySpace;
private List<Label> customizedLabels;
private List<NodeSelectorExpression> nodeSelectors;
@Override
public boolean checkParameters() {

View File

@ -44,6 +44,7 @@ public class K8sTaskExecutorTest {
private K8sTaskExecutor k8sTaskExecutor = null;
private K8sTaskMainParameters k8sTaskMainParameters = null;
private final String image = "ds-dev";
private final String imagePullPolicy = "IfNotPresent";
private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
private final double minCpuCores = 2;
private final double minMemorySpace = 10;
@ -68,6 +69,7 @@ public class K8sTaskExecutorTest {
k8sTaskExecutor = new K8sTaskExecutor(null, taskRequest);
k8sTaskMainParameters = new K8sTaskMainParameters();
k8sTaskMainParameters.setImage(image);
k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy);
k8sTaskMainParameters.setNamespaceName(namespaceName);
k8sTaskMainParameters.setClusterName(clusterName);
k8sTaskMainParameters.setMinCpuCores(minCpuCores);

View File

@ -97,6 +97,7 @@ public class K8sTask extends AbstractK8sTask {
.setNodeSelectorRequirements(convertToNodeSelectorRequirements(k8sTaskParameters.getNodeSelectors()));
k8sTaskMainParameters.setCommand(k8sTaskParameters.getCommand());
k8sTaskMainParameters.setArgs(k8sTaskParameters.getArgs());
k8sTaskMainParameters.setImagePullPolicy(k8sTaskParameters.getImagePullPolicy());
return JSONUtils.toJsonString(k8sTaskMainParameters);
}

View File

@ -32,6 +32,7 @@ public class K8sParametersTest {
private K8sTaskParameters k8sTaskParameters = null;
private final String image = "ds-dev";
private final String imagePullPolicy = "IfNotPresent";
private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
private final double minCpuCores = 2;
private final double minMemorySpace = 10;
@ -45,6 +46,7 @@ public class K8sParametersTest {
public void before() {
k8sTaskParameters = new K8sTaskParameters();
k8sTaskParameters.setImage(image);
k8sTaskParameters.setImagePullPolicy(imagePullPolicy);
k8sTaskParameters.setNamespace(namespace);
k8sTaskParameters.setMinCpuCores(minCpuCores);
k8sTaskParameters.setMinMemorySpace(minMemorySpace);
@ -68,6 +70,7 @@ public class K8sParametersTest {
@Test
public void testK8sParameters() {
Assertions.assertEquals(image, k8sTaskParameters.getImage());
Assertions.assertEquals(imagePullPolicy, k8sTaskParameters.getImagePullPolicy());
Assertions.assertEquals(namespace, k8sTaskParameters.getNamespace());
Assertions.assertEquals(0, Double.compare(minCpuCores, k8sTaskParameters.getMinCpuCores()));
Assertions.assertEquals(0, Double.compare(minMemorySpace, k8sTaskParameters.getMinMemorySpace()));

View File

@ -44,6 +44,7 @@ public class K8sTaskTest {
private K8sTask k8sTask = null;
private final String image = "ds-dev";
private final String imagePullPolicy = "IfNotPresent";
private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
@ -65,6 +66,7 @@ public class K8sTaskTest {
public void before() {
k8sTaskParameters = new K8sTaskParameters();
k8sTaskParameters.setImage(image);
k8sTaskParameters.setImagePullPolicy(imagePullPolicy);
k8sTaskParameters.setNamespace(namespace);
k8sTaskParameters.setMinCpuCores(minCpuCores);
k8sTaskParameters.setMinMemorySpace(minMemorySpace);
@ -97,7 +99,7 @@ public class K8sTaskTest {
@Test
public void testBuildCommandNormal() {
String expectedStr =
"{\"image\":\"ds-dev\",\"command\":\"[\\\"/bin/bash\\\", \\\"-c\\\"]\",\"args\":\"[\\\"echo hello world\\\"]\",\"namespaceName\":\"default\",\"clusterName\":\"lab\",\"minCpuCores\":2.0,\"minMemorySpace\":10.0,\"paramsMap\":{\"day\":\"20220507\"},\"labelMap\":{\"test\":\"1234\"},\"nodeSelectorRequirements\":[{\"key\":\"node-label\",\"operator\":\"In\",\"values\":[\"1234\",\"12345\"]}]}";
"{\"image\":\"ds-dev\",\"command\":\"[\\\"/bin/bash\\\", \\\"-c\\\"]\",\"args\":\"[\\\"echo hello world\\\"]\",\"namespaceName\":\"default\",\"clusterName\":\"lab\",\"imagePullPolicy\":\"IfNotPresent\",\"minCpuCores\":2.0,\"minMemorySpace\":10.0,\"paramsMap\":{\"day\":\"20220507\"},\"labelMap\":{\"test\":\"1234\"},\"nodeSelectorRequirements\":[{\"key\":\"node-label\",\"operator\":\"In\",\"values\":[\"1234\",\"12345\"]}]}";
String commandStr = k8sTask.buildCommand();
Assertions.assertEquals(expectedStr, commandStr);
}
@ -105,7 +107,7 @@ public class K8sTaskTest {
@Test
public void testGetParametersNormal() {
String expectedStr =
"K8sTaskParameters(image=ds-dev, namespace={\"name\":\"default\",\"cluster\":\"lab\"}, command=[\"/bin/bash\", \"-c\"], customizedLabels=[Label(label=test, value=1234)], nodeSelectors=[NodeSelectorExpression(key=node-label, operator=In, values=1234,12345)], args=[\"echo hello world\"], minCpuCores=2.0, minMemorySpace=10.0)";
"K8sTaskParameters(image=ds-dev, namespace={\"name\":\"default\",\"cluster\":\"lab\"}, command=[\"/bin/bash\", \"-c\"], args=[\"echo hello world\"], imagePullPolicy=IfNotPresent, minCpuCores=2.0, minMemorySpace=10.0, customizedLabels=[Label(label=test, value=1234)], nodeSelectors=[NodeSelectorExpression(key=node-label, operator=In, values=1234,12345)])";
String result = k8sTask.getParameters().toString();
Assertions.assertEquals(expectedStr, result);
}

View File

@ -386,6 +386,8 @@ export default {
mb: 'MB',
image: 'Image',
image_tips: 'Please enter image',
image_pull_policy: 'Image pull policy',
image_pull_policy_tips: 'Please select a image pull policy (required)',
command: 'Command',
command_tips:
'Please enter the container execution command, for example: ["printenv"]',

View File

@ -385,6 +385,8 @@ export default {
mb: 'MB',
image: '镜像',
image_tips: '请输入镜像',
image_pull_policy: '镜像拉取策略',
image_pull_policy_tips: '请选择镜像拉取策略(必选)',
command: '容器执行命令',
command_tips: '请输入容器执行命令,例如:["printenv"]',
args: '执行命令参数',

View File

@ -56,12 +56,26 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] {
type: 'input',
field: 'image',
name: t('project.node.image'),
span: 18,
props: {
placeholder: t('project.node.image_tips')
},
validate: {
trigger: ['input', 'blur'],
message: t('project.node.min_memory_tips')
required: true,
message: t('project.node.image_tips')
}
},
{
type: 'select',
field: 'imagePullPolicy',
name: t('project.node.image_pull_policy'),
span: 6,
options: IMAGE_PULL_POLICY_LIST,
validate: {
trigger: ['input', 'blur'],
required: true,
message: t('project.node.image_pull_policy_tips')
}
},
{
@ -93,3 +107,19 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] {
...useCustomParams({ model, field: 'localParams', isSimple: true })
]
}
export const IMAGE_PULL_POLICY_LIST = [
{
value: 'IfNotPresent',
label: 'IfNotPresent'
},
{
value: 'Always',
label: 'Always'
},
{
value: 'Never',
label: 'Never'
}
]

View File

@ -347,6 +347,7 @@ export function formatParams(data: INodeData): {
taskParams.minCpuCores = data.minCpuCores
taskParams.minMemorySpace = data.minMemorySpace
taskParams.image = data.image
taskParams.imagePullPolicy = data.imagePullPolicy
taskParams.command = data.command
taskParams.args = data.args
taskParams.customizedLabels = data.customizedLabels

View File

@ -370,6 +370,7 @@ interface ITaskParams {
minCpuCores?: string
minMemorySpace?: string
image?: string
imagePullPolicy?: string
command?: string
args?: string
customizedLabels?: ILabel[]