Ballista is a distributed compute platform primarily implemented in Rust and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a serialization penalty.
Ballista is a distributed query execution engine that enhances Apache DataFusion by enabling the parallelized execution of workloads across multiple nodes in a distributed environment.
An existing DataFusion application:
```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // datafusion context
    let ctx = SessionContext::new();

    // register the table
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
        .await?;

    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100")
        .await?;

    // execute and print results
    df.show().await?;
    Ok(())
}
```
can be distributed with only a few lines changed:
```rust
use ballista::prelude::*;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // create SessionContext with ballista support
    // standalone context will start all required
    // ballista infrastructure in the background as well
    let ctx = SessionContext::standalone().await?;

    // everything else remains the same

    // register the table
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
        .await?;

    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100")
        .await?;

    // execute and print results
    df.show().await?;
    Ok(())
}
```
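One common way a distributed engine executes a query like the one above is to split the input into partitions, run a partial aggregation on every partition in parallel, and then merge the partial results in a final step. The std-only Rust sketch below models that two-phase pattern for `SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a` (ignoring `LIMIT`), with threads standing in for executors and in-memory vectors standing in for partitions. It is a conceptual illustration under those assumptions, not Ballista's API or its actual execution code; `partial_min`, `merge_min`, and `distributed_min` are hypothetical names.

```rust
use std::collections::HashMap;
use std::thread;

/// One row of a hypothetical `example` table: columns `a` and `b`.
type Row = (i64, i64);

/// Partial phase, run independently on each partition:
/// apply `WHERE a <= b`, then keep the running MIN(b) per group `a`.
fn partial_min(partition: &[Row]) -> HashMap<i64, i64> {
    let mut acc = HashMap::new();
    for &(a, b) in partition.iter().filter(|(a, b)| a <= b) {
        acc.entry(a)
            .and_modify(|m: &mut i64| *m = (*m).min(b))
            .or_insert(b);
    }
    acc
}

/// Final phase: merge the partial states produced by every partition.
fn merge_min(partials: Vec<HashMap<i64, i64>>) -> HashMap<i64, i64> {
    let mut out = HashMap::new();
    for partial in partials {
        for (a, m) in partial {
            out.entry(a)
                .and_modify(|cur: &mut i64| *cur = (*cur).min(m))
                .or_insert(m);
        }
    }
    out
}

/// Runs the partial phase on one thread per partition (standing in for
/// executors), then the final phase on the calling thread.
fn distributed_min(partitions: Vec<Vec<Row>>) -> HashMap<i64, i64> {
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|p| thread::spawn(move || partial_min(&p)))
        .collect();
    let partials: Vec<HashMap<i64, i64>> =
        handles.into_iter().map(|h| h.join().unwrap()).collect();
    merge_min(partials)
}

fn main() {
    // Two made-up partitions of the hypothetical table.
    let partitions = vec![
        vec![(1, 5), (2, 1), (1, 3)],
        vec![(1, 2), (2, 7), (3, 3)],
    ];
    let mut rows: Vec<_> = distributed_min(partitions).into_iter().collect();
    rows.sort();
    println!("{:?}", rows); // prints [(1, 2), (2, 7), (3, 3)]
}
```

MIN merges cleanly because the minimum of the partial minimums equals the overall minimum; an aggregate such as AVG needs a richer partial state (for example, a sum and a count) to be merged correctly.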
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.
```shell
cargo install --locked ballista-scheduler
cargo install --locked ballista-executor
```
With these crates installed, it is now possible to start a scheduler process.
```shell
RUST_LOG=info ballista-scheduler
```
The scheduler will bind to port `50050` by default.
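Before pointing clients at the cluster, it can help to confirm that the scheduler is accepting connections. The following is a minimal std-only sketch, assuming the scheduler runs on the same machine with its default port: it only checks that some TCP listener exists at `127.0.0.1:50050` and does not speak Ballista's protocol. `is_listening` is a hypothetical helper, not part of Ballista.

```rust
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

/// Returns true if something accepts TCP connections at `addr` within `timeout`.
/// This only proves a listener exists on the port; it does not verify that the
/// listener is actually a Ballista scheduler.
fn is_listening(addr: &str, timeout: Duration) -> bool {
    match addr.parse::<SocketAddr>() {
        Ok(sock) => TcpStream::connect_timeout(&sock, timeout).is_ok(),
        Err(_) => false, // not a valid "ip:port" string
    }
}

fn main() {
    // Assumption: scheduler on localhost, default port 50050.
    let addr = "127.0.0.1:50050";
    if is_listening(addr, Duration::from_secs(1)) {
        println!("scheduler port {addr} is accepting connections");
    } else {
        println!("nothing is listening on {addr} yet");
    }
}
```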
Next, in a new terminal session, start an executor process with the specified concurrency level.
```shell
RUST_LOG=info ballista-executor -c 4
```
The executor will bind to port `50051` by default. Additional executors can be started by manually specifying a bind port.
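The concurrency level passed with `-c` bounds how many tasks an executor runs at the same time. As a mental model only (this is not Ballista's executor code), the std-only sketch below runs queued tasks on a fixed number of worker threads, so at most `slots` tasks are in flight at once. `run_with_slots` and the doubling "work" are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Runs `tasks` on exactly `slots` worker threads, so at most `slots` tasks
/// execute simultaneously. Returns the results (in completion order) and the
/// highest number of tasks observed running at the same time.
fn run_with_slots(slots: usize, tasks: Vec<u64>) -> (Vec<u64>, usize) {
    let (task_tx, task_rx) = mpsc::channel::<u64>();
    let task_rx = Arc::new(Mutex::new(task_rx));
    let (result_tx, result_rx) = mpsc::channel::<u64>();
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..slots)
        .map(|_| {
            let task_rx = Arc::clone(&task_rx);
            let result_tx = result_tx.clone();
            let running = Arc::clone(&running);
            let peak = Arc::clone(&peak);
            thread::spawn(move || loop {
                // Take the next queued task; stop once the queue is closed and empty.
                let task = match task_rx.lock().unwrap().recv() {
                    Ok(t) => t,
                    Err(_) => break,
                };
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5)); // simulated work
                running.fetch_sub(1, Ordering::SeqCst);
                result_tx.send(task * 2).unwrap();
            })
        })
        .collect();

    for t in tasks {
        task_tx.send(t).unwrap();
    }
    drop(task_tx); // close the queue so idle workers exit
    drop(result_tx); // only the workers' clones remain
    for w in workers {
        w.join().unwrap();
    }
    (result_rx.iter().collect(), peak.load(Ordering::SeqCst))
}

fn main() {
    // Mirrors the idea of `-c 4`: four task slots.
    let (results, peak) = run_with_slots(4, (1..=20).collect());
    println!("completed {} tasks, peak concurrency {}", results.len(), peak);
}
```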
For full documentation, refer to the deployment section of the Ballista User Guide.
Ballista provides a custom `SessionContext` as a starting point for creating queries. DataFrames can be created by invoking the `read_csv`, `read_parquet`, and `sql` methods.
To build a simple Ballista example, run the following command to add the dependencies to your `Cargo.toml` file:

```shell
cargo add ballista datafusion tokio
```
```rust
use ballista::prelude::*;
use datafusion::common::Result;
use datafusion::prelude::{col, SessionContext, ParquetReadOptions};
use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, average::avg};

#[tokio::main]
async fn main() -> Result<()> {
    // connect to Ballista scheduler
    let ctx = SessionContext::remote("df://localhost:50050").await?;

    let filename = "testdata/yellow_tripdata_2022-01.parquet";

    // define the query using the DataFrame API
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["passenger_count", "fare_amount"])?
        .aggregate(
            vec![col("passenger_count")],
            vec![
                min(col("fare_amount")),
                max(col("fare_amount")),
                avg(col("fare_amount")),
                sum(col("fare_amount")),
            ],
        )?
        // ascending order, nulls first
        .sort(vec![col("passenger_count").sort(true, true)])?;

    df.show().await?;
    Ok(())
}
```
The output should look similar to the following table.
```text
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
|                 | -159.5                   | 285.2                    | 17.60577640099004        | 1258865.829999991        |
| 0               | -115                     | 500                      | 11.794859107585335       | 614052.1600000001        |
| 1               | -480                     | 401092.32                | 12.61028389876563        | 22623542.879999973       |
| 2               | -250                     | 640.5                    | 13.79501011585127        | 4732047.139999998        |
| 3               | -130                     | 480                      | 13.473184817311106       | 1139427.2400000002       |
| 4               | -250                     | 464                      | 14.232650547832726       | 502711.4499999997        |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
```
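The blank first row in the table above is the group whose `passenger_count` is NULL. It comes first because the example sorts with `col("passenger_count").sort(true, true)`, where DataFusion's `Expr::sort` takes `asc` and `nulls_first` flags. The std-only sketch below reproduces that grouping and ordering on a few made-up rows, modelling NULL as `None`; `Stats` and `summarize` are hypothetical names, not DataFusion types.

```rust
use std::collections::BTreeMap;

/// Running aggregate state for one passenger_count group.
#[derive(Debug, Clone, Copy)]
struct Stats {
    min: f64,
    max: f64,
    sum: f64,
    count: u64,
}

impl Stats {
    fn new(v: f64) -> Self {
        Stats { min: v, max: v, sum: v, count: 1 }
    }

    fn update(&mut self, v: f64) {
        self.min = self.min.min(v);
        self.max = self.max.max(v);
        self.sum += v;
        self.count += 1;
    }

    /// AVG is derived from the sum and count rather than stored directly.
    fn avg(&self) -> f64 {
        self.sum / self.count as f64
    }
}

/// Groups (passenger_count, fare_amount) rows. `None` models a NULL
/// passenger_count; because `None < Some(_)` for `Option`, iterating the
/// BTreeMap yields groups in ascending order with the NULL group first.
fn summarize(rows: &[(Option<u32>, f64)]) -> Vec<(Option<u32>, Stats)> {
    let mut groups: BTreeMap<Option<u32>, Stats> = BTreeMap::new();
    for &(key, fare) in rows {
        groups
            .entry(key)
            .and_modify(|s| s.update(fare))
            .or_insert_with(|| Stats::new(fare));
    }
    groups.into_iter().collect()
}

fn main() {
    // Made-up rows for illustration; not taken from the trip data.
    let rows: Vec<(Option<u32>, f64)> =
        vec![(Some(1), 10.0), (None, 4.0), (Some(0), 2.5), (Some(1), -3.0)];
    for (key, s) in summarize(&rows) {
        let label = key.map(|k| k.to_string()).unwrap_or_default();
        println!("{:>3} | {} | {} | {} | {}", label, s.min, s.max, s.avg(), s.sum);
    }
}
```

Keeping `sum` and `count` instead of a stored average also means two `Stats` values for the same group could be merged exactly, which matters when partial results come from different partitions.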
More examples can be found in the datafusion-ballista repository.
We run some simple benchmarks comparing Ballista with Apache Spark to track the progress of performance optimizations.
These are benchmarks derived from TPC-H and not official TPC-H benchmarks. These results are from running individual queries at scale factor 100 (100 GB) on a single node with a single executor and 8 concurrent tasks.
The overall speedup of Ballista over Spark is 2.9x.
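The benchmark summary does not state how the overall figure is aggregated across queries. Two common conventions are the ratio of total runtimes and the geometric mean of per-query speedups; the sketch below implements both under that assumption, with hypothetical function names and made-up timings that are not Ballista or Spark results.

```rust
/// Speedup as the ratio of total baseline runtime to total candidate runtime.
fn total_time_speedup(baseline_secs: &[f64], candidate_secs: &[f64]) -> f64 {
    baseline_secs.iter().sum::<f64>() / candidate_secs.iter().sum::<f64>()
}

/// Speedup as the geometric mean of per-query ratios (baseline / candidate).
fn geomean_speedup(baseline_secs: &[f64], candidate_secs: &[f64]) -> f64 {
    assert_eq!(baseline_secs.len(), candidate_secs.len());
    let log_sum: f64 = baseline_secs
        .iter()
        .zip(candidate_secs)
        .map(|(b, c)| (b / c).ln())
        .sum();
    (log_sum / baseline_secs.len() as f64).exp()
}

fn main() {
    // Hypothetical timings for two queries, for illustration only.
    let baseline = [10.0, 40.0];
    let candidate = [5.0, 10.0];
    println!("total-time speedup: {:.2}x", total_time_speedup(&baseline, &candidate)); // prints 3.33x
    println!("geometric-mean speedup: {:.2}x", geomean_speedup(&baseline, &candidate)); // prints 2.83x
}
```

The two conventions can disagree: the total-time ratio is dominated by the longest-running queries, while the geometric mean weights every query equally.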