[FLINK-34178][autoscaler] Fix the bug that observed scaling restart time is always great than `stabilization.interval`
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 04ab36e..b3839ff 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -160,14 +160,23 @@
         var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
         var jobTopology = collectedMetrics.getJobTopology();
 
+        var now = clock.instant();
+        var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+        var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
+        // A scaling tracking without an end time gets created whenever a scaling decision is
+        // applied. Here, we record the end time for it (runScalingLogic is only called when the job
+        // transitions back into the RUNNING state).
+        if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
+                collectedMetrics.getJobRunningTs(), jobTopology, scalingHistory)) {
+            stateStore.storeScalingTracking(ctx, scalingTracking);
+        }
+
         if (collectedMetrics.getMetricHistory().isEmpty()) {
             return;
         }
         LOG.debug("Collected metrics: {}", collectedMetrics);
 
-        var now = clock.instant();
         // Scaling tracking data contains previous restart times that are taken into account
-        var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
         var restartTime = scalingTracking.getMaxRestartTimeOrDefault(ctx.getConfiguration());
         var evaluatedMetrics =
                 evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, restartTime);
@@ -179,15 +188,6 @@
                 jobTopology.getVerticesInTopologicalOrder(),
                 () -> lastEvaluatedMetrics.get(ctx.getJobKey()));
 
-        var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
-        // A scaling tracking without an end time gets created whenever a scaling decision is
-        // applied. Here, we record the end time for it (runScalingLogic is only called when the job
-        // transitions back into the RUNNING state).
-        if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
-                now, jobTopology, scalingHistory)) {
-            stateStore.storeScalingTracking(ctx, scalingTracking);
-        }
-
         if (!collectedMetrics.isFullyCollected()) {
             // We have done an upfront evaluation, but we are not ready for scaling.
             resetRecommendedParallelism(evaluatedMetrics.getVertexMetrics());
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index fce91ea..ed5a2e4 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -149,10 +149,10 @@
         if (isStabilizing) {
             LOG.info("Stabilizing until {}", readable(stableTime));
             stateStore.storeCollectedMetrics(ctx, metricHistory);
-            return new CollectedMetricHistory(topology, Collections.emptySortedMap());
+            return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs);
         }
 
-        var collectedMetrics = new CollectedMetricHistory(topology, metricHistory);
+        var collectedMetrics = new CollectedMetricHistory(topology, metricHistory, jobRunningTs);
         if (now.isBefore(windowFullTime)) {
             LOG.info("Metric window not full until {}", readable(windowFullTime));
         } else {
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
index 0f11445..af8a2f4 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
@@ -69,7 +69,8 @@
      * Sets restart duration for the latest scaling record if its parallelism matches the current
      * job parallelism.
      *
-     * @param now The instant to be used as the end time when calculating the restart duration.
+     * @param jobRunningTs The instant when the JobStatus is switched to RUNNING, it will be used as
+     *     the end time when calculating the restart duration.
      * @param jobTopology The current job topology containing details of the job's parallelism.
      * @param scalingHistory The scaling history.
      * @return true if the restart duration is successfully recorded, false if the restart duration
@@ -77,7 +78,7 @@
      *     not match the actual parallelism.
      */
     public boolean recordRestartDurationIfTrackedAndParallelismMatches(
-            Instant now,
+            Instant jobRunningTs,
             JobTopology jobTopology,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
         return getLatestScalingRecordEntry()
@@ -94,12 +95,13 @@
                                 if (targetParallelismMatchesActual(
                                         targetParallelism, actualParallelism)) {
                                     value.setRestartDuration(
-                                            Duration.between(scalingTimestamp, now));
+                                            Duration.between(scalingTimestamp, jobRunningTs));
                                     LOG.debug(
                                             "Recorded restart duration of {} seconds (from {} till {})",
-                                            Duration.between(scalingTimestamp, now).getSeconds(),
+                                            Duration.between(scalingTimestamp, jobRunningTs)
+                                                    .getSeconds(),
                                             scalingTimestamp,
-                                            now);
+                                            jobRunningTs);
                                     return true;
                                 }
                             } else {
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
index 31a1987..442c666 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
@@ -30,5 +30,6 @@
 public class CollectedMetricHistory {
     final JobTopology jobTopology;
     final SortedMap<Instant, CollectedMetrics> metricHistory;
+    final Instant jobRunningTs;
     @Setter private boolean fullyCollected;
 }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index d48f059..bd71dba 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -398,9 +398,12 @@
                 new JobTopology(
                         new VertexInfo(source1, Set.of(), 4, 720),
                         new VertexInfo(sink, Set.of(source1), 4, 720)));
+
+        var expectedEndTime = Instant.ofEpochMilli(10);
+        metricsCollector.setJobUpdateTs(expectedEndTime);
         autoscaler.scale(context);
 
-        assertLastTrackingEndTimeIs(now);
+        assertLastTrackingEndTimeIs(expectedEndTime);
     }
 
     private void assertLastTrackingEndTimeIs(Instant expectedEndTime) throws Exception {
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 260687e..0bfbd98 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -118,7 +118,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
 
@@ -152,7 +152,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
         assertEquals(
@@ -175,7 +175,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
         assertEquals(
@@ -197,7 +197,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
         assertEquals(
@@ -243,7 +243,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 Duration.ZERO)
                         .getVertexMetrics();
         assertEquals(
@@ -461,7 +461,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)
@@ -476,7 +476,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)
@@ -489,7 +489,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)
@@ -531,7 +531,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)
@@ -562,7 +562,7 @@
                 evaluator
                         .evaluate(
                                 conf,
-                                new CollectedMetricHistory(topology, metricHistory),
+                                new CollectedMetricHistory(topology, metricHistory, Instant.now()),
                                 restartTime)
                         .getVertexMetrics()
                         .get(source)