[FLINK-35432][pipeline-connector][mysql] Support catch modify event in mysql to send AlterColumnTypeEvent. (#3352)


Co-authored-by: haoke <haoke@bytedance.com>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index cde5aa0..79583d8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -235,6 +235,32 @@
     }
 
     @Override
+    public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) {
+        String oldColumnName = parser.parseName(ctx.uid(0));
+        ColumnEditor columnEditor = Column.editor().name(oldColumnName);
+        columnEditor.unsetDefaultValueExpression();
+
+        columnDefinitionListener =
+                new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
+        listeners.add(columnDefinitionListener);
+        super.enterAlterByModifyColumn(ctx);
+    }
+
+    @Override
+    public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    Column column = columnDefinitionListener.getColumn();
+                    Map<String, DataType> typeMapping = new HashMap<>();
+                    typeMapping.put(column.name(), fromDbzColumn(column));
+                    changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
+                    listeners.remove(columnDefinitionListener);
+                },
+                columnDefinitionListener);
+        super.exitAlterByModifyColumn(ctx);
+    }
+
+    @Override
     public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) {
         parser.runIfNotNull(
                 () -> {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 076e573..bd2c059 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -556,6 +556,14 @@
 
         statement.execute(
                 String.format(
+                        "ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;",
+                        inventoryDatabase.getDatabaseName()));
+        expected.add(
+                new AlterColumnTypeEvent(
+                        tableId, Collections.singletonMap("DESC3", DataTypes.VARCHAR(255))));
+
+        statement.execute(
+                String.format(
                         "ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;",
                         inventoryDatabase.getDatabaseName()));
         expected.add(new DropColumnEvent(tableId, Collections.singletonList("DESC3")));