| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::collections::HashMap; |
| use std::sync::Arc; |
| |
| use arrow::array::{BooleanArray, Int32Array}; |
| use arrow::record_batch::RecordBatch; |
| |
| use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; |
| use datafusion::common::DFSchema; |
| use datafusion::error::Result; |
| use datafusion::functions_aggregate::first_last::first_value_udaf; |
| use datafusion::optimizer::simplify_expressions::ExprSimplifier; |
| use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries}; |
| use datafusion::prelude::*; |
| use datafusion_common::{ScalarValue, ToDFSchema}; |
| use datafusion_expr::execution_props::ExecutionProps; |
| use datafusion_expr::expr::BinaryExpr; |
| use datafusion_expr::interval_arithmetic::Interval; |
| use datafusion_expr::simplify::SimplifyContext; |
| use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator}; |
| |
| /// This example demonstrates the DataFusion [`Expr`] API. |
| /// |
| /// DataFusion comes with a powerful and extensive system for |
| /// representing and manipulating expressions such as `A + 5` and `X |
| /// IN ('foo', 'bar', 'baz')`. |
| /// |
| /// In addition to building and manipulating [`Expr`]s, DataFusion |
| /// also comes with APIs for evaluation, simplification, and analysis. |
| /// |
| /// The code in this example shows how to: |
/// 1. Create [`Expr`]s using different APIs: [`main`]
| /// 2. Use the fluent API to easily create complex [`Expr`]s: [`expr_fn_demo`] |
| /// 3. Evaluate [`Expr`]s against data: [`evaluate_demo`] |
| /// 4. Simplify expressions: [`simplify_demo`] |
| /// 5. Analyze predicates for boundary ranges: [`range_analysis_demo`] |
| /// 6. Get the types of the expressions: [`expression_type_demo`] |
| #[tokio::main] |
| async fn main() -> Result<()> { |
    // The easiest way to create expressions is to use the
| // "fluent"-style API: |
| let expr = col("a") + lit(5); |
| |
    // The same expression can be created directly, with much more code:
| let expr2 = Expr::BinaryExpr(BinaryExpr::new( |
| Box::new(col("a")), |
| Operator::Plus, |
| Box::new(Expr::Literal(ScalarValue::Int32(Some(5)))), |
| )); |
| assert_eq!(expr, expr2); |
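
    // As a small, hedged sketch (not part of the original walk-through), the
    // fluent helpers compose freely: here `gt`, `and`, `is_not_null`, and
    // `alias` build the expression `a > 5 AND b IS NOT NULL` named "flag"
    // without evaluating anything. The column `b` and the name "flag" are
    // illustrative only.
    let _flag = col("a").gt(lit(5)).and(col("b").is_not_null()).alias("flag");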
| |
| // See how to build aggregate functions with the expr_fn API |
| expr_fn_demo()?; |
| |
| // See how to evaluate expressions |
| evaluate_demo()?; |
| |
| // See how to simplify expressions |
| simplify_demo()?; |
| |
| // See how to analyze ranges in expressions |
| range_analysis_demo()?; |
| |
| // See how to determine the data types of expressions |
| expression_type_demo()?; |
| |
| Ok(()) |
| } |
| |
| /// DataFusion's `expr_fn` API makes it easy to create [`Expr`]s for the |
| /// full range of expression types such as aggregates and window functions. |
| fn expr_fn_demo() -> Result<()> { |
| // Let's say you want to call the "first_value" aggregate function |
| let first_value = first_value_udaf(); |
| |
    // For example, this creates the expression `FIRST_VALUE(price)`.
    // These expressions can be passed to `DataFrame::aggregate` and other
    // APIs that take aggregate expressions.
| let agg = first_value.call(vec![col("price")]); |
| assert_eq!(agg.to_string(), "first_value(price)"); |
| |
    // You can use the ExprFunctionExt trait to create more complex aggregates
    // such as `FIRST_VALUE(price) FILTER (WHERE quantity > 100) ORDER BY ts DESC`
| let agg = first_value |
| .call(vec![col("price")]) |
| .order_by(vec![col("ts").sort(false, false)]) |
| .filter(col("quantity").gt(lit(100))) |
| .build()?; // build the aggregate |
| assert_eq!( |
| agg.to_string(), |
| "first_value(price) FILTER (WHERE quantity > Int32(100)) ORDER BY [ts DESC NULLS LAST]" |
| ); |
| |
| Ok(()) |
| } |
| |
| /// DataFusion can also evaluate arbitrary expressions on Arrow arrays. |
| fn evaluate_demo() -> Result<()> { |
| // For example, let's say you have some integers in an array |
| let batch = RecordBatch::try_from_iter([( |
| "a", |
| Arc::new(Int32Array::from(vec![4, 5, 6, 7, 8, 7, 4])) as _, |
| )])?; |
| |
    // Suppose you want to find all rows where the expression `a < 5 OR a = 8` is true
| let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8))); |
| |
| // First, you make a "physical expression" from the logical `Expr` |
| let df_schema = DFSchema::try_from(batch.schema())?; |
| let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?; |
| |
| // Now, you can evaluate the expression against the RecordBatch |
| let result = physical_expr.evaluate(&batch)?; |
| |
    // The result contains an array that is true only where `a < 5 OR a = 8`
| let expected_result = Arc::new(BooleanArray::from(vec![ |
| true, false, false, false, true, false, true, |
| ])) as _; |
| assert!( |
| matches!(&result, ColumnarValue::Array(r) if r == &expected_result), |
| "result: {:?}", |
| result |
| ); |
| |
| Ok(()) |
| } |
| |
| /// In addition to easy construction, DataFusion exposes APIs for simplifying |
/// such expressions so they are more efficient to evaluate. This code is also
| /// used by the query engine to optimize queries. |
| fn simplify_demo() -> Result<()> { |
    // For example, let's say you have created an expression such as
    // `ts = to_timestamp("2020-09-08T12:00:00+00:00")`
| let expr = col("ts").eq(to_timestamp(vec![lit("2020-09-08T12:00:00+00:00")])); |
| |
| // Naively evaluating such an expression against a large number of |
| // rows would involve re-converting "2020-09-08T12:00:00+00:00" to a |
    // timestamp for each row, which gets expensive
| // |
| // However, DataFusion's simplification logic can do this for you |
| |
| // you need to tell DataFusion the type of column "ts": |
| let schema = Schema::new(vec![make_ts_field("ts")]).to_dfschema_ref()?; |
| |
    // And then build a simplifier.
    // The `ExecutionProps` carries information needed to simplify
    // expressions, such as the current time (to evaluate `now()`
    // correctly)
| let props = ExecutionProps::new(); |
| let context = SimplifyContext::new(&props).with_schema(schema); |
| let simplifier = ExprSimplifier::new(context); |
| |
    // And then call the `simplify` method:
| let expr = simplifier.simplify(expr)?; |
| |
| // DataFusion has simplified the expression to a comparison with a constant |
| // ts = 1599566400000000000; Tada! |
| assert_eq!( |
| expr, |
| col("ts").eq(lit_timestamp_nano(1599566400000000000i64)) |
| ); |
| |
| // here are some other examples of what DataFusion is capable of |
| let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?; |
| let context = SimplifyContext::new(&props).with_schema(schema.clone()); |
| let simplifier = ExprSimplifier::new(context); |
| |
    // basic arithmetic simplification
    // i + (1 + 2) => i + 3
    // (the constant sub-expression `1 + 2` is folded to `3`; note this is not
    // done if the expression is written as `(col("i") + lit(1)) + lit(2)`)
| assert_eq!( |
| simplifier.simplify(col("i") + (lit(1) + lit(2)))?, |
| col("i") + lit(3) |
| ); |
| |
    // (i * 0) > 5 --> false
    // (valid because column `i` is declared non-nullable; if `i` could be NULL,
    // `i * 0` would be NULL rather than 0)
| assert_eq!( |
| simplifier.simplify((col("i") * lit(0)).gt(lit(5)))?, |
| lit(false) |
| ); |
| |
| // Logical simplification |
| |
| // ((i > 5) AND FALSE) OR (i < 10) --> i < 10 |
| assert_eq!( |
| simplifier |
| .simplify(col("i").gt(lit(5)).and(lit(false)).or(col("i").lt(lit(10))))?, |
| col("i").lt(lit(10)) |
| ); |
| |
| // String --> Date simplification |
    // `cast('2020-09-01' as date)` --> 18506 (the number of days since the epoch, 1970-01-01)
| assert_eq!( |
| simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?, |
| lit(ScalarValue::Date32(Some(18506))) |
| ); |
| |
| Ok(()) |
| } |
| |
| /// DataFusion also has APIs for analyzing predicates (boolean expressions) to |
/// determine the range restrictions on the inputs required for the predicate
/// to evaluate to true.
| fn range_analysis_demo() -> Result<()> { |
| // For example, let's say you are interested in finding data for all days |
| // in the month of September, 2020 |
| let september_1 = ScalarValue::Date32(Some(18506)); // 2020-09-01 |
| let october_1 = ScalarValue::Date32(Some(18536)); // 2020-10-01 |
| |
| // The predicate to find all such days could be |
| // `date > '2020-09-01' AND date < '2020-10-01'` |
| let expr = col("date") |
| .gt(lit(september_1.clone())) |
| .and(col("date").lt(lit(october_1.clone()))); |
| |
| // Using the analysis API, DataFusion can determine that the value of `date` |
| // must be in the range `['2020-09-01', '2020-10-01']`. If your data is |
| // organized in files according to day, this information permits skipping |
| // entire files without reading them. |
| // |
| // While this simple example could be handled with a special case, the |
| // DataFusion API handles arbitrary expressions (so for example, you don't |
    // have to handle the case where the predicate clauses are reversed, such as
    // `date < '2020-10-01' AND date > '2020-09-01'`)
| |
| // As always, we need to tell DataFusion the type of column "date" |
| let schema = Arc::new(Schema::new(vec![make_field("date", DataType::Date32)])); |
| |
| // You can provide DataFusion any known boundaries on the values of `date` |
| // (for example, maybe you know you only have data up to `2020-09-15`), but |
    // in this case, let's say we don't know any boundaries beforehand, so we
    // use `try_new_unbounded`
| let boundaries = ExprBoundaries::try_new_unbounded(&schema)?; |
| |
| // Now, we invoke the analysis code to perform the range analysis |
| let df_schema = DFSchema::try_from(schema)?; |
| let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?; |
| let analysis_result = analyze( |
| &physical_expr, |
| AnalysisContext::new(boundaries), |
| df_schema.as_ref(), |
| )?; |
| |
    // The result of the analysis is a range, encoded as an `Interval`, for
    // each column in the schema that must hold in order for the predicate
    // to be true.
    //
    // In this case, we can see that, as expected, `analyze` has figured out
    // that `date` must be in the range `['2020-09-01', '2020-10-01']`
| let expected_range = Interval::try_new(september_1, october_1)?; |
| assert_eq!(analysis_result.boundaries[0].interval, expected_range); |
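
    // For illustration (a hedged addition): the interval endpoints can also be
    // inspected directly; they are the Date32 values for 2020-09-01 and
    // 2020-10-01 used above.
    let interval = &analysis_result.boundaries[0].interval;
    assert_eq!(interval.lower(), &ScalarValue::Date32(Some(18506)));
    assert_eq!(interval.upper(), &ScalarValue::Date32(Some(18536)));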
| |
| Ok(()) |
| } |
| |
| fn make_field(name: &str, data_type: DataType) -> Field { |
| let nullable = false; |
| Field::new(name, data_type, nullable) |
| } |
| |
| fn make_ts_field(name: &str) -> Field { |
| let tz = None; |
| make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz)) |
| } |
| |
| /// This function shows how to use `Expr::get_type` to retrieve the DataType |
| /// of an expression |
| fn expression_type_demo() -> Result<()> { |
| let expr = col("c"); |
| |
| // To determine the DataType of an expression, DataFusion must know the |
| // types of the input expressions. You can provide this information using |
| // a schema. In this case we create a schema where the column `c` is of |
| // type Utf8 (a String / VARCHAR) |
| let schema = DFSchema::from_unqualified_fields( |
| vec![Field::new("c", DataType::Utf8, true)].into(), |
| HashMap::new(), |
| )?; |
| assert_eq!("Utf8", format!("{}", expr.get_type(&schema).unwrap())); |
| |
    // Using a schema where the column `c` is of type Int32
| let schema = DFSchema::from_unqualified_fields( |
| vec![Field::new("c", DataType::Int32, true)].into(), |
| HashMap::new(), |
| )?; |
| assert_eq!("Int32", format!("{}", expr.get_type(&schema).unwrap())); |
| |
    // Get the type of an expression that adds two columns. Adding an Int32
    // and a Float32 results in a Float32 type
| let expr = col("c1") + col("c2"); |
| let schema = DFSchema::from_unqualified_fields( |
| vec![ |
| Field::new("c1", DataType::Int32, true), |
| Field::new("c2", DataType::Float32, true), |
| ] |
| .into(), |
| HashMap::new(), |
| )?; |
| assert_eq!("Float32", format!("{}", expr.get_type(&schema).unwrap())); |
| |
| Ok(()) |
| } |