| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan |
| |
| use arrow::datatypes::Schema; |
| |
| use crate::error::Result; |
| use crate::logical_plan::Expr; |
| use crate::logical_plan::{and, LogicalPlan}; |
| use crate::optimizer::optimizer::OptimizerRule; |
| use crate::optimizer::utils; |
| use std::{ |
| collections::{HashMap, HashSet}, |
| sync::Arc, |
| }; |
| |
/// Filter Push Down optimizer rule pushes filter clauses down the plan
/// # Introduction
/// A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
/// An example of a filter-commutative operation is a projection; a counter-example is `limit`.
///
/// The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
/// can commute with a filter that depends on A only, but does not commute with a filter that depends
/// on SUM(B).
///
/// This optimizer commutes filters with filter-commutative operations to push the filters
/// the closest possible to the scans, re-writing the filter expressions by every
/// projection that changes the filter's expression.
///
/// Filter: #b Gt Int64(10)
///     Projection: #a AS b
///
/// is optimized to
///
/// Projection: #a AS b
///     Filter: #a Gt Int64(10)  <--- changed from #b to #a
///
/// This performs a single pass through the plan. When it passes through a filter, it stores that filter,
/// and when it reaches a node that does not commute with it, it adds the filter to that place.
/// When it passes through a projection, it re-writes the filter's expression taking into account that projection.
/// When multiple filters would have been written, it `AND`s their expressions into a single expression.
pub struct FilterPushDown {}
| |
/// Per-branch optimizer state: the filter predicates collected so far while
/// walking down the plan, waiting to be re-issued at the deepest node that
/// still commutes with them.
#[derive(Debug, Clone, Default)]
struct State {
    // (predicate, columns on the predicate)
    filters: Vec<(Expr, HashSet<String>)>,
}
| |
| type Predicates<'a> = (Vec<&'a Expr>, Vec<&'a HashSet<String>>); |
| |
| /// returns all predicates in `state` that depend on any of `used_columns` |
| fn get_predicates<'a>( |
| state: &'a State, |
| used_columns: &HashSet<String>, |
| ) -> Predicates<'a> { |
| state |
| .filters |
| .iter() |
| .filter(|(_, columns)| { |
| columns |
| .intersection(used_columns) |
| .collect::<HashSet<_>>() |
| .len() |
| > 0 |
| }) |
| .map(|&(ref a, ref b)| (a, b)) |
| .unzip() |
| } |
| |
| // returns 3 (potentially overlaping) sets of predicates: |
| // * pushable to left: its columns are all on the left |
| // * pushable to right: its columns is all on the right |
| // * keep: the set of columns is not in only either left or right |
| // Note that a predicate can be both pushed to the left and to the right. |
| fn get_join_predicates<'a>( |
| state: &'a State, |
| left: &Schema, |
| right: &Schema, |
| ) -> ( |
| Vec<&'a HashSet<String>>, |
| Vec<&'a HashSet<String>>, |
| Predicates<'a>, |
| ) { |
| let left_columns = &left |
| .fields() |
| .iter() |
| .map(|f| f.name().clone()) |
| .collect::<HashSet<_>>(); |
| let right_columns = &right |
| .fields() |
| .iter() |
| .map(|f| f.name().clone()) |
| .collect::<HashSet<_>>(); |
| |
| let filters = state |
| .filters |
| .iter() |
| .map(|(predicate, columns)| { |
| ( |
| (predicate, columns), |
| ( |
| columns, |
| left_columns.intersection(columns).collect::<HashSet<_>>(), |
| right_columns.intersection(columns).collect::<HashSet<_>>(), |
| ), |
| ) |
| }) |
| .collect::<Vec<_>>(); |
| |
| let pushable_to_left = filters |
| .iter() |
| .filter(|(_, (columns, left, _))| left.len() == columns.len()) |
| .map(|((_, b), _)| *b) |
| .collect(); |
| let pushable_to_right = filters |
| .iter() |
| .filter(|(_, (columns, _, right))| right.len() == columns.len()) |
| .map(|((_, b), _)| *b) |
| .collect(); |
| let keep = filters |
| .iter() |
| .filter(|(_, (columns, left, right))| { |
| // predicates whose columns are not in only one side of the join need to remain |
| let all_in_left = left.len() == columns.len(); |
| let all_in_right = right.len() == columns.len(); |
| !all_in_left && !all_in_right |
| }) |
| .map(|((ref a, ref b), _)| (a, b)) |
| .unzip(); |
| (pushable_to_left, pushable_to_right, keep) |
| } |
| |
| /// Optimizes the plan |
| fn push_down(state: &State, plan: &LogicalPlan) -> Result<LogicalPlan> { |
| let new_inputs = utils::inputs(&plan) |
| .iter() |
| .map(|input| optimize(input, state.clone())) |
| .collect::<Result<Vec<_>>>()?; |
| |
| let expr = utils::expressions(&plan); |
| utils::from_plan(&plan, &expr, &new_inputs) |
| } |
| |
| /// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with |
| /// its predicate be all `predicates` ANDed. |
| fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { |
| // reduce filters to a single filter with an AND |
| let predicate = predicates |
| .iter() |
| .skip(1) |
| .fold(predicates[0].clone(), |acc, predicate| { |
| and(acc, (*predicate).to_owned()) |
| }); |
| |
| LogicalPlan::Filter { |
| predicate, |
| input: Arc::new(plan), |
| } |
| } |
| |
| // remove all filters from `filters` that are in `predicate_columns` |
| fn remove_filters( |
| filters: &[(Expr, HashSet<String>)], |
| predicate_columns: &[&HashSet<String>], |
| ) -> Vec<(Expr, HashSet<String>)> { |
| filters |
| .iter() |
| .filter(|(_, columns)| !predicate_columns.contains(&columns)) |
| .cloned() |
| .collect::<Vec<_>>() |
| } |
| |
| // keeps all filters from `filters` that are in `predicate_columns` |
| fn keep_filters( |
| filters: &[(Expr, HashSet<String>)], |
| predicate_columns: &[&HashSet<String>], |
| ) -> Vec<(Expr, HashSet<String>)> { |
| filters |
| .iter() |
| .filter(|(_, columns)| predicate_columns.contains(&columns)) |
| .cloned() |
| .collect::<Vec<_>>() |
| } |
| |
| /// builds a new [LogicalPlan] from `plan` by issuing new [LogicalPlan::Filter] if any of the filters |
| /// in `state` depend on the columns `used_columns`. |
| fn issue_filters( |
| mut state: State, |
| used_columns: HashSet<String>, |
| plan: &LogicalPlan, |
| ) -> Result<LogicalPlan> { |
| let (predicates, predicate_columns) = get_predicates(&state, &used_columns); |
| |
| if predicates.is_empty() { |
| // all filters can be pushed down => optimize inputs and return new plan |
| return push_down(&state, plan); |
| } |
| |
| let plan = add_filter(plan.clone(), &predicates); |
| |
| state.filters = remove_filters(&state.filters, &predicate_columns); |
| |
| // continue optimization over all input nodes by cloning the current state (i.e. each node is independent) |
| push_down(&state, &plan) |
| } |
| |
/// Recursively walks `plan` top-down, collecting filter predicates into `state`
/// and re-issuing each one at the deepest node that still commutes with it.
fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
    match plan {
        LogicalPlan::Filter { input, predicate } => {
            let mut columns: HashSet<String> = HashSet::new();
            utils::expr_to_column_names(predicate, &mut columns)?;
            // collect the predicate (with the columns it references) and continue
            // downwards; the Filter node itself is dropped and re-created later
            state.filters.push((predicate.clone(), columns));
            optimize(input, state)
        }
        LogicalPlan::Projection {
            input,
            expr,
            schema,
        } => {
            // A projection is filter-commutable, but re-writes all predicate expressions
            // collect projection: output column name -> the expression that computes it
            let mut projection = HashMap::new();
            schema.fields().iter().enumerate().for_each(|(i, field)| {
                // strip alias, as they should not be part of filters
                let expr = match &expr[i] {
                    Expr::Alias(expr, _) => expr.as_ref().clone(),
                    expr => expr.clone(),
                };

                projection.insert(field.name().clone(), expr);
            });

            // re-write all filters based on this projection
            // E.g. in `Filter: #b\n  Projection: #a > 1 as b`, we can swap them, but the filter must be "#a > 1"
            for (predicate, columns) in state.filters.iter_mut() {
                *predicate = rewrite(predicate, &projection)?;

                // the rewrite may have changed which columns the predicate references
                columns.clear();
                utils::expr_to_column_names(predicate, columns)?;
            }

            // optimize inner
            let new_input = optimize(input, state)?;

            utils::from_plan(&plan, &expr, &vec![new_input])
        }
        LogicalPlan::Aggregate {
            input, aggr_expr, ..
        } => {
            // An aggregate's aggregate columns are _not_ filter-commutable => collect these:
            // * columns whose aggregation expression depends on
            // * the aggregation columns themselves
            // (filters on group-by columns CAN still be pushed below the aggregate)

            // construct set of columns that `aggr_expr` depends on
            let mut used_columns = HashSet::new();
            utils::exprlist_to_column_names(aggr_expr, &mut used_columns)?;

            let agg_columns = aggr_expr
                .iter()
                .map(|x| x.name(input.schema()))
                .collect::<Result<HashSet<_>>>()?;
            used_columns.extend(agg_columns);

            issue_filters(state, used_columns, plan)
        }
        LogicalPlan::Sort { .. } => {
            // sort is filter-commutable
            push_down(&state, plan)
        }
        LogicalPlan::Limit { input, .. } => {
            // limit is _not_ filter-commutable => collect all columns from its input
            // so every pending filter is issued above the limit
            let used_columns = input
                .schema()
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect::<HashSet<_>>();
            issue_filters(state, used_columns, plan)
        }
        LogicalPlan::Join { left, right, .. } => {
            // split pending filters into side-pushable and must-keep sets
            let (pushable_to_left, pushable_to_right, keep) =
                get_join_predicates(&state, &left.schema(), &right.schema());

            let mut left_state = state.clone();
            left_state.filters = keep_filters(&left_state.filters, &pushable_to_left);
            let left = optimize(left, left_state)?;

            let mut right_state = state.clone();
            right_state.filters = keep_filters(&right_state.filters, &pushable_to_right);
            let right = optimize(right, right_state)?;

            // create a new Join with the new `left` and `right`
            let expr = utils::expressions(&plan);
            let plan = utils::from_plan(&plan, &expr, &vec![left, right])?;

            if keep.0.is_empty() {
                Ok(plan)
            } else {
                // wrap the join on the filter whose predicates must be kept
                let plan = add_filter(plan, &keep.0);
                state.filters = remove_filters(&state.filters, &keep.1);

                Ok(plan)
            }
        }
        _ => {
            // all other plans are _not_ filter-commutable
            let used_columns = plan
                .schema()
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect::<HashSet<_>>();
            issue_filters(state, used_columns, plan)
        }
    }
}
| |
| impl OptimizerRule for FilterPushDown { |
| fn name(&self) -> &str { |
| return "filter_push_down"; |
| } |
| |
| fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> { |
| optimize(plan, State::default()) |
| } |
| } |
| |
| impl FilterPushDown { |
| #[allow(missing_docs)] |
| pub fn new() -> Self { |
| Self {} |
| } |
| } |
| |
| /// replaces columns by its name on the projection. |
| fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> { |
| let expressions = utils::expr_sub_expressions(&expr)?; |
| |
| let expressions = expressions |
| .iter() |
| .map(|e| rewrite(e, &projection)) |
| .collect::<Result<Vec<_>>>()?; |
| |
| match expr { |
| Expr::Column(name) => { |
| if let Some(expr) = projection.get(name) { |
| return Ok(expr.clone()); |
| } |
| } |
| _ => {} |
| } |
| |
| utils::rewrite_expression(&expr, &expressions) |
| } |
| |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logical_plan::{lit, sum, Expr, LogicalPlanBuilder, Operator};
    use crate::test::*;
    use crate::{logical_plan::col, prelude::JoinType};

    /// Runs the FilterPushDown rule on `plan` and asserts that the formatted
    /// optimized plan equals `expected`.
    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
        let mut rule = FilterPushDown::new();
        let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
        let formatted_plan = format!("{:?}", optimized_plan);
        assert_eq!(formatted_plan, expected);
    }

    #[test]
    fn filter_before_projection() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;
        // filter is before projection
        let expected = "\
            Projection: #a, #b\
            \n  Filter: #a Eq Int64(1)\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_after_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .limit(10)?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;
        // filter is before single projection
        let expected = "\
            Filter: #a Eq Int64(1)\
            \n  Limit: 10\
            \n    Projection: #a, #b\
            \n      TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_jump_2_plans() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b"), col("c")])?
            .project(vec![col("c"), col("b")])?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;
        // filter is before double projection
        let expected = "\
            Projection: #c, #b\
            \n  Projection: #a, #b, #c\
            \n    Filter: #a Eq Int64(1)\
            \n      TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_move_agg() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b")).alias("total_salary")])?
            .filter(col("a").gt(lit(10i64)))?
            .build()?;
        // filter of key aggregation is commutative
        let expected = "\
            Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS total_salary]]\
            \n  Filter: #a Gt Int64(10)\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_keep_agg() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b")).alias("b")])?
            .filter(col("b").gt(lit(10i64)))?
            .build()?;
        // filter of aggregate is after aggregation since they are non-commutative
        let expected = "\
            Filter: #b Gt Int64(10)\
            \n  Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS b]]\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
    #[test]
    fn alias() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a").alias("b"), col("c")])?
            .filter(col("b").eq(lit(1i64)))?
            .build()?;
        // filter is before projection
        let expected = "\
            Projection: #a AS b, #c\
            \n  Filter: #a Eq Int64(1)\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// builds `left + right`
    fn add(left: Expr, right: Expr) -> Expr {
        Expr::BinaryExpr {
            left: Box::new(left),
            op: Operator::Plus,
            right: Box::new(right),
        }
    }

    /// builds `left * right`
    fn multiply(left: Expr, right: Expr) -> Expr {
        Expr::BinaryExpr {
            left: Box::new(left),
            op: Operator::Multiply,
            right: Box::new(right),
        }
    }

    /// verifies that a filter is pushed to before a projection with a complex expression, the filter expression is correctly re-written
    #[test]
    fn complex_expression() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![
                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
                col("c"),
            ])?
            .filter(col("b").eq(lit(1i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #b Eq Int64(1)\
            \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
            \n    TableScan: test projection=None"
        );

        // filter is before projection
        let expected = "\
            Projection: #a Multiply Int32(2) Plus #c AS b, #c\
            \n  Filter: #a Multiply Int32(2) Plus #c Eq Int64(1)\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that when a filter is pushed to after 2 projections, the filter expression is correctly re-written
    #[test]
    fn complex_plan() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![
                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
                col("c"),
            ])?
            // second projection where we rename columns, just to make it difficult
            .project(vec![multiply(col("b"), lit(3)).alias("a"), col("c")])?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #a Eq Int64(1)\
            \n  Projection: #b Multiply Int32(3) AS a, #c\
            \n    Projection: #a Multiply Int32(2) Plus #c AS b, #c\
            \n      TableScan: test projection=None"
        );

        // filter is before the projections
        let expected = "\
            Projection: #b Multiply Int32(3) AS a, #c\
            \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
            \n    Filter: #a Multiply Int32(2) Plus #c Multiply Int32(3) Eq Int64(1)\
            \n      TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that when two filters apply after an aggregation that only allows one to be pushed, one is pushed
    /// and the other not.
    #[test]
    fn multi_filter() -> Result<()> {
        // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c))
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a").alias("b"), col("c")])?
            .aggregate(vec![col("b")], vec![sum(col("c"))])?
            .filter(col("b").gt(lit(10i64)))?
            .filter(col("SUM(c)").gt(lit(10i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #SUM(c) Gt Int64(10)\
            \n  Filter: #b Gt Int64(10)\
            \n    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
            \n      Projection: #a AS b, #c\
            \n        TableScan: test projection=None"
        );

        // filter is before the projections
        let expected = "\
            Filter: #SUM(c) Gt Int64(10)\
            \n  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
            \n    Projection: #a AS b, #c\
            \n      Filter: #a Gt Int64(10)\
            \n        TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);

        Ok(())
    }

    /// verifies that when two limits are in place, we jump neither
    #[test]
    fn double_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .limit(20)?
            .limit(10)?
            .project(vec![col("a"), col("b")])?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;
        // filter does not jump over any of the limits
        let expected = "\
            Projection: #a, #b\
            \n  Filter: #a Eq Int64(1)\
            \n    Limit: 10\
            \n      Limit: 20\
            \n        Projection: #a, #b\
            \n          TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that filters with the same columns are correctly placed
    #[test]
    fn filter_2_breaks_limits() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a")])?
            .filter(col("a").lt_eq(lit(1i64)))?
            .limit(1)?
            .project(vec![col("a")])?
            .filter(col("a").gt_eq(lit(1i64)))?
            .build()?;
        // Should be able to move both filters below the projections

        // not part of the test
        assert_eq!(
            format!("{:?}", plan),
            "Filter: #a GtEq Int64(1)\
            \n  Projection: #a\
            \n    Limit: 1\
            \n      Filter: #a LtEq Int64(1)\
            \n        Projection: #a\
            \n          TableScan: test projection=None"
        );

        let expected = "\
            Projection: #a\
            \n  Filter: #a GtEq Int64(1)\
            \n    Limit: 1\
            \n      Projection: #a\
            \n        Filter: #a LtEq Int64(1)\
            \n          TableScan: test projection=None";

        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that filters to be placed on the same depth are ANDed
    #[test]
    fn two_filters_on_same_depth() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .limit(1)?
            .filter(col("a").lt_eq(lit(1i64)))?
            .filter(col("a").gt_eq(lit(1i64)))?
            .project(vec![col("a")])?
            .build()?;

        // not part of the test
        assert_eq!(
            format!("{:?}", plan),
            "Projection: #a\
            \n  Filter: #a GtEq Int64(1)\
            \n    Filter: #a LtEq Int64(1)\
            \n      Limit: 1\
            \n        TableScan: test projection=None"
        );

        let expected = "\
            Projection: #a\
            \n  Filter: #a GtEq Int64(1) And #a LtEq Int64(1)\
            \n    Limit: 1\
            \n      TableScan: test projection=None";

        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that filters on a plan with user nodes are not lost
    /// (ARROW-10547)
    #[test]
    fn filters_user_defined_node() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .filter(col("a").lt_eq(lit(1i64)))?
            .build()?;

        let plan = crate::test::user_defined::new(plan);

        let expected = "\
            TestUserDefined\
            \n  Filter: #a LtEq Int64(1)\
            \n    TableScan: test projection=None";

        // not part of the test
        assert_eq!(format!("{:?}", plan), expected);

        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// post-join predicates on a column common to both sides is pushed to both sides
    #[test]
    fn filter_join_on_common_independent() -> Result<()> {
        let table_scan = test_table_scan()?;
        let left = LogicalPlanBuilder::from(&table_scan).build()?;
        let right = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a")])?
            .build()?;
        let plan = LogicalPlanBuilder::from(&left)
            .join(&right, JoinType::Inner, &["a"], &["a"])?
            .filter(col("a").lt_eq(lit(1i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #a LtEq Int64(1)\
            \n  Join: a = a\
            \n    TableScan: test projection=None\
            \n    Projection: #a\
            \n      TableScan: test projection=None"
        );

        // filter sent to side before the join
        let expected = "\
            Join: a = a\
            \n  Filter: #a LtEq Int64(1)\
            \n    TableScan: test projection=None\
            \n  Projection: #a\
            \n    Filter: #a LtEq Int64(1)\
            \n      TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// post-join predicates with columns from both sides are not pushed
    #[test]
    fn filter_join_on_common_dependent() -> Result<()> {
        let table_scan = test_table_scan()?;
        let left = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("c")])?
            .build()?;
        let right = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .build()?;
        let plan = LogicalPlanBuilder::from(&left)
            .join(&right, JoinType::Inner, &["a"], &["a"])?
            // "b" and "c" are not shared by either side: they are only available together after the join
            .filter(col("c").lt_eq(col("b")))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #c LtEq #b\
            \n  Join: a = a\
            \n    Projection: #a, #c\
            \n      TableScan: test projection=None\
            \n    Projection: #a, #b\
            \n      TableScan: test projection=None"
        );

        // expected is equal: no push-down
        let expected = &format!("{:?}", plan);
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// post-join predicates with columns from one side of a join are pushed only to that side
    #[test]
    fn filter_join_on_one_side() -> Result<()> {
        let table_scan = test_table_scan()?;
        let left = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .build()?;
        let right = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("c")])?
            .build()?;
        let plan = LogicalPlanBuilder::from(&left)
            .join(&right, JoinType::Inner, &["a"], &["a"])?
            .filter(col("b").lt_eq(lit(1i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #b LtEq Int64(1)\
            \n  Join: a = a\
            \n    Projection: #a, #b\
            \n      TableScan: test projection=None\
            \n    Projection: #a, #c\
            \n      TableScan: test projection=None"
        );

        let expected = "\
            Join: a = a\
            \n  Projection: #a, #b\
            \n    Filter: #b LtEq Int64(1)\
            \n      TableScan: test projection=None\
            \n  Projection: #a, #c\
            \n    TableScan: test projection=None";

        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }
}