[FLINK-36039][autoscaler] Support clean historical event handler records in JDBC event handler (#865)

---------

Co-authored-by: Rui Fan <fanrui@apache.org>
diff --git a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
index 7d724b7..45b5184 100644
--- a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
+++ b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
@@ -39,6 +39,12 @@
             <td>The port of flink cluster when the flink-cluster fetcher is used.</td>
         </tr>
         <tr>
+            <td><h5>autoscaler.standalone.jdbc.event-handler.ttl</h5></td>
+            <td style="word-wrap: break-word;">90 d</td>
+            <td>Duration</td>
+            <td>The time to live based on create time for the JDBC event handler records. When the config is set as '0', the ttl strategy for the records would be disabled.</td>
+        </tr>
+        <tr>
             <td><h5>autoscaler.standalone.jdbc.password-env-variable</h5></td>
             <td style="word-wrap: break-word;">"JDBC_PWD"</td>
             <td>String</td>
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
index c42692a..5b45580 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
@@ -18,18 +18,27 @@
 package org.apache.flink.autoscaler.jdbc.event;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nullable;
 
+import java.sql.Timestamp;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The event handler which persists its event in JDBC related database.
@@ -38,13 +47,34 @@
  * @param <Context> The job autoscaler context.
  */
 @Experimental
+@Slf4j
 public class JdbcAutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
         implements AutoScalerEventHandler<KEY, Context> {
 
     private final JdbcEventInteractor jdbcEventInteractor;
+    private final Duration eventHandlerTtl;
+    @Nullable private final ScheduledExecutorService scheduledEventHandlerCleaner;
 
-    public JdbcAutoScalerEventHandler(JdbcEventInteractor jdbcEventInteractor) {
+    public JdbcAutoScalerEventHandler(
+            JdbcEventInteractor jdbcEventInteractor, Duration eventHandlerTtl) {
         this.jdbcEventInteractor = jdbcEventInteractor;
+        this.eventHandlerTtl = Preconditions.checkNotNull(eventHandlerTtl);
+
+        if (eventHandlerTtl.toMillis() <= 0) {
+            this.scheduledEventHandlerCleaner = null;
+            return;
+        }
+        this.scheduledEventHandlerCleaner =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ThreadFactoryBuilder()
+                                .setNameFormat("jdbc-autoscaler-events-cleaner")
+                                .setDaemon(true)
+                                .build());
+        this.scheduledEventHandlerCleaner.scheduleAtFixedRate(
+                this::cleanExpiredEvents,
+                Duration.ofDays(1).toMillis(),
+                Duration.ofDays(1).toMillis(),
+                TimeUnit.MILLISECONDS);
     }
 
     @SneakyThrows
@@ -104,6 +134,48 @@
         }
     }
 
+    @Override
+    public void close() {
+        if (Objects.nonNull(scheduledEventHandlerCleaner)
+                && !scheduledEventHandlerCleaner.isShutdown()) {
+            scheduledEventHandlerCleaner.shutdownNow();
+        }
+    }
+
+    @VisibleForTesting
+    void cleanExpiredEvents() {
+        final var batchSize = 2000;
+        final var sleepMs = 1000;
+
+        var date =
+                Timestamp.from(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .minusMillis(eventHandlerTtl.toMillis()));
+        try {
+            var deletedTotalCount = 0L;
+            while (true) {
+                Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+                if (Objects.isNull(minId)) {
+                    log.info(
+                            "Deleted expired {} event handler records successfully",
+                            deletedTotalCount);
+                    break;
+                }
+
+                for (long startId = minId;
+                        jdbcEventInteractor.deleteExpiredEventsByIdRangeAndDate(
+                                        startId, startId + batchSize, date)
+                                == batchSize;
+                        startId += batchSize) {
+                    Thread.sleep(sleepMs);
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error in cleaning expired event handler records.", e);
+        }
+    }
+
     /**
      * @return True means the existing event is still in the interval duration we can update it.
      *     Otherwise, it's too early, we should create a new one instead of updating it.
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
index 5e2d616..ce3fc48 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
@@ -22,6 +22,7 @@
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -152,4 +153,29 @@
     void setClock(@Nonnull Clock clock) {
         this.clock = Preconditions.checkNotNull(clock);
     }
+
+    @Nullable
+    Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception {
+        var sql =
+                "SELECT id from t_flink_autoscaler_event_handler "
+                        + "           where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) "
+                        + "           and create_time < ?";
+        try (var pstmt = conn.prepareStatement(sql)) {
+            pstmt.setObject(1, timestamp);
+            ResultSet resultSet = pstmt.executeQuery();
+            return resultSet.next() ? resultSet.getLong(1) : null;
+        }
+    }
+
+    int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp timestamp)
+            throws Exception {
+        var query =
+                "delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?";
+        try (var pstmt = conn.prepareStatement(query)) {
+            pstmt.setObject(1, startId);
+            pstmt.setObject(2, endId);
+            pstmt.setObject(3, timestamp);
+            return pstmt.executeUpdate();
+        }
+    }
 }
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
index b08030f..f527817 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
@@ -33,19 +33,30 @@
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.annotation.Nonnull;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Timestamp;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_REPORT_REASON;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The abstract IT case for {@link JdbcAutoScalerEventHandler}. */
@@ -60,6 +71,7 @@
     private final Instant createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
     private final Map<JobVertexID, ScalingSummary> scalingSummaries =
             generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent);
+    private final Clock defaultClock = Clock.fixed(createTime, ZoneId.systemDefault());
 
     private CountableJdbcEventInteractor jdbcEventInteractor;
     private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> eventHandler;
@@ -68,11 +80,16 @@
     @BeforeEach
     void beforeEach() throws Exception {
         jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection());
-        jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault()));
-        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor);
+        jdbcEventInteractor.setClock(defaultClock);
+        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, Duration.ZERO);
         ctx = createDefaultJobAutoScalerContext();
     }
 
