blob: 877a84fe4dc143c76c5662698ee4cd7f9486f292 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! [`Optimizer`] and [`OptimizerRule`]
use std::fmt::Debug;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use datafusion_expr::registry::FunctionRegistry;
use datafusion_expr::{InvariantLevel, assert_expected_schema};
use log::{debug, warn};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion_common::{DFSchema, DataFusionError, HashSet, Result, internal_err};
use datafusion_expr::logical_plan::LogicalPlan;
use crate::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::decorrelate_lateral_join::DecorrelateLateralJoin;
use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
use crate::eliminate_cross_join::EliminateCrossJoin;
use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_group_by_constant::EliminateGroupByConstant;
use crate::eliminate_join::EliminateJoin;
use crate::eliminate_limit::EliminateLimit;
use crate::eliminate_outer_join::EliminateOuterJoin;
use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::optimize_projections::OptimizeProjections;
use crate::optimize_unions::OptimizeUnions;
use crate::plan_signature::LogicalPlanSignature;
use crate::propagate_empty_relation::PropagateEmptyRelation;
use crate::push_down_filter::PushDownFilter;
use crate::push_down_limit::PushDownLimit;
use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
use crate::rewrite_set_comparison::RewriteSetComparison;
use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
use crate::simplify_expressions::SimplifyExpressions;
use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
use crate::utils::log_plan;
/// `OptimizerRule`s transforms one [`LogicalPlan`] into another which
/// computes the same results, but in a potentially more efficient
/// way. If there are no suitable transformations for the input plan,
/// the optimizer should simply return it unmodified.
///
/// To change the semantics of a `LogicalPlan`, see [`AnalyzerRule`]
///
/// Use [`SessionState::add_optimizer_rule`] to register additional
/// `OptimizerRule`s.
///
/// [`AnalyzerRule`]: crate::analyzer::AnalyzerRule
/// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_optimizer_rule
pub trait OptimizerRule: Debug {
    /// A human readable name for this optimizer rule
    fn name(&self) -> &str;

    /// How should the rule be applied by the optimizer? See comments on
    /// [`ApplyOrder`] for details.
    ///
    /// If returns `None`, the default, the rule must handle recursion itself
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    /// Does this rule support rewriting owned plans (rather than by reference)?
    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
    fn supports_rewrite(&self) -> bool {
        true
    }

    /// Try to rewrite `plan` to an optimized form, returning `Transformed::yes`
    /// if the plan was rewritten and `Transformed::no` if it was not.
    ///
    /// The default implementation returns an internal error; concrete rules
    /// must override this method to perform any rewriting.
    fn rewrite(
        &self,
        _plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        internal_err!("rewrite is not implemented for {}", self.name())
    }
}
/// Options to control the DataFusion Optimizer.
pub trait OptimizerConfig {
    /// Return the time at which the query execution started. This
    /// time is used as the value for `now()`. If `None`, time-dependent
    /// functions like `now()` will not be simplified during optimization.
    fn query_execution_start_time(&self) -> Option<DateTime<Utc>>;

    /// Return alias generator used to generate unique aliases for subqueries
    fn alias_generator(&self) -> &Arc<AliasGenerator>;

    /// Return the configuration options that control the optimizer's
    /// behavior (e.g. `max_passes`, `skip_failed_rules`).
    fn options(&self) -> Arc<ConfigOptions>;

    /// Return the function registry used to look up functions during
    /// optimization, if one is available. Defaults to `None`.
    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
        None
    }
}
/// A standalone [`OptimizerConfig`] that can be used independently
/// of DataFusion's config management
#[derive(Debug)]
pub struct OptimizerContext {
    /// Query execution start time that can be used to rewrite
    /// expressions such as `now()` to use a literal value instead.
    /// If `None`, time-dependent functions will not be simplified.
    query_execution_start_time: Option<DateTime<Utc>>,
    /// Alias generator used to generate unique aliases for subqueries
    alias_generator: Arc<AliasGenerator>,
    /// Configuration options controlling optimizer behavior; shared so
    /// `options()` can hand out a cheap `Arc` clone.
    options: Arc<ConfigOptions>,
}
impl OptimizerContext {
    /// Create an optimizer config with default options and the
    /// `filter_null_join_keys` rule enabled.
    pub fn new() -> Self {
        let options = {
            let mut opts = ConfigOptions::default();
            opts.optimizer.filter_null_join_keys = true;
            opts
        };
        Self::new_with_config_options(Arc::new(options))
    }

    /// Create an optimizer config from the provided [ConfigOptions].
    pub fn new_with_config_options(options: Arc<ConfigOptions>) -> Self {
        let alias_generator = Arc::new(AliasGenerator::new());
        Self {
            query_execution_start_time: Some(Utc::now()),
            alias_generator,
            options,
        }
    }

