[Improve]Release JDBC connection resources (#474)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 427eb8b..9f7ed8d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
@@ -103,10 +104,11 @@
}
public void execute(String sql) {
- try (Statement statement =
- jdbcConnectionProvider.getOrEstablishConnection().createStatement()) {
+ try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+ Statement statement = connection.createStatement()) {
statement.execute(sql);
} catch (Exception e) {
+ LOG.error("SQL query could not be executed: {}", sql, e);
throw new DorisSystemException(
String.format("SQL query could not be executed: %s", sql), e);
}
@@ -116,18 +118,19 @@
String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {
List<String> columnValues = Lists.newArrayList();
- try (PreparedStatement ps =
- jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
+ try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+ PreparedStatement ps = connection.prepareStatement(sql)) {
if (Objects.nonNull(params) && params.length > 0) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
}
- ResultSet rs = ps.executeQuery();
- while (rs.next()) {
- String columnValue = rs.getString(columnIndex);
- if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
- columnValues.add(columnValue);
+ try (ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ String columnValue = rs.getString(columnIndex);
+ if (filterFunc == null || filterFunc.test(columnValue)) {
+ columnValues.add(columnValue);
+ }
}
}
return columnValues;
@@ -152,16 +155,17 @@
databaseName, tableName);
Map<String, String> columnValues = new HashMap<>();
- try (PreparedStatement ps =
- jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
- ResultSet rs = ps.executeQuery();
+ try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+ PreparedStatement ps = connection.prepareStatement(sql);
+ ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
- String filedName = rs.getString(1);
+ String fieldName = rs.getString(1);
String datatype = rs.getString(2);
- columnValues.put(filedName, datatype);
+ columnValues.put(fieldName, datatype);
}
return columnValues;
} catch (Exception e) {
+ LOG.error("SQL query could not be executed: {}", sql, e);
throw new DorisSystemException(
String.format("The following SQL query could not be executed: %s", sql), e);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
index 68a9231..0315684 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
@@ -42,7 +42,7 @@
@Override
public Connection getOrEstablishConnection() throws ClassNotFoundException, SQLException {
- if (connection != null && !connection.isClosed() && connection.isValid(10000)) {
+ if (connection != null && !connection.isClosed() && connection.isValid(10)) {
return connection;
}
try {