// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Physical query planner
use std::sync::Arc;
use super::{aggregates, empty::EmptyExec, expressions::binary, functions, udaf};
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::{
DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType,
StringifiedPlan, UserDefinedLogicalNode,
};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use crate::physical_plan::hash_join::HashJoinExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::merge::MergeExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::udf;
use crate::physical_plan::{expressions, Distribution};
use crate::physical_plan::{hash_utils, Partitioning};
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::variable::VarType;
use arrow::compute::can_cast_types;
use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use expressions::col;
/// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
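///
/// A minimal sketch of an implementation (illustrative only; `MyNode` and
/// `MyExec` are assumed user-defined types, not part of this crate):
///
/// ```ignore
/// struct MyPlanner;
///
/// impl ExtensionPlanner for MyPlanner {
///     fn plan_extension(
///         &self,
///         node: &dyn UserDefinedLogicalNode,
///         inputs: &[Arc<dyn ExecutionPlan>],
///         _ctx_state: &ExecutionContextState,
///     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
///         match node.as_any().downcast_ref::<MyNode>() {
///             // this planner only knows how to plan `MyNode`
///             Some(my_node) => Ok(Some(Arc::new(MyExec::new(my_node, inputs)))),
///             // `None` delegates planning to the next registered planner
///             None => Ok(None),
///         }
///     }
/// }
/// ```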
pub trait ExtensionPlanner {
    /// Create a physical plan for a [`UserDefinedLogicalNode`].
    ///
    /// Returns an error when the planner knows how to plan the concrete
    /// implementation of `node` but fails while doing so, and `Ok(None)`
    /// when the planner does not know how to plan the `node` and wants to
    /// delegate the planning to another [`ExtensionPlanner`].
fn plan_extension(
&self,
node: &dyn UserDefinedLogicalNode,
inputs: &[Arc<dyn ExecutionPlan>],
ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
/// Default single node physical query planner that converts a
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
pub struct DefaultPhysicalPlanner {
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}
impl Default for DefaultPhysicalPlanner {
fn default() -> Self {
Self {
extension_planners: vec![],
}
}
}
impl PhysicalPlanner for DefaultPhysicalPlanner {
/// Create a physical plan from a logical plan
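    ///
    /// A usage sketch (assumes `logical_plan` and `ctx_state` already exist
    /// in scope):
    ///
    /// ```ignore
    /// let planner = DefaultPhysicalPlanner::default();
    /// let physical_plan = planner.create_physical_plan(&logical_plan, &ctx_state)?;
    /// ```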
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
self.optimize_plan(plan, ctx_state)
}
}
impl DefaultPhysicalPlanner {
/// Create a physical planner that uses `extension_planners` to
/// plan user-defined logical nodes [`LogicalPlan::Extension`].
    /// The planner delegates to the extension planners in order, using the
    /// first one that returns a non-`None` plan.
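    ///
    /// For example (a sketch; `MyPlanner` stands in for a user-provided
    /// [`ExtensionPlanner`] implementation):
    ///
    /// ```ignore
    /// let planner = DefaultPhysicalPlanner::with_extension_planners(vec![
    ///     Arc::new(MyPlanner {}),
    /// ]);
    /// ```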
pub fn with_extension_planners(
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
) -> Self {
Self { extension_planners }
}
    /// Optimize a physical plan, e.g. by coalescing small batches after
    /// selective operators and by inserting `MergeExec` where an operator
    /// requires a single input partition
fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
let children = plan
.children()
.iter()
.map(|child| self.optimize_plan(child.clone(), ctx_state))
.collect::<Result<Vec<_>>>()?;
if children.is_empty() {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
// wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
// highly selective filters
let plan_any = plan.as_any();
//TODO we should do this in a more generic way either by wrapping all operators
// or having an API so that operators can declare when their inputs or outputs
// need to be wrapped in a coalesce batches operator.
// See https://issues.apache.org/jira/browse/ARROW-11068
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
//TODO we should also do this for HashAggregateExec but we need to update tests
// as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
// || plan_any.downcast_ref::<HashAggregateExec>().is_some();
let plan = if wrap_in_coalesce {
//TODO we should add specific configuration settings for coalescing batches and
// we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
// implemented. For now, we choose half the configured batch size to avoid copies
// when a small number of rows are removed from a batch
let target_batch_size = ctx_state.config.batch_size / 2;
Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
} else {
plan.clone()
};
            let children = plan.children();
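            // enforce the distribution required by the operator, e.g. by
            // inserting a MergeExec to combine all input partitions into one
            // when the operator (such as GlobalLimitExec) requires a single
            // input partition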
match plan.required_child_distribution() {
Distribution::UnspecifiedDistribution => plan.with_new_children(children),
Distribution::SinglePartition => plan.with_new_children(
children
.iter()
.map(|child| {
if child.output_partitioning().partition_count() == 1 {
child.clone()
} else {
Arc::new(MergeExec::new(child.clone()))
}
})
.collect(),
),
}
}
}
    /// Create an initial physical plan from a logical plan, without applying
    /// the physical optimizations performed by `optimize_plan`
fn create_initial_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
let batch_size = ctx_state.config.batch_size;
match logical_plan {
LogicalPlan::TableScan {
source,
projection,
filters,
..
} => source.scan(projection, batch_size, filters),
LogicalPlan::Aggregate {
input,
group_expr,
aggr_expr,
..
} => {
// Initially need to perform the aggregate and then merge the partitions
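                // e.g. for `SELECT SUM(c2) FROM t GROUP BY c1`, each input
                // partition computes partial sums per group
                // (AggregateMode::Partial) and a second aggregate combines
                // those partial sums into the final values
                // (AggregateMode::Final)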
                let input_exec = self.create_physical_plan(input, ctx_state)?;
                let physical_input_schema = input_exec.schema();
                let logical_input_schema = input.as_ref().schema();
let groups = group_expr
.iter()
.map(|e| {
tuple_err((
self.create_physical_expr(
e,
&physical_input_schema,
ctx_state,
),
e.name(&logical_input_schema),
))
})
.collect::<Result<Vec<_>>>()?;
let aggregates = aggr_expr
.iter()
.map(|e| {
self.create_aggregate_expr(
e,
&logical_input_schema,
&physical_input_schema,
ctx_state,
)
})
.collect::<Result<Vec<_>>>()?;
let initial_aggr = Arc::new(HashAggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
input_exec,
                    physical_input_schema.clone(),
)?);
let final_group: Vec<Arc<dyn PhysicalExpr>> =
(0..groups.len()).map(|i| col(&groups[i].1)).collect();
// construct a second aggregation, keeping the final column name equal to the first aggregation
// and the expressions corresponding to the respective aggregate
Ok(Arc::new(HashAggregateExec::try_new(
AggregateMode::Final,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
aggregates,
initial_aggr,
                    physical_input_schema,
)?))
}
LogicalPlan::Projection { input, expr, .. } => {
let input_exec = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema();
let runtime_expr = expr
.iter()
.map(|e| {
tuple_err((
self.create_physical_expr(
e,
&input_exec.schema(),
                                ctx_state,
),
e.name(&input_schema),
))
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input_exec)?))
}
LogicalPlan::Filter {
input, predicate, ..
} => {
let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema();
let runtime_expr =
self.create_physical_expr(predicate, &input_schema, ctx_state)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr, input)?))
}
LogicalPlan::Repartition {
input,
partitioning_scheme,
} => {
let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.schema();
let physical_partitioning = match partitioning_scheme {
LogicalPartitioning::RoundRobinBatch(n) => {
Partitioning::RoundRobinBatch(*n)
}
LogicalPartitioning::Hash(expr, n) => {
let runtime_expr = expr
.iter()
.map(|e| {
                                self.create_physical_expr(e, &input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
Partitioning::Hash(runtime_expr, *n)
}
};
Ok(Arc::new(RepartitionExec::try_new(
input,
physical_partitioning,
)?))
}
LogicalPlan::Sort { expr, input, .. } => {
let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema();
let sort_expr = expr
.iter()
.map(|e| match e {
Expr::Sort {
expr,
asc,
nulls_first,
} => self.create_physical_sort_expr(
expr,
&input_schema,
SortOptions {
descending: !*asc,
nulls_first: *nulls_first,
},
ctx_state,
),
_ => Err(DataFusionError::Plan(
"Sort only accepts sort expressions".to_string(),
)),
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(SortExec::try_new(sort_expr, input)?))
}
LogicalPlan::Join {
left,
right,
on: keys,
join_type,
..
} => {
let left = self.create_physical_plan(left, ctx_state)?;
let right = self.create_physical_plan(right, ctx_state)?;
let physical_join_type = match join_type {
JoinType::Inner => hash_utils::JoinType::Inner,
JoinType::Left => hash_utils::JoinType::Left,
JoinType::Right => hash_utils::JoinType::Right,
};
Ok(Arc::new(HashJoinExec::try_new(
left,
right,
&keys,
&physical_join_type,
)?))
}
LogicalPlan::EmptyRelation {
produce_one_row,
schema,
} => Ok(Arc::new(EmptyExec::new(
*produce_one_row,
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::Limit { input, n, .. } => {
let limit = *n;
let input = self.create_physical_plan(input, ctx_state)?;
// GlobalLimitExec requires a single partition for input
let input = if input.output_partitioning().partition_count() == 1 {
input
} else {
// Apply a LocalLimitExec to each partition. The optimizer will also insert
// a MergeExec between the GlobalLimitExec and LocalLimitExec
Arc::new(LocalLimitExec::new(input, limit))
};
Ok(Arc::new(GlobalLimitExec::new(input, limit)))
}
LogicalPlan::CreateExternalTable { .. } => {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
// the context)
Err(DataFusionError::Internal(
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
LogicalPlan::Explain {
verbose,
plan,
stringified_plans,
schema,
} => {
let input = self.create_physical_plan(plan, ctx_state)?;
let mut stringified_plans = stringified_plans
.iter()
.filter(|s| s.should_display(*verbose))
.cloned()
.collect::<Vec<_>>();
// add in the physical plan if requested
if *verbose {
stringified_plans.push(StringifiedPlan::new(
PlanType::PhysicalPlan,
format!("{:#?}", input),
));
}
Ok(Arc::new(ExplainExec::new(
SchemaRef::new(schema.as_ref().to_owned().into()),
stringified_plans,
)))
}
LogicalPlan::Extension { node } => {
let inputs = node
.inputs()
.into_iter()
.map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
.collect::<Result<Vec<_>>>()?;
let maybe_plan = self.extension_planners.iter().try_fold(
None,
|maybe_plan, planner| {
if let Some(plan) = maybe_plan {
Ok(Some(plan))
} else {
planner.plan_extension(node.as_ref(), &inputs, ctx_state)
}
},
)?;
let plan = maybe_plan.ok_or_else(|| DataFusionError::Plan(format!(
"No installed planner was able to convert the custom node to an execution plan: {:?}", node
)))?;
                // Ensure the ExecutionPlan's schema matches the
                // declared logical schema to catch logic errors
                // when creating user defined plans.
if plan.schema() != node.schema().as_ref().to_owned().into() {
Err(DataFusionError::Plan(format!(
"Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
node, node.schema(), plan.schema()
)))
} else {
Ok(plan)
}
}
}
}
/// Create a physical expression from a logical expression
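    ///
    /// For example (a sketch; assumes a `planner`, an `input_schema`
    /// containing a column `c1`, and a `ctx_state` in scope):
    ///
    /// ```ignore
    /// use crate::logical_plan::{col, lit};
    ///
    /// // plan the logical expression `c1 < 5` against `input_schema`
    /// let physical_expr = planner.create_physical_expr(
    ///     &col("c1").lt(lit(5)),
    ///     &input_schema,
    ///     &ctx_state,
    /// )?;
    /// ```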
pub fn create_physical_expr(
&self,
e: &Expr,
input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn PhysicalExpr>> {
match e {
            Expr::Alias(expr, ..) => {
                self.create_physical_expr(expr, input_schema, ctx_state)
            }
Expr::Column(name) => {
// check that name exists
input_schema.field_with_name(&name)?;
Ok(Arc::new(Column::new(name)))
}
Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
Expr::ScalarVariable(variable_names) => {
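                // names beginning with "@@" refer to system variables;
                // all other names refer to user-defined variables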
                if variable_names[0].starts_with("@@") {
match ctx_state.var_provider.get(&VarType::System) {
Some(provider) => {
let scalar_value =
provider.get_value(variable_names.clone())?;
Ok(Arc::new(Literal::new(scalar_value)))
}
_ => Err(DataFusionError::Plan(
"No system variable provider found".to_string(),
)),
}
} else {
match ctx_state.var_provider.get(&VarType::UserDefined) {
Some(provider) => {
let scalar_value =
provider.get_value(variable_names.clone())?;
Ok(Arc::new(Literal::new(scalar_value)))
}
_ => Err(DataFusionError::Plan(
"No user defined variable provider found".to_string(),
)),
}
}
}
Expr::BinaryExpr { left, op, right } => {
let lhs = self.create_physical_expr(left, input_schema, ctx_state)?;
let rhs = self.create_physical_expr(right, input_schema, ctx_state)?;
binary(lhs, *op, rhs, input_schema)
}
Expr::Case {
expr,
when_then_expr,
else_expr,
..
} => {
let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = expr {
Some(self.create_physical_expr(
e.as_ref(),
input_schema,
ctx_state,
)?)
} else {
None
};
let when_expr = when_then_expr
.iter()
.map(|(w, _)| {
self.create_physical_expr(w.as_ref(), input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
let then_expr = when_then_expr
.iter()
.map(|(_, t)| {
self.create_physical_expr(t.as_ref(), input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
when_expr
.iter()
.zip(then_expr.iter())
.map(|(w, t)| (w.clone(), t.clone()))
.collect();
let else_expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = else_expr
{
Some(self.create_physical_expr(
e.as_ref(),
input_schema,
ctx_state,
)?)
} else {
None
};
Ok(Arc::new(CaseExpr::try_new(
expr,
&when_then_expr,
else_expr,
)?))
}
Expr::Cast { expr, data_type } => expressions::cast(
self.create_physical_expr(expr, input_schema, ctx_state)?,
input_schema,
data_type.clone(),
),
Expr::Not(expr) => expressions::not(
self.create_physical_expr(expr, input_schema, ctx_state)?,
input_schema,
),
Expr::Negative(expr) => expressions::negative(
self.create_physical_expr(expr, input_schema, ctx_state)?,
input_schema,
),
Expr::IsNull(expr) => expressions::is_null(self.create_physical_expr(
expr,
input_schema,
ctx_state,
)?),
Expr::IsNotNull(expr) => expressions::is_not_null(
self.create_physical_expr(expr, input_schema, ctx_state)?,
),
Expr::ScalarFunction { fun, args } => {
let physical_args = args
.iter()
.map(|e| self.create_physical_expr(e, input_schema, ctx_state))
.collect::<Result<Vec<_>>>()?;
functions::create_physical_expr(fun, &physical_args, input_schema)
}
Expr::ScalarUDF { fun, args } => {
let mut physical_args = vec![];
for e in args {
physical_args.push(self.create_physical_expr(
e,
input_schema,
ctx_state,
)?);
}
udf::create_physical_expr(
fun.clone().as_ref(),
&physical_args,
input_schema,
)
}
Expr::Between {
expr,
negated,
low,
high,
} => {
let value_expr =
self.create_physical_expr(expr, input_schema, ctx_state)?;
let low_expr = self.create_physical_expr(low, input_schema, ctx_state)?;
let high_expr =
self.create_physical_expr(high, input_schema, ctx_state)?;
                // rewrite the BETWEEN expression into two binary operators
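                // e.g. `expr BETWEEN low AND high` becomes
                // `expr >= low AND expr <= high`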
let binary_expr = binary(
binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?,
Operator::And,
binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?,
input_schema,
);
if *negated {
expressions::not(binary_expr?, input_schema)
} else {
binary_expr
}
}
Expr::InList {
expr,
list,
negated,
} => match expr.as_ref() {
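                // a NULL literal short-circuits: `NULL IN (...)` evaluates
                // to NULL, so plan it as a NULL boolean literal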
Expr::Literal(ScalarValue::Utf8(None)) => {
Ok(expressions::lit(ScalarValue::Boolean(None)))
}
_ => {
let value_expr =
self.create_physical_expr(expr, input_schema, ctx_state)?;
let value_expr_data_type = value_expr.data_type(input_schema)?;
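                    // plan each list element, casting it to the value
                    // expression's type when the types differ and the cast
                    // is supported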
let list_exprs =
list.iter()
.map(|expr| match expr {
Expr::Literal(ScalarValue::Utf8(None)) => self
.create_physical_expr(expr, input_schema, ctx_state),
_ => {
let list_expr = self.create_physical_expr(
expr,
input_schema,
ctx_state,
)?;
let list_expr_data_type =
list_expr.data_type(input_schema)?;
if list_expr_data_type == value_expr_data_type {
Ok(list_expr)
} else if can_cast_types(
&list_expr_data_type,
&value_expr_data_type,
) {
expressions::cast(
list_expr,
input_schema,
value_expr.data_type(input_schema)?,
)
} else {
Err(DataFusionError::Plan(format!(
"Unsupported CAST from {:?} to {:?}",
list_expr_data_type, value_expr_data_type
)))
}
}
})
.collect::<Result<Vec<_>>>()?;
expressions::in_list(value_expr, list_exprs, negated)
}
},
other => Err(DataFusionError::NotImplemented(format!(
"Physical plan does not support logical expression {:?}",
other
))),
}
}
/// Create an aggregate expression from a logical expression
pub fn create_aggregate_expr(
&self,
e: &Expr,
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn AggregateExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(logical_input_schema)?, e),
};
match e {
Expr::AggregateFunction {
fun,
distinct,
args,
..
} => {
let args = args
.iter()
.map(|e| {
self.create_physical_expr(e, physical_input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
aggregates::create_aggregate_expr(
fun,
*distinct,
&args,
physical_input_schema,
name,
)
}
Expr::AggregateUDF { fun, args, .. } => {
let args = args
.iter()
.map(|e| {
self.create_physical_expr(e, physical_input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
udaf::create_aggregate_expr(fun, &args, physical_input_schema, name)
}
other => Err(DataFusionError::Internal(format!(
"Invalid aggregate expression '{:?}'",
other
))),
}
}
    /// Create a physical sort expression from a logical expression
pub fn create_physical_sort_expr(
&self,
e: &Expr,
input_schema: &Schema,
options: SortOptions,
ctx_state: &ExecutionContextState,
) -> Result<PhysicalSortExpr> {
Ok(PhysicalSortExpr {
expr: self.create_physical_expr(e, input_schema, ctx_state)?,
options,
})
}
}
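/// Combine a pair of `Result`s into a single `Result` of a tuple, returning
/// the first error encountered (the left error wins if both are errors)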
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
match value {
(Ok(e), Ok(e1)) => Ok((e, e1)),
(Err(e), Ok(_)) => Err(e),
(Ok(_), Err(e1)) => Err(e1),
(Err(e), Err(_)) => Err(e),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
use crate::prelude::ExecutionConfig;
use crate::scalar::ScalarValue;
use crate::{
logical_plan::{col, lit, sum, LogicalPlanBuilder},
physical_plan::SendableRecordBatchStream,
};
use arrow::datatypes::{DataType, Field, SchemaRef};
use async_trait::async_trait;
use fmt::Debug;
use std::{any::Any, collections::HashMap, fmt};
fn make_ctx_state() -> ExecutionContextState {
ExecutionContextState {
datasources: HashMap::new(),
scalar_functions: HashMap::new(),
var_provider: HashMap::new(),
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
}
}
fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
let ctx_state = make_ctx_state();
let planner = DefaultPhysicalPlanner::default();
planner.create_physical_plan(logical_plan, &ctx_state)
}
#[test]
fn test_all_operators() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
// filter clause needs the type coercion rule applied
.filter(col("c7").lt(lit(5_u8)))?
.project(&[col("c1"), col("c2")])?
.aggregate(&[col("c1")], &[sum(col("c2"))])?
.sort(&[col("c1").sort(true, true)])?
.limit(10)?
.build()?;
let plan = plan(&logical_plan)?;
// verify that the plan correctly casts u8 to i64
let expected = "BinaryExpr { left: Column { name: \"c7\" }, op: Lt, right: CastExpr { expr: Literal { value: UInt8(5) }, cast_type: Int64 } }";
assert!(format!("{:?}", plan).contains(expected));
Ok(())
}
#[test]
fn test_create_not() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
let planner = DefaultPhysicalPlanner::default();
let expr =
planner.create_physical_expr(&col("a").not(), &schema, &make_ctx_state())?;
let expected = expressions::not(expressions::col("a"), &schema)?;
assert_eq!(format!("{:?}", expr), format!("{:?}", expected));
Ok(())
}
#[test]
fn test_with_csv_plan() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
.filter(col("c7").lt(col("c12")))?
.build()?;
let plan = plan(&logical_plan)?;
// c12 is f64, c7 is u8 -> cast c7 to f64
let expected = "predicate: BinaryExpr { left: CastExpr { expr: Column { name: \"c7\" }, cast_type: Float64 }, op: Lt, right: Column { name: \"c12\" } }";
assert!(format!("{:?}", plan).contains(expected));
Ok(())
}
#[test]
fn errors() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
let bool_expr = col("c1").eq(col("c1"));
let cases = vec![
// utf8 < u32
col("c1").lt(col("c2")),
// utf8 AND utf8
col("c1").and(col("c1")),
// u8 AND u8
col("c3").and(col("c3")),
// utf8 = u32
col("c1").eq(col("c2")),
// utf8 = bool
col("c1").eq(bool_expr.clone()),
// u32 AND bool
col("c2").and(bool_expr),
// utf8 LIKE u32
col("c1").like(col("c2")),
];
for case in cases {
let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
.project(&[case.clone()]);
let message = format!(
"Expression {:?} expected to error due to impossible coercion",
case
);
assert!(logical_plan.is_err(), "{}", message);
}
Ok(())
}
#[test]
fn default_extension_planner() {
let ctx_state = make_ctx_state();
let planner = DefaultPhysicalPlanner::default();
let logical_plan = LogicalPlan::Extension {
node: Arc::new(NoOpExtensionNode::default()),
};
let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
let expected_error = "No installed planner was able to convert the custom node to an execution plan: NoOp";
match plan {
Ok(_) => panic!("Expected planning failure"),
Err(e) => assert!(
e.to_string().contains(expected_error),
"Error '{}' did not contain expected error '{}'",
e.to_string(),
expected_error
),
}
}
#[test]
fn bad_extension_planner() {
// Test that creating an execution plan whose schema doesn't
// match the logical plan's schema generates an error.
let ctx_state = make_ctx_state();
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
BadExtensionPlanner {},
)]);
let logical_plan = LogicalPlan::Extension {
node: Arc::new(NoOpExtensionNode::default()),
};
let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
let expected_error: &str = "Error during planning: \
Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
LogicalPlan schema: DFSchema { fields: [\
DFField { qualifier: None, field: Field { \
name: \"a\", \
data_type: Int32, \
nullable: false, \
dict_id: 0, \
dict_is_ordered: false, \
metadata: None } }\
] }, \
ExecutionPlan schema: Schema { fields: [\
Field { \
name: \"b\", \
data_type: Int32, \
nullable: false, \
dict_id: 0, \
dict_is_ordered: false, \
metadata: None }\
], metadata: {} }";
match plan {
Ok(_) => panic!("Expected planning failure"),
Err(e) => assert!(
e.to_string().contains(expected_error),
"Error '{}' did not contain expected error '{}'",
e.to_string(),
expected_error
),
}
}
#[test]
fn in_list_types() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
// expression: "a in ('a', 1)"
let list = vec![
Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
Expr::Literal(ScalarValue::Int64(Some(1))),
];
let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
// filter clause needs the type coercion rule applied
.filter(col("c12").lt(lit(0.05)))?
.project(&[col("c1").in_list(list, false)])?
.build()?;
let execution_plan = plan(&logical_plan)?;
// verify that the plan correctly adds cast from Int64(1) to Utf8
let expected = "InListExpr { expr: Column { name: \"c1\" }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8 }], negated: false }";
assert!(format!("{:?}", execution_plan).contains(expected));
// expression: "a in (true, 'a')"
let list = vec![
Expr::Literal(ScalarValue::Boolean(Some(true))),
Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
];
let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
// filter clause needs the type coercion rule applied
.filter(col("c12").lt(lit(0.05)))?
.project(&[col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
.build()?;
let execution_plan = plan(&logical_plan);
let expected_error = "Unsupported CAST from Utf8 to Boolean";
match execution_plan {
Ok(_) => panic!("Expected planning failure"),
Err(e) => assert!(
e.to_string().contains(expected_error),
"Error '{}' did not contain expected error '{}'",
e.to_string(),
expected_error
),
}
Ok(())
}
#[test]
fn hash_agg_input_schema() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
.aggregate(&[col("c1")], &[sum(col("c2"))])?
.build()?;
let execution_plan = plan(&logical_plan)?;
let final_hash_agg = execution_plan
.as_any()
.downcast_ref::<HashAggregateExec>()
.expect("hash aggregate");
assert_eq!("SUM(c2)", final_hash_agg.schema().field(1).name());
// we need access to the input to the partial aggregate so that other projects can
// implement serde
assert_eq!("c2", final_hash_agg.input_schema().field(1).name());
Ok(())
}
/// An example extension node that doesn't do anything
struct NoOpExtensionNode {
schema: DFSchemaRef,
}
impl Default for NoOpExtensionNode {
fn default() -> Self {
Self {
schema: DFSchemaRef::new(
DFSchema::new(vec![DFField::new(None, "a", DataType::Int32, false)])
.unwrap(),
),
}
}
}
impl Debug for NoOpExtensionNode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NoOp")
}
}
impl UserDefinedLogicalNode for NoOpExtensionNode {
fn as_any(&self) -> &dyn Any {
self
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![]
}
fn schema(&self) -> &DFSchemaRef {
&self.schema
}
fn expressions(&self) -> Vec<Expr> {
vec![]
}
fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NoOp")
}
fn from_template(
&self,
_exprs: &[Expr],
_inputs: &[LogicalPlan],
) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
unimplemented!("NoOp");
}
}
#[derive(Debug)]
struct NoOpExecutionPlan {
schema: SchemaRef,
}
#[async_trait]
impl ExecutionPlan for NoOpExecutionPlan {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
&self,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!("NoOpExecutionPlan::with_new_children");
}
async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
unimplemented!("NoOpExecutionPlan::execute");
}
}
    // Produces an execution plan whose schema does not match the schema of
    // its logical plan node.
struct BadExtensionPlanner {}
impl ExtensionPlanner for BadExtensionPlanner {
/// Create a physical plan for an extension node
fn plan_extension(
&self,
_node: &dyn UserDefinedLogicalNode,
_inputs: &[Arc<dyn ExecutionPlan>],
_ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(Some(Arc::new(NoOpExecutionPlan {
schema: SchemaRef::new(Schema::new(vec![Field::new(
"b",
DataType::Int32,
false,
)])),
})))
}
}
}