blob: 4981db5537a74a43a3178c7fa695f0ccfef75f2d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use crate::parser::{
CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
LexOrdering, ResetStatement, Statement as DFStatement,
};
use crate::planner::{
ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier,
};
use crate::utils::normalize_ident;
use arrow::datatypes::{Field, FieldRef, Fields};
use datafusion_common::error::_plan_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
Column, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, exec_err,
internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
unqualified_field_not_found,
};
use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::DdlStatement;
use datafusion_expr::logical_plan::builder::project;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{
Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
ResetVariable, SetVariable, SortExpr, Statement as PlanStatement, ToStringifiedPlan,
TransactionAccessMode, TransactionConclusion, TransactionEnd,
TransactionIsolationLevel, TransactionStart, Volatility, WriteOp, cast, col,
};
use sqlparser::ast::{
self, BeginTransactionKind, CheckConstraint, ForeignKeyConstraint, IndexColumn,
IndexType, NullsDistinctOption, OrderByExpr, OrderByOptions, PrimaryKeyConstraint,
Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict, TableObject,
UniqueConstraint, Update, UpdateTableFromKind, ValueWithSpan,
};
use sqlparser::ast::{
Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
TransactionMode, UnaryOperator, Value,
};
use sqlparser::parser::ParserError::ParserError;
/// Turn a SQL identifier into its string form.
///
/// The identifier is cloned and handed to `normalize_ident`, whose result is
/// returned unchanged.
fn ident_to_string(ident: &Ident) -> String {
    let owned_ident = ident.clone();
    normalize_ident(owned_ident)
}
fn object_name_to_string(object_name: &ObjectName) -> String {
object_name
.0
.iter()
.map(|object_name_part| {
object_name_part
.as_ident()
// TODO: It might be better to return an error
// than to silently use a default value.
.map_or_else(String::new, ident_to_string)
})
.collect::<Vec<String>>()
.join(".")
}
/// Produce the string name for a `CREATE SCHEMA` target.
///
/// * `Simple` - the object name, dot-joined.
/// * `UnnamedAuthorization` - the authorization identifier on its own.
/// * `NamedAuthorization` - `<object name>.<authorization identifier>`.
fn get_schema_name(schema_name: &SchemaName) -> String {
    match schema_name {
        SchemaName::Simple(object_name) => object_name_to_string(object_name),
        SchemaName::UnnamedAuthorization(owner) => ident_to_string(owner),
        SchemaName::NamedAuthorization(object_name, owner) => {
            let schema = object_name_to_string(object_name);
            let owner = ident_to_string(owner);
            format!("{schema}.{owner}")
        }
    }
}
/// Construct `TableConstraint`(s) for the given columns by iterating over
/// `columns` and extracting individual inline constraint definitions.
///
/// Only `UNIQUE`, `PRIMARY KEY`, `FOREIGN KEY` and `CHECK` column options
/// yield a constraint; every other column option is skipped. The resulting
/// constraints are returned in the order the options are encountered
/// (column by column, option by option).
///
/// For `UNIQUE` and `PRIMARY KEY` the generated constraint covers exactly the
/// column that carried the option. Index-related details of the inline option
/// (index name/type/options, `NULLS [NOT] DISTINCT`) are deliberately reset to
/// their "unset" values.
///
/// The constraint name is taken from the enclosing `ColumnOptionDef` in every
/// arm. For `UNIQUE` and `CHECK`, whose payload also has a `name` field, that
/// inner name is used as a fallback when the option-level name is absent.
fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
    let mut constraints: Vec<TableConstraint> = vec![];
    for column in columns {
        for ast::ColumnOptionDef { name, option } in &column.options {
            match option {
                // All fields are listed explicitly (instead of `..`) so that a
                // new field added to the AST struct forces a review here.
                ast::ColumnOption::Unique(UniqueConstraint {
                    characteristics,
                    // Bound under a different identifier so it does not shadow
                    // the option-level `name` from `ColumnOptionDef`.
                    name: constraint_name,
                    index_name: _index_name,
                    index_type_display: _index_type_display,
                    index_type: _index_type,
                    columns: _column,
                    index_options: _index_options,
                    nulls_distinct: _nulls_distinct,
                }) => constraints.push(TableConstraint::Unique(UniqueConstraint {
                    // NOTE(review): an inline `CONSTRAINT <name> UNIQUE` is
                    // presumably stored on `ColumnOptionDef::name` by the
                    // parser - confirm against sqlparser. Falling back to the
                    // inner name keeps any name that was previously produced.
                    name: name.clone().or_else(|| constraint_name.clone()),
                    index_name: None,
                    index_type_display: ast::KeyOrIndexDisplay::None,
                    index_type: None,
                    columns: vec![IndexColumn {
                        column: OrderByExpr {
                            expr: SQLExpr::Identifier(column.name.clone()),
                            options: OrderByOptions {
                                asc: None,
                                nulls_first: None,
                            },
                            with_fill: None,
                        },
                        operator_class: None,
                    }],
                    index_options: vec![],
                    characteristics: *characteristics,
                    nulls_distinct: NullsDistinctOption::None,
                })),
                ast::ColumnOption::PrimaryKey(PrimaryKeyConstraint {
                    characteristics,
                    name: _name,
                    index_name: _index_name,
                    index_type: _index_type,
                    columns: _columns,
                    index_options: _index_options,
                }) => {
                    constraints.push(TableConstraint::PrimaryKey(PrimaryKeyConstraint {
                        name: name.clone(),
                        index_name: None,
                        index_type: None,
                        columns: vec![IndexColumn {
                            column: OrderByExpr {
                                expr: SQLExpr::Identifier(column.name.clone()),
                                options: OrderByOptions {
                                    asc: None,
                                    nulls_first: None,
                                },
                                with_fill: None,
                            },
                            operator_class: None,
                        }],
                        index_options: vec![],
                        characteristics: *characteristics,
                    }))
                }
                ast::ColumnOption::ForeignKey(ForeignKeyConstraint {
                    foreign_table,
                    referred_columns,
                    on_delete,
                    on_update,
                    characteristics,
                    name: _name,
                    index_name: _index_name,
                    columns: _columns,
                    match_kind: _match_kind,
                }) => {
                    constraints.push(TableConstraint::ForeignKey(ForeignKeyConstraint {
                        name: name.clone(),
                        index_name: None,
                        // The referencing column list is left empty here; only
                        // the referenced table/columns are carried over.
                        columns: vec![],
                        foreign_table: foreign_table.clone(),
                        referred_columns: referred_columns.clone(),
                        on_delete: *on_delete,
                        on_update: *on_update,
                        match_kind: None,
                        characteristics: *characteristics,
                    }))
                }
                ast::ColumnOption::Check(CheckConstraint {
                    // See the `Unique` arm: avoid shadowing the option-level
                    // `name`.
                    name: constraint_name,
                    expr,
                    enforced: _enforced,
                }) => constraints.push(TableConstraint::Check(CheckConstraint {
                    name: name.clone().or_else(|| constraint_name.clone()),
                    expr: expr.clone(),
                    enforced: None,
                })),
                // Options that do not describe a table-level constraint.
                ast::ColumnOption::Default(_)
                | ast::ColumnOption::Null
                | ast::ColumnOption::NotNull
                | ast::ColumnOption::DialectSpecific(_)
                | ast::ColumnOption::CharacterSet(_)
                | ast::ColumnOption::Generated { .. }
                | ast::ColumnOption::Comment(_)
                | ast::ColumnOption::Options(_)
                | ast::ColumnOption::OnUpdate(_)
                | ast::ColumnOption::Materialized(_)
                | ast::ColumnOption::Ephemeral(_)
                | ast::ColumnOption::Identity(_)
                | ast::ColumnOption::OnConflict(_)
                | ast::ColumnOption::Policy(_)
                | ast::ColumnOption::Tags(_)
                | ast::ColumnOption::Alias(_)
                | ast::ColumnOption::Srid(_)
                | ast::ColumnOption::Collation(_)
                | ast::ColumnOption::Invisible => {}
            }
        }
    }
    constraints
}
impl<S: ContextProvider> SqlToRel<'_, S> {
/// Generate a logical plan from a DataFusion SQL statement.
///
/// Each `DFStatement` variant is forwarded to the planner method dedicated to
/// it; boxed inner statements are unboxed before being passed on.
pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
    match statement {
        DFStatement::Statement(sql_statement) => {
            self.sql_statement_to_plan(*sql_statement)
        }
        DFStatement::CreateExternalTable(create_external) => {
            self.external_table_to_plan(create_external)
        }
        DFStatement::CopyTo(copy_to) => self.copy_to_plan(copy_to),
        DFStatement::Reset(reset) => self.reset_statement_to_plan(reset),
        DFStatement::Explain(explain) => {
            // Exhaustive destructuring: a new field on `ExplainStatement`
            // must be handled here explicitly.
            let ExplainStatement {
                verbose,
                analyze,
                format,
                statement: explained,
            } = explain;
            self.explain_to_plan(verbose, analyze, format, *explained)
        }
    }
}
/// Generate a logical plan from a SQL statement, planning it against a
/// freshly created `PlannerContext`.
pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
    let mut fresh_context = PlannerContext::new();
    self.sql_statement_to_plan_with_context_impl(statement, &mut fresh_context)
}
/// Generate a logical plan from an SQL statement
pub fn sql_statement_to_plan_with_context(
&self,
statement: Statement,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
self.sql_statement_to_plan_with_context_impl(statement, planner_context)
}
fn sql_statement_to_plan_with_context_impl(
&self,
statement: Statement,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
match statement {
Statement::ExplainTable {
describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE table_name' or 'DESC table_name' and not 'EXPLAIN table_name'
table_name,
..
} => self.describe_table_to_plan(table_name),
Statement::Explain {
describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE statement' or 'DESC statement' and not 'EXPLAIN statement'
statement,
..
} => match *statement {
Statement::Query(query) => self.describe_query_to_plan(*query),
_ => {
not_impl_err!("Describing statements other than SELECT not supported")
}
},
Statement::Explain {
verbose,
statement,
analyze,
format,
describe_alias: _,
..
} => {
let format = format.map(|format| format.to_string());
let statement = DFStatement::Statement(statement);
self.explain_to_plan(verbose, analyze, format, statement)
}
Statement::Query(query) => self.query_to_plan(*query, planner_context),
Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
Statement::Set(statement) => self.set_statement_to_plan(statement),
Statement::CreateTable(CreateTable {
temporary,
external,
global,
transient,
volatile,
hive_distribution,
hive_formats,
file_format,
location,
query,
name,
columns,
constraints,
if_not_exists,
or_replace,
without_rowid,
like,
clone,
comment,
on_commit,
on_cluster,
primary_key,
order_by,
partition_by,
cluster_by,
clustered_by,
strict,
copy_grants,
enable_schema_evolution,
change_tracking,
data_retention_time_in_days,
max_data_extension_time_in_days,
default_ddl_collation,
with_aggregation_policy,
with_row_access_policy,
with_tags,
iceberg,
external_volume,
base_location,
catalog,
catalog_sync,
storage_serialization_policy,
inherits,
table_options: CreateTableOptions::None,
dynamic,
version,
target_lag,
warehouse,
refresh_mode,
initialize,
require_user,
}) => {
if temporary {
return not_impl_err!("Temporary tables not supported")?;
}
if external {
return not_impl_err!("External tables not supported")?;
}
if global.is_some() {
return not_impl_err!("Global tables not supported")?;
}
if transient {
return not_impl_err!("Transient tables not supported")?;
}
if volatile {
return not_impl_err!("Volatile tables not supported")?;
}
if hive_distribution != ast::HiveDistributionStyle::NONE {
return not_impl_err!(
"Hive distribution not supported: {hive_distribution:?}"
)?;
}
if hive_formats.is_some()
&& !matches!(
hive_formats,
Some(ast::HiveFormat {
row_format: None,
serde_properties: None,
storage: None,
location: None,
})
)
{
return not_impl_err!(
"Hive formats not supported: {hive_formats:?}"
)?;
}
if file_format.is_some() {
return not_impl_err!("File format not supported")?;
}
if location.is_some() {
return not_impl_err!("Location not supported")?;
}
if without_rowid {
return not_impl_err!("Without rowid not supported")?;
}
if like.is_some() {
return not_impl_err!("Like not supported")?;
}
if clone.is_some() {
return not_impl_err!("Clone not supported")?;
}
if comment.is_some() {
return not_impl_err!("Comment not supported")?;
}
if on_commit.is_some() {
return not_impl_err!("On commit not supported")?;
}
if on_cluster.is_some() {
return not_impl_err!("On cluster not supported")?;
}
if primary_key.is_some() {
return not_impl_err!("Primary key not supported")?;
}
if order_by.is_some() {
return not_impl_err!("Order by not supported")?;
}
if partition_by.is_some() {
return not_impl_err!("Partition by not supported")?;
}
if cluster_by.is_some() {
return not_impl_err!("Cluster by not supported")?;
}
if clustered_by.is_some() {
return not_impl_err!("Clustered by not supported")?;
}
if strict {
return not_impl_err!("Strict not supported")?;
}
if copy_grants {
return not_impl_err!("Copy grants not supported")?;
}
if enable_schema_evolution.is_some() {
return not_impl_err!("Enable schema evolution not supported")?;
}
if change_tracking.is_some() {
return not_impl_err!("Change tracking not supported")?;
}
if data_retention_time_in_days.is_some() {
return not_impl_err!("Data retention time in days not supported")?;
}
if max_data_extension_time_in_days.is_some() {
return not_impl_err!(
"Max data extension time in days not supported"
)?;
}
if default_ddl_collation.is_some() {
return not_impl_err!("Default DDL collation not supported")?;
}
if with_aggregation_policy.is_some() {
return not_impl_err!("With aggregation policy not supported")?;
}
if with_row_access_policy.is_some() {
return not_impl_err!("With row access policy not supported")?;
}
if with_tags.is_some() {
return not_impl_err!("With tags not supported")?;
}
if iceberg {
return not_impl_err!("Iceberg not supported")?;
}
if external_volume.is_some() {
return not_impl_err!("External volume not supported")?;
}
if base_location.is_some() {
return not_impl_err!("Base location not supported")?;
}
if catalog.is_some() {
return not_impl_err!("Catalog not supported")?;
}
if catalog_sync.is_some() {
return not_impl_err!("Catalog sync not supported")?;
}
if storage_serialization_policy.is_some() {
return not_impl_err!("Storage serialization policy not supported")?;
}
if inherits.is_some() {
return not_impl_err!("Table inheritance not supported")?;
}
if dynamic {
return not_impl_err!("Dynamic tables not supported")?;
}
if version.is_some() {
return not_impl_err!("Version not supported")?;
}
if target_lag.is_some() {
return not_impl_err!("Target lag not supported")?;
}
if warehouse.is_some() {
return not_impl_err!("Warehouse not supported")?;
}
if refresh_mode.is_some() {
return not_impl_err!("Refresh mode not supported")?;
}
if initialize.is_some() {
return not_impl_err!("Initialize not supported")?;
}
if require_user {
return not_impl_err!("Require user not supported")?;
}
// Merge inline constraints and existing constraints
let mut all_constraints = constraints;
let inline_constraints = calc_inline_constraints_from_columns(&columns);
all_constraints.extend(inline_constraints);
// Build column default values
let column_defaults =
self.build_column_defaults(&columns, planner_context)?;
let has_columns = !columns.is_empty();
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
if has_columns {
planner_context.set_table_schema(Some(Arc::clone(&schema)));
}
match query {
Some(query) => {
let plan = self.query_to_plan(*query, planner_context)?;
let input_schema = plan.schema();
let plan = if has_columns {
if schema.fields().len() != input_schema.fields().len() {
return plan_err!(
"Mismatch: {} columns specified, but result has {} columns",
schema.fields().len(),
input_schema.fields().len()
);
}
let input_fields = input_schema.fields();
let project_exprs = schema
.fields()
.iter()
.zip(input_fields)
.map(|(field, input_field)| {
cast(
col(input_field.name()),
field.data_type().clone(),
)
.alias(field.name())
})
.collect::<Vec<_>>();
LogicalPlanBuilder::from(plan.clone())
.project(project_exprs)?
.build()?
} else {
plan
};
let constraints = self.new_constraint_from_table_constraints(
&all_constraints,
plan.schema(),
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
column_defaults,
temporary,
},
)))
}
None => {
let plan = EmptyRelation {
produce_one_row: false,
schema,
};
let plan = LogicalPlan::EmptyRelation(plan);
let constraints = self.new_constraint_from_table_constraints(
&all_constraints,
plan.schema(),
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
column_defaults,
temporary,
},
)))
}
}
}
Statement::CreateView(ast::CreateView {
or_replace,
materialized,
name,
columns,
query,
options: CreateTableOptions::None,
cluster_by,
comment,
with_no_schema_binding,
if_not_exists,
temporary,
to,
params,
or_alter,
secure,
name_before_not_exists,
}) => {
if materialized {
return not_impl_err!("Materialized views not supported")?;
}
if !cluster_by.is_empty() {
return not_impl_err!("Cluster by not supported")?;
}
if comment.is_some() {
return not_impl_err!("Comment not supported")?;
}
if with_no_schema_binding {
return not_impl_err!("With no schema binding not supported")?;
}
if if_not_exists {
return not_impl_err!("If not exists not supported")?;
}
if to.is_some() {
return not_impl_err!("To not supported")?;
}
// put the statement back together temporarily to get the SQL
// string representation
let stmt = Statement::CreateView(ast::CreateView {
or_replace,
materialized,
name,
columns,
query,
options: CreateTableOptions::None,
cluster_by,
comment,
with_no_schema_binding,
if_not_exists,
temporary,
to,
params,
or_alter,
secure,
name_before_not_exists,
});
let sql = stmt.to_string();
let Statement::CreateView(ast::CreateView {
name,
columns,
query,
or_replace,
temporary,
..
}) = stmt
else {
return internal_err!("Unreachable code in create view");
};
let columns = columns
.into_iter()
.map(|view_column_def| {
if let Some(options) = view_column_def.options {
plan_err!(
"Options not supported for view columns: {options:?}"
)
} else {
Ok(view_column_def.name)
}
})
.collect::<Result<Vec<_>>>()?;
let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
plan = self.apply_expr_alias(plan, columns)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
name: self.object_name_to_table_reference(name)?,
input: Arc::new(plan),
or_replace,
definition: Some(sql),
temporary,
})))
}
Statement::ShowCreate { obj_type, obj_name } => match obj_type {
ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
_ => {
not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
}
},
Statement::CreateSchema {
schema_name,
if_not_exists,
..
} => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
CreateCatalogSchema {
schema_name: get_schema_name(&schema_name),
if_not_exists,
schema: Arc::new(DFSchema::empty()),
},
))),
Statement::CreateDatabase {
db_name,
if_not_exists,
..
} => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
CreateCatalog {
catalog_name: object_name_to_string(&db_name),
if_not_exists,
schema: Arc::new(DFSchema::empty()),
},
))),
Statement::Drop {
object_type,
if_exists,
mut names,
cascade,
restrict: _,
purge: _,
temporary: _,
table: _,
} => {
// We don't support cascade and purge for now.
// nor do we support multiple object names
let name = match names.len() {
0 => Err(ParserError("Missing table name.".to_string()).into()),
1 => self.object_name_to_table_reference(names.pop().unwrap()),
_ => {
Err(ParserError("Multiple objects not supported".to_string())
.into())
}
}?;
match object_type {
ObjectType::Table => {
Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
name,
if_exists,
schema: DFSchemaRef::new(DFSchema::empty()),
})))
}
ObjectType::View => {
Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
name,
if_exists,
schema: DFSchemaRef::new(DFSchema::empty()),
})))
}
ObjectType::Schema => {
let name = match name {
TableReference::Bare { table } => {
Ok(SchemaReference::Bare { schema: table })
}
TableReference::Partial { schema, table } => {
Ok(SchemaReference::Full {
schema: table,
catalog: schema,
})
}
TableReference::Full {
catalog: _,
schema: _,
table: _,
} => Err(ParserError(
"Invalid schema specifier (has 3 parts)".to_string(),
)),
}?;
Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(
DropCatalogSchema {
name,
if_exists,
cascade,
schema: DFSchemaRef::new(DFSchema::empty()),
},
)))
}
_ => not_impl_err!(
"Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
),
}
}
Statement::Prepare {
name,
data_types,
statement,
} => {
// Convert parser data types to DataFusion data types
let mut fields: Vec<FieldRef> = data_types
.into_iter()
.map(|t| self.convert_data_type_to_field(&t))
.collect::<Result<_>>()?;
// Create planner context with parameters
let mut planner_context =
PlannerContext::new().with_prepare_param_data_types(fields.clone());
// Build logical plan for inner statement of the prepare statement
let plan = self.sql_statement_to_plan_with_context_impl(
*statement,
&mut planner_context,
)?;
if fields.is_empty() {
let map_types = plan.get_parameter_fields()?;
let param_types: Vec<_> = (1..=map_types.len())
.filter_map(|i| {
let key = format!("${i}");
map_types.get(&key).and_then(|opt| opt.clone())
})
.collect();
fields.extend(param_types.iter().cloned());
planner_context.with_prepare_param_data_types(param_types);
}
Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
name: ident_to_string(&name),
fields,
input: Arc::new(plan),
})))
}
Statement::Execute {
name,
parameters,
using,
// has_parentheses specifies the syntax, but the plan is the
// same no matter the syntax used, so ignore it
has_parentheses: _,
immediate,
into,
output,
default,
} => {
// `USING` is a MySQL-specific syntax and currently not supported.
if !using.is_empty() {
return not_impl_err!(
"Execute statement with USING is not supported"
);
}
if immediate {
return not_impl_err!(
"Execute statement with IMMEDIATE is not supported"
);
}
if !into.is_empty() {
return not_impl_err!("Execute statement with INTO is not supported");
}
if output {
return not_impl_err!(
"Execute statement with OUTPUT is not supported"
);
}
if default {
return not_impl_err!(
"Execute statement with DEFAULT is not supported"
);
}
let empty_schema = DFSchema::empty();
let parameters = parameters
.into_iter()
.map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
.collect::<Result<Vec<Expr>>>()?;
Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
name: object_name_to_string(&name.unwrap()),
parameters,
})))
}
Statement::Deallocate {
name,
// Similar to PostgreSQL, the PREPARE keyword is ignored
prepare: _,
} => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
Deallocate {
name: ident_to_string(&name),
},
))),
Statement::ShowTables {
extended,
full,
terse,
history,
external,
show_options,
} => {
// We only support the basic "SHOW TABLES"
// https://github.com/apache/datafusion/issues/3188
if extended {
return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
}
if full {
return not_impl_err!("SHOW FULL TABLES not supported")?;
}
if terse {
return not_impl_err!("SHOW TERSE TABLES not supported")?;
}
if history {
return not_impl_err!("SHOW TABLES HISTORY not supported")?;
}
if external {
return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
}
let ShowStatementOptions {
show_in,
starts_with,
limit,
limit_from,
filter_position,
} = show_options;
if show_in.is_some() {
return not_impl_err!("SHOW TABLES IN not supported")?;
}
if starts_with.is_some() {
return not_impl_err!("SHOW TABLES LIKE not supported")?;
}
if limit.is_some() {
return not_impl_err!("SHOW TABLES LIMIT not supported")?;
}
if limit_from.is_some() {
return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
}
if filter_position.is_some() {
return not_impl_err!("SHOW TABLES FILTER not supported")?;
}
self.show_tables_to_plan()
}
Statement::ShowColumns {
extended,
full,
show_options,
} => {
let ShowStatementOptions {
show_in,
starts_with,
limit,
limit_from,
filter_position,
} = show_options;
if starts_with.is_some() {
return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
}
if limit.is_some() {
return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
}
if limit_from.is_some() {
return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
}
if filter_position.is_some() {
return not_impl_err!(
"SHOW COLUMNS with WHERE or LIKE is not supported"
)?;
}
let Some(ShowStatementIn {
// specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
// COLUMNS FROM` which is not different in DataFusion
clause: _,
parent_type,
parent_name,
}) = show_in
else {
return plan_err!("SHOW COLUMNS requires a table name");
};
if let Some(parent_type) = parent_type {
return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
}
let Some(table_name) = parent_name else {
return plan_err!("SHOW COLUMNS requires a table name");
};
self.show_columns_to_plan(extended, full, table_name)
}
Statement::ShowFunctions { filter, .. } => {
self.show_functions_to_plan(filter)
}
Statement::Insert(Insert {
or,
into,
columns,
overwrite,
source,
partitioned,
after_columns,
table,
on,
returning,
ignore,
table_alias,
mut replace_into,
priority,
insert_alias,
assignments,
has_table_keyword,
settings,
format_clause,
insert_token: _insert_token, // record the location the `INSERT` token
}) => {
let table_name = match table {
TableObject::TableName(table_name) => table_name,
TableObject::TableFunction(_) => {
return not_impl_err!(
"INSERT INTO Table functions not supported"
);
}
};
if let Some(or) = or {
match or {
SqliteOnConflict::Replace => replace_into = true,
_ => plan_err!("Inserts with {or} clause is not supported")?,
}
}
if partitioned.is_some() {
plan_err!("Partitioned inserts not yet supported")?;
}
if !after_columns.is_empty() {
plan_err!("After-columns clause not supported")?;
}
if on.is_some() {
plan_err!("Insert-on clause not supported")?;
}
if returning.is_some() {
plan_err!("Insert-returning clause not supported")?;
}
if ignore {
plan_err!("Insert-ignore clause not supported")?;
}
let Some(source) = source else {
plan_err!("Inserts without a source not supported")?
};
if let Some(table_alias) = table_alias {
plan_err!(
"Inserts with a table alias not supported: {table_alias:?}"
)?
};
if let Some(priority) = priority {
plan_err!(
"Inserts with a `PRIORITY` clause not supported: {priority:?}"
)?
};
if insert_alias.is_some() {
plan_err!("Inserts with an alias not supported")?;
}
if !assignments.is_empty() {
plan_err!("Inserts with assignments not supported")?;
}
if settings.is_some() {
plan_err!("Inserts with settings not supported")?;
}
if format_clause.is_some() {
plan_err!("Inserts with format clause not supported")?;
}
// optional keywords don't change behavior
let _ = into;
let _ = has_table_keyword;
self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
}
Statement::Update(Update {
table,
assignments,
from,
selection,
returning,
or,
limit,
update_token: _,
}) => {
let from_clauses =
from.map(|update_table_from_kind| match update_table_from_kind {
UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
});
// TODO: support multiple tables in UPDATE SET FROM
if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
}
let update_from = from_clauses.and_then(|mut f| f.pop());
if returning.is_some() {
plan_err!("Update-returning clause not yet supported")?;
}
if or.is_some() {
plan_err!("ON conflict not supported")?;
}
if limit.is_some() {
return not_impl_err!("Update-limit clause not supported")?;
}
self.update_to_plan(table, &assignments, update_from, selection)
}
Statement::Delete(Delete {
tables,
using,
selection,
returning,
from,
order_by,
limit,
delete_token: _,
}) => {
if !tables.is_empty() {
plan_err!("DELETE <TABLE> not supported")?;
}
if using.is_some() {
plan_err!("Using clause not supported")?;
}
if returning.is_some() {
plan_err!("Delete-returning clause not yet supported")?;
}
if !order_by.is_empty() {
plan_err!("Delete-order-by clause not yet supported")?;
}
if limit.is_some() {
plan_err!("Delete-limit clause not yet supported")?;
}
let table_name = self.get_delete_target(from)?;
self.delete_to_plan(&table_name, selection)
}
Statement::StartTransaction {
modes,
begin: false,
modifier,
transaction,
statements,
has_end_keyword,
exception,
} => {
if let Some(modifier) = modifier {
return not_impl_err!(
"Transaction modifier not supported: {modifier}"
);
}
if !statements.is_empty() {
return not_impl_err!(
"Transaction with multiple statements not supported"
);
}
if exception.is_some() {
return not_impl_err!(
"Transaction with exception statements not supported"
);
}
if has_end_keyword {
return not_impl_err!("Transaction with END keyword not supported");
}
self.validate_transaction_kind(transaction.as_ref())?;
let isolation_level: ast::TransactionIsolationLevel = modes
.iter()
.filter_map(|m: &TransactionMode| match m {
TransactionMode::AccessMode(_) => None,
TransactionMode::IsolationLevel(level) => Some(level),
})
.next_back()
.copied()
.unwrap_or(ast::TransactionIsolationLevel::Serializable);
let access_mode: ast::TransactionAccessMode = modes
.iter()
.filter_map(|m: &TransactionMode| match m {
TransactionMode::AccessMode(mode) => Some(mode),
TransactionMode::IsolationLevel(_) => None,
})
.next_back()
.copied()
.unwrap_or(ast::TransactionAccessMode::ReadWrite);
let isolation_level = match isolation_level {
ast::TransactionIsolationLevel::ReadUncommitted => {
TransactionIsolationLevel::ReadUncommitted
}
ast::TransactionIsolationLevel::ReadCommitted => {
TransactionIsolationLevel::ReadCommitted
}
ast::TransactionIsolationLevel::RepeatableRead => {
TransactionIsolationLevel::RepeatableRead
}
ast::TransactionIsolationLevel::Serializable => {
TransactionIsolationLevel::Serializable
}
ast::TransactionIsolationLevel::Snapshot => {
TransactionIsolationLevel::Snapshot
}
};
let access_mode = match access_mode {
ast::TransactionAccessMode::ReadOnly => {
TransactionAccessMode::ReadOnly
}
ast::TransactionAccessMode::ReadWrite => {
TransactionAccessMode::ReadWrite
}
};
let statement = PlanStatement::TransactionStart(TransactionStart {
access_mode,
isolation_level,
});
Ok(LogicalPlan::Statement(statement))
}
Statement::Commit {
chain,
end,
modifier,
} => {
if end {
return not_impl_err!("COMMIT AND END not supported");
};
if let Some(modifier) = modifier {
return not_impl_err!("COMMIT {modifier} not supported");
};
let statement = PlanStatement::TransactionEnd(TransactionEnd {
conclusion: TransactionConclusion::Commit,
chain,
});
Ok(LogicalPlan::Statement(statement))
}
Statement::Rollback { chain, savepoint } => {
if savepoint.is_some() {
plan_err!("Savepoints not supported")?;
}
let statement = PlanStatement::TransactionEnd(TransactionEnd {
conclusion: TransactionConclusion::Rollback,
chain,
});
Ok(LogicalPlan::Statement(statement))
}
Statement::CreateFunction(ast::CreateFunction {
or_replace,
temporary,
name,
args,
return_type,
function_body,
behavior,
language,
..
}) => {
let return_type = match return_type {
Some(t) => Some(self.convert_data_type_to_field(&t)?),
None => None,
};
let mut planner_context = PlannerContext::new();
let empty_schema = &DFSchema::empty();
let args = match args {
Some(function_args) => {
let function_args = function_args
.into_iter()
.map(|arg| {
let data_type =
self.convert_data_type_to_field(&arg.data_type)?;
let default_expr = match arg.default_expr {
Some(expr) => Some(self.sql_to_expr(
expr,
empty_schema,
&mut planner_context,
)?),
None => None,
};
Ok(OperateFunctionArg {
name: arg.name,
default_expr,
data_type: data_type.data_type().clone(),
})
})
.collect::<Result<Vec<OperateFunctionArg>>>();
Some(function_args?)
}
None => None,
};
// Validate default arguments
let first_default = match args.as_ref() {
Some(arg) => arg.iter().position(|t| t.default_expr.is_some()),
None => None,
};
let last_non_default = match args.as_ref() {
Some(arg) => arg
.iter()
.rev()
.position(|t| t.default_expr.is_none())
.map(|reverse_pos| arg.len() - reverse_pos - 1),
None => None,
};
if let (Some(pos_default), Some(pos_non_default)) =
(first_default, last_non_default)
&& pos_non_default > pos_default
{
return plan_err!(
"Non-default arguments cannot follow default arguments."
);
}
// At the moment functions can't be qualified `schema.name`
let name = match &name.0[..] {
[] => exec_err!("Function should have name")?,
[n] => n.as_ident().unwrap().value.clone(),
[..] => not_impl_err!("Qualified functions are not supported")?,
};
//
// Convert resulting expression to data fusion expression
//
let arg_types = args.as_ref().map(|arg| {
arg.iter()
.map(|t| {
let name = match t.name.clone() {
Some(name) => name.value,
None => "".to_string(),
};
Arc::new(Field::new(name, t.data_type.clone(), true))
})
.collect::<Vec<_>>()
});
// Validate parameter style
if let Some(ref fields) = arg_types {
let count_positional =
fields.iter().filter(|f| f.name() == "").count();
if !(count_positional == 0 || count_positional == fields.len()) {
return plan_err!(
"All function arguments must use either named or positional style."
);
}
}
let mut planner_context = PlannerContext::new()
.with_prepare_param_data_types(arg_types.unwrap_or_default());
let function_body = match function_body {
Some(r) => Some(self.sql_to_expr(
match r {
// `link_symbol` indicates if the primary expression contains the name of shared library file.
ast::CreateFunctionBody::AsBeforeOptions{body: expr, link_symbol: _link_symbol} => expr,
ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
ast::CreateFunctionBody::Return(expr) => expr,
ast::CreateFunctionBody::AsBeginEnd(_) => {
return not_impl_err!(
"BEGIN/END enclosed function body syntax is not supported"
)?;
}
ast::CreateFunctionBody::AsReturnExpr(_)
| ast::CreateFunctionBody::AsReturnSelect(_) => {
return not_impl_err!(
"AS RETURN function syntax is not supported"
)?
}
},
&DFSchema::empty(),
&mut planner_context,
)?),
None => None,
};
let params = CreateFunctionBody {
language,
behavior: behavior.map(|b| match b {
ast::FunctionBehavior::Immutable => Volatility::Immutable,
ast::FunctionBehavior::Stable => Volatility::Stable,
ast::FunctionBehavior::Volatile => Volatility::Volatile,
}),
function_body,
};
let statement = DdlStatement::CreateFunction(CreateFunction {
or_replace,
temporary,
name,
return_type: return_type.map(|f| f.data_type().clone()),
args,
params,
schema: DFSchemaRef::new(DFSchema::empty()),
});
Ok(LogicalPlan::Ddl(statement))
}
Statement::DropFunction(ast::DropFunction {
if_exists,
func_desc,
drop_behavior: _,
}) => {
// According to postgresql documentation it can be only one function
// specified in drop statement
if let Some(desc) = func_desc.first() {
// At the moment functions can't be qualified `schema.name`
let name = match &desc.name.0[..] {
[] => exec_err!("Function should have name")?,
[n] => n.as_ident().unwrap().value.clone(),
[..] => not_impl_err!("Qualified functions are not supported")?,
};
let statement = DdlStatement::DropFunction(DropFunction {
if_exists,
name,
schema: DFSchemaRef::new(DFSchema::empty()),
});
Ok(LogicalPlan::Ddl(statement))
} else {
exec_err!("Function name not provided")
}
}
Statement::Truncate(ast::Truncate {
table_names,
partitions,
identity,
cascade,
on_cluster,
table,
}) => {
let _ = table; // Support TRUNCATE TABLE and TRUNCATE syntax
if table_names.len() != 1 {
return not_impl_err!(
"TRUNCATE with multiple tables is not supported"
);
}
let target = &table_names[0];
if target.only {
return not_impl_err!("TRUNCATE with ONLY is not supported");
}
if partitions.is_some() {
return not_impl_err!("TRUNCATE with PARTITION is not supported");
}
if identity.is_some() {
return not_impl_err!(
"TRUNCATE with RESTART/CONTINUE IDENTITY is not supported"
);
}
if cascade.is_some() {
return not_impl_err!(
"TRUNCATE with CASCADE/RESTRICT is not supported"
);
}
if on_cluster.is_some() {
return not_impl_err!("TRUNCATE with ON CLUSTER is not supported");
}
let table = self.object_name_to_table_reference(target.name.clone())?;
let source = self.context_provider.get_table_source(table.clone())?;
// TRUNCATE does not operate on input rows. The EmptyRelation is a logical placeholder
// since the real operation is executed directly by the TableProvider's truncate() hook.
Ok(LogicalPlan::Dml(DmlStatement::new(
table.clone(),
source,
WriteOp::Truncate,
Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: DFSchemaRef::new(DFSchema::empty()),
})),
)))
}
Statement::CreateIndex(CreateIndex {
name,
table_name,
using,
columns,
unique,
if_not_exists,
..
}) => {
let name: Option<String> = name.as_ref().map(object_name_to_string);
let table = self.object_name_to_table_reference(table_name)?;
let table_schema = self
.context_provider
.get_table_source(table.clone())?
.schema()
.to_dfschema_ref()?;
let using: Option<String> =
using.as_ref().map(|index_type| match index_type {
IndexType::Custom(ident) => ident_to_string(ident),
_ => index_type.to_string().to_ascii_lowercase(),
});
let order_by_exprs: Vec<OrderByExpr> =
columns.into_iter().map(|col| col.column).collect();
let columns = self.order_by_to_sort_expr(
order_by_exprs,
&table_schema,
planner_context,
false,
None,
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
PlanCreateIndex {
name,
table,
using,
columns,
unique,
if_not_exists,
schema: DFSchemaRef::new(DFSchema::empty()),
},
)))
}
stmt => {
not_impl_err!("Unsupported SQL statement: {stmt}")
}
}
}
/// Extracts the name of the single table targeted by a `DELETE` statement.
///
/// Returns a "not implemented" error when the `FROM` clause lists anything
/// other than exactly one table, when that table carries joins, or when the
/// relation is not a plain table reference.
fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
    // Both syntactic forms carry the same list of tables.
    let mut tables = match from {
        FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
    };

    let table_count = tables.len();
    if table_count != 1 {
        return not_impl_err!(
            "DELETE FROM only supports single table, got {}: {tables:?}",
            table_count
        );
    }

    // Exactly one element is present, as checked above.
    let table_factor = tables.pop().unwrap();
    let has_joins = !table_factor.joins.is_empty();
    if has_joins {
        return not_impl_err!("DELETE FROM only supports single table, got: joins");
    }

    let TableFactor::Table { name, .. } = table_factor.relation else {
        return not_impl_err!(
            "DELETE FROM only supports single table, got: {table_factor:?}"
        );
    };
    Ok(name)
}
/// Generate a logical plan from a "SHOW TABLES" query
///
/// The statement is rewritten to a `SELECT` over `information_schema.tables`
/// and planned through `statement_to_plan`; a planning error is returned when
/// that table is not available.
fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
    if !self.has_table("information_schema", "tables") {
        return plan_err!("SHOW TABLES is not supported unless information_schema is enabled");
    }

    let mut statements = DFParser::parse_sql("SELECT * FROM information_schema.tables;")?;
    // The rewritten SQL contains exactly one statement.
    assert_eq!(statements.len(), 1);
    let statement = statements.pop_front().unwrap();
    self.statement_to_plan(statement)
}
/// Builds a `DescribeTable` plan for `DESCRIBE <table>`.
///
/// Resolves `table_name` through the context provider and pairs the table's
/// schema with the output schema from `LogicalPlan::describe_schema()`.
///
/// # Errors
/// Returns an error if the table reference cannot be built, the table source
/// cannot be found, or the describe output schema cannot be converted to a
/// `DFSchema`.
fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
    let table_ref = self.object_name_to_table_reference(table_name)?;
    let table_source = self.context_provider.get_table_source(table_ref)?;
    let schema = table_source.schema();
    // Propagate a conversion failure as an error instead of panicking.
    let output_schema = DFSchema::try_from(LogicalPlan::describe_schema())?;
    Ok(LogicalPlan::DescribeTable(DescribeTable {
        schema,
        output_schema: Arc::new(output_schema),
    }))
}
/// Builds a `DescribeTable` plan for `DESCRIBE <query>`.
///
/// The query is planned with a fresh `PlannerContext`; the Arrow schema of
/// the resulting plan is the schema being described.
///
/// # Errors
/// Returns an error if the query cannot be planned or the describe output
/// schema cannot be converted to a `DFSchema`.
fn describe_query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
    let plan = self.query_to_plan(query, &mut PlannerContext::new())?;
    let schema = Arc::new(plan.schema().as_arrow().clone());
    // Propagate a conversion failure as an error instead of panicking.
    let output_schema = DFSchema::try_from(LogicalPlan::describe_schema())?;
    Ok(LogicalPlan::DescribeTable(DescribeTable {
        schema,
        output_schema: Arc::new(output_schema),
    }))
}
/// Generate a logical plan from a `COPY ... TO` statement.
///
/// The source is either a table (planned as a scan) or a query. The output
/// file type comes from `STORED AS` when the context provider recognises it,
/// otherwise from the lowercased extension of the target path. Every
/// `PARTITIONED BY` column must exist in the input schema.
fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
    // Plan the input and remember the table reference (if any) so that
    // partition columns can be looked up with the right qualifier.
    let (input, input_schema, table_ref) = match statement.source {
        CopyToSource::Query(query) => {
            let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
            let schema = Arc::clone(plan.schema());
            (plan, schema, None)
        }
        CopyToSource::Relation(object_name) => {
            let scan_name = object_name_to_string(&object_name);
            let table_ref = self.object_name_to_table_reference(object_name)?;
            let provider = self.context_provider.get_table_source(table_ref.clone())?;
            let plan = LogicalPlanBuilder::scan(scan_name, provider, None)?.build()?;
            let schema = Arc::clone(plan.schema());
            (plan, schema, Some(table_ref))
        }
    };

    let options_map = self.parse_options_map(statement.options, true)?;

    // A `STORED AS` value that the provider does not recognise is ignored
    // and the file extension is consulted instead.
    let explicit_file_type = statement
        .stored_as
        .as_ref()
        .and_then(|stored_as| self.context_provider.get_file_type(stored_as).ok());

    let file_type = if let Some(file_type) = explicit_file_type {
        file_type
    } else {
        let missing_extension = || {
            DataFusionError::Configuration(
                "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
                    .to_string(),
            )
        };
        // Try to infer file format from file extension
        let extension = Path::new(&statement.target)
            .extension()
            .ok_or_else(missing_extension)?
            .to_str()
            .ok_or_else(missing_extension)?
            .to_lowercase();
        self.context_provider.get_file_type(&extension)?
    };

    // Resolve each partition column against the input schema and keep its name.
    let partition_by = statement
        .partitioned_by
        .iter()
        .map(|col| {
            input_schema
                .field_with_name(table_ref.as_ref(), col)
                .map(|field| field.name().to_owned())
        })
        .collect::<Result<Vec<_>>>()?;

    let copy = CopyTo::new(
        Arc::new(input),
        statement.target,
        partition_by,
        file_type,
        options_map,
    );
    Ok(LogicalPlan::Copy(copy))
}
/// Converts parsed ordering clauses into lists of [`SortExpr`].
///
/// When orderings are given but `schema` has no fields, each expression is
/// planned directly and no column validation takes place. Otherwise each
/// ordering goes through `order_by_to_sort_expr` and every referenced column
/// must be present in `schema`, or a planning error is returned.
fn build_order_by(
    &self,
    order_exprs: Vec<LexOrdering>,
    schema: &DFSchemaRef,
    planner_context: &mut PlannerContext,
) -> Result<Vec<Vec<SortExpr>>> {
    if !order_exprs.is_empty() && schema.fields().is_empty() {
        let mut orderings = Vec::with_capacity(order_exprs.len());
        for lex_order in order_exprs.iter() {
            let mut sort_exprs = Vec::with_capacity(lex_order.len());
            for order_by_expr in lex_order.iter() {
                let logical_expr = self.sql_expr_to_logical_expr(
                    order_by_expr.expr.clone(),
                    schema,
                    planner_context,
                )?;
                let asc = order_by_expr.options.asc.unwrap_or(true);
                // Fall back to the configured default null ordering.
                let nulls_first = match order_by_expr.options.nulls_first {
                    Some(nulls_first) => nulls_first,
                    None => self.options.default_null_ordering.nulls_first(asc),
                };
                sort_exprs.push(SortExpr::new(logical_expr, asc, nulls_first));
            }
            orderings.push(sort_exprs);
        }
        return Ok(orderings);
    }

    let mut orderings = Vec::with_capacity(order_exprs.len());
    for lex_order in order_exprs {
        // Convert each OrderByExpr to a SortExpr:
        let sort_exprs =
            self.order_by_to_sort_expr(lex_order, schema, planner_context, true, None)?;
        // Every column referenced by the ordering must exist in the schema.
        for sort in sort_exprs.iter() {
            for column in sort.expr.column_refs().iter() {
                if !schema.has_column(column) {
                    return plan_err!("Column {column} is not in schema");
                }
            }
        }
        orderings.push(sort_exprs);
    }
    Ok(orderings)
}
/// Generate a logical plan from a CREATE EXTERNAL TABLE statement
///
/// The original statement text is kept as the table definition. Inline column
/// constraints are merged with the table-level ones, options are parsed
/// (duplicates rejected), and a non-`UNCOMPRESSED` `format.compression`
/// option is refused for PARQUET, AVRO and ARROW files.
fn external_table_to_plan(
    &self,
    statement: CreateExternalTable,
) -> Result<LogicalPlan> {
    // Capture the SQL text before the statement is taken apart.
    let definition = Some(statement.to_string());
    let CreateExternalTable {
        name,
        columns,
        file_type,
        location,
        table_partition_cols,
        if_not_exists,
        temporary,
        order_exprs,
        unbounded,
        options,
        constraints,
        or_replace,
    } = statement;

    // Merge inline constraints and existing constraints
    let mut all_constraints = constraints;
    all_constraints.extend(calc_inline_constraints_from_columns(&columns));

    let options_map = self.parse_options_map(options, false)?;

    let compression = options_map
        .get("format.compression")
        .map(|c| CompressionTypeVariant::from_str(c))
        .transpose()?;
    let rejects_compression = matches!(file_type.as_str(), "PARQUET" | "AVRO" | "ARROW");
    let is_compressed =
        compression.is_some_and(|c| c != CompressionTypeVariant::UNCOMPRESSED);
    if rejects_compression && is_compressed {
        return plan_err!(
            "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
        );
    }

    let mut planner_context = PlannerContext::new();

    let column_defaults = self
        .build_column_defaults(&columns, &mut planner_context)?
        .into_iter()
        .collect();

    let df_schema = self.build_schema(columns)?.to_dfschema_ref()?;
    df_schema.check_names()?;

    let ordered_exprs =
        self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;

    let name = self.object_name_to_table_reference(name)?;
    let constraints =
        self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;

    let create = PlanCreateExternalTable::builder(name, location, file_type, df_schema)
        .with_partition_cols(table_partition_cols)
        .with_if_not_exists(if_not_exists)
        .with_or_replace(or_replace)
        .with_temporary(temporary)
        .with_definition(definition)
        .with_order_exprs(ordered_exprs)
        .with_unbounded(unbounded)
        .with_options(options_map)
        .with_constraints(constraints)
        .with_column_defaults(column_defaults)
        .build();
    Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(create)))
}
/// Get the indices of the constraint columns in the schema.
/// If any column is not found, return an error.
///
/// Each constraint column must be a plain identifier; it is normalized with
/// the planner's identifier normalizer before being matched against the
/// schema's field names. `constraint_name` is only used in error messages.
fn get_constraint_column_indices(
    &self,
    df_schema: &DFSchemaRef,
    columns: &[IndexColumn],
    constraint_name: &str,
) -> Result<Vec<usize>> {
    let field_names = df_schema.field_names();
    let mut indices = Vec::with_capacity(columns.len());

    for index_column in columns {
        let expr = &index_column.column.expr;
        let SQLExpr::Identifier(ident) = expr else {
            return Err(plan_datafusion_err!(
                "Column name for {constraint_name} must be an identifier: {expr}"
            ));
        };

        let column = self.ident_normalizer.normalize(ident.clone());
        let Some(index) = field_names.iter().position(|item| *item == column) else {
            return Err(plan_datafusion_err!(
                "Column for {constraint_name} not found in schema: {column}"
            ));
        };
        indices.push(index);
    }

    Ok(indices)
}
/// Convert each [TableConstraint] to corresponding [Constraint]
///
/// Only `UNIQUE` and `PRIMARY KEY` constraints are translated; their columns
/// are resolved to field indices in `df_schema`. Foreign key, check, index and
/// fulltext/spatial constraints produce a planning error.
pub fn new_constraint_from_table_constraints(
    &self,
    constraints: &[TableConstraint],
    df_schema: &DFSchemaRef,
) -> Result<Constraints> {
    let mut converted = Vec::with_capacity(constraints.len());

    for table_constraint in constraints {
        let constraint = match table_constraint {
            TableConstraint::Unique(UniqueConstraint {
                name,
                index_name: _,
                index_type_display: _,
                index_type: _,
                columns,
                index_options: _,
                characteristics: _,
                nulls_distinct: _,
            }) => {
                // Label used in error messages about this constraint.
                let label = match name {
                    Some(name) => format!("unique constraint with name '{name}'"),
                    None => String::from("unique constraint"),
                };
                // Get unique constraint indices in the schema
                let indices =
                    self.get_constraint_column_indices(df_schema, columns, &label)?;
                Constraint::Unique(indices)
            }
            TableConstraint::PrimaryKey(PrimaryKeyConstraint {
                name: _,
                index_name: _,
                index_type: _,
                columns,
                index_options: _,
                characteristics: _,
            }) => {
                // Get primary key indices in the schema
                let indices = self.get_constraint_column_indices(
                    df_schema,
                    columns,
                    "primary key",
                )?;
                Constraint::PrimaryKey(indices)
            }
            TableConstraint::ForeignKey { .. } => {
                return _plan_err!("Foreign key constraints are not currently supported");
            }
            TableConstraint::Check { .. } => {
                return _plan_err!("Check constraints are not currently supported");
            }
            TableConstraint::Index { .. } => {
                return _plan_err!("Indexes are not currently supported");
            }
            TableConstraint::FulltextOrSpatial { .. } => {
                return _plan_err!("Indexes are not currently supported");
            }
        };
        converted.push(constraint);
    }

    Ok(Constraints::new_unverified(converted))
}
/// Converts a list of `(key, value)` options into a map of string options.
///
/// Keys are lowercased. A key without a `.` does not belong to any namespace,
/// so it is assumed to be a format option and gets the `format.` prefix for
/// backwards compatibility.
///
/// # Errors
/// * When `allow_duplicates` is `false` and two options resolve to the same
///   normalized key (e.g. `compression` and `FORMAT.COMPRESSION`).
/// * When a value cannot be rendered as a string.
fn parse_options_map(
    &self,
    options: Vec<(String, Value)>,
    allow_duplicates: bool,
) -> Result<HashMap<String, String>> {
    let mut options_map = HashMap::with_capacity(options.len());
    for (key, value) in options {
        // Normalize first so the duplicate check compares the same form of
        // the key that is stored in the map.
        let normalized_key = if key.contains('.') {
            key.to_lowercase()
        } else {
            format!("format.{key}").to_lowercase()
        };

        if !allow_duplicates && options_map.contains_key(&normalized_key) {
            return plan_err!("Option {key} is specified multiple times");
        }

        let Some(value_string) = crate::utils::value_to_string(&value) else {
            return plan_err!("Unsupported Value {}", value);
        };

        options_map.insert(normalized_key, value_string);
    }
    Ok(options_map)
}
/// Generate a plan for EXPLAIN ... that will print out a plan
///
/// Note this is the sqlparser explain statement, not the
/// datafusion `EXPLAIN` statement.
///
/// Nested `EXPLAIN`s are rejected, as is combining `FORMAT` with either
/// `VERBOSE` or `ANALYZE`. `ANALYZE` yields an `Analyze` plan; otherwise an
/// `Explain` plan is produced.
fn explain_to_plan(
    &self,
    verbose: bool,
    analyze: bool,
    format: Option<String>,
    statement: DFStatement,
) -> Result<LogicalPlan> {
    let input = self.statement_to_plan(statement)?;
    if let LogicalPlan::Explain(_) = &input {
        return plan_err!("Nested EXPLAINs are not supported");
    }
    let input = Arc::new(input);

    let schema = LogicalPlan::explain_schema().to_dfschema_ref()?;

    let has_format = format.is_some();
    if verbose && has_format {
        return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
    }

    if analyze {
        if has_format {
            return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
        }
        return Ok(LogicalPlan::Analyze(Analyze {
            verbose,
            input,
            schema,
        }));
    }

    let stringified_plans = vec![input.to_stringified(PlanType::InitialLogicalPlan)];

    // default to configuration value
    // verbose mode only supports indent format
    let options = self.context_provider.options();
    let explain_format = match format {
        _ if verbose => ExplainFormat::Indent,
        Some(requested) => ExplainFormat::from_str(&requested)?,
        None => options.explain.format.clone(),
    };

    Ok(LogicalPlan::Explain(Explain {
        verbose,
        explain_format,
        plan: input,
        stringified_plans,
        schema,
        logical_optimization_succeeded: false,
    }))
}
/// Generate a logical plan for `SHOW <variable>` / `SHOW ALL`.
///
/// The statement is rewritten to a `SELECT` over
/// `information_schema.df_settings`. A trailing `verbose` identifier adds the
/// `description` column and is stripped from the variable name. `all` lists
/// every setting ordered by name; `timezone` and `time.zone` are aliases for
/// `datafusion.execution.time_zone`.
///
/// # Errors
/// Returns a planning error when `information_schema.df_settings` is not
/// available, or when the variable is neither a known configuration key nor
/// prefixed with `datafusion.runtime.`.
fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
    if !self.has_table("information_schema", "df_settings") {
        return plan_err!(
            "SHOW [VARIABLE] is not supported unless information_schema is enabled"
        );
    }

    let verbose = variable
        .last()
        .is_some_and(|s| ident_to_string(s) == "verbose");
    let mut variable_vec = variable.to_vec();
    let columns = if verbose {
        // Drop the trailing `verbose` marker; it is not part of the name.
        variable_vec.pop();
        "name, value, description"
    } else {
        "name, value"
    };

    let variable = object_name_to_string(&ObjectName::from(variable_vec));
    let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
    let query = if variable == "all" {
        // Add an ORDER BY so the output comes out in a consistent order
        format!("{base_query} ORDER BY name")
    } else if variable == "timezone" || variable == "time.zone" {
        // we could introduce alias in OptionDefinition if this string matching thing grows
        format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
    } else {
        // These values are what are used to make the information_schema table, so we just
        // check here, before actually planning or executing the query, if it would produce no
        // results, and error preemptively if it would (for a better UX)
        let is_valid_variable = self
            .context_provider
            .options()
            .entries()
            .iter()
            .any(|opt| opt.key == variable);

        // Check if it's a runtime variable
        let is_runtime_variable = variable.starts_with("datafusion.runtime.");

        if !is_valid_variable && !is_runtime_variable {
            return plan_err!(
                "'{variable}' is not a variable which can be viewed with 'SHOW'"
            );
        }

        // The name ends up inside a single-quoted SQL literal; double any
        // embedded quote so it cannot terminate the literal early.
        let escaped = variable.replace('\'', "''");
        format!("{base_query} WHERE name = '{escaped}'")
    };

    let mut rewrite = DFParser::parse_sql(&query)?;
    // The rewritten SQL contains exactly one statement.
    assert_eq!(rewrite.len(), 1);
    self.statement_to_plan(rewrite.pop_front().unwrap())
}
/// Generate a `SetVariable` plan from a `SET` statement.
///
/// Only the single-assignment form with exactly one value is supported, and
/// neither scope modifiers nor `HIVEVAR` are accepted. The variable name is
/// lowercased, and `timezone` / `time.zone` map to
/// `datafusion.execution.time_zone`. The value may be an identifier, a literal,
/// or a `+`/`-` prefixed expression.
fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
    let (scope, hivevar, variable, values) = match statement {
        Set::SingleAssignment {
            scope,
            hivevar,
            variable,
            values,
        } => (scope, hivevar, variable, values),
        other => return not_impl_err!("SET variant not implemented yet: {other:?}"),
    };

    if scope.is_some() {
        return not_impl_err!("SET with scope modifiers is not supported");
    }
    if hivevar {
        return not_impl_err!("SET HIVEVAR is not supported");
    }

    let lowered = object_name_to_string(&variable).to_lowercase();
    // Map PostgreSQL "timezone" and MySQL "time.zone" aliases to DataFusion's canonical name
    let variable = if matches!(lowered.as_str(), "timezone" | "time.zone") {
        "datafusion.execution.time_zone".to_string()
    } else {
        lowered
    };

    let [value_expr] = values.as_slice() else {
        return plan_err!("SET only supports single value assignment");
    };

    let value = match value_expr {
        SQLExpr::Identifier(i) => ident_to_string(i),
        SQLExpr::Value(v) => {
            let Some(text) = crate::utils::value_to_string(&v.value) else {
                return plan_err!("Unsupported value {:?}", v.value);
            };
            text
        }
        SQLExpr::UnaryOp { op, expr } => match op {
            UnaryOperator::Plus => format!("+{expr}"),
            UnaryOperator::Minus => format!("-{expr}"),
            _ => return plan_err!("Unsupported unary op {:?}", op),
        },
        _ => return plan_err!("Unsupported expr {:?}", value_expr),
    };

    let set_variable = SetVariable { variable, value };
    Ok(LogicalPlan::Statement(PlanStatement::SetVariable(set_variable)))
}
/// Generate a `ResetVariable` plan from a `RESET` statement.
///
/// The variable name is lowercased, and `timezone` / `time.zone` map to
/// `datafusion.execution.time_zone`.
fn reset_statement_to_plan(&self, statement: ResetStatement) -> Result<LogicalPlan> {
    let ResetStatement::Variable(variable) = statement;

    let lowered = object_name_to_string(&variable).to_lowercase();
    // Map PostgreSQL "timezone" and MySQL "time.zone" aliases to DataFusion's canonical name
    let variable = if matches!(lowered.as_str(), "timezone" | "time.zone") {
        "datafusion.execution.time_zone".to_string()
    } else {
        lowered
    };

    let reset_variable = ResetVariable { variable };
    Ok(LogicalPlan::Statement(PlanStatement::ResetVariable(reset_variable)))
}
/// Generate a `Dml` plan with `WriteOp::Delete` for a `DELETE` statement.
///
/// The input of the DML node is a scan of the target table, wrapped in a
/// `Filter` when a predicate is supplied. The predicate is planned against
/// the table's qualified schema and its columns are normalized with an
/// ambiguity check.
fn delete_to_plan(
    &self,
    table_name: &ObjectName,
    predicate_expr: Option<SQLExpr>,
) -> Result<LogicalPlan> {
    // Do a table lookup to verify the table exists
    let table_ref = self.object_name_to_table_reference(table_name.clone())?;
    let table_source = self.context_provider.get_table_source(table_ref.clone())?;
    let schema = DFSchema::try_from_qualified_schema(
        table_ref.clone(),
        &table_source.schema(),
    )?;
    let scan =
        LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
            .build()?;

    let source = if let Some(predicate_expr) = predicate_expr {
        let mut planner_context = PlannerContext::new();
        let filter_expr =
            self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
        let schema = Arc::new(schema);

        let mut using_columns = HashSet::new();
        expr_to_columns(&filter_expr, &mut using_columns)?;
        let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
            filter_expr,
            &[&[&schema]],
            &[using_columns],
        )?;

        LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
    } else {
        scan
    };

    Ok(LogicalPlan::Dml(DmlStatement::new(
        table_ref,
        table_source,
        WriteOp::Delete,
        Arc::new(source),
    )))
}
/// Generate a `Dml` plan with `WriteOp::Update` for an `UPDATE` statement.
///
/// The input of the DML node is a projection that yields one expression per
/// column of the target table: the assigned value (cast to the column type)
/// for columns named in `assignments`, and the existing column otherwise.
/// The projection sits on top of the scan of `table` (plus `from`, when
/// present), filtered by `predicate_expr` when supplied.
///
/// # Errors
/// Returns a planning error when the target is not a plain table, when an
/// assignment target is a tuple, is empty, or is not a column identifier,
/// when an assigned column does not exist in the table, or when any
/// expression fails to plan.
fn update_to_plan(
    &self,
    table: TableWithJoins,
    assignments: &[Assignment],
    from: Option<TableWithJoins>,
    predicate_expr: Option<SQLExpr>,
) -> Result<LogicalPlan> {
    let (table_name, table_alias) = match &table.relation {
        TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
        _ => plan_err!("Cannot update non-table relation!")?,
    };

    // Do a table lookup to verify the table exists
    let table_name = self.object_name_to_table_reference(table_name)?;
    let table_source = self.context_provider.get_table_source(table_name.clone())?;
    let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
        table_name.clone(),
        &table_source.schema(),
    )?);

    // Overwrite with assignment expressions
    let mut planner_context = PlannerContext::new();
    let mut assign_map = assignments
        .iter()
        .map(|assign| {
            let cols = match &assign.target {
                AssignmentTarget::ColumnName(cols) => cols,
                _ => plan_err!("Tuples are not supported")?,
            };
            // Only the last part of a (possibly qualified) name is the column.
            // A part that is not an identifier is reported as a planning
            // error rather than causing a panic.
            let col_name: &Ident = cols
                .0
                .last()
                .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
                .as_ident()
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "Assignment target must be a column identifier: {cols}"
                    )
                })?;
            // Validate that the assignment target column exists
            table_schema.field_with_unqualified_name(&col_name.value)?;
            Ok((col_name.value.clone(), assign.value.clone()))
        })
        .collect::<Result<HashMap<String, SQLExpr>>>()?;

    // Build scan, join with from table if it exists.
    let mut input_tables = vec![table];
    input_tables.extend(from);
    let scan = self.plan_from_tables(input_tables, &mut planner_context)?;

    // Filter
    let source = match predicate_expr {
        None => scan,
        Some(predicate_expr) => {
            let filter_expr = self.sql_to_expr(
                predicate_expr,
                scan.schema(),
                &mut planner_context,
            )?;
            let mut using_columns = HashSet::new();
            expr_to_columns(&filter_expr, &mut using_columns)?;
            let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
                filter_expr,
                &[&[scan.schema()]],
                &[using_columns],
            )?;
            LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
        }
    };

    // Build updated values for each column, using the previous value if not modified
    let exprs = table_schema
        .iter()
        .map(|(qualifier, field)| {
            let expr = match assign_map.remove(field.name()) {
                Some(new_value) => {
                    let mut expr = self.sql_to_expr(
                        new_value,
                        source.schema(),
                        &mut planner_context,
                    )?;
                    // Update placeholder's datatype to the type of the target column
                    if let Expr::Placeholder(placeholder) = &mut expr {
                        placeholder.field = placeholder
                            .field
                            .take()
                            .or_else(|| Some(Arc::clone(field)));
                    }
                    // Cast to target column type, if necessary
                    expr.cast_to(field.data_type(), source.schema())?
                }
                None => {
                    // If the target table has an alias, use it to qualify the column name
                    if let Some(alias) = &table_alias {
                        Expr::Column(Column::new(
                            Some(self.ident_normalizer.normalize(alias.name.clone())),
                            field.name(),
                        ))
                    } else {
                        Expr::Column(Column::from((qualifier, field)))
                    }
                }
            };
            Ok(expr.alias(field.name()))
        })
        .collect::<Result<Vec<_>>>()?;

    let source = project(source, exprs)?;

    let plan = LogicalPlan::Dml(DmlStatement::new(
        table_name,
        table_source,
        WriteOp::Update,
        Arc::new(source),
    ));
    Ok(plan)
}
/// Generate a `Dml` plan with `WriteOp::Insert` for an `INSERT` statement.
///
/// The source query is planned and then projected so that it yields one
/// expression per column of the target table: the matching source column
/// (cast to the target type) for columns that receive a value, and the
/// column default (or `NULL` when there is none) for the rest.
///
/// `overwrite` and `replace_into` select the insert operation; setting both
/// is a planning error.
///
/// # Errors
/// Returns an error when the table or a listed column cannot be found, when a
/// column is listed twice, when a `VALUES` placeholder cannot be parsed, is
/// `$0`, or sits in a position with no matching column, when the source
/// column count differs from the insert column count, or when planning the
/// source fails.
fn insert_to_plan(
    &self,
    table_name: ObjectName,
    columns: Vec<Ident>,
    source: Box<Query>,
    overwrite: bool,
    replace_into: bool,
) -> Result<LogicalPlan> {
    // Do a table lookup to verify the table exists
    let table_name = self.object_name_to_table_reference(table_name)?;
    let table_source = self.context_provider.get_table_source(table_name.clone())?;
    let table_schema = DFSchema::try_from(table_source.schema())?;

    // Get insert fields and target table's value indices
    //
    // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is
    // derived from the j-th output of the source.
    //
    // If value_indices[i] = None, it means that the value of the i-th target table's column is
    // not provided, and should be filled with a default value later.
    let (fields, value_indices) = if columns.is_empty() {
        // Empty means we're inserting into all columns of the table
        (
            table_schema.fields().clone(),
            (0..table_schema.fields().len())
                .map(Some)
                .collect::<Vec<_>>(),
        )
    } else {
        let mut value_indices = vec![None; table_schema.fields().len()];
        let fields = columns
            .into_iter()
            .enumerate()
            .map(|(i, c)| {
                let c = self.ident_normalizer.normalize(c);
                let column_index = table_schema
                    .index_of_column_by_name(None, &c)
                    .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;

                if value_indices[column_index].is_some() {
                    return schema_err!(SchemaError::DuplicateUnqualifiedField {
                        name: c,
                    });
                } else {
                    value_indices[column_index] = Some(i);
                }
                Ok(Arc::clone(table_schema.field(column_index)))
            })
            .collect::<Result<Vec<_>>>()?;
        (Fields::from(fields), value_indices)
    };

    // infer types for Values clause... other types should be resolvable the regular way
    let mut prepare_param_data_types = BTreeMap::new();
    // Borrow the query body: it is only inspected here, so no clone is needed.
    if let SetExpr::Values(ast::Values { rows, .. }) = &*source.body {
        for row in rows.iter() {
            for (idx, val) in row.iter().enumerate() {
                if let SQLExpr::Value(ValueWithSpan {
                    value: Value::Placeholder(name),
                    span: _,
                }) = val
                {
                    let ordinal =
                        name.replace('$', "").parse::<usize>().map_err(|_| {
                            plan_datafusion_err!("Can't parse placeholder: {name}")
                        })?;
                    // Placeholders are 1-based; `$0` would underflow the
                    // zero-based index, so report it as a planning error.
                    let param_index = ordinal.checked_sub(1).ok_or_else(|| {
                        plan_datafusion_err!(
                            "Invalid placeholder, zero is not a valid index: {name}"
                        )
                    })?;
                    let field = fields.get(idx).ok_or_else(|| {
                        plan_datafusion_err!(
                            "Placeholder ${} refers to a non existent column",
                            idx + 1
                        )
                    })?;
                    let _ =
                        prepare_param_data_types.insert(param_index, Arc::clone(field));
                }
            }
        }
    }
    let prepare_param_data_types = prepare_param_data_types.into_values().collect();

    // Projection
    let mut planner_context =
        PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
    planner_context.set_table_schema(Some(DFSchemaRef::new(
        DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
    )));
    let source = self.query_to_plan(*source, &mut planner_context)?;
    if fields.len() != source.schema().fields().len() {
        plan_err!("Column count doesn't match insert query!")?;
    }

    let exprs = value_indices
        .into_iter()
        .enumerate()
        .map(|(i, value_index)| {
            let target_field = table_schema.field(i);
            let expr = match value_index {
                Some(v) => {
                    Expr::Column(Column::from(source.schema().qualified_field(v)))
                        .cast_to(target_field.data_type(), source.schema())?
                }
                // The value is not specified. Fill in the default value for the column.
                None => table_source
                    .get_column_default(target_field.name())
                    .cloned()
                    .unwrap_or_else(|| {
                        // If there is no default for the column, then the default is NULL
                        Expr::Literal(ScalarValue::Null, None)
                    })
                    .cast_to(target_field.data_type(), &DFSchema::empty())?,
            };
            Ok(expr.alias(target_field.name()))
        })
        .collect::<Result<Vec<Expr>>>()?;
    let source = project(source, exprs)?;

    let insert_op = match (overwrite, replace_into) {
        (false, false) => InsertOp::Append,
        (true, false) => InsertOp::Overwrite,
        (false, true) => InsertOp::Replace,
        (true, true) => plan_err!(
            "Conflicting insert operations: `overwrite` and `replace_into` cannot both be true"
        )?,
    };

    let plan = LogicalPlan::Dml(DmlStatement::new(
        table_name,
        Arc::clone(&table_source),
        WriteOp::Insert(insert_op),
        Arc::new(source),
    ));
    Ok(plan)
}
/// Generate a logical plan from a `SHOW COLUMNS` statement.
///
/// The statement is rewritten to a `SELECT` over `information_schema.columns`
/// restricted to the requested table. `FULL` and `EXTENDED` both select every
/// column of that view; otherwise a fixed subset is returned.
fn show_columns_to_plan(
    &self,
    extended: bool,
    full: bool,
    sql_table_name: ObjectName,
) -> Result<LogicalPlan> {
    // Figure out the where clause
    let where_clause = object_name_to_qualifier(
        &sql_table_name,
        self.options.enable_ident_normalization,
    )?;

    let information_schema_enabled = self.has_table("information_schema", "columns");
    if !information_schema_enabled {
        return plan_err!(
            "SHOW COLUMNS is not supported unless information_schema is enabled"
        );
    }

    // Do a table lookup to verify the table exists
    let table_ref = self.object_name_to_table_reference(sql_table_name)?;
    self.context_provider.get_table_source(table_ref)?;

    // Treat both FULL and EXTENDED as the same
    let select_list = match (full, extended) {
        (false, false) => {
            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
        }
        _ => "*",
    };

    let query = format!(
        "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
    );

    let mut statements = DFParser::parse_sql(&query)?;
    // The rewritten SQL contains exactly one statement.
    assert_eq!(statements.len(), 1);
    let statement = statements.pop_front().unwrap();
    self.statement_to_plan(statement)
}
/// Rewrite `SHOW FUNCTIONS` to another SQL query
/// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
///
/// The output columns:
/// - function_name: The name of function
/// - return_type: The return type of the function
/// - parameters: The name of parameters (ordered by the ordinal position)
/// - parameter_types: The type of parameters (ordered by the ordinal position)
/// - description: The description of the function (the description defined in the document)
/// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
///
/// Only the `LIKE` filter is supported; any other filter is a planning error.
/// The pattern is embedded as a single-quoted SQL literal, so single quotes
/// inside it are doubled before the query is built.
fn show_functions_to_plan(
    &self,
    filter: Option<ShowStatementFilter>,
) -> Result<LogicalPlan> {
    let where_clause = match filter {
        None => String::new(),
        Some(ShowStatementFilter::Like(like)) => {
            // Escape embedded quotes so the pattern cannot end the literal early.
            let escaped = like.replace('\'', "''");
            format!("WHERE p.function_name like '{escaped}'")
        }
        Some(_) => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
    };

    let query = format!(
        r#"
SELECT DISTINCT
p.*,
r.function_type function_type,
r.description description,
r.syntax_example syntax_example
FROM
(
SELECT
i.specific_name function_name,
o.data_type return_type,
array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
FROM (
SELECT
specific_catalog,
specific_schema,
specific_name,
ordinal_position,
parameter_name,
data_type,
rid
FROM
information_schema.parameters
WHERE
parameter_mode = 'IN'
) i
JOIN
(
SELECT
specific_catalog,
specific_schema,
specific_name,
ordinal_position,
parameter_name,
data_type,
rid
FROM
information_schema.parameters
WHERE
parameter_mode = 'OUT'
) o
ON i.specific_catalog = o.specific_catalog
AND i.specific_schema = o.specific_schema
AND i.specific_name = o.specific_name
AND i.rid = o.rid
GROUP BY 1, 2, i.rid
) as p
JOIN information_schema.routines r
ON p.function_name = r.routine_name
{where_clause}
"#
    );
    let mut rewrite = DFParser::parse_sql(&query)?;
    // The rewritten SQL contains exactly one statement.
    assert_eq!(rewrite.len(), 1);
    self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
}
/// Generate a logical plan from a `SHOW CREATE TABLE` statement.
///
/// Requires `information_schema.tables` to be available and the named table
/// to exist. The statement is rewritten to a `SELECT` of the catalog, schema,
/// name and definition columns from `information_schema.views`, restricted to
/// the requested table.
fn show_create_table_to_plan(
    &self,
    sql_table_name: ObjectName,
) -> Result<LogicalPlan> {
    let information_schema_enabled = self.has_table("information_schema", "tables");
    if !information_schema_enabled {
        return plan_err!(
            "SHOW CREATE TABLE is not supported unless information_schema is enabled"
        );
    }

    // Figure out the where clause
    let where_clause = object_name_to_qualifier(
        &sql_table_name,
        self.options.enable_ident_normalization,
    )?;

    // Do a table lookup to verify the table exists
    let table_ref = self.object_name_to_table_reference(sql_table_name)?;
    self.context_provider.get_table_source(table_ref)?;

    let query = format!(
        "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
    );

    let mut statements = DFParser::parse_sql(&query)?;
    // The rewritten SQL contains exactly one statement.
    assert_eq!(statements.len(), 1);
    let statement = statements.pop_front().unwrap();
    self.statement_to_plan(statement)
}
/// Return true if there is a table provider available for "schema.table"
///
/// Any error from the context provider lookup is treated as "not available".
fn has_table(&self, schema: &str, table: &str) -> bool {
    let lookup = self
        .context_provider
        .get_table_source(TableReference::Partial {
            schema: schema.into(),
            table: table.into(),
        });
    lookup.is_ok()
}
/// Checks that the keyword following `BEGIN` is supported.
///
/// Plain `BEGIN` and `BEGIN TRANSACTION` are accepted; `BEGIN WORK` yields a
/// "not implemented" error.
fn validate_transaction_kind(
    &self,
    kind: Option<&BeginTransactionKind>,
) -> Result<()> {
    match kind {
        // BEGIN, or BEGIN TRANSACTION
        None | Some(BeginTransactionKind::Transaction) => Ok(()),
        Some(BeginTransactionKind::Work) => {
            not_impl_err!("Transaction kind not supported: {kind:?}")
        }
    }
}
}