[FLINK-31345] Reduce AutoScalerInfo size by rounding metrics and compressing stored data
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index bff2228..e6daa1d 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -35,16 +35,24 @@
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import lombok.SneakyThrows;
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 /** Class for encapsulating information stored for each resource when using the autoscaler. */
 public class AutoScalerInfo {
@@ -53,11 +61,11 @@
 
     private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
 
-    private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
-    private static final String SCALING_HISTORY_KEY = "scalingHistory";
-    private static final String JOB_UPDATE_TS_KEY = "jobUpdateTs";
+    protected static final String COLLECTED_METRICS_KEY = "collectedMetrics";
+    protected static final String SCALING_HISTORY_KEY = "scalingHistory";
+    protected static final String JOB_UPDATE_TS_KEY = "jobUpdateTs";
 
-    private static final ObjectMapper YAML_MAPPER =
+    protected static final ObjectMapper YAML_MAPPER =
             new ObjectMapper(new YAMLFactory())
                     .registerModule(new JavaTimeModule())
                     .registerModule(new JobVertexSerDeModule());
@@ -82,14 +90,16 @@
             return new TreeMap<>();
         }
 
-        return YAML_MAPPER.readValue(historyYaml, new TypeReference<>() {});
+        return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
     }
 
     @SneakyThrows
     public void updateMetricHistory(
             Instant jobUpdateTs,
             SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> history) {
-        configMap.getData().put(COLLECTED_METRICS_KEY, YAML_MAPPER.writeValueAsString(history));
+        configMap
+                .getData()
+                .put(COLLECTED_METRICS_KEY, compress(YAML_MAPPER.writeValueAsString(history)));
         configMap.getData().put(JOB_UPDATE_TS_KEY, jobUpdateTs.toString());
     }
 
@@ -99,9 +109,7 @@
         getScalingHistory();
 
         if (scalingHistory.keySet().removeIf(v -> !vertexList.contains(v))) {
-            configMap
-                    .getData()
-                    .put(SCALING_HISTORY_KEY, YAML_MAPPER.writeValueAsString(scalingHistory));
+            storeScalingHistory();
         }
     }
 
@@ -119,7 +127,7 @@
         if (scalingHistory != null) {
             return scalingHistory;
         }
-        var yaml = configMap.getData().get(SCALING_HISTORY_KEY);
+        var yaml = decompress(configMap.getData().get(SCALING_HISTORY_KEY));
         scalingHistory =
                 yaml == null
                         ? new HashMap<>()
@@ -158,9 +166,13 @@
             }
         }
 
+        storeScalingHistory();
+    }
+
+    private void storeScalingHistory() throws IOException {
         configMap
                 .getData()
-                .put(SCALING_HISTORY_KEY, YAML_MAPPER.writeValueAsString(scalingHistory));
+                .put(SCALING_HISTORY_KEY, compress(YAML_MAPPER.writeValueAsString(scalingHistory)));
     }
 
     public void replaceInKubernetes(KubernetesClient client) {
@@ -205,4 +217,30 @@
                         .withName(objectMeta.getName())
                         .get());
     }
+
+    /** GZIP-compresses the UTF-8 bytes of {@code original} and Base64-encodes the result. */
+    private static String compress(String original) throws IOException {
+        var gzipped = new ByteArrayOutputStream();
+        try (var gzip = new GZIPOutputStream(gzipped)) {
+            gzip.write(original.getBytes(StandardCharsets.UTF_8));
+        }
+        return Base64.getEncoder().encodeToString(gzipped.toByteArray());
+    }
+
+    /** Reverses compress; input that is not Base64+GZIP is returned as-is (legacy data). */
+    private static String decompress(String compressed) {
+        if (compressed == null) {
+            return null;
+        }
+        try {
+            byte[] bytes = Base64.getDecoder().decode(compressed);
+            try (var zi = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
+                return IOUtils.toString(zi, StandardCharsets.UTF_8);
+            }
+        } catch (IllegalArgumentException | IOException e) {
+            // Data written before compression was introduced is plain YAML: fall back to it
+            LOG.warn("Error while decompressing scaling data, treating as uncompressed", e);
+            return compressed;
+        }
+    }
 }
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 6686982..c926538 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -263,6 +263,10 @@
                             lagGrowthRate,
                             conf);
 
