[FLINK-32100] Use the total number of Kafka partitions as the max source parallelism (#597)

So far, we've taken the maximum partition number we can find. However, the correct way to calculate
the max source parallelism is to sum the number of partitions across all topics.
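
For illustration, a minimal standalone sketch (using hypothetical metric names, not the operator
code itself) of how counting the per-partition currentOffset metrics with the new regex yields the
total partition count across all topics:

    import java.util.List;
    import java.util.regex.Pattern;

    public class PartitionCountSketch {
        public static void main(String[] args) {
            // Same pattern as the patch: match only per-partition currentOffset metrics.
            Pattern partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
            // Hypothetical metric names: topicA has 2 partitions, topicB has 1,
            // plus one unrelated metric that must be ignored.
            List<String> metricNames = List.of(
                    "1.Source__Kafka_Source.KafkaSourceReader.topic.topicA.partition.0.currentOffset",
                    "1.Source__Kafka_Source.KafkaSourceReader.topic.topicA.partition.1.currentOffset",
                    "1.Source__Kafka_Source.KafkaSourceReader.topic.topicB.partition.0.currentOffset",
                    "1.Source__Kafka_Source.KafkaSourceReader.topic.topicA.partition.0.anotherMetric");
            long numPartitions = metricNames.stream()
                    .filter(partitionRegex.asMatchPredicate())
                    .count();
            // Prints 3: the sum of partitions over both topics, not the max partition index + 1.
            System.out.println(numPartitions);
        }
    }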
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index b20c53e..cb094e7 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -58,6 +58,7 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /** Metric collector using flink rest api. */
@@ -192,28 +193,22 @@
     private void updateKafkaSourceMaxParallelisms(
             RestClusterClient<String> restClient, JobID jobId, JobTopology topology)
             throws Exception {
+        var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
         for (Map.Entry<JobVertexID, Set<JobVertexID>> entry : topology.getInputs().entrySet()) {
             if (entry.getValue().isEmpty()) {
                 var sourceVertex = entry.getKey();
-                queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
-                        .map(AggregatedMetric::getId)
-                        .filter(s -> s.endsWith(".currentOffset"))
-                        .mapToInt(
-                                s -> {
-                                    // We extract the partition from the pattern:
-                                    // ...topic.[topic].partition.3.currentOffset
-                                    var split = s.split("\\.");
-                                    return Integer.parseInt(split[split.length - 2]);
-                                })
-                        .max()
-                        .ifPresent(
-                                p -> {
-                                    LOG.debug(
-                                            "Updating source {} max parallelism based on available partitions to {}",
-                                            sourceVertex,
-                                            p + 1);
-                                    topology.updateMaxParallelism(sourceVertex, p + 1);
-                                });
+                var numPartitions =
+                        queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
+                                .map(AggregatedMetric::getId)
+                                .filter(partitionRegex.asMatchPredicate())
+                                .count();
+                if (numPartitions > 0) {
+                    LOG.debug(
+                            "Updating source {} max parallelism based on available partitions to {}",
+                            sourceVertex,
+                            numPartitions);
+                    topology.updateMaxParallelism(sourceVertex, (int) numPartitions);
+                }
             }
         }
     }
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 2734151..7bef560 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -271,6 +271,10 @@
                         source1,
                         List.of(
                                 new AggregatedMetric(
+                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.anotherMetric"),
+                                new AggregatedMetric(
+                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.anotherTopic.partition.0.currentOffset"),
+                                new AggregatedMetric(
                                         "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.currentOffset"),
                                 new AggregatedMetric(
                                         "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.1.currentOffset"),
@@ -280,7 +284,7 @@
                                         "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset"))));
 
         collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, service, conf);
-        assertEquals(4, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
+        assertEquals(5, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
         assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source2));
     }