[FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensures semantic correctness and prevent test failure
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e4d455a..8562238 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1385,8 +1385,8 @@
                 lastSubsumed = null;
             }
 
-            pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
             reportCompletedCheckpoint(completedCheckpoint);
+            pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
         } catch (Exception exception) {
             // For robustness reasons, we need catch exception and try marking the checkpoint
             // completed.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index a7bba5a..7cd7361 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -119,6 +119,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -154,6 +155,8 @@
 /** Tests for the checkpoint coordinator. */
 class CheckpointCoordinatorTest {
 
+    private static final long TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
+
     @RegisterExtension
     static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
             TestingUtils.defaultExecutorExtension();
@@ -4409,4 +4412,106 @@
             }
         }
     }
+
+    /**
+     * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+     * finishes. This ensures that when external components are notified via the CompletableFuture
+     * that a checkpoint is complete, all statistics have already been updated.
+     */
+    @Test
+    void testCompletionFutureCompletesAfterReporting() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+        CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(tracker)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(graph);
+
+        CompletableFuture<CompletedCheckpoint> checkpointFuture =
+                coordinator.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        CompletableFuture<Void> ackTask =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                ackCheckpoint(
+                                        1L,
+                                        coordinator,
+                                        jobVertexID,
+                                        graph,
+                                        handle(),
+                                        handle(),
+                                        handle());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        assertThat(tracker.getReportStartedFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
+                .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
+                .isNull();
+
+        for (int i = 0; i < 30; i++) {
+            assertThat(checkpointFuture)
+                    .as(
+                            "Checkpoint future should not complete while reportCompletedCheckpoint is blocked")
+                    .isNotDone();
+            Thread.sleep(100);
+        }
+
+        tracker.getReportBlockingFuture().complete(null);
+
+        CompletedCheckpoint result = checkpointFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertThat(result)
+                .as("Checkpoint future should complete after reportCompletedCheckpoint finishes")
+                .isNotNull();
+
+        ackTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * A controllable checkpoint stats tracker for testing purposes. Allows precise control over
+     * when reportCompletedCheckpoint() completes, enabling verification of execution order and
+     * timing in tests.
+     */
+    private static class ControllableCheckpointStatsTracker extends DefaultCheckpointStatsTracker {
+        private final CompletableFuture<Void> reportStartedFuture;
+        private final CompletableFuture<Void> reportBlockingFuture;
+
+        public ControllableCheckpointStatsTracker() {
+            super(
+                    Integer.MAX_VALUE,
+                    UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+            this.reportStartedFuture = new CompletableFuture<>();
+            this.reportBlockingFuture = new CompletableFuture<>();
+        }
+
+        public CompletableFuture<Void> getReportStartedFuture() {
+            return reportStartedFuture;
+        }
+
+        public CompletableFuture<Void> getReportBlockingFuture() {
+            return reportBlockingFuture;
+        }
+
+        @Override
+        public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+            reportStartedFuture.complete(null);
+
+            try {
+                reportBlockingFuture.get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            super.reportCompletedCheckpoint(completed);
+        }
+    }
 }