<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Ballista: Distributed Scheduler for Apache DataFusion

Ballista is a distributed compute platform primarily implemented in Rust and powered by Apache Arrow and DataFusion. Its architecture allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a serialization penalty.



Ballista is a distributed query execution engine that extends [Apache DataFusion](https://github.com/apache/datafusion) so that workloads can be executed in parallel across multiple nodes.

An existing DataFusion application:

```rust,no_run
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // datafusion context
    let ctx = SessionContext::new();

    // register the table
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

    // create a plan to run a SQL query
    let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

    // execute and print results
    df.show().await?;
    Ok(())
}
```

can be distributed with only a few lines changed:

```rust,no_run
use ballista::prelude::*;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // create a SessionContext with Ballista support;
    // the standalone context also starts all required
    // Ballista infrastructure in the background
    let ctx = SessionContext::standalone().await?;

    // everything else remains the same

    // register the table
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
        .await?;

    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100")
        .await?;

    // execute and print results
    df.show().await?;
    Ok(())
}
```
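
To make "distributed" concrete for the query above, here is a conceptual, std-only sketch of the usual two-phase plan for `GROUP BY a` with `MIN(b)`: each partition filters and pre-aggregates its own rows, and a final step merges the partial minimums. It is an illustration under assumptions, not Ballista's implementation: threads stand in for executors, `(i64, i64)` pairs stand in for the `(a, b)` columns, and the function names are hypothetical.

```rust
use std::collections::BTreeMap;
use std::thread;

/// Phase 1 ("partial"): one partition applies `WHERE a <= b` and keeps the
/// smallest `b` it has seen for each key `a`.
fn partial_min(rows: &[(i64, i64)]) -> BTreeMap<i64, i64> {
    let mut acc = BTreeMap::new();
    for &(a, b) in rows.iter().filter(|(a, b)| a <= b) {
        acc.entry(a)
            .and_modify(|m: &mut i64| *m = (*m).min(b))
            .or_insert(b);
    }
    acc
}

/// Phase 2 ("final"): merge the per-partition maps by taking the minimum of
/// the partial minimums, then apply the LIMIT. BTreeMap iterates in key
/// order; SQL itself does not promise which rows LIMIT keeps without ORDER BY.
fn final_min(partials: Vec<BTreeMap<i64, i64>>, limit: usize) -> Vec<(i64, i64)> {
    let mut merged = BTreeMap::new();
    for partial in partials {
        for (a, b) in partial {
            merged
                .entry(a)
                .and_modify(|m: &mut i64| *m = (*m).min(b))
                .or_insert(b);
        }
    }
    merged.into_iter().take(limit).collect()
}

/// Runs phase 1 on one thread per partition (standing in for executors),
/// then phase 2 on the calling thread.
fn distributed_min(partitions: Vec<Vec<(i64, i64)>>, limit: usize) -> Vec<(i64, i64)> {
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|rows| thread::spawn(move || partial_min(&rows)))
        .collect();
    let partials = handles
        .into_iter()
        .map(|h| h.join().expect("partition task panicked"))
        .collect();
    final_min(partials, limit)
}
```

For example, partitions `[(1, 5), (2, 1), (1, 3)]` and `[(1, 2), (3, 4), (2, 7)]` produce `[(1, 2), (2, 7), (3, 4)]`, the same answer as running on all rows at once. Because the minimum of partial minimums is the overall minimum, each partition only has to ship one small map instead of every row.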

## Starting a cluster



A simple way to start a local cluster for testing purposes is to use `cargo` to install the scheduler and executor crates.

```bash
cargo install --locked ballista-scheduler
cargo install --locked ballista-executor
```

With these crates installed, start a scheduler process:

```bash
RUST_LOG=info ballista-scheduler
```

The scheduler binds to port `50050` by default.

Next, in a new terminal session, start an executor process. The `-c` option sets its concurrency level, i.e. the number of tasks it runs at the same time (4 in this example).

```bash
RUST_LOG=info ballista-executor -c 4
```
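
The concurrency level can be pictured as a fixed number of task slots. The following std-only sketch is a simplified model, not Ballista's executor code: a pool of `concurrency` worker threads pulls tasks from a shared queue, so no more than that many tasks ever run at once. The function name `run_with_slots` and the sleep-based dummy tasks are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Runs `num_tasks` dummy tasks on `concurrency` worker threads (the "slots")
/// and returns the peak number of tasks observed running at the same time.
fn run_with_slots(concurrency: usize, num_tasks: usize) -> usize {
    let (tx, rx) = mpsc::channel::<usize>();
    let rx = Arc::new(Mutex::new(rx));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..concurrency)
        .map(|_| {
            let (rx, running, peak) = (Arc::clone(&rx), Arc::clone(&running), Arc::clone(&peak));
            thread::spawn(move || loop {
                // The lock guard is released at the end of this statement.
                let next = rx.lock().unwrap().recv();
                // An Err means the queue is closed and drained: the slot shuts down.
                let Ok(_task_id) = next else { break };
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5)); // stand-in for real work
                running.fetch_sub(1, Ordering::SeqCst);
            })
        })
        .collect();

    for task_id in 0..num_tasks {
        tx.send(task_id).unwrap();
    }
    drop(tx); // close the queue so idle workers exit
    for w in workers {
        w.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

With four slots and twenty queued tasks, the peak never exceeds four; extra tasks simply wait in the queue until a slot frees up.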

The executor binds to port `50051` by default. Additional executors can be started by giving each one its own bind port manually.
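
Each additional executor needs a port that is not already in use. The helpers below are a hypothetical std-only sketch: `pick_free_port` asks the operating system for a free localhost port by binding to port 0, and `is_port_free` checks whether a given port can still be bound. The names are illustrative, and the executor flag for passing the port is not shown here (check `ballista-executor --help`).

```rust
use std::net::TcpListener;

/// Hypothetical helper: asks the OS for a currently unused TCP port on
/// localhost by binding to port 0 and reading back the assigned port.
fn pick_free_port() -> std::io::Result<u16> {
    let listener = TcpListener::bind(("127.0.0.1", 0))?;
    let port = listener.local_addr()?.port();
    // `listener` is dropped when the function returns, which frees the port again.
    Ok(port)
}

/// Hypothetical helper: returns true if the port can be bound on localhost,
/// i.e. no other socket (such as a running executor) is listening on it.
fn is_port_free(port: u16) -> bool {
    TcpListener::bind(("127.0.0.1", port)).is_ok()
}
```

There is a small race: another process can claim the port between this check and the executor starting, so this approach only suits local testing.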

For full documentation, refer to the deployment section of the
[Ballista User Guide](https://datafusion.apache.org/ballista/user-guide/deployment/).

## Executing a Query

Ballista extends DataFusion's `SessionContext` with constructors such as `remote` and `standalone` (brought into scope by `use ballista::prelude::*`), which serve as the starting point for creating queries. DataFrames can be created by invoking the `read_csv`, `read_parquet`, and `sql` methods.
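
The `remote` constructor in the example below takes a scheduler address of the form `df://host:port` (there, `df://localhost:50050`, the scheduler's default port). As an illustration only, this hypothetical std-only helper splits such an address into host and port; it is not part of the Ballista API and handles just the simple `df://host:port` form (no IPv6 literals or extra path segments).

```rust
/// Hypothetical helper: splits "df://host:port" into ("host", port).
/// Returns None for other schemes, a missing host, or a missing/invalid port.
fn parse_scheduler_url(url: &str) -> Option<(&str, u16)> {
    let rest = url.strip_prefix("df://")?;
    // Split on the last ':' so the port is whatever follows it.
    let (host, port) = rest.rsplit_once(':')?;
    if host.is_empty() {
        return None;
    }
    let port = port.parse::<u16>().ok()?;
    Some((host, port))
}
```

For instance, `parse_scheduler_url("df://localhost:50050")` yields `Some(("localhost", 50050))`, while an `http://` address yields `None`.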

To build a simple Ballista example, run the following commands to add the required dependencies to your `Cargo.toml` file (`#[tokio::main]` needs Tokio's `macros` and `rt-multi-thread` features):

```bash
cargo add ballista datafusion
cargo add tokio --features macros,rt-multi-thread
```

```rust,no_run
use ballista::prelude::*;
use datafusion::common::Result;
use datafusion::functions_aggregate::{average::avg, min_max::max, min_max::min, sum::sum};
use datafusion::prelude::{col, ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // connect to the Ballista scheduler
    let ctx = SessionContext::remote("df://localhost:50050").await?;

    let filename = "testdata/yellow_tripdata_2022-01.parquet";

    // define the query using the DataFrame API
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["passenger_count", "fare_amount"])?
        .aggregate(
            vec![col("passenger_count")],
            vec![
                min(col("fare_amount")),
                max(col("fare_amount")),
                avg(col("fare_amount")),
                sum(col("fare_amount")),
            ],
        )?
        // sort ascending, with NULLs first
        .sort(vec![col("passenger_count").sort(true, true)])?;

    df.show().await?;

    Ok(())
}
```

The output should look similar to the following table.

```text
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
|                 | -159.5                   | 285.2                    | 17.60577640099004        | 1258865.829999991        |
| 0               | -115                     | 500                      | 11.794859107585335       | 614052.1600000001        |
| 1               | -480                     | 401092.32                | 12.61028389876563        | 22623542.879999973       |
| 2               | -250                     | 640.5                    | 13.79501011585127        | 4732047.139999998        |
| 3               | -130                     | 480                      | 13.473184817311106       | 1139427.2400000002       |
| 4               | -250                     | 464                      | 14.232650547832726       | 502711.4499999997        |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
```
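
The DataFrame query above groups by `passenger_count`, computes four aggregates, and sorts ascending with nulls first, which is why the row with an empty (NULL) `passenger_count` comes first in the table. The std-only sketch below mirrors those semantics over an in-memory slice, assuming `Option<i64>` for the nullable `passenger_count` column and `f64` for `fare_amount`; it illustrates what the plan computes, not how DataFusion executes it.

```rust
use std::collections::HashMap;

/// One output row: (passenger_count, min, max, avg, sum) of fare_amount.
type Row = (Option<i64>, f64, f64, f64, f64);

/// Groups (passenger_count, fare_amount) pairs, computes MIN, MAX, AVG and SUM
/// per group, and sorts ascending with the NULL group first.
fn aggregate_fares(rows: &[(Option<i64>, f64)]) -> Vec<Row> {
    // Per group: (min, max, sum, count); AVG is derived as sum / count at the end.
    let mut groups: HashMap<Option<i64>, (f64, f64, f64, u64)> = HashMap::new();
    for &(key, fare) in rows {
        let e = groups
            .entry(key)
            .or_insert((f64::INFINITY, f64::NEG_INFINITY, 0.0, 0));
        e.0 = e.0.min(fare);
        e.1 = e.1.max(fare);
        e.2 += fare;
        e.3 += 1;
    }
    let mut out: Vec<Row> = groups
        .into_iter()
        .map(|(k, (min, max, sum, n))| (k, min, max, sum / n as f64, sum))
        .collect();
    // Option's ordering puts None before Some(_): ascending, nulls first.
    out.sort_by_key(|row| row.0);
    out
}
```

Keeping `(sum, count)` rather than a running average is also what lets partial averages from different partitions be merged correctly.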

More [examples](../../examples/examples/) can be found in the datafusion-ballista repository.

## Performance

We run some simple benchmarks comparing Ballista with Apache Spark to track the progress of performance optimizations.

These benchmarks are derived from TPC-H and are not official TPC-H benchmarks. The results come from running individual queries at scale factor 100 (100 GB) on a single node with a single executor and 8 concurrent tasks.

### Overall Speedup

The overall speedup of Ballista over Spark is 2.9x.



### Per Query Comparison



### Relative Speedup



### Absolute Speedup