+    @AfterEach
+    void tearDown() {
+        eventHandler.close();
+    }
+
     /** All events shouldn't be deduplicated when interval is null. */
     @Test
     void testEventWithoutInterval() throws Exception {
@@ -254,8 +271,7 @@
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), SCALING_REPORT_REASON))
                 .singleElement()
                 .satisfies(
                         event -> {
@@ -283,8 +299,7 @@
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), SCALING_REPORT_REASON))
                 .as("All scaling events shouldn't be deduplicated when scaling happens.")
                 .hasSize(2)
                 .satisfiesExactlyInAnyOrder(
@@ -322,8 +337,7 @@
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), SCALING_REPORT_REASON))
                 .as(
                         "The event should be deduplicated when parallelism is not changed and within the interval.")
                 .singleElement()
@@ -360,8 +374,7 @@
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), SCALING_REPORT_REASON))
                 .as("We should create a new event when the old event is too early.")
                 .hasSize(2)
                 .satisfiesExactlyInAnyOrder(
@@ -401,8 +414,7 @@
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), SCALING_REPORT_REASON))
                 .as("We should create a new event when the old event is too early.")
                 .hasSize(2)
                 .satisfiesExactlyInAnyOrder(
@@ -430,6 +442,131 @@
                         });
     }
 
