[SPARK-40703][SQL] Introduce shuffle on SinglePartition to improve parallelism
### What changes were proposed in this pull request?
This PR fixes a performance regression issue when one side of a join uses `HashPartitioning` with `ShuffleExchange` while the other side uses `SinglePartition`. In this case, Spark will re-shuffle the side with `HashPartitioning` and both sides will end up with only a single partition. This could hurt query performance a lot if the side with `HashPartitioning` contains a lot of input data.
### Why are the changes needed?
After SPARK-35703, when Spark sees that one side of the join has `ShuffleExchange` (meaning it needs to be shuffled anyway), and the other side doesn't, it'll try to avoid shuffling the side without `ShuffleExchange`. For instance:
```
ShuffleExchange(HashPartition(200)) <-> HashPartition(150)
```
will be converted into
```
ShuffleExchange(HashPartition(150)) <-> HashPartition(150)
```
However, when the side without `ShuffleExchange` is `SinglePartition`, like the following:
```
ShuffleExchange(HashPartition(150)) <-> SinglePartition
```
Spark will do the same, which causes the left-hand side to use only one partition. This can hurt job parallelism dramatically, especially when using DataSource V2, since `SinglePartition` is used by the V2 scan. On the other hand, it seems DataSource V1 won't be impacted much, as it always reports `UnknownPartitioning` in this situation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added new unit tests in `EnsureRequirementsSuite`.
Closes #38196 from sunchao/SPARK-40703.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit bde6423c947dea0c7529bd3a1f8a0be36b970ff5)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 69eeab42..209f369 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -529,7 +529,6 @@
* clustering expressions.
*
* This will only be called when:
- * - [[canCreatePartitioning]] returns true.
* - [[isCompatibleWith]] returns false on the side where the `clustering` is from.
*/
def createPartitioning(clustering: Seq[Expression]): Partitioning =
@@ -542,7 +541,7 @@
other.numPartitions == 1
}
- override def canCreatePartitioning: Boolean = true
+ override def canCreatePartitioning: Boolean = false
override def createPartitioning(clustering: Seq[Expression]): Partitioning =
SinglePartition
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
index 7e11d4f..51e7688 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
@@ -367,7 +367,7 @@
assert(HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), distribution)
.canCreatePartitioning)
}
- assert(SinglePartitionShuffleSpec.canCreatePartitioning)
+ assert(!SinglePartitionShuffleSpec.canCreatePartitioning)
withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") {
assert(ShuffleSpecCollection(Seq(
HashShuffleSpec(HashPartitioning(Seq($"a"), 10), distribution),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 67a58da..581fa14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -73,9 +73,13 @@
case _ => false
}.map(_._2)
+ // Special case: if all sides of the join are single partition
+ val allSinglePartition =
+ childrenIndexes.forall(children(_).outputPartitioning == SinglePartition)
+
// If there are more than one children, we'll need to check partitioning & distribution of them
// and see if extra shuffles are necessary.
- if (childrenIndexes.length > 1) {
+ if (childrenIndexes.length > 1 && !allSinglePartition) {
val specs = childrenIndexes.map(i => {
val requiredDist = requiredChildDistributions(i)
assert(requiredDist.isInstanceOf[ClusteredDistribution],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index c3c8959..000bd8c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -262,7 +262,7 @@
val numExchanges = collect(plan) {
case exchange: ShuffleExchangeExec => exchange
}.length
- assert(numExchanges === 3)
+ assert(numExchanges === 5)
}
{
@@ -278,7 +278,7 @@
val numExchanges = collect(plan) {
case exchange: ShuffleExchangeExec => exchange
}.length
- assert(numExchanges === 3)
+ assert(numExchanges === 5)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index 7237cc5..d692ba5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -433,8 +433,10 @@
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, DummySparkPlan(_, _, SinglePartition, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(SinglePartition, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ assert(left.numPartitions == 5)
+ assert(right.numPartitions == 5)
case other => fail(other.toString)
}
@@ -690,6 +692,45 @@
}
}
+ test("SPARK-40703: shuffle for SinglePartitionShuffleSpec") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> 20.toString) {
+ // We should re-shuffle the side with single partition when the other side is
+ // `HashPartitioning` with shuffle node, and respect the minimum parallelism.
+ var plan1: SparkPlan = ShuffleExchangeExec(
+ outputPartitioning = HashPartitioning(exprA :: Nil, 10),
+ DummySparkPlan())
+ var plan2 = DummySparkPlan(outputPartitioning = SinglePartition)
+ var smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, None, plan1, plan2)
+ EnsureRequirements.apply(smjExec) match {
+ case SortMergeJoinExec(leftKeys, rightKeys, _, _,
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ assert(leftKeys === Seq(exprA))
+ assert(rightKeys === Seq(exprC))
+ assert(left.numPartitions == 20)
+ assert(right.numPartitions == 20)
+ case other => fail(other.toString)
+ }
+
+ // We should also re-shuffle the side with only a single partition even the other side does
+ // not have `ShuffleExchange`, but just `HashPartitioning`. However in this case the minimum
+ // shuffle parallelism will be ignored since we don't want to introduce extra shuffle.
+ plan1 = DummySparkPlan(
+ outputPartitioning = HashPartitioning(exprA :: Nil, 10))
+ plan2 = DummySparkPlan(outputPartitioning = SinglePartition)
+ smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, None, plan1, plan2)
+ EnsureRequirements.apply(smjExec) match {
+ case SortMergeJoinExec(leftKeys, rightKeys, _, _,
+ SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ assert(leftKeys === Seq(exprA))
+ assert(rightKeys === Seq(exprC))
+ assert(right.numPartitions == 10)
+ case other => fail(other.toString)
+ }
+ }
+ }
+
test("Check with KeyGroupedPartitioning") {
// simplest case: identity transforms
var plan1 = DummySparkPlan(