<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# DataFusion Query Optimizer
[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory
format.
DataFusion has a modular design, allowing individual crates to be re-used in other projects.
This crate is a submodule of DataFusion that provides a query optimizer for logical plans. It
contains an extensive set of `OptimizerRule`s that may rewrite a plan and/or its expressions so
they execute more quickly while still computing the same result.
## Running the Optimizer
The following code demonstrates the basic flow of creating the optimizer with a default set of optimization rules
and applying it to a logical plan to produce an optimized logical plan.
```rust
use std::sync::Arc;
use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
use datafusion::optimizer::{OptimizerRule, OptimizerContext, Optimizer};
// We need a logical plan as the starting point. There are many ways to build a logical plan:
//
// The `datafusion-expr` crate provides a LogicalPlanBuilder
// The `datafusion-sql` crate provides a SQL query planner that can create a LogicalPlan from SQL
// The `datafusion` crate provides a DataFrame API that can create a LogicalPlan
let initial_logical_plan = LogicalPlanBuilder::empty(false).build().unwrap();
// use built-in rules or custom rules
let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![];
let optimizer = Optimizer::with_rules(rules);
let config = OptimizerContext::new().with_max_passes(16);
let optimized_plan = optimizer
    .optimize(initial_logical_plan, &config, observer)
    .unwrap();
fn observer(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
println!(
"After applying rule '{}':\n{}",
rule.name(),
plan.display_indent()
)
}
```
## Writing Optimization Rules
Please refer to the
[optimizer_rule.rs](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/optimizer_rule.rs)
example to learn more about the general approach to writing optimizer rules and
then move on to studying the existing rules.
An `OptimizerRule` transforms one `LogicalPlan` into another that computes the same results,
but in a potentially more efficient way. If there are no suitable transformations for the
input plan, the optimizer can simply return it as is.
All rules must implement the `OptimizerRule` trait.
```rust
# use datafusion::common::tree_node::Transformed;
# use datafusion::common::Result;
# use datafusion::logical_expr::LogicalPlan;
# use datafusion::optimizer::{OptimizerConfig, OptimizerRule};
#
#[derive(Default, Debug)]
struct MyOptimizerRule {}
impl OptimizerRule for MyOptimizerRule {
fn name(&self) -> &str {
"my_optimizer_rule"
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
unimplemented!()
}
}
```
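For illustration, here is a minimal, hedged sketch (not an actual DataFusion rule) of what a
`rewrite` body can do when it finds nothing to improve: it hands the plan back unchanged by
wrapping it in `Transformed::no`, while a rule that does rewrite the plan would return
`Transformed::yes(new_plan)` instead.

```rust
# use datafusion::common::tree_node::Transformed;
# use datafusion::common::Result;
# use datafusion::logical_expr::LogicalPlan;
# use datafusion::optimizer::OptimizerConfig;
fn rewrite_if_possible(
    plan: LogicalPlan,
    _config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
    // this sketch finds no suitable transformation, so it returns the input
    // plan as is and reports that nothing changed
    Ok(Transformed::no(plan))
}
```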
## Providing Custom Rules
The optimizer can be created with a custom set of rules.
```rust
# use std::sync::Arc;
# use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
# use datafusion::optimizer::{OptimizerRule, OptimizerConfig, OptimizerContext, Optimizer};
# use datafusion::common::tree_node::Transformed;
# use datafusion::common::Result;
#
# #[derive(Default, Debug)]
# struct MyOptimizerRule {}
#
# impl OptimizerRule for MyOptimizerRule {
# fn name(&self) -> &str {
# "my_optimizer_rule"
# }
#
# fn rewrite(
# &self,
# plan: LogicalPlan,
# _config: &dyn OptimizerConfig,
# ) -> Result<Transformed<LogicalPlan>> {
# unimplemented!()
# }
# }
let optimizer = Optimizer::with_rules(vec![
Arc::new(MyOptimizerRule {})
]);
```
### General Guidelines
Rules typically walk the logical plan and the expression trees inside its operators, selectively
mutating individual operators or expressions.
Sometimes a rule makes an initial pass that visits the plan and builds up state, which is then used
in a second pass that performs the actual optimization. Projection push down and filter push down
take this approach, sketched below.
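The following is a hedged, hypothetical sketch of that two-pass shape (it is not one of the
built-in rules): the first pass walks the plan and records every referenced column name, and the
second pass could then use that state to decide what to rewrite (here it simply reports the plan
as unchanged).

```rust
# use std::collections::HashSet;
# use datafusion::prelude::*;
# use datafusion::common::Result;
# use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
# use datafusion::logical_expr::LogicalPlan;
fn two_pass_rule(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
    // Pass 1: walk the plan and gather state (the set of referenced column names)
    let mut referenced: HashSet<String> = HashSet::new();
    plan.apply(|node| {
        for expr in node.expressions() {
            expr.apply(|e| {
                if let Expr::Column(column) = e {
                    referenced.insert(column.name.clone());
                }
                Ok(TreeNodeRecursion::Continue)
            })?;
        }
        Ok(TreeNodeRecursion::Continue)
    })?;

    // Pass 2: use the gathered state to perform the actual rewrite; this
    // sketch stops short and leaves the plan untouched
    let _ = referenced;
    Ok(Transformed::no(plan))
}
```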
### Expression Naming
Every expression in DataFusion has a name, which is used as the column name. For example, in the
query below the output contains a single column with the name `"COUNT(aggregate_test_100.c9)"`:
```text
> select count(c9) from aggregate_test_100;
+------------------------------+
| COUNT(aggregate_test_100.c9) |
+------------------------------+
| 100 |
+------------------------------+
```
These names are used to refer to the columns both in subqueries and internally, from one stage of
the `LogicalPlan` to another. For example:
```text
> select "COUNT(aggregate_test_100.c9)" + 1 from (select count(c9) from aggregate_test_100) as sq;
+--------------------------------------------+
| sq.COUNT(aggregate_test_100.c9) + Int64(1) |
+--------------------------------------------+
| 101 |
+--------------------------------------------+
```
### Implication
Because DataFusion identifies columns by their string names, it is critical that the optimizer does
not change the names of expressions when it rewrites them. This is typically accomplished by giving
the rewritten expression an alias that preserves the original name.
Here is a simple example of such a rewrite. The expression `1 + 2` can be internally simplified to
`3` but must still be displayed the same as `1 + 2`:
```text
> select 1 + 2;
+---------------------+
| Int64(1) + Int64(2) |
+---------------------+
| 3 |
+---------------------+
```
Looking at the `EXPLAIN` output, we can see that the optimizer has effectively rewritten `1 + 2`
into `3 as "1 + 2"`:
```text
> explain format indent select 1 + 2;
+---------------+-------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------+
| logical_plan | Projection: Int64(3) AS Int64(1) + Int64(2) |
| | EmptyRelation |
| physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] |
| | PlaceholderRowExec |
| | |
+---------------+-------------------------------------------------+
```
If the expression name is not preserved, bugs such as [#3704](https://github.com/apache/datafusion/issues/3704)
and [#3555](https://github.com/apache/datafusion/issues/3555) occur where the expected columns cannot be found.
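As a minimal sketch of the aliasing pattern (the literal name below is just the one shown in the
`EXPLAIN` output above), a rule that simplifies `1 + 2` to `3` would wrap the result in an alias
carrying the original expression's name:

```rust
# use datafusion::prelude::*;
// the optimizer simplified `1 + 2` to a literal ...
let simplified = lit(3i64);
// ... but the output column must still carry the original expression's name
let renamed = simplified.alias("Int64(1) + Int64(2)");
```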
### Building Expression Names
There are currently two ways to create a name for an expression in the logical plan.
```rust
# use datafusion::common::Result;
# struct Expr;
impl Expr {
/// Returns the name of this expression as it should appear in a schema. This name
/// will not include any CAST expressions.
pub fn display_name(&self) -> Result<String> {
Ok("display_name".to_string())
}
/// Returns a full and complete string representation of this expression.
pub fn canonical_name(&self) -> String {
"canonical_name".to_string()
}
}
```
When comparing expressions to determine if they are equivalent, `canonical_name` should be used, and when creating a
name to be used in a schema, `display_name` should be used.
### Utilities
There are a number of [utility methods][util] provided that take care of some common tasks.
[util]: https://github.com/apache/datafusion/blob/main/datafusion/expr/src/utils.rs
### Recursively walk an expression tree
The [TreeNode API] provides a convenient way to recursively walk an expression or plan tree.
For example, to find all subquery references in a logical plan, the following code can be used:
```rust
# use datafusion::prelude::*;
# use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
# use datafusion::common::Result;
// Return all subquery references in an expression
fn extract_subquery_filters(expression: &Expr) -> Result<Vec<&Expr>> {
let mut extracted = vec![];
expression.apply(|expr| {
if let Expr::InSubquery(_) = expr {
extracted.push(expr);
}
Ok(TreeNodeRecursion::Continue)
})?;
Ok(extracted)
}
```
Likewise, you can use the [TreeNode API] to rewrite a `LogicalPlan` or an `ExecutionPlan`:
```rust
# use datafusion::prelude::*;
# use datafusion::logical_expr::{LogicalPlan, Join};
# use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
# use datafusion::common::Result;
// Return all joins in a logical plan
fn find_joins(overall_plan: &LogicalPlan) -> Result<Vec<&Join>> {
let mut extracted = vec![];
overall_plan.apply(|plan| {
if let LogicalPlan::Join(join) = plan {
extracted.push(join);
}
Ok(TreeNodeRecursion::Continue)
})?;
Ok(extracted)
}
```
### Rewriting expressions
The [TreeNode API] also provides a convenient way to rewrite expressions and
plans. For example, to rewrite all expressions like
```sql
col BETWEEN x AND y
```
into
```sql
col >= x AND col <= y
```
you can use the following code:
```rust
# use datafusion::prelude::*;
# use datafusion::logical_expr::{Between};
# use datafusion::logical_expr::expr_fn::*;
# use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
# use datafusion::common::Result;
// Recursively rewrite all BETWEEN expressions
// returns Transformed::yes if any changes were made
fn rewrite_between(expr: Expr) -> Result<Transformed<Expr>> {
// transform_up does a bottom up rewrite
expr.transform_up(|expr| {
// only handle BETWEEN expressions
let Expr::Between(Between {
negated,
expr,
low,
high,
}) = expr else {
return Ok(Transformed::no(expr))
};
        if negated {
            // don't rewrite NOT BETWEEN, and report that nothing changed
            return Ok(Transformed::no(Expr::Between(Between::new(
                expr, negated, low, high,
            ))));
        }
        // rewrite to (expr >= low) AND (expr <= high)
        let rewritten_expr = expr.clone().gt_eq(*low).and(expr.lt_eq(*high));
        Ok(Transformed::yes(rewritten_expr))
})
}
```
### Writing Tests
There should be unit tests in the same file as the new rule that test the effect of the rule being applied to a plan
in isolation (without any other rule being applied).
There should also be a test in `integration-tests.rs` that tests the rule as part of the overall optimization process.
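Here is a hedged sketch of what such an isolated unit test can look like, reusing the hypothetical
`MyOptimizerRule` from the earlier examples (in a real crate this function would live in a
`#[cfg(test)]` module and be annotated with `#[test]`):

```rust
# use std::sync::Arc;
# use datafusion::common::Result;
# use datafusion::common::tree_node::Transformed;
# use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
# use datafusion::optimizer::{Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule};
#
# #[derive(Default, Debug)]
# struct MyOptimizerRule {}
# impl OptimizerRule for MyOptimizerRule {
#     fn name(&self) -> &str {
#         "my_optimizer_rule"
#     }
#     fn rewrite(
#         &self,
#         plan: LogicalPlan,
#         _config: &dyn OptimizerConfig,
#     ) -> Result<Transformed<LogicalPlan>> {
#         Ok(Transformed::no(plan))
#     }
# }
#
# fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
fn my_rule_leaves_empty_relation_unchanged() {
    // build a plan that the rule should not touch
    let plan = LogicalPlanBuilder::empty(false).build().unwrap();
    let expected = plan.display_indent().to_string();

    // apply only the rule under test, without any other rules
    let optimizer = Optimizer::with_rules(vec![Arc::new(MyOptimizerRule {})]);
    let config = OptimizerContext::new();
    let optimized = optimizer.optimize(plan, &config, observe).unwrap();

    // assert on the resulting plan
    assert_eq!(optimized.display_indent().to_string(), expected);
}
```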
### Debugging
The `EXPLAIN VERBOSE` command can be used to show the effect of each optimization rule on a query.
In the following example, the `type_coercion` and `simplify_expressions` passes have simplified the plan so that it returns the constant `"3.2"` rather than doing a computation at execution time.
```text
> explain verbose select cast(1 + 2.2 as string) as foo;
+------------------------------------------------------------+---------------------------------------------------------------------------+
| plan_type | plan |
+------------------------------------------------------------+---------------------------------------------------------------------------+
| initial_logical_plan | Projection: CAST(Int64(1) + Float64(2.2) AS Utf8) AS foo |
| | EmptyRelation |
| logical_plan after type_coercion | Projection: CAST(CAST(Int64(1) AS Float64) + Float64(2.2) AS Utf8) AS foo |
| | EmptyRelation |
| logical_plan after simplify_expressions | Projection: Utf8("3.2") AS foo |
| | EmptyRelation |
| logical_plan after unwrap_cast_in_comparison | SAME TEXT AS ABOVE |
| logical_plan after decorrelate_where_exists | SAME TEXT AS ABOVE |
| logical_plan after decorrelate_where_in | SAME TEXT AS ABOVE |
| logical_plan after scalar_subquery_to_join | SAME TEXT AS ABOVE |
| logical_plan after subquery_filter_to_join | SAME TEXT AS ABOVE |
| logical_plan after simplify_expressions | SAME TEXT AS ABOVE |
| logical_plan after eliminate_filter | SAME TEXT AS ABOVE |
| logical_plan after reduce_cross_join | SAME TEXT AS ABOVE |
| logical_plan after common_sub_expression_eliminate | SAME TEXT AS ABOVE |
| logical_plan after eliminate_limit | SAME TEXT AS ABOVE |
| logical_plan after projection_push_down | SAME TEXT AS ABOVE |
| logical_plan after rewrite_disjunctive_predicate | SAME TEXT AS ABOVE |
| logical_plan after reduce_outer_join | SAME TEXT AS ABOVE |
| logical_plan after filter_push_down | SAME TEXT AS ABOVE |
| logical_plan after limit_push_down | SAME TEXT AS ABOVE |
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE |
| logical_plan | Projection: Utf8("3.2") AS foo |
| | EmptyRelation |
| initial_physical_plan | ProjectionExec: expr=[3.2 as foo] |
| | PlaceholderRowExec |
| | |
| physical_plan after aggregate_statistics | SAME TEXT AS ABOVE |
| physical_plan after join_selection | SAME TEXT AS ABOVE |
| physical_plan after coalesce_batches | SAME TEXT AS ABOVE |
| physical_plan after repartition | SAME TEXT AS ABOVE |
| physical_plan after add_merge_exec | SAME TEXT AS ABOVE |
| physical_plan | ProjectionExec: expr=[3.2 as foo] |
| | PlaceholderRowExec |
| | |
+------------------------------------------------------------+---------------------------------------------------------------------------+
```
[df]: https://crates.io/crates/datafusion
[TreeNode API]: https://docs.rs/datafusion/latest/datafusion/common/tree_node/trait.TreeNode.html
## Thinking about Query Optimization
Query optimization in DataFusion uses a cost-based model. The model relies on table- and
column-level statistics to estimate selectivity; selectivity estimates are an important piece of
cost analysis for filters and projections because they allow estimating the cost of downstream
operators such as joins.
An important piece of building these estimates is _boundary analysis_, which uses interval
arithmetic to take an expression such as `a > 2500 AND a <= 5000` and build an accurate
selectivity estimate that can then be used to find more efficient plans.
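As a back-of-the-envelope illustration (a uniform-distribution assumption, not DataFusion's actual
estimator), the width of the predicate's interval relative to the column's known range already
yields a usable selectivity estimate:

```rust
// if column `a` is known to lie in [0, 10_000], the predicate
// `a > 2500 AND a <= 5000` selects a 2_500-wide slice of a 10_000-wide range
let (col_min, col_max) = (0.0_f64, 10_000.0_f64);
let (pred_low, pred_high) = (2_500.0_f64, 5_000.0_f64);
let selectivity = (pred_high - pred_low) / (col_max - col_min);
assert_eq!(selectivity, 0.25); // expect roughly 25% of rows to pass the filter
```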
### `AnalysisContext` API
The `AnalysisContext` serves as a shared knowledge base during expression evaluation
and boundary analysis. Think of it as a dynamic repository that maintains information about:
1. Current known boundaries for columns and expressions
2. Statistics that have been gathered or inferred
3. A mutable state that can be updated as analysis progresses
What makes `AnalysisContext` particularly powerful is its ability to propagate information
through the expression tree. As each node in the expression tree is analyzed, it can both
read from and write to this shared context, allowing for sophisticated boundary analysis and inference.
### `ColumnStatistics` for Cardinality Estimation
Column statistics form the foundation of optimization decisions. Rather than just tracking
simple metrics, DataFusion's `ColumnStatistics` provides a rich set of information including:
- Null value counts
- Maximum and minimum values
- Value sums (for numeric columns)
- Distinct value counts
Each of these statistics is wrapped in a `Precision` type that indicates whether the value is
exact or estimated, allowing the optimizer to make informed decisions about the reliability
of its cardinality estimates.
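A small sketch of how `Precision` wraps a statistic: the same value can be reported as exact, as an
estimate, or as unknown, and the optimizer can weigh it accordingly.

```rust
# use datafusion::common::stats::Precision;
let exact_rows = Precision::Exact(1000_usize); // known precisely
let estimated_rows = Precision::Inexact(1000_usize); // a best-effort estimate
let unknown_rows: Precision<usize> = Precision::Absent; // no information available
```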
### Boundary Analysis Flow
The boundary analysis process flows through several stages, with each stage building
upon the information gathered in previous stages. The `AnalysisContext` is continuously
updated as the analysis progresses through the expression tree.
#### Expression Boundary Analysis
When analyzing expressions, DataFusion runs boundary analysis using interval arithmetic.
Consider a simple predicate like `age > 18 AND age <= 25`. The analysis flows as follows:
1. Context Initialization
- Begin with known column statistics
- Set up initial boundaries based on column constraints
- Initialize the shared analysis context
2. Expression Tree Walk
- Analyze each node in the expression tree
- Propagate boundary information upward
- Allow child nodes to influence parent boundaries
3. Boundary Updates
- Each expression can update the shared context
- Changes flow through the entire expression tree
- Final boundaries inform optimization decisions
### Working with the analysis API
The following example shows how you can run an analysis pass on a physical expression
to infer the selectivity of the expression and the space of possible values it can
take.
```rust
# use std::sync::Arc;
# use datafusion::prelude::*;
# use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
# use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
# use datafusion::common::stats::Precision;
#
# use datafusion::common::{ColumnStatistics, DFSchema};
# use datafusion::common::{ScalarValue, ToDFSchema};
# use datafusion::error::Result;
fn analyze_filter_example() -> Result<()> {
// Create a schema with an 'age' column
let age = Field::new("age", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![age]));
// Define column statistics
let column_stats = ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int64(Some(79))),
min_value: Precision::Exact(ScalarValue::Int64(Some(14))),
distinct_count: Precision::Absent,
sum_value: Precision::Absent,
};
// Create expression: age > 18 AND age <= 25
let expr = col("age")
.gt(lit(18i64))
.and(col("age").lt_eq(lit(25i64)));
// Initialize analysis context
let initial_boundaries = vec![ExprBoundaries::try_from_column(
&schema, &column_stats, 0)?];
let context = AnalysisContext::new(initial_boundaries);
// Analyze expression
let df_schema = DFSchema::try_from(schema)?;
let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?;
let analysis = analyze(&physical_expr, context, df_schema.as_ref())?;
Ok(())
}
```
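A hedged follow-up to the example above: the returned `AnalysisContext` can then be inspected to
read the inferred selectivity and the narrowed per-column boundaries. The field names used here
(`selectivity`, `boundaries`, `interval`) reflect the current API at the time of writing and may
differ between DataFusion versions.

```rust
# use datafusion::physical_expr::AnalysisContext;
fn report(analysis: &AnalysisContext) {
    // fraction of rows expected to satisfy the predicate, if it could be inferred
    if let Some(selectivity) = analysis.selectivity {
        println!("estimated selectivity: {selectivity}");
    }
    // narrowed value ranges for each column after applying the predicate
    for bounds in &analysis.boundaries {
        println!("column {:?} is now bounded by {:?}", bounds.column, bounds.interval);
    }
}
```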