| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
//! [`Column`]: a named, optionally qualified reference to a field in a schema
| |
| use crate::error::{_schema_err, add_possible_columns_to_diag}; |
| use crate::utils::parse_identifiers_normalized; |
| use crate::utils::quote_identifier; |
| use crate::{DFSchema, Diagnostic, Result, SchemaError, Spans, TableReference}; |
| use arrow::datatypes::{Field, FieldRef}; |
| use std::collections::HashSet; |
| use std::fmt; |
| |
| /// A named reference to a qualified field in a schema. |
| #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] |
| pub struct Column { |
| /// relation/table reference. |
| pub relation: Option<TableReference>, |
| /// field/column name. |
| pub name: String, |
| /// Original source code location, if known |
| pub spans: Spans, |
| } |
| |
| impl fmt::Debug for Column { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("Column") |
| .field("relation", &self.relation) |
| .field("name", &self.name) |
| .finish() |
| } |
| } |
| |
| impl Column { |
| /// Create Column from optional qualifier and name. The optional qualifier, if present, |
| /// will be parsed and normalized by default. |
| /// |
| /// See full details on [`TableReference::parse_str`] |
| /// |
| /// [`TableReference::parse_str`]: crate::TableReference::parse_str |
| pub fn new( |
| relation: Option<impl Into<TableReference>>, |
| name: impl Into<String>, |
| ) -> Self { |
| Self { |
| relation: relation.map(|r| r.into()), |
| name: name.into(), |
| spans: Spans::new(), |
| } |
| } |
| |
| /// Convenience method for when there is no qualifier |
| pub fn new_unqualified(name: impl Into<String>) -> Self { |
| Self { |
| relation: None, |
| name: name.into(), |
| spans: Spans::new(), |
| } |
| } |
| |
| /// Create Column from unqualified name. |
| /// |
| /// Alias for `Column::new_unqualified` |
| pub fn from_name(name: impl Into<String>) -> Self { |
| Self { |
| relation: None, |
| name: name.into(), |
| spans: Spans::new(), |
| } |
| } |
| |
| /// Create a Column from multiple normalized identifiers |
| /// |
| /// For example, `foo.bar` would be represented as a two element vector |
| /// `["foo", "bar"]` |
| fn from_idents(mut idents: Vec<String>) -> Option<Self> { |
| let (relation, name) = match idents.len() { |
| 1 => (None, idents.remove(0)), |
| 2 => ( |
| Some(TableReference::Bare { |
| table: idents.remove(0).into(), |
| }), |
| idents.remove(0), |
| ), |
| 3 => ( |
| Some(TableReference::Partial { |
| schema: idents.remove(0).into(), |
| table: idents.remove(0).into(), |
| }), |
| idents.remove(0), |
| ), |
| 4 => ( |
| Some(TableReference::Full { |
| catalog: idents.remove(0).into(), |
| schema: idents.remove(0).into(), |
| table: idents.remove(0).into(), |
| }), |
| idents.remove(0), |
| ), |
| // any expression that failed to parse or has more than 4 period delimited |
| // identifiers will be treated as an unqualified column name |
| _ => return None, |
| }; |
| Some(Self { |
| relation, |
| name, |
| spans: Spans::new(), |
| }) |
| } |
| |
| /// Deserialize a fully qualified name string into a column |
| /// |
| /// Treats the name as a SQL identifier. For example |
| /// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case) |
| /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR` |
| pub fn from_qualified_name(flat_name: impl Into<String>) -> Self { |
| let flat_name = flat_name.into(); |
| Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or_else( |
| || Self { |
| relation: None, |
| name: flat_name, |
| spans: Spans::new(), |
| }, |
| ) |
| } |
| |
| /// Deserialize a fully qualified name string into a column preserving column text case |
| #[cfg(feature = "sql")] |
| pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self { |
| let flat_name = flat_name.into(); |
| Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or_else( |
| || Self { |
| relation: None, |
| name: flat_name, |
| spans: Spans::new(), |
| }, |
| ) |
| } |
| |
| #[cfg(not(feature = "sql"))] |
| pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self { |
| Self::from_qualified_name(flat_name) |
| } |
| |
| /// return the column's name. |
| /// |
| /// Note: This ignores the relation and returns the column name only. |
| pub fn name(&self) -> &str { |
| &self.name |
| } |
| |
| /// Serialize column into a flat name string |
| pub fn flat_name(&self) -> String { |
| match &self.relation { |
| Some(r) => format!("{}.{}", r, self.name), |
| None => self.name.clone(), |
| } |
| } |
| |
| /// Serialize column into a quoted flat name string |
| pub fn quoted_flat_name(&self) -> String { |
| match &self.relation { |
| Some(r) => { |
| format!( |
| "{}.{}", |
| r.to_quoted_string(), |
| quote_identifier(self.name.as_str()) |
| ) |
| } |
| None => quote_identifier(&self.name).to_string(), |
| } |
| } |
| |
| /// Qualify column if not done yet. |
| /// |
| /// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are |
| /// ignored. Otherwise this will search through the given schemas to find the column. |
| /// |
| /// Will check for ambiguity at each level of `schemas`. |
| /// |
| /// A schema matches if there is a single column that -- when unqualified -- matches this column. There is an |
| /// exception for `USING` statements, see below. |
| /// |
| /// # Using columns |
| /// Take the following SQL statement: |
| /// |
| /// ```sql |
| /// SELECT id FROM t1 JOIN t2 USING(id) |
| /// ``` |
| /// |
| /// In this case, both `t1.id` and `t2.id` will match unqualified column `id`. To express this possibility, use |
| /// `using_columns`. Each entry in this array is a set of columns that are bound together via a `USING` clause. So |
| /// in this example this would be `[{t1.id, t2.id}]`. |
| /// |
| /// Regarding ambiguity check, `schemas` is structured to allow levels of schemas to be passed in. |
| /// For example: |
| /// |
| /// ```text |
| /// schemas = &[ |
| /// &[schema1, schema2], // first level |
| /// &[schema3, schema4], // second level |
| /// ] |
| /// ``` |
| /// |
| /// Will search for a matching field in all schemas in the first level. If a matching field according to above |
| /// mentioned conditions is not found, then will check the next level. If found more than one matching column across |
| /// all schemas in a level, that isn't a USING column, will return an error due to ambiguous column. |
| /// |
| /// If checked all levels and couldn't find field, will return field not found error. |
| pub fn normalize_with_schemas_and_ambiguity_check( |
| self, |
| schemas: &[&[&DFSchema]], |
| using_columns: &[HashSet<Column>], |
| ) -> Result<Self> { |
| if self.relation.is_some() { |
| return Ok(self); |
| } |
| |
| for schema_level in schemas { |
| let qualified_fields = schema_level |
| .iter() |
| .flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name)) |
| .collect::<Vec<_>>(); |
| match qualified_fields.len() { |
| 0 => continue, |
| 1 => return Ok(Column::from(qualified_fields[0])), |
| _ => { |
| // More than 1 fields in this schema have their names set to self.name. |
| // |
| // This should only happen when a JOIN query with USING constraint references |
| // join columns using unqualified column name. For example: |
| // |
| // ```sql |
| // SELECT id FROM t1 JOIN t2 USING(id) |
| // ``` |
| // |
| // In this case, both `t1.id` and `t2.id` will match unqualified column `id`. |
| // We will use the relation from the first matched field to normalize self. |
| |
| // Compare matched fields with one USING JOIN clause at a time |
| let columns = schema_level |
| .iter() |
| .flat_map(|s| s.columns_with_unqualified_name(&self.name)) |
| .collect::<Vec<_>>(); |
| for using_col in using_columns { |
| let all_matched = columns.iter().all(|c| using_col.contains(c)); |
| // All matched fields belong to the same using column set, in other words |
| // the same join clause. We simply pick the qualifier from the first match. |
| if all_matched { |
| return Ok(columns[0].clone()); |
| } |
| } |
| |
| // If not due to USING columns then due to ambiguous column name |
| return _schema_err!(SchemaError::AmbiguousReference { |
| field: Box::new(Column::new_unqualified(&self.name)), |
| }) |
| .map_err(|err| { |
| let mut diagnostic = Diagnostic::new_error( |
| format!("column '{}' is ambiguous", &self.name), |
| self.spans().first(), |
| ); |
| // TODO If [`DFSchema`] had spans, we could show the |
| // user which columns are candidates, or which table |
| // they come from. For now, let's list the table names |
| // only. |
| add_possible_columns_to_diag( |
| &mut diagnostic, |
| &Column::new_unqualified(&self.name), |
| &columns, |
| ); |
| err.with_diagnostic(diagnostic) |
| }); |
| } |
| } |
| } |
| |
| _schema_err!(SchemaError::FieldNotFound { |
| field: Box::new(self), |
| valid_fields: schemas |
| .iter() |
| .flat_map(|s| s.iter()) |
| .flat_map(|s| s.columns()) |
| .collect(), |
| }) |
| } |
| |
| /// Returns a reference to the set of locations in the SQL query where this |
| /// column appears, if known. |
| pub fn spans(&self) -> &Spans { |
| &self.spans |
| } |
| |
| /// Returns a mutable reference to the set of locations in the SQL query |
| /// where this column appears, if known. |
| pub fn spans_mut(&mut self) -> &mut Spans { |
| &mut self.spans |
| } |
| |
| /// Replaces the set of locations in the SQL query where this column |
| /// appears, if known. |
| pub fn with_spans(mut self, spans: Spans) -> Self { |
| self.spans = spans; |
| self |
| } |
| |
| /// Qualifies the column with the given table reference. |
| pub fn with_relation(&self, relation: TableReference) -> Self { |
| Self { |
| relation: Some(relation), |
| ..self.clone() |
| } |
| } |
| } |
| |
| impl From<&str> for Column { |
| fn from(c: &str) -> Self { |
| Self::from_qualified_name(c) |
| } |
| } |
| |
| /// Create a column, cloning the string |
| impl From<&String> for Column { |
| fn from(c: &String) -> Self { |
| Self::from_qualified_name(c) |
| } |
| } |
| |
| /// Create a column, reusing the existing string |
| impl From<String> for Column { |
| fn from(c: String) -> Self { |
| Self::from_qualified_name(c) |
| } |
| } |
| |
| /// Create a column, use qualifier and field name |
| impl From<(Option<&TableReference>, &Field)> for Column { |
| fn from((relation, field): (Option<&TableReference>, &Field)) -> Self { |
| Self::new(relation.cloned(), field.name()) |
| } |
| } |
| |
| /// Create a column, use qualifier and field name |
| impl From<(Option<&TableReference>, &FieldRef)> for Column { |
| fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self { |
| Self::new(relation.cloned(), field.name()) |
| } |
| } |
| |
/// Parse a column from a string; this never fails and yields the same
/// column as `Column::from(&str)`
#[cfg(feature = "sql")]
impl std::str::FromStr for Column {
    type Err = std::convert::Infallible;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let column = Self::from(s);
        Ok(column)
    }
}
| |
| impl fmt::Display for Column { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| write!(f, "{}", self.flat_name()) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use arrow::datatypes::{DataType, SchemaBuilder}; |
| use std::sync::Arc; |
| |
| fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> { |
| let mut schema_builder = SchemaBuilder::new(); |
| schema_builder.extend( |
| names |
| .iter() |
| .map(|f| Field::new(*f, DataType::Boolean, true)), |
| ); |
| let schema = Arc::new(schema_builder.finish()); |
| DFSchema::try_from_qualified_schema(qualifier, &schema) |
| } |
| |
/// Exercises `Column::normalize_with_schemas_and_ambiguity_check`: already
/// qualified input, lookups in the first and second level, USING columns,
/// a missing field, and an ambiguous reference.
#[test]
fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
    let t1 = create_qualified_schema("t1", vec!["a", "b"])?;
    let t2 = create_qualified_schema("t2", vec!["c", "d"])?;
    let t3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;

    // The two search orders used by the cases below.
    let t1_t2_then_t3: &[&[&DFSchema]] = &[&[&t1, &t2], &[&t3]];
    let t1_t3_then_t2: &[&[&DFSchema]] = &[&[&t1, &t3], &[&t2]];

    // already normalized
    let resolved = Column::new(Some("t1"), "a")
        .normalize_with_schemas_and_ambiguity_check(&[], &[])?;
    assert_eq!(resolved, Column::new(Some("t1"), "a"));

    // should find in first level (t1)
    let resolved = Column::from_name("a")
        .normalize_with_schemas_and_ambiguity_check(t1_t2_then_t3, &[])?;
    assert_eq!(resolved, Column::new(Some("t1"), "a"));

    // should find in second level (t3)
    let resolved = Column::from_name("e")
        .normalize_with_schemas_and_ambiguity_check(t1_t2_then_t3, &[])?;
    assert_eq!(resolved, Column::new(Some("t3"), "e"));

    // using column in first level (pick t1)
    let using_set = HashSet::from([
        Column::new(Some("t1"), "a"),
        Column::new(Some("t3"), "a"),
    ]);
    let resolved = Column::from_name("a")
        .normalize_with_schemas_and_ambiguity_check(t1_t3_then_t2, &[using_set])?;
    assert_eq!(resolved, Column::new(Some("t1"), "a"));

    // not found in any level
    let not_found = Column::from_name("z")
        .normalize_with_schemas_and_ambiguity_check(t1_t2_then_t3, &[])
        .expect_err("should've failed to find field");
    let expected = "Schema error: No field named z. \
        Valid fields are t1.a, t1.b, t2.c, t2.d, t3.a, t3.b, t3.c, t3.d, t3.e.";
    assert_eq!(not_found.strip_backtrace(), expected);

    // ambiguous column reference
    let ambiguous = Column::from_name("a")
        .normalize_with_schemas_and_ambiguity_check(t1_t3_then_t2, &[])
        .expect_err("should've found ambiguous field");
    let expected = "Schema error: Ambiguous reference to unqualified field a";
    assert_eq!(ambiguous.strip_backtrace(), expected);

    Ok(())
}
| } |