Ballista usage is very similar to DataFusion. Tha main difference is that the starting point is a BallistaContext instead of the DataFusion SessionContext. Ballista uses the same DataFrame API as DataFusion.
The following code sample demonstrates how to create a BallistaContext to connect to a Ballista scheduler process.
let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; // connect to Ballista scheduler let ctx = BallistaContext::remote("localhost", 50050, &config);
Here is a full example using the DataFrame API.
#[tokio::main] async fn main() -> Result<()> { let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; // connect to Ballista scheduler let ctx = BallistaContext::remote("localhost", 50050, &config); let testdata = datafusion::test_util::parquet_test_data(); let filename = &format!("{}/alltypes_plain.parquet", testdata); // define the query using the DataFrame trait let df = ctx .read_parquet(filename)? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; // print the results df.show().await?; Ok(()) }
Here is a full example demonstrating SQL usage.
#[tokio::main] async fn main() -> Result<()> { let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; // connect to Ballista scheduler let ctx = BallistaContext::remote("localhost", 50050, &config); let testdata = datafusion::test_util::arrow_test_data(); // register csv file with the execution context ctx.register_csv( "aggregate_test_100", &format!("{}/csv/aggregate_test_100.csv", testdata), CsvReadOptions::new(), )?; // execute the query let df = ctx.sql( "SELECT c1, MIN(c12), MAX(c12) \ FROM aggregate_test_100 \ WHERE c11 > 0.1 AND c11 < 0.9 \ GROUP BY c1", )?; // print the results df.show().await?; Ok(()) }