[SPARK-40703][SQL] Introduce shuffle on SinglePartition to improve parallelism

### What changes were proposed in this pull request?

This PR fixes a performance regression that occurs when one side of a join uses `HashPartitioning` with a `ShuffleExchange` while the other side uses `SinglePartition`. In this case, Spark re-shuffles the `HashPartitioning` side, and both sides end up with only a single partition. This can hurt query performance significantly if the `HashPartitioning` side contains a lot of input data.
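
For illustration, a minimal reproduction sketch in Scala (the table names `large_fact` and `small_v2_dim` are hypothetical; it assumes the small table is backed by a V2 source whose scan reports `SinglePartition`, and that broadcast joins are disabled so the planner falls back to a shuffle-based join):

```scala
// Hypothetical reproduction sketch: a large, shuffled side joined with a
// DataSource V2 scan that reports SinglePartition.
// Disable broadcast joins so a sort-merge join (and EnsureRequirements) kicks in.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val large = spark.table("large_fact")    // big table, hash-partitioned via ShuffleExchange
val small = spark.table("small_v2_dim")  // V2 table whose scan reports SinglePartition

// Before this fix, EnsureRequirements could re-shuffle `large` down to a single
// partition to match `small`, collapsing the join's parallelism to 1.
large.join(small, "id").show()
```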

### Why are the changes needed?

After SPARK-35703, when Spark sees that one side of the join has a `ShuffleExchange` (meaning it needs to be shuffled anyway) and the other side doesn't, it tries to avoid shuffling the side without the `ShuffleExchange`. For instance:

```
ShuffleExchange(HashPartition(200)) <-> HashPartition(150)
```

will be converted into
```
ShuffleExchange(HashPartition(150)) <-> HashPartition(150)
```

However, when the side without a `ShuffleExchange` reports `SinglePartition`, as in the following:
```
ShuffleExchange(HashPartition(150)) <-> SinglePartition
```

Spark does the same thing: it converts the left-hand side to `ShuffleExchange(SinglePartition)`, so the whole join runs with a single partition. This can hurt job parallelism dramatically, especially with DataSource V2, since `SinglePartition` is reported by the V2 scan. DataSource V1, on the other hand, is not impacted much, as it always reports `UnknownPartitioning` in this situation.
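
With this change, `SinglePartitionShuffleSpec.canCreatePartitioning` returns `false`, so the `SinglePartition` side no longer dictates the join's partitioning. Instead, the `SinglePartition` side is re-shuffled to match the hash-partitioned side; the partition count N is picked by the existing shuffle-spec logic and, when the hash side is itself behind a `ShuffleExchange`, respects `spark.sql.shuffle.partitions` as a minimum. Continuing the example above:

```
ShuffleExchange(HashPartition(N)) <-> ShuffleExchange(HashPartition(N))
```

When the hash-partitioned side has no `ShuffleExchange`, only the `SinglePartition` side is shuffled, reusing the existing side's partitioning so no extra shuffle is introduced.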

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new unit tests in `EnsureRequirementsSuite`.

Closes #38196 from sunchao/SPARK-40703.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit bde6423c947dea0c7529bd3a1f8a0be36b970ff5)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 69eeab42..209f369 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -529,7 +529,6 @@
    * clustering expressions.
    *
    * This will only be called when:
-   *  - [[canCreatePartitioning]] returns true.
    *  - [[isCompatibleWith]] returns false on the side where the `clustering` is from.
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
@@ -542,7 +541,7 @@
     other.numPartitions == 1
   }
 
-  override def canCreatePartitioning: Boolean = true
+  override def canCreatePartitioning: Boolean = false
 
   override def createPartitioning(clustering: Seq[Expression]): Partitioning =
     SinglePartition
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
index 7e11d4f..51e7688 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
@@ -367,7 +367,7 @@
       assert(HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), distribution)
         .canCreatePartitioning)
     }
-    assert(SinglePartitionShuffleSpec.canCreatePartitioning)
+    assert(!SinglePartitionShuffleSpec.canCreatePartitioning)
     withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") {
       assert(ShuffleSpecCollection(Seq(
         HashShuffleSpec(HashPartitioning(Seq($"a"), 10), distribution),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 67a58da..581fa14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -73,9 +73,13 @@
       case _ => false
     }.map(_._2)
 
+    // Special case: if all sides of the join are single partition
+    val allSinglePartition =
+      childrenIndexes.forall(children(_).outputPartitioning == SinglePartition)
+
     // If there are more than one children, we'll need to check partitioning & distribution of them
     // and see if extra shuffles are necessary.
-    if (childrenIndexes.length > 1) {
+    if (childrenIndexes.length > 1 && !allSinglePartition) {
       val specs = childrenIndexes.map(i => {
         val requiredDist = requiredChildDistributions(i)
         assert(requiredDist.isInstanceOf[ClusteredDistribution],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index c3c8959..000bd8c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -262,7 +262,7 @@
           val numExchanges = collect(plan) {
             case exchange: ShuffleExchangeExec => exchange
           }.length
-          assert(numExchanges === 3)
+          assert(numExchanges === 5)
         }
 
         {
@@ -278,7 +278,7 @@
           val numExchanges = collect(plan) {
             case exchange: ShuffleExchangeExec => exchange
           }.length
-          assert(numExchanges === 3)
+          assert(numExchanges === 5)
         }
 
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index 7237cc5..d692ba5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -433,8 +433,10 @@
         exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, plan2)
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, DummySparkPlan(_, _, SinglePartition, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(SinglePartition, _, _), _), _) =>
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+          assert(left.numPartitions == 5)
+          assert(right.numPartitions == 5)
         case other => fail(other.toString)
       }
 
@@ -690,6 +692,45 @@
     }
   }
 
+  test("SPARK-40703: shuffle for SinglePartitionShuffleSpec") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> 20.toString) {
+      // We should re-shuffle the side with single partition when the other side is
+      // `HashPartitioning` with shuffle node, and respect the minimum parallelism.
+      var plan1: SparkPlan = ShuffleExchangeExec(
+        outputPartitioning = HashPartitioning(exprA :: Nil, 10),
+        DummySparkPlan())
+      var plan2 = DummySparkPlan(outputPartitioning = SinglePartition)
+      var smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, None, plan1, plan2)
+      EnsureRequirements.apply(smjExec) match {
+        case SortMergeJoinExec(leftKeys, rightKeys, _, _,
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+          assert(leftKeys === Seq(exprA))
+          assert(rightKeys === Seq(exprC))
+          assert(left.numPartitions == 20)
+          assert(right.numPartitions == 20)
+        case other => fail(other.toString)
+      }
+
+      // We should also re-shuffle the side with only a single partition even the other side does
+      // not have `ShuffleExchange`, but just `HashPartitioning`. However in this case the minimum
+      // shuffle parallelism will be ignored since we don't want to introduce extra shuffle.
+      plan1 = DummySparkPlan(
+        outputPartitioning = HashPartitioning(exprA :: Nil, 10))
+      plan2 = DummySparkPlan(outputPartitioning = SinglePartition)
+      smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, None, plan1, plan2)
+      EnsureRequirements.apply(smjExec) match {
+        case SortMergeJoinExec(leftKeys, rightKeys, _, _,
+        SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+          assert(leftKeys === Seq(exprA))
+          assert(rightKeys === Seq(exprC))
+          assert(right.numPartitions == 10)
+        case other => fail(other.toString)
+      }
+    }
+  }
+
   test("Check with KeyGroupedPartitioning") {
     // simplest case: identity transforms
     var plan1 = DummySparkPlan(