// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! See `main.rs` for how to run it.
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, StringViewArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::MemTable;
use datafusion::common::ScalarValue;
use datafusion::common::config::CsvOptions;
use datafusion::common::parsers::CompressionTypeVariant;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::functions_aggregate::min_max::max;
use datafusion::prelude::*;
use datafusion_examples::utils::{datasets::ExampleDataset, write_csv_to_parquet};
use tempfile::{TempDir, tempdir};
use tokio::fs::create_dir_all;
/// This example demonstrates using DataFusion's DataFrame API
///
/// # Reading from different formats
///
/// * [read_parquet]: execute queries against parquet files
/// * [read_csv]: execute queries against csv files
/// * [read_memory]: execute queries against in-memory arrow data
/// * [read_memory_macro]: execute queries against in-memory arrow data using the `dataframe!` macro
///
/// # Writing out to local storage
///
/// The following examples demonstrate how to write a DataFrame to local
/// storage. See `external_dependency/dataframe-to-s3.rs` for an example writing
/// to a remote object store.
///
/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
///
/// # Executing subqueries
///
/// * [where_scalar_subquery]: execute a scalar subquery
/// * [where_in_subquery]: execute a subquery with an IN clause
/// * [where_exist_subquery]: execute a subquery with an EXISTS clause
pub async fn dataframe_example() -> Result<()> {
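    // Initialize logging; verbosity is controlled via the RUST_LOG environment variable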
env_logger::init();
// The SessionContext is the main high level API for interacting with DataFusion
let ctx = SessionContext::new();
read_parquet(&ctx).await?;
read_csv(&ctx).await?;
read_memory(&ctx).await?;
read_memory_macro().await?;
write_out(&ctx).await?;
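    // Register the cars example dataset twice (as `t1` and `t2`) so the
    // subquery examples below can correlate one table against the other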
register_cars_test_data("t1", &ctx).await?;
register_cars_test_data("t2", &ctx).await?;
where_scalar_subquery(&ctx).await?;
where_in_subquery(&ctx).await?;
where_exist_subquery(&ctx).await?;
Ok(())
}
/// Use the DataFrame API to:
/// 1. Read parquet files
/// 2. Show the schema
/// 3. Select columns and rows
async fn read_parquet(ctx: &SessionContext) -> Result<()> {
// Convert the CSV input into a temporary Parquet directory for querying
let dataset = ExampleDataset::Cars;
let parquet_temp = write_csv_to_parquet(ctx, &dataset.path()).await?;
    // Read the parquet files into a DataFrame
let parquet_df = ctx
.read_parquet(parquet_temp.path_str()?, ParquetReadOptions::default())
.await?;
// show its schema using 'describe'
parquet_df.clone().describe().await?.show().await?;
// Select three columns and filter the results
// so that only rows where speed > 1 are returned
// select car, speed, time from t where speed > 1
parquet_df
.select_columns(&["car", "speed", "time"])?
.filter(col("speed").gt(lit(1)))?
.show()
.await?;
Ok(())
}
/// Use the DataFrame API to
/// 1. Read CSV files
/// 2. Optionally specify the schema explicitly
async fn read_csv(ctx: &SessionContext) -> Result<()> {
// create example.csv file in a temporary directory
let dir = tempdir()?;
let file_path = dir.path().join("example.csv");
{
let mut file = File::create(&file_path)?;
// write CSV data
file.write_all(
r#"id,time,vote,unixtime,rating
a1,"10 6, 2013",3,1381017600,5.0
a2,"08 9, 2013",2,1376006400,4.5"#
.as_bytes(),
)?;
} // scope closes the file
let file_path = file_path.to_str().unwrap();
// You can read a CSV file and DataFusion will infer the schema automatically
let csv_df = ctx.read_csv(file_path, CsvReadOptions::default()).await?;
csv_df.show().await?;
    // If you know the types of your data, you can specify them explicitly
let schema = Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("time", DataType::Utf8, false),
Field::new("vote", DataType::Int32, true),
Field::new("unixtime", DataType::Int64, false),
Field::new("rating", DataType::Float32, true),
]);
    // Create CSV read options with the desired schema
    let csv_read_option = CsvReadOptions {
        // Use the defined schema instead of letting DataFusion infer it
schema: Some(&schema),
..Default::default()
};
let csv_df = ctx.read_csv(file_path, csv_read_option).await?;
csv_df.show().await?;
    // You can also create DataFrames from the results of SQL queries
    // and, using `enable_url_table`, refer to local files directly
let dyn_ctx = ctx.clone().enable_url_table();
let csv_df = dyn_ctx
.sql(&format!("SELECT rating, unixtime FROM '{file_path}'"))
.await?;
csv_df.show().await?;
Ok(())
}
/// Use the DataFrame API to:
/// 1. Read in-memory data.
async fn read_memory(ctx: &SessionContext) -> Result<()> {
// define data in memory
let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100]));
let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
    // declare a table in memory. In the Apache Spark API, this corresponds to createDataFrame(...).
ctx.register_batch("t", batch)?;
let df = ctx.table("t").await?;
// construct an expression corresponding to "SELECT a, b FROM t WHERE b = 10" in SQL
let filter = col("b").eq(lit(10));
let df = df.select_columns(&["a", "b"])?.filter(filter)?;
// print the results
df.show().await?;
Ok(())
}
/// Use the DataFrame API to:
/// 1. Create a DataFrame from in-memory data using the `dataframe!` macro.
async fn read_memory_macro() -> Result<()> {
    // create a DataFrame using the `dataframe!` macro
let df = dataframe!(
"a" => ["a", "b", "c", "d"],
"b" => [1, 10, 10, 100]
)?;
// print the results
df.show().await?;
    // create an empty DataFrame using the macro
let df_empty = dataframe!()?;
df_empty.show().await?;
Ok(())
}
/// Use the DataFrame API to:
/// 1. Write out a DataFrame to a table
/// 2. Write out a DataFrame to a parquet file
/// 3. Write out a DataFrame to a csv file
/// 4. Write out a DataFrame to a json file
async fn write_out(ctx: &SessionContext) -> Result<()> {
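    // Build a small in-memory table with a single Utf8View column to serve as
    // the source data for the write examples below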
let array = StringViewArray::from(vec!["a", "b", "c"]);
let schema = Arc::new(Schema::new(vec![Field::new(
"tablecol1",
DataType::Utf8View,
false,
)]));
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?;
let mem_table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
ctx.register_table("initial_data", Arc::new(mem_table))?;
let df = ctx.table("initial_data").await?;
// Create a single temp root with subdirectories
let tmp_root = TempDir::new()?;
let examples_root = tmp_root.path().join("datafusion-examples");
create_dir_all(&examples_root).await?;
let table_dir = examples_root.join("test_table");
let parquet_dir = examples_root.join("test_parquet");
let csv_dir = examples_root.join("test_csv");
let json_dir = examples_root.join("test_json");
create_dir_all(&table_dir).await?;
create_dir_all(&parquet_dir).await?;
create_dir_all(&csv_dir).await?;
create_dir_all(&json_dir).await?;
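    // Register an external, parquet-backed table rooted at the temporary
    // directory; it becomes the target of `write_table` below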
let create_sql = format!(
"CREATE EXTERNAL TABLE test(tablecol1 varchar)
STORED AS parquet
LOCATION '{}'",
table_dir.display()
);
ctx.sql(&create_sql).await?.collect().await?;
// This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c').
// The behavior of write_table depends on the TableProvider's implementation
// of the insert_into method.
df.clone()
.write_table("test", DataFrameWriteOptions::new())
.await?;
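    // Write the DataFrame out as parquet files; passing `None` uses the
    // default parquet writer options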
df.clone()
.write_parquet(
parquet_dir.to_str().unwrap(),
DataFrameWriteOptions::new(),
None,
)
.await?;
df.clone()
.write_csv(
csv_dir.to_str().unwrap(),
            // DataFrameWriteOptions controls how the data is written, such as
            // whether rows are appended or the target overwritten; format-specific
            // settings like the compression codec are passed separately via CsvOptions
DataFrameWriteOptions::new(),
Some(CsvOptions::default().with_compression(CompressionTypeVariant::GZIP)),
)
.await?;
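    // Write the DataFrame out as newline-delimited JSON; `None` again means
    // the default writer options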
df.clone()
.write_json(
json_dir.to_str().unwrap(),
DataFrameWriteOptions::new(),
None,
)
.await?;
Ok(())
}
/// Use the DataFrame API to execute the following subquery:
/// select car, speed from t1 where (select avg(t2.speed) from t2 where t1.car = t2.car) > 0 limit 3;
async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
ctx.table("t1")
.await?
.filter(
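            // Correlated scalar subquery: `out_ref_col` references the outer
            // table `t1`'s `car` column from inside the subquery plan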
scalar_subquery(Arc::new(
ctx.table("t2")
.await?
.filter(out_ref_col(DataType::Utf8, "t1.car").eq(col("t2.car")))?
.aggregate(vec![], vec![avg(col("t2.speed"))])?
.select(vec![avg(col("t2.speed"))])?
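                    // Convert the DataFrame into a `LogicalPlan` so it can be
                    // embedded as a subquery expression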
.into_unoptimized_plan(),
))
.gt(lit(0.0)),
)?
.select(vec![col("t1.car"), col("t1.speed")])?
.limit(0, Some(3))?
.show()
.await?;
Ok(())
}
/// Use the DataFrame API to execute the following subquery:
/// select t1.car, t1.speed from t1 where t1.speed in (select max(t2.speed) from t2 where t2.car = 'red') limit 3;
async fn where_in_subquery(ctx: &SessionContext) -> Result<()> {
ctx.table("t1")
.await?
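        // IN subquery: `t1.speed` is compared against the set produced by the
        // subquery (the max speed among red cars in `t2`)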
.filter(in_subquery(
col("t1.speed"),
Arc::new(
ctx.table("t2")
.await?
.filter(
col("t2.car").eq(lit(ScalarValue::Utf8(Some("red".to_string())))),
)?
.aggregate(vec![], vec![max(col("t2.speed"))])?
.select(vec![max(col("t2.speed"))])?
.into_unoptimized_plan(),
),
))?
.select(vec![col("t1.car"), col("t1.speed")])?
.limit(0, Some(3))?
.show()
.await?;
Ok(())
}
/// Use the DataFrame API to execute the following subquery:
/// select t1.car, t1.speed from t1 where exists (select t2.speed from t2 where t1.car = t2.car) limit 3;
async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
ctx.table("t1")
.await?
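        // EXISTS subquery, correlated with the outer query on `car` via `out_ref_col`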
.filter(exists(Arc::new(
ctx.table("t2")
.await?
.filter(out_ref_col(DataType::Utf8, "t1.car").eq(col("t2.car")))?
.select(vec![col("t2.speed")])?
.into_unoptimized_plan(),
)))?
.select(vec![col("t1.car"), col("t1.speed")])?
.limit(0, Some(3))?
.show()
.await?;
Ok(())
}
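/// Register the cars example CSV dataset as a table with the given `name`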
async fn register_cars_test_data(name: &str, ctx: &SessionContext) -> Result<()> {
let dataset = ExampleDataset::Cars;
ctx.register_csv(name, dataset.path_str()?, CsvReadOptions::default())
.await?;
Ok(())
}