Add `COPY .. TO ..` syntax support (#6355)

* Add `COPY .. TO ..` syntax support

* fix rustdocs

* Apply suggestions from code review

Co-authored-by: Armin Primadi <aprimadi@gmail.com>

* Update message

* Clarify error

---------

Co-authored-by: Armin Primadi <aprimadi@gmail.com>
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ee31d9e..4c70595 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -69,7 +69,11 @@
     LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE,
 };
 use crate::optimizer::OptimizerRule;
-use datafusion_sql::{planner::ParserOptions, ResolvedTableReference, TableReference};
+use datafusion_sql::{
+    parser::{CopyToSource, CopyToStatement},
+    planner::ParserOptions,
+    ResolvedTableReference, TableReference,
+};
 
 use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
 use crate::physical_optimizer::repartition::Repartition;
@@ -1686,45 +1690,58 @@
         // table providers for all relations referenced in this query
         let mut relations = hashbrown::HashSet::with_capacity(10);
 
+        struct RelationVisitor<'a>(&'a mut hashbrown::HashSet<ObjectName>);
+
+        impl<'a> RelationVisitor<'a> {
+            /// Record that `relation` was used in this statement
+            fn insert(&mut self, relation: &ObjectName) {
+                self.0.get_or_insert_with(relation, |_| relation.clone());
+            }
+        }
+
+        impl<'a> Visitor for RelationVisitor<'a> {
+            type Break = ();
+
+            fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
+                self.insert(relation);
+                ControlFlow::Continue(())
+            }
+
+            fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
+                if let Statement::ShowCreate {
+                    obj_type: ShowCreateObject::Table | ShowCreateObject::View,
+                    obj_name,
+                } = statement
+                {
+                    self.insert(obj_name)
+                }
+                ControlFlow::Continue(())
+            }
+        }
+
+        let mut visitor = RelationVisitor(&mut relations);
         match statement {
             DFStatement::Statement(s) => {
-                struct RelationVisitor<'a>(&'a mut hashbrown::HashSet<ObjectName>);
-
-                impl<'a> Visitor for RelationVisitor<'a> {
-                    type Break = ();
-
-                    fn pre_visit_relation(
-                        &mut self,
-                        relation: &ObjectName,
-                    ) -> ControlFlow<()> {
-                        self.0.get_or_insert_with(relation, |_| relation.clone());
-                        ControlFlow::Continue(())
-                    }
-
-                    fn pre_visit_statement(
-                        &mut self,
-                        statement: &Statement,
-                    ) -> ControlFlow<()> {
-                        if let Statement::ShowCreate {
-                            obj_type: ShowCreateObject::Table | ShowCreateObject::View,
-                            obj_name,
-                        } = statement
-                        {
-                            self.0.get_or_insert_with(obj_name, |_| obj_name.clone());
-                        }
-                        ControlFlow::Continue(())
-                    }
-                }
-                let mut visitor = RelationVisitor(&mut relations);
                 let _ = s.as_ref().visit(&mut visitor);
             }
             DFStatement::CreateExternalTable(table) => {
-                relations.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
+                visitor
+                    .0
+                    .insert(ObjectName(vec![Ident::from(table.name.as_str())]));
             }
-            DFStatement::DescribeTableStmt(table) => {
-                relations
-                    .get_or_insert_with(&table.table_name, |_| table.table_name.clone());
-            }
+            DFStatement::DescribeTableStmt(table) => visitor.insert(&table.table_name),
+            DFStatement::CopyTo(CopyToStatement {
+                source,
+                target: _,
+                options: _,
+            }) => match source {
+                CopyToSource::Relation(table_name) => {
+                    visitor.insert(table_name);
+                }
+                CopyToSource::Query(query) => { // result ignored: visitor never breaks
+                    let _ = query.visit(&mut visitor);
+                }
+            },
         }
 
         // Always include information_schema if available
