blob: 308a0de62a242e486ddfc3c4621e54b26e7ca3b7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! This example demonstrates extending the DataFusion SQL parser to support
//! custom DDL statements, specifically `CREATE EXTERNAL CATALOG`.
//!
//! ### Custom Syntax
//! ```sql
//! CREATE EXTERNAL CATALOG my_catalog
//! STORED AS ICEBERG
//! LOCATION 's3://my-bucket/warehouse/'
//! OPTIONS (
//! 'region' = 'us-west-2'
//! );
//! ```
//!
//! Note: For the purpose of this example, we use `local://workspace/` to
//! automatically discover and register files from the project's test data.
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use datafusion::catalog::{
CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
TableProviderFactory,
};
use datafusion::datasource::listing_table_factory::ListingTableFactory;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use datafusion::sql::{
parser::{DFParser, DFParserBuilder, Statement},
sqlparser::{
ast::{ObjectName, Value},
keywords::Keyword,
tokenizer::Token,
},
};
use datafusion_common::{DFSchema, TableReference, plan_datafusion_err, plan_err};
use datafusion_expr::CreateExternalTable;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::ObjectStore;
use object_store::local::LocalFileSystem;
/// Entry point for the example.
///
/// Runs in two parts: first shows that the stock DataFusion parser rejects
/// `CREATE EXTERNAL CATALOG`, then parses the same SQL with [`CustomParser`],
/// executes the resulting statement, and queries a table it registered.
pub async fn custom_sql_parser() -> Result<()> {
    // Use standard Parquet testing data as our "external" source.
    let base_path = datafusion::common::test_util::parquet_test_data();
    // Canonicalize so the strip_prefix below compares absolute paths.
    let base_path = std::path::Path::new(&base_path).canonicalize()?;
    // Make the path relative to the workspace root; fall back to the absolute
    // path when the test data lives outside the workspace.
    let workspace_root = workspace_root();
    let location = base_path
        .strip_prefix(&workspace_root)
        .map(|p| p.to_string_lossy().to_string())
        .unwrap_or_else(|_| base_path.to_string_lossy().to_string());
    let create_catalog_sql = format!(
        "CREATE EXTERNAL CATALOG parquet_testing
STORED AS parquet
LOCATION 'local://workspace/{location}'
OPTIONS (
'schema_name' = 'staged_data',
'format.pruning' = 'true'
)"
    );
    // =========================================================================
    // Part 1: Standard DataFusion parser rejects the custom DDL
    // =========================================================================
    println!("=== Part 1: Standard DataFusion Parser ===\n");
    println!("Parsing: {}\n", create_catalog_sql.trim());
    let ctx_standard = SessionContext::new();
    // The built-in parser only knows `CREATE EXTERNAL TABLE`, so this errors.
    let err = ctx_standard
        .sql(&create_catalog_sql)
        .await
        .expect_err("Expected the standard parser to reject CREATE EXTERNAL CATALOG (custom DDL syntax)");
    println!("Error: {err}\n");
    assert_snapshot!(err.to_string(), @r#"SQL error: ParserError("Expected: TABLE, found: CATALOG at Line: 1, Column: 17")"#);
    // =========================================================================
    // Part 2: Custom parser handles the statement
    // =========================================================================
    println!("=== Part 2: Custom Parser ===\n");
    println!("Parsing: {}\n", create_catalog_sql.trim());
    let ctx = SessionContext::new();
    let mut parser = CustomParser::new(&create_catalog_sql)?;
    let statement = parser.parse_statement()?;
    match statement {
        CustomStatement::CreateExternalCatalog(stmt) => {
            // Registers the catalog/schema/tables on `ctx`.
            handle_create_external_catalog(&ctx, stmt).await?;
        }
        CustomStatement::DFStatement(_) => {
            panic!("Expected CreateExternalCatalog statement");
        }
    }
    // Query a table from the registered catalog
    let query_sql = "SELECT id, bool_col, tinyint_col FROM parquet_testing.staged_data.alltypes_plain LIMIT 5";
    println!("Executing: {query_sql}\n");
    let results = execute_sql(&ctx, query_sql).await?;
    println!("{results}");
    assert_snapshot!(results, @r"
+----+----------+-------------+
| id | bool_col | tinyint_col |
+----+----------+-------------+
| 4  | true     | 0           |
| 5  | false    | 1           |
| 6  | true     | 0           |
| 7  | false    | 1           |
| 2  | true     | 0           |
+----+----------+-------------+
")
.to_owned_hack_removed;
    Ok(())
}
/// Run `sql` against `ctx` and render the collected record batches as a
/// pretty-printed table string.
async fn execute_sql(ctx: &SessionContext, sql: &str) -> Result<String> {
    let dataframe = ctx.sql(sql).await?;
    let batches = dataframe.collect().await?;
    let table = arrow::util::pretty::pretty_format_batches(&batches)?;
    Ok(table.to_string())
}
/// Custom handler for the `CREATE EXTERNAL CATALOG` statement.
///
/// Registers a local object store, discovers data files under the statement's
/// `LOCATION`, creates a listing table for each file matching the `STORED AS`
/// format, and registers the populated schema and catalog on `ctx`.
async fn handle_create_external_catalog(
    ctx: &SessionContext,
    stmt: CreateExternalCatalog,
) -> Result<()> {
    let factory = ListingTableFactory::new();
    let catalog = Arc::new(MemoryCatalogProvider::new());
    let schema = Arc::new(MemorySchemaProvider::new());
    // Extract options: `schema_name` is consumed here; every other key/value
    // pair is forwarded to the table factory unchanged.
    let mut schema_name = "public".to_string();
    let mut table_options = HashMap::new();
    for (k, v) in stmt.options {
        // Unwrap quoted strings, numbers, and booleans to their bare string
        // form; any other Value variant falls back to its Display output.
        let val_str = match v {
            Value::SingleQuotedString(ref s) | Value::DoubleQuotedString(ref s) => {
                s.to_string()
            }
            Value::Number(ref n, _) => n.to_string(),
            Value::Boolean(b) => b.to_string(),
            _ => v.to_string(),
        };
        if k == "schema_name" {
            schema_name = val_str;
        } else {
            table_options.insert(k, val_str);
        }
    }
    println!(" Target Catalog: {}", stmt.name);
    println!(" Data Location: {}", stmt.location);
    println!(" Resolved Schema: {schema_name}");
    // Register a local object store rooted at the workspace root.
    // We use a specific authority 'workspace' to ensure consistent resolution.
    let store = Arc::new(LocalFileSystem::new_with_prefix(workspace_root())?);
    let store_url = url::Url::parse("local://workspace").unwrap();
    ctx.register_object_store(&store_url, Arc::clone(&store) as _);
    // Only files whose extension matches the STORED AS format (lowercased)
    // are registered, e.g. ".parquet".
    let target_ext = format!(".{}", stmt.catalog_type.to_lowercase());
    // For 'local://workspace/parquet-testing/data', the path is 'parquet-testing/data'.
    let path_str = stmt
        .location
        .strip_prefix("local://workspace/")
        .unwrap_or(&stmt.location);
    let prefix = object_store::path::Path::from(path_str);
    // Discover data files using the ObjectStore API
    let mut table_count = 0;
    let mut list_stream = store.list(Some(&prefix));
    while let Some(meta) = list_stream.next().await {
        let meta = meta?;
        let path = &meta.location;
        if path.as_ref().ends_with(&target_ext) {
            // Table name = file name without its extension.
            // NOTE(review): `file_stem` is `None` only for pathological
            // paths; listed object paths always end in a file segment.
            let name = std::path::Path::new(path.as_ref())
                .file_stem()
                .unwrap()
                .to_string_lossy()
                .to_string();
            let table_url = format!("local://workspace/{path}");
            // Build a CREATE EXTERNAL TABLE command with an empty schema —
            // presumably the factory infers the schema from the file; see
            // ListingTableFactory for the exact behavior.
            let cmd = CreateExternalTable::builder(
                TableReference::bare(name.clone()),
                table_url,
                stmt.catalog_type.clone(),
                Arc::new(DFSchema::empty()),
            )
            .with_options(table_options.clone())
            .build();
            // Best-effort: a file the factory cannot handle is reported on
            // stderr but does not abort catalog creation.
            match factory.create(&ctx.state(), &cmd).await {
                Ok(table) => {
                    schema.register_table(name, table)?;
                    table_count += 1;
                }
                Err(e) => {
                    eprintln!("Failed to create table {name}: {e}");
                }
            }
        }
    }
    println!(" Registered {table_count} tables into schema: {schema_name}");
    catalog.register_schema(&schema_name, schema)?;
    ctx.register_catalog(stmt.name.to_string(), catalog);
    Ok(())
}
/// Possible statements returned by our custom parser.
#[derive(Debug, Clone)]
pub enum CustomStatement {
    /// Standard DataFusion statement, passed through unchanged (boxed).
    DFStatement(Box<Statement>),
    /// Custom `CREATE EXTERNAL CATALOG` statement recognized by
    /// [`CustomParser`].
    CreateExternalCatalog(CreateExternalCatalog),
}
/// Data structure for `CREATE EXTERNAL CATALOG`.
#[derive(Debug, Clone)]
pub struct CreateExternalCatalog {
    /// Catalog name (e.g. `my_catalog`).
    pub name: ObjectName,
    /// File format given after `STORED AS` (e.g. `parquet`).
    pub catalog_type: String,
    /// URL given after `LOCATION` (e.g. `local://workspace/...`).
    pub location: String,
    /// Key/value pairs from the `OPTIONS (...)` clause, in source order.
    pub options: Vec<(String, Value)>,
}
impl Display for CustomStatement {
    /// Delegate formatting to whichever statement variant is held.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DFStatement(stmt) => stmt.fmt(f),
            Self::CreateExternalCatalog(stmt) => stmt.fmt(f),
        }
    }
}
impl Display for CreateExternalCatalog {
    /// Render the statement back to its SQL form, e.g.
    /// `CREATE EXTERNAL CATALOG c STORED AS parquet LOCATION 'l' OPTIONS ('k' = 'v')`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "CREATE EXTERNAL CATALOG {} STORED AS {} LOCATION '{}'",
            self.name, self.catalog_type, self.location
        )?;
        if !self.options.is_empty() {
            write!(f, " OPTIONS (")?;
            for (i, (k, v)) in self.options.iter().enumerate() {
                if i > 0 {
                    write!(f, ", ")?;
                }
                // `Value`'s `Display` impl already emits the surrounding
                // quotes for quoted variants (e.g. `'us-west-2'`), so the
                // value must NOT be wrapped in an extra pair of quotes —
                // doing so produced `''us-west-2''`, which does not
                // round-trip through the parser.
                write!(f, "'{k}' = {v}")?;
            }
            write!(f, ")")?;
        }
        Ok(())
    }
}
/// A parser that extends `DFParser` with custom syntax.
///
/// Wraps a [`DFParser`] and peeks its token stream to intercept the
/// `CREATE EXTERNAL CATALOG` prefix, delegating everything else.
struct CustomParser<'a> {
    df_parser: DFParser<'a>,
}
impl<'a> CustomParser<'a> {
    /// Create a parser over `sql`, delegating tokenization to
    /// [`DFParserBuilder`].
    fn new(sql: &'a str) -> Result<Self> {
        Ok(Self {
            df_parser: DFParserBuilder::new(sql).build()?,
        })
    }

