Add` COPY .. TO ..` syntax support (#6355)
* Add `COPY .. TO ..` syntax support
* fix rustdocs
* Apply suggestions from code review
Co-authored-by: Armin Primadi <aprimadi@gmail.com>
* Update message
* Clarify error
---------
Co-authored-by: Armin Primadi <aprimadi@gmail.com>
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ee31d9e..4c70595 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -69,7 +69,11 @@
LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE,
};
use crate::optimizer::OptimizerRule;
-use datafusion_sql::{planner::ParserOptions, ResolvedTableReference, TableReference};
+use datafusion_sql::{
+ parser::{CopyToSource, CopyToStatement},
+ planner::ParserOptions,
+ ResolvedTableReference, TableReference,
+};
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::repartition::Repartition;
@@ -1686,45 +1690,58 @@
// table providers for all relations referenced in this query
let mut relations = hashbrown::HashSet::with_capacity(10);
+ struct RelationVisitor<'a>(&'a mut hashbrown::HashSet<ObjectName>);
+
+ impl<'a> RelationVisitor<'a> {
+ /// Record that `relation` was used in this statement
+ fn insert(&mut self, relation: &ObjectName) {
+ self.0.get_or_insert_with(relation, |_| relation.clone());
+ }
+ }
+
+ impl<'a> Visitor for RelationVisitor<'a> {
+ type Break = ();
+
+ fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
+ self.insert(relation);
+ ControlFlow::Continue(())
+ }
+
+ fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
+ if let Statement::ShowCreate {
+ obj_type: ShowCreateObject::Table | ShowCreateObject::View,
+ obj_name,
+ } = statement
+ {
+ self.insert(obj_name)
+ }
+ ControlFlow::Continue(())
+ }
+ }
+
+ let mut visitor = RelationVisitor(&mut relations);
match statement {
DFStatement::Statement(s) => {
- struct RelationVisitor<'a>(&'a mut hashbrown::HashSet<ObjectName>);
-
- impl<'a> Visitor for RelationVisitor<'a> {
- type Break = ();
-
- fn pre_visit_relation(
- &mut self,
- relation: &ObjectName,
- ) -> ControlFlow<()> {
- self.0.get_or_insert_with(relation, |_| relation.clone());
- ControlFlow::Continue(())
- }
-
- fn pre_visit_statement(
- &mut self,
- statement: &Statement,
- ) -> ControlFlow<()> {
- if let Statement::ShowCreate {
- obj_type: ShowCreateObject::Table | ShowCreateObject::View,
- obj_name,
- } = statement
- {
- self.0.get_or_insert_with(obj_name, |_| obj_name.clone());
- }
- ControlFlow::Continue(())
- }
- }
- let mut visitor = RelationVisitor(&mut relations);
let _ = s.as_ref().visit(&mut visitor);
}
DFStatement::CreateExternalTable(table) => {
- relations.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
+ visitor
+ .0
+ .insert(ObjectName(vec![Ident::from(table.name.as_str())]));
}
- DFStatement::DescribeTableStmt(table) => {
- relations
- .get_or_insert_with(&table.table_name, |_| table.table_name.clone());
- }
+ DFStatement::DescribeTableStmt(table) => visitor.insert(&table.table_name),
+ DFStatement::CopyTo(CopyToStatement {
+ source,
+ target: _,
+ options: _,
+ }) => match source {
+ CopyToSource::Relation(table_name) => {
+ visitor.insert(table_name);
+ }
+ CopyToSource::Query(query) => {
+ query.visit(&mut visitor);
+ }
+ },
}
// Always include information_schema if available
diff --git a/datafusion/core/tests/sqllogictests/test_files/copy.slt b/datafusion/core/tests/sqllogictests/test_files/copy.slt
new file mode 100644
index 0000000..e7bde89
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/copy.slt
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# tests for copy command
+
+statement ok
+create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, 'Bar');
+
+# Copy from table
+statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
+COPY source_table to '/tmp/table.parquet';
+
+# Copy from table with options
+statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
+COPY source_table to '/tmp/table.parquet' (row_group_size 55);
+
+# Copy from table with options (and trailing comma)
+statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
+COPY source_table to '/tmp/table.parquet' (row_group_size 55, row_group_limit_bytes 9,);
+
+
+# Error cases:
+
+# Incomplete statement
+statement error DataFusion error: SQL error: ParserError\("Expected \), found: EOF"\)
+COPY (select col2, sum(col1) from source_table
+
+# Copy from table with non literal
+statement error DataFusion error: SQL error: ParserError\("Expected ',' or '\)' after option definition, found: \+"\)
+COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102);
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index a70868f..a0e9289 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -18,7 +18,8 @@
//! DataFusion SQL Parser based on [`sqlparser`]
use datafusion_common::parsers::CompressionTypeVariant;
-use sqlparser::ast::OrderByExpr;
+use sqlparser::ast::{OrderByExpr, Query, Value};
+use sqlparser::tokenizer::Word;
use sqlparser::{
ast::{
ColumnDef, ColumnOptionDef, ObjectName, Statement as SQLStatement,
@@ -28,8 +29,9 @@
parser::{Parser, ParserError},
tokenizer::{Token, TokenWithLocation, Tokenizer},
};
+use std::collections::VecDeque;
+use std::fmt;
use std::{collections::HashMap, str::FromStr};
-use std::{collections::VecDeque, fmt};
// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
@@ -42,6 +44,78 @@
Ok(s.to_uppercase())
}
+/// DataFusion extension DDL for `COPY`
+///
+/// # Syntax:
+///
+/// ```text
+/// COPY <table_name | (<query>)>
+/// TO
+/// <destination_url>
+/// (key_value_list)
+/// ```
+///
+/// # Examples
+///
+/// ```sql
+/// COPY lineitem TO 'lineitem'
+/// (format parquet,
+/// partitions 16,
+/// row_group_limit_rows 100000,
+// row_group_limit_bytes 200000
+/// )
+///
+/// COPY (SELECT l_orderkey from lineitem) to 'lineitem.parquet';
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CopyToStatement {
+ /// From where the data comes from
+ pub source: CopyToSource,
+ /// The URL to where the data is heading
+ pub target: String,
+ /// Target specific options
+ pub options: HashMap<String, Value>,
+}
+
+impl fmt::Display for CopyToStatement {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let Self {
+ source,
+ target,
+ options,
+ } = self;
+
+ write!(f, "COPY {source} TO {target}")?;
+
+ if !options.is_empty() {
+ let mut opts: Vec<_> =
+ options.iter().map(|(k, v)| format!("{k} {v}")).collect();
+ // print them in sorted order
+ opts.sort_unstable();
+ write!(f, " ({})", opts.join(", "))?;
+ }
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum CopyToSource {
+ /// `COPY <table> TO ...`
+ Relation(ObjectName),
+ /// COPY (...query...) TO ...
+ Query(Query),
+}
+
+impl fmt::Display for CopyToSource {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ CopyToSource::Relation(r) => write!(f, "{r}"),
+ CopyToSource::Query(q) => write!(f, "({q})"),
+ }
+ }
+}
+
/// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
///
/// Syntax:
@@ -119,12 +193,25 @@
/// Tokens parsed by [`DFParser`] are converted into these values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Statement {
- /// ANSI SQL AST node
+ /// ANSI SQL AST node (from sqlparser-rs)
Statement(Box<SQLStatement>),
/// Extension: `CREATE EXTERNAL TABLE`
CreateExternalTable(CreateExternalTable),
/// Extension: `DESCRIBE TABLE`
DescribeTableStmt(DescribeTableStmt),
+ /// Extension: `COPY TO`
+ CopyTo(CopyToStatement),
+}
+
+impl fmt::Display for Statement {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Statement::Statement(stmt) => write!(f, "{stmt}"),
+ Statement::CreateExternalTable(stmt) => write!(f, "{stmt}"),
+ Statement::DescribeTableStmt(_) => write!(f, "DESCRIBE TABLE ..."),
+ Statement::CopyTo(stmt) => write!(f, "{stmt}"),
+ }
+ }
}
/// DataFusion SQL Parser based on [`sqlparser`]
@@ -213,6 +300,11 @@
// use custom parsing
self.parse_create()
}
+ Keyword::COPY => {
+ // move one token forward
+ self.parser.next_token();
+ self.parse_copy()
+ }
Keyword::DESCRIBE => {
// move one token forward
self.parser.next_token();
@@ -244,6 +336,79 @@
}))
}
+ /// Parse a SQL `COPY TO` statement
+ pub fn parse_copy(&mut self) -> Result<Statement, ParserError> {
+ // parse as a query
+ let source = if self.parser.consume_token(&Token::LParen) {
+ let query = self.parser.parse_query()?;
+ self.parser.expect_token(&Token::RParen)?;
+ CopyToSource::Query(query)
+ } else {
+ // parse as table reference
+ let table_name = self.parser.parse_object_name()?;
+ CopyToSource::Relation(table_name)
+ };
+
+ self.parser.expect_keyword(Keyword::TO)?;
+
+ let target = self.parser.parse_literal_string()?;
+
+ // check for options in parens
+ let options = if self.parser.peek_token().token == Token::LParen {
+ self.parse_value_options()?
+ } else {
+ HashMap::new()
+ };
+
+ Ok(Statement::CopyTo(CopyToStatement {
+ source,
+ target,
+ options,
+ }))
+ }
+
+ /// Parse the next token as a key name for an option list
+ ///
+ /// Note this is different than [`parse_literal_string`]
+ /// because it allows keywords as well as other non words
+ ///
+ /// [`parse_literal_string`]: sqlparser::parser::Parser::parse_literal_string
+ pub fn parse_option_key(&mut self) -> Result<String, ParserError> {
+ let next_token = self.parser.next_token();
+ match next_token.token {
+ Token::Word(Word { value, .. }) => Ok(value),
+ Token::SingleQuotedString(s) => Ok(s),
+ Token::DoubleQuotedString(s) => Ok(s),
+ Token::EscapedStringLiteral(s) => Ok(s),
+ _ => self.parser.expected("key name", next_token),
+ }
+ }
+
+ /// Parse the next token as a value for an option list
+ ///
+ /// Note this is different than [`parse_value`] as it allows any
+ /// word or keyword in this location.
+ ///
+ /// [`parse_value`]: sqlparser::parser::Parser::parse_value
+ pub fn parse_option_value(&mut self) -> Result<Value, ParserError> {
+ let next_token = self.parser.next_token();
+ match next_token.token {
+ Token::Word(Word { value, .. }) => Ok(Value::UnQuotedString(value)),
+ Token::SingleQuotedString(s) => Ok(Value::SingleQuotedString(s)),
+ Token::DoubleQuotedString(s) => Ok(Value::DoubleQuotedString(s)),
+ Token::EscapedStringLiteral(s) => Ok(Value::EscapedStringLiteral(s)),
+ Token::Number(ref n, l) => match n.parse() {
+ Ok(n) => Ok(Value::Number(n, l)),
+ // The tokenizer should have ensured `n` is an integer
+ // so this should not be possible
+ Err(e) => parser_err!(format!(
+ "Unexpected error: could not parse '{n}' as number: {e}"
+ )),
+ },
+ _ => self.parser.expected("string or numeric value", next_token),
+ }
+ }
+
/// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE`
pub fn parse_create(&mut self) -> Result<Statement, ParserError> {
if self.parser.parse_keyword(Keyword::EXTERNAL) {
@@ -485,7 +650,7 @@
}
Keyword::OPTIONS => {
ensure_not_set(&builder.options, "OPTIONS")?;
- builder.options = Some(self.parse_options()?);
+ builder.options = Some(self.parse_string_options()?);
}
_ => {
unreachable!()
@@ -555,14 +720,42 @@
}
}
- fn parse_options(&mut self) -> Result<HashMap<String, String>, ParserError> {
- let mut options: HashMap<String, String> = HashMap::new();
+ /// Parses (key value) style options where the values are literal strings.
+ fn parse_string_options(&mut self) -> Result<HashMap<String, String>, ParserError> {
+ let mut options = HashMap::new();
self.parser.expect_token(&Token::LParen)?;
loop {
let key = self.parser.parse_literal_string()?;
let value = self.parser.parse_literal_string()?;
- options.insert(key.to_string(), value.to_string());
+ options.insert(key, value);
+ let comma = self.parser.consume_token(&Token::Comma);
+ if self.parser.consume_token(&Token::RParen) {
+ // allow a trailing comma, even though it's not in standard
+ break;
+ } else if !comma {
+ return self.expected(
+ "',' or ')' after option definition",
+ self.parser.peek_token(),
+ );
+ }
+ }
+ Ok(options)
+ }
+
+ /// Parses (key value) style options into a map of String --> [`Value`].
+ ///
+ /// Unlike [`Self::parse_string_options`], this method supports
+ /// keywords as key names as well as multiple value types such as
+ /// Numbers as well as Strings.
+ fn parse_value_options(&mut self) -> Result<HashMap<String, Value>, ParserError> {
+ let mut options = HashMap::new();
+ self.parser.expect_token(&Token::LParen)?;
+
+ loop {
+ let key = self.parse_option_key()?;
+ let value = self.parse_option_value()?;
+ options.insert(key, value);
let comma = self.parser.consume_token(&Token::Comma);
if self.parser.consume_token(&Token::RParen) {
// allow a trailing comma, even though it's not in standard
@@ -602,7 +795,7 @@
1,
"Expected to parse exactly one statement"
);
- assert_eq!(statements[0], expected);
+ assert_eq!(statements[0], expected, "actual:\n{:#?}", statements[0]);
Ok(())
}
@@ -1074,4 +1267,134 @@
Ok(())
}
+
+ #[test]
+ fn copy_to_table_to_table() -> Result<(), ParserError> {
+ // positive case
+ let sql = "COPY foo TO bar";
+ let expected = Statement::CopyTo(CopyToStatement {
+ source: object_name("foo"),
+ target: "bar".to_string(),
+ options: HashMap::new(),
+ });
+
+ assert_eq!(verified_stmt(sql), expected);
+ Ok(())
+ }
+
+ #[test]
+ fn copy_to_query_to_table() -> Result<(), ParserError> {
+ let statement = verified_stmt("SELECT 1");
+
+ // unwrap the various layers
+ let statement = if let Statement::Statement(statement) = statement {
+ *statement
+ } else {
+ panic!("Expected statement, got {statement:?}");
+ };
+
+ let query = if let SQLStatement::Query(query) = statement {
+ *query
+ } else {
+ panic!("Expected query, got {statement:?}");
+ };
+
+ let sql = "COPY (SELECT 1) TO bar";
+ let expected = Statement::CopyTo(CopyToStatement {
+ source: CopyToSource::Query(query),
+ target: "bar".to_string(),
+ options: HashMap::new(),
+ });
+ assert_eq!(verified_stmt(sql), expected);
+ Ok(())
+ }
+
+ #[test]
+ fn copy_to_options() -> Result<(), ParserError> {
+ let sql = "COPY foo TO bar (row_group_size 55)";
+ let expected = Statement::CopyTo(CopyToStatement {
+ source: object_name("foo"),
+ target: "bar".to_string(),
+ options: HashMap::from([(
+ "row_group_size".to_string(),
+ Value::Number("55".to_string(), false),
+ )]),
+ });
+ assert_eq!(verified_stmt(sql), expected);
+ Ok(())
+ }
+
+ #[test]
+ fn copy_to_multi_options() -> Result<(), ParserError> {
+ let sql =
+ "COPY foo TO bar (format parquet, row_group_size 55, compression snappy)";
+ // canonical order is alphabetical
+ let canonical =
+ "COPY foo TO bar (compression snappy, format parquet, row_group_size 55)";
+
+ let expected_options = HashMap::from([
+ (
+ "compression".to_string(),
+ Value::UnQuotedString("snappy".to_string()),
+ ),
+ (
+ "format".to_string(),
+ Value::UnQuotedString("parquet".to_string()),
+ ),
+ (
+ "row_group_size".to_string(),
+ Value::Number("55".to_string(), false),
+ ),
+ ]);
+
+ let options =
+ if let Statement::CopyTo(copy_to) = one_statement_parses_to(sql, canonical) {
+ copy_to.options
+ } else {
+ panic!("Expected copy");
+ };
+
+ assert_eq!(options, expected_options);
+
+ Ok(())
+ }
+
+ // For error cases, see: `copy.slt`
+
+ fn object_name(name: &str) -> CopyToSource {
+ CopyToSource::Relation(ObjectName(vec![Ident::new(name)]))
+ }
+
+ // Based on sqlparser-rs
+ // https://github.com/sqlparser-rs/sqlparser-rs/blob/ae3b5844c839072c235965fe0d1bddc473dced87/src/test_utils.rs#L104-L116
+
+ /// Ensures that `sql` parses as a single [Statement]
+ ///
+ /// If `canonical` is non empty,this function additionally asserts
+ /// that:
+ ///
+ /// 1. parsing `sql` results in the same [`Statement`] as parsing
+ /// `canonical`.
+ ///
+ /// 2. re-serializing the result of parsing `sql` produces the same
+ /// `canonical` sql string
+ fn one_statement_parses_to(sql: &str, canonical: &str) -> Statement {
+ let mut statements = DFParser::parse_sql(sql).unwrap();
+ assert_eq!(statements.len(), 1);
+
+ if sql != canonical {
+ assert_eq!(DFParser::parse_sql(canonical).unwrap(), statements);
+ }
+
+ let only_statement = statements.pop_front().unwrap();
+ assert_eq!(canonical, only_statement.to_string());
+ only_statement
+ }
+
+ /// Ensures that `sql` parses as a single [Statement], and that
+ /// re-serializing the parse result produces the same `sql`
+ /// string (is not modified after a serialization round-trip).
+ fn verified_stmt(sql: &str) -> Statement {
+ one_statement_parses_to(sql, sql)
+ }
}
diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs
index e2ff668..3cc0e5d 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -23,6 +23,7 @@
mod join;
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+ /// Create a `LogicalPlan` that scans the named relation
fn create_relation(
&self,
relation: TableFactor,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 64c59ee..bd9ca3ff 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -16,7 +16,8 @@
// under the License.
use crate::parser::{
- CreateExternalTable, DFParser, DescribeTableStmt, Statement as DFStatement,
+ CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt,
+ Statement as DFStatement,
};
use crate::planner::{
object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
@@ -85,6 +86,7 @@
DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
DFStatement::DescribeTableStmt(s) => self.describe_table_to_plan(s),
+ DFStatement::CopyTo(s) => self.copy_to_plan(s),
}
}
@@ -537,6 +539,13 @@
}))
}
+ fn copy_to_plan(&self, _statement: CopyToStatement) -> Result<LogicalPlan> {
+ // TODO: implement as part of https://github.com/apache/arrow-datafusion/issues/5654
+ Err(DataFusionError::NotImplemented(
+ "`COPY .. TO ..` statement is not yet supported".to_string(),
+ ))
+ }
+
fn build_order_by(
&self,
order_exprs: Vec<OrderByExpr>,