diff --git a/datafusion/core/tests/sqllogictests/test_files/copy.slt b/datafusion/core/tests/sqllogictests/test_files/copy.slt
new file mode 100644
index 0000000..e7bde89
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/copy.slt
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# tests for copy command
+
+statement ok
+create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, 'Bar');
+
+# Copy from table
+statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
+COPY source_table  to '/tmp/table.parquet';
+
+# Copy from table with options
+statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
+COPY source_table  to '/tmp/table.parquet' (row_group_size 55);
+
+# Copy from table with options (and trailing comma)
+statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
+COPY source_table  to '/tmp/table.parquet' (row_group_size 55, row_group_limit_bytes 9,);
+
+
+# Error cases:
+
+# Incomplete statement
+statement error DataFusion error: SQL error: ParserError\("Expected \), found: EOF"\)
+COPY (select col2, sum(col1) from source_table
+
+# Copy from table with non literal
+statement error DataFusion error: SQL error: ParserError\("Expected ',' or '\)' after option definition, found: \+"\)
+COPY source_table  to '/tmp/table.parquet' (row_group_size 55 + 102);
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index a70868f..a0e9289 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -18,7 +18,8 @@
 //! DataFusion SQL Parser based on [`sqlparser`]
 
 use datafusion_common::parsers::CompressionTypeVariant;
-use sqlparser::ast::OrderByExpr;
+use sqlparser::ast::{OrderByExpr, Query, Value};
+use sqlparser::tokenizer::Word;
 use sqlparser::{
     ast::{
         ColumnDef, ColumnOptionDef, ObjectName, Statement as SQLStatement,
@@ -28,8 +29,9 @@
     parser::{Parser, ParserError},
     tokenizer::{Token, TokenWithLocation, Tokenizer},
 };
+use std::collections::VecDeque;
+use std::fmt;
 use std::{collections::HashMap, str::FromStr};
-use std::{collections::VecDeque, fmt};
 
 // Use `Parser::expected` instead, if possible
 macro_rules! parser_err {
@@ -42,6 +44,78 @@
     Ok(s.to_uppercase())
 }
 
+/// DataFusion extension DDL for `COPY`
+///
+/// # Syntax:
+///
+/// ```text
+/// COPY <table_name | (<query>)>
+/// TO
+/// <destination_url>
+/// (key_value_list)
+/// ```
+///
+/// # Examples
+///
+/// ```sql
+/// COPY lineitem  TO 'lineitem'
+///  (format parquet,
+///   partitions 16,
+///   row_group_limit_rows 100000,
+///   row_group_limit_bytes 200000
+///  )
+///
+/// COPY (SELECT l_orderkey from lineitem) to 'lineitem.parquet';
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CopyToStatement {
+    /// Where the data comes from (a table or a query)
+    pub source: CopyToSource,
+    /// The URL to where the data is heading
+    pub target: String,
+    /// Target specific options
+    pub options: HashMap<String, Value>,
+}
+
+impl fmt::Display for CopyToStatement {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let Self {
+            source,
+            target,
+            options,
+        } = self;
+        // NOTE(review): `target` is emitted unquoted — confirm it re-parses
+        write!(f, "COPY {source} TO {target}")?;
+
+        if !options.is_empty() {
+            let mut opts: Vec<_> =
+                options.iter().map(|(k, v)| format!("{k} {v}")).collect();
+            // `HashMap` iteration order is unspecified: sort for stable output
+            opts.sort_unstable();
+            write!(f, " ({})", opts.join(", "))?;
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum CopyToSource {
+    /// `COPY <table> TO ...`
+    Relation(ObjectName),
+    /// `COPY (...query...) TO ...`
+    Query(Query),
+}
+
+impl fmt::Display for CopyToSource {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            CopyToSource::Relation(r) => write!(f, "{r}"),
+            CopyToSource::Query(q) => write!(f, "({q})"),
+        }
+    }
+}
+
 /// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
 ///
 /// Syntax:
@@ -119,12 +193,25 @@
 /// Tokens parsed by [`DFParser`] are converted into these values.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum Statement {
-    /// ANSI SQL AST node
+    /// ANSI SQL AST node (from sqlparser-rs)
     Statement(Box<SQLStatement>),
     /// Extension: `CREATE EXTERNAL TABLE`
     CreateExternalTable(CreateExternalTable),
     /// Extension: `DESCRIBE TABLE`
     DescribeTableStmt(DescribeTableStmt),
+    /// Extension: `COPY TO`
+    CopyTo(CopyToStatement),
+}
+
+impl fmt::Display for Statement {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Statement::Statement(stmt) => write!(f, "{stmt}"),
+            Statement::CreateExternalTable(stmt) => write!(f, "{stmt}"),
+            Statement::DescribeTableStmt(_) => write!(f, "DESCRIBE TABLE ..."),
+            Statement::CopyTo(stmt) => write!(f, "{stmt}"),
+        }
+    }
 }
 
 /// DataFusion SQL Parser based on [`sqlparser`]
