[FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensures semantic correctness and prevent test failure
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e4d455a..8562238 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1385,8 +1385,8 @@
lastSubsumed = null;
}
- pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
reportCompletedCheckpoint(completedCheckpoint);
+ pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
} catch (Exception exception) {
// For robustness reasons, we need catch exception and try marking the checkpoint
// completed.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index a7bba5a..7cd7361 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -119,6 +119,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -154,6 +155,8 @@
/** Tests for the checkpoint coordinator. */
class CheckpointCoordinatorTest {
+ private static final long TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
+
@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorExtension();
@@ -4409,4 +4412,106 @@
}
}
}
+
+ /**
+ * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+ * finishes. This ensures that when external components are notified via the CompletableFuture
+ * that a checkpoint is complete, all statistics have already been updated.
+ */
+ @Test
+ void testCompletionFutureCompletesAfterReporting() throws Exception {
+ JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorBuilder()
+ .setCheckpointStatsTracker(tracker)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build(graph);
+
+ CompletableFuture<CompletedCheckpoint> checkpointFuture =
+ coordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ CompletableFuture<Void> ackTask =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ ackCheckpoint(
+ 1L,
+ coordinator,
+ jobVertexID,
+ graph,
+ handle(),
+ handle(),
+ handle());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertThat(tracker.getReportStartedFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
+ .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
+ .isNull();
+
+ for (int i = 0; i < 30; i++) {
+ assertThat(checkpointFuture)
+ .as(
+ "Checkpoint future should not complete while reportCompletedCheckpoint is blocked")
+ .isNotDone();
+ Thread.sleep(100);
+ }
+
+ tracker.getReportBlockingFuture().complete(null);
+
+ CompletedCheckpoint result = checkpointFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ assertThat(result)
+ .as("Checkpoint future should complete after reportCompletedCheckpoint finishes")
+ .isNotNull();
+
+ ackTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ /**
+ * A controllable checkpoint stats tracker for testing purposes. Allows precise control over
+ * when reportCompletedCheckpoint() completes, enabling verification of execution order and
+ * timing in tests.
+ */
+ private static class ControllableCheckpointStatsTracker extends DefaultCheckpointStatsTracker {
+ private final CompletableFuture<Void> reportStartedFuture;
+ private final CompletableFuture<Void> reportBlockingFuture;
+
+ public ControllableCheckpointStatsTracker() {
+ super(
+ Integer.MAX_VALUE,
+ UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+ this.reportStartedFuture = new CompletableFuture<>();
+ this.reportBlockingFuture = new CompletableFuture<>();
+ }
+
+ public CompletableFuture<Void> getReportStartedFuture() {
+ return reportStartedFuture;
+ }
+
+ public CompletableFuture<Void> getReportBlockingFuture() {
+ return reportBlockingFuture;
+ }
+
+ @Override
+ public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+ reportStartedFuture.complete(null);
+
+ try {
+ reportBlockingFuture.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ super.reportCompletedCheckpoint(completed);
+ }
+ }
}