    /// Parse a single statement: either the custom `CREATE EXTERNAL CATALOG`
    /// or any standard DataFusion statement (delegated to [`DFParser`]).
    pub fn parse_statement(&mut self) -> Result<CustomStatement> {
        if self.is_create_external_catalog() {
            return self.parse_create_external_catalog();
        }
        Ok(CustomStatement::DFStatement(Box::new(
            self.df_parser.parse_statement()?,
        )))
    }

    /// Peek (without consuming) the next three tokens and check whether they
    /// spell `CREATE EXTERNAL CATALOG`. `CATALOG` has no dedicated keyword
    /// match here, so it is compared by its uppercased word value.
    fn is_create_external_catalog(&self) -> bool {
        let t1 = &self.df_parser.parser.peek_nth_token(0).token;
        let t2 = &self.df_parser.parser.peek_nth_token(1).token;
        let t3 = &self.df_parser.parser.peek_nth_token(2).token;
        matches!(t1, Token::Word(w) if w.keyword == Keyword::CREATE)
            && matches!(t2, Token::Word(w) if w.keyword == Keyword::EXTERNAL)
            && matches!(t3, Token::Word(w) if w.value.to_uppercase() == "CATALOG")
    }

    /// Parse `CREATE EXTERNAL CATALOG <name> STORED AS <fmt> LOCATION <loc>
    /// [OPTIONS (...)]`. The three clauses may appear in any order; STORED AS
    /// and LOCATION are required, OPTIONS is optional.
    fn parse_create_external_catalog(&mut self) -> Result<CustomStatement> {
        // Consume prefix tokens: CREATE EXTERNAL CATALOG
        for _ in 0..3 {
            self.df_parser.parser.next_token();
        }
        let name = self
            .df_parser
            .parser
            .parse_object_name(false)
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        let mut catalog_type = None;
        let mut location = None;
        let mut options = vec![];
        // Keep consuming clauses until none of the three keywords follows.
        while let Some(keyword) = self.df_parser.parser.parse_one_of_keywords(&[
            Keyword::STORED,
            Keyword::LOCATION,
            Keyword::OPTIONS,
        ]) {
            match keyword {
                Keyword::STORED => {
                    if catalog_type.is_some() {
                        return plan_err!("Duplicate STORED AS");
                    }
                    self.df_parser
                        .parser
                        .expect_keyword(Keyword::AS)
                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
                    catalog_type = Some(
                        self.df_parser
                            .parser
                            .parse_identifier()
                            .map_err(|e| DataFusionError::External(Box::new(e)))?
                            .value,
                    );
                }
                Keyword::LOCATION => {
                    if location.is_some() {
                        return plan_err!("Duplicate LOCATION");
                    }
                    location = Some(
                        self.df_parser
                            .parser
                            .parse_literal_string()
                            .map_err(|e| DataFusionError::External(Box::new(e)))?,
                    );
                }
                Keyword::OPTIONS => {
                    if !options.is_empty() {
                        return plan_err!("Duplicate OPTIONS");
                    }
                    options = self.parse_value_options()?;
                }
                // parse_one_of_keywords only returns keywords from the slice.
                _ => unreachable!(),
            }
        }
        Ok(CustomStatement::CreateExternalCatalog(
            CreateExternalCatalog {
                name,
                catalog_type: catalog_type
                    .ok_or_else(|| plan_datafusion_err!("Missing STORED AS"))?,
                location: location
                    .ok_or_else(|| plan_datafusion_err!("Missing LOCATION"))?,
                options,
            },
        ))
    }

