[FLINK-31345] Reduce AutoScalerInfo size by rounding metrics and compressing stored data
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index bff2228..e6daa1d 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -35,16 +35,24 @@
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.SneakyThrows;
+import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.time.Instant;
+import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
/** Class for encapsulating information stored for each resource when using the autoscaler. */
public class AutoScalerInfo {
@@ -53,11 +61,11 @@
private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
- private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
- private static final String SCALING_HISTORY_KEY = "scalingHistory";
- private static final String JOB_UPDATE_TS_KEY = "jobUpdateTs";
+ protected static final String COLLECTED_METRICS_KEY = "collectedMetrics";
+ protected static final String SCALING_HISTORY_KEY = "scalingHistory";
+ protected static final String JOB_UPDATE_TS_KEY = "jobUpdateTs";
- private static final ObjectMapper YAML_MAPPER =
+ protected static final ObjectMapper YAML_MAPPER =
new ObjectMapper(new YAMLFactory())
.registerModule(new JavaTimeModule())
.registerModule(new JobVertexSerDeModule());
@@ -82,14 +90,16 @@
return new TreeMap<>();
}
- return YAML_MAPPER.readValue(historyYaml, new TypeReference<>() {});
+ return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {});
}
@SneakyThrows
public void updateMetricHistory(
Instant jobUpdateTs,
SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> history) {
- configMap.getData().put(COLLECTED_METRICS_KEY, YAML_MAPPER.writeValueAsString(history));
+ configMap
+ .getData()
+ .put(COLLECTED_METRICS_KEY, compress(YAML_MAPPER.writeValueAsString(history)));
configMap.getData().put(JOB_UPDATE_TS_KEY, jobUpdateTs.toString());
}
@@ -99,9 +109,7 @@
getScalingHistory();
if (scalingHistory.keySet().removeIf(v -> !vertexList.contains(v))) {
- configMap
- .getData()
- .put(SCALING_HISTORY_KEY, YAML_MAPPER.writeValueAsString(scalingHistory));
+ storeScalingHistory();
}
}
@@ -119,7 +127,7 @@
if (scalingHistory != null) {
return scalingHistory;
}
- var yaml = configMap.getData().get(SCALING_HISTORY_KEY);
+ var yaml = decompress(configMap.getData().get(SCALING_HISTORY_KEY));
scalingHistory =
yaml == null
? new HashMap<>()
@@ -158,9 +166,13 @@
}
}
+ storeScalingHistory();
+ }
+
+ private void storeScalingHistory() throws Exception {
configMap
.getData()
- .put(SCALING_HISTORY_KEY, YAML_MAPPER.writeValueAsString(scalingHistory));
+ .put(SCALING_HISTORY_KEY, compress(YAML_MAPPER.writeValueAsString(scalingHistory)));
}
public void replaceInKubernetes(KubernetesClient client) {
@@ -205,4 +217,30 @@
.withName(objectMeta.getName())
.get());
}
+
+ private static String compress(String original) throws IOException {
+ ByteArrayOutputStream rstBao = new ByteArrayOutputStream();
+ try (var zos = new GZIPOutputStream(rstBao)) {
+ zos.write(original.getBytes(StandardCharsets.UTF_8));
+ }
+
+ return Base64.getEncoder().encodeToString(rstBao.toByteArray());
+ }
+
+ private static String decompress(String compressed) {
+ if (compressed == null) {
+ return null;
+ }
+
+ try {
+ byte[] bytes = Base64.getDecoder().decode(compressed);
+ try (var zi = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
+ return IOUtils.toString(zi, StandardCharsets.UTF_8);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error while decompressing scaling data, treating as uncompressed", e);
+ // Fall back to non-compressed for migration
+ return compressed;
+ }
+ }
}
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 6686982..c926538 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -263,6 +263,10 @@
lagGrowthRate,
conf);
+ vertexScalingMetrics.replaceAll(
+ (metric, value) ->
+ ScalingMetrics.roundMetric(value));
+
LOG.debug(
"Vertex scaling metrics for {}: {}", jobVertexID, vertexScalingMetrics);
});
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index 6b61416..7110fd5 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -70,7 +70,7 @@
for (var vertex : topology.getVerticesInTopologicalOrder()) {
scalingOutput.put(
vertex,
- computeVertexScalingSummary(
+ evaluateMetrics(
conf,
scalingOutput,
metricsHistory,
@@ -110,7 +110,7 @@
}
@NotNull
- private Map<ScalingMetric, EvaluatedScalingMetric> computeVertexScalingSummary(
+ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
Configuration conf,
HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput,
SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
index a230412..2378888 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
@@ -17,19 +17,22 @@
package org.apache.flink.kubernetes.operator.autoscaler.metrics;
-import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/** Evaluated scaling metric. */
@Data
-@AllArgsConstructor
@NoArgsConstructor
public class EvaluatedScalingMetric {
private double current;
private double average;
+ public EvaluatedScalingMetric(double current, double average) {
+ this.current = ScalingMetrics.roundMetric(current);
+ this.average = ScalingMetrics.roundMetric(average);
+ }
+
public static EvaluatedScalingMetric of(double value) {
return new EvaluatedScalingMetric(value, Double.NaN);
}
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 03fb3ea..05a947d 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -23,6 +23,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.commons.math3.util.Precision;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -194,4 +195,10 @@
}
return number;
}
+
+ public static double roundMetric(double value) {
+ double rounded = Precision.round(value, 3);
+ // Never round down to 0, return original value instead
+ return rounded == 0 ? value : rounded;
+ }
}
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index e4ba23e..f682300 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -19,15 +19,21 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -97,4 +103,45 @@
Set.of(now.plus(Duration.ofSeconds(15))),
new AutoScalerInfo(data).getScalingHistory().get(v1).keySet());
}
+
+ @Test
+ public void testCompressionMigration() throws JsonProcessingException {
+ var jobUpdateTs = Instant.now();
+ var v1 = new JobVertexID();
+
+ var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>();
+ metricHistory.put(jobUpdateTs, Map.of(v1, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, 1.)));
+
+ var scalingHistory = new HashMap<JobVertexID, SortedMap<Instant, ScalingSummary>>();
+ scalingHistory.put(v1, new TreeMap<>());
+ scalingHistory
+ .get(v1)
+ .put(
+ jobUpdateTs,
+ new ScalingSummary(
+ 1, 2, Map.of(ScalingMetric.LAG, EvaluatedScalingMetric.of(2.))));
+
+ // Store uncompressed data in map to simulate migration
+ var data = new HashMap<String, String>();
+ data.put(
+ AutoScalerInfo.COLLECTED_METRICS_KEY,
+ AutoScalerInfo.YAML_MAPPER.writeValueAsString(metricHistory));
+ data.put(AutoScalerInfo.JOB_UPDATE_TS_KEY, jobUpdateTs.toString());
+ data.put(
+ AutoScalerInfo.SCALING_HISTORY_KEY,
+ AutoScalerInfo.YAML_MAPPER.writeValueAsString(scalingHistory));
+
+ var info = new AutoScalerInfo(data);
+ assertEquals(scalingHistory, info.getScalingHistory());
+ assertEquals(metricHistory, info.getMetricHistory());
+
+ // Override with compressed data
+ var newTs = Instant.now();
+ info.updateMetricHistory(newTs, metricHistory);
+ info.addToScalingHistory(newTs, Map.of(), new Configuration());
+
+ // Make sure we can still access everything
+ assertEquals(scalingHistory, info.getScalingHistory());
+ assertEquals(metricHistory, info.getMetricHistory());
+ }
}
diff --git a/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties b/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..3b7cec7
--- /dev/null
+++ b/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = DEBUG
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Log all infos to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable}