This directory contains examples for executing distributed queries with Ballista.
The standalone example is the easiest to get started with. Ballista supports a standalone mode where a scheduler and executor are started in-process.
cargo run --example standalone_sql --features="ballista/standalone"
use ballista::{ extension::SessionConfigExt, prelude::* }; use datafusion::{ execution::{options::ParquetReadOptions, SessionStateBuilder}, prelude::{SessionConfig, SessionContext}, }; #[tokio::main] async fn main() -> Result<()> { let config = SessionConfig::new_with_ballista() .with_target_partitions(1) .with_ballista_standalone_parallelism(2); let state = SessionStateBuilder::new() .with_config(config) .with_default_features() .build(); let ctx = SessionContext::standalone_with_state(state).await?; let test_data = test_util::examples_test_data(); // register parquet file with the execution context ctx.register_parquet( "test", &format!("{test_data}/alltypes_plain.parquet"), ParquetReadOptions::default(), ) .await?; let df = ctx.sql("select count(1) from test").await?; df.show().await?; Ok(()) }
For background information on the Ballista architecture, refer to the Ballista README.
From the root of the project, build release binaries.
cargo build --release
Start a Ballista scheduler process in a new terminal session.
RUST_LOG=info ./target/release/ballista-scheduler
Start one or more Ballista executor processes in new terminal sessions. When starting more than one executor, a unique port number must be specified for each executor.
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50051
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052
The examples can be run using the `cargo run --example` syntax.
cargo run --release --example remote-sql
use ballista::{extension::SessionConfigExt, prelude::*}; use datafusion::{ execution::SessionStateBuilder, prelude::{CsvReadOptions, SessionConfig, SessionContext}, }; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and /// fetching results, using SQL #[tokio::main] async fn main() -> Result<()> { let config = SessionConfig::new_with_ballista() .with_target_partitions(4) .with_ballista_job_name("Remote SQL Example"); let state = SessionStateBuilder::new() .with_config(config) .with_default_features() .build(); let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?; let test_data = test_util::examples_test_data(); ctx.register_csv( "test", &format!("{test_data}/aggregate_test_100.csv"), CsvReadOptions::new(), ) .await?; let df = ctx .sql( "SELECT c1, MIN(c12), MAX(c12) \ FROM test \ WHERE c11 > 0.1 AND c11 < 0.9 \ GROUP BY c1", ) .await?; df.show().await?; Ok(()) }
cargo run --release --example remote-dataframe
use ballista::{extension::SessionConfigExt, prelude::*}; use datafusion::{ execution::SessionStateBuilder, prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext}, }; /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and /// fetching results, using the DataFrame trait #[tokio::main] async fn main() -> Result<()> { let config = SessionConfig::new_with_ballista().with_target_partitions(4); let state = SessionStateBuilder::new() .with_config(config) .with_default_features() .build(); let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?; let test_data = test_util::examples_test_data(); let filename = format!("{test_data}/alltypes_plain.parquet"); let df = ctx .read_parquet(filename, ParquetReadOptions::default()) .await? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; df.show().await?; Ok(()) }