[Fix-3298][K8s] Fix task log disappear after recreating or upgrading helm release (#4755)

* [Improvement][K8s] Alter column host varchar(15) to varchar(135) for long host

* [Improvement][K8s] Improve getHost and getAddr in NetUtils

* [Improvement][K8s] Replace getHost with getAddr
This commit is contained in:
Shiwen Cheng 2021-02-14 21:33:45 +08:00 committed by GitHub
parent a6ea04d4a3
commit 53598fdd18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 197 additions and 29 deletions

View File

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.regex.Pattern;
@ -1042,4 +1043,11 @@ public final class Constants {
* pstree, get pud and sub pid
*/
public static final String PSTREE = "pstree";
/**
* docker & kubernetes
*/
public static final boolean DOCKER_MODE = StringUtils.isNotEmpty(System.getenv("DOCKER"));
public static final boolean KUBERNETES_MODE = StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
}

View File

@ -49,8 +49,6 @@ public class NetUtils {
private static final String NETWORK_PRIORITY_INNER = "inner";
private static final String NETWORK_PRIORITY_OUTER = "outer";
private static final Logger logger = LoggerFactory.getLogger(NetUtils.class);
private static final String ANY_HOST_VALUE = "0.0.0.0";
private static final String LOCAL_HOST_VALUE = "127.0.0.1";
private static InetAddress LOCAL_ADDRESS = null;
private static volatile String HOST_ADDRESS;
@ -58,6 +56,22 @@ public class NetUtils {
throw new UnsupportedOperationException("Construct NetUtils");
}
/**
* get addr like host:port
* @return addr
*/
public static String getAddr(String host, int port) {
return String.format("%s:%d", host, port);
}
/**
* get addr like host:port
* @return addr
*/
public static String getAddr(int port) {
return getAddr(getHost(), port);
}
public static String getHost() {
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
@ -65,10 +79,10 @@ public class NetUtils {
InetAddress address = getLocalAddress();
if (address != null) {
HOST_ADDRESS = address.getHostAddress();
HOST_ADDRESS = Constants.KUBERNETES_MODE ? address.getHostName() : address.getHostAddress();
return HOST_ADDRESS;
}
return LOCAL_HOST_VALUE;
return Constants.KUBERNETES_MODE ? "localhost" : "127.0.0.1";
}
private static InetAddress getLocalAddress() {
@ -153,8 +167,8 @@ public class NetUtils {
String name = address.getHostAddress();
return (name != null
&& IP_PATTERN.matcher(name).matches()
&& !ANY_HOST_VALUE.equals(name)
&& !LOCAL_HOST_VALUE.equals(name));
&& !address.isAnyLocalAddress()
&& !address.isLoopbackAddress());
}
/**

View File

@ -29,6 +29,13 @@ import static org.mockito.Mockito.when;
*/
public class NetUtilsTest {
@Test
public void testGetAddr() {
assertEquals(NetUtils.getHost() + ":5678", NetUtils.getAddr(5678));
assertEquals("127.0.0.1:5678", NetUtils.getAddr("127.0.0.1", 5678));
assertEquals("localhost:1234", NetUtils.getAddr("localhost", 1234));
}
@Test
public void testGetLocalHost() {
assertNotNull(NetUtils.getHost());
@ -45,9 +52,11 @@ public class NetUtilsTest {
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("0.0.0.0");
when(address.isAnyLocalAddress()).thenReturn(true);
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("127.0.0.1");
when(address.isLoopbackAddress()).thenReturn(true);
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("1.2.3.4");

View File

@ -99,12 +99,6 @@ public class OSUtilsTest {
Assert.assertNotEquals(0, processId);
}
@Test
public void getHost(){
String host = NetUtils.getHost();
Assert.assertNotNull(host);
Assert.assertNotEquals("", host);
}
@Test
public void checkResource(){
boolean resource = OSUtils.checkResource(100,0);
Assert.assertTrue(resource);

View File

@ -135,7 +135,7 @@ public class MasterRegistry {
*/
private String getLocalAddress() {
return NetUtils.getHost() + ":" + masterConfig.getListenPort();
return NetUtils.getAddr(masterConfig.getListenPort());
}

View File

@ -125,7 +125,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
private void initTaskParameters() {
this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);

View File

@ -185,7 +185,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
private void initTaskParameters() {
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);

View File

@ -178,7 +178,7 @@ public class MasterSchedulerService extends Thread {
}
}
private String getLocalAddress(){
return NetUtils.getHost() + ":" + masterConfig.getListenPort();
private String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}
}

View File

@ -139,7 +139,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort());
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);

View File

@ -169,7 +169,7 @@ public class WorkerRegistry {
* get local address
*/
private String getLocalAddress() {
return NetUtils.getHost() + COLON + workerConfig.getListenPort();
return NetUtils.getAddr(workerConfig.getListenPort());
}
/**

View File

@ -79,7 +79,7 @@ public class NettyExecutorManagerTest {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(NetUtils.getHost() + ":" + serverConfig.getListenPort()));
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
Boolean execute = nettyExecutorManager.execute(executionContext);
Assert.assertTrue(execute);
nettyRemotingServer.close();
@ -98,7 +98,7 @@ public class NettyExecutorManagerTest {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(NetUtils.getHost() + ":4444"));
executionContext.setHost(Host.of(NetUtils.getAddr(4444)));
nettyExecutorManager.execute(executionContext);
}

View File

@ -75,7 +75,7 @@ public class ZookeeperNodeManagerTest {
Set<String> masterNodes = zookeeperNodeManager.getMasterNodes();
Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes));
Assert.assertEquals(1, masterNodes.size());
Assert.assertEquals(NetUtils.getHost() + ":" + masterConfig.getListenPort(), masterNodes.iterator().next());
Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next());
workerRegistry.unRegistry();
}
@ -105,7 +105,7 @@ public class ZookeeperNodeManagerTest {
Set<String> workerNodes = zookeeperNodeManager.getWorkerGroupNodes("default");
Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes));
Assert.assertEquals(1, workerNodes.size());
Assert.assertEquals(NetUtils.getHost() + ":" + workerConfig.getListenPort(), workerNodes.iterator().next());
Assert.assertEquals(NetUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next());
workerRegistry.unRegistry();
}
}

View File

@ -47,7 +47,7 @@ public class ExecutionContextTestUtils {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(NetUtils.getHost() + ":" + port));
executionContext.setHost(Host.of(NetUtils.getAddr(port)));
return executionContext;
}

View File

@ -113,7 +113,7 @@ public class WorkerRegistryTest {
int i = 0;
for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (NetUtils.getHost() + ":" + workerConfig.getListenPort());
String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (NetUtils.getAddr(workerConfig.getListenPort()));
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath);
if (0 == i) {
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));

View File

@ -839,6 +839,7 @@
<include>**/common/utils/IpUtilsTest.java</include>
<include>**/common/utils/JSONUtilsTest.java</include>
<include>**/common/utils/LoggerUtilsTest.java</include>
<include>**/common/utils/NetUtilsTest.java</include>
<include>**/common/utils/OSUtilsTest.java</include>
<include>**/common/utils/ParameterUtilsTest.java</include>
<include>**/common/utils/TimePlaceholderUtilsTest.java</include>

View File

@ -348,7 +348,7 @@ CREATE TABLE t_ds_process_instance (
start_time timestamp DEFAULT NULL ,
end_time timestamp DEFAULT NULL ,
run_times int DEFAULT NULL ,
host varchar(45) DEFAULT NULL ,
host varchar(135) DEFAULT NULL ,
command_type int DEFAULT NULL ,
command_param text ,
task_depend_type int DEFAULT NULL ,
@ -562,7 +562,7 @@ CREATE TABLE t_ds_task_instance (
submit_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
end_time timestamp DEFAULT NULL ,
host varchar(45) DEFAULT NULL ,
host varchar(135) DEFAULT NULL ,
execute_path varchar(200) DEFAULT NULL ,
log_path varchar(200) DEFAULT NULL ,
alert_flag int DEFAULT NULL ,

View File

@ -458,7 +458,7 @@ CREATE TABLE `t_ds_process_instance` (
`start_time` datetime DEFAULT NULL COMMENT 'process instance start time',
`end_time` datetime DEFAULT NULL COMMENT 'process instance end time',
`run_times` int(11) DEFAULT NULL COMMENT 'process instance run times',
`host` varchar(45) DEFAULT NULL COMMENT 'process instance host',
`host` varchar(135) DEFAULT NULL COMMENT 'process instance host',
`command_type` tinyint(4) DEFAULT NULL COMMENT 'command type',
`command_param` text COMMENT 'json command parameters',
`task_depend_type` tinyint(4) DEFAULT NULL COMMENT 'task depend type. 0: only current node,1:before the node,2:later nodes',
@ -697,7 +697,7 @@ CREATE TABLE `t_ds_task_instance` (
`submit_time` datetime DEFAULT NULL COMMENT 'task submit time',
`start_time` datetime DEFAULT NULL COMMENT 'task start time',
`end_time` datetime DEFAULT NULL COMMENT 'task end time',
`host` varchar(45) DEFAULT NULL COMMENT 'host of task running on',
`host` varchar(135) DEFAULT NULL COMMENT 'host of task running on',
`execute_path` varchar(200) DEFAULT NULL COMMENT 'task execute path in the host',
`log_path` varchar(200) DEFAULT NULL COMMENT 'task log path',
`alert_flag` tinyint(4) DEFAULT NULL COMMENT 'whether alert',

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
-- uc_dolphin_T_t_ds_process_instance_R_host
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_R_host;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_R_host()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='host')
THEN
ALTER TABLE t_ds_process_instance MODIFY COLUMN `host` varchar(135);
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_process_instance_R_host;
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_R_host;
-- uc_dolphin_T_t_ds_task_instance_R_host
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_R_host;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_R_host()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='host')
THEN
ALTER TABLE t_ds_task_instance MODIFY COLUMN `host` varchar(135);
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_R_host;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_host;

View File

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- uc_dolphin_T_t_ds_process_instance_A_host
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_process_instance_A_host() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='host')
THEN
ALTER TABLE t_ds_process_instance ALTER COLUMN host type varchar(135);
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_process_instance_A_host();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_instance_A_host();
-- uc_dolphin_T_t_ds_task_instance_A_host
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_host() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='host')
THEN
ALTER TABLE t_ds_task_instance ALTER COLUMN host type varchar(135);
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_A_host();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_host();

View File

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/