[HUDI-6293] Cap HoodieClusteringJob's clustering task parallelism at the number of input groups (#8866)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 633f06b..223f85d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -310,9 +310,12 @@
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
+ int inputGroupSize = clusteringPlan.getInputGroups().size();
+
// get clusteringParallelism.
int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
- ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+ ? inputGroupSize
+ : Math.min(conf.getInteger(FlinkOptions.CLUSTERING_TASKS), inputGroupSize);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());