[FLINK-36696] [flink-autoscaler-plugin-jdbc] Switch SQL connection usages to DataSource (#929)
* [FLINK-36696] [flink-autoscaler-plugin-jdbc] Switch SQL connection usages to DataSource
* Use the stored `dataSource` instead of calling `getDataSource()` for each access, to avoid leaking connections
---------
Co-authored-by: Rui Fan <1996fanrui@gmail.com>
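
The core of the change: classes that previously held a long-lived `java.sql.Connection` now hold a `javax.sql.DataSource`, borrow a connection per operation with try-with-resources, and close the pool (when it is `AutoCloseable`, as `HikariDataSource` is) from their own `close()`. Below is a minimal sketch of that pattern, not one of the actual Flink classes touched by this patch; the class, table, and column names are illustrative only:

```java
import javax.sql.DataSource;

import java.sql.SQLException;
import java.util.Optional;

/** Minimal illustration of the connection-per-operation pattern adopted by this patch. */
class DataSourceBackedDao implements AutoCloseable {

    private final DataSource dataSource;

    DataSourceBackedDao(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    Optional<String> findValue(String key) throws SQLException {
        // Borrow a pooled connection per call; try-with-resources returns it to the pool.
        try (var conn = dataSource.getConnection();
                var pstmt = conn.prepareStatement("select v from t_demo where k = ?")) {
            pstmt.setString(1, key);
            try (var rs = pstmt.executeQuery()) {
                return rs.next() ? Optional.of(rs.getString(1)) : Optional.empty();
            }
        }
    }

    @Override
    public void close() throws Exception {
        // Pooled implementations such as HikariDataSource are AutoCloseable; plain ones may not be.
        if (dataSource instanceof AutoCloseable) {
            ((AutoCloseable) dataSource).close();
        }
    }
}
```

Holding a single `Connection` for the object's lifetime, as before, either pins one pooled connection forever or leaks it once the pool that produced it is discarded; borrowing per operation avoids both.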
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
index 7613717..8faef51 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
@@ -135,11 +135,12 @@
}
@Override
- public void close() {
+ public void close() throws Exception {
if (Objects.nonNull(scheduledEventHandlerCleaner)
&& !scheduledEventHandlerCleaner.isShutdown()) {
scheduledEventHandlerCleaner.shutdownNow();
}
+ jdbcEventInteractor.close();
}
@VisibleForTesting
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
index ce3fc48..f40740a 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
@@ -23,8 +23,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import javax.sql.DataSource;
-import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
@@ -37,13 +37,13 @@
import static org.apache.flink.util.Preconditions.checkState;
/** Responsible for interacting with the database. */
-public class JdbcEventInteractor {
+public class JdbcEventInteractor implements AutoCloseable {
- private final Connection conn;
+ private final DataSource dataSource;
private Clock clock = Clock.systemDefaultZone();
- public JdbcEventInteractor(Connection conn) {
- this.conn = conn;
+ public JdbcEventInteractor(DataSource dataSource) {
+ this.dataSource = dataSource;
}
public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey)
@@ -52,7 +52,8 @@
"select * from t_flink_autoscaler_event_handler "
+ "where job_key = ? and reason = ? and event_key = ? ";
- try (var pstmt = conn.prepareStatement(query)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, jobKey);
pstmt.setString(2, reason);
pstmt.setString(3, eventKey);
@@ -99,7 +100,8 @@
+ " values (?, ?, ?, ?, ?, ?, ?, ?)";
var createTime = Timestamp.from(clock.instant());
- try (var pstmt = conn.prepareStatement(query)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(query)) {
pstmt.setTimestamp(1, createTime);
pstmt.setTimestamp(2, createTime);
pstmt.setString(3, jobKey);
@@ -117,7 +119,8 @@
"UPDATE t_flink_autoscaler_event_handler set update_time = ?, message = ?, event_count = ? where id = ?";
var updateTime = Timestamp.from(clock.instant());
- try (var pstmt = conn.prepareStatement(query)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(query)) {
pstmt.setTimestamp(1, updateTime);
pstmt.setString(2, message);
pstmt.setInt(3, eventCount);
@@ -136,7 +139,8 @@
"select * from t_flink_autoscaler_event_handler "
+ "where job_key = ? and reason = ? ";
- try (var pstmt = conn.prepareStatement(query)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, jobKey);
pstmt.setString(2, reason);
@@ -160,7 +164,8 @@
"SELECT id from t_flink_autoscaler_event_handler "
+ " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) "
+ " and create_time < ?";
- try (var pstmt = conn.prepareStatement(sql)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(sql)) {
pstmt.setObject(1, timestamp);
ResultSet resultSet = pstmt.executeQuery();
return resultSet.next() ? resultSet.getLong(1) : null;
@@ -171,11 +176,19 @@
throws Exception {
var query =
"delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?";
- try (var pstmt = conn.prepareStatement(query)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(query)) {
pstmt.setObject(1, startId);
pstmt.setObject(2, endId);
pstmt.setObject(3, timestamp);
return pstmt.executeUpdate();
}
}
+
+ @Override
+ public void close() throws Exception {
+ if (dataSource instanceof AutoCloseable) {
+ ((AutoCloseable) dataSource).close();
+ }
+ }
}
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 1ed3ad9..6bc4887 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -333,4 +333,9 @@
throws JacksonException {
return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
}
+
+ @Override
+ public void close() throws Exception {
+ jdbcStateStore.close();
+ }
}
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
index 7c12bf8..9ddc5c5 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
@@ -20,7 +20,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
+import javax.sql.DataSource;
+
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collections;
@@ -31,21 +32,22 @@
import static org.apache.flink.util.Preconditions.checkState;
/** Responsible for interacting with the database. */
-public class JdbcStateInteractor {
+public class JdbcStateInteractor implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(JdbcStateInteractor.class);
- private final Connection conn;
+ private final DataSource dataSource;
- public JdbcStateInteractor(Connection conn) {
- this.conn = conn;
+ public JdbcStateInteractor(DataSource dataSource) {
+ this.dataSource = dataSource;
}
public Map<StateType, String> queryData(String jobKey) throws Exception {
var query =
"select state_type, state_value from t_flink_autoscaler_state_store where job_key = ?";
var data = new HashMap<StateType, String>();
- try (var pstmt = conn.prepareStatement(query)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, jobKey);
var rs = pstmt.executeQuery();
while (rs.next()) {
@@ -63,7 +65,8 @@
String.format(
"DELETE FROM t_flink_autoscaler_state_store where job_key = ? and state_type in (%s)",
String.join(",", Collections.nCopies(deletedStateTypes.size(), "?")));
- try (var pstmt = conn.prepareStatement(query)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, jobKey);
int i = 2;
for (var stateType : deletedStateTypes) {
@@ -80,7 +83,8 @@
var query =
"INSERT INTO t_flink_autoscaler_state_store (update_time, job_key, state_type, state_value) values (?, ?, ?, ?)";
var updateTime = Timestamp.from(Instant.now());
- try (var pstmt = conn.prepareStatement(query)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(query)) {
for (var stateType : createdStateTypes) {
pstmt.setTimestamp(1, updateTime);
pstmt.setString(2, jobKey);
@@ -106,7 +110,8 @@
"UPDATE t_flink_autoscaler_state_store set update_time = ?, state_value = ? where job_key = ? and state_type = ?";
var updateTime = Timestamp.from(Instant.now());
- try (var pstmt = conn.prepareStatement(query)) {
+ try (var conn = dataSource.getConnection();
+ var pstmt = conn.prepareStatement(query)) {
for (var stateType : updatedStateTypes) {
pstmt.setTimestamp(1, updateTime);
@@ -123,4 +128,11 @@
pstmt.executeBatch();
}
}
+
+ @Override
+ public void close() throws Exception {
+ if (dataSource instanceof AutoCloseable) {
+ ((AutoCloseable) dataSource).close();
+ }
+ }
}
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
index 1e237a4..e891516 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
@@ -24,7 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;
/** The jdbc state store. */
-public class JdbcStateStore {
+public class JdbcStateStore implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(JdbcStateStore.class);
@@ -89,4 +89,9 @@
private JobStateView createJobStateView(String jobKey) throws Exception {
return new JobStateView(jdbcStateInteractor, jobKey);
}
+
+ @Override
+ public void close() throws Exception {
+ jdbcStateInteractor.close();
+ }
}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
index f527817..10c7486 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
@@ -42,6 +42,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import javax.annotation.Nonnull;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -73,20 +74,22 @@
generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent);
private final Clock defaultClock = Clock.fixed(createTime, ZoneId.systemDefault());
+ private DataSource dataSource;
private CountableJdbcEventInteractor jdbcEventInteractor;
private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> eventHandler;
private JobAutoScalerContext<JobID> ctx;
@BeforeEach
void beforeEach() throws Exception {
- jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection());
+ dataSource = getDataSource();
+ jdbcEventInteractor = new CountableJdbcEventInteractor(dataSource);
jdbcEventInteractor.setClock(defaultClock);
eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, Duration.ZERO);
ctx = createDefaultJobAutoScalerContext();
}
@AfterEach
- void tearDown() {
+ void tearDown() throws Exception {
eventHandler.close();
}
@@ -459,7 +462,7 @@
.max(Comparable::compareTo)
.orElseThrow();
- try (Connection connection = getConnection();
+ try (Connection connection = dataSource.getConnection();
PreparedStatement ps =
connection.prepareStatement(
"update t_flink_autoscaler_event_handler set id = ? where id = ?")) {
@@ -505,6 +508,7 @@
Duration eventHandlerTtl,
int unexpiredRecordsNum)
throws Exception {
+
eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl);
// Init the expired records.
@@ -525,7 +529,7 @@
eventHandler.cleanExpiredEvents();
- try (Connection connection = getConnection();
+ try (Connection connection = dataSource.getConnection();
PreparedStatement ps =
connection.prepareStatement(
"select count(1) from t_flink_autoscaler_event_handler");
@@ -542,7 +546,7 @@
if (minId == null) {
return;
}
- try (Connection connection = getConnection();
+ try (Connection connection = dataSource.getConnection();
PreparedStatement ps =
connection.prepareStatement(
"delete from t_flink_autoscaler_event_handler where id = ?")) {
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
index 2fe6920..f5f7ca6 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
@@ -46,8 +46,7 @@
// The datetime precision is seconds in MySQL by default.
var createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
- try (var conn = getConnection()) {
- var jdbcEventInteractor = new JdbcEventInteractor(conn);
+ try (var jdbcEventInteractor = new JdbcEventInteractor(getDataSource())) {
jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault()));
jdbcEventInteractor.createEvent(
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
index 39ccd76..2ea04a7 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
@@ -19,7 +19,8 @@
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
-import java.sql.Connection;
+import javax.sql.DataSource;
+
import java.sql.Timestamp;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,8 +35,8 @@
private final AtomicLong updateCounter;
private final AtomicLong deleteExpiredCounter;
- public CountableJdbcEventInteractor(Connection conn) {
- super(conn);
+ public CountableJdbcEventInteractor(DataSource dataSource) {
+ super(dataSource);
queryCounter = new AtomicLong();
createCounter = new AtomicLong();
updateCounter = new AtomicLong();
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
index da737a5..159fc68 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
@@ -42,8 +42,7 @@
var value1 = "value1";
var value2 = "value2";
var value3 = "value3";
- try (var conn = getConnection()) {
- var jdbcStateInteractor = new JdbcStateInteractor(conn);
+ try (var jdbcStateInteractor = new JdbcStateInteractor(getDataSource())) {
assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
// Test for creating data.
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
index 5ab4713..bb50ce6 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
@@ -28,7 +28,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.sql.Connection;
+import javax.sql.DataSource;
+
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@@ -45,21 +46,21 @@
abstract class AbstractJdbcStateStoreITCase implements DatabaseTest {
private static final String DEFAULT_JOB_KEY = "jobKey";
- private Connection conn;
+ private DataSource dataSource;
private CountableJdbcStateInteractor jdbcStateInteractor;
private JdbcStateStore jdbcStateStore;
@BeforeEach
void beforeEach() throws Exception {
- this.conn = getConnection();
- this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn);
+ this.dataSource = getDataSource();
+ this.jdbcStateInteractor = new CountableJdbcStateInteractor(dataSource);
this.jdbcStateStore = new JdbcStateStore(jdbcStateInteractor);
}
@AfterEach
- void afterEach() throws SQLException {
- if (conn != null) {
- conn.close();
+ void afterEach() throws Exception {
+ if (dataSource instanceof AutoCloseable) {
+ ((AutoCloseable) dataSource).close();
}
}
@@ -178,7 +179,7 @@
assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
// Modify the database directly.
- var tmpJdbcInteractor = new JdbcStateInteractor(conn);
+ var tmpJdbcInteractor = new JdbcStateInteractor(dataSource);
tmpJdbcInteractor.createData(
DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), Map.of(COLLECTED_METRICS, value1));
assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1);
@@ -205,7 +206,7 @@
final var expectedException = new RuntimeException("Database isn't stable.");
var exceptionableJdbcStateInteractor =
- new CountableJdbcStateInteractor(conn) {
+ new CountableJdbcStateInteractor(dataSource) {
private final AtomicBoolean isFirst = new AtomicBoolean(true);
@Override
@@ -282,7 +283,7 @@
private Optional<String> getValueFromDatabase(String jobKey, StateType stateType)
throws Exception {
- var jdbcInteractor = new JdbcStateInteractor(conn);
+ var jdbcInteractor = new JdbcStateInteractor(dataSource);
return Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType));
}
}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
index 8fafc95..c1cb86e 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
@@ -17,7 +17,8 @@
package org.apache.flink.autoscaler.jdbc.state;
-import java.sql.Connection;
+import javax.sql.DataSource;
+
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,8 +33,8 @@
private final AtomicLong createCounter;
private final AtomicLong updateCounter;
- public CountableJdbcStateInteractor(Connection conn) {
- super(conn);
+ public CountableJdbcStateInteractor(DataSource dataSource) {
+ super(dataSource);
queryCounter = new AtomicLong();
deleteCounter = new AtomicLong();
createCounter = new AtomicLong();
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
index a79c07b..31dbd07 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
@@ -48,7 +48,7 @@
@Override
protected void preSetup() throws Exception {
- jdbcStateStore = new JdbcStateStore(new JdbcStateInteractor(getConnection()));
+ jdbcStateStore = new JdbcStateStore(new JdbcStateInteractor(getDataSource()));
cachedStateStore = new JdbcAutoScalerStateStore<>(jdbcStateStore);
}
@@ -56,7 +56,7 @@
protected AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>>
createPhysicalAutoScalerStateStore() throws Exception {
return new JdbcAutoScalerStateStore<>(
- new JdbcStateStore(new JdbcStateInteractor(getConnection())));
+ new JdbcStateStore(new JdbcStateInteractor(getDataSource())));
}
@Override
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
index 3989dbf..9f6d7ba 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
@@ -23,8 +23,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.sql.Connection;
-import java.sql.SQLException;
+import javax.sql.DataSource;
+
import java.util.Optional;
import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
@@ -36,22 +36,22 @@
class JobStateViewTest implements DerbyTestBase {
private static final String DEFAULT_JOB_KEY = "jobKey";
- private Connection conn;
+ private DataSource dataSource;
private CountableJdbcStateInteractor jdbcStateInteractor;
private JobStateView jobStateView;
@BeforeEach
void beforeEach() throws Exception {
- this.conn = getConnection();
- this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn);
+ this.dataSource = getDataSource();
+ this.jdbcStateInteractor = new CountableJdbcStateInteractor(dataSource);
this.jobStateView = new JobStateView(jdbcStateInteractor, DEFAULT_JOB_KEY);
jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
}
@AfterEach
- void afterEach() throws SQLException {
- if (conn != null) {
- conn.close();
+ void afterEach() throws Exception {
+ if (jdbcStateInteractor != null) {
+ jdbcStateInteractor.close();
}
}
@@ -195,7 +195,7 @@
}
private Optional<String> getValueFromDatabase(StateType stateType) throws Exception {
- var jdbcInteractor = new JdbcStateInteractor(conn);
+ var jdbcInteractor = new JdbcStateInteractor(dataSource);
return Optional.ofNullable(jdbcInteractor.queryData(DEFAULT_JOB_KEY).get(stateType));
}
}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
index 261578b..27fce55 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
@@ -17,10 +17,10 @@
package org.apache.flink.autoscaler.jdbc.testutils.databases;
-import java.sql.Connection;
+import javax.sql.DataSource;
/** Database testing. */
public interface DatabaseTest {
- Connection getConnection() throws Exception;
+ DataSource getDataSource() throws Exception;
}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
index 42566d9..d02fee5 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
@@ -17,12 +17,13 @@
package org.apache.flink.autoscaler.jdbc.testutils.databases.derby;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
-import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
@@ -34,8 +35,11 @@
List.of("t_flink_autoscaler_state_store", "t_flink_autoscaler_event_handler");
private static final String JDBC_URL = "jdbc:derby:memory:test";
- public Connection getConnection() throws Exception {
- return DriverManager.getConnection(JDBC_URL);
+ public HikariDataSource getDataSource() {
+ HikariConfig config = new HikariConfig();
+ config.setJdbcUrl(JDBC_URL);
+ config.setValidationTimeout(1000);
+ return new HikariDataSource(config);
}
@Override
@@ -76,7 +80,8 @@
var jobKeyReasonCreateTimeIndex =
"CREATE INDEX job_key_reason_create_time_idx ON t_flink_autoscaler_event_handler (job_key, reason, create_time)";
- try (var conn = getConnection();
+ try (var dataSource = getDataSource();
+ var conn = dataSource.getConnection();
var st = conn.createStatement()) {
st.execute(stateStoreDDL);
st.execute(createStateStoreIndex);
@@ -88,7 +93,8 @@
@Override
public void afterAll(ExtensionContext extensionContext) throws Exception {
- try (var conn = getConnection();
+ try (var dataSource = getDataSource();
+ var conn = dataSource.getConnection();
var st = conn.createStatement()) {
for (var tableName : TABLES) {
st.executeUpdate(String.format("DROP TABLE %s", tableName));
@@ -103,7 +109,8 @@
@Override
public void afterEach(ExtensionContext extensionContext) throws Exception {
// Clean up all data
- try (var conn = getConnection();
+ try (var dataSource = getDataSource();
+ var conn = dataSource.getConnection();
var st = conn.createStatement()) {
for (var tableName : TABLES) {
st.executeUpdate(String.format("DELETE from %s", tableName));
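
The test extensions (Derby, MySQL, PostgreSQL) now build a pooled `HikariDataSource` instead of opening raw `DriverManager` connections. A self-contained sketch of that construction with the same settings; the JDBC URL and DDL are assumptions for illustration and require the Derby embedded driver on the classpath:

```java
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class DerbyPoolSketch {
    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:derby:memory:test;create=true"); // illustrative in-memory Derby URL
        config.setValidationTimeout(1000); // milliseconds, mirrors the extension's setting

        // Closing the HikariDataSource shuts down the whole pool, not just one connection.
        try (HikariDataSource dataSource = new HikariDataSource(config);
                var conn = dataSource.getConnection();
                var st = conn.createStatement()) {
            st.executeUpdate("CREATE TABLE t_demo (id INT)"); // hypothetical table
        }
    }
}
```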
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
index e0ac3f7..feebb91 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
@@ -21,7 +21,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;
-import java.sql.Connection;
+import javax.sql.DataSource;
/** Derby database for testing. */
public interface DerbyTestBase extends DatabaseTest {
@@ -29,7 +29,7 @@
@RegisterExtension DerbyExtension DERBY_EXTENSION = new DerbyExtension();
@Override
- default Connection getConnection() throws Exception {
- return DERBY_EXTENSION.getConnection();
+ default DataSource getDataSource() throws Exception {
+ return DERBY_EXTENSION.getDataSource();
}
}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
index f2b1858..f76e92f 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
@@ -21,14 +21,14 @@
import org.junit.jupiter.api.extension.RegisterExtension;
-import java.sql.Connection;
+import javax.sql.DataSource;
/** MySQL 5.6.x database for testing. */
public interface MySQL56TestBase extends DatabaseTest {
@RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.6.51");
- default Connection getConnection() throws Exception {
- return MYSQL_EXTENSION.getConnection();
+ default DataSource getDataSource() throws Exception {
+ return MYSQL_EXTENSION.getDataSource();
}
}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
index 0b8a696..b1cc46d 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
@@ -21,14 +21,14 @@
import org.junit.jupiter.api.extension.RegisterExtension;
-import java.sql.Connection;
+import javax.sql.DataSource;
/** MySQL 5.7.x database for testing. */
public interface MySQL57TestBase extends DatabaseTest {
@RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.7.41");
- default Connection getConnection() throws Exception {
- return MYSQL_EXTENSION.getConnection();
+ default DataSource getDataSource() throws Exception {
+ return MYSQL_EXTENSION.getDataSource();
}
}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
index daf7788..d133011 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
@@ -21,14 +21,14 @@
import org.junit.jupiter.api.extension.RegisterExtension;
-import java.sql.Connection;
+import javax.sql.DataSource;
/** MySQL 8.x database for testing. */
public interface MySQL8TestBase extends DatabaseTest {
@RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("8.0.32");
- default Connection getConnection() throws Exception {
- return MYSQL_EXTENSION.getConnection();
+ default DataSource getDataSource() throws Exception {
+ return MYSQL_EXTENSION.getDataSource();
}
}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
index f0873a6..2dfb1ac 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
@@ -17,14 +17,14 @@
package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.MySQLContainer;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.util.List;
/** The extension of MySQL. */
@@ -50,9 +50,13 @@
.withEnv("MYSQL_ROOT_HOST", "%");
}
- public Connection getConnection() throws Exception {
- return DriverManager.getConnection(
- container.getJdbcUrl(), container.getUsername(), container.getPassword());
+ public HikariDataSource getDataSource() throws Exception {
+ HikariConfig config = new HikariConfig();
+ config.setJdbcUrl(container.getJdbcUrl());
+ config.setUsername(container.getUsername());
+ config.setPassword(container.getPassword());
+ config.setValidationTimeout(1000);
+ return new HikariDataSource(config);
}
@Override
@@ -67,7 +71,8 @@
@Override
public void afterEach(ExtensionContext extensionContext) throws Exception {
- try (var conn = getConnection();
+ try (var dataSource = getDataSource();
+ var conn = dataSource.getConnection();
var st = conn.createStatement()) {
for (var tableName : TABLES) {
st.executeUpdate(String.format("DELETE from %s", tableName));
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
index ba4fd2f..a54657f 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
@@ -17,14 +17,14 @@
package org.apache.flink.autoscaler.jdbc.testutils.databases.postgres;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.PostgreSQLContainer;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.util.List;
/** The extension of PostgreSQL. */
@@ -49,9 +49,13 @@
.withEnv("POSTGRES_MAX_CONNECTIONS", "10");
}
- public Connection getConnection() throws Exception {
- return DriverManager.getConnection(
- container.getJdbcUrl(), container.getUsername(), container.getPassword());
+ public HikariDataSource getDataSource() throws Exception {
+ HikariConfig config = new HikariConfig();
+ config.setJdbcUrl(container.getJdbcUrl());
+ config.setUsername(container.getUsername());
+ config.setPassword(container.getPassword());
+ config.setValidationTimeout(1000);
+ return new HikariDataSource(config);
}
@Override
@@ -66,7 +70,8 @@
@Override
public void afterEach(ExtensionContext extensionContext) throws Exception {
- try (var conn = getConnection();
+ try (var dataSource = getDataSource();
+ var conn = dataSource.getConnection();
var st = conn.createStatement()) {
for (var tableName : TABLES) {
st.executeUpdate(String.format("DELETE from %s", tableName));
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
index 1e27a6e..4970cb9 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
@@ -21,14 +21,14 @@
import org.junit.jupiter.api.extension.RegisterExtension;
-import java.sql.Connection;
+import javax.sql.DataSource;
/** PostgreSQL database for testing. */
public interface PostgreSQLTestBase extends DatabaseTest {
@RegisterExtension PostgreSQLExtension POSTGRE_SQL_EXTENSION = new PostgreSQLExtension("15.1");
- default Connection getConnection() throws Exception {
- return POSTGRE_SQL_EXTENSION.getConnection();
+ default DataSource getDataSource() throws Exception {
+ return POSTGRE_SQL_EXTENSION.getDataSource();
}
}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
index 2f58ca6..f6eacb6 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
@@ -73,8 +73,8 @@
private static <KEY, Context extends JobAutoScalerContext<KEY>>
AutoScalerEventHandler<KEY, Context> createJdbcEventHandler(Configuration conf)
throws Exception {
- var conn = HikariJDBCUtil.getConnection(conf);
+ var dataSource = HikariJDBCUtil.getDataSource(conf);
return new JdbcAutoScalerEventHandler<>(
- new JdbcEventInteractor(conn), conf.get(JDBC_EVENT_HANDLER_TTL));
+ new JdbcEventInteractor(dataSource), conf.get(JDBC_EVENT_HANDLER_TTL));
}
}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
index 0528362..a21cea9 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
@@ -74,7 +74,8 @@
private static <KEY, Context extends JobAutoScalerContext<KEY>>
AutoScalerStateStore<KEY, Context> createJdbcStateStore(Configuration conf)
throws Exception {
- var conn = HikariJDBCUtil.getConnection(conf);
- return new JdbcAutoScalerStateStore<>(new JdbcStateStore(new JdbcStateInteractor(conn)));
+ var dataSource = HikariJDBCUtil.getDataSource(conf);
+ return new JdbcAutoScalerStateStore<>(
+ new JdbcStateStore(new JdbcStateInteractor(dataSource)));
}
}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index a87de96..4a83743 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -33,7 +33,6 @@
import javax.annotation.Nonnull;
-import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -56,7 +55,7 @@
/** The executor of the standalone autoscaler. */
public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerContext<KEY>>
- implements Closeable {
+ implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(StandaloneAutoscalerExecutor.class);
@@ -113,7 +112,7 @@
}
@Override
- public void close() {
+ public void close() throws Exception {
scheduledExecutorService.shutdownNow();
scalingThreadPool.shutdownNow();
eventHandler.close();
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java
index 65fb964..141b12b 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java
@@ -23,7 +23,6 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
-import java.sql.Connection;
import java.sql.SQLException;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
@@ -39,7 +38,7 @@
"%s is required when jdbc state store or jdbc event handler is used.",
JDBC_URL.key());
- public static Connection getConnection(Configuration conf) throws SQLException {
+ public static HikariDataSource getDataSource(Configuration conf) throws SQLException {
final var jdbcUrl = conf.get(JDBC_URL);
checkArgument(!StringUtils.isNullOrWhitespaceOnly(jdbcUrl), JDBC_URL_REQUIRED_HINT);
var user = conf.get(JDBC_USERNAME);
@@ -48,6 +47,6 @@
hikariConfig.setJdbcUrl(jdbcUrl);
hikariConfig.setUsername(user);
hikariConfig.setPassword(password);
- return new HikariDataSource(hikariConfig).getConnection();
+ return new HikariDataSource(hikariConfig);
}
}
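
`HikariJDBCUtil.getDataSource` now returns the pool itself rather than a single connection drawn from a pool that was then abandoned (the leak this ticket fixes). The caller hands the pool to the interactor, and closing the handler or state store releases it. A hedged usage sketch, assuming a `Configuration` with the JDBC URL option set; the query arguments are placeholders:

```java
import org.apache.flink.autoscaler.jdbc.event.JdbcEventInteractor;
import org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil;
import org.apache.flink.configuration.Configuration;

class DataSourceOwnershipSketch {

    static void run(Configuration conf) throws Exception {
        // Validates the configured JDBC URL and returns a HikariDataSource (the pool itself).
        var dataSource = HikariJDBCUtil.getDataSource(conf);

        // The interactor owns the pool from here on: its close() closes the HikariDataSource,
        // which is what JdbcAutoScalerEventHandler#close ultimately delegates to in this patch.
        try (var interactor = new JdbcEventInteractor(dataSource)) {
            var latestEvent = interactor.queryLatestEvent("jobKey", "ScalingReport", "eventKey");
        }
    }
}
```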
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
index 2ab21aa..f8d0e33 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
@@ -68,14 +68,16 @@
final var conf = new Configuration();
conf.set(EVENT_HANDLER_TYPE, JDBC);
conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl));
- HikariJDBCUtil.getConnection(conf).close();
+ HikariJDBCUtil.getDataSource(conf).close();
var eventHandler = AutoscalerEventHandlerFactory.create(conf);
assertThat(eventHandler).isInstanceOf(JdbcAutoScalerEventHandler.class);
+ conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
try {
- conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
- HikariJDBCUtil.getConnection(conf).close();
+ var datasource = HikariJDBCUtil.getDataSource(conf);
+ datasource.getConnection().close();
+ datasource.close();
} catch (RuntimeException ignored) {
// database shutdown ignored exception
}
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
index 7ef3c4f..eae17f6 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
@@ -67,14 +67,16 @@
final var conf = new Configuration();
conf.set(STATE_STORE_TYPE, JDBC);
conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl));
- HikariJDBCUtil.getConnection(conf).close();
+ HikariJDBCUtil.getDataSource(conf).close();
var stateStore = AutoscalerStateStoreFactory.create(conf);
assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class);
+ conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
try {
- conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
- HikariJDBCUtil.getConnection(conf).close();
+ var datasource = HikariJDBCUtil.getDataSource(conf);
+ datasource.getConnection().close();
+ datasource.close();
} catch (RuntimeException ignored) {
// database shutdown ignored exception
}
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
index b17d4ba..082f84b 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -110,7 +110,7 @@
}
@Test
- void testFetchException() {
+ void testFetchException() throws Exception {
var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
try (var autoscalerExecutor =
new StandaloneAutoscalerExecutor<>(
@@ -137,7 +137,7 @@
}
@Test
- void testScalingParallelism() {
+ void testScalingParallelism() throws Exception {
var parallelism = 10;
var jobList = new ArrayList<JobAutoScalerContext<JobID>>();
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index 0fcf50b..badc469 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -27,7 +27,6 @@
import javax.annotation.Nullable;
-import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.stream.Collectors;
@@ -44,7 +43,7 @@
*/
@Experimental
public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
- extends Closeable {
+ extends AutoCloseable {
String SCALING_SUMMARY_ENTRY =
"{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}";
String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism change:";
@@ -101,9 +100,6 @@
interval);
}
- /** Close the related resource. */
- default void close() {}
-
static String scalingReport(Map<JobVertexID, ScalingSummary> scalingSummaries, String message) {
StringBuilder sb = new StringBuilder(message);
scalingSummaries.forEach(
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java
index 2724034..05fc48d 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java
@@ -49,4 +49,7 @@
messageKey,
interval);
}
+
+ @Override
+ public void close() throws Exception {}
}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 64e4dd5..08d3051 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -39,7 +39,8 @@
* @param <Context> Instance of JobAutoScalerContext.
*/
@Experimental
-public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {
+public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>>
+ extends AutoCloseable {
void storeScalingHistory(
Context jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory)
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index c660f73..e3cdd61 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -191,4 +191,7 @@
delayedScaleDownStore)
.anyMatch(m -> m.containsKey(k));
}
+
+ @Override
+ public void close() throws Exception {}
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
index 7151c9c..865dc69 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
@@ -77,6 +77,9 @@
return context.getJobID() + type.name() + reason + message;
}
+ @Override
+ public void close() throws Exception {}
+
/** The collected event. */
public static class Event<KEY, Context extends JobAutoScalerContext<KEY>> {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
index 1a836a5..55bd3ea 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
@@ -103,4 +103,7 @@
labels);
}
}
+
+ @Override
+ public void close() throws Exception {}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index da44597..ed6549a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -403,4 +403,7 @@
loaderOptions.setCodePointLimit(20 * 1024 * 1024);
return YAMLFactory.builder().loaderOptions(loaderOptions).build();
}
+
+ @Override
+ public void close() throws Exception {}
}