blob: 1b330edeabe0249c804b78d4511d51cbca92c960 [file] [view]
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# DataFusion for Ray Design Documentation
DataFusion for Ray is a distributed SQL query engine that is powered by DataFusion and Ray.
DataFusion provides a high-performance query engine that is already partition-aware, with partitions being executed
in parallel in separate threads. DataFusion for Ray provides a distributed query planner that translates a DataFusion physical
plan into a distributed plan.
Note that this document is dated from an early implementation of DataFusion for Ray. The details around shuffle differ in the current ArrowFlight Streaming based implementation.
However the general discussion around how to break a physical plan into discrete stages remains useful, and we retain this document here.
Let's walk through an example to see how that works. We'll use [SQLBench-H](https://github.com/sql-benchmarks/sqlbench-h)
query 3 for the example. This is an aggregate query with a three-way join.
_SQLBench-H Query 3_
```sql
-- SQLBench-H query 3 derived from TPC-H query 3 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
select
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_mktsegment = 'HOUSEHOLD'
and c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < date '1995-03-21'
and l_shipdate > date '1995-03-21'
group by
l_orderkey,
o_orderdate,
o_shippriority
order by
revenue desc,
o_orderdate limit 10;
```
## DataFusion's Logical Plan
DataFusion produces the following optimized _logical_ query plan. Note that this plan does not have the
concept of partitions yet.
```text
Limit: skip=0, fetch=10
Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST, fetch=10
Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, orders.o_orderdate, orders.o_shippriority
Aggregate: groupBy=[[lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(35, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(35, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Inner Join: orders.o_orderkey = lineitem.l_orderkey
Inner Join: customer.c_custkey = orders.o_custkey
Filter: customer.c_mktsegment = Utf8("BUILDING")
TableScan: customer projection=[c_custkey, c_mktsegment], partial_filters=[customer.c_mktsegment = Utf8("BUILDING")]
Filter: orders.o_orderdate < Date32("9204")
TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority], partial_filters=[orders.o_orderdate < Date32("9204")]
Filter: lineitem.l_shipdate > Date32("9204")
TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate], partial_filters=[lineitem.l_shipdate > Date32("9204")]
```
## DataFusion's Physical Plan
DataFusion's physical plan lists all the files to be queried, and they are organized into partitions to allow for
parallel execution within a single process. In this example, the level of concurrency was configured to be four, so
we see `partitions={4 groups: [[ ... ]]` in the leaf `ParquetExec` nodes, with the filenames listed in four groups.
Here is the full physical plan for query 3.
```text
GlobalLimitExec: skip=0, fetch=10
SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
SortExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 4), input_partitions=4
AggregateExec: mode=Partial, gby=[l_orderkey@6 as l_orderkey, o_orderdate@4 as o_orderdate, o_shippriority@5 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 2 }], 4), input_partitions=4
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 4), input_partitions=4
CoalesceBatchesExec: target_batch_size=8192
FilterExec: c_mktsegment@1 = BUILDING
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-3.parquet]]}, predicate=c_mktsegment = Utf8("BUILDING"), pruning_predicate=c_mktsegment_min@0 <= BUILDING AND BUILDING <= c_mktsegment_max@1, projection=[c_custkey, c_mktsegment]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 4), input_partitions=4
CoalesceBatchesExec: target_batch_size=8192
FilterExec: o_orderdate@2 < 9204
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-3.parquet]]}, predicate=o_orderdate < Date32("9204"), pruning_predicate=o_orderdate_min@0 < 9204, projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 4), input_partitions=4
CoalesceBatchesExec: target_batch_size=8192
FilterExec: l_shipdate@3 > 9204
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-3.parquet]]}, predicate=l_shipdate > Date32("9204"), pruning_predicate=l_shipdate_max@0 > 9204, projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate]
```
## Partitioning & Distribution
The partitioning scheme changes throughout the plan and this is the most important concept to
understand in order to understand DataFusion for Ray's design. Changes in partitioning are implemented by the `RepartitionExec`
operator in DataFusion and are happen in the following scenarios.
### Joins
The first join to happen is between `customer` and `orders`. The join condition is
`customers.c_custkey = orders.o_custkey`. To perform this join in parallel we first
need to repartition the data on the join keys so that the data for customers with `c_custkey = 1`
and the data for orders with `o_custkey = 1` can be sent to the same thread or node for
processing. This allows the join to happen in parallel. This is known as a hash-partitioned
join and is implemented by the `HashJoinExec` operator in DataFusion.
We can see that `DataFusion` has inserted `RepartitionExec` operators around both inputs to the join.
### Aggregates
There are multiple approaches to distributed aggregate queries. Here are two popular approaches:
- Perform aggregates in parallel on each partition, where the resulting data for each partition could contain
duplicate grouping keys and then perform a final aggregate on the intermediate aggregates to remove the duplicates.
- Partition the input data by the grouping keys so that the aggregates from each partition can simply be merged to
produce the final result.
For this example query, the data is already partitioned on the aggregate's grouping key so the latter approach is used.
### Sort
Sort also has multiple approaches.
- The input partitions can be collapsed down to a single partition and then sorted
- Partitions can be sorted in parallel and then merged using a sort-preserving merge
DataFusion and DataFusion for Ray currently choose the first approach, but there is a DataFusion PR open for implementing the second.
### Limit
- The input partitions can be collapsed down to a single partition and then have the limit applied
- The limit can be pushed down to each partition as well
## Query Stages
The first two query stages to be executed will read the `customer` and `order` parquet files and reparition them by the join keys `c_custkey` and `o_custkey`.
```text
Query Stage #0:
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "c_custkey", index: 0 }], 4))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: c_mktsegment@1 = BUILDING
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-3.parquet]]}, predicate=c_mktsegment = Utf8("BUILDING"), pruning_predicate=c_mktsegment_min@0 <= BUILDING AND BUILDING <= c_mktsegment_max@1, projection=[c_custkey, c_mktsegment]
Query Stage #1:
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "o_custkey", index: 1 }], 4))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: o_orderdate@2 < 9204
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-3.parquet]]}, predicate=o_orderdate < Date32("9204"), pruning_predicate=o_orderdate_min@0 < 9204, projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority]
```
Now that these inputs are partitioned by join key, we can execute the join itself. The two inputs to the join are instances of `ShuffleReaderExec` that
read the shuffle files produced by the previous stages. The result of the join is repartitioned by `o_order_key` in preparation for the next join.
```text
Query Stage #2:
ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "o_orderkey", index: 2 }], 4))
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=0, input_partitions=4)
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=1, input_partitions=4)
```
We continue to prepare for the next join with `lineitem` by repartitioning the parquet files by the join key.
```text
Query Stage #3:
ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 4))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: l_shipdate@3 > 9204
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-3.parquet]]}, predicate=l_shipdate > Date32("9204"), pruning_predicate=l_shipdate_max@0 > 9204, projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate]
```
With the inputs repartitioned by the join keys, we can execute a query stage to perform the join. However, in this case, we also perform an aggregate in the
same stage without any additional repartitioning. This is possible because the aggregate grouping includes `l_orderkey` which we already partitioned on. This
means that we can perform this aggregate in parallel and guarantee that there will be no duplicates in the output of each aggregate. Pretty neat!
The output of this shuffle is partitioned by `l_orderkey`, `o_orderdate`, and `o_shippriority` in preparation for the `ORDER BY` part of the query.
```text
Query Stage #4:
ShuffleWriterExec(stage_id=4, output_partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 4))
AggregateExec: mode=Partial, gby=[l_orderkey@6 as l_orderkey, o_orderdate@4 as o_orderdate, o_shippriority@5 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=2, input_partitions=4)
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=3, input_partitions=4)
```
Now we perform a final aggregate (which is maybe redundant?) and then sort the results in parallel across the
partitions.
```text
Query Stage #5:
ShuffleWriterExec(stage_id=5, output_partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 2 }, Column { name: "o_shippriority", index: 3 }], 4))
SortExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=4, input_partitions=4)
```
Finally, we have a query stage that reads the sorted results and merges them into a single partition (preserving the
sort order) and applies the `LIMIT` clause.
```text
Query Stage #6:
GlobalLimitExec: skip=0, fetch=10
SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
ShuffleReaderExec(stage_id=5, input_partitions=4)
```
## Distributed Scheduling
In the previous section we walked through the plan as if we were executing one query stage at a time. However, some
of these query stages could execute in parallel (assuming enough resource is available).
For example, we could execute query stage 3 to repartition the `lineitem` input without waiting for query stages 0, 1,
and 2 to finish.
More generally, we can typically execute all remaining leaf nodes of the plan concurrently.
The `execute_query_stage` method in `context.py` is a remote method that recursively walks the query plan and executes
child plans, building up a DAG of futures.
## Distributed Shuffle
The output of each query stage needs to be persisted somewhere so that the next query stage can read it.
DataFusion for Ray uses the Ray object store as a shared file system, which was proposed [here](https://github.com/datafusion-contrib/ray-sql/issues/22) and implemented [here](https://github.com/datafusion-contrib/ray-sql/pull/33).
DataFusion's `RepartitionExec` uses threads and channels within a single process and is not suitable for a
distributed query engine, so DataFusion for Ray rewrites the physical plan and replaces the `RepartionExec` with a pair of
operators to perform a "shuffle". These are the `ShuffleWriterExec` and `ShuffleReaderExec`.
### Shuffle Writes
`ShuffleWriterExec` reads input partitions and repartitions them, using the same `BatchPartitioner` that DataFusion
uses, then writes the output to disk in Arrow IPC format.
### Shuffle Reads
`ShuffleReaderExec` reads the shuffle files written by the `ShuffleWriterExec`.