This directory contains examples for executing distributed queries with Ballista.
The standalone example is the easiest to get started with. Ballista supports a standalone mode where a scheduler and executor are started in-process.
cargo run --example standalone_sql --features="ballista/standalone"
#[tokio::main] async fn main() -> Result<()> { let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "1") .build()?; let ctx = BallistaContext::standalone(&config, 2).await?; ctx.register_csv( "test", "testdata/aggregate_test_100.csv", CsvReadOptions::new(), ) .await?; let df = ctx.sql("select count(1) from test").await?; df.show().await?; Ok(()) }
For background information on the Ballista architecture, refer to the Ballista README.
From the root of the project, build release binaries.
cargo build --release
Start a Ballista scheduler process in a new terminal session.
RUST_LOG=info ./target/release/ballista-scheduler
Start one or more Ballista executor processes in new terminal sessions. When starting more than one executor, a unique port number must be specified for each executor.
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50051
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052
The examples can be run using the `cargo run --bin` syntax.
cargo run --release --bin sql
use ballista::prelude::*; use datafusion::prelude::CsvReadOptions; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and /// fetching results, using SQL #[tokio::main] async fn main() -> Result<()> { let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; let ctx = BallistaContext::remote("localhost", 50050, &config).await?; // register csv file with the execution context ctx.register_csv( "test", "testdata/aggregate_test_100.csv", CsvReadOptions::new(), ) .await?; // execute the query let df = ctx .sql( "SELECT c1, MIN(c12), MAX(c12) \ FROM test \ WHERE c11 > 0.1 AND c11 < 0.9 \ GROUP BY c1", ) .await?; // print the results df.show().await?; Ok(()) }
cargo run --release --bin dataframe
#[tokio::main] async fn main() -> Result<()> { let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; let ctx = BallistaContext::remote("localhost", 50050, &config).await?; let filename = "testdata/alltypes_plain.parquet"; // define the query using the DataFrame trait let df = ctx .read_parquet(filename, ParquetReadOptions::default()) .await? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; // print the results df.show().await?; Ok(()) }