+    @Test
+    void testDeleteCounterWhenIdNotConsecutive() throws Exception {
+        // Create 2 events.
+        final Duration ttl = Duration.ofDays(1L);
+        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, ttl);
+        initTestingEventHandlerRecords(2);
+
+        // Simulate ids are not consecutive.
+        var events =
+                jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), SCALING_REPORT_REASON);
+        assertThat(events).hasSize(2);
+        var maxId =
+                events.stream()
+                        .map(AutoScalerEvent::getId)
+                        .max(Comparable::compareTo)
+                        .orElseThrow();
+
+        try (Connection connection = getConnection();
+                PreparedStatement ps =
+                        connection.prepareStatement(
+                                "update t_flink_autoscaler_event_handler set id = ? where id = ?")) {
+            ps.setLong(1, maxId + 1_000_000);
+            ps.setLong(2, maxId);
+            ps.execute();
+        }
+
+        // Reset the clock to clean all expired data.
+        jdbcEventInteractor.setClock(
+                Clock.fixed(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .plus(ttl)
+                                .plus(Duration.ofMillis(1)),
+                        ZoneId.systemDefault()));
+
+        eventHandler.cleanExpiredEvents();
+        jdbcEventInteractor.assertDeleteExpiredCounter(2L);
+    }
+
+    private static Stream<Arguments> getExpiredEventHandlersCaseMatrix() {
+        return Stream.of(
+                Arguments.of(false, 128, Duration.ofMinutes(2), 10),
+                Arguments.of(true, 256, Duration.ofMinutes(2), 0),
+                Arguments.of(true, 1024 * 9, Duration.ofMinutes(2), 12),
+                Arguments.of(true, 1024 * 9, Duration.ofMinutes(2), 0),
+                Arguments.of(true, 512, Duration.ofMinutes(100), 3),
+                Arguments.of(false, 64, Duration.ofMinutes(100), 0),
+                Arguments.of(true, 1024 * 9, Duration.ofMinutes(100), 64),
+                Arguments.of(false, 1024 * 9, Duration.ofMinutes(100), 0),
+                Arguments.of(false, 0, Duration.ofMinutes(100), 128),
+                Arguments.of(false, 0, Duration.ofMinutes(100), 0));
+    }
+
+    @MethodSource("getExpiredEventHandlersCaseMatrix")
+    @ParameterizedTest(
+            name =
+                    "tryIdNotSequential:{0}, expiredRecordsNum: {1}, eventHandlerTtl: {2}, unexpiredRecordsNum: {3}")
+    void testCleanExpiredEvents(
+            boolean tryIdNotSequential,
+            int expiredRecordsNum,
+            Duration eventHandlerTtl,
+            int unexpiredRecordsNum)
+            throws Exception {
+        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl);
+
+        // Init the expired records.
+        initTestingEventHandlerRecords(expiredRecordsNum);
+        if (tryIdNotSequential) {
+            tryDeleteOneRecord(expiredRecordsNum);
+        }
+        var expiredInstant = jdbcEventInteractor.getCurrentInstant();
+
+        // Init the unexpired records.
+        initTestingEventHandlerRecords(unexpiredRecordsNum);
+
+        // Reset the clock to clean all expired data.
+        jdbcEventInteractor.setClock(
+                Clock.fixed(
+                        expiredInstant.plus(eventHandlerTtl).plus(Duration.ofMillis(1)),
+                        ZoneId.systemDefault()));
+
+        eventHandler.cleanExpiredEvents();
+
+        try (Connection connection = getConnection();
+                PreparedStatement ps =
+                        connection.prepareStatement(
+                                "select count(1) from t_flink_autoscaler_event_handler");
+                ResultSet countResultSet = ps.executeQuery()) {
+            countResultSet.next();
+            assertThat(countResultSet.getInt(1)).isEqualTo(unexpiredRecordsNum);
+        }
+    }
+
+    private void tryDeleteOneRecord(int expiredRecordsNum) throws Exception {
+        // To simulate non-sequential IDs in expired records.
+        Timestamp date = Timestamp.from(createTime);
+        Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+        if (minId == null) {
+            return;
+        }
+        try (Connection connection = getConnection();
+                PreparedStatement ps =
+                        connection.prepareStatement(
+                                "delete from t_flink_autoscaler_event_handler where id = ?")) {
+            ps.setObject(1, (minId + expiredRecordsNum) / 2);
+            ps.execute();
+        }
+    }
+
+    private void initTestingEventHandlerRecords(int recordsNum) {
+        for (int i = 0; i < recordsNum; i++) {
+            jdbcEventInteractor.setClock(
+                    Clock.fixed(
+                            jdbcEventInteractor.getCurrentInstant().plusSeconds(1),
+                            ZoneId.systemDefault()));
+            eventHandler.handleEvent(
+                    ctx,
+                    AutoScalerEventHandler.Type.Normal,
+                    SCALING_REPORT_REASON,
+                    "message-" + i,
+                    "messageKey-" + i,
+                    null);
+        }
+    }
+
     private void createFirstScalingEvent() throws Exception {
         jdbcEventInteractor.assertCounters(0, 0, 0);
         eventHandler.handleScalingEvent(
@@ -441,8 +578,7 @@
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), SCALING_REPORT_REASON))
                 .singleElement()
                 .satisfies(
                         event -> {
@@ -523,7 +659,7 @@
         assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime);
         assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime);
         assertThat(event.getJobKey()).isEqualTo(ctx.getJobKey().toString());
-        assertThat(event.getReason()).isEqualTo(AutoScalerEventHandler.SCALING_REPORT_REASON);
+        assertThat(event.getReason()).isEqualTo(SCALING_REPORT_REASON);
         assertThat(event.getEventType()).isEqualTo(AutoScalerEventHandler.Type.Normal.toString());
         assertThat(event.getCount()).isEqualTo(expectedCount);
     }
@@ -531,7 +667,20 @@
 
 /** Test {@link JdbcAutoScalerEventHandler} via Derby. */
 class DerbyJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
-        implements DerbyTestBase {}
+        implements DerbyTestBase {
+
+    @Disabled("Disabled due to the 'LIMIT' clause is not supported in Derby.")
+    @Override
+    void testCleanExpiredEvents(
+            boolean tryIdNotSequential,
+            int expiredRecordsNum,
+            Duration eventHandlerTtl,
+            int unexpiredRecordsNum) {}
+
+    @Disabled("Disabled due to the 'LIMIT' clause is not supported in Derby.")
+    @Override
+    void testDeleteCounterWhenIdNotConsecutive() {}
+}
 
 /** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.6.x. */
 class MySQL56JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
