[hotfix] Preserve scaling history on empty collected metrics
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index ee4e75a..0ed547c 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -99,6 +99,7 @@
resource, autoScalerInfo, ctx.getFlinkService(), conf);
if (collectedMetrics.getMetricHistory().isEmpty()) {
+ autoScalerInfo.replaceInKubernetes(kubernetesClient);
return false;
}
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 8dba563..e681281 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -324,6 +324,30 @@
assertEquals(2, scaledParallelism.get(sink));
}
+ @Test
+ public void testMetricsPersistedAfterRedeploy() {
+ var ctx = createAutoscalerTestContext();
+ var now = Instant.ofEpochMilli(0);
+ setClocksTo(now);
+ app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+ metricsCollector.setCurrentMetrics(
+ Map.of(
+ source1,
+ Map.of(
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.)),
+ sink,
+ Map.of(
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN, Double.NaN, 500.))));
+
+ autoscaler.scale(getResourceContext(app, ctx));
+ assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+ }
+
private void redeployJob(Instant now) {
// Offset the update time by one metrics window to simulate collecting one entire window
app.getStatus()