[FLINK-35432][pipeline-connector][mysql] Support catch modify event in mysql to send AlterColumnTypeEvent. (#3352)
Co-authored-by: haoke <haoke@bytedance.com>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index cde5aa0..79583d8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -235,6 +235,32 @@
}
@Override
+ public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) {
+ String oldColumnName = parser.parseName(ctx.uid(0));
+ ColumnEditor columnEditor = Column.editor().name(oldColumnName);
+ columnEditor.unsetDefaultValueExpression();
+
+ columnDefinitionListener =
+ new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
+ listeners.add(columnDefinitionListener);
+ super.enterAlterByModifyColumn(ctx);
+ }
+
+ @Override
+ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) {
+ parser.runIfNotNull(
+ () -> {
+ Column column = columnDefinitionListener.getColumn();
+ Map<String, DataType> typeMapping = new HashMap<>();
+ typeMapping.put(column.name(), fromDbzColumn(column));
+ changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
+ listeners.remove(columnDefinitionListener);
+ },
+ columnDefinitionListener);
+ super.exitAlterByModifyColumn(ctx);
+ }
+
+ @Override
public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) {
parser.runIfNotNull(
() -> {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 076e573..bd2c059 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -556,6 +556,14 @@
statement.execute(
String.format(
+ "ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AlterColumnTypeEvent(
+ tableId, Collections.singletonMap("DESC3", DataTypes.VARCHAR(255))));
+
+ statement.execute(
+ String.format(
"ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;",
inventoryDatabase.getDatabaseName()));
expected.add(new DropColumnEvent(tableId, Collections.singletonList("DESC3")));