blob: 1b7bb856a592b6a099db8ba5559bf53c10e9509f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::stack::StackGuard;
use datafusion_common::{Constraints, DFSchema, Result, not_impl_err};
use datafusion_expr::expr::{Sort, WildcardOptions};
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
};
use sqlparser::ast::{
Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows,
OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
SetOperator, SetQuantifier, TableAlias,
};
use sqlparser::tokenizer::Span;
impl<S: ContextProvider> SqlToRel<'_, S> {
/// Generate a logical plan from an SQL query/subquery.
///
/// Planning happens against a clone of `outer_planner_context`; the clone is
/// what the `WITH` clause, the query body and the pipe operators are planned
/// with.
///
/// The steps are, in order: the `WITH` clause, the query body, `ORDER BY`,
/// `LIMIT`/`OFFSET`, `SELECT INTO` (plain `SELECT` bodies only) and finally
/// the pipe operators.
///
/// # Errors
///
/// Returns a "not implemented" error if the query carries a `FETCH` clause,
/// and propagates any error produced by the planning steps listed above.
pub(crate) fn query_to_plan(
    &self,
    query: Query,
    outer_planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    // Each query has its own planner context, including CTEs that are visible within that query.
    // It also inherits the CTEs from the outer query by cloning the outer planner context.
    let mut query_plan_context = outer_planner_context.clone();
    let planner_context = &mut query_plan_context;

    // `locks`, `for_clause`, `settings` and `format_clause` are not consulted here.
    let Query {
        with,
        body,
        order_by,
        limit_clause,
        fetch,
        locks: _,
        for_clause: _,
        settings: _,
        format_clause: _,
        pipe_operators,
    } = query;

    if fetch.is_some() {
        return not_impl_err!("FETCH clause is not supported yet");
    }

    if let Some(with) = with {
        self.plan_with_clause(with, planner_context)?;
    }

    let set_expr = *body;
    // `order_by` and `limit_clause` are owned values and every arm below
    // consumes each of them exactly once, so they are moved, not cloned.
    let plan = match set_expr {
        SetExpr::Select(mut select) => {
            let select_into = select.into.take();
            let plan = self.select_to_plan(*select, order_by, planner_context)?;
            let plan = self.limit(plan, limit_clause, planner_context)?;
            // Process the `SELECT INTO` after `LIMIT`.
            self.select_into(plan, select_into)
        }
        other => {
            // The functions called from `set_expr_to_plan()` need more than 128KB
            // stack in debug builds as investigated in:
            // https://github.com/apache/datafusion/pull/13310#discussion_r1836813902
            let plan = {
                // scope for dropping _guard
                let _guard = StackGuard::new(256 * 1024);
                self.set_expr_to_plan(other, planner_context)
            }?;
            let oby_exprs = to_order_by_exprs(order_by)?;
            let order_by_rex = self.order_by_to_sort_expr(
                oby_exprs,
                plan.schema(),
                planner_context,
                true,
                None,
            )?;
            let plan = self.order_by(plan, order_by_rex)?;
            self.limit(plan, limit_clause, planner_context)
        }
    }?;

    self.pipe_operators(plan, pipe_operators, planner_context)
}
/// Apply a sequence of pipe operators to a plan.
///
/// The operators are applied in the order given, each one receiving the plan
/// produced by the previous one. The first failing operator aborts the chain
/// and its error is returned.
fn pipe_operators(
    &self,
    plan: LogicalPlan,
    pipe_operators: Vec<PipeOperator>,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    pipe_operators
        .into_iter()
        .try_fold(plan, |current, operator| {
            self.pipe_operator(current, operator, planner_context)
        })
}
/// Apply a single pipe operator to `plan` and return the resulting plan.
///
/// Each supported operator is delegated to the planner routine that handles
/// the corresponding SQL clause. Any operator without a dedicated arm yields
/// a "not implemented" error naming the operator.
fn pipe_operator(
    &self,
    plan: LogicalPlan,
    pipe_operator: PipeOperator,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    match pipe_operator {
        PipeOperator::Where { expr } => {
            let predicate = Some(expr);
            self.plan_selection(predicate, plan, planner_context)
        }
        PipeOperator::OrderBy { exprs } => {
            let input_schema = plan.schema();
            let sort_exprs = self.order_by_to_sort_expr(
                exprs,
                input_schema,
                planner_context,
                true,
                None,
            )?;
            self.order_by(plan, sort_exprs)
        }
        PipeOperator::Limit { expr, offset } => {
            // Re-express the operator as a regular `LIMIT ... OFFSET ...` clause.
            let offset = offset.map(|value| Offset {
                value,
                rows: OffsetRows::None,
            });
            let limit_clause = LimitClause::LimitOffset {
                limit: Some(expr),
                offset,
                limit_by: vec![],
            };
            self.limit(plan, Some(limit_clause), planner_context)
        }
        PipeOperator::Select { exprs } => {
            let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
            let projection =
                self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
            self.project(plan, projection)
        }
        PipeOperator::Extend { exprs } => {
            let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
            let extra_exprs =
                self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
            // A leading wildcard comes first, followed by the new expressions.
            let mut projection =
                vec![SelectExpr::Wildcard(WildcardOptions::default())];
            projection.extend(extra_exprs);
            self.project(plan, projection)
        }
        PipeOperator::As { alias } => {
            let table_alias = TableAlias {
                name: alias,
                // Apply to all fields
                columns: vec![],
                explicit: true,
            };
            self.apply_table_alias(plan, table_alias)
        }
        PipeOperator::Union {
            set_quantifier,
            queries,
        } => {
            let op = SetOperator::Union;
            self.pipe_operator_set(plan, op, set_quantifier, queries, planner_context)
        }
        PipeOperator::Intersect {
            set_quantifier,
            queries,
        } => {
            let op = SetOperator::Intersect;
            self.pipe_operator_set(plan, op, set_quantifier, queries, planner_context)
        }
        PipeOperator::Except {
            set_quantifier,
            queries,
        } => {
            let op = SetOperator::Except;
            self.pipe_operator_set(plan, op, set_quantifier, queries, planner_context)
        }
        PipeOperator::Aggregate {
            full_table_exprs,
            group_by_expr,
        } => self.pipe_operator_aggregate(
            plan,
            full_table_exprs,
            group_by_expr,
            planner_context,
        ),
        PipeOperator::Join(join) => {
            self.parse_relation_join(plan, join, planner_context)
        }
        x => not_impl_err!("`{x}` pipe operator is not supported yet"),
    }
}
/// Handle the `UNION` / `INTERSECT` / `EXCEPT` pipe operators.
///
/// Every query in `queries` is planned and combined, left to right, with the
/// plan accumulated so far using `set_operator` and `set_quantifier`. With no
/// queries, `plan` is returned unchanged.
fn pipe_operator_set(
    &self,
    plan: LogicalPlan,
    set_operator: SetOperator,
    set_quantifier: SetQuantifier,
    queries: Vec<Query>,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    queries.into_iter().try_fold(plan, |left_plan, query| {
        let right_plan = self.query_to_plan(query, planner_context)?;
        self.set_operation_to_plan(
            set_operator,
            left_plan,
            right_plan,
            set_quantifier,
        )
    })
}
/// Wrap `input` in a limit node built from `limit_clause`.
///
/// `input` is returned untouched when there is no limit clause, or when the
/// clause provides neither a limit nor an offset.
///
/// # Errors
///
/// Returns a "not implemented" error when the clause contains `LIMIT BY`
/// expressions, and propagates errors from converting the limit/offset
/// expressions or from building the limit node.
fn limit(
    &self,
    input: LogicalPlan,
    limit_clause: Option<LimitClause>,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    let limit_clause = match limit_clause {
        Some(clause) => clause,
        None => return Ok(input),
    };

    // Limit, offset and limit-by expressions are converted against an empty schema.
    let schema = DFSchema::empty();

    let (skip, fetch, limit_by_exprs) = match limit_clause {
        LimitClause::LimitOffset {
            limit,
            offset,
            limit_by,
        } => {
            let skip = match offset {
                Some(offset) => {
                    Some(self.sql_to_expr(offset.value, &schema, planner_context)?)
                }
                None => None,
            };
            let fetch = match limit {
                Some(limit) => {
                    Some(self.sql_to_expr(limit, &schema, planner_context)?)
                }
                None => None,
            };
            let mut limit_by_exprs = Vec::with_capacity(limit_by.len());
            for expr in limit_by {
                limit_by_exprs.push(self.sql_to_expr(
                    expr,
                    &schema,
                    planner_context,
                )?);
            }
            (skip, fetch, limit_by_exprs)
        }
        LimitClause::OffsetCommaLimit { offset, limit } => {
            let skip = self.sql_to_expr(offset, &schema, planner_context)?;
            let fetch = self.sql_to_expr(limit, &schema, planner_context)?;
            (Some(skip), Some(fetch), Vec::new())
        }
    };

    if !limit_by_exprs.is_empty() {
        return not_impl_err!("LIMIT BY clause is not supported yet");
    }

    match (skip, fetch) {
        // Nothing to skip and nothing to fetch: no limit node is needed.
        (None, None) => Ok(input),
        (skip, fetch) => LogicalPlanBuilder::from(input)
            .limit_by_expr(skip, fetch)?
            .build(),
    }
}
/// Wrap the logical plan in a sort.
///
/// With an empty `order_by`, `plan` is returned unchanged. If `plan` is a
/// `DISTINCT ON` node, the sort expressions are attached to that node rather
/// than adding a separate sort on top of it. Any other plan is wrapped in a
/// sort node.
///
/// # Errors
///
/// Propagates errors from attaching the sort expressions to a `DISTINCT ON`
/// node or from building the sort node.
pub(super) fn order_by(
    &self,
    plan: LogicalPlan,
    order_by: Vec<Sort>,
) -> Result<LogicalPlan> {
    if order_by.is_empty() {
        return Ok(plan);
    }

    // `plan` is owned, so the `DISTINCT ON` node is taken by value instead of
    // being borrowed and cloned.
    match plan {
        LogicalPlan::Distinct(Distinct::On(distinct_on)) => {
            // In case of `DISTINCT ON` we must capture the sort expressions since during the plan
            // optimization we're effectively doing a `first_value` aggregation according to them.
            let distinct_on = distinct_on.with_sort_expr(order_by)?;
            Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
        }
        other => LogicalPlanBuilder::from(other).sort(order_by)?.build(),
    }
}
/// Handle AGGREGATE pipe operator
fn pipe_operator_aggregate(
&self,
plan: LogicalPlan,
full_table_exprs: Vec<ExprWithAliasAndOrderBy>,
group_by_expr: Vec<ExprWithAliasAndOrderBy>,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let plan_schema = plan.schema();
let process_expr =
|expr_with_alias_and_order_by: ExprWithAliasAndOrderBy,
planner_context: &mut PlannerContext| {
let expr_with_alias = expr_with_alias_and_order_by.expr;
let sql_expr = expr_with_alias.expr;
let alias = expr_with_alias.alias;
let df_expr = self.sql_to_expr(sql_expr, plan_schema, planner_context)?;
match alias {
Some(alias_ident) => df_expr.alias_if_changed(alias_ident.value),
None => Ok(df_expr),
}
};
let aggr_exprs: Vec<Expr> = full_table_exprs
.into_iter()
.map(|e| process_expr(e, planner_context))
.collect::<Result<Vec<_>>>()?;
let group_by_exprs: Vec<Expr> = group_by_expr
.into_iter()
.map(|e| process_expr(e, planner_context))
.collect::<Result<Vec<_>>>()?;
LogicalPlanBuilder::from(plan)
.aggregate(group_by_exprs, aggr_exprs)?
.build()
}
/// Wrap the logical plan in a `CreateMemoryTable` DDL node for `SELECT INTO`.
///
/// Without a `SELECT INTO` target the plan is returned as is. Otherwise the
/// plan becomes the input of a `CreateMemoryTable` statement whose table name
/// is resolved from the target name; all other options use fixed values
/// (default constraints, every flag `false`, no column defaults).
fn select_into(
    &self,
    plan: LogicalPlan,
    select_into: Option<SelectInto>,
) -> Result<LogicalPlan> {
    let Some(into) = select_into else {
        return Ok(plan);
    };

    let name = self.object_name_to_table_reference(into.name)?;
    let create_table = CreateMemoryTable {
        name,
        constraints: Constraints::default(),
        input: Arc::new(plan),
        if_not_exists: false,
        or_replace: false,
        temporary: false,
        column_defaults: vec![],
    };
    Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
        create_table,
    )))
}
}
/// Extracts the `ORDER BY` expressions of a query when no select list is
/// available.
///
/// This is [`to_order_by_exprs_with_select`] called without select
/// expressions.
fn to_order_by_exprs(order_by: Option<OrderBy>) -> Result<Vec<OrderByExpr>> {
    let select_exprs = None;
    to_order_by_exprs_with_select(order_by, select_exprs)
}
/// Extracts the `ORDER BY` expressions of a query, using the select
/// expressions to expand `ORDER BY ALL`.
///
/// * No `ORDER BY`: an empty list is returned.
/// * Explicit expressions: they are returned as written.
/// * `ORDER BY ALL`: one ordering expression is produced per entry of
///   `select_exprs`, each an unquoted identifier built from the column name
///   and carrying the options given to `ALL`. When `select_exprs` is `None`
///   an empty list is returned.
///
/// # Errors
///
/// Returns a "not implemented" error for `ORDER BY ... INTERPOLATE`, and for
/// `ORDER BY ALL` when a select expression is not a plain column.
pub(crate) fn to_order_by_exprs_with_select(
    order_by: Option<OrderBy>,
    select_exprs: Option<&Vec<Expr>>,
) -> Result<Vec<OrderByExpr>> {
    let (kind, interpolate) = match order_by {
        Some(OrderBy { kind, interpolate }) => (kind, interpolate),
        // If no order by, return an empty array.
        None => return Ok(Vec::new()),
    };

    if interpolate.is_some() {
        return not_impl_err!("ORDER BY INTERPOLATE is not supported");
    }

    let (order_by_options, exprs) = match kind {
        OrderByKind::Expressions(order_by_exprs) => return Ok(order_by_exprs),
        OrderByKind::All(order_by_options) => match select_exprs {
            Some(exprs) => (order_by_options, exprs),
            None => return Ok(Vec::new()),
        },
    };

    let mut order_by_exprs = Vec::with_capacity(exprs.len());
    for select_expr in exprs {
        let Expr::Column(column) = select_expr else {
            // TODO: Support other types of expressions
            return not_impl_err!(
                "ORDER BY ALL is not supported for non-column expressions"
            );
        };
        let identifier = Ident {
            value: column.name.clone(),
            quote_style: None,
            span: Span::empty(),
        };
        order_by_exprs.push(OrderByExpr {
            expr: SQLExpr::Identifier(identifier),
            options: order_by_options,
            with_fill: None,
        });
    }
    Ok(order_by_exprs)
}