[SPARK-30192][SQL] support column position in DS v2

### What changes were proposed in this pull request?

update DS v2 API to support add/alter column with column position

### Why are the changes needed?

We have a parser rule for column position, but we fail the query if it's specified, because the builtin catalog can't support add/alter column with column position.

Since we have the catalog plugin API now, we should let the catalog implementation to decide if it supports column position or not.

### Does this PR introduce any user-facing change?

not yet

### How was this patch tested?

new tests

Closes #26817 from cloud-fan/parser.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index be482a6..1f9d18c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -855,7 +855,7 @@
     ;
 
 colPosition
-    : FIRST | AFTER multipartIdentifier
+    : position=FIRST | position=AFTER afterCol=errorCapturingIdentifier
     ;
 
 dataType
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
index 56d13ef..a56007b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
@@ -17,14 +17,15 @@
 
 package org.apache.spark.sql.connector.catalog;
 
-import com.google.common.base.Preconditions;
-import org.apache.spark.annotation.Experimental;
-
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import com.google.common.base.Preconditions;
+
+import org.apache.spark.annotation.Experimental;
+
 /**
  *  An {@link Identifier} implementation.
  */
@@ -51,19 +52,11 @@
     return name;
   }
 
-  private String escapeQuote(String part) {
-    if (part.contains("`")) {
-      return part.replace("`", "``");
-    } else {
-      return part;
-    }
-  }
-
   @Override
   public String toString() {
     return Stream.concat(Stream.of(namespace), Stream.of(name))
-        .map(part -> '`' + escapeQuote(part) + '`')
-        .collect(Collectors.joining("."));
+      .map(CatalogV2Implicits::quote)
+      .collect(Collectors.joining("."));
   }
 
   @Override
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index 20c22388..7834399 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.connector.catalog;
 
-import org.apache.spark.annotation.Experimental;
-import org.apache.spark.sql.types.DataType;
-
 import java.util.Arrays;
 import java.util.Objects;
+import javax.annotation.Nullable;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.types.DataType;
 
 /**
  * TableChange subclasses represent requested changes to a table. These are passed to
@@ -76,7 +77,7 @@
    * @return a TableChange for the addition
    */
   static TableChange addColumn(String[] fieldNames, DataType dataType) {
-    return new AddColumn(fieldNames, dataType, true, null);
+    return new AddColumn(fieldNames, dataType, true, null, null);
   }
 
   /**
@@ -92,7 +93,7 @@
    * @return a TableChange for the addition
    */
   static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
-    return new AddColumn(fieldNames, dataType, isNullable, null);
+    return new AddColumn(fieldNames, dataType, isNullable, null, null);
   }
 
   /**
@@ -113,7 +114,30 @@
       DataType dataType,
       boolean isNullable,
       String comment) {
-    return new AddColumn(fieldNames, dataType, isNullable, comment);
+    return new AddColumn(fieldNames, dataType, isNullable, comment, null);
+  }
+
+  /**
+   * Create a TableChange for adding a column.
+   * <p>
+   * If the field already exists, the change will result in an {@link IllegalArgumentException}.
+   * If the new field is nested and its parent does not exist or is not a struct, the change will
+   * result in an {@link IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the new column
+   * @param dataType the new column's data type
+   * @param isNullable whether the new column can contain null
+   * @param comment the new field's comment string
+   * @param position the new columns's position
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(
+      String[] fieldNames,
+      DataType dataType,
+      boolean isNullable,
+      String comment,
+      ColumnPosition position) {
+    return new AddColumn(fieldNames, dataType, isNullable, comment, position);
   }
 
   /**
@@ -181,6 +205,21 @@
   }
 
   /**
+   * Create a TableChange for updating the position of a field.
+   * <p>
+   * The name is used to find the field to update.
+   * <p>
+   * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the column to update
+   * @param newPosition the new position
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newPosition) {
+    return new UpdateColumnPosition(fieldNames, newPosition);
+  }
+
+  /**
    * Create a TableChange for deleting a field.
    * <p>
    * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
@@ -259,6 +298,69 @@
     }
   }
 
+  interface ColumnPosition {
+
+    static ColumnPosition first() {
+      return First.SINGLETON;
+    }
+
+    static ColumnPosition after(String column) {
+      return new After(column);
+    }
+  }
+
+  /**
+   * Column position FIRST means the specified column should be the first column.
+   * Note that, the specified column may be a nested field, and then FIRST means this field should
+   * be the first one within the struct.
+   */
+  final class First implements ColumnPosition {
+    private static final First SINGLETON = new First();
+
+    private First() {}
+
+    @Override
+    public String toString() {
+      return "FIRST";
+    }
+  }
+
+  /**
+   * Column position AFTER means the specified column should be put after the given `column`.
+   * Note that, the specified column may be a nested field, and then the given `column` refers to
+   * a field in the same struct.
+   */
+  final class After implements ColumnPosition {
+    private final String column;
+
+    private After(String column) {
+      assert column != null;
+      this.column = column;
+    }
+
+    public String column() {
+      return column;
+    }
+
+    @Override
+    public String toString() {
+      return "AFTER " + column;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      After after = (After) o;
+      return column.equals(after.column);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(column);
+    }
+  }
+
   interface ColumnChange extends TableChange {
     String[] fieldNames();
   }
