// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::vec;
use crate::utils::make_decimal_type;
use arrow::datatypes::*;
use datafusion_common::TableReference;
use datafusion_common::config::SqlParserOptions;
use datafusion_common::datatype::{DataTypeExt, FieldExt};
use datafusion_common::error::add_possible_columns_to_diag;
use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
use datafusion_common::{
DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err,
plan_datafusion_err,
};
use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
pub use datafusion_expr::planner::ContextProvider;
use datafusion_expr::utils::find_column_exprs;
use datafusion_expr::{Expr, col};
use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
/// SQL parser options
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
/// Whether to parse float as decimal.
pub parse_float_as_decimal: bool,
/// Whether to normalize identifiers.
pub enable_ident_normalization: bool,
/// Whether to support `VARCHAR` with a length specifier.
pub support_varchar_with_length: bool,
/// Whether to normalize option values.
pub enable_options_value_normalization: bool,
/// Whether to collect source spans.
pub collect_spans: bool,
/// Whether string types (`VARCHAR`, `CHAR`, `TEXT`, and `STRING`) are
/// mapped to `Utf8View` during SQL planning.
pub map_string_types_to_utf8view: bool,
/// Default null ordering for sorting expressions.
pub default_null_ordering: NullOrdering,
}
impl ParserOptions {
/// Creates a new `ParserOptions` instance with default values.
///
/// # Examples
///
/// ```
/// use datafusion_sql::planner::ParserOptions;
/// let opts = ParserOptions::new();
/// assert_eq!(opts.parse_float_as_decimal, false);
/// assert_eq!(opts.enable_ident_normalization, true);
/// ```
pub fn new() -> Self {
Self {
parse_float_as_decimal: false,
enable_ident_normalization: true,
support_varchar_with_length: true,
map_string_types_to_utf8view: true,
enable_options_value_normalization: false,
collect_spans: false,
// `NullsMax` is the default, matching PostgreSQL's behavior:
// https://www.postgresql.org/docs/current/queries-order.html
default_null_ordering: NullOrdering::NullsMax,
}
}
/// Sets the `parse_float_as_decimal` option.
///
/// # Examples
///
/// ```
/// use datafusion_sql::planner::ParserOptions;
/// let opts = ParserOptions::new().with_parse_float_as_decimal(true);
/// assert_eq!(opts.parse_float_as_decimal, true);
/// ```
pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
self.parse_float_as_decimal = value;
self
}
/// Sets the `enable_ident_normalization` option.
///
/// # Examples
///
/// ```
/// use datafusion_sql::planner::ParserOptions;
/// let opts = ParserOptions::new().with_enable_ident_normalization(false);
/// assert_eq!(opts.enable_ident_normalization, false);
/// ```
pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
self.enable_ident_normalization = value;
self
}
/// Sets the `support_varchar_with_length` option.
pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
self.support_varchar_with_length = value;
self
}
/// Sets the `map_string_types_to_utf8view` option.
pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
self.map_string_types_to_utf8view = value;
self
}
/// Sets the `enable_options_value_normalization` option.
pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
self.enable_options_value_normalization = value;
self
}
/// Sets the `collect_spans` option.
pub fn with_collect_spans(mut self, value: bool) -> Self {
self.collect_spans = value;
self
}
/// Sets the `default_null_ordering` option.
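///
/// # Examples
///
/// ```
/// use datafusion_sql::planner::{NullOrdering, ParserOptions};
/// let opts = ParserOptions::new().with_default_null_ordering(NullOrdering::NullsFirst);
/// assert!(opts.default_null_ordering.nulls_first(true));
/// ```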
pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
self.default_null_ordering = value;
self
}
}
impl Default for ParserOptions {
fn default() -> Self {
Self::new()
}
}
impl From<&SqlParserOptions> for ParserOptions {
fn from(options: &SqlParserOptions) -> Self {
Self {
parse_float_as_decimal: options.parse_float_as_decimal,
enable_ident_normalization: options.enable_ident_normalization,
support_varchar_with_length: options.support_varchar_with_length,
map_string_types_to_utf8view: options.map_string_types_to_utf8view,
enable_options_value_normalization: options
.enable_options_value_normalization,
collect_spans: options.collect_spans,
default_null_ordering: options.default_null_ordering.as_str().into(),
}
}
}
/// Represents the null ordering for sorting expressions.
#[derive(Debug, Clone, Copy)]
pub enum NullOrdering {
/// Nulls sort as the largest value: they appear last in ascending order
/// and first in descending order.
NullsMax,
/// Nulls sort as the smallest value: they appear first in ascending order
/// and last in descending order.
NullsMin,
/// Nulls appear first.
NullsFirst,
/// Nulls appear last.
NullsLast,
}
impl NullOrdering {
/// Evaluates the null ordering based on the given ascending flag.
///
/// # Returns
/// * `true` if nulls should appear first.
/// * `false` if nulls should appear last.
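///
/// # Examples
///
/// ```
/// use datafusion_sql::planner::NullOrdering;
/// // `NullsMax` sorts nulls as the largest value: last when ascending,
/// // first when descending.
/// assert!(!NullOrdering::NullsMax.nulls_first(true));
/// assert!(NullOrdering::NullsMax.nulls_first(false));
/// ```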
pub fn nulls_first(&self, asc: bool) -> bool {
match self {
Self::NullsMax => !asc,
Self::NullsMin => asc,
Self::NullsFirst => true,
Self::NullsLast => false,
}
}
}
impl FromStr for NullOrdering {
type Err = DataFusionError;
fn from_str(s: &str) -> Result<Self> {
match s {
"nulls_max" => Ok(Self::NullsMax),
"nulls_min" => Ok(Self::NullsMin),
"nulls_first" => Ok(Self::NullsFirst),
"nulls_last" => Ok(Self::NullsLast),
_ => plan_err!(
"Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"
),
}
}
}
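/// Converts a string into a [`NullOrdering`], falling back to
/// [`NullOrdering::NullsMax`] when the string is not a recognized ordering.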
impl From<&str> for NullOrdering {
fn from(s: &str) -> Self {
Self::from_str(s).unwrap_or(Self::NullsMax)
}
}
/// Identifier normalizer: optionally lowercases unquoted identifiers
#[derive(Debug)]
pub struct IdentNormalizer {
normalize: bool,
}
impl Default for IdentNormalizer {
fn default() -> Self {
Self { normalize: true }
}
}
impl IdentNormalizer {
pub fn new(normalize: bool) -> Self {
Self { normalize }
}
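/// Normalizes the given identifier: when normalization is enabled, unquoted
/// identifiers are lowercased; otherwise the identifier value is returned
/// unchanged.
///
/// # Examples
///
/// ```
/// use datafusion_sql::planner::IdentNormalizer;
/// use sqlparser::ast::Ident;
/// assert_eq!(IdentNormalizer::new(true).normalize(Ident::new("Foo")), "foo");
/// assert_eq!(IdentNormalizer::new(false).normalize(Ident::new("Foo")), "Foo");
/// ```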
pub fn normalize(&self, ident: Ident) -> String {
if self.normalize {
crate::utils::normalize_ident(ident)
} else {
ident.value
}
}
}
/// Stores the state used by the planner to resolve CTEs, views, subqueries
/// and `PREPARE` statements. This state includes the Common Table
/// Expressions (CTEs) provided by a `WITH` clause, the parameter data types
/// provided by a `PREPARE` statement, and the schema of the outer query plan.
///
/// # Cloning
///
/// Only the `ctes` map is truly copied when the `PlannerContext` is cloned.
/// This resolves CTE scoping: through cloning, a subquery inherits the CTEs
/// of the outer query and can also define its own private CTEs without
/// affecting the outer query.
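///
/// For example, a subquery is planned against a clone of the outer context,
/// so CTEs flow inward but never back out:
///
/// ```
/// use datafusion_expr::logical_plan::LogicalPlanBuilder;
/// use datafusion_sql::planner::PlannerContext;
///
/// let mut outer = PlannerContext::new();
/// outer.insert_cte("t", LogicalPlanBuilder::empty(false).build().unwrap());
/// // The clone inherits the outer query's CTEs...
/// let mut subquery = outer.clone();
/// assert!(subquery.contains_cte("t"));
/// // ...while CTEs defined in the subquery stay private to it.
/// subquery.insert_cte("u", LogicalPlanBuilder::empty(false).build().unwrap());
/// assert!(!outer.contains_cte("u"));
/// ```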
#[derive(Debug, Clone)]
pub struct PlannerContext {
/// Data types for numbered parameters ($1, $2, etc), if supplied
/// in `PREPARE` statement
prepare_param_data_types: Arc<Vec<FieldRef>>,
/// Map of CTE name to logical plan of the WITH clause.
/// Use `Arc<LogicalPlan>` to allow cheap cloning
ctes: HashMap<String, Arc<LogicalPlan>>,
/// The query schema of the outer query plan, used to resolve the columns in subquery
outer_query_schema: Option<DFSchemaRef>,
/// The joined schemas of all FROM clauses planned so far. When planning LATERAL
/// FROM clauses, this should become a suffix of the `outer_query_schema`.
outer_from_schema: Option<DFSchemaRef>,
/// The query schema defined by the table
create_table_schema: Option<DFSchemaRef>,
}
impl Default for PlannerContext {
fn default() -> Self {
Self::new()
}
}
impl PlannerContext {
/// Create an empty PlannerContext
pub fn new() -> Self {
Self {
prepare_param_data_types: Arc::new(vec![]),
ctes: HashMap::new(),
outer_query_schema: None,
outer_from_schema: None,
create_table_schema: None,
}
}
/// Update the `PlannerContext` with the parameter data types provided by a
/// `PREPARE` statement
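///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field};
/// use datafusion_sql::planner::PlannerContext;
///
/// // One parameter, `$1`, of type Int32 (the field name is illustrative).
/// let ctx = PlannerContext::new()
///     .with_prepare_param_data_types(vec![Arc::new(Field::new("$1", DataType::Int32, true))]);
/// assert_eq!(ctx.prepare_param_data_types().len(), 1);
/// ```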
pub fn with_prepare_param_data_types(
mut self,
prepare_param_data_types: Vec<FieldRef>,
) -> Self {
self.prepare_param_data_types = prepare_param_data_types.into();
self
}
/// Return a reference to the outer query's schema
pub fn outer_query_schema(&self) -> Option<&DFSchema> {
self.outer_query_schema.as_ref().map(|s| s.as_ref())
}
/// Sets the outer query schema, returning the existing one, if
/// any
pub fn set_outer_query_schema(
&mut self,
mut schema: Option<DFSchemaRef>,
) -> Option<DFSchemaRef> {
std::mem::swap(&mut self.outer_query_schema, &mut schema);
schema
}
/// Sets the schema of the table being created, returning the existing one,
/// if any
pub fn set_table_schema(
&mut self,
mut schema: Option<DFSchemaRef>,
) -> Option<DFSchemaRef> {
std::mem::swap(&mut self.create_table_schema, &mut schema);
schema
}
/// Returns the schema of the table being created, if any
pub fn table_schema(&self) -> Option<DFSchemaRef> {
self.create_table_schema.clone()
}
/// Return a clone of the outer FROM schema
pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
self.outer_from_schema.clone()
}
/// Sets the outer FROM schema, returning the existing one, if any
pub fn set_outer_from_schema(
&mut self,
mut schema: Option<DFSchemaRef>,
) -> Option<DFSchemaRef> {
std::mem::swap(&mut self.outer_from_schema, &mut schema);
schema
}
/// Extends the outer FROM schema by merging in the given schema
pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
match self.outer_from_schema.as_mut() {
Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
None => self.outer_from_schema = Some(Arc::clone(schema)),
};
Ok(())
}
/// Return the types of parameters (`$1`, `$2`, etc) if known
pub fn prepare_param_data_types(&self) -> &[FieldRef] {
&self.prepare_param_data_types
}
/// Returns true if there is a Common Table Expression (CTE) /
/// Subquery for the specified name
pub fn contains_cte(&self, cte_name: &str) -> bool {
self.ctes.contains_key(cte_name)
}
/// Inserts a LogicalPlan for the Common Table Expression (CTE) /
/// Subquery for the specified name
pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
let cte_name = cte_name.into();
self.ctes.insert(cte_name, Arc::new(plan));
}
/// Return a plan for the Common Table Expression (CTE) / Subquery for the
/// specified name
pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
self.ctes.get(cte_name).map(|cte| cte.as_ref())
}
/// Remove the plan of CTE / Subquery for the specified name
pub(super) fn remove_cte(&mut self, cte_name: &str) {
self.ctes.remove(cte_name);
}
}
/// SQL query planner and binder
///
/// This struct is used to convert a SQL AST into a [`LogicalPlan`].
///
/// You can control the behavior of the planner by providing [`ParserOptions`].
///
/// It performs the following tasks:
///
/// 1. Name and type resolution (called "binding" in other systems). This
/// phase looks up table and column names using the [`ContextProvider`].
/// 2. Mechanical translation of the AST into a [`LogicalPlan`].
///
/// It does not perform type coercion, or perform optimization, which are done
/// by subsequent passes.
///
/// Key interfaces are:
/// * [`Self::sql_statement_to_plan`]: Convert a statement
/// (e.g. `SELECT ...`) into a [`LogicalPlan`]
/// * [`Self::sql_to_expr`]: Convert an expression (e.g. `1 + 2`) into an [`Expr`]
pub struct SqlToRel<'a, S: ContextProvider> {
pub(crate) context_provider: &'a S,
pub(crate) options: ParserOptions,
pub(crate) ident_normalizer: IdentNormalizer,
}
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// Create a new query planner.
///
/// The query planner derives the parser options from the context provider.
pub fn new(context_provider: &'a S) -> Self {
let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
Self::new_with_options(context_provider, parser_options)
}
/// Create a new query planner with the given parser options.
///
/// The query planner ignores the parser options from the context provider
/// and uses the given parser options instead.
pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
let ident_normalize = options.enable_ident_normalization;
SqlToRel {
context_provider,
options,
ident_normalizer: IdentNormalizer::new(ident_normalize),
}
}
/// Build an Arrow [`Schema`] from a list of SQL column definitions
pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
let mut fields = Vec::with_capacity(columns.len());
for column in columns {
let data_type = self.convert_data_type_to_field(&column.data_type)?;
let not_nullable = column
.options
.iter()
.any(|x| x.option == ColumnOption::NotNull);
fields.push(
data_type
.as_ref()
.clone()
.with_name(self.ident_normalizer.normalize(column.name))
.with_nullable(!not_nullable),
);
}
Ok(Schema::new(fields))
}
/// Returns a vector of (column_name, default_expr) pairs
pub(super) fn build_column_defaults(
&self,
columns: &Vec<SQLColumnDef>,
planner_context: &mut PlannerContext,
) -> Result<Vec<(String, Expr)>> {
let mut column_defaults = vec![];
// Default expressions are restricted; column references are not allowed
let empty_schema = DFSchema::empty();
let error_desc = |e: DataFusionError| match e {
DataFusionError::SchemaError(ref err, _)
if matches!(**err, SchemaError::FieldNotFound { .. }) =>
{
plan_datafusion_err!(
"Column reference is not allowed in the DEFAULT expression : {}",
e
)
}
_ => e,
};
for column in columns {
if let Some(default_sql_expr) =
column.options.iter().find_map(|o| match &o.option {
ColumnOption::Default(expr) => Some(expr),
_ => None,
})
{
let default_expr = self
.sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
.map_err(error_desc)?;
column_defaults.push((
self.ident_normalizer.normalize(column.name.clone()),
default_expr,
));
}
}
Ok(column_defaults)
}
/// Apply the given TableAlias to the input plan
pub(crate) fn apply_table_alias(
&self,
plan: LogicalPlan,
alias: TableAlias,
) -> Result<LogicalPlan> {
let idents = alias.columns.into_iter().map(|c| c.name).collect();
let plan = self.apply_expr_alias(plan, idents)?;
LogicalPlanBuilder::from(plan)
.alias(TableReference::bare(
self.ident_normalizer.normalize(alias.name),
))?
.build()
}
pub(crate) fn apply_expr_alias(
&self,
plan: LogicalPlan,
idents: Vec<Ident>,
) -> Result<LogicalPlan> {
if idents.is_empty() {
Ok(plan)
} else if idents.len() != plan.schema().fields().len() {
plan_err!(
"Source table contains {} columns but only {} \
names given as column alias",
plan.schema().fields().len(),
idents.len()
)
} else {
let fields = plan.schema().fields().clone();
LogicalPlanBuilder::from(plan)
.project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
col(field.name()).alias(self.ident_normalizer.normalize(ident))
}))?
.build()
}
}
/// Validate that the schema provides all of the columns referenced in the expressions.
pub(crate) fn validate_schema_satisfies_exprs(
&self,
schema: &DFSchema,
exprs: &[Expr],
) -> Result<()> {
find_column_exprs(exprs)
.iter()
.try_for_each(|col| match col {
Expr::Column(col) => match &col.relation {
Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
None => {
if !schema.fields_with_unqualified_name(&col.name).is_empty() {
Ok(())
} else {
Err(field_not_found(
col.relation.clone(),
col.name.as_str(),
schema,
))
}
}
}
.map_err(|err: DataFusionError| match &err {
DataFusionError::SchemaError(inner, _)
if matches!(
inner.as_ref(),
SchemaError::FieldNotFound { .. }
) =>
{
let SchemaError::FieldNotFound {
field,
valid_fields,
} = inner.as_ref()
else {
unreachable!()
};
let mut diagnostic = if let Some(relation) = &col.relation {
Diagnostic::new_error(
format!(
"column '{}' not found in '{}'",
&col.name, relation
),
col.spans().first(),
)
} else {
Diagnostic::new_error(
format!("column '{}' not found", &col.name),
col.spans().first(),
)
};
add_possible_columns_to_diag(
&mut diagnostic,
field,
valid_fields,
);
err.with_diagnostic(diagnostic)
}
_ => err,
}),
_ => internal_err!("Not a column"),
})
}
pub(crate) fn convert_data_type_to_field(
&self,
sql_type: &SQLDataType,
) -> Result<FieldRef> {
// First check if a registered type planner can handle this type
if let Some(type_planner) = self.context_provider.get_type_planner()
&& let Some(data_type) = type_planner.plan_type(sql_type)?
{
return Ok(data_type.into_nullable_field_ref());
}
// If no type_planner can handle this type, use the default conversion
match sql_type {
SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
// Arrays may be multi-dimensional.
Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
}
SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
inner_sql_type,
maybe_array_size,
)) => {
let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
if let Some(array_size) = maybe_array_size {
let array_size: i32 = (*array_size).try_into().map_err(|_| {
plan_datafusion_err!(
"Array size must be a positive 32 bit integer, got {array_size}"
)
})?;
Ok(inner_field.into_fixed_size_list(array_size))
} else {
Ok(inner_field.into_list())
}
}
SQLDataType::Array(ArrayElemTypeDef::None) => {
not_impl_err!("Arrays with unspecified type is not supported")
}
other => Ok(self
.convert_simple_data_type(other)?
.into_nullable_field_ref()),
}
}
fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
match sql_type {
SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
SQLDataType::TinyInt(_) => Ok(DataType::Int8),
SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
Ok(DataType::Int32)
}
SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
Ok(DataType::UInt16)
}
SQLDataType::IntUnsigned(_)
| SQLDataType::IntegerUnsigned(_)
| SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
SQLDataType::Varchar(length) => {
match (length, self.options.support_varchar_with_length) {
(Some(_), false) => plan_err!(
"does not support Varchar with length, \
please set `support_varchar_with_length` to be true"
),
_ => {
if self.options.map_string_types_to_utf8view {
Ok(DataType::Utf8View)
} else {
Ok(DataType::Utf8)
}
}
}
}
SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
Ok(DataType::UInt64)
}
SQLDataType::Float(_) => Ok(DataType::Float32),
SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
SQLDataType::Double(ExactNumberInfo::None)
| SQLDataType::DoublePrecision
| SQLDataType::Float8 => Ok(DataType::Float64),
SQLDataType::Double(
ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
) => {
not_impl_err!(
"Unsupported SQL type (precision/scale not supported) {sql_type}"
)
}
SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
if self.options.map_string_types_to_utf8view {
Ok(DataType::Utf8View)
} else {
Ok(DataType::Utf8)
}
}
SQLDataType::Timestamp(precision, tz_info)
if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
{
let tz = if matches!(tz_info, TimezoneInfo::Tz)
|| matches!(tz_info, TimezoneInfo::WithTimeZone)
{
// Timestamp With Time Zone
// INPUT : [SQLDataType] TimestampTz + [Config] Time Zone
// OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
self.context_provider.options().execution.time_zone.clone()
} else {
// Timestamp Without Time zone
None
};
let time_unit = match precision {
Some(0) => TimeUnit::Second,
Some(3) => TimeUnit::Millisecond,
Some(6) => TimeUnit::Microsecond,
None | Some(9) => TimeUnit::Nanosecond,
_ => unreachable!(),
};
Ok(DataType::Timestamp(time_unit, tz.map(Into::into)))
}
SQLDataType::Date => Ok(DataType::Date32),
SQLDataType::Time(None, tz_info) => {
if matches!(tz_info, TimezoneInfo::None)
|| matches!(tz_info, TimezoneInfo::WithoutTimeZone)
{
Ok(DataType::Time64(TimeUnit::Nanosecond))
} else {
// We don't support TIMETZ and TIME WITH TIME ZONE for now
not_impl_err!("Unsupported SQL type {sql_type}")
}
}
SQLDataType::Numeric(exact_number_info)
| SQLDataType::Decimal(exact_number_info) => {
let (precision, scale) = match *exact_number_info {
ExactNumberInfo::None => (None, None),
ExactNumberInfo::Precision(precision) => (Some(precision), None),
ExactNumberInfo::PrecisionAndScale(precision, scale) => {
(Some(precision), Some(scale))
}
};
make_decimal_type(precision, scale.map(|s| s as u64))
}
SQLDataType::Bytea => Ok(DataType::Binary),
SQLDataType::Interval { fields, precision } => {
if fields.is_some() || precision.is_some() {
return not_impl_err!("Unsupported SQL type {sql_type}");
}
Ok(DataType::Interval(IntervalUnit::MonthDayNano))
}
SQLDataType::Struct(fields, _) => {
let fields = fields
.iter()
.enumerate()
.map(|(idx, sql_struct_field)| {
let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
let field_name = match &sql_struct_field.field_name {
Some(ident) => ident.clone(),
None => Ident::new(format!("c{idx}")),
};
Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
})
.collect::<Result<Vec<_>>>()?;
Ok(DataType::Struct(Fields::from(fields)))
}
SQLDataType::Nvarchar(_)
| SQLDataType::JSON
| SQLDataType::Uuid
| SQLDataType::Binary(_)
| SQLDataType::Varbinary(_)
| SQLDataType::Blob(_)
| SQLDataType::Datetime(_)
| SQLDataType::Regclass
| SQLDataType::Custom(_, _)
| SQLDataType::Array(_)
| SQLDataType::Enum(_, _)
| SQLDataType::Set(_)
| SQLDataType::MediumInt(_)
| SQLDataType::MediumIntUnsigned(_)
| SQLDataType::Character(_)
| SQLDataType::CharacterVarying(_)
| SQLDataType::CharVarying(_)
| SQLDataType::CharacterLargeObject(_)
| SQLDataType::CharLargeObject(_)
| SQLDataType::Timestamp(_, _)
| SQLDataType::Time(Some(_), _)
| SQLDataType::Dec(_)
| SQLDataType::BigNumeric(_)
| SQLDataType::BigDecimal(_)
| SQLDataType::Clob(_)
| SQLDataType::Bytes(_)
| SQLDataType::Int64
| SQLDataType::Float64
| SQLDataType::JSONB
| SQLDataType::Unspecified
| SQLDataType::Int16
| SQLDataType::Int32
| SQLDataType::Int128
| SQLDataType::Int256
| SQLDataType::UInt8
| SQLDataType::UInt16
| SQLDataType::UInt32
| SQLDataType::UInt64
| SQLDataType::UInt128
| SQLDataType::UInt256
| SQLDataType::Float32
| SQLDataType::Date32
| SQLDataType::Datetime64(_, _)
| SQLDataType::FixedString(_)
| SQLDataType::Map(_, _)
| SQLDataType::Tuple(_)
| SQLDataType::Nested(_)
| SQLDataType::Union(_)
| SQLDataType::Nullable(_)
| SQLDataType::LowCardinality(_)
| SQLDataType::Trigger
| SQLDataType::TinyBlob
| SQLDataType::MediumBlob
| SQLDataType::LongBlob
| SQLDataType::TinyText
| SQLDataType::MediumText
| SQLDataType::LongText
| SQLDataType::Bit(_)
| SQLDataType::BitVarying(_)
| SQLDataType::Signed
| SQLDataType::SignedInteger
| SQLDataType::Unsigned
| SQLDataType::UnsignedInteger
| SQLDataType::AnyType
| SQLDataType::Table(_)
| SQLDataType::VarBit(_)
| SQLDataType::UTinyInt
| SQLDataType::USmallInt
| SQLDataType::HugeInt
| SQLDataType::UHugeInt
| SQLDataType::UBigInt
| SQLDataType::TimestampNtz{..}
| SQLDataType::NamedTable { .. }
| SQLDataType::TsVector
| SQLDataType::TsQuery
| SQLDataType::GeometricType(_)
| SQLDataType::DecimalUnsigned(_) // deprecated mysql type
| SQLDataType::FloatUnsigned(_) // deprecated mysql type
| SQLDataType::RealUnsigned // deprecated mysql type
| SQLDataType::DecUnsigned(_) // deprecated mysql type
| SQLDataType::DoubleUnsigned(_) // deprecated mysql type
| SQLDataType::DoublePrecisionUnsigned // deprecated mysql type
=> {
not_impl_err!("Unsupported SQL type {sql_type}")
}
}
}
pub(crate) fn object_name_to_table_reference(
&self,
object_name: ObjectName,
) -> Result<TableReference> {
object_name_to_table_reference(
object_name,
self.options.enable_ident_normalization,
)
}
}
/// Create a [`TableReference`] after normalizing the specified ObjectName
///
/// Examples
/// ```text
/// ['foo'] -> Bare { table: "foo" }
/// ['"foo.bar"]] -> Bare { table: "foo.bar" }
/// ['foo', 'Bar'] -> Partial { schema: "foo", table: "bar" } <-- note lower case "bar"
/// ['foo', 'bar'] -> Partial { schema: "foo", table: "bar" }
/// ['foo', '"Bar"'] -> Partial { schema: "foo", table: "Bar" }
/// ```
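///
/// A runnable example (using `sqlparser`'s `From<Vec<Ident>>` conversion to
/// build the `ObjectName`):
///
/// ```
/// use datafusion_common::TableReference;
/// use datafusion_sql::planner::object_name_to_table_reference;
/// use sqlparser::ast::{Ident, ObjectName};
///
/// let object_name = ObjectName::from(vec![Ident::new("foo"), Ident::new("Bar")]);
/// // With normalization enabled, the unquoted "Bar" is lowercased.
/// let reference = object_name_to_table_reference(object_name, true).unwrap();
/// assert_eq!(reference, TableReference::partial("foo", "bar"));
/// ```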
pub fn object_name_to_table_reference(
object_name: ObjectName,
enable_normalization: bool,
) -> Result<TableReference> {
// Use destructure to make it clear no fields on ObjectName are ignored
let ObjectName(object_name_parts) = object_name;
let idents = object_name_parts
.into_iter()
.map(|object_name_part| {
object_name_part.as_ident().cloned().ok_or_else(|| {
plan_datafusion_err!(
"Expected identifier, but found: {:?}",
object_name_part
)
})
})
.collect::<Result<Vec<_>>>()?;
idents_to_table_reference(idents, enable_normalization)
}
/// Helper that takes identifiers off the back of a list, normalizing each
/// one as it is taken
struct IdentTaker {
normalizer: IdentNormalizer,
idents: Vec<Ident>,
}
impl IdentTaker {
fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
Self {
normalizer: IdentNormalizer::new(enable_normalization),
idents,
}
}
fn take(&mut self) -> String {
let ident = self.idents.pop().expect("no more identifiers");
self.normalizer.normalize(ident)
}
/// Returns the number of remaining identifiers
fn len(&self) -> usize {
self.idents.len()
}
}
// impl Display for a nicer error message
impl std::fmt::Display for IdentTaker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut first = true;
for ident in self.idents.iter() {
if !first {
write!(f, ".")?;
}
write!(f, "{ident}")?;
first = false;
}
Ok(())
}
}
/// Create a [`TableReference`] after normalizing the specified identifier
pub(crate) fn idents_to_table_reference(
idents: Vec<Ident>,
enable_normalization: bool,
) -> Result<TableReference> {
let mut taker = IdentTaker::new(idents, enable_normalization);
match taker.len() {
1 => {
let table = taker.take();
Ok(TableReference::bare(table))
}
2 => {
let table = taker.take();
let schema = taker.take();
Ok(TableReference::partial(schema, table))
}
3 => {
let table = taker.take();
let schema = taker.take();
let catalog = taker.take();
Ok(TableReference::full(catalog, schema, table))
}
_ => plan_err!(
"Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
taker,
taker.len()
),
}
}
/// Construct a WHERE qualifier suitable for e.g. information_schema filtering
/// from the provided object identifiers (catalog, schema and table names).
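///
/// For example, a two-part name maps to `table_schema` and `table_name`
/// (sketch assuming `sqlparser`'s `From<Vec<Ident>>` conversion):
///
/// ```
/// use datafusion_sql::planner::object_name_to_qualifier;
/// use sqlparser::ast::{Ident, ObjectName};
///
/// let name = ObjectName::from(vec![Ident::new("foo"), Ident::new("Bar")]);
/// assert_eq!(
///     object_name_to_qualifier(&name, true).unwrap(),
///     "table_name = 'bar' AND table_schema = 'foo'"
/// );
/// ```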
pub fn object_name_to_qualifier(
sql_table_name: &ObjectName,
enable_normalization: bool,
) -> Result<String> {
let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
let normalizer = IdentNormalizer::new(enable_normalization);
sql_table_name
.0
.iter()
.rev()
.zip(columns)
.map(|(object_name_part, column_name)| {
object_name_part
.as_ident()
.map(|ident| {
format!(
r#"{} = '{}'"#,
column_name,
normalizer.normalize(ident.clone())
)
})
.ok_or_else(|| {
plan_datafusion_err!(
"Expected identifier, but found: {:?}",
object_name_part
)
})
})
.collect::<Result<Vec<_>>>()
.map(|parts| parts.join(" AND "))
}