| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Functions for creating logical expressions |
| |
| use crate::expr::{ |
| AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery, |
| NullTreatment, Placeholder, TryCast, Unnest, WildcardOptions, WindowFunction, |
| }; |
| use crate::function::{ |
| AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory, |
| StateFieldsArgs, |
| }; |
| use crate::ptr_eq::PtrEq; |
| use crate::select_expr::SelectExpr; |
| use crate::{ |
| conditional_expressions::CaseBuilder, expr::Sort, logical_plan::Subquery, |
| AggregateUDF, Expr, LimitEffect, LogicalPlan, Operator, PartitionEvaluator, |
| ScalarFunctionArgs, ScalarFunctionImplementation, ScalarUDF, Signature, Volatility, |
| }; |
| use crate::{ |
| AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowFrame, WindowUDF, WindowUDFImpl, |
| }; |
| use arrow::compute::kernels::cast_utils::{ |
| parse_interval_day_time, parse_interval_month_day_nano, parse_interval_year_month, |
| }; |
| use arrow::datatypes::{DataType, Field, FieldRef}; |
| use datafusion_common::{plan_err, Column, Result, ScalarValue, Spans, TableReference}; |
| use datafusion_functions_window_common::field::WindowUDFFieldArgs; |
| use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; |
| use datafusion_physical_expr_common::physical_expr::PhysicalExpr; |
| use std::any::Any; |
| use std::collections::HashMap; |
| use std::fmt::Debug; |
| use std::hash::Hash; |
| use std::ops::Not; |
| use std::sync::Arc; |
| |
| /// Create a column expression based on a qualified or unqualified column name. Will |
| /// normalize unquoted identifiers according to SQL rules (identifiers will become lowercase). |
| /// |
| /// For example: |
| /// |
| /// ```rust |
| /// # use datafusion_expr::col; |
| /// let c1 = col("a"); |
| /// let c2 = col("A"); |
| /// assert_eq!(c1, c2); |
| /// |
| /// // note how quoting with double quotes preserves the case |
| /// let c3 = col(r#""A""#); |
| /// assert_ne!(c1, c3); |
| /// ``` |
| pub fn col(ident: impl Into<Column>) -> Expr { |
| Expr::Column(ident.into()) |
| } |
| |
| /// Create an out reference column which hold a reference that has been resolved to a field |
| /// outside of the current plan. |
| /// The expression created by this function does not preserve the metadata of the outer column. |
| /// Please use `out_ref_col_with_metadata` if you want to preserve the metadata. |
| pub fn out_ref_col(dt: DataType, ident: impl Into<Column>) -> Expr { |
| out_ref_col_with_metadata(dt, HashMap::new(), ident) |
| } |
| |
| /// Create an out reference column from an existing field (preserving metadata) |
| pub fn out_ref_col_with_metadata( |
| dt: DataType, |
| metadata: HashMap<String, String>, |
| ident: impl Into<Column>, |
| ) -> Expr { |
| let column = ident.into(); |
| let field: FieldRef = |
| Arc::new(Field::new(column.name(), dt, true).with_metadata(metadata)); |
| Expr::OuterReferenceColumn(field, column) |
| } |
| |
| /// Create an unqualified column expression from the provided name, without normalizing |
| /// the column. |
| /// |
| /// For example: |
| /// |
| /// ```rust |
| /// # use datafusion_expr::{col, ident}; |
| /// let c1 = ident("A"); // not normalized staying as column 'A' |
| /// let c2 = col("A"); // normalized via SQL rules becoming column 'a' |
| /// assert_ne!(c1, c2); |
| /// |
| /// let c3 = col(r#""A""#); |
| /// assert_eq!(c1, c3); |
| /// |
| /// let c4 = col("t1.a"); // parses as relation 't1' column 'a' |
| /// let c5 = ident("t1.a"); // parses as column 't1.a' |
| /// assert_ne!(c4, c5); |
| /// ``` |
| pub fn ident(name: impl Into<String>) -> Expr { |
| Expr::Column(Column::from_name(name)) |
| } |
| |
| /// Create placeholder value that will be filled in (such as `$1`) |
| /// |
| /// Note the parameter type can be inferred using [`Expr::infer_placeholder_types`] |
| /// |
| /// # Example |
| /// |
| /// ```rust |
| /// # use datafusion_expr::{placeholder}; |
| /// let p = placeholder("$1"); // $1, refers to parameter 1 |
| /// assert_eq!(p.to_string(), "$1") |
| /// ``` |
| pub fn placeholder(id: impl Into<String>) -> Expr { |
| Expr::Placeholder(Placeholder { |
| id: id.into(), |
| field: None, |
| }) |
| } |
| |
| /// Create an '*' [`Expr::Wildcard`] expression that matches all columns |
| /// |
| /// # Example |
| /// |
| /// ```rust |
| /// # use datafusion_expr::{wildcard}; |
| /// let p = wildcard(); |
| /// assert_eq!(p.to_string(), "*") |
| /// ``` |
| pub fn wildcard() -> SelectExpr { |
| SelectExpr::Wildcard(WildcardOptions::default()) |
| } |
| |
| /// Create an '*' [`Expr::Wildcard`] expression with the wildcard options |
| pub fn wildcard_with_options(options: WildcardOptions) -> SelectExpr { |
| SelectExpr::Wildcard(options) |
| } |
| |
| /// Create an 't.*' [`Expr::Wildcard`] expression that matches all columns from a specific table |
| /// |
| /// # Example |
| /// |
| /// ```rust |
| /// # use datafusion_common::TableReference; |
| /// # use datafusion_expr::{qualified_wildcard}; |
| /// let p = qualified_wildcard(TableReference::bare("t")); |
| /// assert_eq!(p.to_string(), "t.*") |
| /// ``` |
| pub fn qualified_wildcard(qualifier: impl Into<TableReference>) -> SelectExpr { |
| SelectExpr::QualifiedWildcard(qualifier.into(), WildcardOptions::default()) |
| } |
| |
| /// Create an 't.*' [`Expr::Wildcard`] expression with the wildcard options |
| pub fn qualified_wildcard_with_options( |
| qualifier: impl Into<TableReference>, |
| options: WildcardOptions, |
| ) -> SelectExpr { |
| SelectExpr::QualifiedWildcard(qualifier.into(), options) |
| } |
| |
| /// Return a new expression `left <op> right` |
| pub fn binary_expr(left: Expr, op: Operator, right: Expr) -> Expr { |
| Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right))) |
| } |
| |
| /// Return a new expression with a logical AND |
| pub fn and(left: Expr, right: Expr) -> Expr { |
| Expr::BinaryExpr(BinaryExpr::new( |
| Box::new(left), |
| Operator::And, |
| Box::new(right), |
| )) |
| } |
| |
| /// Return a new expression with a logical OR |
| pub fn or(left: Expr, right: Expr) -> Expr { |
| Expr::BinaryExpr(BinaryExpr::new( |
| Box::new(left), |
| Operator::Or, |
| Box::new(right), |
| )) |
| } |
| |
| /// Return a new expression with a logical NOT |
| pub fn not(expr: Expr) -> Expr { |
| expr.not() |
| } |
| |
| /// Return a new expression with bitwise AND |
| pub fn bitwise_and(left: Expr, right: Expr) -> Expr { |
| Expr::BinaryExpr(BinaryExpr::new( |
| Box::new(left), |
| Operator::BitwiseAnd, |
| Box::new(right), |
| )) |
| } |
| |
| /// Return a new expression with bitwise OR |
| pub fn bitwise_or(left: Expr, right: Expr) -> Expr { |
| Expr::BinaryExpr(BinaryExpr::new( |
| Box::new(left), |
| Operator::BitwiseOr, |
| Box::new(right), |
| )) |
| } |
| |
| /// Return a new expression with bitwise XOR |
| pub fn bitwise_xor(left: Expr, right: Expr) -> Expr { |
| Expr::BinaryExpr(BinaryExpr::new( |
| Box::new(left), |
| Operator::BitwiseXor, |
| Box::new(right), |
| )) |
| } |
| |
| /// Return a new expression with bitwise SHIFT RIGHT |
| pub fn bitwise_shift_right(left: Expr, right: Expr) -> Expr { |
| Expr::BinaryExpr(BinaryExpr::new( |
| Box::new(left), |
| Operator::BitwiseShiftRight, |
| Box::new(right), |
| )) |
| } |
| |
| /// Return a new expression with bitwise SHIFT LEFT |
| pub fn bitwise_shift_left(left: Expr, right: Expr) -> Expr { |
| Expr::BinaryExpr(BinaryExpr::new( |
| Box::new(left), |
| Operator::BitwiseShiftLeft, |
| Box::new(right), |
| )) |
| } |
| |
| /// Create an in_list expression |
| pub fn in_list(expr: Expr, list: Vec<Expr>, negated: bool) -> Expr { |
| Expr::InList(InList::new(Box::new(expr), list, negated)) |
| } |
| |
| /// Create an EXISTS subquery expression |
| pub fn exists(subquery: Arc<LogicalPlan>) -> Expr { |
| let outer_ref_columns = subquery.all_out_ref_exprs(); |
| Expr::Exists(Exists { |
| subquery: Subquery { |
| subquery, |
| outer_ref_columns, |
| spans: Spans::new(), |
| }, |
| negated: false, |
| }) |
| } |
| |
| /// Create a NOT EXISTS subquery expression |
| pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr { |
| let outer_ref_columns = subquery.all_out_ref_exprs(); |
| Expr::Exists(Exists { |
| subquery: Subquery { |
| subquery, |
| outer_ref_columns, |
| spans: Spans::new(), |
| }, |
| negated: true, |
| }) |
| } |
| |
| /// Create an IN subquery expression |
| pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr { |
| let outer_ref_columns = subquery.all_out_ref_exprs(); |
| Expr::InSubquery(InSubquery::new( |
| Box::new(expr), |
| Subquery { |
| subquery, |
| outer_ref_columns, |
| spans: Spans::new(), |
| }, |
| false, |
| )) |
| } |
| |
| /// Create a NOT IN subquery expression |
| pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr { |
| let outer_ref_columns = subquery.all_out_ref_exprs(); |
| Expr::InSubquery(InSubquery::new( |
| Box::new(expr), |
| Subquery { |
| subquery, |
| outer_ref_columns, |
| spans: Spans::new(), |
| }, |
| true, |
| )) |
| } |
| |
| /// Create a scalar subquery expression |
| pub fn scalar_subquery(subquery: Arc<LogicalPlan>) -> Expr { |
| let outer_ref_columns = subquery.all_out_ref_exprs(); |
| Expr::ScalarSubquery(Subquery { |
| subquery, |
| outer_ref_columns, |
| spans: Spans::new(), |
| }) |
| } |
| |
| /// Create a grouping set |
| pub fn grouping_set(exprs: Vec<Vec<Expr>>) -> Expr { |
| Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) |
| } |
| |
| /// Create a grouping set for all combination of `exprs` |
| pub fn cube(exprs: Vec<Expr>) -> Expr { |
| Expr::GroupingSet(GroupingSet::Cube(exprs)) |
| } |
| |
| /// Create a grouping set for rollup |
| pub fn rollup(exprs: Vec<Expr>) -> Expr { |
| Expr::GroupingSet(GroupingSet::Rollup(exprs)) |
| } |
| |
| /// Create a cast expression |
| pub fn cast(expr: Expr, data_type: DataType) -> Expr { |
| Expr::Cast(Cast::new(Box::new(expr), data_type)) |
| } |
| |
| /// Create a try cast expression |
| pub fn try_cast(expr: Expr, data_type: DataType) -> Expr { |
| Expr::TryCast(TryCast::new(Box::new(expr), data_type)) |
| } |
| |
| /// Create is null expression |
| pub fn is_null(expr: Expr) -> Expr { |
| Expr::IsNull(Box::new(expr)) |
| } |
| |
| /// Create is true expression |
| pub fn is_true(expr: Expr) -> Expr { |
| Expr::IsTrue(Box::new(expr)) |
| } |
| |
| /// Create is not true expression |
| pub fn is_not_true(expr: Expr) -> Expr { |
| Expr::IsNotTrue(Box::new(expr)) |
| } |
| |
| /// Create is false expression |
| pub fn is_false(expr: Expr) -> Expr { |
| Expr::IsFalse(Box::new(expr)) |
| } |
| |
| /// Create is not false expression |
| pub fn is_not_false(expr: Expr) -> Expr { |
| Expr::IsNotFalse(Box::new(expr)) |
| } |
| |
| /// Create is unknown expression |
| pub fn is_unknown(expr: Expr) -> Expr { |
| Expr::IsUnknown(Box::new(expr)) |
| } |
| |
| /// Create is not unknown expression |
| pub fn is_not_unknown(expr: Expr) -> Expr { |
| Expr::IsNotUnknown(Box::new(expr)) |
| } |
| |
| /// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression. |
| pub fn case(expr: Expr) -> CaseBuilder { |
| CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None) |
| } |
| |
| /// Create a CASE WHEN statement with boolean WHEN expressions and no base expression. |
| pub fn when(when: Expr, then: Expr) -> CaseBuilder { |
| CaseBuilder::new(None, vec![when], vec![then], None) |
| } |
| |
| /// Create a Unnest expression |
| pub fn unnest(expr: Expr) -> Expr { |
| Expr::Unnest(Unnest { |
| expr: Box::new(expr), |
| }) |
| } |
| |
| /// Convenience method to create a new user defined scalar function (UDF) with a |
| /// specific signature and specific return type. |
| /// |
| /// Note this function does not expose all available features of [`ScalarUDF`], |
| /// such as |
| /// |
| /// * computing return types based on input types |
| /// * multiple [`Signature`]s |
| /// * aliases |
| /// |
| /// See [`ScalarUDF`] for details and examples on how to use the full |
| /// functionality. |
| pub fn create_udf( |
| name: &str, |
| input_types: Vec<DataType>, |
| return_type: DataType, |
| volatility: Volatility, |
| fun: ScalarFunctionImplementation, |
| ) -> ScalarUDF { |
| ScalarUDF::from(SimpleScalarUDF::new( |
| name, |
| input_types, |
| return_type, |
| volatility, |
| fun, |
| )) |
| } |
| |
| /// Implements [`ScalarUDFImpl`] for functions that have a single signature and |
| /// return type. |
| #[derive(PartialEq, Eq, Hash)] |
| pub struct SimpleScalarUDF { |
| name: String, |
| signature: Signature, |
| return_type: DataType, |
| fun: PtrEq<ScalarFunctionImplementation>, |
| } |
| |
| impl Debug for SimpleScalarUDF { |
| fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { |
| f.debug_struct("SimpleScalarUDF") |
| .field("name", &self.name) |
| .field("signature", &self.signature) |
| .field("return_type", &self.return_type) |
| .field("fun", &"<FUNC>") |
| .finish() |
| } |
| } |
| |
| impl SimpleScalarUDF { |
| /// Create a new `SimpleScalarUDF` from a name, input types, return type and |
| /// implementation. Implementing [`ScalarUDFImpl`] allows more flexibility |
| pub fn new( |
| name: impl Into<String>, |
| input_types: Vec<DataType>, |
| return_type: DataType, |
| volatility: Volatility, |
| fun: ScalarFunctionImplementation, |
| ) -> Self { |
| Self::new_with_signature( |
| name, |
| Signature::exact(input_types, volatility), |
| return_type, |
| fun, |
| ) |
| } |
| |
| /// Create a new `SimpleScalarUDF` from a name, signature, return type and |
| /// implementation. Implementing [`ScalarUDFImpl`] allows more flexibility |
| pub fn new_with_signature( |
| name: impl Into<String>, |
| signature: Signature, |
| return_type: DataType, |
| fun: ScalarFunctionImplementation, |
| ) -> Self { |
| Self { |
| name: name.into(), |
| signature, |
| return_type, |
| fun: fun.into(), |
| } |
| } |
| } |
| |
| impl ScalarUDFImpl for SimpleScalarUDF { |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| |
| fn name(&self) -> &str { |
| &self.name |
| } |
| |
| fn signature(&self) -> &Signature { |
| &self.signature |
| } |
| |
| fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { |
| Ok(self.return_type.clone()) |
| } |
| |
| fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { |
| (self.fun)(&args.args) |
| } |
| } |
| |
| /// Creates a new UDAF with a specific signature, state type and return type. |
| /// The signature and state type must match the `Accumulator's implementation`. |
| pub fn create_udaf( |
| name: &str, |
| input_type: Vec<DataType>, |
| return_type: Arc<DataType>, |
| volatility: Volatility, |
| accumulator: AccumulatorFactoryFunction, |
| state_type: Arc<Vec<DataType>>, |
| ) -> AggregateUDF { |
| let return_type = Arc::unwrap_or_clone(return_type); |
| let state_type = Arc::unwrap_or_clone(state_type); |
| let state_fields = state_type |
| .into_iter() |
| .enumerate() |
| .map(|(i, t)| Field::new(format!("{i}"), t, true)) |
| .map(Arc::new) |
| .collect::<Vec<_>>(); |
| AggregateUDF::from(SimpleAggregateUDF::new( |
| name, |
| input_type, |
| return_type, |
| volatility, |
| accumulator, |
| state_fields, |
| )) |
| } |
| |
| /// Implements [`AggregateUDFImpl`] for functions that have a single signature and |
| /// return type. |
| #[derive(PartialEq, Eq, Hash)] |
| pub struct SimpleAggregateUDF { |
| name: String, |
| signature: Signature, |
| return_type: DataType, |
| accumulator: PtrEq<AccumulatorFactoryFunction>, |
| state_fields: Vec<FieldRef>, |
| } |
| |
| impl Debug for SimpleAggregateUDF { |
| fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { |
| f.debug_struct("SimpleAggregateUDF") |
| .field("name", &self.name) |
| .field("signature", &self.signature) |
| .field("return_type", &self.return_type) |
| .field("fun", &"<FUNC>") |
| .finish() |
| } |
| } |
| |
| impl SimpleAggregateUDF { |
| /// Create a new `SimpleAggregateUDF` from a name, input types, return type, state type and |
| /// implementation. Implementing [`AggregateUDFImpl`] allows more flexibility |
| pub fn new( |
| name: impl Into<String>, |
| input_type: Vec<DataType>, |
| return_type: DataType, |
| volatility: Volatility, |
| accumulator: AccumulatorFactoryFunction, |
| state_fields: Vec<FieldRef>, |
| ) -> Self { |
| let name = name.into(); |
| let signature = Signature::exact(input_type, volatility); |
| Self { |
| name, |
| signature, |
| return_type, |
| accumulator: accumulator.into(), |
| state_fields, |
| } |
| } |
| |
| /// Create a new `SimpleAggregateUDF` from a name, signature, return type, state type and |
| /// implementation. Implementing [`AggregateUDFImpl`] allows more flexibility |
| pub fn new_with_signature( |
| name: impl Into<String>, |
| signature: Signature, |
| return_type: DataType, |
| accumulator: AccumulatorFactoryFunction, |
| state_fields: Vec<FieldRef>, |
| ) -> Self { |
| let name = name.into(); |
| Self { |
| name, |
| signature, |
| return_type, |
| accumulator: accumulator.into(), |
| state_fields, |
| } |
| } |
| } |
| |
| impl AggregateUDFImpl for SimpleAggregateUDF { |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| |
| fn name(&self) -> &str { |
| &self.name |
| } |
| |
| fn signature(&self) -> &Signature { |
| &self.signature |
| } |
| |
| fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { |
| Ok(self.return_type.clone()) |
| } |
| |
| fn accumulator( |
| &self, |
| acc_args: AccumulatorArgs, |
| ) -> Result<Box<dyn crate::Accumulator>> { |
| (self.accumulator)(acc_args) |
| } |
| |
| fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<FieldRef>> { |
| Ok(self.state_fields.clone()) |
| } |
| } |
| |
| /// Creates a new UDWF with a specific signature, state type and return type. |
| /// |
| /// The signature and state type must match the [`PartitionEvaluator`]'s implementation`. |
| /// |
| /// [`PartitionEvaluator`]: crate::PartitionEvaluator |
| pub fn create_udwf( |
| name: &str, |
| input_type: DataType, |
| return_type: Arc<DataType>, |
| volatility: Volatility, |
| partition_evaluator_factory: PartitionEvaluatorFactory, |
| ) -> WindowUDF { |
| let return_type = Arc::unwrap_or_clone(return_type); |
| WindowUDF::from(SimpleWindowUDF::new( |
| name, |
| input_type, |
| return_type, |
| volatility, |
| partition_evaluator_factory, |
| )) |
| } |
| |
| /// Implements [`WindowUDFImpl`] for functions that have a single signature and |
| /// return type. |
| #[derive(PartialEq, Eq, Hash)] |
| pub struct SimpleWindowUDF { |
| name: String, |
| signature: Signature, |
| return_type: DataType, |
| partition_evaluator_factory: PtrEq<PartitionEvaluatorFactory>, |
| } |
| |
| impl Debug for SimpleWindowUDF { |
| fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { |
| f.debug_struct("WindowUDF") |
| .field("name", &self.name) |
| .field("signature", &self.signature) |
| .field("return_type", &"<func>") |
| .field("partition_evaluator_factory", &"<FUNC>") |
| .finish() |
| } |
| } |
| |
| impl SimpleWindowUDF { |
| /// Create a new `SimpleWindowUDF` from a name, input types, return type and |
| /// implementation. Implementing [`WindowUDFImpl`] allows more flexibility |
| pub fn new( |
| name: impl Into<String>, |
| input_type: DataType, |
| return_type: DataType, |
| volatility: Volatility, |
| partition_evaluator_factory: PartitionEvaluatorFactory, |
| ) -> Self { |
| let name = name.into(); |
| let signature = Signature::exact([input_type].to_vec(), volatility); |
| Self { |
| name, |
| signature, |
| return_type, |
| partition_evaluator_factory: partition_evaluator_factory.into(), |
| } |
| } |
| } |
| |
| impl WindowUDFImpl for SimpleWindowUDF { |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| |
| fn name(&self) -> &str { |
| &self.name |
| } |
| |
| fn signature(&self) -> &Signature { |
| &self.signature |
| } |
| |
| fn partition_evaluator( |
| &self, |
| _partition_evaluator_args: PartitionEvaluatorArgs, |
| ) -> Result<Box<dyn PartitionEvaluator>> { |
| (self.partition_evaluator_factory)() |
| } |
| |
| fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> { |
| Ok(Arc::new(Field::new( |
| field_args.name(), |
| self.return_type.clone(), |
| true, |
| ))) |
| } |
| |
| fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect { |
| LimitEffect::Unknown |
| } |
| } |
| |
| pub fn interval_year_month_lit(value: &str) -> Expr { |
| let interval = parse_interval_year_month(value).ok(); |
| Expr::Literal(ScalarValue::IntervalYearMonth(interval), None) |
| } |
| |
| pub fn interval_datetime_lit(value: &str) -> Expr { |
| let interval = parse_interval_day_time(value).ok(); |
| Expr::Literal(ScalarValue::IntervalDayTime(interval), None) |
| } |
| |
| pub fn interval_month_day_nano_lit(value: &str) -> Expr { |
| let interval = parse_interval_month_day_nano(value).ok(); |
| Expr::Literal(ScalarValue::IntervalMonthDayNano(interval), None) |
| } |
| |
| /// Extensions for configuring [`Expr::AggregateFunction`] or [`Expr::WindowFunction`] |
| /// |
| /// Adds methods to [`Expr`] that make it easy to set optional options |
| /// such as `ORDER BY`, `FILTER` and `DISTINCT` |
| /// |
| /// # Example |
| /// ```no_run |
| /// # use datafusion_common::Result; |
| /// # use datafusion_expr::expr::NullTreatment; |
| /// # use datafusion_expr::test::function_stub::count; |
| /// # use datafusion_expr::{ExprFunctionExt, lit, Expr, col}; |
| /// # // first_value is an aggregate function in another crate |
| /// # fn first_value(_arg: Expr) -> Expr { |
| /// unimplemented!() } |
| /// # fn main() -> Result<()> { |
| /// // Create an aggregate count, filtering on column y > 5 |
| /// let agg = count(col("x")).filter(col("y").gt(lit(5))).build()?; |
| /// |
| /// // Find the first value in an aggregate sorted by column y |
| /// // equivalent to: |
| /// // `FIRST_VALUE(x ORDER BY y ASC IGNORE NULLS)` |
| /// let sort_expr = col("y").sort(true, true); |
| /// let agg = first_value(col("x")) |
| /// .order_by(vec![sort_expr]) |
| /// .null_treatment(NullTreatment::IgnoreNulls) |
| /// .build()?; |
| /// |
| /// // Create a window expression for percent rank partitioned on column a |
| /// // equivalent to: |
| /// // `PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS LAST IGNORE NULLS)` |
| /// // percent_rank is an udwf function in another crate |
| /// # fn percent_rank() -> Expr { |
| /// unimplemented!() } |
| /// let window = percent_rank() |
| /// .partition_by(vec![col("a")]) |
| /// .order_by(vec![col("b").sort(true, true)]) |
| /// .null_treatment(NullTreatment::IgnoreNulls) |
| /// .build()?; |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub trait ExprFunctionExt { |
| /// Add `ORDER BY <order_by>` |
| fn order_by(self, order_by: Vec<Sort>) -> ExprFuncBuilder; |
| /// Add `FILTER <filter>` |
| fn filter(self, filter: Expr) -> ExprFuncBuilder; |
| /// Add `DISTINCT` |
| fn distinct(self) -> ExprFuncBuilder; |
| /// Add `RESPECT NULLS` or `IGNORE NULLS` |
| fn null_treatment( |
| self, |
| null_treatment: impl Into<Option<NullTreatment>>, |
| ) -> ExprFuncBuilder; |
| /// Add `PARTITION BY` |
| fn partition_by(self, partition_by: Vec<Expr>) -> ExprFuncBuilder; |
| /// Add appropriate window frame conditions |
| fn window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder; |
| } |
| |
| #[derive(Debug, Clone)] |
| pub enum ExprFuncKind { |
| Aggregate(AggregateFunction), |
| Window(Box<WindowFunction>), |
| } |
| |
| /// Implementation of [`ExprFunctionExt`]. |
| /// |
| /// See [`ExprFunctionExt`] for usage and examples |
| #[derive(Debug, Clone)] |
| pub struct ExprFuncBuilder { |
| fun: Option<ExprFuncKind>, |
| order_by: Option<Vec<Sort>>, |
| filter: Option<Expr>, |
| distinct: bool, |
| null_treatment: Option<NullTreatment>, |
| partition_by: Option<Vec<Expr>>, |
| window_frame: Option<WindowFrame>, |
| } |
| |
| impl ExprFuncBuilder { |
| /// Create a new `ExprFuncBuilder`, see [`ExprFunctionExt`] |
| fn new(fun: Option<ExprFuncKind>) -> Self { |
| Self { |
| fun, |
| order_by: None, |
| filter: None, |
| distinct: false, |
| null_treatment: None, |
| partition_by: None, |
| window_frame: None, |
| } |
| } |
| |
| /// Updates and returns the in progress [`Expr::AggregateFunction`] or [`Expr::WindowFunction`] |
| /// |
| /// # Errors: |
| /// |
| /// Returns an error if this builder [`ExprFunctionExt`] was used with an |
| /// `Expr` variant other than [`Expr::AggregateFunction`] or [`Expr::WindowFunction`] |
| pub fn build(self) -> Result<Expr> { |
| let Self { |
| fun, |
| order_by, |
| filter, |
| distinct, |
| null_treatment, |
| partition_by, |
| window_frame, |
| } = self; |
| |
| let Some(fun) = fun else { |
| return plan_err!( |
| "ExprFunctionExt can only be used with Expr::AggregateFunction or Expr::WindowFunction" |
| ); |
| }; |
| |
| let fun_expr = match fun { |
| ExprFuncKind::Aggregate(mut udaf) => { |
| udaf.params.order_by = order_by.unwrap_or_default(); |
| udaf.params.filter = filter.map(Box::new); |
| udaf.params.distinct = distinct; |
| udaf.params.null_treatment = null_treatment; |
| Expr::AggregateFunction(udaf) |
| } |
| ExprFuncKind::Window(mut udwf) => { |
| let has_order_by = order_by.as_ref().map(|o| !o.is_empty()); |
| udwf.params.partition_by = partition_by.unwrap_or_default(); |
| udwf.params.order_by = order_by.unwrap_or_default(); |
| udwf.params.window_frame = |
| window_frame.unwrap_or_else(|| WindowFrame::new(has_order_by)); |
| udwf.params.filter = filter.map(Box::new); |
| udwf.params.null_treatment = null_treatment; |
| udwf.params.distinct = distinct; |
| Expr::WindowFunction(udwf) |
| } |
| }; |
| |
| Ok(fun_expr) |
| } |
| } |
| |
| impl ExprFunctionExt for ExprFuncBuilder { |
| /// Add `ORDER BY <order_by>` |
| fn order_by(mut self, order_by: Vec<Sort>) -> ExprFuncBuilder { |
| self.order_by = Some(order_by); |
| self |
| } |
| |
| /// Add `FILTER <filter>` |
| fn filter(mut self, filter: Expr) -> ExprFuncBuilder { |
| self.filter = Some(filter); |
| self |
| } |
| |
| /// Add `DISTINCT` |
| fn distinct(mut self) -> ExprFuncBuilder { |
| self.distinct = true; |
| self |
| } |
| |
| /// Add `RESPECT NULLS` or `IGNORE NULLS` |
| fn null_treatment( |
| mut self, |
| null_treatment: impl Into<Option<NullTreatment>>, |
| ) -> ExprFuncBuilder { |
| self.null_treatment = null_treatment.into(); |
| self |
| } |
| |
| fn partition_by(mut self, partition_by: Vec<Expr>) -> ExprFuncBuilder { |
| self.partition_by = Some(partition_by); |
| self |
| } |
| |
| fn window_frame(mut self, window_frame: WindowFrame) -> ExprFuncBuilder { |
| self.window_frame = Some(window_frame); |
| self |
| } |
| } |
| |
| impl ExprFunctionExt for Expr { |
| fn order_by(self, order_by: Vec<Sort>) -> ExprFuncBuilder { |
| let mut builder = match self { |
| Expr::AggregateFunction(udaf) => { |
| ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf))) |
| } |
| Expr::WindowFunction(udwf) => { |
| ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf))) |
| } |
| _ => ExprFuncBuilder::new(None), |
| }; |
| if builder.fun.is_some() { |
| builder.order_by = Some(order_by); |
| } |
| builder |
| } |
| fn filter(self, filter: Expr) -> ExprFuncBuilder { |
| match self { |
| Expr::AggregateFunction(udaf) => { |
| let mut builder = |
| ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf))); |
| builder.filter = Some(filter); |
| builder |
| } |
| _ => ExprFuncBuilder::new(None), |
| } |
| } |
| fn distinct(self) -> ExprFuncBuilder { |
| match self { |
| Expr::AggregateFunction(udaf) => { |
| let mut builder = |
| ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf))); |
| builder.distinct = true; |
| builder |
| } |
| _ => ExprFuncBuilder::new(None), |
| } |
| } |
| fn null_treatment( |
| self, |
| null_treatment: impl Into<Option<NullTreatment>>, |
| ) -> ExprFuncBuilder { |
| let mut builder = match self { |
| Expr::AggregateFunction(udaf) => { |
| ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf))) |
| } |
| Expr::WindowFunction(udwf) => { |
| ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf))) |
| } |
| _ => ExprFuncBuilder::new(None), |
| }; |
| if builder.fun.is_some() { |
| builder.null_treatment = null_treatment.into(); |
| } |
| builder |
| } |
| |
| fn partition_by(self, partition_by: Vec<Expr>) -> ExprFuncBuilder { |
| match self { |
| Expr::WindowFunction(udwf) => { |
| let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf))); |
| builder.partition_by = Some(partition_by); |
| builder |
| } |
| _ => ExprFuncBuilder::new(None), |
| } |
| } |
| |
| fn window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder { |
| match self { |
| Expr::WindowFunction(udwf) => { |
| let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf))); |
| builder.window_frame = Some(window_frame); |
| builder |
| } |
| _ => ExprFuncBuilder::new(None), |
| } |
| } |
| } |
| |
#[cfg(test)]
mod test {
    use super::*;

    /// `IS NULL` / `IS NOT NULL` render with the column name as written.
    #[test]
    fn filter_is_null_and_is_not_null() {
        let null_check = col("col1").is_null();
        let not_null_check = ident("col2").is_not_null();
        assert_eq!(null_check.to_string(), "col1 IS NULL");
        assert_eq!(not_null_check.to_string(), "col2 IS NOT NULL");
    }
}