@@ -275,12 +377,19 @@
     private final DataType dataType;
     private final boolean isNullable;
     private final String comment;
+    private final ColumnPosition position;
 
-    private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, String comment) {
+    private AddColumn(
+        String[] fieldNames,
+        DataType dataType,
+        boolean isNullable,
+        String comment,
+        ColumnPosition position) {
       this.fieldNames = fieldNames;
       this.dataType = dataType;
       this.isNullable = isNullable;
       this.comment = comment;
+      this.position = position;
     }
 
     @Override
@@ -296,10 +405,16 @@
       return isNullable;
     }
 
+    @Nullable
     public String comment() {
       return comment;
     }
 
+    @Nullable
+    public ColumnPosition position() {
+      return position;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) return true;
@@ -308,12 +423,13 @@
       return isNullable == addColumn.isNullable &&
         Arrays.equals(fieldNames, addColumn.fieldNames) &&
         dataType.equals(addColumn.dataType) &&
-        comment.equals(addColumn.comment);
+        Objects.equals(comment, addColumn.comment) &&
+        Objects.equals(position, addColumn.position);
     }
 
     @Override
     public int hashCode() {
-      int result = Objects.hash(dataType, isNullable, comment);
+      int result = Objects.hash(dataType, isNullable, comment, position);
       result = 31 * result + Arrays.hashCode(fieldNames);
       return result;
     }
