[HUDI-6293] Make HoodieClusteringJob's parallelism of clustering_task more reasonable (#8866)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 633f06b..223f85d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -310,9 +310,12 @@
 
       HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
 
+      int inputGroupSize = clusteringPlan.getInputGroups().size();
+
       // get clusteringParallelism.
       int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
-          ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+          ? inputGroupSize
+          : Math.min(conf.getInteger(FlinkOptions.CLUSTERING_TASKS), inputGroupSize);
 
       // Mark instant as clustering inflight
       table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());