    /// Parse options in the form: (key [=] value, key [=] value, ...)
    ///
    /// An empty list `OPTIONS ()` is accepted and yields no options.
    fn parse_value_options(&mut self) -> Result<Vec<(String, Value)>> {
        let mut options = vec![];
        self.df_parser
            .parser
            .expect_token(&Token::LParen)
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        // Handle the empty list up front: the loop below unconditionally
        // parses a key first and would otherwise error on `()`.
        if self.df_parser.parser.consume_token(&Token::RParen) {
            return Ok(options);
        }
        loop {
            let key = self.df_parser.parse_option_key()?;
            // Support optional '=' between key and value
            let _ = self.df_parser.parser.consume_token(&Token::Eq);
            let value = self.df_parser.parse_option_value()?;
            options.push((key, value));
            // A trailing comma before ')' is tolerated.
            let comma = self.df_parser.parser.consume_token(&Token::Comma);
            if self.df_parser.parser.consume_token(&Token::RParen) {
                break;
            } else if !comma {
                return plan_err!("Expected ',' or ')' in OPTIONS");
            }
        }
        Ok(options)
    }
}
/// Returns the workspace root directory (parent of datafusion-examples).
fn workspace_root() -> std::path::PathBuf {
    let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    match manifest_dir.parent() {
        Some(root) => root.to_path_buf(),
        None => panic!("CARGO_MANIFEST_DIR should have a parent"),
    }
}