Spark: Do not redistribute MERGE INTO rows unless the table is sorted (#2139)

diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
index ef55a8d..b6322de 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
@@ -236,12 +236,19 @@
   def buildWritePlan(
      childPlan: LogicalPlan,
      table: Table): LogicalPlan = {
+    val defaultDistributionMode = table match {
+      case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
+        TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
+      case _ =>
+        TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
+    }
+
     table match {
       case iceTable: SparkTable =>
         val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
         val table = iceTable.table()
         val distributionMode: String = table.properties
-          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, defaultDistributionMode)
         val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
         DistributionMode.fromName(distributionMode) match {
           case DistributionMode.NONE =>