[hotfix] Preserve scaling history on empty collected metrics
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index ee4e75a..0ed547c 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -99,6 +99,7 @@
                             resource, autoScalerInfo, ctx.getFlinkService(), conf);
 
             if (collectedMetrics.getMetricHistory().isEmpty()) {
+                autoScalerInfo.replaceInKubernetes(kubernetesClient);
                 return false;
             }
 
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 8dba563..e681281 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -324,6 +324,30 @@
         assertEquals(2, scaledParallelism.get(sink));
     }
 
+    @Test
+    public void testMetricsPersistedAfterRedeploy() {
+        var ctx = createAutoscalerTestContext();
+        var now = Instant.ofEpochMilli(0);
+        setClocksTo(now);
+        app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 500.))));
+
+        autoscaler.scale(getResourceContext(app, ctx));
+        assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+    }
+
     private void redeployJob(Instant now) {
         // Offset the update time by one metrics window to simulate collecting one entire window
         app.getStatus()