[FLINK-34178][autoscaler] Fix the bug that observed scaling restart time is always greater than `stabilization.interval`
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 04ab36e..b3839ff 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -160,14 +160,23 @@
var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
var jobTopology = collectedMetrics.getJobTopology();
+ var now = clock.instant();
+ var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+ var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
+ // A scaling tracking without an end time gets created whenever a scaling decision is
+ // applied. Here, we record the end time for it (runScalingLogic is only called when the job
+ // transitions back into the RUNNING state).
+ if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
+ collectedMetrics.getJobRunningTs(), jobTopology, scalingHistory)) {
+ stateStore.storeScalingTracking(ctx, scalingTracking);
+ }
+
if (collectedMetrics.getMetricHistory().isEmpty()) {
return;
}
LOG.debug("Collected metrics: {}", collectedMetrics);
- var now = clock.instant();
// Scaling tracking data contains previous restart times that are taken into account
- var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
var restartTime = scalingTracking.getMaxRestartTimeOrDefault(ctx.getConfiguration());
var evaluatedMetrics =
evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, restartTime);
@@ -179,15 +188,6 @@
jobTopology.getVerticesInTopologicalOrder(),
() -> lastEvaluatedMetrics.get(ctx.getJobKey()));
- var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
- // A scaling tracking without an end time gets created whenever a scaling decision is
- // applied. Here, we record the end time for it (runScalingLogic is only called when the job
- // transitions back into the RUNNING state).
- if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
- now, jobTopology, scalingHistory)) {
- stateStore.storeScalingTracking(ctx, scalingTracking);
- }
-
if (!collectedMetrics.isFullyCollected()) {
// We have done an upfront evaluation, but we are not ready for scaling.
resetRecommendedParallelism(evaluatedMetrics.getVertexMetrics());
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index fce91ea..ed5a2e4 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -149,10 +149,10 @@
if (isStabilizing) {
LOG.info("Stabilizing until {}", readable(stableTime));
stateStore.storeCollectedMetrics(ctx, metricHistory);
- return new CollectedMetricHistory(topology, Collections.emptySortedMap());
+ return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs);
}
- var collectedMetrics = new CollectedMetricHistory(topology, metricHistory);
+ var collectedMetrics = new CollectedMetricHistory(topology, metricHistory, jobRunningTs);
if (now.isBefore(windowFullTime)) {
LOG.info("Metric window not full until {}", readable(windowFullTime));
} else {
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
index 0f11445..af8a2f4 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java
@@ -69,7 +69,8 @@
* Sets restart duration for the latest scaling record if its parallelism matches the current
* job parallelism.
*
- * @param now The instant to be used as the end time when calculating the restart duration.
+ * @param jobRunningTs The instant when the JobStatus switched to RUNNING; it is used as
+ * the end time when calculating the restart duration.
* @param jobTopology The current job topology containing details of the job's parallelism.
* @param scalingHistory The scaling history.
* @return true if the restart duration is successfully recorded, false if the restart duration
@@ -77,7 +78,7 @@
* not match the actual parallelism.
*/
public boolean recordRestartDurationIfTrackedAndParallelismMatches(
- Instant now,
+ Instant jobRunningTs,
JobTopology jobTopology,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
return getLatestScalingRecordEntry()
@@ -94,12 +95,13 @@
if (targetParallelismMatchesActual(
targetParallelism, actualParallelism)) {
value.setRestartDuration(
- Duration.between(scalingTimestamp, now));
+ Duration.between(scalingTimestamp, jobRunningTs));
LOG.debug(
"Recorded restart duration of {} seconds (from {} till {})",
- Duration.between(scalingTimestamp, now).getSeconds(),
+ Duration.between(scalingTimestamp, jobRunningTs)
+ .getSeconds(),
scalingTimestamp,
- now);
+ jobRunningTs);
return true;
}
} else {
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
index 31a1987..442c666 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
@@ -30,5 +30,6 @@
public class CollectedMetricHistory {
final JobTopology jobTopology;
final SortedMap<Instant, CollectedMetrics> metricHistory;
+ final Instant jobRunningTs;
@Setter private boolean fullyCollected;
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index d48f059..bd71dba 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -398,9 +398,12 @@
new JobTopology(
new VertexInfo(source1, Set.of(), 4, 720),
new VertexInfo(sink, Set.of(source1), 4, 720)));
+
+ var expectedEndTime = Instant.ofEpochMilli(10);
+ metricsCollector.setJobUpdateTs(expectedEndTime);
autoscaler.scale(context);
- assertLastTrackingEndTimeIs(now);
+ assertLastTrackingEndTimeIs(expectedEndTime);
}
private void assertLastTrackingEndTimeIs(Instant expectedEndTime) throws Exception {
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 260687e..0bfbd98 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -118,7 +118,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
Duration.ZERO)
.getVertexMetrics();
@@ -152,7 +152,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
Duration.ZERO)
.getVertexMetrics();
assertEquals(
@@ -175,7 +175,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
Duration.ZERO)
.getVertexMetrics();
assertEquals(
@@ -197,7 +197,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
Duration.ZERO)
.getVertexMetrics();
assertEquals(
@@ -243,7 +243,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
Duration.ZERO)
.getVertexMetrics();
assertEquals(
@@ -461,7 +461,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
restartTime)
.getVertexMetrics()
.get(source)
@@ -476,7 +476,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
restartTime)
.getVertexMetrics()
.get(source)
@@ -489,7 +489,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
restartTime)
.getVertexMetrics()
.get(source)
@@ -531,7 +531,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
restartTime)
.getVertexMetrics()
.get(source)
@@ -562,7 +562,7 @@
evaluator
.evaluate(
conf,
- new CollectedMetricHistory(topology, metricHistory),
+ new CollectedMetricHistory(topology, metricHistory, Instant.now()),
restartTime)
.getVertexMetrics()
.get(source)