    /// Specify whether to enable the filter_null_keys rule
    pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
        let optimizer_opts = &mut Arc::make_mut(&mut self.options).optimizer;
        optimizer_opts.filter_null_join_keys = filter_null_keys;
        self
    }

    /// Set the query execution start time
    pub fn with_query_execution_start_time(
        mut self,
        query_execution_start_time: DateTime<Utc>,
    ) -> Self {
        self.query_execution_start_time = Some(query_execution_start_time);
        self
    }

    /// Clear the query execution start time. When `None`, time-dependent
    /// functions like `now()` will not be simplified during optimization.
    pub fn without_query_execution_start_time(mut self) -> Self {
        self.query_execution_start_time = None;
        self
    }

    /// Specify whether the optimizer should skip rules that produce
    /// errors, or fail the query
    pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
        let optimizer_opts = &mut Arc::make_mut(&mut self.options).optimizer;
        optimizer_opts.skip_failed_rules = b;
        self
    }

    /// Specify how many times to attempt to optimize the plan
    pub fn with_max_passes(mut self, v: u8) -> Self {
        let optimizer_opts = &mut Arc::make_mut(&mut self.options).optimizer;
        optimizer_opts.max_passes = usize::from(v);
        self
    }
}
impl Default for OptimizerContext {
    /// Create optimizer config with default settings
    /// (delegates to [`OptimizerContext::new`])
    fn default() -> Self {
        Self::new()
    }
}
impl OptimizerConfig for OptimizerContext {
    /// Return the (optional) query start time captured at construction
    /// or set via the builder methods.
    fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
        self.query_execution_start_time
    }

    fn alias_generator(&self) -> &Arc<AliasGenerator> {
        &self.alias_generator
    }

    /// Return a cheap `Arc` clone of the shared configuration options.
    fn options(&self) -> Arc<ConfigOptions> {
        Arc::clone(&self.options)
    }
}
/// A rule-based optimizer.
#[derive(Clone, Debug)]
pub struct Optimizer {
    /// All optimizer rules to apply, in the order they are applied
    /// on each optimization pass.
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
/// Specifies how recursion for an `OptimizerRule` should be handled.
///
/// * `Some(apply_order)`: The Optimizer will recursively apply the rule to the plan.
/// * `None`: the rule must handle any required recursion itself.
//
// `Eq` is derived alongside `PartialEq`: the comparison is structural and
// total, and deriving both satisfies clippy's `derive_partial_eq_without_eq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOrder {
    /// Apply the rule to the node before its inputs
    TopDown,
    /// Apply the rule to the node after its inputs
    BottomUp,
}
impl Default for Optimizer {
    /// Create a new optimizer using the recommended list of rules
    /// (delegates to [`Optimizer::new`])
    fn default() -> Self {
        Self::new()
    }
}
impl Optimizer {
    /// Create a new optimizer using the recommended list of rules
    ///
    /// NOTE: the order of the rules in this list is significant; each pass
    /// applies the rules in exactly this order.
    pub fn new() -> Self {
        let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
            Arc::new(RewriteSetComparison::new()),
            Arc::new(OptimizeUnions::new()),
            Arc::new(SimplifyExpressions::new()),
            Arc::new(ReplaceDistinctWithAggregate::new()),
            Arc::new(EliminateJoin::new()),
            Arc::new(DecorrelatePredicateSubquery::new()),
            Arc::new(ScalarSubqueryToJoin::new()),
            Arc::new(DecorrelateLateralJoin::new()),
            Arc::new(ExtractEquijoinPredicate::new()),
            Arc::new(EliminateDuplicatedExpr::new()),
            Arc::new(EliminateFilter::new()),
            Arc::new(EliminateCrossJoin::new()),
            Arc::new(EliminateLimit::new()),
            Arc::new(PropagateEmptyRelation::new()),
            Arc::new(FilterNullJoinKeys::default()),
            Arc::new(EliminateOuterJoin::new()),
            // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
            Arc::new(PushDownLimit::new()),
            Arc::new(PushDownFilter::new()),
            Arc::new(SingleDistinctToGroupBy::new()),
            // The previous optimizations added expressions and projections,
            // that might benefit from the following rules
            Arc::new(EliminateGroupByConstant::new()),
            Arc::new(CommonSubexprEliminate::new()),
            Arc::new(OptimizeProjections::new()),
        ];