@@ -213,6 +300,11 @@
                         // use custom parsing
                         self.parse_create()
                     }
+                    Keyword::COPY => {
+                        // move one token forward
+                        self.parser.next_token();
+                        self.parse_copy()
+                    }
                     Keyword::DESCRIBE => {
                         // move one token forward
                         self.parser.next_token();
@@ -244,6 +336,79 @@
         }))
     }
 
+    /// Parse the rest of a `COPY .. TO ..` statement (after the `COPY` keyword)
+    pub fn parse_copy(&mut self) -> Result<Statement, ParserError> {
+        // a leading `(` means the source is a parenthesized query
+        let source = if self.parser.consume_token(&Token::LParen) {
+            let query = self.parser.parse_query()?;
+            self.parser.expect_token(&Token::RParen)?;
+            CopyToSource::Query(query)
+        } else {
+            // otherwise parse the source as a table reference
+            let table_name = self.parser.parse_object_name()?;
+            CopyToSource::Relation(table_name)
+        };
+
+        self.parser.expect_keyword(Keyword::TO)?;
+
+        let target = self.parser.parse_literal_string()?;
+
+        // only peek: `parse_value_options` expects to consume the `(` itself
+        let options = if self.parser.peek_token().token == Token::LParen {
+            self.parse_value_options()?
+        } else {
+            HashMap::new()
+        };
+
+        Ok(Statement::CopyTo(CopyToStatement {
+            source,
+            target,
+            options,
+        }))
+    }
+
+    /// Parse the next token as a key name for an option list
+    ///
+    /// Note this is different than [`parse_literal_string`]
+    /// because it allows keywords as well as other non words
+    ///
+    /// [`parse_literal_string`]: sqlparser::parser::Parser::parse_literal_string
+    pub fn parse_option_key(&mut self) -> Result<String, ParserError> {
+        let next_token = self.parser.next_token();
+        match next_token.token {
+            Token::Word(Word { value, .. }) => Ok(value),
+            Token::SingleQuotedString(s) => Ok(s),
+            Token::DoubleQuotedString(s) => Ok(s),
+            Token::EscapedStringLiteral(s) => Ok(s),
+            _ => self.parser.expected("key name", next_token),
+        }
+    }
+
+    /// Parse the next token as a value for an option list
+    ///
+    /// Note this is different from [`parse_value`] as it allows any
+    /// word or keyword in this location.
+    ///
+    /// [`parse_value`]: sqlparser::parser::Parser::parse_value
+    pub fn parse_option_value(&mut self) -> Result<Value, ParserError> {
+        let next_token = self.parser.next_token();
+        match next_token.token {
+            Token::Word(Word { value, .. }) => Ok(Value::UnQuotedString(value)),
+            Token::SingleQuotedString(s) => Ok(Value::SingleQuotedString(s)),
+            Token::DoubleQuotedString(s) => Ok(Value::DoubleQuotedString(s)),
+            Token::EscapedStringLiteral(s) => Ok(Value::EscapedStringLiteral(s)),
+            Token::Number(ref n, l) => match n.parse() {
+                Ok(n) => Ok(Value::Number(n, l)),
+                // The tokenizer should have ensured `n` is a valid numeric
+                // literal so this should not be possible
+                Err(e) => parser_err!(format!(
+                    "Unexpected error: could not parse '{n}' as number: {e}"
+                )),
+            },
+            _ => self.parser.expected("string or numeric value", next_token),
+        }
+    }
+
     /// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE`
     pub fn parse_create(&mut self) -> Result<Statement, ParserError> {
         if self.parser.parse_keyword(Keyword::EXTERNAL) {
@@ -485,7 +650,7 @@
                     }
                     Keyword::OPTIONS => {
                         ensure_not_set(&builder.options, "OPTIONS")?;
-                        builder.options = Some(self.parse_options()?);
+                        builder.options = Some(self.parse_string_options()?);
                     }
                     _ => {
                         unreachable!()
@@ -555,14 +720,42 @@
         }
     }
 
