mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-02 04:08:31 +08:00
Use single thread to refresh kerberos (#13456)
This commit is contained in:
parent
af9091a98a
commit
b11431db95
@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.thread;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
import lombok.experimental.UtilityClass;
|
||||
@ -45,6 +46,14 @@ public class ThreadUtils {
|
||||
return Executors.newFixedThreadPool(threadsNum, threadFactory);
|
||||
}
|
||||
|
||||
public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) {
|
||||
ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat(threadName)
|
||||
.setDaemon(true)
|
||||
.build();
|
||||
return Executors.newSingleThreadScheduledExecutor(threadFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep in given mills, this is not accuracy.
|
||||
*/
|
||||
|
@ -21,44 +21,29 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_S
|
||||
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
|
||||
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;
|
||||
|
||||
import org.apache.dolphinscheduler.common.constants.Constants;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
|
||||
import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil;
|
||||
import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory;
|
||||
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
|
||||
import org.apache.dolphinscheduler.spi.enums.DbType;
|
||||
|
||||
import sun.security.krb5.Config;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
public class HiveDataSourceClient extends CommonDataSourceClient {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(HiveDataSourceClient.class);
|
||||
|
||||
private ScheduledExecutorService kerberosRenewalService;
|
||||
|
||||
private Configuration hadoopConf;
|
||||
private UserGroupInformation ugi;
|
||||
private boolean retryGetConnection = true;
|
||||
|
||||
public HiveDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
|
||||
super(baseConnectionParam, dbType);
|
||||
}
|
||||
@ -66,18 +51,12 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
|
||||
@Override
|
||||
protected void preInit() {
|
||||
logger.info("PreInit in {}", getClass().getName());
|
||||
this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor(
|
||||
new ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
|
||||
logger.info("Create Configuration for hive configuration.");
|
||||
this.hadoopConf = createHadoopConf();
|
||||
logger.info("Create Configuration success.");
|
||||
|
||||
logger.info("Create UserGroupInformation.");
|
||||
this.ugi = createUserGroupInformation(baseConnectionParam.getUser());
|
||||
UserGroupInformationFactory.login(baseConnectionParam.getUser());
|
||||
logger.info("Create ugi success.");
|
||||
|
||||
this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
|
||||
@ -108,61 +87,18 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
|
||||
}
|
||||
}
|
||||
|
||||
private UserGroupInformation createUserGroupInformation(String username) {
|
||||
String krb5File = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH);
|
||||
String keytab = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH);
|
||||
String principal = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME);
|
||||
|
||||
try {
|
||||
UserGroupInformation ugi = CommonUtil.createUGI(getHadoopConf(), principal, keytab, krb5File, username);
|
||||
try {
|
||||
Field isKeytabField = ugi.getClass().getDeclaredField("isKeytab");
|
||||
isKeytabField.setAccessible(true);
|
||||
isKeytabField.set(ugi, true);
|
||||
} catch (NoSuchFieldException | IllegalAccessException e) {
|
||||
logger.warn(e.getMessage());
|
||||
}
|
||||
|
||||
kerberosRenewalService.scheduleWithFixedDelay(() -> {
|
||||
try {
|
||||
ugi.checkTGTAndReloginFromKeytab();
|
||||
} catch (IOException e) {
|
||||
logger.error("Check TGT and Renewal from Keytab error", e);
|
||||
}
|
||||
}, 5, 5, TimeUnit.MINUTES);
|
||||
return ugi;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("createUserGroupInformation fail. ", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected Configuration createHadoopConf() {
|
||||
Configuration hadoopConf = new Configuration();
|
||||
hadoopConf.setBoolean("ipc.client.fallback-to-simple-auth-allowed", true);
|
||||
return hadoopConf;
|
||||
}
|
||||
|
||||
protected Configuration getHadoopConf() {
|
||||
return this.hadoopConf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection() {
|
||||
try {
|
||||
return dataSource.getConnection();
|
||||
} catch (SQLException e) {
|
||||
boolean kerberosStartupState =
|
||||
PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
|
||||
if (retryGetConnection && kerberosStartupState) {
|
||||
retryGetConnection = false;
|
||||
createUserGroupInformation(baseConnectionParam.getUser());
|
||||
Connection connection = getConnection();
|
||||
retryGetConnection = true;
|
||||
return connection;
|
||||
Connection connection = null;
|
||||
while (connection == null) {
|
||||
try {
|
||||
connection = dataSource.getConnection();
|
||||
} catch (SQLException e) {
|
||||
UserGroupInformationFactory.logout(baseConnectionParam.getUser());
|
||||
UserGroupInformationFactory.login(baseConnectionParam.getUser());
|
||||
}
|
||||
logger.error("get oneSessionDataSource Connection fail SQLException: {}", e.getMessage(), e);
|
||||
return null;
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -170,8 +106,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
|
||||
try {
|
||||
super.close();
|
||||
} finally {
|
||||
kerberosRenewalService.shutdown();
|
||||
this.ugi = null;
|
||||
UserGroupInformationFactory.logout(baseConnectionParam.getUser());
|
||||
}
|
||||
logger.info("Closed Hive datasource client.");
|
||||
|
||||
|
@ -0,0 +1,129 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.hive.security;
|
||||
|
||||
import static org.apache.dolphinscheduler.common.constants.Constants.JAVA_SECURITY_KRB5_CONF;
|
||||
|
||||
import org.apache.dolphinscheduler.common.constants.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.ResUploadType;
|
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class UserGroupInformationFactory {
|
||||
|
||||
private static final Map<String, Integer> currentLoginTimesMap = new HashMap<>();
|
||||
|
||||
private static final Map<String, UserGroupInformation> userGroupInformationMap = new HashMap<>();
|
||||
|
||||
private static final ScheduledExecutorService kerberosRenewalService =
|
||||
ThreadUtils.newSingleDaemonScheduledExecutorService("Hive-Kerberos-Renewal-Thread-");
|
||||
|
||||
static {
|
||||
kerberosRenewalService.scheduleWithFixedDelay(() -> {
|
||||
if (userGroupInformationMap.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
userGroupInformationMap.forEach((key, ugi) -> {
|
||||
try {
|
||||
if (ugi.isFromKeytab()) {
|
||||
ugi.checkTGTAndReloginFromKeytab();
|
||||
}
|
||||
log.info("Relogin from keytab success, user: {}", key);
|
||||
} catch (Exception e) {
|
||||
log.error("Relogin from keytab failed, user: {}", key, e);
|
||||
}
|
||||
});
|
||||
}, 0, 5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
public synchronized static UserGroupInformation login(String userName) {
|
||||
UserGroupInformation userGroupInformation = userGroupInformationMap.get(userName);
|
||||
if (userGroupInformation == null) {
|
||||
if (!openKerberos()) {
|
||||
userGroupInformation = createRemoteUser(userName);
|
||||
} else {
|
||||
userGroupInformation = createKerberosUser();
|
||||
}
|
||||
userGroupInformationMap.put(userName, userGroupInformation);
|
||||
}
|
||||
currentLoginTimesMap.compute(userName, (k, v) -> v == null ? 1 : v + 1);
|
||||
return userGroupInformation;
|
||||
}
|
||||
|
||||
public synchronized static void logout(String userName) {
|
||||
Integer currentLoginTimes = currentLoginTimesMap.get(userName);
|
||||
if (currentLoginTimes == null) {
|
||||
return;
|
||||
}
|
||||
if (currentLoginTimes <= 1) {
|
||||
currentLoginTimesMap.remove(userName);
|
||||
userGroupInformationMap.remove(userName);
|
||||
} else {
|
||||
currentLoginTimesMap.put(userName, currentLoginTimes - 1);
|
||||
}
|
||||
}
|
||||
|
||||
private static UserGroupInformation createRemoteUser(String userName) {
|
||||
return UserGroupInformation.createRemoteUser(userName);
|
||||
}
|
||||
|
||||
private static UserGroupInformation createKerberosUser() {
|
||||
String krb5File = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH);
|
||||
String keytab = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH);
|
||||
String principal = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME);
|
||||
if (StringUtils.isNotBlank(krb5File)) {
|
||||
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File);
|
||||
}
|
||||
|
||||
Configuration hadoopConf = new Configuration();
|
||||
hadoopConf.setBoolean("ipc.client.fallback-to-simple-auth-allowed", true);
|
||||
hadoopConf.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS);
|
||||
|
||||
try {
|
||||
UserGroupInformation.setConfiguration(hadoopConf);
|
||||
UserGroupInformation userGroupInformation =
|
||||
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keytab.trim());
|
||||
UserGroupInformation.setLoginUser(userGroupInformation);
|
||||
return userGroupInformation;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("createUserGroupInformation fail. ", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean openKerberos() {
|
||||
String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
|
||||
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
|
||||
Boolean kerberosStartupState =
|
||||
PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
|
||||
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
|
||||
}
|
||||
|
||||
}
|
@ -1,67 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.hive.utils;
|
||||
|
||||
import static org.apache.dolphinscheduler.common.constants.Constants.JAVA_SECURITY_KRB5_CONF;
|
||||
|
||||
import org.apache.dolphinscheduler.common.constants.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.ResUploadType;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import lombok.experimental.UtilityClass;
|
||||
|
||||
@UtilityClass
|
||||
public class CommonUtil {
|
||||
|
||||
public static boolean getKerberosStartupState() {
|
||||
String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
|
||||
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
|
||||
Boolean kerberosStartupState =
|
||||
PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
|
||||
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
|
||||
}
|
||||
|
||||
public static synchronized UserGroupInformation createUGI(Configuration configuration, String principal,
|
||||
String keyTab, String krb5File,
|
||||
String username) throws IOException {
|
||||
if (getKerberosStartupState()) {
|
||||
Objects.requireNonNull(keyTab);
|
||||
if (StringUtils.isNotBlank(krb5File)) {
|
||||
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File);
|
||||
}
|
||||
return loginKerberos(configuration, principal, keyTab);
|
||||
}
|
||||
return UserGroupInformation.createRemoteUser(username);
|
||||
}
|
||||
|
||||
public static synchronized UserGroupInformation loginKerberos(final Configuration config, final String principal,
|
||||
final String keyTab) throws IOException {
|
||||
config.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS);
|
||||
UserGroupInformation.setConfiguration(config);
|
||||
UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim());
|
||||
return UserGroupInformation.getCurrentUser();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user