[FLINK-38381] Enumerate Kafka partitions across Kafka clusters (#1030)
When using the DynamicKafkaSource, topics can be spread across multiple
clusters. This used to work fine, but a regression was introduced that
treats partitions with the same topic name and partition id on different
clusters as one and the same partition. This limits the scale-out of the
source operator.
Here is an example:
```
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset"
```
Those would be treated as a single partition, even though they are two distinct partitions from separate Kafka clusters.
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index ee3ded9..310e063 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -269,7 +269,7 @@
try (var restClient = ctx.getRestClusterClient()) {
Pattern partitionRegex =
Pattern.compile(
- "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+ "^.*?(\\.kafkaCluster\\.(?<kafkaCluster>.+))?\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+ "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
for (var vertexInfo : topology.getVertexInfos().values()) {
if (vertexInfo.getInputs().isEmpty()) {
@@ -281,12 +281,18 @@
Matcher matcher = partitionRegex.matcher(v);
if (matcher.matches()) {
String kafkaTopic = matcher.group("kafkaTopic");
+ String kafkaCluster =
+ matcher.group("kafkaCluster");
String kafkaId = matcher.group("kafkaId");
String pulsarTopic =
matcher.group("pulsarTopic");
String pulsarId = matcher.group("pulsarId");
return kafkaTopic != null
- ? kafkaTopic + "-" + kafkaId
+ ? kafkaCluster
+ + "-"
+ + kafkaTopic
+ + "-"
+ + kafkaId
: pulsarTopic + "-" + pulsarId;
}
return null;
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index 1176075..5bca303 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -260,6 +260,20 @@
metricsCollector.setMetricNames(
Map.of(
+ source1,
+ List.of(
+ "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
+ "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.anotherTopic.partition.0.currentOffset",
+ "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
+ "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.1.currentOffset",
+ "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.2.currentOffset",
+ "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.3.currentOffset")));
+
+ collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
+ assertEquals(6, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions());
+
+ metricsCollector.setMetricNames(
+ Map.of(
source2,
List.of(
"0.Source__pulsar_source[1].PulsarConsumer"