| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| # Comet Plugin Architecture |
| |
| ## Overview |
| |
| The Comet plugin enhances Spark SQL by introducing optimized query execution and shuffle mechanisms leveraging |
| native code. It integrates with Spark's plugin framework and extension API to replace or extend Spark's |
| default behavior. |
| |
| --- |
| |
| # Plugin Components |
| |
| ## Comet SQL Plugin |
| |
| The entry point to Comet is the org.apache.spark.CometPlugin class, which is registered in Spark using the following |
| configuration: |
| |
| ``` |
| --conf spark.plugins=org.apache.spark.CometPlugin |
| ``` |
| |
| The plugin is loaded on the Spark driver and does not provide executor-side plugins. |
| |
| The plugin will update the current `SparkConf` with the extra configuration provided by Comet, such as executor memory |
| configuration. |
| |
| The plugin also registers `CometSparkSessionExtensions` with Spark's extension API. |
| |
| ## CometSparkSessionExtensions |
| |
| On initialization, this class registers two physical plan optimization rules with Spark: `CometScanRule` |
| and `CometExecRule`. These rules run whenever a query stage is being planned during Adaptive Query Execution, and |
| run once for the entire plan when Adaptive Query Execution is disabled. |
| |
| ### CometScanRule |
| |
| `CometScanRule` replaces any Parquet scans with Comet operators. There are different paths for Spark v1 and v2 data sources. |
| |
| When reading from Parquet v1 data sources, Comet replaces `FileSourceScanExec` with a `CometScanExec`, and for v2 |
| data sources, `BatchScanExec` is replaced with `CometBatchScanExec`. In both cases, Comet replaces Spark's Parquet |
| reader with a custom vectorized Parquet reader. This is similar to Spark's vectorized Parquet reader used by the v2 |
| Parquet data source but leverages native code for decoding Parquet row groups directly into Arrow format. |
| |
| Comet only supports a subset of data types and will fall back to Spark's scan if unsupported types |
| exist. Comet can still accelerate the rest of the query execution in this case because `CometSparkToColumnarExec` will |
| convert the output from Spark's scan to Arrow arrays. Note that both `spark.comet.exec.enabled=true` and |
| `spark.comet.convert.parquet.enabled=true` must be set to enable this conversion. |
| |
| Refer to the [Supported Spark Data Types](https://datafusion.apache.org/comet/user-guide/datatypes.html) section |
| in the contributor guide to see a list of currently supported data types. |
| |
| ### CometExecRule |
| |
| This rule traverses bottom-up from the original Spark plan and attempts to replace each operator with a Comet equivalent. |
| For example, a `ProjectExec` will be replaced by `CometProjectExec`. |
| |
| When replacing a node, various checks are performed to determine if Comet can support the operator and its expressions. |
| If an operator, expression, or data type is not supported by Comet then the reason will be stored in a tag on the |
| underlying Spark node and the plan will not be converted. |
| |
| Comet does not support partially replacing subsets of the plan within a query stage because this would involve adding |
| transitions to convert between row-based and columnar data between Spark operators and Comet operators and the overhead |
| of this could outweigh the benefits of running parts of the query stage natively in Comet. |
| |
| ## Query Execution |
| |
| Once the plan has been transformed, any consecutive native Comet operators are combined into a `CometNativeExec` which contains |
| a protocol buffer serialized version of the plan (the serialization code can be found in `QueryPlanSerde`). |
| |
| Spark serializes the physical plan and sends it to the executors when executing tasks. The executors deserialize the |
| plan and invoke it. |
| |
| When `CometNativeExec` is invoked, it will pass the serialized protobuf plan into |
| `Native.createPlan`, which invokes the native code via JNI, where the plan is then deserialized. |
| |
| In the native code there is a `PhysicalPlanner` struct (in `planner.rs`) which converts the deserialized plan into an |
| Apache DataFusion `ExecutionPlan`. In some cases, Comet provides specialized physical operators and expressions to |
| override the DataFusion versions to ensure compatibility with Apache Spark. |
| |
| The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to |
| `CometBatchIterator.next()` to fetch the next input batch. The input could be a Comet native Parquet scan, |
| a Spark exchange, or another native plan. |
| |
| `CometNativeExec` creates a `CometExecIterator` and applies this iterator to the input RDD |
| partitions. Each call to `CometExecIterator.next()` will invoke `Native.executePlan`. Once the plan finishes |
| executing, the resulting Arrow batches are imported into the JVM using Arrow FFI. |
| |
| ## Shuffle |
| |
| Comet integrates with Spark's shuffle mechanism, optimizing both shuffle writes and reads. Comet's shuffle manager |
| must be registered with Spark using the following configuration: |
| |
| ``` |
| --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager |
| ``` |
| |
| ### Shuffle Writes |
| |
| For shuffle writes, a `ShuffleMapTask` runs in the executors. This task contains a `ShuffleDependency` that is |
| broadcast to all of the executors. It then passes the input RDD to `ShuffleWriteProcessor.write()` which |
| requests a `ShuffleWriter` from the shuffle manager, and this is where it gets a Comet shuffle writer. |
| |
| `ShuffleWriteProcessor` then invokes the dependency RDD and fetches rows/batches and passes them to Comet's |
| shuffle writer, which writes batches to disk in Arrow IPC format. |
| |
| As a result, we cannot avoid having one native plan to produce the shuffle input and another native plan for |
| writing the batches to the shuffle file. |
| |
| ### Shuffle Reads |
| |
| For shuffle reads a `ShuffledRDD` requests a `ShuffleReader` from the shuffle manager. Comet provides a |
| `CometBlockStoreShuffleReader` which is implemented in JVM and fetches blocks from Spark and then creates an |
| `ArrowReaderIterator` to process the blocks using Arrow's `StreamReader` for decoding IPC batches. |