From 2e4a9e6d5395730282a3f105aa815a4095a96433 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sun, 9 Oct 2022 09:36:59 +0800 Subject: [PATCH] Fix mysql registry doesn't support array (#12255) --- .../plugin/registry/mysql/MysqlOperator.java | 129 +++++++++++------- .../mysql/task/EphemeralDateManager.java | 1 + 2 files changed, 80 insertions(+), 50 deletions(-) diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java index d5dd507f92..8d7c88f358 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock import org.apache.commons.lang3.StringUtils; -import java.sql.Array; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -34,6 +33,7 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,8 @@ public class MysqlOperator implements AutoCloseable { private final long expireTimeWindow; public MysqlOperator(MysqlRegistryProperties registryProperties) { - this.expireTimeWindow = registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis(); + this.expireTimeWindow = + registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis(); HikariConfig hikariConfig = registryProperties.getHikariConfig(); hikariConfig.setPoolName("MysqlRegistryDataSourcePool"); @@ -62,18 +63,20 @@ public class MysqlOperator implements AutoCloseable { public void healthCheck() throws SQLException { String sql = "select 1 from t_ds_mysql_registry_data"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql); - ResultSet resultSet = preparedStatement.executeQuery();) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql); + ResultSet resultSet = preparedStatement.executeQuery();) { // if no exception, the healthCheck success } } public List queryAllMysqlRegistryData() throws SQLException { String sql = "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql); - ResultSet resultSet = preparedStatement.executeQuery()) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql); + ResultSet resultSet = preparedStatement.executeQuery()) { List result = new ArrayList<>(resultSet.getFetchSize()); while (resultSet.next()) { MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder() @@ -91,11 +94,15 @@ public class MysqlOperator implements AutoCloseable { } public long insertOrUpdateEphemeralData(String key, String value) throws SQLException { - String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" + - "ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp"; + String sql = + "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" + + + "ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp"; // put a ephemeralData - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = + connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { preparedStatement.setString(1, key); preparedStatement.setString(2, value); preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue()); @@ -110,11 +117,15 @@ public class MysqlOperator implements AutoCloseable { } public long insertOrUpdatePersistentData(String key, String value) throws SQLException { - String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" + - "ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp"; + String sql = + "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" + + + "ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp"; // put a persistent Data - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = + connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { preparedStatement.setString(1, key); preparedStatement.setString(2, value); preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue()); @@ -130,8 +141,9 @@ public class MysqlOperator implements AutoCloseable { public void deleteEphemeralData(String key) throws SQLException { String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setString(1, key); preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue()); preparedStatement.execute(); @@ -140,8 +152,9 @@ public class MysqlOperator implements AutoCloseable { public void deleteEphemeralData(long ephemeralNodeId) throws SQLException { String sql = "DELETE from t_ds_mysql_registry_data where `id` = ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setLong(1, ephemeralNodeId); preparedStatement.execute(); } @@ -149,8 +162,9 @@ public class MysqlOperator implements AutoCloseable { public void deletePersistentData(String key) throws SQLException { String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setString(1, key); preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue()); preparedStatement.execute(); @@ -159,8 +173,9 @@ public class MysqlOperator implements AutoCloseable { public void clearExpireLock() { String sql = "delete from t_ds_mysql_registry_lock where last_term < ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow)); int i = preparedStatement.executeUpdate(); @@ -174,8 +189,9 @@ public class MysqlOperator implements AutoCloseable { public void clearExpireEphemeralDate() { String sql = "delete from t_ds_mysql_registry_data where last_update_time < ? and type = ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow)); preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue()); int i = preparedStatement.executeUpdate(); @@ -188,9 +204,11 @@ public class MysqlOperator implements AutoCloseable { } public MysqlRegistryData getData(String key) throws SQLException { - String sql = "SELECT id, `key`, data, type, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + String sql = + "SELECT id, `key`, data, type, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?"; + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setString(1, key); try (ResultSet resultSet = preparedStatement.executeQuery()) { if (!resultSet.next()) { @@ -210,8 +228,9 @@ public class MysqlOperator implements AutoCloseable { public List getChildren(String key) throws SQLException { String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` like ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setString(1, key + "%"); try (ResultSet resultSet = preparedStatement.executeQuery()) { List result = new ArrayList<>(resultSet.getFetchSize()); @@ -228,8 +247,9 @@ public class MysqlOperator implements AutoCloseable { public boolean existKey(String key) throws SQLException { String sql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setString(1, key); try (ResultSet resultSet = preparedStatement.executeQuery()) { if (resultSet.next()) { @@ -244,9 +264,12 @@ public class MysqlOperator implements AutoCloseable { * Try to acquire the target Lock, if cannot acquire, return null. */ public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException { - String sql = "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, current_timestamp, current_timestamp)"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { + String sql = + "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, current_timestamp, current_timestamp)"; + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = + connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { preparedStatement.setString(1, key); preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER); preparedStatement.executeUpdate(); @@ -264,9 +287,11 @@ public class MysqlOperator implements AutoCloseable { } public MysqlRegistryLock getLockById(long lockId) throws SQLException { - String sql = "SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + String sql = + "SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?"; + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setLong(1, lockId); try (ResultSet resultSet = preparedStatement.executeQuery()) { if (resultSet.next()) { @@ -287,8 +312,9 @@ public class MysqlOperator implements AutoCloseable { // release the lock public boolean releaseLock(long lockId) throws SQLException { String sql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { preparedStatement.setLong(1, lockId); int i = preparedStatement.executeUpdate(); return i > 0; @@ -297,20 +323,23 @@ public class MysqlOperator implements AutoCloseable { public boolean updateEphemeralDataTerm(Collection ephemeralDateIds) throws SQLException { String sql = "update t_ds_mysql_registry_data set `last_update_time` = current_timestamp() where `id` IN (?)"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - Array idArray = connection.createArrayOf("bigint", ephemeralDateIds.toArray()); - preparedStatement.setArray(1, idArray); + String ids = ephemeralDateIds.stream().map(String::valueOf).collect(Collectors.joining(",")); + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + preparedStatement.setString(1, ids); return preparedStatement.executeUpdate() > 0; } } public boolean updateLockTerm(List lockIds) throws SQLException { - String sql = "update t_ds_mysql_registry_lock set `last_term` = current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)"; - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - Array idArray = connection.createArrayOf("bigint", lockIds.toArray()); - preparedStatement.setArray(1, idArray); + String sql = + "update t_ds_mysql_registry_lock set `last_term` = current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)"; + String ids = lockIds.stream().map(String::valueOf).collect(Collectors.joining(",")); + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + preparedStatement.setString(1, ids); return preparedStatement.executeUpdate() > 0; } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java index 45bd992134..d17806c6bd 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java @@ -148,6 +148,7 @@ public class EphemeralDateManager implements AutoCloseable { mysqlOperator.clearExpireEphemeralDate(); return ConnectionState.CONNECTED; } catch (Exception ex) { + LOGGER.error("Get connection state error, meet an unknown exception", ex); return ConnectionState.DISCONNECTED; } }