@@ -454,6 +570,48 @@
   }
 
   /**
+   * A TableChange to update the position of a field.
+   * <p>
+   * The field names are used to find the field to update.
+   * <p>
+   * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
+   */
+  final class UpdateColumnPosition implements ColumnChange {
+    private final String[] fieldNames;
+    private final ColumnPosition position;
+
+    private UpdateColumnPosition(String[] fieldNames, ColumnPosition position) {
+      this.fieldNames = fieldNames;
+      this.position = position;
+    }
+
+    @Override
+    public String[] fieldNames() {
+      return fieldNames;
+    }
+
+    public ColumnPosition position() {
+      return position;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      UpdateColumnPosition that = (UpdateColumnPosition) o;
+      return Arrays.equals(fieldNames, that.fieldNames) &&
+        position.equals(that.position);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(position);
+      result = 31 * result + Arrays.hashCode(fieldNames);
+      return result;
+    }
+  }
+
+  /**
    * A TableChange to delete a field.
    * <p>
    * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 8183aa3..3361173 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -35,19 +35,32 @@
     case AlterTableAddColumnsStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
       val changes = cols.map { col =>
-        TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
+        TableChange.addColumn(
+          col.name.toArray,
+          col.dataType,
+          true,
+          col.comment.orNull,
+          col.position.orNull)
       }
       createAlterTable(nameParts, catalog, tbl, changes)
 
     case AlterTableAlterColumnStatement(
-         nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment) =>
+         nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) =>
+      val colNameArray = colName.toArray
       val typeChange = dataType.map { newDataType =>
-        TableChange.updateColumnType(colName.toArray, newDataType, true)
+        TableChange.updateColumnType(colNameArray, newDataType, true)
       }
       val commentChange = comment.map { newComment =>
-        TableChange.updateColumnComment(colName.toArray, newComment)
+        TableChange.updateColumnComment(colNameArray, newComment)
       }
-      createAlterTable(nameParts, catalog, tbl, typeChange.toSeq ++ commentChange)
+      val positionChange = pos.map { newPosition =>
+        TableChange.updateColumnPosition(colNameArray, newPosition)
+      }
+      createAlterTable(
+        nameParts,
+        catalog,
+        tbl,
+        typeChange.toSeq ++ commentChange ++ positionChange)
 
     case AlterTableRenameColumnStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 9f8c1d2..8f38273 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -40,6 +40,7 @@
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -2803,19 +2804,23 @@
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
+  override def visitColPosition(ctx: ColPositionContext): ColumnPosition = {
+    ctx.position.getType match {
+      case SqlBaseParser.FIRST => ColumnPosition.first()
+      case SqlBaseParser.AFTER => ColumnPosition.after(ctx.afterCol.getText)
+    }
+  }
+
   /**
    * Parse new column info from ADD COLUMN into a QualifiedColType.
    */
   override def visitQualifiedColTypeWithPosition(
       ctx: QualifiedColTypeWithPositionContext): QualifiedColType = withOrigin(ctx) {
-    if (ctx.colPosition != null) {
-      operationNotAllowed("ALTER TABLE table ADD COLUMN ... FIRST | AFTER otherCol", ctx)
-    }
-
     QualifiedColType(
       typedVisit[Seq[String]](ctx.name),
       typedVisit[DataType](ctx.dataType),
-      Option(ctx.comment).map(string))
+      Option(ctx.comment).map(string),
+      Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
   }
 
   /**
@@ -2863,19 +2868,17 @@
   override def visitAlterTableColumn(
       ctx: AlterTableColumnContext): LogicalPlan = withOrigin(ctx) {
     val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER"
-    if (ctx.colPosition != null) {
-      operationNotAllowed(s"ALTER TABLE table $verb COLUMN ... FIRST | AFTER otherCol", ctx)
-    }
-
-    if (ctx.dataType == null && ctx.comment == null) {
-      operationNotAllowed(s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT", ctx)
+    if (ctx.dataType == null && ctx.comment == null && ctx.colPosition == null) {
+      operationNotAllowed(
+        s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER", ctx)
     }
 
     AlterTableAlterColumnStatement(
       visitMultipartIdentifier(ctx.table),
       typedVisit[Seq[String]](ctx.column),
       Option(ctx.dataType).map(typedVisit[DataType]),
-      Option(ctx.comment).map(string))
+      Option(ctx.comment).map(string),
+      Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 145a15c..e205dd4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -21,6 +21,7 @@
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -141,7 +142,11 @@
 /**
  * Column data as parsed by ALTER TABLE ... ADD COLUMNS.
  */