-    fn parse_options(&mut self) -> Result<HashMap<String, String>, ParserError> {
-        let mut options: HashMap<String, String> = HashMap::new();
+    /// Parses (key value) style options where the values are literal strings.
+    fn parse_string_options(&mut self) -> Result<HashMap<String, String>, ParserError> {
+        let mut options = HashMap::new();
         self.parser.expect_token(&Token::LParen)?;
 
         loop {
             let key = self.parser.parse_literal_string()?;
             let value = self.parser.parse_literal_string()?;
-            options.insert(key.to_string(), value.to_string());
+            options.insert(key, value);
+            let comma = self.parser.consume_token(&Token::Comma);
+            if self.parser.consume_token(&Token::RParen) {
+                // allow a trailing comma, even though it's not in standard
+                break;
+            } else if !comma {
+                return self.expected(
+                    "',' or ')' after option definition",
+                    self.parser.peek_token(),
+                );
+            }
+        }
+        Ok(options)
+    }
+
+    /// Parses (key value) style options into a map of String --> [`Value`].
+    ///
+    /// Unlike [`Self::parse_string_options`], this method supports
+    /// keywords as key names as well as multiple value types such as
+    /// Numbers as well as Strings.
+    fn parse_value_options(&mut self) -> Result<HashMap<String, Value>, ParserError> {
+        let mut options = HashMap::new();
+        self.parser.expect_token(&Token::LParen)?;
+
+        loop {
+            let key = self.parse_option_key()?;
+            let value = self.parse_option_value()?;
+            options.insert(key, value);
             let comma = self.parser.consume_token(&Token::Comma);
             if self.parser.consume_token(&Token::RParen) {
                 // allow a trailing comma, even though it's not in standard
@@ -602,7 +795,7 @@
             1,
             "Expected to parse exactly one statement"
         );
-        assert_eq!(statements[0], expected);
+        assert_eq!(statements[0], expected, "actual:\n{:#?}", statements[0]);
         Ok(())
     }
 
@@ -1074,4 +1267,134 @@
 
         Ok(())
     }
+
+    #[test]
+    fn copy_to_table_to_table() -> Result<(), ParserError> {
+        // positive case
+        let sql = "COPY foo TO bar";
+        let expected = Statement::CopyTo(CopyToStatement {
+            source: object_name("foo"),
+            target: "bar".to_string(),
+            options: HashMap::new(),
+        });
+
+        assert_eq!(verified_stmt(sql), expected);
+        Ok(())
+    }
+
+    #[test]
+    fn copy_to_query_to_table() -> Result<(), ParserError> {
+        let statement = verified_stmt("SELECT 1");
+
+        // unwrap the various layers
+        let statement = if let Statement::Statement(statement) = statement {
+            *statement
+        } else {
+            panic!("Expected statement, got {statement:?}");
+        };
+
+        let query = if let SQLStatement::Query(query) = statement {
+            *query
+        } else {
+            panic!("Expected query, got {statement:?}");
+        };
+
+        let sql = "COPY (SELECT 1) TO bar";
+        let expected = Statement::CopyTo(CopyToStatement {
+            source: CopyToSource::Query(query),
+            target: "bar".to_string(),
+            options: HashMap::new(),
+        });
+        assert_eq!(verified_stmt(sql), expected);
+        Ok(())
+    }
+
+    #[test]
+    fn copy_to_options() -> Result<(), ParserError> {
+        let sql = "COPY foo TO bar (row_group_size 55)";
+        let expected = Statement::CopyTo(CopyToStatement {
+            source: object_name("foo"),
+            target: "bar".to_string(),
+            options: HashMap::from([(
+                "row_group_size".to_string(),
+                Value::Number("55".to_string(), false),
+            )]),
+        });
+        assert_eq!(verified_stmt(sql), expected);
+        Ok(())
+    }
+
+    #[test]
+    fn copy_to_multi_options() -> Result<(), ParserError> {
+        let sql =
+            "COPY foo TO bar (format parquet, row_group_size 55, compression snappy)";
+        // canonical order is alphabetical
+        let canonical =
+            "COPY foo TO bar (compression snappy, format parquet, row_group_size 55)";
+
+        let expected_options = HashMap::from([
+            (
+                "compression".to_string(),
+                Value::UnQuotedString("snappy".to_string()),
+            ),
+            (
+                "format".to_string(),
+                Value::UnQuotedString("parquet".to_string()),
+            ),
+            (
+                "row_group_size".to_string(),
+                Value::Number("55".to_string(), false),
+            ),
+        ]);
+
+        let options =
+            if let Statement::CopyTo(copy_to) = one_statement_parses_to(sql, canonical) {
+                copy_to.options
+            } else {
+                panic!("Expected copy");
+            };
+
+        assert_eq!(options, expected_options);
+
+        Ok(())
+    }
+
+    // For error cases, see: `copy.slt`
+
+    fn object_name(name: &str) -> CopyToSource {
+        CopyToSource::Relation(ObjectName(vec![Ident::new(name)]))
+    }
+
+    // Based on  sqlparser-rs
+    // https://github.com/sqlparser-rs/sqlparser-rs/blob/ae3b5844c839072c235965fe0d1bddc473dced87/src/test_utils.rs#L104-L116
+
+    /// Ensures that `sql` parses as a single [Statement]
+    ///
+    /// Assertion 1 is checked only when `sql` differs from `canonical`;
+    /// assertion 2 is always checked:
+    ///
+    /// 1. parsing `sql` results in the same [`Statement`] as parsing
+    /// `canonical`.
+    ///
+    /// 2. re-serializing the result of parsing `sql` produces the same
+    /// `canonical` sql string
+    fn one_statement_parses_to(sql: &str, canonical: &str) -> Statement {
+        let mut statements = DFParser::parse_sql(sql).unwrap();
+        assert_eq!(statements.len(), 1);
+
+        if sql != canonical {
+            assert_eq!(DFParser::parse_sql(canonical).unwrap(), statements);
+        }
+
+        let only_statement = statements.pop_front().unwrap();
+        assert_eq!(canonical, only_statement.to_string());
+        only_statement
+    }
+
+    /// Ensures that `sql` parses as a single [Statement], and that
+    /// re-serializing the parse result produces the same `sql`
+    /// string (is not modified after a serialization round-trip).
+    fn verified_stmt(sql: &str) -> Statement {
+        one_statement_parses_to(sql, sql)
+    }
 }
diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs
index e2ff668..3cc0e5d 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -23,6 +23,7 @@
 mod join;
 
 impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Create a `LogicalPlan` that scans the named relation
     fn create_relation(
         &self,
         relation: TableFactor,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 64c59ee..bd9ca3ff 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use crate::parser::{
-    CreateExternalTable, DFParser, DescribeTableStmt, Statement as DFStatement,
+    CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt,
+    Statement as DFStatement,
 };
 use crate::planner::{
     object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
@@ -85,6 +86,7 @@
             DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
             DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
             DFStatement::DescribeTableStmt(s) => self.describe_table_to_plan(s),
+            DFStatement::CopyTo(s) => self.copy_to_plan(s),
         }
     }
 
@@ -537,6 +539,13 @@
         }))
     }
 
+    fn copy_to_plan(&self, _statement: CopyToStatement) -> Result<LogicalPlan> {
+        // TODO: implement as part of https://github.com/apache/arrow-datafusion/issues/5654
+        Err(DataFusionError::NotImplemented(
+            "`COPY .. TO ..` statement is not yet supported".to_string(),
+        ))
+    }
+
     fn build_order_by(
         &self,
         order_exprs: Vec<OrderByExpr>,