Add `LogicalPlan::CreateIndex` (#11817)
* Add create index plan
* Fix clippy lints
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index e6bb148..c883b7a 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -544,30 +544,35 @@
// stack overflows.
match ddl {
DdlStatement::CreateExternalTable(cmd) => {
- Box::pin(async move { self.create_external_table(&cmd).await })
- as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>
+ (Box::pin(async move { self.create_external_table(&cmd).await })
+ as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>)
+ .await
}
DdlStatement::CreateMemoryTable(cmd) => {
- Box::pin(self.create_memory_table(cmd))
+ Box::pin(self.create_memory_table(cmd)).await
}
- DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)),
+ DdlStatement::CreateView(cmd) => {
+ Box::pin(self.create_view(cmd)).await
+ }
DdlStatement::CreateCatalogSchema(cmd) => {
- Box::pin(self.create_catalog_schema(cmd))
+ Box::pin(self.create_catalog_schema(cmd)).await
}
DdlStatement::CreateCatalog(cmd) => {
- Box::pin(self.create_catalog(cmd))
+ Box::pin(self.create_catalog(cmd)).await
}
- DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)),
- DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)),
+ DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)).await,
+ DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)).await,
DdlStatement::DropCatalogSchema(cmd) => {
- Box::pin(self.drop_schema(cmd))
+ Box::pin(self.drop_schema(cmd)).await
}
DdlStatement::CreateFunction(cmd) => {
- Box::pin(self.create_function(cmd))
+ Box::pin(self.create_function(cmd)).await
}
- DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)),
+ DdlStatement::DropFunction(cmd) => {
+ Box::pin(self.drop_function(cmd)).await
+ }
+ ddl => Ok(DataFrame::new(self.state(), LogicalPlan::Ddl(ddl))),
}
- .await
}
// TODO what about the other statements (like TransactionStart and TransactionEnd)
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index 255bf46..ad0fcd2 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -41,6 +41,8 @@
CreateCatalogSchema(CreateCatalogSchema),
/// Creates a new catalog (aka "Database").
CreateCatalog(CreateCatalog),
+ /// Creates a new index.
+ CreateIndex(CreateIndex),
/// Drops a table.
DropTable(DropTable),
/// Drops a view.
@@ -66,6 +68,7 @@
schema
}
DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
+ DdlStatement::CreateIndex(CreateIndex { schema, .. }) => schema,
DdlStatement::DropTable(DropTable { schema, .. }) => schema,
DdlStatement::DropView(DropView { schema, .. }) => schema,
DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
@@ -83,6 +86,7 @@
DdlStatement::CreateView(_) => "CreateView",
DdlStatement::CreateCatalogSchema(_) => "CreateCatalogSchema",
DdlStatement::CreateCatalog(_) => "CreateCatalog",
+ DdlStatement::CreateIndex(_) => "CreateIndex",
DdlStatement::DropTable(_) => "DropTable",
DdlStatement::DropView(_) => "DropView",
DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
@@ -101,6 +105,7 @@
vec![input]
}
DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
+ DdlStatement::CreateIndex(_) => vec![],
DdlStatement::DropTable(_) => vec![],
DdlStatement::DropView(_) => vec![],
DdlStatement::DropCatalogSchema(_) => vec![],
@@ -147,6 +152,9 @@
}) => {
write!(f, "CreateCatalog: {catalog_name:?}")
}
+ DdlStatement::CreateIndex(CreateIndex { name, .. }) => {
+ write!(f, "CreateIndex: {name:?}")
+ }
DdlStatement::DropTable(DropTable {
name, if_exists, ..
}) => {
@@ -351,3 +359,14 @@
pub if_exists: bool,
pub schema: DFSchemaRef,
}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateIndex {
+ pub name: Option<String>,
+ pub table: TableReference,
+ pub using: Option<String>,
+ pub columns: Vec<Expr>,
+ pub unique: bool,
+ pub if_not_exists: bool,
+ pub schema: DFSchemaRef,
+}
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 8928f70..b582085 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -30,8 +30,8 @@
};
pub use ddl::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
- CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DropCatalogSchema,
- DropFunction, DropTable, DropView, OperateFunctionArg,
+ CreateFunctionBody, CreateIndex, CreateMemoryTable, CreateView, DdlStatement,
+ DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg,
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index a47906f..dbe4312 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -303,6 +303,7 @@
DdlStatement::CreateExternalTable(_)
| DdlStatement::CreateCatalogSchema(_)
| DdlStatement::CreateCatalog(_)
+ | DdlStatement::CreateIndex(_)
| DdlStatement::DropTable(_)
| DdlStatement::DropView(_)
| DdlStatement::DropCatalogSchema(_)
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 0a91bab..bc01972 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1645,6 +1645,9 @@
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for CreateMemoryTable",
)),
+ LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
+ "LogicalPlan serde is not yet implemented for CreateIndex",
+ )),
LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DropTable",
)),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 3737e1a..6d47232 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -45,20 +45,20 @@
use datafusion_expr::{
cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
- CreateMemoryTable, CreateView, DescribeTable, DmlStatement, DropCatalogSchema,
- DropFunction, DropTable, DropView, EmptyRelation, Explain, Expr, ExprSchemable,
- Filter, LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
- SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
- TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
- Volatility, WriteOp,
+ CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable,
+ DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation,
+ Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
+ OperateFunctionArg, PlanType, Prepare, SetVariable, Statement as PlanStatement,
+ ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd,
+ TransactionIsolationLevel, TransactionStart, Volatility, WriteOp,
};
use sqlparser::ast;
use sqlparser::ast::{
- Assignment, AssignmentTarget, ColumnDef, CreateTable, CreateTableOptions, Delete,
- DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert, ObjectName, ObjectType,
- OneOrManyWithParens, Query, SchemaName, SetExpr, ShowCreateObject,
- ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
- TransactionMode, UnaryOperator, Value,
+ Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
+ CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
+ ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
+ ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
+ TableWithJoins, TransactionMode, UnaryOperator, Value,
};
use sqlparser::parser::ParserError::ParserError;
@@ -769,6 +769,42 @@
exec_err!("Function name not provided")
}
}
+ Statement::CreateIndex(CreateIndex {
+ name,
+ table_name,
+ using,
+ columns,
+ unique,
+ if_not_exists,
+ ..
+ }) => {
+ let name: Option<String> = name.as_ref().map(object_name_to_string);
+ let table = self.object_name_to_table_reference(table_name)?;
+ let table_schema = self
+ .context_provider
+ .get_table_source(table.clone())?
+ .schema()
+ .to_dfschema_ref()?;
+ let using: Option<String> = using.as_ref().map(ident_to_string);
+ let columns = self.order_by_to_sort_expr(
+ columns,
+ &table_schema,
+ planner_context,
+ false,
+ None,
+ )?;
+ Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
+ PlanCreateIndex {
+ name,
+ table,
+ using,
+ columns,
+ unique,
+ if_not_exists,
+ schema: DFSchemaRef::new(DFSchema::empty()),
+ },
+ )))
+ }
_ => {
not_impl_err!("Unsupported SQL statement: {sql:?}")
}
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index 8a5510e..4d7e608 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -28,11 +28,12 @@
assert_contains, DataFusionError, ParamValues, Result, ScalarValue,
};
use datafusion_expr::{
+ col,
dml::CopyTo,
logical_plan::{LogicalPlan, Prepare},
test::function_stub::sum_udaf,
- ColumnarValue, CreateExternalTable, DdlStatement, ScalarUDF, ScalarUDFImpl,
- Signature, Volatility,
+ ColumnarValue, CreateExternalTable, CreateIndex, DdlStatement, ScalarUDF,
+ ScalarUDFImpl, Signature, Volatility,
};
use datafusion_functions::{string, unicode};
use datafusion_sql::{
@@ -4426,6 +4427,35 @@
)
}
+#[test]
+fn plan_create_index() {
+ let sql =
+ "CREATE UNIQUE INDEX IF NOT EXISTS idx_name ON test USING btree (name, age DESC)";
+ let plan = logical_plan_with_options(sql, ParserOptions::default()).unwrap();
+ match plan {
+ LogicalPlan::Ddl(DdlStatement::CreateIndex(CreateIndex {
+ name,
+ table,
+ using,
+ columns,
+ unique,
+ if_not_exists,
+ ..
+ })) => {
+ assert_eq!(name, Some("idx_name".to_string()));
+ assert_eq!(format!("{table}"), "test");
+ assert_eq!(using, Some("btree".to_string()));
+ assert_eq!(
+ columns,
+ vec![col("name").sort(true, false), col("age").sort(false, true),]
+ );
+ assert!(unique);
+ assert!(if_not_exists);
+ }
+ _ => panic!("wrong plan type"),
+ }
+}
+
fn assert_field_not_found(err: DataFusionError, name: &str) {
match err {
DataFusionError::SchemaError { .. } => {