        Self::with_rules(rules)
    }

    /// Create a new optimizer with the given rules
    pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
        Self { rules }
    }
}
/// Recursively rewrites LogicalPlans
///
/// Adapts a single [`OptimizerRule`] to the [`TreeNodeRewriter`] API so the
/// optimizer can drive recursion for rules that declare an [`ApplyOrder`].
struct Rewriter<'a> {
    /// Whether the rule fires on the way down (`TopDown`) or up (`BottomUp`)
    apply_order: ApplyOrder,
    /// The rule to apply at each node
    rule: &'a dyn OptimizerRule,
    /// Configuration passed through to the rule on each invocation
    config: &'a dyn OptimizerConfig,
}
impl<'a> Rewriter<'a> {
    /// Build a rewriter that applies `rule` with the given traversal order,
    /// passing `config` through on every invocation.
    fn new(
        apply_order: ApplyOrder,
        rule: &'a dyn OptimizerRule,
        config: &'a dyn OptimizerConfig,
    ) -> Self {
        Self { apply_order, rule, config }
    }
}
impl TreeNodeRewriter for Rewriter<'_> {
    type Node = LogicalPlan;

    /// Called before a node's inputs are visited; the rule fires here only
    /// for top-down traversal.
    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
        match self.apply_order {
            ApplyOrder::TopDown => self.rule.rewrite(node, self.config),
            ApplyOrder::BottomUp => Ok(Transformed::no(node)),
        }
    }

    /// Called after a node's inputs are visited; the rule fires here only
    /// for bottom-up traversal.
    fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
        match self.apply_order {
            ApplyOrder::BottomUp => self.rule.rewrite(node, self.config),
            ApplyOrder::TopDown => Ok(Transformed::no(node)),
        }
    }
}
impl Optimizer {
    /// Optimizes the logical plan by applying optimizer rules, and
    /// invoking observer function after each call
    ///
    /// Runs up to `max_passes` passes over `plan`; within each pass every
    /// rule is applied in order. Passes stop early once a pass produces a
    /// plan whose signature was already seen (fixed point or cycle).
    ///
    /// # Errors
    /// Returns an error if the input plan fails its invariant checks, if a
    /// rule fails while `skip_failed_rules` is disabled, or if any
    /// post-rule/post-pass invariant check fails.
    pub fn optimize<F>(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
        mut observer: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
    {
        // verify LP is valid, before the first LP optimizer pass.
        plan.check_invariants(InvariantLevel::Executable)
            .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;

        let start_time = Instant::now();
        let options = config.options();
        let mut new_plan = plan;

        // Signatures of plans produced so far, used to detect when a pass
        // reproduces a previously-seen plan (fixed point / cycle).
        let mut previous_plans = HashSet::with_capacity(16);
        previous_plans.insert(LogicalPlanSignature::new(&new_plan));

        // Schema of the input plan; the final schema must remain compatible
        // with it after all passes (checked below).
        let starting_schema = Arc::clone(new_plan.schema());

        let mut i = 0;
        while i < options.optimizer.max_passes {
            log_plan(&format!("Optimizer input (pass {i})"), &new_plan);

            for rule in &self.rules {
                // If skipping failed rules, copy plan before attempting to rewrite
                // as rewriting is destructive
                let prev_plan = options
                    .optimizer
                    .skip_failed_rules
                    .then(|| new_plan.clone());

                // Per-rule schema snapshot (intentionally shadows the outer
                // `starting_schema`) so each rule is validated against the
                // plan it actually received.
                let starting_schema = Arc::clone(new_plan.schema());

                let result = match rule.apply_order() {
                    // optimizer handles recursion
                    Some(apply_order) => new_plan.rewrite_with_subqueries(
                        &mut Rewriter::new(apply_order, rule.as_ref(), config),
                    ),
                    // rule handles recursion itself
                    None => {
                        rule.rewrite(new_plan, config)
                    },
                }
                .and_then(|tnr| {
                    // run checks optimizer invariant checks, per optimizer rule applied
                    assert_valid_optimization(&tnr.data, &starting_schema)
                        .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;

                    // run LP invariant checks only in debug mode for performance reasons
                    #[cfg(debug_assertions)]
                    tnr.data.check_invariants(InvariantLevel::Executable)
                        .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;

                    Ok(tnr)
                });

                // Handle results
                match (result, prev_plan) {
                    // OptimizerRule was successful
                    (
                        Ok(Transformed {
                            data, transformed, ..
                        }),
                        _,
                    ) => {
                        new_plan = data;
                        observer(&new_plan, rule.as_ref());
                        if transformed {
                            log_plan(rule.name(), &new_plan);
                        } else {
                            debug!(
                                "Plan unchanged by optimizer rule '{}' (pass {})",
                                rule.name(),
                                i
                            );
                        }
                    }
                    // OptimizerRule was unsuccessful, but skipped failed rules is on
                    // so use the previous plan
                    (Err(e), Some(orig_plan)) => {
                        // Note to future readers: if you see this warning it signals a
                        // bug in the DataFusion optimizer. Please consider filing a ticket
                        // https://github.com/apache/datafusion
                        warn!(
                            "Skipping optimizer rule '{}' due to unexpected error: {}",
                            rule.name(),
                            e
                        );
                        new_plan = orig_plan;
                    }
                    // OptimizerRule was unsuccessful, but skipped failed rules is off, return error
                    (Err(e), None) => {
                        return Err(e.context(format!(
                            "Optimizer rule '{}' failed",
                            rule.name()
                        )));
                    }
                }
            }
            log_plan(&format!("Optimized plan (pass {i})"), &new_plan);

            // HashSet::insert returns whether the value was newly inserted.
            let plan_is_fresh =
                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
            if !plan_is_fresh {
                // plan did not change, so no need to continue trying to optimize
                debug!("optimizer pass {i} did not make changes");
                break;
            }
            i += 1;
        }

        // verify that the optimizer passes only mutated what was permitted.
        assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
            e.context("Check optimizer-specific invariants after all passes")
        })?;

