// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! DataFrame API for building and executing query plans.
use std::sync::Arc;

use async_trait::async_trait;

use crate::arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::logical_plan::{
    DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning,
};
/// DataFrame represents a logical set of rows with the same named columns.
/// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
/// [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html).
///
/// DataFrames are typically created by the `read_csv` and `read_parquet` methods on the
/// [ExecutionContext](../execution/context/struct.ExecutionContext.html) and can then be
/// modified by calling transformation methods such as `filter`, `select`, `aggregate`, and
/// `limit` to build up a query definition.
///
/// The query can be executed by calling the asynchronous `collect` method.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
///            .aggregate(&[col("a")], &[min(col("b"))])?
///            .limit(100)?;
/// let results = df.collect().await?;
/// # Ok(())
/// # }
/// ```
#[async_trait]
pub trait DataFrame: Send + Sync {
/// Select columns by name. Returns a new DataFrame containing only the
/// specified columns.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.select_columns(&["a", "b"])?;
/// # Ok(())
/// # }
/// ```
fn select_columns(&self, columns: &[&str]) -> Result<Arc<dyn DataFrame>>;
/// Create a projection based on arbitrary expressions.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.select(&[col("a") * col("b"), col("c")])?;
/// # Ok(())
/// # }
/// ```
fn select(&self, expr: &[Expr]) -> Result<Arc<dyn DataFrame>>;
/// Filter a DataFrame to only include rows that match the specified filter expression.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.filter(col("a").lt_eq(col("b")))?;
/// # Ok(())
/// # }
/// ```
fn filter(&self, expr: Expr) -> Result<Arc<dyn DataFrame>>;
/// Perform an aggregate query with optional grouping expressions.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
///
/// // The following is equivalent to "SELECT MIN(b) GROUP BY a"
/// let _ = df.aggregate(&[col("a")], &[min(col("b"))])?;
///
/// // The following is equivalent to "SELECT MIN(b)"
/// let _ = df.aggregate(&[], &[min(col("b"))])?;
/// # Ok(())
/// # }
/// ```
fn aggregate(
    &self,
    group_expr: &[Expr],
    aggr_expr: &[Expr],
) -> Result<Arc<dyn DataFrame>>;
/// Limit the number of rows returned from this DataFrame.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.limit(100)?;
/// # Ok(())
/// # }
/// ```
fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>>;
/// Sort the DataFrame by the specified sorting expressions. Any expression can be turned into
/// a sort expression by calling its [sort](../logical_plan/enum.Expr.html#method.sort) method.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.sort(&[col("a").sort(true, true), col("b").sort(false, false)])?;
/// # Ok(())
/// # }
/// ```
fn sort(&self, expr: &[Expr]) -> Result<Arc<dyn DataFrame>>;
/// Join this DataFrame with another DataFrame using the specified columns as join keys.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let left = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let right = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?
///     .select(&[
///         col("a").alias("a2"),
///         col("b").alias("b2"),
///         col("c").alias("c2")])?;
/// let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"])?;
/// let batches = join.collect().await?;
/// # Ok(())
/// # }
/// ```
fn join(
    &self,
    right: Arc<dyn DataFrame>,
    join_type: JoinType,
    left_cols: &[&str],
    right_cols: &[&str],
) -> Result<Arc<dyn DataFrame>>;
/// Repartition a DataFrame based on a logical partitioning scheme.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
/// # Ok(())
/// # }
/// ```
fn repartition(
    &self,
    partitioning_scheme: Partitioning,
) -> Result<Arc<dyn DataFrame>>;
/// Executes this DataFrame and collects all results into a vector of RecordBatch.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let batches = df.collect().await?;
/// # Ok(())
/// # }
/// ```
async fn collect(&self) -> Result<Vec<RecordBatch>>;
/// Executes this DataFrame and collects all results into a vector of vectors of `RecordBatch`,
/// maintaining the input partitioning.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let batches = df.collect_partitioned().await?;
/// # Ok(())
/// # }
/// ```
async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>;
/// Returns the schema describing the output of this DataFrame in terms of columns returned,
/// where each column has a name, data type, and nullability attribute.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let schema = df.schema();
/// # Ok(())
/// # }
/// ```
fn schema(&self) -> &DFSchema;
/// Return the logical plan represented by this DataFrame.
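///
/// A minimal sketch (assuming the same `tests/example.csv` used throughout these docs):
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let plan = df.limit(100)?.to_logical_plan();
/// # Ok(())
/// # }
/// ```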
fn to_logical_plan(&self) -> LogicalPlan;
/// Return a DataFrame whose output is an explanation of the plan built so far;
/// if `verbose` is true, additional detail is included.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let batches = df.limit(100)?.explain(false)?.collect().await?;
/// # Ok(())
/// # }
/// ```
fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>>;
/// Return a `FunctionRegistry` that can be used to plan UDF (user-defined function) calls.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let f = df.registry();
/// // use f.udf("name", vec![...]) to use the udf
/// # Ok(())
/// # }
/// ```
fn registry(&self) -> Arc<dyn FunctionRegistry>;
}