[FLINK-38381] Enumerate Kafka partitions across Kafka clusters (#1030)

When using the DynamicKafkaSource, topics can be spread across multiple
clusters. This used to work fine, but a regression was introduced that treats
partitions from different clusters as identical, which limits the scale-out of
the source operator.

Here is an example:

```
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset"
```

These would be treated as one partition, even though they are two distinct partitions from separate Kafka clusters.
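
For illustration only (not part of the patch), here is a minimal sketch of how the updated Kafka branch of the regex derives a per-cluster partition key from the two metric names above; the class name and the standalone `main` are purely hypothetical and just demonstrate the grouping behavior:

```
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionKeyExample {

    // Kafka branch of the pattern used in ScalingMetricCollector after the fix
    private static final Pattern PARTITION_REGEX =
            Pattern.compile(
                    "^.*?(\\.kafkaCluster\\.(?<kafkaCluster>.+))?\\.KafkaSourceReader"
                            + "\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$");

    public static void main(String[] args) {
        String[] metricNames = {
            "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
            "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset"
        };
        for (String name : metricNames) {
            Matcher matcher = PARTITION_REGEX.matcher(name);
            if (matcher.matches()) {
                // The cluster name is now part of the key, so the two metrics
                // above no longer collapse into a single partition.
                String key =
                        matcher.group("kafkaCluster")
                                + "-"
                                + matcher.group("kafkaTopic")
                                + "-"
                                + matcher.group("kafkaId");
                System.out.println(key);
                // prints: my-cluster-1-testTopic-0
                //         my-cluster-2-testTopic-0
            }
        }
    }
}
```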
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index ee3ded9..310e063 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -269,7 +269,7 @@
         try (var restClient = ctx.getRestClusterClient()) {
             Pattern partitionRegex =
                     Pattern.compile(
-                            "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+                            "^.*?(\\.kafkaCluster\\.(?<kafkaCluster>.+))?\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                                     + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
             for (var vertexInfo : topology.getVertexInfos().values()) {
                 if (vertexInfo.getInputs().isEmpty()) {
@@ -281,12 +281,18 @@
                                                 Matcher matcher = partitionRegex.matcher(v);
                                                 if (matcher.matches()) {
                                                     String kafkaTopic = matcher.group("kafkaTopic");
+                                                    String kafkaCluster =
+                                                            matcher.group("kafkaCluster");
                                                     String kafkaId = matcher.group("kafkaId");
                                                     String pulsarTopic =
                                                             matcher.group("pulsarTopic");
                                                     String pulsarId = matcher.group("pulsarId");
                                                     return kafkaTopic != null
-                                                            ? kafkaTopic + "-" + kafkaId
+                                                            ? kafkaCluster
+                                                                    + "-"
+                                                                    + kafkaTopic
+                                                                    + "-"
+                                                                    + kafkaId
                                                             : pulsarTopic + "-" + pulsarId;
                                                 }
                                                 return null;
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index 1176075..5bca303 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -260,6 +260,20 @@
 
         metricsCollector.setMetricNames(
                 Map.of(
+                        source1,
+                        List.of(
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.anotherTopic.partition.0.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.1.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.2.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.3.currentOffset")));
+
+        collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
+        assertEquals(6, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions());
+
+        metricsCollector.setMetricNames(
+                Map.of(
                         source2,
                         List.of(
                                 "0.Source__pulsar_source[1].PulsarConsumer"