Spark: Do not redistribute MERGE INTO rows unless the table is sorted (#2139)
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
index ef55a8d..b6322de 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
@@ -236,12 +236,19 @@
def buildWritePlan(
childPlan: LogicalPlan,
table: Table): LogicalPlan = {
+ val defaultDistributionMode = table match {
+ case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
+ TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
+ case _ =>
+ TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
+ }
+
table match {
case iceTable: SparkTable =>
val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
val table = iceTable.table()
val distributionMode: String = table.properties
- .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+ .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, defaultDistributionMode)
val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
DistributionMode.fromName(distributionMode) match {
case DistributionMode.NONE =>