[MINOR] Added a safety-net check to catch any failure to deduce parallelism from the incoming `Dataset` appropriately (#7873)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index a6488b0..e239db1 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -203,6 +203,17 @@
.values
}
+ override protected def deduceShuffleParallelism(input: DataFrame, configuredParallelism: Int): Int = {
+ val deduceParallelism = super.deduceShuffleParallelism(input, configuredParallelism)
+ // NOTE: In case parallelism deduction failed to accurately deduce parallelism level of the
+ // incoming dataset, we fall back to the default parallelism level set for this Spark session
+ if (deduceParallelism > 0) {
+ deduceParallelism
+ } else {
+ input.sparkSession.sparkContext.defaultParallelism
+ }
+ }
+
private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
val partitionPathFields = getPartitionPathFields(config).toSet
val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))