// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Physical query planner
use std::sync::Arc;
use super::{aggregates, empty::EmptyExec, expressions::binary, functions, udaf};
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::{
Expr, LogicalPlan, PlanType, StringifiedPlan, TableSource, UserDefinedLogicalNode,
};
use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use crate::physical_plan::hash_join::HashJoinExec;
use crate::physical_plan::hash_utils;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::merge::MergeExec;
use crate::physical_plan::parquet::ParquetExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::udf;
use crate::physical_plan::{expressions, Distribution};
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner};
use crate::prelude::JoinType;
use crate::variable::VarType;
use arrow::compute::SortOptions;
use arrow::datatypes::Schema;
use expressions::col;
/// This trait permits the `DefaultPhysicalPlanner` to create plans for
/// user defined logical nodes (implementations of `UserDefinedLogicalNode`)
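///
/// A minimal sketch of an implementation, assuming a hypothetical
/// `MyLogicalNode` / `MyExec` pair (these names are illustrative, not part
/// of this crate):
///
/// ```ignore
/// struct MyExtensionPlanner;
///
/// impl ExtensionPlanner for MyExtensionPlanner {
///     fn plan_extension(
///         &self,
///         node: &dyn UserDefinedLogicalNode,
///         inputs: Vec<Arc<dyn ExecutionPlan>>,
///         _ctx_state: &ExecutionContextState,
///     ) -> Result<Arc<dyn ExecutionPlan>> {
///         // Downcast to the concrete node type this planner understands,
///         // then build the matching physical operator over its inputs.
///         match node.as_any().downcast_ref::<MyLogicalNode>() {
///             Some(my_node) => Ok(Arc::new(MyExec::new(my_node, inputs))),
///             None => Err(DataFusionError::Internal(
///                 "unexpected extension node".to_string(),
///             )),
///         }
///     }
/// }
/// ```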
pub trait ExtensionPlanner {
/// Create a physical plan for an extension node
fn plan_extension(
&self,
node: &dyn UserDefinedLogicalNode,
inputs: Vec<Arc<dyn ExecutionPlan>>,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>>;
}
/// Default single node physical query planner that converts a
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
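///
/// A minimal usage sketch, assuming a `logical_plan` and an
/// `ExecutionContextState` named `ctx_state` are already in scope:
///
/// ```ignore
/// let planner = DefaultPhysicalPlanner::default();
/// // Builds the initial ExecutionPlan, then applies the planner's physical
/// // optimizations (e.g. inserting MergeExec where a single input
/// // partition is required).
/// let physical_plan = planner.create_physical_plan(&logical_plan, &ctx_state)?;
/// ```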
pub struct DefaultPhysicalPlanner {
extension_planner: Arc<dyn ExtensionPlanner + Send + Sync>,
}
impl Default for DefaultPhysicalPlanner {
/// Create an implementation of the default physical planner
fn default() -> Self {
Self {
extension_planner: Arc::new(DefaultExtensionPlanner {}),
}
}
}
impl PhysicalPlanner for DefaultPhysicalPlanner {
/// Create a physical plan from a logical plan
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
self.optimize_plan(plan, ctx_state)
}
}
impl DefaultPhysicalPlanner {
/// Create a physical planner that uses `extension_planner` to
/// plan extension nodes.
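    ///
    /// For example (a sketch; `MyExtensionPlanner` is the illustrative
    /// planner from the `ExtensionPlanner` docs above):
    ///
    /// ```ignore
    /// let planner = DefaultPhysicalPlanner::with_extension_planner(
    ///     Arc::new(MyExtensionPlanner {}),
    /// );
    /// ```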
pub fn with_extension_planner(
extension_planner: Arc<dyn ExtensionPlanner + Send + Sync>,
) -> Self {
Self { extension_planner }
}
    /// Optimize a physical plan by enforcing each operator's required input
    /// distribution, inserting a `MergeExec` above any multi-partition child
    /// of an operator that requires a single input partition
fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
let children = plan
.children()
.iter()
.map(|child| self.optimize_plan(child.clone(), ctx_state))
.collect::<Result<Vec<_>>>()?;
        if children.is_empty() {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
match plan.required_child_distribution() {
Distribution::UnspecifiedDistribution => plan.with_new_children(children),
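                // This operator needs all of its input rows in one partition:
                // keep single-partition children as-is and merge the output
                // partitions of any multi-partition child via MergeExec.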
Distribution::SinglePartition => plan.with_new_children(
children
.iter()
.map(|child| {
if child.output_partitioning().partition_count() == 1 {
child.clone()
} else {
Arc::new(MergeExec::new(child.clone()))
}
})
.collect(),
),
}
}
}
    /// Create the initial, unoptimized physical plan from a logical plan
fn create_initial_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
let batch_size = ctx_state.config.batch_size;
match logical_plan {
LogicalPlan::TableScan {
source, projection, ..
} => match source {
TableSource::FromContext(table_name) => {
match ctx_state.datasources.get(table_name) {
Some(provider) => provider.scan(projection, batch_size),
_ => Err(DataFusionError::Plan(format!(
"No table named {}. Existing tables: {:?}",
table_name,
ctx_state.datasources.keys().collect::<Vec<_>>(),
))),
}
}
TableSource::FromProvider(ref provider) => {
provider.scan(projection, batch_size)
}
},
LogicalPlan::InMemoryScan {
data,
projection,
projected_schema,
..
} => Ok(Arc::new(MemoryExec::try_new(
data,
Arc::new(projected_schema.as_ref().to_owned()),
projection.to_owned(),
)?)),
LogicalPlan::CsvScan {
path,
schema,
has_header,
delimiter,
projection,
..
} => Ok(Arc::new(CsvExec::try_new(
path,
CsvReadOptions::new()
.schema(schema.as_ref())
.delimiter_option(*delimiter)
.has_header(*has_header),
projection.to_owned(),
batch_size,
)?)),
LogicalPlan::ParquetScan {
path, projection, ..
} => Ok(Arc::new(ParquetExec::try_new(
path,
projection.to_owned(),
batch_size,
)?)),
LogicalPlan::Projection { input, expr, .. } => {
let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema();
let runtime_expr = expr
.iter()
.map(|e| {
tuple_err((
                            self.create_physical_expr(e, &input_schema, ctx_state),
e.name(&input_schema),
))
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
}
LogicalPlan::Aggregate {
input,
group_expr,
aggr_expr,
..
} => {
                // Perform a partial (per-partition) aggregate first; the
                // final aggregate constructed below combines the partial
                // results
let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema();
let groups = group_expr
.iter()
.map(|e| {
tuple_err((
self.create_physical_expr(e, &input_schema, ctx_state),
e.name(&input_schema),
))
})
.collect::<Result<Vec<_>>>()?;
let aggregates = aggr_expr
.iter()
.map(|e| self.create_aggregate_expr(e, &input_schema, ctx_state))
.collect::<Result<Vec<_>>>()?;
let initial_aggr = Arc::new(HashAggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
input,
)?);
let final_group: Vec<Arc<dyn PhysicalExpr>> =
(0..groups.len()).map(|i| col(&groups[i].1)).collect();
                // Construct the final aggregation, reusing the column names of
                // the partial aggregation so each output column keeps the name
                // assigned in the first phase
Ok(Arc::new(HashAggregateExec::try_new(
AggregateMode::Final,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
aggregates,
initial_aggr,
)?))
}
LogicalPlan::Filter {
input, predicate, ..
} => {
let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema();
let runtime_expr =
self.create_physical_expr(predicate, &input_schema, ctx_state)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr, input)?))
}
LogicalPlan::Sort { expr, input, .. } => {
let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema();
let sort_expr = expr
.iter()
.map(|e| match e {
Expr::Sort {
expr,
asc,
nulls_first,
} => self.create_physical_sort_expr(
expr,
&input_schema,
SortOptions {
descending: !*asc,
nulls_first: *nulls_first,
},
ctx_state,
),
_ => Err(DataFusionError::Plan(
"Sort only accepts sort expressions".to_string(),
)),
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(SortExec::try_new(
sort_expr,
input,
ctx_state.config.concurrency,
)?))
}
LogicalPlan::Join {
left,
right,
on: keys,
join_type,
..
} => {
let left = self.create_physical_plan(left, ctx_state)?;
let right = self.create_physical_plan(right, ctx_state)?;
let physical_join_type = match join_type {
JoinType::Inner => hash_utils::JoinType::Inner,
};
Ok(Arc::new(HashJoinExec::try_new(
left,
right,
                    keys,
&physical_join_type,
)?))
}
LogicalPlan::EmptyRelation {
produce_one_row,
schema,
} => Ok(Arc::new(EmptyExec::new(
*produce_one_row,
Arc::new(schema.as_ref().clone()),
))),
LogicalPlan::Limit { input, n, .. } => {
let limit = *n;
let input = self.create_physical_plan(input, ctx_state)?;
// GlobalLimitExec requires a single partition for input
let input = if input.output_partitioning().partition_count() == 1 {
input
} else {
// Apply a LocalLimitExec to each partition. The optimizer will also insert
// a MergeExec between the GlobalLimitExec and LocalLimitExec
Arc::new(LocalLimitExec::new(input, limit))
};
Ok(Arc::new(GlobalLimitExec::new(
input,
limit,
ctx_state.config.concurrency,
)))
}
LogicalPlan::CreateExternalTable { .. } => {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
// the context)
Err(DataFusionError::Internal(
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
LogicalPlan::Explain {
verbose,
plan,
stringified_plans,
schema,
} => {
let input = self.create_physical_plan(plan, ctx_state)?;
let mut stringified_plans = stringified_plans
.iter()
.filter(|s| s.should_display(*verbose))
                    .cloned()
.collect::<Vec<_>>();
// add in the physical plan if requested
if *verbose {
stringified_plans.push(StringifiedPlan::new(
PlanType::PhysicalPlan,
format!("{:#?}", input),
));
}
let schema_ref = Arc::new(schema.as_ref().clone());
Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
}
LogicalPlan::Extension { node } => {
let inputs = node
.inputs()
.into_iter()
.map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
.collect::<Result<Vec<_>>>()?;
let plan = self.extension_planner.plan_extension(
node.as_ref(),
inputs,
ctx_state,
)?;
// Ensure the ExecutionPlan's schema matches the
// declared logical schema to catch and warn about
// logic errors when creating user defined plans.
if plan.schema() != *node.schema() {
Err(DataFusionError::Plan(format!(
"Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
node, node.schema(), plan.schema()
)))
} else {
Ok(plan)
}
}
}
}
/// Create a physical expression from a logical expression
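    ///
    /// For example, a logical comparison such as `col("a").lt(lit(5))` is
    /// planned into a `BinaryExpr` over a `Column` and a (possibly cast)
    /// `Literal`. A usage sketch, assuming `planner`, `schema`, and
    /// `ctx_state` are in scope:
    ///
    /// ```ignore
    /// use crate::logical_plan::{col, lit};
    ///
    /// let logical = col("a").lt(lit(5));
    /// let physical = planner.create_physical_expr(&logical, &schema, &ctx_state)?;
    /// ```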
pub fn create_physical_expr(
&self,
e: &Expr,
input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn PhysicalExpr>> {
match e {
            Expr::Alias(expr, ..) => {
                self.create_physical_expr(expr, input_schema, ctx_state)
            }
Expr::Column(name) => {
// check that name exists
input_schema.field_with_name(&name)?;
Ok(Arc::new(Column::new(name)))
}
Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
Expr::ScalarVariable(variable_names) => {
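                // System variables are prefixed with `@@`; all other
                // variables are resolved through the user defined variable
                // provider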
                if variable_names[0].starts_with("@@") {
match ctx_state.var_provider.get(&VarType::System) {
Some(provider) => {
let scalar_value =
provider.get_value(variable_names.clone())?;
Ok(Arc::new(Literal::new(scalar_value)))
}
                        _ => Err(DataFusionError::Plan(
                            "No system variable provider found".to_string(),
                        )),
}
} else {
match ctx_state.var_provider.get(&VarType::UserDefined) {
Some(provider) => {
let scalar_value =
provider.get_value(variable_names.clone())?;
Ok(Arc::new(Literal::new(scalar_value)))
}
                        _ => Err(DataFusionError::Plan(
                            "No user defined variable provider found".to_string(),
                        )),
}
}
}
Expr::BinaryExpr { left, op, right } => {
let lhs = self.create_physical_expr(left, input_schema, ctx_state)?;
let rhs = self.create_physical_expr(right, input_schema, ctx_state)?;
binary(lhs, op.clone(), rhs, input_schema)
}
Expr::Case {
expr,
when_then_expr,
else_expr,
..
} => {
                let expr: Option<Arc<dyn PhysicalExpr>> = expr
                    .as_ref()
                    .map(|e| {
                        self.create_physical_expr(e.as_ref(), input_schema, ctx_state)
                    })
                    .transpose()?;
let when_expr = when_then_expr
.iter()
.map(|(w, _)| {
self.create_physical_expr(w.as_ref(), input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
let then_expr = when_then_expr
.iter()
.map(|(_, t)| {
self.create_physical_expr(t.as_ref(), input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
when_expr
.iter()
.zip(then_expr.iter())
.map(|(w, t)| (w.clone(), t.clone()))
.collect();
                let else_expr: Option<Arc<dyn PhysicalExpr>> = else_expr
                    .as_ref()
                    .map(|e| {
                        self.create_physical_expr(e.as_ref(), input_schema, ctx_state)
                    })
                    .transpose()?;
Ok(Arc::new(CaseExpr::try_new(
expr,
&when_then_expr,
else_expr,
)?))
}
Expr::Cast { expr, data_type } => expressions::cast(
self.create_physical_expr(expr, input_schema, ctx_state)?,
input_schema,
data_type.clone(),
),
Expr::Not(expr) => expressions::not(
self.create_physical_expr(expr, input_schema, ctx_state)?,
input_schema,
),
Expr::IsNull(expr) => expressions::is_null(self.create_physical_expr(
expr,
input_schema,
ctx_state,
)?),
Expr::IsNotNull(expr) => expressions::is_not_null(
self.create_physical_expr(expr, input_schema, ctx_state)?,
),
Expr::ScalarFunction { fun, args } => {
let physical_args = args
.iter()
.map(|e| self.create_physical_expr(e, input_schema, ctx_state))
.collect::<Result<Vec<_>>>()?;
functions::create_physical_expr(fun, &physical_args, input_schema)
}
            Expr::ScalarUDF { fun, args } => {
                let physical_args = args
                    .iter()
                    .map(|e| self.create_physical_expr(e, input_schema, ctx_state))
                    .collect::<Result<Vec<_>>>()?;
                udf::create_physical_expr(fun.as_ref(), &physical_args, input_schema)
            }
other => Err(DataFusionError::NotImplemented(format!(
"Physical plan does not support logical expression {:?}",
other
))),
}
}
/// Create an aggregate expression from a logical expression
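    ///
    /// For example, `sum(col("c2")).alias("total")` produces an aggregate
    /// whose output column is named `total`. A usage sketch, assuming
    /// `planner`, `schema`, and `ctx_state` are in scope:
    ///
    /// ```ignore
    /// use crate::logical_plan::{col, sum};
    ///
    /// let agg = planner.create_aggregate_expr(
    ///     &sum(col("c2")).alias("total"),
    ///     &schema,
    ///     &ctx_state,
    /// )?;
    /// ```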
pub fn create_aggregate_expr(
&self,
e: &Expr,
input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn AggregateExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(input_schema)?, e),
};
match e {
Expr::AggregateFunction {
fun,
distinct,
args,
..
} => {
let args = args
.iter()
.map(|e| self.create_physical_expr(e, input_schema, ctx_state))
.collect::<Result<Vec<_>>>()?;
aggregates::create_aggregate_expr(
fun,
*distinct,
&args,
input_schema,
name,
)
}
Expr::AggregateUDF { fun, args, .. } => {
let args = args
.iter()
.map(|e| self.create_physical_expr(e, input_schema, ctx_state))
.collect::<Result<Vec<_>>>()?;
udaf::create_aggregate_expr(fun, &args, input_schema, name)
}
other => Err(DataFusionError::Internal(format!(
"Invalid aggregate expression '{:?}'",
other
))),
}
}
    /// Create a physical sort expression from a logical expression
pub fn create_physical_sort_expr(
&self,
e: &Expr,
input_schema: &Schema,
options: SortOptions,
ctx_state: &ExecutionContextState,
) -> Result<PhysicalSortExpr> {
Ok(PhysicalSortExpr {
expr: self.create_physical_expr(e, input_schema, ctx_state)?,
            options,
})
}
}
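/// Combine a pair of `Result`s into a `Result` of a pair, returning the
/// first error (preferring the left) if either value is an error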
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
match value {
(Ok(e), Ok(e1)) => Ok((e, e1)),
(Err(e), Ok(_)) => Err(e),
(Ok(_), Err(e1)) => Err(e1),
(Err(e), Err(_)) => Err(e),
}
}
struct DefaultExtensionPlanner {}
impl ExtensionPlanner for DefaultExtensionPlanner {
fn plan_extension(
&self,
node: &dyn UserDefinedLogicalNode,
_inputs: Vec<Arc<dyn ExecutionPlan>>,
_ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::NotImplemented(format!(
"DefaultPhysicalPlanner does not know how to plan {:?}. \
             Provide a custom ExtensionPlanner that does",
node
)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
use crate::{
logical_plan::{col, lit, sum, LogicalPlanBuilder},
physical_plan::SendableRecordBatchStream,
};
use crate::{prelude::ExecutionConfig, test::arrow_testdata_path};
use arrow::datatypes::{DataType, Field, SchemaRef};
use async_trait::async_trait;
use fmt::Debug;
use std::{any::Any, collections::HashMap, fmt};
fn make_ctx_state() -> ExecutionContextState {
ExecutionContextState {
datasources: HashMap::new(),
scalar_functions: HashMap::new(),
var_provider: HashMap::new(),
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
}
}
fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
let ctx_state = make_ctx_state();
let planner = DefaultPhysicalPlanner::default();
planner.create_physical_plan(logical_plan, &ctx_state)
}
#[test]
fn test_all_operators() -> Result<()> {
let testdata = arrow_testdata_path();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
// filter clause needs the type coercion rule applied
.filter(col("c7").lt(lit(5_u8)))?
.project(vec![col("c1"), col("c2")])?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.sort(vec![col("c1").sort(true, true)])?
.limit(10)?
.build()?;
let plan = plan(&logical_plan)?;
// verify that the plan correctly casts u8 to i64
let expected = "BinaryExpr { left: Column { name: \"c7\" }, op: Lt, right: CastExpr { expr: Literal { value: UInt8(5) }, cast_type: Int64 } }";
assert!(format!("{:?}", plan).contains(expected));
Ok(())
}
#[test]
fn test_create_not() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
let planner = DefaultPhysicalPlanner::default();
let expr =
planner.create_physical_expr(&col("a").not(), &schema, &make_ctx_state())?;
let expected = expressions::not(expressions::col("a"), &schema)?;
assert_eq!(format!("{:?}", expr), format!("{:?}", expected));
Ok(())
}
#[test]
fn test_with_csv_plan() -> Result<()> {
let testdata = arrow_testdata_path();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
.filter(col("c7").lt(col("c12")))?
.build()?;
let plan = plan(&logical_plan)?;
// c12 is f64, c7 is u8 -> cast c7 to f64
let expected = "predicate: BinaryExpr { left: CastExpr { expr: Column { name: \"c7\" }, cast_type: Float64 }, op: Lt, right: Column { name: \"c12\" } }";
assert!(format!("{:?}", plan).contains(expected));
Ok(())
}
#[test]
fn errors() -> Result<()> {
let testdata = arrow_testdata_path();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
let bool_expr = col("c1").eq(col("c1"));
let cases = vec![
// utf8 < u32
col("c1").lt(col("c2")),
// utf8 AND utf8
col("c1").and(col("c1")),
// u8 AND u8
col("c3").and(col("c3")),
// utf8 = u32
col("c1").eq(col("c2")),
// utf8 = bool
col("c1").eq(bool_expr.clone()),
// u32 AND bool
col("c2").and(bool_expr),
// utf8 LIKE u32
col("c1").like(col("c2")),
];
for case in cases {
let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
.project(vec![case.clone()]);
let message = format!(
"Expression {:?} expected to error due to impossible coercion",
case
);
            assert!(logical_plan.is_err(), "{}", message);
}
Ok(())
}
#[test]
fn default_extension_planner() -> Result<()> {
let ctx_state = make_ctx_state();
let planner = DefaultPhysicalPlanner::default();
let logical_plan = LogicalPlan::Extension {
node: Arc::new(NoOpExtensionNode::default()),
};
let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
let expected_error = "DefaultPhysicalPlanner does not know how to plan NoOp";
match plan {
            Ok(_) => panic!("Expected planning failure"),
Err(e) => assert!(
e.to_string().contains(expected_error),
"Error '{}' did not contain expected error '{}'",
e.to_string(),
expected_error
),
}
Ok(())
}
#[test]
fn bad_extension_planner() -> Result<()> {
// Test that creating an execution plan whose schema doesn't
// match the logical plan's schema generates an error.
let ctx_state = make_ctx_state();
let planner = DefaultPhysicalPlanner::with_extension_planner(Arc::new(
BadExtensionPlanner {},
));
let logical_plan = LogicalPlan::Extension {
node: Arc::new(NoOpExtensionNode::default()),
};
let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
let expected_error = "Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: Schema { fields: [Field { name: \"a\", data_type: NullableDataType { data_type: Int32, nullable: false }, dict_id: 0, dict_is_ordered: false }], metadata: {} }, ExecutionPlan schema: Schema { fields: [Field { name: \"b\", data_type: NullableDataType { data_type: Int32, nullable: false }, dict_id: 0, dict_is_ordered: false }], metadata: {} }";
match plan {
            Ok(_) => panic!("Expected planning failure"),
Err(e) => assert!(
e.to_string().contains(expected_error),
"Error '{}' did not contain expected error '{}'",
e.to_string(),
expected_error
),
}
Ok(())
}
/// An example extension node that doesn't do anything
struct NoOpExtensionNode {
schema: SchemaRef,
}
impl Default for NoOpExtensionNode {
fn default() -> Self {
Self {
schema: SchemaRef::new(Schema::new(vec![Field::new(
"a",
DataType::Int32,
false,
)])),
}
}
}
impl Debug for NoOpExtensionNode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NoOp")
}
}
impl UserDefinedLogicalNode for NoOpExtensionNode {
fn as_any(&self) -> &dyn Any {
self
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![]
}
fn schema(&self) -> &SchemaRef {
&self.schema
}
fn expressions(&self) -> Vec<Expr> {
vec![]
}
fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NoOp")
}
fn from_template(
&self,
_exprs: &Vec<Expr>,
_inputs: &Vec<LogicalPlan>,
) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
unimplemented!("NoOp");
}
}
#[derive(Debug)]
struct NoOpExecutionPlan {
schema: SchemaRef,
}
#[async_trait]
impl ExecutionPlan for NoOpExecutionPlan {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
&self,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!("NoOpExecutionPlan::with_new_children");
}
async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
unimplemented!("NoOpExecutionPlan::execute");
}
}
    /// Produces an execution plan where the schema is mismatched from
    /// the logical plan node.
struct BadExtensionPlanner {}
impl ExtensionPlanner for BadExtensionPlanner {
/// Create a physical plan for an extension node
fn plan_extension(
&self,
_node: &dyn UserDefinedLogicalNode,
_inputs: Vec<Arc<dyn ExecutionPlan>>,
_ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(NoOpExecutionPlan {
schema: SchemaRef::new(Schema::new(vec![Field::new(
"b",
DataType::Int32,
false,
)])),
}))
}
}
}