-case class QualifiedColType(name: Seq[String], dataType: DataType, comment: Option[String])
+case class QualifiedColType(
+    name: Seq[String],
+    dataType: DataType,
+    comment: Option[String],
+    position: Option[ColumnPosition])
 
 /**
  * ALTER TABLE ... ADD COLUMNS command, as parsed from SQL.
@@ -157,7 +162,8 @@
     tableName: Seq[String],
     column: Seq[String],
     dataType: Option[DataType],
-    comment: Option[String]) extends ParsedStatement
+    comment: Option[String],
+    position: Option[ColumnPosition]) extends ParsedStatement
 
 /**
  * ALTER TABLE ... RENAME COLUMN command, as parsed from SQL.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 882e968..86e5894 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -118,7 +118,7 @@
     def quoted: String = parts.map(quote).mkString(".")
   }
 
-  private def quote(part: String): String = {
+  def quote(part: String): String = {
     if (part.contains(".") || part.contains("`")) {
       s"`${part.replace("`", "``")}`"
     } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 0dcd595..2f4914d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -104,26 +104,16 @@
         case add: AddColumn =>
           add.fieldNames match {
             case Array(name) =>
-              val newField = StructField(name, add.dataType, nullable = add.isNullable)
-              Option(add.comment) match {
-                case Some(comment) =>
-                  schema.add(newField.withComment(comment))
-                case _ =>
-                  schema.add(newField)
-              }
+              val field = StructField(name, add.dataType, nullable = add.isNullable)
+              val newField = Option(add.comment).map(field.withComment).getOrElse(field)
+              addField(schema, newField, add.position())
 
             case names =>
               replace(schema, names.init, parent => parent.dataType match {
                 case parentType: StructType =>
                   val field = StructField(names.last, add.dataType, nullable = add.isNullable)
-                  val newParentType = Option(add.comment) match {
-                    case Some(comment) =>
-                      parentType.add(field.withComment(comment))
-                    case None =>
-                      parentType.add(field)
-                  }
-
-                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
+                  val newField = Option(add.comment).map(field.withComment).getOrElse(field)
+                  Some(parent.copy(dataType = addField(parentType, newField, add.position())))
 
                 case _ =>
                   throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
@@ -147,6 +137,27 @@
           replace(schema, update.fieldNames, field =>
             Some(field.withComment(update.newComment)))
 
+        case update: UpdateColumnPosition =>
+          def updateFieldPos(struct: StructType, name: String): StructType = {
+            val oldField = struct.fields.find(_.name == name).getOrElse {
+              throw new IllegalArgumentException("Field not found: " + name)
+            }
+            val withFieldRemoved = StructType(struct.fields.filter(_ != oldField))
+            addField(withFieldRemoved, oldField, update.position())
+          }
+
+          update.fieldNames() match {
+            case Array(name) =>
+              updateFieldPos(schema, name)
+            case names =>
+              replace(schema, names.init, parent => parent.dataType match {
+                case parentType: StructType =>
+                  Some(parent.copy(dataType = updateFieldPos(parentType, names.last)))
+                case _ =>
+                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
+              })
+          }
+
         case delete: DeleteColumn =>
           replace(schema, delete.fieldNames, _ => None)
 
@@ -157,6 +168,25 @@
     }
   }
 
+  private def addField(
+      schema: StructType,
+      field: StructField,
+      position: ColumnPosition): StructType = {
+    if (position == null) {
+      schema.add(field)
+    } else if (position.isInstanceOf[First]) {
+      StructType(field +: schema.fields)
+    } else {
+      val afterCol = position.asInstanceOf[After].column()
+      val fieldIndex = schema.fields.indexWhere(_.name == afterCol)
+      if (fieldIndex == -1) {
+        throw new IllegalArgumentException("AFTER column not found: " + afterCol)
+      }
+      val (before, after) = schema.fields.splitAt(fieldIndex + 1)
+      StructType(before ++ (field +: after))
+    }
+  }
+
   private def replace(
       struct: StructType,
       fieldNames: Seq[String],
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index e6d503b..ec3a731 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -24,6 +24,7 @@
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
@@ -492,7 +493,7 @@
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None)
+        QualifiedColType(Seq("x"), IntegerType, None, None)
       )))
   }
 
@@ -500,8 +501,8 @@
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None),
-        QualifiedColType(Seq("y"), StringType, None)
+        QualifiedColType(Seq("x"), IntegerType, None, None),
+        QualifiedColType(Seq("y"), StringType, None, None)
       )))
   }
 
@@ -509,7 +510,7 @@
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS x int"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None)
+        QualifiedColType(Seq("x"), IntegerType, None, None)
       )))
   }
 
@@ -517,7 +518,7 @@
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None)
+        QualifiedColType(Seq("x"), IntegerType, None, None)
       )))
   }
 
@@ -525,7 +526,7 @@
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, Some("doc"))
+        QualifiedColType(Seq("x"), IntegerType, Some("doc"), None)
       )))
   }
 
@@ -533,7 +534,21 @@
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, Some("doc"))
+        QualifiedColType(Seq("x"), IntegerType, Some("doc"), None)
+      )))
+  }
+
+  test("alter table: add column with position") {
+    comparePlans(
+      parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"),
+      AlterTableAddColumnsStatement(Seq("table_name"), Seq(
+        QualifiedColType(Seq("x"), IntegerType, None, Some(first()))
+      )))
+
+    comparePlans(
+      parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"),
+      AlterTableAddColumnsStatement(Seq("table_name"), Seq(
+        QualifiedColType(Seq("x"), IntegerType, None, Some(after("y")))
       )))
   }
 
@@ -541,25 +556,19 @@
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"))
+        QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"), None)
       )))
   }
 
   test("alter table: add multiple columns with nested column name") {
     comparePlans(
-      parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string"),
+      parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc")),
-        QualifiedColType(Seq("a", "b"), StringType, None)
+        QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"), None),
+        QualifiedColType(Seq("a", "b"), StringType, None, Some(first()))
       )))
   }
 
-  test("alter table: add column at position (not supported)") {
-    assertUnsupported("ALTER TABLE table_name ADD COLUMNS name bigint COMMENT 'doc' FIRST, a.b int")
-    assertUnsupported("ALTER TABLE table_name ADD COLUMN name bigint COMMENT 'doc' FIRST")
-    assertUnsupported("ALTER TABLE table_name ADD COLUMN name string AFTER a.b")
-  }
-
   test("alter table: set location") {
     comparePlans(
       parsePlan("ALTER TABLE a.b.c SET LOCATION 'new location'"),
@@ -589,6 +598,7 @@
         Seq("table_name"),
         Seq("a", "b", "c"),
         Some(LongType),
+        None,
         None))
   }
 
@@ -599,6 +609,7 @@
         Seq("table_name"),
         Seq("a", "b", "c"),
         Some(LongType),
+        None,
         None))
   }
 
@@ -609,22 +620,31 @@
         Seq("table_name"),
         Seq("a", "b", "c"),
         None,
-        Some("new comment")))
+        Some("new comment"),
+        None))
   }
 
-  test("alter table: update column type and comment") {
+  test("alter table: update column position") {
     comparePlans(
-      parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint COMMENT 'new comment'"),
+      parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"),
+      AlterTableAlterColumnStatement(
+        Seq("table_name"),
+        Seq("a", "b", "c"),
+        None,
+        None,
+        Some(first())))
+  }
+
+  test("alter table: update column type, comment and position") {
+    comparePlans(
+      parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c " +
+        "TYPE bigint COMMENT 'new comment' AFTER d"),
       AlterTableAlterColumnStatement(
         Seq("table_name"),
         Seq("a", "b", "c"),
         Some(LongType),
-        Some("new comment")))
-  }
-
-  test("alter table: change column position (not supported)") {
-    assertUnsupported("ALTER TABLE table_name CHANGE COLUMN name COMMENT 'doc' FIRST")
-    assertUnsupported("ALTER TABLE table_name CHANGE COLUMN name TYPE INT AFTER other_col")
+        Some("new comment"),
+        Some(after("d"))))
   }
 
   test("alter table: drop column") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 1dedf6e7..834e99c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -55,13 +55,18 @@
           AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField))
       }.getOrElse {
         val changes = cols.map { col =>
-          TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
+          TableChange.addColumn(
+            col.name.toArray,
+            col.dataType,
+            true,
+            col.comment.orNull,
+            col.position.orNull)
         }
         createAlterTable(nameParts, catalog, tbl, changes)
       }
 
     case AlterTableAlterColumnStatement(
-         nameParts @ SessionCatalogAndTable(catalog, tbl), colName, dataType, comment) =>
+         nameParts @ SessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) =>
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
           if (colName.length > 1) {
@@ -72,6 +77,10 @@
             throw new AnalysisException(
               "ALTER COLUMN with v1 tables must specify new data type.")
           }
+          if (pos.isDefined) {
+            throw new AnalysisException("" +
+              "ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.")
+          }
           val builder = new MetadataBuilder
           // Add comment to metadata
           comment.map(c => builder.putString("comment", c))
@@ -87,13 +96,21 @@
             builder.build())
           AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName(0), newColumn)
       }.getOrElse {
+        val colNameArray = colName.toArray
         val typeChange = dataType.map { newDataType =>
-          TableChange.updateColumnType(colName.toArray, newDataType, true)
+          TableChange.updateColumnType(colNameArray, newDataType, true)
         }
         val commentChange = comment.map { newComment =>
-          TableChange.updateColumnComment(colName.toArray, newComment)
+          TableChange.updateColumnComment(colNameArray, newComment)
         }
-        createAlterTable(nameParts, catalog, tbl, typeChange.toSeq ++ commentChange)
+        val positionChange = pos.map { newPosition =>
+          TableChange.updateColumnPosition(colNameArray, newPosition)
+        }
+        createAlterTable(
+          nameParts,
+          catalog,
+          tbl,
+          typeChange.toSeq ++ commentChange ++ positionChange)
       }
 
     case AlterTableRenameColumnStatement(
diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
index 21a344c..8232634 100644
--- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
@@ -27,7 +27,7 @@
 -- !query 2 output
 org.apache.spark.sql.catalyst.parser.ParseException
 
-Operation not allowed: ALTER TABLE table CHANGE COLUMN requires a TYPE or a COMMENT(line 1, pos 0)
+Operation not allowed: ALTER TABLE table CHANGE COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER(line 1, pos 0)
 
 == SQL ==
 ALTER TABLE test_change CHANGE a
@@ -87,13 +87,8 @@
 -- !query 8 schema
 struct<>
 -- !query 8 output
-org.apache.spark.sql.catalyst.parser.ParseException
-
-Operation not allowed: ALTER TABLE table CHANGE COLUMN ... FIRST | AFTER otherCol(line 1, pos 0)
-
-== SQL ==
-ALTER TABLE test_change CHANGE a TYPE INT AFTER b
-^^^
+org.apache.spark.sql.AnalysisException
+ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.;
 
 
 -- !query 9
@@ -101,13 +96,8 @@
 -- !query 9 schema
 struct<>
 -- !query 9 output
-org.apache.spark.sql.catalyst.parser.ParseException
-
-Operation not allowed: ALTER TABLE table CHANGE COLUMN ... FIRST | AFTER otherCol(line 1, pos 0)
-
-== SQL ==
-ALTER TABLE test_change CHANGE b TYPE STRING FIRST
-^^^
+org.apache.spark.sql.AnalysisException
+ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.;
 
 
 -- !query 10
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 7392850..2ba3c99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -101,6 +101,49 @@
     }
   }
 
+  test("AlterTable: add column with position") {
+    val t = s"${catalogAndNamespace}table_name"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (point struct<x: int>) USING $v2Format")
+
+      sql(s"ALTER TABLE $t ADD COLUMN a string FIRST")
+      assert(getTableMetadata(t).schema == new StructType()
+        .add("a", StringType)
+        .add("point", new StructType().add("x", IntegerType)))
+
+      sql(s"ALTER TABLE $t ADD COLUMN b string AFTER point")
+      assert(getTableMetadata(t).schema == new StructType()
+        .add("a", StringType)
+        .add("point", new StructType().add("x", IntegerType))
+        .add("b", StringType))
+
+      val e1 = intercept[SparkException](
+        sql(s"ALTER TABLE $t ADD COLUMN c string AFTER non_exist"))
+      assert(e1.getMessage().contains("AFTER column not found"))
+
+      sql(s"ALTER TABLE $t ADD COLUMN point.y int FIRST")
+      assert(getTableMetadata(t).schema == new StructType()
+        .add("a", StringType)
+        .add("point", new StructType()
+          .add("y", IntegerType)
+          .add("x", IntegerType))
+        .add("b", StringType))
+
+      sql(s"ALTER TABLE $t ADD COLUMN point.z int AFTER x")
+      assert(getTableMetadata(t).schema == new StructType()
+        .add("a", StringType)
+        .add("point", new StructType()
+          .add("y", IntegerType)
+          .add("x", IntegerType)
+          .add("z", IntegerType))
+        .add("b", StringType))
+
+      val e2 = intercept[SparkException](
+        sql(s"ALTER TABLE $t ADD COLUMN point.x2 int AFTER non_exist"))
+      assert(e2.getMessage().contains("AFTER column not found"))
+    }
+  }
+
   test("AlterTable: add multiple columns") {
     val t = s"${catalogAndNamespace}table_name"
     withTable(t) {
@@ -471,6 +514,61 @@
     }
   }
 
+  test("AlterTable: update column position") {
+    val t = s"${catalogAndNamespace}table_name"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (a int, b int, point struct<x: int, y: int, z: int>) USING $v2Format")
+
+      sql(s"ALTER TABLE $t ALTER COLUMN b FIRST")
+      assert(getTableMetadata(t).schema == new StructType()
+        .add("b", IntegerType)
+        .add("a", IntegerType)
+        .add("point", new StructType()
+          .add("x", IntegerType)
+          .add("y", IntegerType)
+          .add("z", IntegerType)))
+
+      sql(s"ALTER TABLE $t ALTER COLUMN b AFTER point")
+      assert(getTableMetadata(t).schema == new StructType()
+        .add("a", IntegerType)
+        .add("point", new StructType()
+          .add("x", IntegerType)
+          .add("y", IntegerType)
+          .add("z", IntegerType))
+        .add("b", IntegerType))
+
+      val e1 = intercept[SparkException](
+        sql(s"ALTER TABLE $t ALTER COLUMN b AFTER non_exist"))
+      assert(e1.getMessage.contains("AFTER column not found"))
+
+      sql(s"ALTER TABLE $t ALTER COLUMN point.y FIRST")
+      assert(getTableMetadata(t).schema == new StructType()
+        .add("a", IntegerType)
+        .add("point", new StructType()
+          .add("y", IntegerType)
+          .add("x", IntegerType)
+          .add("z", IntegerType))
+        .add("b", IntegerType))
+
+      sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER z")
+      assert(getTableMetadata(t).schema == new StructType()
+        .add("a", IntegerType)
+        .add("point", new StructType()
+          .add("x", IntegerType)
+          .add("z", IntegerType)
+          .add("y", IntegerType))
+        .add("b", IntegerType))
+
+      val e2 = intercept[SparkException](
+        sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER non_exist"))
+      assert(e2.getMessage.contains("AFTER column not found"))
+
+      // `AlterTable.resolved` checks column existence.
+      intercept[AnalysisException](
+        sql(s"ALTER TABLE $t ALTER COLUMN a.y AFTER x"))
+    }
+  }
+
   test("AlterTable: update column type and comment") {
     val t = s"${catalogAndNamespace}table_name"
     withTable(t) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index cfcd8c7..15381e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1199,7 +1199,7 @@
   }
 
   test("tableCreation: duplicate column names in the table definition") {
-    val errorMsg = "Found duplicate column(s) in the table definition of `t`"
+    val errorMsg = "Found duplicate column(s) in the table definition of t"
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         assertAnalysisError(
@@ -1223,7 +1223,7 @@
   }
 
   test("tableCreation: duplicate nested column names in the table definition") {
-    val errorMsg = "Found duplicate column(s) in the table definition of `t`"
+    val errorMsg = "Found duplicate column(s) in the table definition of t"
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         assertAnalysisError(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 1087367..2bb121b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -181,6 +181,16 @@
       assert(e.contains("Hive built-in ORC data source must be used with Hive support enabled"))
     }
   }
+
+  test("ALTER TABLE ALTER COLUMN with position is not supported") {
+    withTable("t") {
+      sql("CREATE TABLE t(i INT) USING parquet")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t ALTER COLUMN i TYPE INT FIRST")
+      }
+      assert(e.message.contains("ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables"))
+    }
+  }
 }
 
 abstract class DDLSuite extends QueryTest with SQLTestUtils {