[Improve]Release JDBC connection resources (#474)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 427eb8b..9f7ed8d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -30,6 +30,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Statement;
@@ -103,10 +104,11 @@
     }
 
     public void execute(String sql) {
-        try (Statement statement =
-                jdbcConnectionProvider.getOrEstablishConnection().createStatement()) {
+        try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+                Statement statement = connection.createStatement()) {
             statement.execute(sql);
         } catch (Exception e) {
+            LOG.error("SQL query could not be executed: {}", sql, e);
             throw new DorisSystemException(
                     String.format("SQL query could not be executed: %s", sql), e);
         }
@@ -116,18 +118,19 @@
             String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {
 
         List<String> columnValues = Lists.newArrayList();
-        try (PreparedStatement ps =
-                jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
+        try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+                PreparedStatement ps = connection.prepareStatement(sql)) {
             if (Objects.nonNull(params) && params.length > 0) {
                 for (int i = 0; i < params.length; i++) {
                     ps.setObject(i + 1, params[i]);
                 }
             }
-            ResultSet rs = ps.executeQuery();
-            while (rs.next()) {
-                String columnValue = rs.getString(columnIndex);
-                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
-                    columnValues.add(columnValue);
+            try (ResultSet rs = ps.executeQuery()) {
+                while (rs.next()) {
+                    String columnValue = rs.getString(columnIndex);
+                    if (filterFunc == null || filterFunc.test(columnValue)) {
+                        columnValues.add(columnValue);
+                    }
                 }
             }
             return columnValues;
@@ -152,16 +155,17 @@
                         databaseName, tableName);
 
         Map<String, String> columnValues = new HashMap<>();
-        try (PreparedStatement ps =
-                jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
-            ResultSet rs = ps.executeQuery();
+        try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+                PreparedStatement ps = connection.prepareStatement(sql);
+                ResultSet rs = ps.executeQuery()) {
             while (rs.next()) {
-                String filedName = rs.getString(1);
+                String fieldName = rs.getString(1);
                 String datatype = rs.getString(2);
-                columnValues.put(filedName, datatype);
+                columnValues.put(fieldName, datatype);
             }
             return columnValues;
         } catch (Exception e) {
+            LOG.error("SQL query could not be executed: {}", sql, e);
             throw new DorisSystemException(
                     String.format("The following SQL query could not be executed: %s", sql), e);
         }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
index 68a9231..0315684 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
@@ -42,7 +42,7 @@
 
     @Override
     public Connection getOrEstablishConnection() throws ClassNotFoundException, SQLException {
-        if (connection != null && !connection.isClosed() && connection.isValid(10000)) {
+        if (connection != null && !connection.isClosed() && connection.isValid(10)) {
             return connection;
         }
         try {