[FLINK-36039][autoscaler] Support cleaning historical event handler records in the JDBC event handler (#865)
---------
Co-authored-by: Rui Fan <fanrui@apache.org>
diff --git a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
index 7d724b7..45b5184 100644
--- a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
+++ b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
@@ -39,6 +39,12 @@
<td>The port of flink cluster when the flink-cluster fetcher is used.</td>
</tr>
<tr>
+ <td><h5>autoscaler.standalone.jdbc.event-handler.ttl</h5></td>
+ <td style="word-wrap: break-word;">90 d</td>
+ <td>Duration</td>
+ <td>The time to live, based on the record's create time, for JDBC event handler records. When set to '0', the TTL strategy for the records is disabled.</td>
+ </tr>
+ <tr>
<td><h5>autoscaler.standalone.jdbc.password-env-variable</h5></td>
<td style="word-wrap: break-word;">"JDBC_PWD"</td>
<td>String</td>
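
For reference, a minimal sketch of setting the new TTL option programmatically. The option constant comes from AutoscalerStandaloneOptions in this patch; the surrounding setup is illustrative only:

    import org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions;
    import org.apache.flink.configuration.Configuration;

    import java.time.Duration;

    Configuration conf = new Configuration();
    // Keep JDBC event handler records for 30 days instead of the 90-day default.
    conf.set(AutoscalerStandaloneOptions.JDBC_EVENT_HANDLER_TTL, Duration.ofDays(30));
    // A TTL of zero disables the periodic cleanup entirely.
    // conf.set(AutoscalerStandaloneOptions.JDBC_EVENT_HANDLER_TTL, Duration.ZERO);
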
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
index c42692a..5b45580 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
@@ -18,18 +18,27 @@
package org.apache.flink.autoscaler.jdbc.event;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
+import java.sql.Timestamp;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* The event handler which persists its events in a database via JDBC.
@@ -38,13 +47,34 @@
* @param <Context> The job autoscaler context.
*/
@Experimental
+@Slf4j
public class JdbcAutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
implements AutoScalerEventHandler<KEY, Context> {
private final JdbcEventInteractor jdbcEventInteractor;
+ private final Duration eventHandlerTtl;
+ @Nullable private final ScheduledExecutorService scheduledEventHandlerCleaner;
- public JdbcAutoScalerEventHandler(JdbcEventInteractor jdbcEventInteractor) {
+ public JdbcAutoScalerEventHandler(
+ JdbcEventInteractor jdbcEventInteractor, Duration eventHandlerTtl) {
this.jdbcEventInteractor = jdbcEventInteractor;
+ this.eventHandlerTtl = Preconditions.checkNotNull(eventHandlerTtl);
+
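+        // A non-positive TTL disables periodic cleanup; no cleaner thread is created.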
+ if (eventHandlerTtl.toMillis() <= 0) {
+ this.scheduledEventHandlerCleaner = null;
+ return;
+ }
+ this.scheduledEventHandlerCleaner =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("jdbc-autoscaler-events-cleaner")
+ .setDaemon(true)
+ .build());
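+        // Clean expired events once per day, starting one day after the handler is created.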
+ this.scheduledEventHandlerCleaner.scheduleAtFixedRate(
+ this::cleanExpiredEvents,
+ Duration.ofDays(1).toMillis(),
+ Duration.ofDays(1).toMillis(),
+ TimeUnit.MILLISECONDS);
}
@SneakyThrows
@@ -104,6 +134,48 @@
}
}
+ @Override
+ public void close() {
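+        // Stop the background cleaner, interrupting any in-flight delete batch.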
+ if (Objects.nonNull(scheduledEventHandlerCleaner)
+ && !scheduledEventHandlerCleaner.isShutdown()) {
+ scheduledEventHandlerCleaner.shutdownNow();
+ }
+ }
+
+ @VisibleForTesting
+ void cleanExpiredEvents() {
+ final var batchSize = 2000;
+ final var sleepMs = 1000;
+
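+        // Records created before this cutoff timestamp have exceeded the TTL.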
+ var date =
+ Timestamp.from(
+ jdbcEventInteractor
+ .getCurrentInstant()
+ .minusMillis(eventHandlerTtl.toMillis()));
+ try {
+ var deletedTotalCount = 0L;
+ while (true) {
+ Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+ if (Objects.isNull(minId)) {
+                    log.info(
+                            "Deleted {} expired event handler records successfully",
+                            deletedTotalCount);
+ break;
+ }
+
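+                // Delete expired rows in [startId, startId + batchSize) windows, pausing
+                // between full batches to limit database pressure. A window that deletes
+                // fewer than batchSize rows may just mean the ids are not consecutive,
+                // so the outer loop re-queries the minimum expired id.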
+ for (long startId = minId;
+ jdbcEventInteractor.deleteExpiredEventsByIdRangeAndDate(
+ startId, startId + batchSize, date)
+ == batchSize;
+ startId += batchSize) {
+ Thread.sleep(sleepMs);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error in cleaning expired event handler records.", e);
+ }
+ }
+
/**
* @return True means the existing event is still within the interval duration, so we can update it.
*     Otherwise, it's too early, and we should create a new one instead of updating it.
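
For illustration, a minimal sketch of wiring the handler with a TTL, mirroring the AutoscalerEventHandlerFactory change further below (connection setup elided; the variable names are hypothetical):

    var interactor = new JdbcEventInteractor(connection);
    // A positive TTL starts the daily background cleaner; Duration.ZERO disables it.
    var handler =
            new JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>>(
                    interactor, Duration.ofDays(90));
    // ... handle events ...
    handler.close(); // Stops the cleaner thread.
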
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
index 5e2d616..ce3fc48 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
@@ -22,6 +22,7 @@
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -152,4 +153,29 @@
void setClock(@Nonnull Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
}
+
+ @Nullable
+ Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception {
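+        // Assuming ids grow with create_time, the smallest id is the oldest record;
+        // it is returned only when it has expired, so a null result means nothing to clean.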
+ var sql =
+                "SELECT id FROM t_flink_autoscaler_event_handler "
+                        + " WHERE id = (SELECT id FROM t_flink_autoscaler_event_handler ORDER BY id ASC LIMIT 1) "
+                        + " AND create_time < ?";
+ try (var pstmt = conn.prepareStatement(sql)) {
+ pstmt.setObject(1, timestamp);
+ ResultSet resultSet = pstmt.executeQuery();
+ return resultSet.next() ? resultSet.getLong(1) : null;
+ }
+ }
+
+ int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp timestamp)
+ throws Exception {
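+        // endId is exclusive; the create_time predicate keeps any unexpired rows that
+        // happen to fall inside the id range.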
+        var query =
+                "DELETE FROM t_flink_autoscaler_event_handler WHERE id >= ? AND id < ? AND create_time < ?";
+ try (var pstmt = conn.prepareStatement(query)) {
+ pstmt.setObject(1, startId);
+ pstmt.setObject(2, endId);
+ pstmt.setObject(3, timestamp);
+ return pstmt.executeUpdate();
+ }
+ }
}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
index b08030f..f527817 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
@@ -33,19 +33,30 @@
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import javax.annotation.Nonnull;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Timestamp;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Map;
+import java.util.stream.Stream;
import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_REPORT_REASON;
import static org.assertj.core.api.Assertions.assertThat;
/** The abstract IT case for {@link JdbcAutoScalerEventHandler}. */
@@ -60,6 +71,7 @@
private final Instant createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
private final Map<JobVertexID, ScalingSummary> scalingSummaries =
generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent);
+ private final Clock defaultClock = Clock.fixed(createTime, ZoneId.systemDefault());
private CountableJdbcEventInteractor jdbcEventInteractor;
private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> eventHandler;
@@ -68,11 +80,16 @@
@BeforeEach
void beforeEach() throws Exception {
jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection());
- jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault()));
- eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor);
+ jdbcEventInteractor.setClock(defaultClock);
+ eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, Duration.ZERO);
ctx = createDefaultJobAutoScalerContext();
}
+ @AfterEach
+ void tearDown() {
+ eventHandler.close();
+ }
+
/** All events shouldn't be deduplicated when interval is null. */
@Test
void testEventWithoutInterval() throws Exception {
@@ -254,8 +271,7 @@
assertThat(
jdbcEventInteractor.queryEvents(
- ctx.getJobKey().toString(),
- AutoScalerEventHandler.SCALING_REPORT_REASON))
+ ctx.getJobKey().toString(), SCALING_REPORT_REASON))
.singleElement()
.satisfies(
event -> {
@@ -283,8 +299,7 @@
assertThat(
jdbcEventInteractor.queryEvents(
- ctx.getJobKey().toString(),
- AutoScalerEventHandler.SCALING_REPORT_REASON))
+ ctx.getJobKey().toString(), SCALING_REPORT_REASON))
.as("All scaling events shouldn't be deduplicated when scaling happens.")
.hasSize(2)
.satisfiesExactlyInAnyOrder(
@@ -322,8 +337,7 @@
assertThat(
jdbcEventInteractor.queryEvents(
- ctx.getJobKey().toString(),
- AutoScalerEventHandler.SCALING_REPORT_REASON))
+ ctx.getJobKey().toString(), SCALING_REPORT_REASON))
.as(
"The event should be deduplicated when parallelism is not changed and within the interval.")
.singleElement()
@@ -360,8 +374,7 @@
assertThat(
jdbcEventInteractor.queryEvents(
- ctx.getJobKey().toString(),
- AutoScalerEventHandler.SCALING_REPORT_REASON))
+ ctx.getJobKey().toString(), SCALING_REPORT_REASON))
.as("We should create a new event when the old event is too early.")
.hasSize(2)
.satisfiesExactlyInAnyOrder(
@@ -401,8 +414,7 @@
assertThat(
jdbcEventInteractor.queryEvents(
- ctx.getJobKey().toString(),
- AutoScalerEventHandler.SCALING_REPORT_REASON))
+ ctx.getJobKey().toString(), SCALING_REPORT_REASON))
.as("We should create a new event when the old event is too early.")
.hasSize(2)
.satisfiesExactlyInAnyOrder(
@@ -430,6 +442,131 @@
});
}
+ @Test
+ void testDeleteCounterWhenIdNotConsecutive() throws Exception {
+ // Create 2 events.
+ final Duration ttl = Duration.ofDays(1L);
+ eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, ttl);
+ initTestingEventHandlerRecords(2);
+
+        // Simulate that the ids are not consecutive.
+ var events =
+ jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), SCALING_REPORT_REASON);
+ assertThat(events).hasSize(2);
+ var maxId =
+ events.stream()
+ .map(AutoScalerEvent::getId)
+ .max(Comparable::compareTo)
+ .orElseThrow();
+
+ try (Connection connection = getConnection();
+ PreparedStatement ps =
+ connection.prepareStatement(
+ "update t_flink_autoscaler_event_handler set id = ? where id = ?")) {
+ ps.setLong(1, maxId + 1_000_000);
+ ps.setLong(2, maxId);
+ ps.execute();
+ }
+
+ // Reset the clock to clean all expired data.
+ jdbcEventInteractor.setClock(
+ Clock.fixed(
+ jdbcEventInteractor
+ .getCurrentInstant()
+ .plus(ttl)
+ .plus(Duration.ofMillis(1)),
+ ZoneId.systemDefault()));
+
+ eventHandler.cleanExpiredEvents();
+ jdbcEventInteractor.assertDeleteExpiredCounter(2L);
+ }
+
+ private static Stream<Arguments> getExpiredEventHandlersCaseMatrix() {
+ return Stream.of(
+ Arguments.of(false, 128, Duration.ofMinutes(2), 10),
+ Arguments.of(true, 256, Duration.ofMinutes(2), 0),
+ Arguments.of(true, 1024 * 9, Duration.ofMinutes(2), 12),
+ Arguments.of(true, 1024 * 9, Duration.ofMinutes(2), 0),
+ Arguments.of(true, 512, Duration.ofMinutes(100), 3),
+ Arguments.of(false, 64, Duration.ofMinutes(100), 0),
+ Arguments.of(true, 1024 * 9, Duration.ofMinutes(100), 64),
+ Arguments.of(false, 1024 * 9, Duration.ofMinutes(100), 0),
+ Arguments.of(false, 0, Duration.ofMinutes(100), 128),
+ Arguments.of(false, 0, Duration.ofMinutes(100), 0));
+ }
+
+ @MethodSource("getExpiredEventHandlersCaseMatrix")
+ @ParameterizedTest(
+ name =
+ "tryIdNotSequential:{0}, expiredRecordsNum: {1}, eventHandlerTtl: {2}, unexpiredRecordsNum: {3}")
+ void testCleanExpiredEvents(
+ boolean tryIdNotSequential,
+ int expiredRecordsNum,
+ Duration eventHandlerTtl,
+ int unexpiredRecordsNum)
+ throws Exception {
+ eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl);
+
+ // Init the expired records.
+ initTestingEventHandlerRecords(expiredRecordsNum);
+ if (tryIdNotSequential) {
+ tryDeleteOneRecord(expiredRecordsNum);
+ }
+ var expiredInstant = jdbcEventInteractor.getCurrentInstant();
+
+ // Init the unexpired records.
+ initTestingEventHandlerRecords(unexpiredRecordsNum);
+
+ // Reset the clock to clean all expired data.
+ jdbcEventInteractor.setClock(
+ Clock.fixed(
+ expiredInstant.plus(eventHandlerTtl).plus(Duration.ofMillis(1)),
+ ZoneId.systemDefault()));
+
+ eventHandler.cleanExpiredEvents();
+
+ try (Connection connection = getConnection();
+ PreparedStatement ps =
+ connection.prepareStatement(
+ "select count(1) from t_flink_autoscaler_event_handler");
+ ResultSet countResultSet = ps.executeQuery()) {
+ countResultSet.next();
+ assertThat(countResultSet.getInt(1)).isEqualTo(unexpiredRecordsNum);
+ }
+ }
+
+ private void tryDeleteOneRecord(int expiredRecordsNum) throws Exception {
+ // To simulate non-sequential IDs in expired records.
+ Timestamp date = Timestamp.from(createTime);
+ Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+ if (minId == null) {
+ return;
+ }
+ try (Connection connection = getConnection();
+ PreparedStatement ps =
+ connection.prepareStatement(
+ "delete from t_flink_autoscaler_event_handler where id = ?")) {
+ ps.setObject(1, (minId + expiredRecordsNum) / 2);
+ ps.execute();
+ }
+ }
+
+ private void initTestingEventHandlerRecords(int recordsNum) {
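+        // Advance the clock one second per record so each row gets a distinct create_time.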
+ for (int i = 0; i < recordsNum; i++) {
+ jdbcEventInteractor.setClock(
+ Clock.fixed(
+ jdbcEventInteractor.getCurrentInstant().plusSeconds(1),
+ ZoneId.systemDefault()));
+ eventHandler.handleEvent(
+ ctx,
+ AutoScalerEventHandler.Type.Normal,
+ SCALING_REPORT_REASON,
+ "message-" + i,
+ "messageKey-" + i,
+ null);
+ }
+ }
+
private void createFirstScalingEvent() throws Exception {
jdbcEventInteractor.assertCounters(0, 0, 0);
eventHandler.handleScalingEvent(
@@ -441,8 +578,7 @@
assertThat(
jdbcEventInteractor.queryEvents(
- ctx.getJobKey().toString(),
- AutoScalerEventHandler.SCALING_REPORT_REASON))
+ ctx.getJobKey().toString(), SCALING_REPORT_REASON))
.singleElement()
.satisfies(
event -> {
@@ -523,7 +659,7 @@
assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime);
assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime);
assertThat(event.getJobKey()).isEqualTo(ctx.getJobKey().toString());
- assertThat(event.getReason()).isEqualTo(AutoScalerEventHandler.SCALING_REPORT_REASON);
+ assertThat(event.getReason()).isEqualTo(SCALING_REPORT_REASON);
assertThat(event.getEventType()).isEqualTo(AutoScalerEventHandler.Type.Normal.toString());
assertThat(event.getCount()).isEqualTo(expectedCount);
}
@@ -531,7 +667,20 @@
/** Test {@link JdbcAutoScalerEventHandler} via Derby. */
class DerbyJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
- implements DerbyTestBase {}
+ implements DerbyTestBase {
+
+    @Disabled("Disabled because the 'LIMIT' clause is not supported in Derby.")
+ @Override
+ void testCleanExpiredEvents(
+ boolean tryIdNotSequential,
+ int expiredRecordsNum,
+ Duration eventHandlerTtl,
+ int unexpiredRecordsNum) {}
+
+    @Disabled("Disabled because the 'LIMIT' clause is not supported in Derby.")
+ @Override
+ void testDeleteCounterWhenIdNotConsecutive() {}
+}
/** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.6.x. */
class MySQL56JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
@@ -547,4 +696,9 @@
/** Test {@link JdbcAutoScalerEventHandler} via PostgreSQL. */
class PostgreSQLJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
- implements PostgreSQLTestBase {}
+ implements PostgreSQLTestBase {
+
+    @Disabled("Disabled because the column 'id' can only be updated to DEFAULT.")
+ @Override
+ void testDeleteCounterWhenIdNotConsecutive() {}
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
index 664a28b..39ccd76 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
@@ -20,6 +20,7 @@
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import java.sql.Connection;
+import java.sql.Timestamp;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
@@ -31,12 +32,14 @@
private final AtomicLong queryCounter;
private final AtomicLong createCounter;
private final AtomicLong updateCounter;
+ private final AtomicLong deleteExpiredCounter;
public CountableJdbcEventInteractor(Connection conn) {
super(conn);
queryCounter = new AtomicLong();
createCounter = new AtomicLong();
updateCounter = new AtomicLong();
+ deleteExpiredCounter = new AtomicLong();
}
@Override
@@ -64,10 +67,21 @@
super.updateEvent(id, message, eventCount);
}
+ @Override
+ int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp timestamp)
+ throws Exception {
+ deleteExpiredCounter.incrementAndGet();
+ return super.deleteExpiredEventsByIdRangeAndDate(startId, endId, timestamp);
+ }
+
public void assertCounters(
long expectedQueryCounter, long expectedUpdateCounter, long expectedCreateCounter) {
assertThat(queryCounter).hasValue(expectedQueryCounter);
assertThat(updateCounter).hasValue(expectedUpdateCounter);
assertThat(createCounter).hasValue(expectedCreateCounter);
}
+
+ public void assertDeleteExpiredCounter(long expectedCounter) {
+ assertThat(deleteExpiredCounter).hasValue(expectedCounter);
+ }
}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
index 5d063ca..2f58ca6 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
@@ -30,6 +30,7 @@
import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC;
import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE;
+import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_EVENT_HANDLER_TTL;
import static org.apache.flink.configuration.description.TextElement.text;
/** The factory of {@link AutoScalerEventHandler}. */
@@ -73,6 +74,7 @@
AutoScalerEventHandler<KEY, Context> createJdbcEventHandler(Configuration conf)
throws Exception {
var conn = HikariJDBCUtil.getConnection(conf);
- return new JdbcAutoScalerEventHandler<>(new JdbcEventInteractor(conn));
+ return new JdbcAutoScalerEventHandler<>(
+ new JdbcEventInteractor(conn), conf.get(JDBC_EVENT_HANDLER_TTL));
}
}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index a075b71..87875ec 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -113,6 +113,7 @@
public void close() {
scheduledExecutorService.shutdownNow();
scalingThreadPool.shutdownNow();
+ eventHandler.close();
}
/**
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
index edc3054..15b9b6a 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
@@ -121,4 +121,12 @@
code(EVENT_HANDLER_TYPE.key()),
code("JDBC"))
.build());
+
+ public static final ConfigOption<Duration> JDBC_EVENT_HANDLER_TTL =
+ autoscalerStandaloneConfig("jdbc.event-handler.ttl")
+ .durationType()
+ .defaultValue(Duration.ofDays(90))
+                .withDescription(
+                        "The time to live, based on the record's create time, for JDBC event handler records. "
+                                + "When set to '0', the TTL strategy for the records is disabled.");
}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index afca8b8..0fcf50b 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -27,6 +27,7 @@
import javax.annotation.Nullable;
+import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.stream.Collectors;
@@ -42,7 +43,8 @@
* @param <Context> Instance of JobAutoScalerContext.
*/
@Experimental
-public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
+ extends Closeable {
String SCALING_SUMMARY_ENTRY =
"{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}";
String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism change:";
@@ -99,6 +101,9 @@
interval);
}
+    /** Close any resources held by the event handler. */
+ default void close() {}
+
static String scalingReport(Map<JobVertexID, ScalingSummary> scalingSummaries, String message) {
StringBuilder sb = new StringBuilder(message);
scalingSummaries.forEach(
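
Since AutoScalerEventHandler now extends Closeable with a no-op default close(), existing implementations keep compiling and only need to override close() when they hold resources. A minimal hypothetical sketch (the class and its body are illustrative, not part of this patch):

    class CustomEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
            implements AutoScalerEventHandler<KEY, Context> {

        @Override
        public void handleEvent(
                Context context,
                Type type,
                String reason,
                String message,
                @Nullable String messageKey,
                @Nullable Duration interval) {
            // Record the event in a custom backend.
        }

        @Override
        public void close() {
            // Release held resources, e.g. connections or executors.
        }
    }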