[FLINK-31345] Trim metric history to fit CM size limit
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index e6daa1d..8850b95 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -38,6 +38,7 @@
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.LoaderOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -65,8 +66,10 @@
     protected static final String SCALING_HISTORY_KEY = "scalingHistory";
     protected static final String JOB_UPDATE_TS_KEY = "jobUpdateTs";
 
+    protected static final int MAX_CM_BYTES = 1000000;
+
     protected static final ObjectMapper YAML_MAPPER =
-            new ObjectMapper(new YAMLFactory())
+            new ObjectMapper(yamlFactory())
                     .registerModule(new JavaTimeModule())
                     .registerModule(new JobVertexSerDeModule());
 
@@ -97,6 +100,7 @@
     public void updateMetricHistory(
             Instant jobUpdateTs,
             SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> history) {
+
         configMap
                 .getData()
                 .put(COLLECTED_METRICS_KEY, compress(YAML_MAPPER.writeValueAsString(history)));
@@ -175,10 +179,35 @@
                 .put(SCALING_HISTORY_KEY, compress(YAML_MAPPER.writeValueAsString(scalingHistory)));
     }
 
-    public void replaceInKubernetes(KubernetesClient client) {
+    public void replaceInKubernetes(KubernetesClient client) throws Exception {
+        trimHistoryToMaxCmSize();
         client.resource(configMap).replace();
     }
 
+    @VisibleForTesting
+    protected void trimHistoryToMaxCmSize() throws Exception {
+        var data = configMap.getData();
+
+        int scalingHistorySize = data.getOrDefault(SCALING_HISTORY_KEY, "").length();
+        int metricHistorySize = data.getOrDefault(COLLECTED_METRICS_KEY, "").length();
+
+        SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricHistory = null;
+        while (scalingHistorySize + metricHistorySize > MAX_CM_BYTES) {
+            if (metricHistory == null) {
+                metricHistory = getMetricHistory();
+            }
+            if (metricHistory.isEmpty()) {
+                return;
+            }
+            var firstKey = metricHistory.firstKey();
+            LOG.info("Trimming metric history by removing {}", firstKey);
+            metricHistory.remove(firstKey);
+            String compressed = compress(YAML_MAPPER.writeValueAsString(metricHistory));
+            data.put(COLLECTED_METRICS_KEY, compressed);
+            metricHistorySize = compressed.length();
+        }
+    }
+
     public static AutoScalerInfo forResource(
             AbstractFlinkResource<?, ?> cr, KubernetesClient kubeClient) {
 
@@ -243,4 +272,11 @@
             return compressed;
         }
     }
+
+    private static YAMLFactory yamlFactory() {
+        // Set yaml size limit to 10mb
+        var loaderOptions = new LoaderOptions();
+        loaderOptions.setCodePointLimit(20 * 1024 * 1024);
+        return YAMLFactory.builder().loaderOptions(loaderOptions).build();
+    }
 }
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index f682300..cb26202 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -31,11 +31,14 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for AutoScalerInfo. */
 public class AutoScalerInfoTest {
@@ -144,4 +147,54 @@
         assertEquals(scalingHistory, info.getScalingHistory());
         assertEquals(metricHistory, info.getMetricHistory());
     }
+
+    @Test
+    public void testMetricsTrimming() throws Exception {
+        var v1 = new JobVertexID();
+        Random rnd = new Random();
+
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>();
+        for (int i = 0; i < 50; i++) {
+            var m = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
+            for (int j = 0; j < 500; j++) {
+                m.put(
+                        new JobVertexID(),
+                        Map.of(ScalingMetric.TRUE_PROCESSING_RATE, rnd.nextDouble()));
+            }
+            metricHistory.put(Instant.now(), m);
+        }
+
+        var scalingHistory = new HashMap<JobVertexID, SortedMap<Instant, ScalingSummary>>();
+        scalingHistory.put(v1, new TreeMap<>());
+        scalingHistory
+                .get(v1)
+                .put(
+                        Instant.now(),
+                        new ScalingSummary(
+                                1, 2, Map.of(ScalingMetric.LAG, EvaluatedScalingMetric.of(2.))));
+
+        var data = new HashMap<String, String>();
+        var info = new AutoScalerInfo(data);
+
+        info.addToScalingHistory(
+                Instant.now(),
+                Map.of(
+                        v1,
+                        new ScalingSummary(
+                                1, 2, Map.of(ScalingMetric.LAG, EvaluatedScalingMetric.of(2.)))),
+                new Configuration());
+
+        info.updateMetricHistory(Instant.now(), metricHistory);
+
+        assertFalse(
+                data.get(AutoScalerInfo.COLLECTED_METRICS_KEY).length()
+                                + data.get(AutoScalerInfo.SCALING_HISTORY_KEY).length()
+                        < AutoScalerInfo.MAX_CM_BYTES);
+
+        info.trimHistoryToMaxCmSize();
+        assertTrue(
+                data.get(AutoScalerInfo.COLLECTED_METRICS_KEY).length()
+                                + data.get(AutoScalerInfo.SCALING_HISTORY_KEY).length()
+                        < AutoScalerInfo.MAX_CM_BYTES);
+    }
 }