+                    vertexScalingMetrics
+                            .entrySet()
+                            .forEach(e -> e.setValue(ScalingMetrics.roundMetric(e.getValue())));
+
                     LOG.debug(
                             "Vertex scaling metrics for {}: {}", jobVertexID, vertexScalingMetrics);
                 });
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index 6b61416..7110fd5 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -70,7 +70,7 @@
         for (var vertex : topology.getVerticesInTopologicalOrder()) {
             scalingOutput.put(
                     vertex,
-                    computeVertexScalingSummary(
+                    evaluateMetrics(
                             conf,
                             scalingOutput,
                             metricsHistory,
@@ -110,7 +110,7 @@
     }
 
     @NotNull
-    private Map<ScalingMetric, EvaluatedScalingMetric> computeVertexScalingSummary(
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
             Configuration conf,
             HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput,
             SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
index a230412..2378888 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
@@ -17,19 +17,23 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler.metrics;
 
-import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 /** Evaluated scaling metric. */
 @Data
-@AllArgsConstructor
 @NoArgsConstructor
 public class EvaluatedScalingMetric {
     private double current;
 
     private double average;
 
+    /** Creates a metric, rounding both values with {@link ScalingMetrics#roundMetric}. */
+    public EvaluatedScalingMetric(double current, double average) {
+        this.current = ScalingMetrics.roundMetric(current);
+        this.average = ScalingMetrics.roundMetric(average);
+    }
+
     public static EvaluatedScalingMetric of(double value) {
         return new EvaluatedScalingMetric(value, Double.NaN);
     }
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 03fb3ea..05a947d 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -23,6 +23,7 @@
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
+import org.apache.commons.math3.util.Precision;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -194,4 +195,10 @@
         }
         return number;
     }
+
+    public static double roundMetric(double value) {
+        // Keep three decimal places, but hand back the input when that would yield 0
+        final double threeDecimals = Precision.round(value, 3);
+        return threeDecimals != 0 ? threeDecimals : value;
+    }
 }
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index e4ba23e..f682300 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -19,15 +19,21 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -97,4 +103,45 @@
                 Set.of(now.plus(Duration.ofSeconds(15))),
                 new AutoScalerInfo(data).getScalingHistory().get(v1).keySet());
     }
+
+    @Test
+    public void testCompressionMigration() throws JsonProcessingException {
+        var jobUpdateTs = Instant.now();
+        var v1 = new JobVertexID();
+
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>();
+        metricHistory.put(jobUpdateTs, Map.of(v1, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, 1.)));
+
+        var scalingHistory = new HashMap<JobVertexID, SortedMap<Instant, ScalingSummary>>();
+        scalingHistory.put(v1, new TreeMap<>());
+        scalingHistory
+                .get(v1)
+                .put(
+                        jobUpdateTs,
+                        new ScalingSummary(
+                                1, 2, Map.of(ScalingMetric.LAG, EvaluatedScalingMetric.of(2.))));
+
+        // Store uncompressed data in map to simulate migration
+        var data = new HashMap<String, String>();
+        data.put(
+                AutoScalerInfo.COLLECTED_METRICS_KEY,
+                AutoScalerInfo.YAML_MAPPER.writeValueAsString(metricHistory));
+        data.put(AutoScalerInfo.JOB_UPDATE_TS_KEY, jobUpdateTs.toString());
+        data.put(
+                AutoScalerInfo.SCALING_HISTORY_KEY,
+                AutoScalerInfo.YAML_MAPPER.writeValueAsString(scalingHistory));
+
+        var info = new AutoScalerInfo(data);
+        assertEquals(scalingHistory, info.getScalingHistory());
+        assertEquals(metricHistory, info.getMetricHistory());
+
+        // Override with compressed data
+        var newTs = Instant.now();
+        info.updateMetricHistory(newTs, metricHistory);
+        info.addToScalingHistory(newTs, Map.of(), new Configuration());
+
+        // Re-read through a fresh instance so the cached scaling history is bypassed
+        assertEquals(scalingHistory, new AutoScalerInfo(data).getScalingHistory());
+        assertEquals(metricHistory, new AutoScalerInfo(data).getMetricHistory());
+    }
 }
diff --git a/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties b/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..3b7cec7
--- /dev/null
+++ b/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = DEBUG
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Log all messages to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable}