[FLINK-32100] Use the total number of Kafka partitions as the max source parallelism (#597)
So far, we've taken the largest partition number we can find and used it (plus one) as the max parallelism. However, the correct way to calculate the max source parallelism is to sum the number of partitions across all topics.
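A minimal standalone sketch (not part of the patch) of the counting approach, using hypothetical metric names for a source that reads two topics with two partitions each; the class name and metric names are illustrative only, while the regex mirrors the one introduced in ScalingMetricCollector:

    import java.util.List;
    import java.util.regex.Pattern;

    /** Sketch: total partitions = number of per-partition currentOffset metrics. */
    public class PartitionCountSketch {
        public static void main(String[] args) {
            var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
            // Hypothetical metric names: one source reading topicA and topicB, 2 partitions each.
            var metricNames = List.of(
                    "1.Source.KafkaSourceReader.topic.topicA.partition.0.currentOffset",
                    "1.Source.KafkaSourceReader.topic.topicA.partition.1.currentOffset",
                    "1.Source.KafkaSourceReader.topic.topicB.partition.0.currentOffset",
                    "1.Source.KafkaSourceReader.topic.topicB.partition.1.currentOffset",
                    "1.Source.KafkaSourceReader.topic.topicA.partition.0.someOtherMetric");

            // Old approach: max partition index + 1 -> 2, which undercounts with multiple topics.
            // New approach: count the matching metrics -> 4, the total over all topics.
            long numPartitions = metricNames.stream()
                    .filter(partitionRegex.asMatchPredicate())
                    .count();
            System.out.println("max source parallelism = " + numPartitions); // prints 4
        }
    }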
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index b20c53e..cb094e7 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -58,6 +58,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
/** Metric collector using flink rest api. */
@@ -192,28 +193,22 @@
private void updateKafkaSourceMaxParallelisms(
RestClusterClient<String> restClient, JobID jobId, JobTopology topology)
throws Exception {
+ var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
for (Map.Entry<JobVertexID, Set<JobVertexID>> entry : topology.getInputs().entrySet()) {
if (entry.getValue().isEmpty()) {
var sourceVertex = entry.getKey();
- queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
- .map(AggregatedMetric::getId)
- .filter(s -> s.endsWith(".currentOffset"))
- .mapToInt(
- s -> {
- // We extract the partition from the pattern:
- // ...topic.[topic].partition.3.currentOffset
- var split = s.split("\\.");
- return Integer.parseInt(split[split.length - 2]);
- })
- .max()
- .ifPresent(
- p -> {
- LOG.debug(
- "Updating source {} max parallelism based on available partitions to {}",
- sourceVertex,
- p + 1);
- topology.updateMaxParallelism(sourceVertex, p + 1);
- });
+ var numPartitions =
+ queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
+ .map(AggregatedMetric::getId)
+ .filter(partitionRegex.asMatchPredicate())
+ .count();
+ if (numPartitions > 0) {
+ LOG.debug(
+ "Updating source {} max parallelism based on available partitions to {}",
+ sourceVertex,
+ numPartitions);
+ topology.updateMaxParallelism(sourceVertex, (int) numPartitions);
+ }
}
}
}
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 2734151..7bef560 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -271,6 +271,10 @@
source1,
List.of(
new AggregatedMetric(
+ "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.anotherMetric"),
+ new AggregatedMetric(
+ "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.anotherTopic.partition.0.currentOffset"),
+ new AggregatedMetric(
"1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.currentOffset"),
new AggregatedMetric(
"1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.1.currentOffset"),
@@ -280,7 +284,7 @@
"1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset"))));
collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, service, conf);
- assertEquals(4, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
+ assertEquals(5, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source2));
}