[FLINK-31345] Trim metric history to fit CM size limit
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index e6daa1d..8850b95 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -38,6 +38,7 @@
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.LoaderOptions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -65,8 +66,10 @@
protected static final String SCALING_HISTORY_KEY = "scalingHistory";
protected static final String JOB_UPDATE_TS_KEY = "jobUpdateTs";
+ protected static final int MAX_CM_BYTES = 1000000;
+
protected static final ObjectMapper YAML_MAPPER =
- new ObjectMapper(new YAMLFactory())
+ new ObjectMapper(yamlFactory())
.registerModule(new JavaTimeModule())
.registerModule(new JobVertexSerDeModule());
@@ -97,6 +100,7 @@
public void updateMetricHistory(
Instant jobUpdateTs,
SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> history) {
+
configMap
.getData()
.put(COLLECTED_METRICS_KEY, compress(YAML_MAPPER.writeValueAsString(history)));
@@ -175,10 +179,35 @@
.put(SCALING_HISTORY_KEY, compress(YAML_MAPPER.writeValueAsString(scalingHistory)));
}
- public void replaceInKubernetes(KubernetesClient client) {
+ public void replaceInKubernetes(KubernetesClient client) throws Exception {
+ trimHistoryToMaxCmSize(); // shrink the stored history towards MAX_CM_BYTES before replacing
client.resource(configMap).replace();
}
+    /**
+     * Drops the oldest collected-metric entries until the stored scaling history and metric
+     * history together no longer exceed {@link #MAX_CM_BYTES}.
+     *
+     * <p>Only the metric history is trimmed; the scaling history entry is never modified here.
+     * Sizes are measured as the character length of the stored (compressed) strings. Trimming is
+     * best-effort: if the metric history runs out while the data is still too large, a warning is
+     * logged and the method returns without further changes.
+     *
+     * @throws Exception propagated from reading, serializing or compressing the metric history
+     */
+    @VisibleForTesting
+    protected void trimHistoryToMaxCmSize() throws Exception {
+        var data = configMap.getData();
+
+        int scalingHistorySize = data.getOrDefault(SCALING_HISTORY_KEY, "").length();
+        int metricHistorySize = data.getOrDefault(COLLECTED_METRICS_KEY, "").length();
+
+        // Decoded lazily so that the common case (already within the limit) skips deserialization.
+        SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricHistory = null;
+        while (scalingHistorySize + metricHistorySize > MAX_CM_BYTES) {
+            if (metricHistory == null) {
+                metricHistory = getMetricHistory();
+            }
+            if (metricHistory.isEmpty()) {
+                // Nothing left to drop: surface the problem instead of returning silently.
+                LOG.warn(
+                        "Metric history is empty but stored autoscaler data still exceeds {} characters, cannot trim further",
+                        MAX_CM_BYTES);
+                return;
+            }
+            // The map is sorted by timestamp, so the first key is the oldest entry.
+            var firstKey = metricHistory.firstKey();
+            LOG.info("Trimming metric history by removing {}", firstKey);
+            metricHistory.remove(firstKey);
+            String compressed = compress(YAML_MAPPER.writeValueAsString(metricHistory));
+            data.put(COLLECTED_METRICS_KEY, compressed);
+            metricHistorySize = compressed.length();
+        }
+    }
+
public static AutoScalerInfo forResource(
AbstractFlinkResource<?, ?> cr, KubernetesClient kubeClient) {
@@ -243,4 +272,11 @@
return compressed;
}
}
+
+ private static YAMLFactory yamlFactory() {
+ // Raise the YAML code point limit to 20 * 1024 * 1024 (~20M characters)
+ var loaderOptions = new LoaderOptions();
+ loaderOptions.setCodePointLimit(20 * 1024 * 1024);
+ return YAMLFactory.builder().loaderOptions(loaderOptions).build();
+ }
}
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index f682300..cb26202 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -31,11 +31,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for AutoScalerInfo. */
public class AutoScalerInfoTest {
@@ -144,4 +147,54 @@
assertEquals(scalingHistory, info.getScalingHistory());
assertEquals(metricHistory, info.getMetricHistory());
}
+
+    /**
+     * Verifies that an oversized metric history is trimmed below {@code MAX_CM_BYTES} and that the
+     * newest entry is still present after trimming.
+     */
+    @Test
+    public void testMetricsTrimming() throws Exception {
+        var v1 = new JobVertexID();
+        Random rnd = new Random();
+
+        // Use strictly increasing timestamps: keying by Instant.now() inside a tight loop can
+        // yield duplicate keys on coarse clocks and silently shrink the generated history.
+        var baseTs = Instant.now();
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>();
+        for (int i = 0; i < 50; i++) {
+            var m = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
+            for (int j = 0; j < 500; j++) {
+                m.put(
+                        new JobVertexID(),
+                        Map.of(ScalingMetric.TRUE_PROCESSING_RATE, rnd.nextDouble()));
+            }
+            metricHistory.put(baseTs.plusSeconds(i), m);
+        }
+        var newestTs = metricHistory.lastKey();
+
+        var data = new HashMap<String, String>();
+        var info = new AutoScalerInfo(data);
+
+        info.addToScalingHistory(
+                Instant.now(),
+                Map.of(
+                        v1,
+                        new ScalingSummary(
+                                1, 2, Map.of(ScalingMetric.LAG, EvaluatedScalingMetric.of(2.)))),
+                new Configuration());
+
+        info.updateMetricHistory(Instant.now(), metricHistory);
+
+        // Precondition: the generated data must be too large before trimming.
+        assertFalse(
+                data.get(AutoScalerInfo.COLLECTED_METRICS_KEY).length()
+                                + data.get(AutoScalerInfo.SCALING_HISTORY_KEY).length()
+                        < AutoScalerInfo.MAX_CM_BYTES);
+
+        info.trimHistoryToMaxCmSize();
+        assertTrue(
+                data.get(AutoScalerInfo.COLLECTED_METRICS_KEY).length()
+                                + data.get(AutoScalerInfo.SCALING_HISTORY_KEY).length()
+                        < AutoScalerInfo.MAX_CM_BYTES);
+
+        // Trimming removes entries from the oldest end, so the newest one must still be there.
+        var trimmed = info.getMetricHistory();
+        assertFalse(trimmed.isEmpty());
+        assertEquals(newestTs, trimmed.lastKey());
+    }
}