[FLINK-36696] [flink-autoscaler-plugin-jdbc] Switch sql connection usages to datasource (#929)

* [FLINK-36696] [flink-autoscaler-plugin-jdbc] Switch sql connection usages to datasource

* Use the dataSource instead of `getDataSource()` to avoid connection leak

---------

Co-authored-by: Rui Fan <1996fanrui@gmail.com>
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
index 7613717..8faef51 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
@@ -135,11 +135,12 @@
     }
 
     @Override
-    public void close() {
+    public void close() throws Exception {
         if (Objects.nonNull(scheduledEventHandlerCleaner)
                 && !scheduledEventHandlerCleaner.isShutdown()) {
             scheduledEventHandlerCleaner.shutdownNow();
         }
+        jdbcEventInteractor.close();
     }
 
     @VisibleForTesting
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
index ce3fc48..f40740a 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
@@ -23,8 +23,8 @@
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.sql.DataSource;
 
-import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
@@ -37,13 +37,13 @@
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Responsible for interacting with the database. */
-public class JdbcEventInteractor {
+public class JdbcEventInteractor implements AutoCloseable {
 
-    private final Connection conn;
+    private final DataSource dataSource;
     private Clock clock = Clock.systemDefaultZone();
 
-    public JdbcEventInteractor(Connection conn) {
-        this.conn = conn;
+    public JdbcEventInteractor(DataSource dataSource) {
+        this.dataSource = dataSource;
     }
 
     public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey)
@@ -52,7 +52,8 @@
                 "select * from t_flink_autoscaler_event_handler "
                         + "where job_key = ? and reason = ? and event_key = ? ";
 
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setString(1, jobKey);
             pstmt.setString(2, reason);
             pstmt.setString(3, eventKey);
@@ -99,7 +100,8 @@
                         + " values (?, ?, ?, ?, ?, ?, ?, ?)";
 
         var createTime = Timestamp.from(clock.instant());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setTimestamp(1, createTime);
             pstmt.setTimestamp(2, createTime);
             pstmt.setString(3, jobKey);
@@ -117,7 +119,8 @@
                 "UPDATE t_flink_autoscaler_event_handler set update_time = ?, message = ?, event_count = ? where id = ?";
 
         var updateTime = Timestamp.from(clock.instant());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setTimestamp(1, updateTime);
             pstmt.setString(2, message);
             pstmt.setInt(3, eventCount);
@@ -136,7 +139,8 @@
                 "select * from t_flink_autoscaler_event_handler "
                         + "where job_key = ? and reason = ? ";
 
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setString(1, jobKey);
             pstmt.setString(2, reason);
 
@@ -160,7 +164,8 @@
                 "SELECT id from t_flink_autoscaler_event_handler "
                         + "           where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) "
                         + "           and create_time < ?";
-        try (var pstmt = conn.prepareStatement(sql)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(sql)) {
             pstmt.setObject(1, timestamp);
             ResultSet resultSet = pstmt.executeQuery();
             return resultSet.next() ? resultSet.getLong(1) : null;
@@ -171,11 +176,19 @@
             throws Exception {
         var query =
                 "delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?";
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setObject(1, startId);
             pstmt.setObject(2, endId);
             pstmt.setObject(3, timestamp);
             return pstmt.executeUpdate();
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        if (dataSource instanceof AutoCloseable) {
+            ((AutoCloseable) dataSource).close();
+        }
+    }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 1ed3ad9..6bc4887 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -333,4 +333,9 @@
             throws JacksonException {
         return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
     }
+
+    @Override
+    public void close() throws Exception {
+        jdbcStateStore.close();
+    }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
index 7c12bf8..9ddc5c5 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
@@ -20,7 +20,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
+
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.Collections;
@@ -31,21 +32,22 @@
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Responsible for interacting with the database. */
-public class JdbcStateInteractor {
+public class JdbcStateInteractor implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcStateInteractor.class);
 
-    private final Connection conn;
+    private final DataSource dataSource;
 
-    public JdbcStateInteractor(Connection conn) {
-        this.conn = conn;
+    public JdbcStateInteractor(DataSource dataSource) {
+        this.dataSource = dataSource;
     }
 
     public Map<StateType, String> queryData(String jobKey) throws Exception {
         var query =
                 "select state_type, state_value from t_flink_autoscaler_state_store where job_key = ?";
         var data = new HashMap<StateType, String>();
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setString(1, jobKey);
             var rs = pstmt.executeQuery();
             while (rs.next()) {
@@ -63,7 +65,8 @@
                 String.format(
                         "DELETE FROM t_flink_autoscaler_state_store where job_key = ? and state_type in (%s)",
                         String.join(",", Collections.nCopies(deletedStateTypes.size(), "?")));
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setString(1, jobKey);
             int i = 2;
             for (var stateType : deletedStateTypes) {
@@ -80,7 +83,8 @@
         var query =
                 "INSERT INTO t_flink_autoscaler_state_store (update_time, job_key, state_type, state_value) values (?, ?, ?, ?)";
         var updateTime = Timestamp.from(Instant.now());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             for (var stateType : createdStateTypes) {
                 pstmt.setTimestamp(1, updateTime);
                 pstmt.setString(2, jobKey);
@@ -106,7 +110,8 @@
                 "UPDATE t_flink_autoscaler_state_store set update_time = ?, state_value = ? where job_key = ? and state_type = ?";
 
         var updateTime = Timestamp.from(Instant.now());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             for (var stateType : updatedStateTypes) {
                 pstmt.setTimestamp(1, updateTime);
 
@@ -123,4 +128,11 @@
             pstmt.executeBatch();
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        if (dataSource instanceof AutoCloseable) {
+            ((AutoCloseable) dataSource).close();
+        }
+    }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
index 1e237a4..e891516 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
@@ -24,7 +24,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /** The jdbc state store. */
-public class JdbcStateStore {
+public class JdbcStateStore implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcStateStore.class);
 
@@ -89,4 +89,9 @@
     private JobStateView createJobStateView(String jobKey) throws Exception {
         return new JobStateView(jdbcStateInteractor, jobKey);
     }
+
+    @Override
+    public void close() throws Exception {
+        jdbcStateInteractor.close();
+    }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
index f527817..10c7486 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
@@ -42,6 +42,7 @@
 import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.annotation.Nonnull;
+import javax.sql.DataSource;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -73,20 +74,22 @@
             generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent);
     private final Clock defaultClock = Clock.fixed(createTime, ZoneId.systemDefault());
 
+    private DataSource dataSource;
     private CountableJdbcEventInteractor jdbcEventInteractor;
     private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> eventHandler;
     private JobAutoScalerContext<JobID> ctx;
 
     @BeforeEach
     void beforeEach() throws Exception {
-        jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection());
+        dataSource = getDataSource();
+        jdbcEventInteractor = new CountableJdbcEventInteractor(dataSource);
         jdbcEventInteractor.setClock(defaultClock);
         eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, Duration.ZERO);
         ctx = createDefaultJobAutoScalerContext();
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown() throws Exception {
         eventHandler.close();
     }
 
@@ -459,7 +462,7 @@
                         .max(Comparable::compareTo)
                         .orElseThrow();
 
-        try (Connection connection = getConnection();
+        try (Connection connection = dataSource.getConnection();
                 PreparedStatement ps =
                         connection.prepareStatement(
                                 "update t_flink_autoscaler_event_handler set id = ? where id = ?")) {
@@ -505,6 +508,7 @@
             Duration eventHandlerTtl,
             int unexpiredRecordsNum)
             throws Exception {
+
         eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl);
 
         // Init the expired records.
@@ -525,7 +529,7 @@
 
         eventHandler.cleanExpiredEvents();
 
-        try (Connection connection = getConnection();
+        try (Connection connection = dataSource.getConnection();
                 PreparedStatement ps =
                         connection.prepareStatement(
                                 "select count(1) from t_flink_autoscaler_event_handler");
@@ -542,7 +546,7 @@
         if (minId == null) {
             return;
         }
-        try (Connection connection = getConnection();
+        try (Connection connection = dataSource.getConnection();
                 PreparedStatement ps =
                         connection.prepareStatement(
                                 "delete from t_flink_autoscaler_event_handler where id = ?")) {
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
index 2fe6920..f5f7ca6 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
@@ -46,8 +46,7 @@
 
         // The datetime precision is seconds in MySQL by default.
         var createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
-        try (var conn = getConnection()) {
-            var jdbcEventInteractor = new JdbcEventInteractor(conn);
+        try (var jdbcEventInteractor = new JdbcEventInteractor(getDataSource())) {
             jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault()));
 
             jdbcEventInteractor.createEvent(
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
index 39ccd76..2ea04a7 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
@@ -19,7 +19,8 @@
 
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
+
 import java.sql.Timestamp;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,8 +35,8 @@
     private final AtomicLong updateCounter;
     private final AtomicLong deleteExpiredCounter;
 
-    public CountableJdbcEventInteractor(Connection conn) {
-        super(conn);
+    public CountableJdbcEventInteractor(DataSource dataSource) {
+        super(dataSource);
         queryCounter = new AtomicLong();
         createCounter = new AtomicLong();
         updateCounter = new AtomicLong();
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
index da737a5..159fc68 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
@@ -42,8 +42,7 @@
         var value1 = "value1";
         var value2 = "value2";
         var value3 = "value3";
-        try (var conn = getConnection()) {
-            var jdbcStateInteractor = new JdbcStateInteractor(conn);
+        try (var jdbcStateInteractor = new JdbcStateInteractor(getDataSource())) {
             assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
 
             // Test for creating data.
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
index 5ab4713..bb50ce6 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
@@ -28,7 +28,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
+
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
@@ -45,21 +46,21 @@
 abstract class AbstractJdbcStateStoreITCase implements DatabaseTest {
 
     private static final String DEFAULT_JOB_KEY = "jobKey";
-    private Connection conn;
+    private DataSource dataSource;
     private CountableJdbcStateInteractor jdbcStateInteractor;
     private JdbcStateStore jdbcStateStore;
 
     @BeforeEach
     void beforeEach() throws Exception {
-        this.conn = getConnection();
-        this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn);
+        this.dataSource = getDataSource();
+        this.jdbcStateInteractor = new CountableJdbcStateInteractor(dataSource);
         this.jdbcStateStore = new JdbcStateStore(jdbcStateInteractor);
     }
 
     @AfterEach
-    void afterEach() throws SQLException {
-        if (conn != null) {
-            conn.close();
+    void afterEach() throws Exception {
+        if (dataSource instanceof AutoCloseable) {
+            ((AutoCloseable) dataSource).close();
         }
     }
 
@@ -178,7 +179,7 @@
         assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
 
         // Modify the database directly.
-        var tmpJdbcInteractor = new JdbcStateInteractor(conn);
+        var tmpJdbcInteractor = new JdbcStateInteractor(dataSource);
         tmpJdbcInteractor.createData(
                 DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), Map.of(COLLECTED_METRICS, value1));
         assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1);
@@ -205,7 +206,7 @@
         final var expectedException = new RuntimeException("Database isn't stable.");
 
         var exceptionableJdbcStateInteractor =
-                new CountableJdbcStateInteractor(conn) {
+                new CountableJdbcStateInteractor(dataSource) {
                     private final AtomicBoolean isFirst = new AtomicBoolean(true);
 
                     @Override
@@ -282,7 +283,7 @@
 
     private Optional<String> getValueFromDatabase(String jobKey, StateType stateType)
             throws Exception {
-        var jdbcInteractor = new JdbcStateInteractor(conn);
+        var jdbcInteractor = new JdbcStateInteractor(dataSource);
         return Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType));
     }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
index 8fafc95..c1cb86e 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.autoscaler.jdbc.state;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
+
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -32,8 +33,8 @@
     private final AtomicLong createCounter;
     private final AtomicLong updateCounter;
 
-    public CountableJdbcStateInteractor(Connection conn) {
-        super(conn);
+    public CountableJdbcStateInteractor(DataSource dataSource) {
+        super(dataSource);
         queryCounter = new AtomicLong();
         deleteCounter = new AtomicLong();
         createCounter = new AtomicLong();
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
index a79c07b..31dbd07 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
@@ -48,7 +48,7 @@
 
     @Override
     protected void preSetup() throws Exception {
-        jdbcStateStore = new JdbcStateStore(new JdbcStateInteractor(getConnection()));
+        jdbcStateStore = new JdbcStateStore(new JdbcStateInteractor(getDataSource()));
         cachedStateStore = new JdbcAutoScalerStateStore<>(jdbcStateStore);
     }
 
@@ -56,7 +56,7 @@
     protected AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>>
             createPhysicalAutoScalerStateStore() throws Exception {
         return new JdbcAutoScalerStateStore<>(
-                new JdbcStateStore(new JdbcStateInteractor(getConnection())));
+                new JdbcStateStore(new JdbcStateInteractor(getDataSource())));
     }
 
     @Override
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
index 3989dbf..9f6d7ba 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
@@ -23,8 +23,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.sql.Connection;
-import java.sql.SQLException;
+import javax.sql.DataSource;
+
 import java.util.Optional;
 
 import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
@@ -36,22 +36,22 @@
 class JobStateViewTest implements DerbyTestBase {
 
     private static final String DEFAULT_JOB_KEY = "jobKey";
-    private Connection conn;
+    private DataSource dataSource;
     private CountableJdbcStateInteractor jdbcStateInteractor;
     private JobStateView jobStateView;
 
     @BeforeEach
     void beforeEach() throws Exception {
-        this.conn = getConnection();
-        this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn);
+        this.dataSource = getDataSource();
+        this.jdbcStateInteractor = new CountableJdbcStateInteractor(dataSource);
         this.jobStateView = new JobStateView(jdbcStateInteractor, DEFAULT_JOB_KEY);
         jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
     }
 
     @AfterEach
-    void afterEach() throws SQLException {
-        if (conn != null) {
-            conn.close();
+    void afterEach() throws Exception {
+        if (jdbcStateInteractor != null) {
+            jdbcStateInteractor.close();
         }
     }
 
@@ -195,7 +195,7 @@
     }
 
     private Optional<String> getValueFromDatabase(StateType stateType) throws Exception {
-        var jdbcInteractor = new JdbcStateInteractor(conn);
+        var jdbcInteractor = new JdbcStateInteractor(dataSource);
         return Optional.ofNullable(jdbcInteractor.queryData(DEFAULT_JOB_KEY).get(stateType));
     }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
index 261578b..27fce55 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.autoscaler.jdbc.testutils.databases;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
 
 /** Database testing. */
 public interface DatabaseTest {
 
-    Connection getConnection() throws Exception;
+    DataSource getDataSource() throws Exception;
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
index 42566d9..d02fee5 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
@@ -17,12 +17,13 @@
 
 package org.apache.flink.autoscaler.jdbc.testutils.databases.derby;
 
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 
-import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.List;
@@ -34,8 +35,11 @@
             List.of("t_flink_autoscaler_state_store", "t_flink_autoscaler_event_handler");
     private static final String JDBC_URL = "jdbc:derby:memory:test";
 
-    public Connection getConnection() throws Exception {
-        return DriverManager.getConnection(JDBC_URL);
+    public HikariDataSource getDataSource() {
+        HikariConfig config = new HikariConfig();
+        config.setJdbcUrl(JDBC_URL);
+        config.setValidationTimeout(1000);
+        return new HikariDataSource(config);
     }
 
     @Override
@@ -76,7 +80,8 @@
         var jobKeyReasonCreateTimeIndex =
                 "CREATE INDEX job_key_reason_create_time_idx ON t_flink_autoscaler_event_handler (job_key, reason, create_time)";
 
-        try (var conn = getConnection();
+        try (var dataSource = getDataSource();
+                var conn = dataSource.getConnection();
                 var st = conn.createStatement()) {
             st.execute(stateStoreDDL);
             st.execute(createStateStoreIndex);
@@ -88,7 +93,8 @@
 
     @Override
     public void afterAll(ExtensionContext extensionContext) throws Exception {
-        try (var conn = getConnection();
+        try (var dataSource = getDataSource();
+                var conn = dataSource.getConnection();
                 var st = conn.createStatement()) {
             for (var tableName : TABLES) {
                 st.executeUpdate(String.format("DROP TABLE %s", tableName));
@@ -103,7 +109,8 @@
     @Override
     public void afterEach(ExtensionContext extensionContext) throws Exception {
         // Clean up all data
-        try (var conn = getConnection();
+        try (var dataSource = getDataSource();
+                var conn = dataSource.getConnection();
                 var st = conn.createStatement()) {
             for (var tableName : TABLES) {
                 st.executeUpdate(String.format("DELETE from %s", tableName));
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
index e0ac3f7..feebb91 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
@@ -21,7 +21,7 @@
 
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
 
 /** Derby database for testing. */
 public interface DerbyTestBase extends DatabaseTest {
@@ -29,7 +29,7 @@
     @RegisterExtension DerbyExtension DERBY_EXTENSION = new DerbyExtension();
 
     @Override
-    default Connection getConnection() throws Exception {
-        return DERBY_EXTENSION.getConnection();
+    default DataSource getDataSource() throws Exception {
+        return DERBY_EXTENSION.getDataSource();
     }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
index f2b1858..f76e92f 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
@@ -21,14 +21,14 @@
 
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
 
 /** MySQL 5.6.x database for testing. */
 public interface MySQL56TestBase extends DatabaseTest {
 
     @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.6.51");
 
-    default Connection getConnection() throws Exception {
-        return MYSQL_EXTENSION.getConnection();
+    default DataSource getDataSource() throws Exception {
+        return MYSQL_EXTENSION.getDataSource();
     }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
index 0b8a696..b1cc46d 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
@@ -21,14 +21,14 @@
 
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
 
 /** MySQL 5.7.x database for testing. */
 public interface MySQL57TestBase extends DatabaseTest {
 
     @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.7.41");
 
-    default Connection getConnection() throws Exception {
-        return MYSQL_EXTENSION.getConnection();
+    default DataSource getDataSource() throws Exception {
+        return MYSQL_EXTENSION.getDataSource();
     }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
index daf7788..d133011 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
@@ -21,14 +21,14 @@
 
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
 
 /** MySQL 8.x database for testing. */
 public interface MySQL8TestBase extends DatabaseTest {
 
     @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("8.0.32");
 
-    default Connection getConnection() throws Exception {
-        return MYSQL_EXTENSION.getConnection();
+    default DataSource getDataSource() throws Exception {
+        return MYSQL_EXTENSION.getDataSource();
     }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
index f0873a6..2dfb1ac 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
@@ -17,14 +17,14 @@
 
 package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
 
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.testcontainers.containers.MySQLContainer;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
 import java.util.List;
 
 /** The extension of MySQL. */
@@ -50,9 +50,13 @@
                         .withEnv("MYSQL_ROOT_HOST", "%");
     }
 
-    public Connection getConnection() throws Exception {
-        return DriverManager.getConnection(
-                container.getJdbcUrl(), container.getUsername(), container.getPassword());
+    public HikariDataSource getDataSource() throws Exception {
+        HikariConfig config = new HikariConfig();
+        config.setJdbcUrl(container.getJdbcUrl());
+        config.setUsername(container.getUsername());
+        config.setPassword(container.getPassword());
+        config.setValidationTimeout(1000);
+        return new HikariDataSource(config);
     }
 
     @Override
@@ -67,7 +71,8 @@
 
     @Override
     public void afterEach(ExtensionContext extensionContext) throws Exception {
-        try (var conn = getConnection();
+        try (var dataSource = getDataSource();
+                var conn = dataSource.getConnection();
                 var st = conn.createStatement()) {
             for (var tableName : TABLES) {
                 st.executeUpdate(String.format("DELETE from %s", tableName));
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
index ba4fd2f..a54657f 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
@@ -17,14 +17,14 @@
 
 package org.apache.flink.autoscaler.jdbc.testutils.databases.postgres;
 
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.testcontainers.containers.PostgreSQLContainer;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
 import java.util.List;
 
 /** The extension of PostgreSQL. */
@@ -49,9 +49,13 @@
                         .withEnv("POSTGRES_MAX_CONNECTIONS", "10");
     }
 
-    public Connection getConnection() throws Exception {
-        return DriverManager.getConnection(
-                container.getJdbcUrl(), container.getUsername(), container.getPassword());
+    public HikariDataSource getDataSource() throws Exception {
+        HikariConfig config = new HikariConfig();
+        config.setJdbcUrl(container.getJdbcUrl());
+        config.setUsername(container.getUsername());
+        config.setPassword(container.getPassword());
+        config.setValidationTimeout(1000);
+        return new HikariDataSource(config);
     }
 
     @Override
@@ -66,7 +70,8 @@
 
     @Override
     public void afterEach(ExtensionContext extensionContext) throws Exception {
-        try (var conn = getConnection();
+        try (var dataSource = getDataSource();
+                var conn = dataSource.getConnection();
                 var st = conn.createStatement()) {
             for (var tableName : TABLES) {
                 st.executeUpdate(String.format("DELETE from %s", tableName));
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
index 1e27a6e..4970cb9 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
@@ -21,14 +21,14 @@
 
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
 
 /** PostgreSQL database for testing. */
 public interface PostgreSQLTestBase extends DatabaseTest {
 
     @RegisterExtension PostgreSQLExtension POSTGRE_SQL_EXTENSION = new PostgreSQLExtension("15.1");
 
-    default Connection getConnection() throws Exception {
-        return POSTGRE_SQL_EXTENSION.getConnection();
+    default DataSource getDataSource() throws Exception {
+        return POSTGRE_SQL_EXTENSION.getDataSource();
     }
 }
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
index 2f58ca6..f6eacb6 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
@@ -73,8 +73,8 @@
     private static <KEY, Context extends JobAutoScalerContext<KEY>>
             AutoScalerEventHandler<KEY, Context> createJdbcEventHandler(Configuration conf)
                     throws Exception {
-        var conn = HikariJDBCUtil.getConnection(conf);
+        var dataSource = HikariJDBCUtil.getDataSource(conf);
         return new JdbcAutoScalerEventHandler<>(
-                new JdbcEventInteractor(conn), conf.get(JDBC_EVENT_HANDLER_TTL));
+                new JdbcEventInteractor(dataSource), conf.get(JDBC_EVENT_HANDLER_TTL));
     }
 }
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
index 0528362..a21cea9 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
@@ -74,7 +74,8 @@
     private static <KEY, Context extends JobAutoScalerContext<KEY>>
             AutoScalerStateStore<KEY, Context> createJdbcStateStore(Configuration conf)
                     throws Exception {
-        var conn = HikariJDBCUtil.getConnection(conf);
-        return new JdbcAutoScalerStateStore<>(new JdbcStateStore(new JdbcStateInteractor(conn)));
+        var dataSource = HikariJDBCUtil.getDataSource(conf);
+        return new JdbcAutoScalerStateStore<>(
+                new JdbcStateStore(new JdbcStateInteractor(dataSource)));
     }
 }
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index a87de96..4a83743 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -33,7 +33,6 @@
 
 import javax.annotation.Nonnull;
 
-import java.io.Closeable;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -56,7 +55,7 @@
 
 /** The executor of the standalone autoscaler. */
 public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerContext<KEY>>
-        implements Closeable {
+        implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(StandaloneAutoscalerExecutor.class);
 
@@ -113,7 +112,7 @@
     }
 
     @Override
-    public void close() {
+    public void close() throws Exception {
         scheduledExecutorService.shutdownNow();
         scalingThreadPool.shutdownNow();
         eventHandler.close();
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java
index 65fb964..141b12b 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java
@@ -23,7 +23,6 @@
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
 
-import java.sql.Connection;
 import java.sql.SQLException;
 
 import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
@@ -39,7 +38,7 @@
                     "%s is required when jdbc state store or jdbc event handler is used.",
                     JDBC_URL.key());
 
-    public static Connection getConnection(Configuration conf) throws SQLException {
+    public static HikariDataSource getDataSource(Configuration conf) throws SQLException {
         final var jdbcUrl = conf.get(JDBC_URL);
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(jdbcUrl), JDBC_URL_REQUIRED_HINT);
         var user = conf.get(JDBC_USERNAME);
@@ -48,6 +47,6 @@
         hikariConfig.setJdbcUrl(jdbcUrl);
         hikariConfig.setUsername(user);
         hikariConfig.setPassword(password);
-        return new HikariDataSource(hikariConfig).getConnection();
+        return new HikariDataSource(hikariConfig);
     }
 }
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
index 2ab21aa..f8d0e33 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
@@ -68,14 +68,16 @@
         final var conf = new Configuration();
         conf.set(EVENT_HANDLER_TYPE, JDBC);
         conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl));
-        HikariJDBCUtil.getConnection(conf).close();
+        HikariJDBCUtil.getDataSource(conf).close();
 
         var eventHandler = AutoscalerEventHandlerFactory.create(conf);
         assertThat(eventHandler).isInstanceOf(JdbcAutoScalerEventHandler.class);
+        conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
 
         try {
-            conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
-            HikariJDBCUtil.getConnection(conf).close();
+            var datasource = HikariJDBCUtil.getDataSource(conf);
+            datasource.getConnection().close();
+            datasource.close();
         } catch (RuntimeException ignored) {
             // database shutdown ignored exception
         }
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
index 7ef3c4f..eae17f6 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
@@ -67,14 +67,16 @@
         final var conf = new Configuration();
         conf.set(STATE_STORE_TYPE, JDBC);
         conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl));
-        HikariJDBCUtil.getConnection(conf).close();
+        HikariJDBCUtil.getDataSource(conf).close();
 
         var stateStore = AutoscalerStateStoreFactory.create(conf);
         assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class);
+        conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
 
         try {
-            conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
-            HikariJDBCUtil.getConnection(conf).close();
+            var datasource = HikariJDBCUtil.getDataSource(conf);
+            datasource.getConnection().close();
+            datasource.close();
         } catch (RuntimeException ignored) {
             // database shutdown ignored exception
         }
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
index b17d4ba..082f84b 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -110,7 +110,7 @@
     }
 
     @Test
-    void testFetchException() {
+    void testFetchException() throws Exception {
         var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
         try (var autoscalerExecutor =
                 new StandaloneAutoscalerExecutor<>(
@@ -137,7 +137,7 @@
     }
 
     @Test
-    void testScalingParallelism() {
+    void testScalingParallelism() throws Exception {
         var parallelism = 10;
 
         var jobList = new ArrayList<JobAutoScalerContext<JobID>>();
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index 0fcf50b..badc469 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -27,7 +27,6 @@
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.time.Duration;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -44,7 +43,7 @@
  */
 @Experimental
 public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
-        extends Closeable {
+        extends AutoCloseable {
     String SCALING_SUMMARY_ENTRY =
             "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}";
     String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism change:";
@@ -101,9 +100,6 @@
                 interval);
     }
 
-    /** Close the related resource. */
-    default void close() {}
-
     static String scalingReport(Map<JobVertexID, ScalingSummary> scalingSummaries, String message) {
         StringBuilder sb = new StringBuilder(message);
         scalingSummaries.forEach(
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java
index 2724034..05fc48d 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java
@@ -49,4 +49,7 @@
                 messageKey,
                 interval);
     }
+
+    @Override
+    public void close() throws Exception {}
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 64e4dd5..08d3051 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -39,7 +39,8 @@
  * @param <Context> Instance of JobAutoScalerContext.
  */
 @Experimental
-public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {
+public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>>
+        extends AutoCloseable {
 
     void storeScalingHistory(
             Context jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory)
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index c660f73..e3cdd61 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -191,4 +191,7 @@
                         delayedScaleDownStore)
                 .anyMatch(m -> m.containsKey(k));
     }
+
+    @Override
+    public void close() throws Exception {}
 }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
index 7151c9c..865dc69 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
@@ -77,6 +77,9 @@
         return context.getJobID() + type.name() + reason + message;
     }
 
+    @Override
+    public void close() throws Exception {}
+
     /** The collected event. */
     public static class Event<KEY, Context extends JobAutoScalerContext<KEY>> {
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
index 1a836a5..55bd3ea 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
@@ -103,4 +103,7 @@
                     labels);
         }
     }
+
+    @Override
+    public void close() throws Exception {}
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index da44597..ed6549a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -403,4 +403,7 @@
         loaderOptions.setCodePointLimit(20 * 1024 * 1024);
         return YAMLFactory.builder().loaderOptions(loaderOptions).build();
     }
+
+    @Override
+    public void close() throws Exception {}
 }