[SPARK-30192][SQL] support column position in DS v2
### What changes were proposed in this pull request?
update DS v2 API to support add/alter column with column position
### Why are the changes needed?
We have a parser rule for column position, but we fail the query if it's specified, because the built-in catalog can't support add/alter column with column position.
Since we have the catalog plugin API now, we should let the catalog implementation decide if it supports column position or not.
### Does this PR introduce any user-facing change?
not yet
### How was this patch tested?
new tests
Closes #26817 from cloud-fan/parser.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index be482a6..1f9d18c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -855,7 +855,7 @@
;
colPosition
- : FIRST | AFTER multipartIdentifier
+ : position=FIRST | position=AFTER afterCol=errorCapturingIdentifier
;
dataType
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
index 56d13ef..a56007b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
@@ -17,14 +17,15 @@
package org.apache.spark.sql.connector.catalog;
-import com.google.common.base.Preconditions;
-import org.apache.spark.annotation.Experimental;
-
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import com.google.common.base.Preconditions;
+
+import org.apache.spark.annotation.Experimental;
+
/**
* An {@link Identifier} implementation.
*/
@@ -51,19 +52,11 @@
return name;
}
- private String escapeQuote(String part) {
- if (part.contains("`")) {
- return part.replace("`", "``");
- } else {
- return part;
- }
- }
-
@Override
public String toString() {
return Stream.concat(Stream.of(namespace), Stream.of(name))
- .map(part -> '`' + escapeQuote(part) + '`')
- .collect(Collectors.joining("."));
+ .map(CatalogV2Implicits::quote)
+ .collect(Collectors.joining("."));
}
@Override
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index 20c22388..7834399 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -17,11 +17,12 @@
package org.apache.spark.sql.connector.catalog;
-import org.apache.spark.annotation.Experimental;
-import org.apache.spark.sql.types.DataType;
-
import java.util.Arrays;
import java.util.Objects;
+import javax.annotation.Nullable;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.types.DataType;
/**
* TableChange subclasses represent requested changes to a table. These are passed to
@@ -76,7 +77,7 @@
* @return a TableChange for the addition
*/
static TableChange addColumn(String[] fieldNames, DataType dataType) {
- return new AddColumn(fieldNames, dataType, true, null);
+ return new AddColumn(fieldNames, dataType, true, null, null);
}
/**
@@ -92,7 +93,7 @@
* @return a TableChange for the addition
*/
static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
- return new AddColumn(fieldNames, dataType, isNullable, null);
+ return new AddColumn(fieldNames, dataType, isNullable, null, null);
}
/**
@@ -113,7 +114,30 @@
DataType dataType,
boolean isNullable,
String comment) {
- return new AddColumn(fieldNames, dataType, isNullable, comment);
+ return new AddColumn(fieldNames, dataType, isNullable, comment, null);
+ }
+
+ /**
+ * Create a TableChange for adding a column.
+ * <p>
+ * If the field already exists, the change will result in an {@link IllegalArgumentException}.
+ * If the new field is nested and its parent does not exist or is not a struct, the change will
+ * result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames field names of the new column
+ * @param dataType the new column's data type
+ * @param isNullable whether the new column can contain null
+ * @param comment the new field's comment string
+ * @param position the new column's position
+ * @return a TableChange for the addition
+ */
+ static TableChange addColumn(
+ String[] fieldNames,
+ DataType dataType,
+ boolean isNullable,
+ String comment,
+ ColumnPosition position) {
+ return new AddColumn(fieldNames, dataType, isNullable, comment, position);
}
/**
@@ -181,6 +205,21 @@
}
/**
+ * Create a TableChange for updating the position of a field.
+ * <p>
+ * The name is used to find the field to update.
+ * <p>
+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames field names of the column to update
+ * @param newPosition the new position
+ * @return a TableChange for the update
+ */
+ static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newPosition) {
+ return new UpdateColumnPosition(fieldNames, newPosition);
+ }
+
+ /**
* Create a TableChange for deleting a field.
* <p>
* If the field does not exist, the change will result in an {@link IllegalArgumentException}.
@@ -259,6 +298,69 @@
}
}
+ interface ColumnPosition {
+
+ static ColumnPosition first() {
+ return First.SINGLETON;
+ }
+
+ static ColumnPosition after(String column) {
+ return new After(column);
+ }
+ }
+
+ /**
+ * Column position FIRST means the specified column should be the first column.
+ * Note that, the specified column may be a nested field, and then FIRST means this field should
+ * be the first one within the struct.
+ */
+ final class First implements ColumnPosition {
+ private static final First SINGLETON = new First();
+
+ private First() {}
+
+ @Override
+ public String toString() {
+ return "FIRST";
+ }
+ }
+
+ /**
+ * Column position AFTER means the specified column should be put after the given `column`.
+ * Note that, the specified column may be a nested field, and then the given `column` refers to
+ * a field in the same struct.
+ */
+ final class After implements ColumnPosition {
+ private final String column;
+
+ private After(String column) {
+ assert column != null;
+ this.column = column;
+ }
+
+ public String column() {
+ return column;
+ }
+
+ @Override
+ public String toString() {
+ return "AFTER " + column;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ After after = (After) o;
+ return column.equals(after.column);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(column);
+ }
+ }
+
interface ColumnChange extends TableChange {
String[] fieldNames();
}
@@ -275,12 +377,19 @@
private final DataType dataType;
private final boolean isNullable;
private final String comment;
+ private final ColumnPosition position;
- private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, String comment) {
+ private AddColumn(
+ String[] fieldNames,
+ DataType dataType,
+ boolean isNullable,
+ String comment,
+ ColumnPosition position) {
this.fieldNames = fieldNames;
this.dataType = dataType;
this.isNullable = isNullable;
this.comment = comment;
+ this.position = position;
}
@Override
@@ -296,10 +405,16 @@
return isNullable;
}
+ @Nullable
public String comment() {
return comment;
}
+ @Nullable
+ public ColumnPosition position() {
+ return position;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -308,12 +423,13 @@
return isNullable == addColumn.isNullable &&
Arrays.equals(fieldNames, addColumn.fieldNames) &&
dataType.equals(addColumn.dataType) &&
- comment.equals(addColumn.comment);
+ Objects.equals(comment, addColumn.comment) &&
+ Objects.equals(position, addColumn.position);
}
@Override
public int hashCode() {
- int result = Objects.hash(dataType, isNullable, comment);
+ int result = Objects.hash(dataType, isNullable, comment, position);
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
@@ -454,6 +570,48 @@
}
/**
+ * A TableChange to update the position of a field.
+ * <p>
+ * The field names are used to find the field to update.
+ * <p>
+ * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
+ */
+ final class UpdateColumnPosition implements ColumnChange {
+ private final String[] fieldNames;
+ private final ColumnPosition position;
+
+ private UpdateColumnPosition(String[] fieldNames, ColumnPosition position) {
+ this.fieldNames = fieldNames;
+ this.position = position;
+ }
+
+ @Override
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ public ColumnPosition position() {
+ return position;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ UpdateColumnPosition that = (UpdateColumnPosition) o;
+ return Arrays.equals(fieldNames, that.fieldNames) &&
+ position.equals(that.position);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(position);
+ result = 31 * result + Arrays.hashCode(fieldNames);
+ return result;
+ }
+ }
+
+ /**
* A TableChange to delete a field.
* <p>
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 8183aa3..3361173 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -35,19 +35,32 @@
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
val changes = cols.map { col =>
- TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
+ TableChange.addColumn(
+ col.name.toArray,
+ col.dataType,
+ true,
+ col.comment.orNull,
+ col.position.orNull)
}
createAlterTable(nameParts, catalog, tbl, changes)
case AlterTableAlterColumnStatement(
- nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment) =>
+ nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) =>
+ val colNameArray = colName.toArray
val typeChange = dataType.map { newDataType =>
- TableChange.updateColumnType(colName.toArray, newDataType, true)
+ TableChange.updateColumnType(colNameArray, newDataType, true)
}
val commentChange = comment.map { newComment =>
- TableChange.updateColumnComment(colName.toArray, newComment)
+ TableChange.updateColumnComment(colNameArray, newComment)
}
- createAlterTable(nameParts, catalog, tbl, typeChange.toSeq ++ commentChange)
+ val positionChange = pos.map { newPosition =>
+ TableChange.updateColumnPosition(colNameArray, newPosition)
+ }
+ createAlterTable(
+ nameParts,
+ catalog,
+ tbl,
+ typeChange.toSeq ++ commentChange ++ positionChange)
case AlterTableRenameColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 9f8c1d2..8f38273 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -40,6 +40,7 @@
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -2803,19 +2804,23 @@
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}
+ override def visitColPosition(ctx: ColPositionContext): ColumnPosition = {
+ ctx.position.getType match {
+ case SqlBaseParser.FIRST => ColumnPosition.first()
+ case SqlBaseParser.AFTER => ColumnPosition.after(ctx.afterCol.getText)
+ }
+ }
+
/**
* Parse new column info from ADD COLUMN into a QualifiedColType.
*/
override def visitQualifiedColTypeWithPosition(
ctx: QualifiedColTypeWithPositionContext): QualifiedColType = withOrigin(ctx) {
- if (ctx.colPosition != null) {
- operationNotAllowed("ALTER TABLE table ADD COLUMN ... FIRST | AFTER otherCol", ctx)
- }
-
QualifiedColType(
typedVisit[Seq[String]](ctx.name),
typedVisit[DataType](ctx.dataType),
- Option(ctx.comment).map(string))
+ Option(ctx.comment).map(string),
+ Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
}
/**
@@ -2863,19 +2868,17 @@
override def visitAlterTableColumn(
ctx: AlterTableColumnContext): LogicalPlan = withOrigin(ctx) {
val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER"
- if (ctx.colPosition != null) {
- operationNotAllowed(s"ALTER TABLE table $verb COLUMN ... FIRST | AFTER otherCol", ctx)
- }
-
- if (ctx.dataType == null && ctx.comment == null) {
- operationNotAllowed(s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT", ctx)
+ if (ctx.dataType == null && ctx.comment == null && ctx.colPosition == null) {
+ operationNotAllowed(
+ s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER", ctx)
}
AlterTableAlterColumnStatement(
visitMultipartIdentifier(ctx.table),
typedVisit[Seq[String]](ctx.column),
Option(ctx.dataType).map(typedVisit[DataType]),
- Option(ctx.comment).map(string))
+ Option(ctx.comment).map(string),
+ Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 145a15c..e205dd4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -21,6 +21,7 @@
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{DataType, StructType}
@@ -141,7 +142,11 @@
/**
* Column data as parsed by ALTER TABLE ... ADD COLUMNS.
*/
-case class QualifiedColType(name: Seq[String], dataType: DataType, comment: Option[String])
+case class QualifiedColType(
+ name: Seq[String],
+ dataType: DataType,
+ comment: Option[String],
+ position: Option[ColumnPosition])
/**
* ALTER TABLE ... ADD COLUMNS command, as parsed from SQL.
@@ -157,7 +162,8 @@
tableName: Seq[String],
column: Seq[String],
dataType: Option[DataType],
- comment: Option[String]) extends ParsedStatement
+ comment: Option[String],
+ position: Option[ColumnPosition]) extends ParsedStatement
/**
* ALTER TABLE ... RENAME COLUMN command, as parsed from SQL.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 882e968..86e5894 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -118,7 +118,7 @@
def quoted: String = parts.map(quote).mkString(".")
}
- private def quote(part: String): String = {
+ def quote(part: String): String = {
if (part.contains(".") || part.contains("`")) {
s"`${part.replace("`", "``")}`"
} else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 0dcd595..2f4914d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -104,26 +104,16 @@
case add: AddColumn =>
add.fieldNames match {
case Array(name) =>
- val newField = StructField(name, add.dataType, nullable = add.isNullable)
- Option(add.comment) match {
- case Some(comment) =>
- schema.add(newField.withComment(comment))
- case _ =>
- schema.add(newField)
- }
+ val field = StructField(name, add.dataType, nullable = add.isNullable)
+ val newField = Option(add.comment).map(field.withComment).getOrElse(field)
+ addField(schema, newField, add.position())
case names =>
replace(schema, names.init, parent => parent.dataType match {
case parentType: StructType =>
val field = StructField(names.last, add.dataType, nullable = add.isNullable)
- val newParentType = Option(add.comment) match {
- case Some(comment) =>
- parentType.add(field.withComment(comment))
- case None =>
- parentType.add(field)
- }
-
- Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
+ val newField = Option(add.comment).map(field.withComment).getOrElse(field)
+ Some(parent.copy(dataType = addField(parentType, newField, add.position())))
case _ =>
throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
@@ -147,6 +137,27 @@
replace(schema, update.fieldNames, field =>
Some(field.withComment(update.newComment)))
+ case update: UpdateColumnPosition =>
+ def updateFieldPos(struct: StructType, name: String): StructType = {
+ val oldField = struct.fields.find(_.name == name).getOrElse {
+ throw new IllegalArgumentException("Field not found: " + name)
+ }
+ val withFieldRemoved = StructType(struct.fields.filter(_ != oldField))
+ addField(withFieldRemoved, oldField, update.position())
+ }
+
+ update.fieldNames() match {
+ case Array(name) =>
+ updateFieldPos(schema, name)
+ case names =>
+ replace(schema, names.init, parent => parent.dataType match {
+ case parentType: StructType =>
+ Some(parent.copy(dataType = updateFieldPos(parentType, names.last)))
+ case _ =>
+ throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
+ })
+ }
+
case delete: DeleteColumn =>
replace(schema, delete.fieldNames, _ => None)
@@ -157,6 +168,25 @@
}
}
+ private def addField(
+ schema: StructType,
+ field: StructField,
+ position: ColumnPosition): StructType = {
+ if (position == null) {
+ schema.add(field)
+ } else if (position.isInstanceOf[First]) {
+ StructType(field +: schema.fields)
+ } else {
+ val afterCol = position.asInstanceOf[After].column()
+ val fieldIndex = schema.fields.indexWhere(_.name == afterCol)
+ if (fieldIndex == -1) {
+ throw new IllegalArgumentException("AFTER column not found: " + afterCol)
+ }
+ val (before, after) = schema.fields.splitAt(fieldIndex + 1)
+ StructType(before ++ (field +: after))
+ }
+ }
+
private def replace(
struct: StructType,
fieldNames: Seq[String],
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index e6d503b..ec3a731 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -24,6 +24,7 @@
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
@@ -492,7 +493,7 @@
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int"),
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
- QualifiedColType(Seq("x"), IntegerType, None)
+ QualifiedColType(Seq("x"), IntegerType, None, None)
)))
}
@@ -500,8 +501,8 @@
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"),
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
- QualifiedColType(Seq("x"), IntegerType, None),
- QualifiedColType(Seq("y"), StringType, None)
+ QualifiedColType(Seq("x"), IntegerType, None, None),
+ QualifiedColType(Seq("y"), StringType, None, None)
)))
}
@@ -509,7 +510,7 @@
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS x int"),
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
- QualifiedColType(Seq("x"), IntegerType, None)
+ QualifiedColType(Seq("x"), IntegerType, None, None)
)))
}
@@ -517,7 +518,7 @@
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"),
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
- QualifiedColType(Seq("x"), IntegerType, None)
+ QualifiedColType(Seq("x"), IntegerType, None, None)
)))
}
@@ -525,7 +526,7 @@
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"),
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
- QualifiedColType(Seq("x"), IntegerType, Some("doc"))
+ QualifiedColType(Seq("x"), IntegerType, Some("doc"), None)
)))
}
@@ -533,7 +534,21 @@
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"),
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
- QualifiedColType(Seq("x"), IntegerType, Some("doc"))
+ QualifiedColType(Seq("x"), IntegerType, Some("doc"), None)
+ )))
+ }
+
+ test("alter table: add column with position") {
+ comparePlans(
+ parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"),
+ AlterTableAddColumnsStatement(Seq("table_name"), Seq(
+ QualifiedColType(Seq("x"), IntegerType, None, Some(first()))
+ )))
+
+ comparePlans(
+ parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"),
+ AlterTableAddColumnsStatement(Seq("table_name"), Seq(
+ QualifiedColType(Seq("x"), IntegerType, None, Some(after("y")))
)))
}
@@ -541,25 +556,19 @@
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"),
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
- QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"))
+ QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"), None)
)))
}
test("alter table: add multiple columns with nested column name") {
comparePlans(
- parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string"),
+ parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"),
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
- QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc")),
- QualifiedColType(Seq("a", "b"), StringType, None)
+ QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"), None),
+ QualifiedColType(Seq("a", "b"), StringType, None, Some(first()))
)))
}
- test("alter table: add column at position (not supported)") {
- assertUnsupported("ALTER TABLE table_name ADD COLUMNS name bigint COMMENT 'doc' FIRST, a.b int")
- assertUnsupported("ALTER TABLE table_name ADD COLUMN name bigint COMMENT 'doc' FIRST")
- assertUnsupported("ALTER TABLE table_name ADD COLUMN name string AFTER a.b")
- }
-
test("alter table: set location") {
comparePlans(
parsePlan("ALTER TABLE a.b.c SET LOCATION 'new location'"),
@@ -589,6 +598,7 @@
Seq("table_name"),
Seq("a", "b", "c"),
Some(LongType),
+ None,
None))
}
@@ -599,6 +609,7 @@
Seq("table_name"),
Seq("a", "b", "c"),
Some(LongType),
+ None,
None))
}
@@ -609,22 +620,31 @@
Seq("table_name"),
Seq("a", "b", "c"),
None,
- Some("new comment")))
+ Some("new comment"),
+ None))
}
- test("alter table: update column type and comment") {
+ test("alter table: update column position") {
comparePlans(
- parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint COMMENT 'new comment'"),
+ parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"),
+ AlterTableAlterColumnStatement(
+ Seq("table_name"),
+ Seq("a", "b", "c"),
+ None,
+ None,
+ Some(first())))
+ }
+
+ test("alter table: update column type, comment and position") {
+ comparePlans(
+ parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c " +
+ "TYPE bigint COMMENT 'new comment' AFTER d"),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
Some(LongType),
- Some("new comment")))
- }
-
- test("alter table: change column position (not supported)") {
- assertUnsupported("ALTER TABLE table_name CHANGE COLUMN name COMMENT 'doc' FIRST")
- assertUnsupported("ALTER TABLE table_name CHANGE COLUMN name TYPE INT AFTER other_col")
+ Some("new comment"),
+ Some(after("d"))))
}
test("alter table: drop column") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 1dedf6e7..834e99c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -55,13 +55,18 @@
AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField))
}.getOrElse {
val changes = cols.map { col =>
- TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
+ TableChange.addColumn(
+ col.name.toArray,
+ col.dataType,
+ true,
+ col.comment.orNull,
+ col.position.orNull)
}
createAlterTable(nameParts, catalog, tbl, changes)
}
case AlterTableAlterColumnStatement(
- nameParts @ SessionCatalogAndTable(catalog, tbl), colName, dataType, comment) =>
+ nameParts @ SessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
if (colName.length > 1) {
@@ -72,6 +77,10 @@
throw new AnalysisException(
"ALTER COLUMN with v1 tables must specify new data type.")
}
+ if (pos.isDefined) {
+ throw new AnalysisException("" +
+ "ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.")
+ }
val builder = new MetadataBuilder
// Add comment to metadata
comment.map(c => builder.putString("comment", c))
@@ -87,13 +96,21 @@
builder.build())
AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName(0), newColumn)
}.getOrElse {
+ val colNameArray = colName.toArray
val typeChange = dataType.map { newDataType =>
- TableChange.updateColumnType(colName.toArray, newDataType, true)
+ TableChange.updateColumnType(colNameArray, newDataType, true)
}
val commentChange = comment.map { newComment =>
- TableChange.updateColumnComment(colName.toArray, newComment)
+ TableChange.updateColumnComment(colNameArray, newComment)
}
- createAlterTable(nameParts, catalog, tbl, typeChange.toSeq ++ commentChange)
+ val positionChange = pos.map { newPosition =>
+ TableChange.updateColumnPosition(colNameArray, newPosition)
+ }
+ createAlterTable(
+ nameParts,
+ catalog,
+ tbl,
+ typeChange.toSeq ++ commentChange ++ positionChange)
}
case AlterTableRenameColumnStatement(
diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
index 21a344c..8232634 100644
--- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
@@ -27,7 +27,7 @@
-- !query 2 output
org.apache.spark.sql.catalyst.parser.ParseException
-Operation not allowed: ALTER TABLE table CHANGE COLUMN requires a TYPE or a COMMENT(line 1, pos 0)
+Operation not allowed: ALTER TABLE table CHANGE COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER(line 1, pos 0)
== SQL ==
ALTER TABLE test_change CHANGE a
@@ -87,13 +87,8 @@
-- !query 8 schema
struct<>
-- !query 8 output
-org.apache.spark.sql.catalyst.parser.ParseException
-
-Operation not allowed: ALTER TABLE table CHANGE COLUMN ... FIRST | AFTER otherCol(line 1, pos 0)
-
-== SQL ==
-ALTER TABLE test_change CHANGE a TYPE INT AFTER b
-^^^
+org.apache.spark.sql.AnalysisException
+ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.;
-- !query 9
@@ -101,13 +96,8 @@
-- !query 9 schema
struct<>
-- !query 9 output
-org.apache.spark.sql.catalyst.parser.ParseException
-
-Operation not allowed: ALTER TABLE table CHANGE COLUMN ... FIRST | AFTER otherCol(line 1, pos 0)
-
-== SQL ==
-ALTER TABLE test_change CHANGE b TYPE STRING FIRST
-^^^
+org.apache.spark.sql.AnalysisException
+ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.;
-- !query 10
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 7392850..2ba3c99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -101,6 +101,49 @@
}
}
+ test("AlterTable: add column with position") {
+ val t = s"${catalogAndNamespace}table_name"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (point struct<x: int>) USING $v2Format")
+
+ sql(s"ALTER TABLE $t ADD COLUMN a string FIRST")
+ assert(getTableMetadata(t).schema == new StructType()
+ .add("a", StringType)
+ .add("point", new StructType().add("x", IntegerType)))
+
+ sql(s"ALTER TABLE $t ADD COLUMN b string AFTER point")
+ assert(getTableMetadata(t).schema == new StructType()
+ .add("a", StringType)
+ .add("point", new StructType().add("x", IntegerType))
+ .add("b", StringType))
+
+ val e1 = intercept[SparkException](
+ sql(s"ALTER TABLE $t ADD COLUMN c string AFTER non_exist"))
+ assert(e1.getMessage().contains("AFTER column not found"))
+
+ sql(s"ALTER TABLE $t ADD COLUMN point.y int FIRST")
+ assert(getTableMetadata(t).schema == new StructType()
+ .add("a", StringType)
+ .add("point", new StructType()
+ .add("y", IntegerType)
+ .add("x", IntegerType))
+ .add("b", StringType))
+
+ sql(s"ALTER TABLE $t ADD COLUMN point.z int AFTER x")
+ assert(getTableMetadata(t).schema == new StructType()
+ .add("a", StringType)
+ .add("point", new StructType()
+ .add("y", IntegerType)
+ .add("x", IntegerType)
+ .add("z", IntegerType))
+ .add("b", StringType))
+
+ val e2 = intercept[SparkException](
+ sql(s"ALTER TABLE $t ADD COLUMN point.x2 int AFTER non_exist"))
+ assert(e2.getMessage().contains("AFTER column not found"))
+ }
+ }
+
test("AlterTable: add multiple columns") {
val t = s"${catalogAndNamespace}table_name"
withTable(t) {
@@ -471,6 +514,61 @@
}
}
+ test("AlterTable: update column position") {
+ val t = s"${catalogAndNamespace}table_name"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (a int, b int, point struct<x: int, y: int, z: int>) USING $v2Format")
+
+ sql(s"ALTER TABLE $t ALTER COLUMN b FIRST")
+ assert(getTableMetadata(t).schema == new StructType()
+ .add("b", IntegerType)
+ .add("a", IntegerType)
+ .add("point", new StructType()
+ .add("x", IntegerType)
+ .add("y", IntegerType)
+ .add("z", IntegerType)))
+
+ sql(s"ALTER TABLE $t ALTER COLUMN b AFTER point")
+ assert(getTableMetadata(t).schema == new StructType()
+ .add("a", IntegerType)
+ .add("point", new StructType()
+ .add("x", IntegerType)
+ .add("y", IntegerType)
+ .add("z", IntegerType))
+ .add("b", IntegerType))
+
+ val e1 = intercept[SparkException](
+ sql(s"ALTER TABLE $t ALTER COLUMN b AFTER non_exist"))
+ assert(e1.getMessage.contains("AFTER column not found"))
+
+ sql(s"ALTER TABLE $t ALTER COLUMN point.y FIRST")
+ assert(getTableMetadata(t).schema == new StructType()
+ .add("a", IntegerType)
+ .add("point", new StructType()
+ .add("y", IntegerType)
+ .add("x", IntegerType)
+ .add("z", IntegerType))
+ .add("b", IntegerType))
+
+ sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER z")
+ assert(getTableMetadata(t).schema == new StructType()
+ .add("a", IntegerType)
+ .add("point", new StructType()
+ .add("x", IntegerType)
+ .add("z", IntegerType)
+ .add("y", IntegerType))
+ .add("b", IntegerType))
+
+ val e2 = intercept[SparkException](
+ sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER non_exist"))
+ assert(e2.getMessage.contains("AFTER column not found"))
+
+ // `AlterTable.resolved` checks column existence.
+ intercept[AnalysisException](
+ sql(s"ALTER TABLE $t ALTER COLUMN a.y AFTER x"))
+ }
+ }
+
test("AlterTable: update column type and comment") {
val t = s"${catalogAndNamespace}table_name"
withTable(t) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index cfcd8c7..15381e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1199,7 +1199,7 @@
}
test("tableCreation: duplicate column names in the table definition") {
- val errorMsg = "Found duplicate column(s) in the table definition of `t`"
+ val errorMsg = "Found duplicate column(s) in the table definition of t"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
assertAnalysisError(
@@ -1223,7 +1223,7 @@
}
test("tableCreation: duplicate nested column names in the table definition") {
- val errorMsg = "Found duplicate column(s) in the table definition of `t`"
+ val errorMsg = "Found duplicate column(s) in the table definition of t"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
assertAnalysisError(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 1087367..2bb121b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -181,6 +181,16 @@
assert(e.contains("Hive built-in ORC data source must be used with Hive support enabled"))
}
}
+
+ test("ALTER TABLE ALTER COLUMN with position is not supported") {
+ withTable("t") {
+ sql("CREATE TABLE t(i INT) USING parquet")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t ALTER COLUMN i TYPE INT FIRST")
+ }
+ assert(e.message.contains("ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables"))
+ }
+ }
}
abstract class DDLSuite extends QueryTest with SQLTestUtils {