<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Ballista Examples

This directory contains examples for executing distributed queries with Ballista.

## Standalone Examples

The standalone example is the easiest way to get started. Ballista supports a standalone mode in which a scheduler
and executor are started in-process, so no separate cluster is required.

```bash
cargo run --example standalone_sql --features="ballista/standalone"
```

### Source code for standalone SQL example

```rust
use ballista::{
    extension::SessionConfigExt,
    prelude::*,
};
use ballista_examples::test_util; // test data helper provided by this examples crate
use datafusion::{
    execution::{options::ParquetReadOptions, SessionStateBuilder},
    prelude::{SessionConfig, SessionContext},
};

#[tokio::main]
async fn main() -> Result<()> {
    // configure an in-process (standalone) Ballista session
    let config = SessionConfig::new_with_ballista()
        .with_target_partitions(1)
        .with_ballista_standalone_parallelism(2);

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    // start a scheduler and executor in-process and connect to them
    let ctx = SessionContext::standalone_with_state(state).await?;

    let test_data = test_util::examples_test_data();

    // register parquet file with the execution context
    ctx.register_parquet(
        "test",
        &format!("{test_data}/alltypes_plain.parquet"),
        ParquetReadOptions::default(),
    )
    .await?;

    let df = ctx.sql("select count(1) from test").await?;

    df.show().await?;
    Ok(())
}
```

## Distributed Examples

For background information on the Ballista architecture, refer to
the [Ballista README](../ballista/client/README.md).

### Start a standalone cluster

From the root of the project, build release binaries.

```bash
cargo build --release
```

Start a Ballista scheduler process in a new terminal session. By default, the scheduler listens on port 50050, which is the port the distributed examples below connect to.

```bash
RUST_LOG=info ./target/release/ballista-scheduler
```

Start one or more Ballista executor processes in new terminal sessions. When starting more than one executor,
specify a unique port for each one.

```bash
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50051
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052
```
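
Both binaries should also accept `--help` and print their full set of configuration options; this is a convenient way to discover additional settings beyond the concurrency and port flags shown above.

```bash
# print the configuration options supported by the scheduler and executor binaries
./target/release/ballista-scheduler --help
./target/release/ballista-executor --help
```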

### Running the examples

The examples can be run using the `cargo run --example` syntax, as shown below.
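
The general pattern is as follows; the two distributed examples assume the cluster started in the previous section is running, with the scheduler reachable at `localhost:50050`.

```bash
# general pattern; replace <example-name> with one of the examples below,
# e.g. remote-sql or remote-dataframe
cargo run --release --example <example-name>
```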

### Distributed SQL Example

```bash
cargo run --release --example remote-sql
```

#### Source code for distributed SQL example

```rust
use ballista::{extension::SessionConfigExt, prelude::*};
use ballista_examples::test_util; // test data helper provided by this examples crate
use datafusion::{
    execution::SessionStateBuilder,
    prelude::{CsvReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results, using SQL
#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new_with_ballista()
        .with_target_partitions(4)
        .with_ballista_job_name("Remote SQL Example");

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    // connect to the Ballista scheduler started earlier
    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

    let test_data = test_util::examples_test_data();

    // register a CSV file with the remote context
    ctx.register_csv(
        "test",
        &format!("{test_data}/aggregate_test_100.csv"),
        CsvReadOptions::new(),
    )
    .await?;

    let df = ctx
        .sql(
            "SELECT c1, MIN(c12), MAX(c12) \
        FROM test \
        WHERE c11 > 0.1 AND c11 < 0.9 \
        GROUP BY c1",
        )
        .await?;

    df.show().await?;

    Ok(())
}
```

### Distributed DataFrame Example

```bash
cargo run --release --example remote-dataframe
```

#### Source code for distributed DataFrame example

```rust
use ballista::{extension::SessionConfigExt, prelude::*};
use ballista_examples::test_util; // test data helper provided by this examples crate
use datafusion::{
    execution::SessionStateBuilder,
    prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame API
#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new_with_ballista().with_target_partitions(4);

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    // connect to the Ballista scheduler started earlier
    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

    let test_data = test_util::examples_test_data();
    let filename = format!("{test_data}/alltypes_plain.parquet");

    // build a DataFrame query: project three columns and filter on id
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["id", "bool_col", "timestamp_col"])?
        .filter(col("id").gt(lit(1)))?;

    df.show().await?;

    Ok(())
}
```