@@ -547,4 +696,9 @@
 
 /** Test {@link JdbcAutoScalerEventHandler} via Postgre SQL. */
 class PostgreSQLJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
-        implements PostgreSQLTestBase {}
+        implements PostgreSQLTestBase {
+
+    @Disabled("Disabled due to the column 'id' can only be updated to DEFAULT.")
+    @Override
+    void testDeleteCounterWhenIdNotConsecutive() {}
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
index 664a28b..39ccd76 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
@@ -20,6 +20,7 @@
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 
 import java.sql.Connection;
+import java.sql.Timestamp;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -31,12 +32,14 @@
     private final AtomicLong queryCounter;
     private final AtomicLong createCounter;
     private final AtomicLong updateCounter;
+    private final AtomicLong deleteExpiredCounter;
 
     public CountableJdbcEventInteractor(Connection conn) {
         super(conn);
         queryCounter = new AtomicLong();
         createCounter = new AtomicLong();
         updateCounter = new AtomicLong();
+        deleteExpiredCounter = new AtomicLong();
     }
 
     @Override
@@ -64,10 +67,21 @@
         super.updateEvent(id, message, eventCount);
     }
 
+    @Override
+    int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp timestamp)
+            throws Exception {
+        deleteExpiredCounter.incrementAndGet();
+        return super.deleteExpiredEventsByIdRangeAndDate(startId, endId, timestamp);
+    }
+
     public void assertCounters(
             long expectedQueryCounter, long expectedUpdateCounter, long expectedCreateCounter) {
         assertThat(queryCounter).hasValue(expectedQueryCounter);
         assertThat(updateCounter).hasValue(expectedUpdateCounter);
         assertThat(createCounter).hasValue(expectedCreateCounter);
     }
+
+    public void assertDeleteExpiredCounter(long expectedCounter) {
+        assertThat(deleteExpiredCounter).hasValue(expectedCounter);
+    }
 }
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
index 5d063ca..2f58ca6 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
@@ -30,6 +30,7 @@
 import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC;
 import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING;
 import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE;
+import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_EVENT_HANDLER_TTL;
 import static org.apache.flink.configuration.description.TextElement.text;
 
 /** The factory of {@link AutoScalerEventHandler}. */
@@ -73,6 +74,7 @@
             AutoScalerEventHandler<KEY, Context> createJdbcEventHandler(Configuration conf)
                     throws Exception {
         var conn = HikariJDBCUtil.getConnection(conf);
-        return new JdbcAutoScalerEventHandler<>(new JdbcEventInteractor(conn));
+        return new JdbcAutoScalerEventHandler<>(
+                new JdbcEventInteractor(conn), conf.get(JDBC_EVENT_HANDLER_TTL));
     }
 }
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index a075b71..87875ec 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -113,6 +113,7 @@
     public void close() {
         scheduledExecutorService.shutdownNow();
         scalingThreadPool.shutdownNow();
+        eventHandler.close();
     }
 
     /**
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
index edc3054..15b9b6a 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
@@ -121,4 +121,12 @@
                                             code(EVENT_HANDLER_TYPE.key()),
                                             code("JDBC"))
                                     .build());
+
+    public static final ConfigOption<Duration> JDBC_EVENT_HANDLER_TTL =
+            autoscalerStandaloneConfig("jdbc.event-handler.ttl")
+                    .durationType()
+                    .defaultValue(Duration.ofDays(90))
+                    .withDescription(
+                            "The time to live based on create time for the JDBC event handler records. "
+                                    + "When the config is set as '0', the ttl strategy for the records would be disabled.");
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index afca8b8..0fcf50b 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -27,6 +27,7 @@
 
 import javax.annotation.Nullable;
 
+import java.io.Closeable;
 import java.time.Duration;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -42,7 +43,8 @@
  * @param <Context> Instance of JobAutoScalerContext.
  */
 @Experimental
-public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
+        extends Closeable {
     String SCALING_SUMMARY_ENTRY =
             "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}";
     String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism change:";
@@ -99,6 +101,9 @@
                 interval);
     }
 
+    /** Close the related resource. */
+    default void close() {}
+
     static String scalingReport(Map<JobVertexID, ScalingSummary> scalingSummaries, String message) {
         StringBuilder sb = new StringBuilder(message);
         scalingSummaries.forEach(