        // verify LP is valid, after the last optimizer pass.
        new_plan
            .check_invariants(InvariantLevel::Executable)
            .map_err(|e| {
                e.context("Invalid (non-executable) plan after LP Optimizers")
            })?;

        log_plan("Final optimized plan", &new_plan);
        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
        Ok(new_plan)
    }
}
/// These are invariants which should hold true before and after [`LogicalPlan`] optimization.
///
/// This differs from [`LogicalPlan::check_invariants`], which addresses whether a single
/// `LogicalPlan` is valid on its own. This instead checks that the optimization only made
/// permitted changes relative to the plan it started from.
fn assert_valid_optimization(
    plan: &LogicalPlan,
    prev_schema: &Arc<DFSchema>,
) -> Result<()> {
    // verify invariant: optimizer passes should not change the schema if the schema can't be cast from the previous schema.
    // Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
    assert_expected_schema(prev_schema, plan).map(|_| ())
}
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        DFSchema, DFSchemaRef, DataFusionError, Result, assert_contains, plan_err,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, Projection, col, lit};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    #[test]
    fn skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
            Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    #[test]
    fn generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        let err = opt.optimize(plan, &config, &observe).unwrap_err();

        // Simplify assert to check the error message contains the expected message
        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    #[test]
    fn skip_generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        opt.optimize(plan, &config, &observe).unwrap();
    }

    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        // if the plan creates more metadata than previously (because
        // some wrapping functions are removed, etc) do not error
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?);

        // optimizing should be ok, but the schema will have changed (no metadata)
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // metadata was removed
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        // Run a goofy optimizer, which rotates projection columns
        // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // to not trigger changed schema error
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;

        // initial_plan is not observed, so we have 3 plans
        assert_eq!(3, plans.len());

        // we got again the initial_plan with [1, 2, 3]
        assert_eq!(initial_plan, final_plan);
        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        // Run a goofy optimizer, which reverses and rotates projection columns
        // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // to not trigger changed schema error
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;

        // initial_plan is not observed, so we have 4 plans
        assert_eq!(4, plans.len());

        // we got again the plan with [3, 2, 1]
        assert_eq!(plans[0], final_plan);
        Ok(())
    }

    /// Return a copy of `schema` with per-field metadata added, used to
    /// simulate a plan whose schema differs from its input only in metadata.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let new_fields = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(new_arrow_field))
            })
            .collect::<Vec<_>>();

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// No-op observer used by tests that do not inspect intermediate plans.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// A rule that always fails, used to exercise `skip_failed_rules`.
    ///
    /// NOTE: the deprecated `supports_rewrite` override was removed; the
    /// trait's default already returns `true` and overriding a
    /// `#[deprecated]` method triggers deprecation warnings.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Replaces whatever plan with a single table scan
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let table_scan = test_table_scan()?;
            Ok(Transformed::yes(
                LogicalPlanBuilder::from(table_scan).build()?,
            ))
        }
    }

    /// A goofy rule doing rotation of columns in all projections.
    ///
    /// Useful to test cycle detection.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        // reverse exprs instead of rotating on the first pass
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            let mut exprs = projection.expr.clone();

            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
            if *reverse {
                exprs.reverse();
                *reverse = false;
            } else {
                exprs.rotate_left(1);
            }

            Ok(Transformed::yes(LogicalPlan::Projection(
                Projection::try_new(exprs, Arc::clone(&projection.input))?,
            )))
        }
    }
}