[Improve] Separate the record and schema-change in JsonDebeziumSchemaSerializer (#279)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java
new file mode 100644
index 0000000..8d7c5cc
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import java.io.Serializable;
+
+/**
+ * Represents the change events of external systems, including data change and schema change events.
+ */
+public interface ChangeEvent extends Serializable {}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index cdbd482..370bca7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -18,88 +18,56 @@
 package org.apache.doris.flink.sink.writer.serializer;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.doris.flink.catalog.doris.FieldSchema;
-import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.exception.IllegalArgumentException;
-import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
-import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
-import org.apache.doris.flink.sink.schema.SchemaChangeManager;
-import org.apache.doris.flink.sink.writer.EventType;
-import org.apache.doris.flink.tools.cdc.SourceConnector;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
-import org.apache.doris.flink.tools.cdc.oracle.OracleType;
-import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
-import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
+/**
+ * Serialize the records of the upstream data source into a data form that can be recognized by
+ * downstream doris.
+ *
+ * <p>There are two serialization methods here: <br>
+ * 1. data change {@link JsonDebeziumDataChange} records. <br>
+ * 2. schema change {@link JsonDebeziumSchemaChange} records.
+ */
 public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
     private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
-    private static final String OP_READ = "r"; // snapshot read
-    private static final String OP_CREATE = "c"; // insert
-    private static final String OP_UPDATE = "u"; // update
-    private static final String OP_DELETE = "d"; // delete
-    public static final String EXECUTE_DDL =
-            "ALTER TABLE %s %s COLUMN %s %s"; // alter table tbl add cloumn aca int
-    private static final String addDropDDLRegex =
-            "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
-    private static final Pattern renameDDLPattern =
-            Pattern.compile(
-                    "ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
-                    Pattern.CASE_INSENSITIVE);
-    private final Pattern addDropDDLPattern;
-    private DorisOptions dorisOptions;
-    private ObjectMapper objectMapper = new ObjectMapper();
-    private String database;
-    private String table;
+    private final Pattern pattern;
+    private final DorisOptions dorisOptions;
+    private final ObjectMapper objectMapper = new ObjectMapper();
     // table name of the cdc upstream, format is db.tbl
-    private String sourceTableName;
+    private final String sourceTableName;
     private boolean firstLoad;
-    private boolean firstSchemaChange;
-    private Map<String, FieldSchema> originFieldSchemaMap;
     private final boolean newSchemaChange;
     private String lineDelimiter = LINE_DELIMITER_DEFAULT;
     private boolean ignoreUpdateBefore = true;
-    private SourceConnector sourceConnector;
-    private SchemaChangeManager schemaChangeManager;
     // <cdc db.schema.table, doris db.table>
     private Map<String, String> tableMapping;
     // create table properties
     private Map<String, String> tableProperties;
     private String targetDatabase;
+    private JsonDebeziumDataChange dataChange;
+    private JsonDebeziumSchemaChange schemaChange;
 
     public JsonDebeziumSchemaSerializer(
             DorisOptions dorisOptions,
@@ -107,15 +75,7 @@
             String sourceTableName,
             boolean newSchemaChange) {
         this.dorisOptions = dorisOptions;
-        this.addDropDDLPattern =
-                pattern == null
-                        ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE)
-                        : pattern;
-        if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
-            String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
-            this.database = tableInfo[0];
-            this.table = tableInfo[1];
-        }
+        this.pattern = pattern;
         this.sourceTableName = sourceTableName;
         // Prevent loss of decimal data precision
         this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
@@ -123,8 +83,6 @@
         this.objectMapper.setNodeFactory(jsonNodeFactory);
         this.newSchemaChange = newSchemaChange;
         this.firstLoad = true;
-        this.firstSchemaChange = true;
-        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
     }
 
     public JsonDebeziumSchemaSerializer(
@@ -156,6 +114,27 @@
         this.tableMapping = tableMapping;
         this.tableProperties = tableProperties;
         this.targetDatabase = targetDatabase;
+        init();
+    }
+
+    private void init() {
+        JsonDebeziumChangeContext changeContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        sourceTableName,
+                        targetDatabase,
+                        tableProperties,
+                        objectMapper,
+                        pattern,
+                        lineDelimiter,
+                        ignoreUpdateBefore);
+
+        this.schemaChange =
+                newSchemaChange
+                        ? new JsonDebeziumSchemaChangeImplV2(changeContext)
+                        : new JsonDebeziumSchemaChangeImpl(changeContext);
+        this.dataChange = new JsonDebeziumDataChange(changeContext);
     }
 
     @Override
@@ -165,360 +144,15 @@
         String op = extractJsonNode(recordRoot, "op");
         if (Objects.isNull(op)) {
             // schema change ddl
-            if (newSchemaChange) {
-                schemaChangeV2(recordRoot);
-            } else {
-                schemaChange(recordRoot);
-            }
+            schemaChange.schemaChange(recordRoot);
             return null;
         }
 
-        if (newSchemaChange && firstLoad) {
-            initOriginFieldSchema(recordRoot);
+        if (firstLoad) {
+            schemaChange.init(recordRoot);
+            firstLoad = false;
         }
-
-        // Filter out table records that are not in tableMapping
-        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
-        String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
-        if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
-            LOG.warn(
-                    "filter table {}, because it is not listened, record detail is {}",
-                    cdcTableIdentifier,
-                    record);
-            return null;
-        }
-
-        Map<String, Object> valueMap;
-        switch (op) {
-            case OP_READ:
-            case OP_CREATE:
-                valueMap = extractAfterRow(recordRoot);
-                addDeleteSign(valueMap, false);
-                break;
-            case OP_UPDATE:
-                return DorisRecord.of(dorisTableIdentifier, extractUpdate(recordRoot));
-            case OP_DELETE:
-                valueMap = extractBeforeRow(recordRoot);
-                addDeleteSign(valueMap, true);
-                break;
-            default:
-                LOG.error("parse record fail, unknown op {} in {}", op, record);
-                return null;
-        }
-
-        return DorisRecord.of(
-                dorisTableIdentifier,
-                objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
-    }
-
-    /**
-     * Change the update event into two.
-     *
-     * @param recordRoot
-     * @return
-     */
-    private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException {
-        StringBuilder updateRow = new StringBuilder();
-        if (!ignoreUpdateBefore) {
-            // convert delete
-            Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
-            addDeleteSign(beforeRow, true);
-            updateRow.append(objectMapper.writeValueAsString(beforeRow)).append(this.lineDelimiter);
-        }
-
-        // convert insert
-        Map<String, Object> afterRow = extractAfterRow(recordRoot);
-        addDeleteSign(afterRow, false);
-        updateRow.append(objectMapper.writeValueAsString(afterRow));
-        return updateRow.toString().getBytes(StandardCharsets.UTF_8);
-    }
-
-    public boolean schemaChangeV2(JsonNode recordRoot) {
-        boolean status = false;
-        try {
-            if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
-                return false;
-            }
-
-            EventType eventType = extractEventType(recordRoot);
-            if (eventType == null) {
-                return false;
-            }
-            if (eventType.equals(EventType.CREATE)) {
-                TableSchema tableSchema = extractCreateTableSchema(recordRoot);
-                status = schemaChangeManager.createTable(tableSchema);
-                if (status) {
-                    String cdcTbl = getCdcTableIdentifier(recordRoot);
-                    String dorisTbl = getCreateTableIdentifier(recordRoot);
-                    tableMapping.put(cdcTbl, dorisTbl);
-                    LOG.info("create table ddl status: {}", status);
-                }
-            } else if (eventType.equals(EventType.ALTER)) {
-                // db,table
-                Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
-                if (tuple == null) {
-                    return false;
-                }
-                List<String> ddlSqlList = extractDDLList(recordRoot);
-                if (CollectionUtils.isEmpty(ddlSqlList)) {
-                    LOG.info("ddl can not do schema change:{}", recordRoot);
-                    return false;
-                }
-                List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
-                for (int i = 0; i < ddlSqlList.size(); i++) {
-                    DDLSchema ddlSchema = ddlSchemas.get(i);
-                    String ddlSql = ddlSqlList.get(i);
-                    boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
-                    status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
-                    LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
-                }
-            } else {
-                LOG.info("Unsupported event type {}", eventType);
-            }
-        } catch (Exception ex) {
-            LOG.warn("schema change error :", ex);
-        }
-        return status;
-    }
-
-    protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
-        JsonNode historyRecord = extractHistoryRecord(record);
-        JsonNode tableChanges = historyRecord.get("tableChanges");
-        if (!Objects.isNull(tableChanges)) {
-            JsonNode tableChange = tableChanges.get(0);
-            return tableChange;
-        }
-        return null;
-    }
-
-    /** Parse Alter Event. */
-    @VisibleForTesting
-    public List<String> extractDDLList(JsonNode record) throws IOException {
-        String dorisTable = getDorisTableIdentifier(record);
-        JsonNode historyRecord = extractHistoryRecord(record);
-        String ddl = extractJsonNode(historyRecord, "ddl");
-        JsonNode tableChange = extractTableChange(record);
-        if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
-            return null;
-        }
-
-        JsonNode columns = tableChange.get("table").get("columns");
-        if (firstSchemaChange) {
-            sourceConnector =
-                    SourceConnector.valueOf(
-                            record.get("source").get("connector").asText().toUpperCase());
-            fillOriginSchema(columns);
-        }
-
-        // rename ddl
-        Matcher renameMatcher = renameDDLPattern.matcher(ddl);
-        if (renameMatcher.find()) {
-            String oldColumnName = renameMatcher.group(2);
-            String newColumnName = renameMatcher.group(3);
-            return SchemaChangeHelper.generateRenameDDLSql(
-                    dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
-        }
-
-        // add/drop ddl
-        Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
-        for (JsonNode column : columns) {
-            buildFieldSchema(updateFiledSchema, column);
-        }
-        SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
-        // In order to avoid other source table column change operations other than add/drop/rename,
-        // which may lead to the accidental deletion of the doris column.
-        Matcher matcher = addDropDDLPattern.matcher(ddl);
-        if (!matcher.find()) {
-            return null;
-        }
-        return SchemaChangeHelper.generateDDLSql(dorisTable);
-    }
-
-    @VisibleForTesting
-    public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
-        if (sourceConnector == null) {
-            sourceConnector =
-                    SourceConnector.valueOf(
-                            record.get("source").get("connector").asText().toUpperCase());
-        }
-
-        String dorisTable = getCreateTableIdentifier(record);
-        JsonNode tableChange = extractTableChange(record);
-        JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
-        JsonNode columns = tableChange.get("table").get("columns");
-        JsonNode comment = tableChange.get("table").get("comment");
-        String tblComment = comment == null ? "" : comment.asText();
-        Map<String, FieldSchema> field = new LinkedHashMap<>();
-        for (JsonNode column : columns) {
-            buildFieldSchema(field, column);
-        }
-        List<String> pkList = new ArrayList<>();
-        for (JsonNode column : pkColumns) {
-            String fieldName = column.asText();
-            pkList.add(fieldName);
-        }
-
-        TableSchema tableSchema = new TableSchema();
-        tableSchema.setFields(field);
-        tableSchema.setKeys(pkList);
-        tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
-        tableSchema.setTableComment(tblComment);
-        tableSchema.setProperties(tableProperties);
-
-        String[] split = dorisTable.split("\\.");
-        Preconditions.checkArgument(split.length == 2);
-        tableSchema.setDatabase(split[0]);
-        tableSchema.setTable(split[1]);
-        return tableSchema;
-    }
-
-    private List<String> buildDistributeKeys(
-            List<String> primaryKeys, Map<String, FieldSchema> fields) {
-        if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
-            return primaryKeys;
-        }
-        if (!fields.isEmpty()) {
-            Map.Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
-            return Collections.singletonList(firstField.getKey());
-        }
-        return new ArrayList<>();
-    }
-
-    @VisibleForTesting
-    public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
-        this.originFieldSchemaMap = originFieldSchemaMap;
-    }
-
-    @VisibleForTesting
-    public boolean schemaChange(JsonNode recordRoot) {
-        boolean status = false;
-        try {
-            if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
-                return false;
-            }
-            // db,table
-            Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
-            if (tuple == null) {
-                return false;
-            }
-
-            String ddl = extractDDL(recordRoot);
-            if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
-                LOG.info("ddl can not do schema change:{}", recordRoot);
-                return false;
-            }
-
-            boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddl);
-            status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0);
-            LOG.info("schema change status:{}", status);
-        } catch (Exception ex) {
-            LOG.warn("schema change error :", ex);
-        }
-        return status;
-    }
-
-    /** When cdc synchronizes multiple tables, it will capture multiple table schema changes. */
-    protected boolean checkTable(JsonNode recordRoot) {
-        String db = extractDatabase(recordRoot);
-        String tbl = extractTable(recordRoot);
-        String dbTbl = db + "." + tbl;
-        return sourceTableName.equals(dbTbl);
-    }
-
-    public String getCdcTableIdentifier(JsonNode record) {
-        String db = extractJsonNode(record.get("source"), "db");
-        String schema = extractJsonNode(record.get("source"), "schema");
-        String table = extractJsonNode(record.get("source"), "table");
-        return SourceSchema.getString(db, schema, table);
-    }
-
-    public String getCreateTableIdentifier(JsonNode record) {
-        String table = extractJsonNode(record.get("source"), "table");
-        return targetDatabase + "." + table;
-    }
-
-    public String getDorisTableIdentifier(String cdcTableIdentifier) {
-        if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
-            return dorisOptions.getTableIdentifier();
-        }
-        if (!CollectionUtil.isNullOrEmpty(tableMapping)
-                && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
-                && tableMapping.get(cdcTableIdentifier) != null) {
-            return tableMapping.get(cdcTableIdentifier);
-        }
-        return null;
-    }
-
-    protected String getDorisTableIdentifier(JsonNode record) {
-        String identifier = getCdcTableIdentifier(record);
-        return getDorisTableIdentifier(identifier);
-    }
-
-    protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
-        String identifier = getDorisTableIdentifier(record);
-        if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
-            return null;
-        }
-        String[] tableInfo = identifier.split("\\.");
-        if (tableInfo.length != 2) {
-            return null;
-        }
-        return Tuple2.of(tableInfo[0], tableInfo[1]);
-    }
-
-    private boolean checkSchemaChange(String database, String table, String ddl)
-            throws IOException, IllegalArgumentException {
-        Map<String, Object> param = buildRequestParam(ddl);
-        return schemaChangeManager.checkSchemaChange(database, table, param);
-    }
-
-    private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
-            throws IOException, IllegalArgumentException {
-        Map<String, Object> param =
-                SchemaChangeManager.buildRequestParam(
-                        ddlSchema.isDropColumn(), ddlSchema.getColumnName());
-        return schemaChangeManager.checkSchemaChange(database, table, param);
-    }
-
-    /** Build param { "isDropColumn": true, "columnName" : "column" }. */
-    protected Map<String, Object> buildRequestParam(String ddl) {
-        Map<String, Object> params = new HashMap<>();
-        Matcher matcher = addDropDDLPattern.matcher(ddl);
-        if (matcher.find()) {
-            String op = matcher.group(1);
-            String col = matcher.group(3);
-            params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
-            params.put("columnName", col);
-        }
-        return params;
-    }
-
-    protected String extractDatabase(JsonNode record) {
-        if (record.get("source").has("schema")) {
-            // compatible with schema
-            return extractJsonNode(record.get("source"), "schema");
-        } else {
-            return extractJsonNode(record.get("source"), "db");
-        }
-    }
-
-    protected String extractTable(JsonNode record) {
-        return extractJsonNode(record.get("source"), "table");
-    }
-
-    /** Parse event type. */
-    protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
-        JsonNode tableChange = extractTableChange(record);
-        if (tableChange == null || tableChange.get("type") == null) {
-            return null;
-        }
-        String type = tableChange.get("type").asText();
-        if (EventType.ALTER.toString().equalsIgnoreCase(type)) {
-            return EventType.ALTER;
-        } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) {
-            return EventType.CREATE;
-        }
-        return null;
+        return dataChange.serialize(record, recordRoot, op);
     }
 
     private String extractJsonNode(JsonNode record, String key) {
@@ -527,154 +161,9 @@
                 : null;
     }
 
-    private Map<String, Object> extractBeforeRow(JsonNode record) {
-        return extractRow(record.get("before"));
-    }
-
-    private Map<String, Object> extractAfterRow(JsonNode record) {
-        return extractRow(record.get("after"));
-    }
-
-    private Map<String, Object> extractRow(JsonNode recordRow) {
-        Map<String, Object> recordMap =
-                objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
-        return recordMap != null ? recordMap : new HashMap<>();
-    }
-
-    private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
-        if (record != null && record.has("historyRecord")) {
-            return objectMapper.readTree(record.get("historyRecord").asText());
-        }
-        // The ddl passed by some scenes will not be included in the historyRecord, such as
-        // DebeziumSourceFunction
-        return record;
-    }
-
-    public String extractDDL(JsonNode record) throws JsonProcessingException {
-        JsonNode historyRecord = extractHistoryRecord(record);
-        String ddl = extractJsonNode(historyRecord, "ddl");
-        LOG.debug("received debezium ddl :{}", ddl);
-        if (!Objects.isNull(ddl)) {
-            // filter add/drop operation
-            Matcher matcher = addDropDDLPattern.matcher(ddl);
-            if (matcher.find()) {
-                String op = matcher.group(1);
-                String col = matcher.group(3);
-                String type = matcher.group(5);
-                type = handleType(type);
-                ddl = String.format(EXECUTE_DDL, getDorisTableIdentifier(record), op, col, type);
-                LOG.info("parse ddl:{}", ddl);
-                return ddl;
-            }
-        }
-        return null;
-    }
-
     @VisibleForTesting
-    public void fillOriginSchema(JsonNode columns) {
-        if (Objects.nonNull(originFieldSchemaMap)) {
-            for (JsonNode column : columns) {
-                String fieldName = column.get("name").asText();
-                if (originFieldSchemaMap.containsKey(fieldName)) {
-                    String dorisTypeName = buildDorisTypeName(column);
-                    String defaultValue =
-                            handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
-                    String comment = extractJsonNode(column, "comment");
-                    FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
-                    fieldSchema.setName(fieldName);
-                    fieldSchema.setTypeString(dorisTypeName);
-                    fieldSchema.setComment(comment);
-                    fieldSchema.setDefaultValue(defaultValue);
-                }
-            }
-        } else {
-            LOG.error(
-                    "Current schema change failed! You need to ensure that "
-                            + "there is data in the table."
-                            + dorisOptions.getTableIdentifier());
-            originFieldSchemaMap = new LinkedHashMap<>();
-            columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, column));
-        }
-        firstSchemaChange = false;
-        firstLoad = false;
-    }
-
-    private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
-        String fieldName = column.get("name").asText();
-        String dorisTypeName = buildDorisTypeName(column);
-        String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
-        String comment = extractJsonNode(column, "comment");
-        filedSchemaMap.put(
-                fieldName, new FieldSchema(fieldName, dorisTypeName, defaultValue, comment));
-    }
-
-    @VisibleForTesting
-    public String buildDorisTypeName(JsonNode column) {
-        int length = column.get("length") == null ? 0 : column.get("length").asInt();
-        int scale = column.get("scale") == null ? 0 : column.get("scale").asInt();
-        String sourceTypeName = column.get("typeName").asText();
-        String dorisTypeName;
-        switch (sourceConnector) {
-            case MYSQL:
-                dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, scale);
-                break;
-            case ORACLE:
-                dorisTypeName = OracleType.toDorisType(sourceTypeName, length, scale);
-                break;
-            case POSTGRES:
-                dorisTypeName = PostgresType.toDorisType(sourceTypeName, length, scale);
-                break;
-            case SQLSERVER:
-                dorisTypeName = SqlServerType.toDorisType(sourceTypeName, length, scale);
-                break;
-            default:
-                String errMsg = "Not support " + sourceTypeName + " schema change.";
-                throw new UnsupportedOperationException(errMsg);
-        }
-        return dorisTypeName;
-    }
-
-    private String handleDefaultValue(String defaultValue) {
-        if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
-            return null;
-        }
-        // Due to historical reasons, doris needs to add quotes to the default value of the new
-        // column
-        // For example in mysql: alter table add column c1 int default 100
-        // In Doris: alter table add column c1 int default '100'
-        if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
-            return defaultValue;
-        } else if (defaultValue.equals("1970-01-01 00:00:00")) {
-            // TODO: The default value of setting the current time in CDC is 1970-01-01 00:00:00
-            return "current_timestamp";
-        }
-        return "'" + defaultValue + "'";
-    }
-
-    private void initOriginFieldSchema(JsonNode recordRoot) {
-        originFieldSchemaMap = new LinkedHashMap<>();
-        Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
-        if (CollectionUtils.isEmpty(columnNameSet)) {
-            columnNameSet = extractBeforeRow(recordRoot).keySet();
-        }
-        columnNameSet.forEach(
-                columnName -> originFieldSchemaMap.put(columnName, new FieldSchema()));
-        firstLoad = false;
-    }
-
-    @VisibleForTesting
-    public Map<String, FieldSchema> getOriginFieldSchemaMap() {
-        return originFieldSchemaMap;
-    }
-
-    @VisibleForTesting
-    public void setSourceConnector(String sourceConnector) {
-        this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase());
-    }
-
-    @VisibleForTesting
-    public void setTableMapping(Map<String, String> tableMapping) {
-        this.tableMapping = tableMapping;
+    public JsonDebeziumSchemaChange getJsonDebeziumSchemaChange() {
+        return this.schemaChange;
     }
 
     public static JsonDebeziumSchemaSerializer.Builder builder() {
@@ -744,21 +233,4 @@
                     targetDatabase);
         }
     }
-
-    private String handleType(String type) {
-
-        if (type == null || "".equals(type)) {
-            return "";
-        }
-
-        // varchar len * 3
-        Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);
-        Matcher matcher = pattern.matcher(type);
-        if (matcher.find()) {
-            String len = matcher.group(1);
-            return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533));
-        }
-
-        return type;
-    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
new file mode 100644
index 0000000..9c59f14
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.cfg.DorisOptions;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/** Record the context of schema change and data change during serialization. */
+public class JsonDebeziumChangeContext implements Serializable {
+    private final DorisOptions dorisOptions;
+    // <cdc db.schema.table, doris db.table>
+    private final Map<String, String> tableMapping;
+    // table name of the cdc upstream, format is db.tbl
+    private final String sourceTableName;
+    private final String targetDatabase;
+    // create table properties
+    private final Map<String, String> tableProperties;
+    private final ObjectMapper objectMapper;
+    private final Pattern pattern;
+    private final String lineDelimiter;
+    private final boolean ignoreUpdateBefore;
+
+    public JsonDebeziumChangeContext(
+            DorisOptions dorisOptions,
+            Map<String, String> tableMapping,
+            String sourceTableName,
+            String targetDatabase,
+            Map<String, String> tableProperties,
+            ObjectMapper objectMapper,
+            Pattern pattern,
+            String lineDelimiter,
+            boolean ignoreUpdateBefore) {
+        this.dorisOptions = dorisOptions;
+        this.tableMapping = tableMapping;
+        this.sourceTableName = sourceTableName;
+        this.targetDatabase = targetDatabase;
+        this.tableProperties = tableProperties;
+        this.objectMapper = objectMapper;
+        this.pattern = pattern;
+        this.lineDelimiter = lineDelimiter;
+        this.ignoreUpdateBefore = ignoreUpdateBefore;
+    }
+
+    public DorisOptions getDorisOptions() {
+        return dorisOptions;
+    }
+
+    public Map<String, String> getTableMapping() {
+        return tableMapping;
+    }
+
+    public String getSourceTableName() {
+        return sourceTableName;
+    }
+
+    public String getTargetDatabase() {
+        return targetDatabase;
+    }
+
+    public Map<String, String> getTableProperties() {
+        return tableProperties;
+    }
+
+    public ObjectMapper getObjectMapper() {
+        return objectMapper;
+    }
+
+    public Pattern getPattern() {
+        return pattern;
+    }
+
+    public String getLineDelimiter() {
+        return lineDelimiter;
+    }
+
+    public boolean isIgnoreUpdateBefore() {
+        return ignoreUpdateBefore;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
new file mode 100644
index 0000000..5790c48
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
+
+/**
+ * Convert the data change record of the upstream data source into a byte array that can be imported
+ * into doris through stream load.<br>
+ * Supported data changes include: read, insert, update, delete.
+ */
+public class JsonDebeziumDataChange implements ChangeEvent {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumDataChange.class);
+
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+    private final ObjectMapper objectMapper;
+    private final DorisOptions dorisOptions;
+    private final boolean ignoreUpdateBefore;
+    private final String lineDelimiter;
+    private JsonDebeziumChangeContext changeContext;
+
+    public JsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {
+        this.changeContext = changeContext;
+        this.dorisOptions = changeContext.getDorisOptions();
+        this.objectMapper = changeContext.getObjectMapper();
+        this.ignoreUpdateBefore = changeContext.isIgnoreUpdateBefore();
+        this.lineDelimiter = changeContext.getLineDelimiter();
+    }
+
+    public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException {
+        // Filter out table records that are not in tableMapping
+        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+        String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
+        if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
+            LOG.warn(
+                    "filter table {}, because it is not listened, record detail is {}",
+                    cdcTableIdentifier,
+                    record);
+            return null;
+        }
+
+        Map<String, Object> valueMap;
+        switch (op) {
+            case OP_READ:
+            case OP_CREATE:
+                valueMap = extractAfterRow(recordRoot);
+                addDeleteSign(valueMap, false);
+                break;
+            case OP_UPDATE:
+                return DorisRecord.of(dorisTableIdentifier, extractUpdate(recordRoot));
+            case OP_DELETE:
+                valueMap = extractBeforeRow(recordRoot);
+                addDeleteSign(valueMap, true);
+                break;
+            default:
+                LOG.error("parse record fail, unknown op {} in {}", op, record);
+                return null;
+        }
+
+        return DorisRecord.of(
+                dorisTableIdentifier,
+                objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Change the update event into two.
+     *
+     * @param recordRoot
+     * @return
+     */
+    private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException {
+        StringBuilder updateRow = new StringBuilder();
+        if (!ignoreUpdateBefore) {
+            // convert delete
+            Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
+            addDeleteSign(beforeRow, true);
+            updateRow.append(objectMapper.writeValueAsString(beforeRow)).append(this.lineDelimiter);
+        }
+
+        // convert insert
+        Map<String, Object> afterRow = extractAfterRow(recordRoot);
+        addDeleteSign(afterRow, false);
+        updateRow.append(objectMapper.writeValueAsString(afterRow));
+        return updateRow.toString().getBytes(StandardCharsets.UTF_8);
+    }
+
+    @VisibleForTesting
+    public String getCdcTableIdentifier(JsonNode record) {
+        String db = extractJsonNode(record.get("source"), "db");
+        String schema = extractJsonNode(record.get("source"), "schema");
+        String table = extractJsonNode(record.get("source"), "table");
+        return SourceSchema.getString(db, schema, table);
+    }
+
+    @VisibleForTesting
+    public String getDorisTableIdentifier(String cdcTableIdentifier) {
+        if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
+            return dorisOptions.getTableIdentifier();
+        }
+        Map<String, String> tableMapping = changeContext.getTableMapping();
+        if (!CollectionUtil.isNullOrEmpty(tableMapping)
+                && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
+                && tableMapping.get(cdcTableIdentifier) != null) {
+            return tableMapping.get(cdcTableIdentifier);
+        }
+        return null;
+    }
+
+    private String extractJsonNode(JsonNode record, String key) {
+        return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode)
+                ? record.get(key).asText()
+                : null;
+    }
+
+    private Map<String, Object> extractBeforeRow(JsonNode record) {
+        return extractRow(record.get("before"));
+    }
+
+    private Map<String, Object> extractAfterRow(JsonNode record) {
+        return extractRow(record.get("after"));
+    }
+
+    private Map<String, Object> extractRow(JsonNode recordRow) {
+        Map<String, Object> recordMap =
+                objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
+        return recordMap != null ? recordMap : new HashMap<>();
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
new file mode 100644
index 0000000..4c67164
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Synchronize the schema change in the upstream data source to the doris database table.
+ *
+ * <p>There are two schema change modes:<br>
+ * 1. {@link JsonDebeziumSchemaChangeImpl} only supports table column name and column type changes,
+ * and this mode is used by default. <br>
+ * 2. {@link JsonDebeziumSchemaChangeImplV2} supports table column name, column type, default,
+ * comment synchronization, supports multi-column changes, and supports column name rename. Need to
+ * be enabled by configuring use-new-schema-change.
+ */
+public abstract class JsonDebeziumSchemaChange implements ChangeEvent {
+    protected static String addDropDDLRegex =
+            "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
+    protected Pattern addDropDDLPattern;
+
+    // table name of the cdc upstream, format is db.tbl
+    protected String sourceTableName;
+    protected DorisOptions dorisOptions;
+    protected ObjectMapper objectMapper;
+    // <cdc db.schema.table, doris db.table>
+    protected Map<String, String> tableMapping;
+    protected SchemaChangeManager schemaChangeManager;
+    protected JsonDebeziumChangeContext changeContext;
+
+    public abstract boolean schemaChange(JsonNode recordRoot);
+
+    public abstract void init(JsonNode recordRoot);
+
+    /** When cdc synchronizes multiple tables, it will capture multiple table schema changes. */
+    protected boolean checkTable(JsonNode recordRoot) {
+        String db = extractDatabase(recordRoot);
+        String tbl = extractTable(recordRoot);
+        String dbTbl = db + "." + tbl;
+        return sourceTableName.equals(dbTbl);
+    }
+
+    protected String extractDatabase(JsonNode record) {
+        if (record.get("source").has("schema")) {
+            // compatible with schema
+            return extractJsonNode(record.get("source"), "schema");
+        } else {
+            return extractJsonNode(record.get("source"), "db");
+        }
+    }
+
+    protected String extractTable(JsonNode record) {
+        return extractJsonNode(record.get("source"), "table");
+    }
+
+    protected String extractJsonNode(JsonNode record, String key) {
+        return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode)
+                ? record.get(key).asText()
+                : null;
+    }
+
+    @VisibleForTesting
+    public String getDorisTableIdentifier(String cdcTableIdentifier) {
+        if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
+            return dorisOptions.getTableIdentifier();
+        }
+        if (!CollectionUtil.isNullOrEmpty(tableMapping)
+                && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
+                && tableMapping.get(cdcTableIdentifier) != null) {
+            return tableMapping.get(cdcTableIdentifier);
+        }
+        return null;
+    }
+
+    protected String getDorisTableIdentifier(JsonNode record) {
+        String identifier = getCdcTableIdentifier(record);
+        return getDorisTableIdentifier(identifier);
+    }
+
+    protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
+        String identifier = getDorisTableIdentifier(record);
+        if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
+            return null;
+        }
+        String[] tableInfo = identifier.split("\\.");
+        if (tableInfo.length != 2) {
+            return null;
+        }
+        return Tuple2.of(tableInfo[0], tableInfo[1]);
+    }
+
+    @VisibleForTesting
+    public String getCdcTableIdentifier(JsonNode record) {
+        String db = extractJsonNode(record.get("source"), "db");
+        String schema = extractJsonNode(record.get("source"), "schema");
+        String table = extractJsonNode(record.get("source"), "table");
+        return SourceSchema.getString(db, schema, table);
+    }
+
+    protected JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
+        if (record != null && record.has("historyRecord")) {
+            return objectMapper.readTree(record.get("historyRecord").asText());
+        }
+        // The ddl passed in some scenarios will not include the historyRecord,
+        // such as DebeziumSourceFunction
+        return record;
+    }
+
+    @VisibleForTesting
+    public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) {
+        this.schemaChangeManager = schemaChangeManager;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
new file mode 100644
index 0000000..4cf0970
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Use expression to match ddl sql. */
+public class JsonDebeziumSchemaChangeImpl extends JsonDebeziumSchemaChange {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChangeImpl.class);
+    // e.g. alter table tbl add column col int
+    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s";
+
+    public JsonDebeziumSchemaChangeImpl(JsonDebeziumChangeContext changeContext) {
+        this.changeContext = changeContext;
+        this.dorisOptions = changeContext.getDorisOptions();
+        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+        this.sourceTableName = changeContext.getSourceTableName();
+        this.tableMapping = changeContext.getTableMapping();
+        this.objectMapper = changeContext.getObjectMapper();
+        this.addDropDDLPattern =
+                Objects.isNull(changeContext.getPattern())
+                        ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE)
+                        : changeContext.getPattern();
+    }
+
+    @Override
+    public void init(JsonNode recordRoot) {
+        // do nothing
+    }
+
+    @VisibleForTesting
+    @Override
+    public boolean schemaChange(JsonNode recordRoot) {
+        boolean status = false;
+        try {
+            if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
+                return false;
+            }
+            // db,table
+            Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+            if (tuple == null) {
+                return false;
+            }
+
+            String ddl = extractDDL(recordRoot);
+            if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
+                LOG.info("ddl can not do schema change:{}", recordRoot);
+                return false;
+            }
+
+            boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddl);
+            status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0);
+            LOG.info("schema change status:{}", status);
+        } catch (Exception ex) {
+            LOG.warn("schema change error :", ex);
+        }
+        return status;
+    }
+
+    private boolean checkSchemaChange(String database, String table, String ddl)
+            throws IOException, IllegalArgumentException {
+        Map<String, Object> param = buildRequestParam(ddl);
+        return schemaChangeManager.checkSchemaChange(database, table, param);
+    }
+
+    /** Build param { "isDropColumn": true, "columnName" : "column" }. */
+    protected Map<String, Object> buildRequestParam(String ddl) {
+        Map<String, Object> params = new HashMap<>();
+        Matcher matcher = addDropDDLPattern.matcher(ddl);
+        if (matcher.find()) {
+            String op = matcher.group(1);
+            String col = matcher.group(3);
+            params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
+            params.put("columnName", col);
+        }
+        return params;
+    }
+
+    @VisibleForTesting
+    public String extractDDL(JsonNode record) throws JsonProcessingException {
+        JsonNode historyRecord = extractHistoryRecord(record);
+        String ddl = extractJsonNode(historyRecord, "ddl");
+        LOG.debug("received debezium ddl :{}", ddl);
+        if (!Objects.isNull(ddl)) {
+            // filter add/drop operation
+            Matcher matcher = addDropDDLPattern.matcher(ddl);
+            if (matcher.find()) {
+                String op = matcher.group(1);
+                String col = matcher.group(3);
+                String type = matcher.group(5);
+                type = handleType(type);
+                ddl = String.format(EXECUTE_DDL, getDorisTableIdentifier(record), op, col, type);
+                LOG.info("parse ddl:{}", ddl);
+                return ddl;
+            }
+        }
+        return null;
+    }
+
+    private String handleType(String type) {
+        if (type == null || "".equals(type)) {
+            return "";
+        }
+        // varchar len * 3
+        Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(type);
+        if (matcher.find()) {
+            String len = matcher.group(1);
+            return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533));
+        }
+
+        return type;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
new file mode 100644
index 0000000..5604da7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -0,0 +1,389 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
+import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+import org.apache.doris.flink.tools.cdc.oracle.OracleType;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Extract the columns that need to be changed based on the change records of the upstream data
+ * source.
+ */
+public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChangeImplV2.class);
+    private static final Pattern renameDDLPattern =
+            Pattern.compile(
+                    "ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
+                    Pattern.CASE_INSENSITIVE);
+    private Map<String, FieldSchema> originFieldSchemaMap;
+    private SourceConnector sourceConnector;
+    // create table properties
+    private final Map<String, String> tableProperties;
+    private String targetDatabase;
+
+    public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
+        this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE);
+        this.changeContext = changeContext;
+        this.sourceTableName = changeContext.getSourceTableName();
+        this.dorisOptions = changeContext.getDorisOptions();
+        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+        this.targetDatabase = changeContext.getTargetDatabase();
+        this.tableProperties = changeContext.getTableProperties();
+        this.tableMapping = changeContext.getTableMapping();
+        this.objectMapper = changeContext.getObjectMapper();
+    }
+
+    @Override
+    public void init(JsonNode recordRoot) {
+        originFieldSchemaMap = new LinkedHashMap<>();
+        Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
+        if (CollectionUtils.isEmpty(columnNameSet)) {
+            columnNameSet = extractBeforeRow(recordRoot).keySet();
+        }
+        columnNameSet.forEach(
+                columnName -> originFieldSchemaMap.put(columnName, new FieldSchema()));
+    }
+
+    @Override
+    public boolean schemaChange(JsonNode recordRoot) {
+        boolean status = false;
+        try {
+            if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
+                return false;
+            }
+
+            EventType eventType = extractEventType(recordRoot);
+            if (eventType == null) {
+                return false;
+            }
+            if (eventType.equals(EventType.CREATE)) {
+                TableSchema tableSchema = extractCreateTableSchema(recordRoot);
+                status = schemaChangeManager.createTable(tableSchema);
+                if (status) {
+                    String cdcTbl = getCdcTableIdentifier(recordRoot);
+                    String dorisTbl = getCreateTableIdentifier(recordRoot);
+                    changeContext.getTableMapping().put(cdcTbl, dorisTbl);
+                    this.tableMapping = changeContext.getTableMapping();
+                    LOG.info("create table ddl status: {}", status);
+                }
+            } else if (eventType.equals(EventType.ALTER)) {
+                // db,table
+                Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+                if (tuple == null) {
+                    return false;
+                }
+                List<String> ddlSqlList = extractDDLList(recordRoot);
+                if (CollectionUtils.isEmpty(ddlSqlList)) {
+                    LOG.info("ddl can not do schema change:{}", recordRoot);
+                    return false;
+                }
+                List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
+                for (int i = 0; i < ddlSqlList.size(); i++) {
+                    DDLSchema ddlSchema = ddlSchemas.get(i);
+                    String ddlSql = ddlSqlList.get(i);
+                    boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
+                    status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
+                    LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
+                }
+            } else {
+                LOG.info("Unsupported event type {}", eventType);
+            }
+        } catch (Exception ex) {
+            LOG.warn("schema change error :", ex);
+        }
+        return status;
+    }
+
+    private JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
+        JsonNode historyRecord = extractHistoryRecord(record);
+        JsonNode tableChanges = historyRecord.get("tableChanges");
+        if (!Objects.isNull(tableChanges)) {
+            JsonNode tableChange = tableChanges.get(0);
+            return tableChange;
+        }
+        return null;
+    }
+
+    /** Parse Alter Event. */
+    @VisibleForTesting
+    public List<String> extractDDLList(JsonNode record) throws IOException {
+        String dorisTable = getDorisTableIdentifier(record);
+        JsonNode historyRecord = extractHistoryRecord(record);
+        String ddl = extractJsonNode(historyRecord, "ddl");
+        JsonNode tableChange = extractTableChange(record);
+        EventType eventType = extractEventType(record);
+        if (Objects.isNull(tableChange)
+                || Objects.isNull(ddl)
+                || !eventType.equals(EventType.ALTER)) {
+            return null;
+        }
+
+        JsonNode columns = tableChange.get("table").get("columns");
+        if (Objects.isNull(sourceConnector)) {
+            sourceConnector =
+                    SourceConnector.valueOf(
+                            record.get("source").get("connector").asText().toUpperCase());
+            fillOriginSchema(columns);
+        }
+
+        // rename ddl
+        Matcher renameMatcher = renameDDLPattern.matcher(ddl);
+        if (renameMatcher.find()) {
+            String oldColumnName = renameMatcher.group(2);
+            String newColumnName = renameMatcher.group(3);
+            return SchemaChangeHelper.generateRenameDDLSql(
+                    dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
+        }
+
+        // add/drop ddl
+        Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
+        for (JsonNode column : columns) {
+            buildFieldSchema(updateFiledSchema, column);
+        }
+        SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
+        // Guard against column-change operations other than add/drop/rename,
+        // which could otherwise cause accidental deletion of a Doris column.
+        Matcher matcher = addDropDDLPattern.matcher(ddl);
+        if (!matcher.find()) {
+            return null;
+        }
+        return SchemaChangeHelper.generateDDLSql(dorisTable);
+    }
+
+    @VisibleForTesting
+    public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
+        if (sourceConnector == null) {
+            sourceConnector =
+                    SourceConnector.valueOf(
+                            record.get("source").get("connector").asText().toUpperCase());
+        }
+
+        String dorisTable = getCreateTableIdentifier(record);
+        JsonNode tableChange = extractTableChange(record);
+        JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
+        JsonNode columns = tableChange.get("table").get("columns");
+        JsonNode comment = tableChange.get("table").get("comment");
+        String tblComment = comment == null ? "" : comment.asText();
+        Map<String, FieldSchema> field = new LinkedHashMap<>();
+        for (JsonNode column : columns) {
+            buildFieldSchema(field, column);
+        }
+        List<String> pkList = new ArrayList<>();
+        for (JsonNode column : pkColumns) {
+            String fieldName = column.asText();
+            pkList.add(fieldName);
+        }
+
+        TableSchema tableSchema = new TableSchema();
+        tableSchema.setFields(field);
+        tableSchema.setKeys(pkList);
+        tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
+        tableSchema.setTableComment(tblComment);
+        tableSchema.setProperties(tableProperties);
+        tableSchema.setModel(pkList.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);
+
+        String[] split = dorisTable.split("\\.");
+        Preconditions.checkArgument(split.length == 2);
+        tableSchema.setDatabase(split[0]);
+        tableSchema.setTable(split[1]);
+        return tableSchema;
+    }
+
+    private List<String> buildDistributeKeys(
+            List<String> primaryKeys, Map<String, FieldSchema> fields) {
+        if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
+            return primaryKeys;
+        }
+        if (!fields.isEmpty()) {
+            Map.Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
+            return Collections.singletonList(firstField.getKey());
+        }
+        return new ArrayList<>();
+    }
+
+    private String getCreateTableIdentifier(JsonNode record) {
+        String table = extractJsonNode(record.get("source"), "table");
+        return targetDatabase + "." + table;
+    }
+
+    private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
+            throws IOException, IllegalArgumentException {
+        Map<String, Object> param =
+                SchemaChangeManager.buildRequestParam(
+                        ddlSchema.isDropColumn(), ddlSchema.getColumnName());
+        return schemaChangeManager.checkSchemaChange(database, table, param);
+    }
+
+    /** Extracts the schema-change event type (ALTER or CREATE); returns null for other types. */
+    protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
+        JsonNode tableChange = extractTableChange(record);
+        if (tableChange == null || tableChange.get("type") == null) {
+            return null;
+        }
+        String type = tableChange.get("type").asText();
+        if (EventType.ALTER.toString().equalsIgnoreCase(type)) {
+            return EventType.ALTER;
+        } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) {
+            return EventType.CREATE;
+        }
+        return null;
+    }
+
+    private Map<String, Object> extractBeforeRow(JsonNode record) {
+        return extractRow(record.get("before"));
+    }
+
+    private Map<String, Object> extractAfterRow(JsonNode record) {
+        return extractRow(record.get("after"));
+    }
+
+    private Map<String, Object> extractRow(JsonNode recordRow) {
+        Map<String, Object> recordMap =
+                objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
+        return recordMap != null ? recordMap : new HashMap<>();
+    }
+
+    @VisibleForTesting
+    public void fillOriginSchema(JsonNode columns) {
+        if (Objects.nonNull(originFieldSchemaMap)) {
+            for (JsonNode column : columns) {
+                String fieldName = column.get("name").asText();
+                if (originFieldSchemaMap.containsKey(fieldName)) {
+                    String dorisTypeName = buildDorisTypeName(column);
+                    String defaultValue =
+                            handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
+                    String comment = extractJsonNode(column, "comment");
+                    FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
+                    fieldSchema.setName(fieldName);
+                    fieldSchema.setTypeString(dorisTypeName);
+                    fieldSchema.setComment(comment);
+                    fieldSchema.setDefaultValue(defaultValue);
+                }
+            }
+        } else {
+            LOG.error(
+                    "Current schema change failed! You need to ensure that "
+                            + "there is data in the table."
+                            + dorisOptions.getTableIdentifier());
+            originFieldSchemaMap = new LinkedHashMap<>();
+            columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, column));
+        }
+    }
+
+    private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
+        String fieldName = column.get("name").asText();
+        String dorisTypeName = buildDorisTypeName(column);
+        String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
+        String comment = extractJsonNode(column, "comment");
+        filedSchemaMap.put(
+                fieldName, new FieldSchema(fieldName, dorisTypeName, defaultValue, comment));
+    }
+
+    @VisibleForTesting
+    public String buildDorisTypeName(JsonNode column) {
+        int length = column.get("length") == null ? 0 : column.get("length").asInt();
+        int scale = column.get("scale") == null ? 0 : column.get("scale").asInt();
+        String sourceTypeName = column.get("typeName").asText();
+        String dorisTypeName;
+        switch (sourceConnector) {
+            case MYSQL:
+                dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, scale);
+                break;
+            case ORACLE:
+                dorisTypeName = OracleType.toDorisType(sourceTypeName, length, scale);
+                break;
+            case POSTGRES:
+                dorisTypeName = PostgresType.toDorisType(sourceTypeName, length, scale);
+                break;
+            case SQLSERVER:
+                dorisTypeName = SqlServerType.toDorisType(sourceTypeName, length, scale);
+                break;
+            default:
+                String errMsg = "Not support " + sourceTypeName + " schema change.";
+                throw new UnsupportedOperationException(errMsg);
+        }
+        return dorisTypeName;
+    }
+
+    private String handleDefaultValue(String defaultValue) {
+        if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
+            return null;
+        }
+        // For historical reasons, Doris requires quotes around the default
+        // value of a newly added column.
+        // For example, in MySQL: alter table add column c1 int default 100
+        // becomes in Doris:      alter table add column c1 int default '100'
+        if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
+            return defaultValue;
+        } else if (defaultValue.equals("1970-01-01 00:00:00")) {
+            // TODO: CDC represents a current-timestamp default as the epoch value 1970-01-01 00:00:00
+            return "current_timestamp";
+        }
+        return "'" + defaultValue + "'";
+    }
+
+    @VisibleForTesting
+    public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
+        this.originFieldSchemaMap = originFieldSchemaMap;
+    }
+
+    @VisibleForTesting
+    public Map<String, FieldSchema> getOriginFieldSchemaMap() {
+        return originFieldSchemaMap;
+    }
+
+    @VisibleForTesting
+    public void setSourceConnector(String sourceConnector) {
+        this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase());
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 7da3a30..e32f1b3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -18,48 +18,34 @@
 package org.apache.doris.flink.sink.writer;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.doris.flink.catalog.doris.FieldSchema;
-import org.apache.doris.flink.catalog.doris.TableSchema;
-import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisException;
-import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.rest.models.Field;
-import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
 import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
-import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
 import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
-/** test for JsonDebeziumSchemaSerializer. */
+/** Test for JsonDebeziumSchemaSerializer. */
 public class TestJsonDebeziumSchemaSerializer {
-    private static final Logger LOG =
-            LoggerFactory.getLogger(TestJsonDebeziumSchemaSerializer.class);
-    static DorisOptions dorisOptions;
-    static JsonDebeziumSchemaSerializer serializer;
-    static ObjectMapper objectMapper = new ObjectMapper();
 
-    @BeforeClass
-    public static void setUp() {
+    protected static DorisOptions dorisOptions;
+    protected ObjectMapper objectMapper = new ObjectMapper();
+    private JsonDebeziumSchemaSerializer serializer;
+    private SchemaChangeManager mockSchemaChangeManager;
+
+    @Before
+    public void setUp() throws IOException, IllegalArgumentException {
         dorisOptions =
                 DorisOptions.builder()
                         .setFenodes("127.0.0.1:8030")
@@ -67,22 +53,31 @@
                         .setUsername("root")
                         .setPassword("")
                         .build();
-        serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
+        this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+        JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
+        this.objectMapper.setNodeFactory(jsonNodeFactory);
+        this.serializer =
+                JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
+        this.mockSchemaChangeManager = Mockito.mock(SchemaChangeManager.class);
+        Mockito.when(
+                        mockSchemaChangeManager.checkSchemaChange(
+                                Mockito.any(), Mockito.any(), Mockito.any()))
+                .thenReturn(true);
     }
 
     @Test
-    public void testSerializeInsert() throws IOException {
-        // insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01
-        // 10:01:03');
-        byte[] serializedValue =
-                serializer
-                        .serialize(
-                                "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\":1663923840146,\"transaction\":null}")
-                        .getRow();
+    public void testJsonDebeziumDataChange() throws IOException {
+        // insert into t1
+        // VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03');
+        String record =
+                "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\":1663923840146,\"transaction\":null}";
+        DorisRecord dorisRecord = serializer.serialize(record);
+        byte[] row = dorisRecord.getRow();
         Map<String, String> valueMap =
                 objectMapper.readValue(
-                        new String(serializedValue, StandardCharsets.UTF_8),
+                        new String(row, StandardCharsets.UTF_8),
                         new TypeReference<Map<String, String>>() {});
+
         Assert.assertEquals("1", valueMap.get("id"));
         Assert.assertEquals("doris", valueMap.get("name"));
         Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -93,388 +88,32 @@
     }
 
     @Test
-    public void testSerializeUpdate() throws IOException {
-        // update t1 set name='doris-update' WHERE id =1;
-        byte[] serializedValue =
-                serializer
-                        .serialize(
-                                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"u\",\"ts_ms\":1663924082186,\"transaction\":null}")
-                        .getRow();
-        Map<String, String> valueMap =
-                objectMapper.readValue(
-                        new String(serializedValue, StandardCharsets.UTF_8),
-                        new TypeReference<Map<String, String>>() {});
-        Assert.assertEquals("1", valueMap.get("id"));
-        Assert.assertEquals("doris-update", valueMap.get("name"));
-        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
-        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
-        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
-        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
-        Assert.assertEquals(6, valueMap.size());
+    public void testTestJsonDebeziumSchemaChangeImpl() throws IOException {
+        // ALTER TABLE test.t1 add COLUMN c_1 varchar(600)
+        String record =
+                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMEST
AMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        JsonDebeziumSchemaChange schemaChange = serializer.getJsonDebeziumSchemaChange();
+        schemaChange.setSchemaChangeManager(mockSchemaChangeManager);
+        DorisRecord serializeRecord = serializer.serialize(record);
+        Assert.assertNull(serializeRecord);
     }
 
     @Test
-    public void testSerializeUpdateBefore() throws IOException {
-        serializer =
+    public void testTestJsonDebeziumSchemaChangeImplV2() throws IOException {
+        // alter table test_sink add column c4 varchar(100) default 'aaa', drop column c3;
+        String recordValue =
+                "{\"before\":null,\"after\":{\"id\":1,\"c3\":1111},\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":0,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":0,\"gtid\":null,\"file\":\"\",\"pos\":0,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"r\",\"ts_ms\":1703151083107,\"transaction\":null}";
+        String recordSchema =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1703151236431,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000050\",\"pos\":6040,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000050\\\",\\\"pos\\\":6040,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1703151236,\\\"file\\\":\\\"binlog.000050\\\",\\\"pos\\\":6206,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink add column c4 varchar(100) default 'aaa', drop column c3\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"aaa\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+        this.serializer =
                 JsonDebeziumSchemaSerializer.builder()
                         .setDorisOptions(dorisOptions)
-                        .setExecutionOptions(
-                                DorisExecutionOptions.builderDefaults()
-                                        .setIgnoreUpdateBefore(false)
-                                        .build())
+                        .setNewSchemaChange(true)
                         .build();
-        // update t1 set name='doris-update' WHERE id =1;
-        byte[] serializedValue =
-                serializer
-                        .serialize(
-                                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"u\",\"ts_ms\":1663924082186,\"transaction\":null}")
-                        .getRow();
-        String row = new String(serializedValue, StandardCharsets.UTF_8);
-        String[] split = row.split("\n");
-        Map<String, String> valueMap =
-                objectMapper.readValue(split[1], new TypeReference<Map<String, String>>() {});
-        Assert.assertEquals("1", valueMap.get("id"));
-        Assert.assertEquals("doris-update", valueMap.get("name"));
-        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
-        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
-        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
-        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
-        Assert.assertEquals(6, valueMap.size());
-
-        Map<String, String> beforeMap =
-                objectMapper.readValue(split[0], new TypeReference<Map<String, String>>() {});
-        Assert.assertEquals("doris", beforeMap.get("name"));
-    }
-
-    @Test
-    public void testSerializeDelete() throws IOException {
-        byte[] serializedValue =
-                serializer
-                        .serialize(
-                                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}")
-                        .getRow();
-        Map<String, String> valueMap =
-                objectMapper.readValue(
-                        new String(serializedValue, StandardCharsets.UTF_8),
-                        new TypeReference<Map<String, String>>() {});
-        Assert.assertEquals("1", valueMap.get("id"));
-        Assert.assertEquals("doris-update", valueMap.get("name"));
-        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
-        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
-        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
-        Assert.assertEquals("1", valueMap.get("__DORIS_DELETE_SIGN__"));
-        Assert.assertEquals(6, valueMap.size());
-    }
-
-    @Test
-    public void testExtractDDL() throws IOException {
-        String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)";
-        String record =
-                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMEST
AMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        String ddl = serializer.extractDDL(recordRoot);
-        Assert.assertEquals(srcDDL, ddl);
-
-        String targetDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(65533)";
-        String record1 =
-                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(30000)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIME
STAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
-        JsonNode recordRoot1 = objectMapper.readTree(record1);
-        String ddl1 = serializer.extractDDL(recordRoot1);
-        Assert.assertEquals(targetDDL, ddl1);
-    }
-
-    @Test
-    public void testExtractDDLListMultipleColumns() throws IOException {
-        String sql0 = "ALTER TABLE test.t1 ADD COLUMN id INT DEFAULT '10000'";
-        String sql1 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
-        String sql2 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
-        String sql3 = "ALTER TABLE test.t1 DROP COLUMN c13";
-        List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3);
-
-        Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>();
-        originFiledSchemaMap.put("c2", new FieldSchema());
-        originFiledSchemaMap.put("c555", new FieldSchema());
-        originFiledSchemaMap.put("c666", new FieldSchema());
-        originFiledSchemaMap.put("c4", new FieldSchema());
-        originFiledSchemaMap.put("c13", new FieldSchema());
-
-        String record =
-                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691033764,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23464,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop c11, drop column c3, add c12 int default 100\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"t
ypeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c12\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        serializer.setOriginFieldSchemaMap(originFiledSchemaMap);
-        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
-        for (int i = 0; i < ddlSQLList.size(); i++) {
-            String srcSQL = srcSqlList.get(i);
-            String targetSQL = ddlSQLList.get(i);
-            Assert.assertEquals(srcSQL, targetSQL);
-        }
-    }
-
-    @Test
-    public void testExtractDDLListCreateTable() throws IOException {
-        String record =
-                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n  CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n   (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t\\\"c100\\\" LONG, \\n\\t\\\"c55\\\" VARCHAR2(255), \\n\\t\\\"c77\\\" VARCHAR2(255), \\n\\t PRIMARY KEY (\\\"ID\\\") ENABLE\\n   ) ;\\n \",\"tableChanges\":[{\"type\":\"CREATE\",\"id\":\"\\\"HELOWIN\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\",\"table\":{\"defaultCharsetName\":null,\"primaryKeyColumnNames\":[\"ID\"],\"columns\":[{\"name\":\"ID\",\"jdbcType\":2,\"nativeType\":null,\"typeName\":\"NUMBER\",\"typeExpression\":\"NUMBER\",\"charsetName\":null,\"length\":10,\"scale\":0,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"NAME4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"age4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c100\",\"jdbcType\":-1,\"nativeType\":null,\"typeName\":\"LONG\",\"typeExpression\":\"LONG\",\"charsetName\":null,\"length\":0,\"scale\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c55\",\"jdbcType\":12,\"nativeType\":nul
l,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c77\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null}],\"comment\":null}}]}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        serializer.setSourceConnector("oracle");
-        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
-        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
-        serializer.setSourceConnector("mysql");
-    }
-
-    @Test
-    public void testExtractDDLListTruncateTable() throws IOException {
-        String record =
-                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944601264,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5719,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5719,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696944601,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5824,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"truncate table test_sink11\\\",\\\"tableChanges\\\":[]}\"}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
-        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
-    }
-
-    @Test
-    public void testExtractDDLListDropTable() throws IOException {
-        String record =
-                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944747956,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5901,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5901,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696944747,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6037,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"DROP TABLE `test`.`test_sink11`\\\",\\\"tableChanges\\\":[]}\"}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
-        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
-    }
-
-    @Test
-    public void testExtractDDLListChangeColumn() throws IOException {
-        String record =
-                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945030,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6661,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink change column c555 c777 bigint\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
-        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
-    }
-
-    @Test
-    public void testExtractDDLListModifyColumn() throws IOException {
-        String record =
-                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945306941,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6738,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6738,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945306,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6884,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink modify column c777 tinyint default 7\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":5,\\\"typeName\\\":\\\"TINYINT\\\",\\\"typeExpression\\\":\\\"TINYINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"7\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
-        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
-    }
-
-    @Test
-    public void testExtractDDLListRenameColumn() throws IOException {
-        String record =
-                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691034519,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23886,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c22 to c33\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT
\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c33\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
-        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
-    }
-
-    @Test
-    public void testFillOriginSchema() throws IOException {
-        Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
-        srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
-        srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
-        srcFiledSchemaMap.put(
-                "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
-        srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", null));
-
-        serializer.setSourceConnector("mysql");
-        String columnsString =
-                "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"c1\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]},{\"name\":\"cc\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]}]";
-        JsonNode columns = objectMapper.readTree(columnsString);
-        serializer.fillOriginSchema(columns);
-        Map<String, FieldSchema> originFieldSchemaMap = serializer.getOriginFieldSchemaMap();
-
-        Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
-                originFieldSchemaMap.entrySet().iterator();
-        for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
-            FieldSchema srcFiledSchema = entry.getValue();
-            Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
-
-            Assert.assertEquals(entry.getKey(), originField.getKey());
-            Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
-            Assert.assertEquals(
-                    srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
-            Assert.assertEquals(
-                    srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
-            Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
-        }
-    }
-
-    @Test
-    public void testBuildMysql2DorisTypeName() throws IOException {
-        String columnInfo =
-                "{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"10\",\"enumValues\":[]}";
-        serializer.setSourceConnector("mysql");
-        JsonNode columns = objectMapper.readTree(columnInfo);
-        String dorisTypeName = serializer.buildDorisTypeName(columns);
-        Assert.assertEquals(dorisTypeName, "INT");
-    }
-
-    @Test
-    public void testBuildOracle2DorisTypeName() throws IOException {
-        String columnInfo =
-                "{\"name\":\"NAME\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null}";
-        serializer.setSourceConnector("oracle");
-        JsonNode columns = objectMapper.readTree(columnInfo);
-        String dorisTypeName = serializer.buildDorisTypeName(columns);
-        Assert.assertEquals(dorisTypeName, "VARCHAR(384)");
-    }
-
-    @Test
-    public void testExtractDDLListRename() throws IOException {
-        String columnInfo =
-                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1698314781,\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5331,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c3 to c333\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c333\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":10,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
-        Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap();
-        JsonNode record = objectMapper.readTree(columnInfo);
-
-        DorisOptions dorisOptions =
-                DorisOptions.builder()
-                        .setFenodes("127.0.0.1:8030")
-                        .setTableIdentifier("test.t1")
-                        .setUsername("root")
-                        .setPassword("")
-                        .build();
-        JsonDebeziumSchemaSerializer serializer =
-                JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
-        serializer.setSourceConnector("mysql");
-
-        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
-        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
-        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
-        serializer.setOriginFieldSchemaMap(originFieldSchemaMap);
-
-        List<String> ddlList = serializer.extractDDLList(record);
-        Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333", ddlList.get(0));
-    }
-
-    @Ignore
-    @Test
-    public void testSerializeAddColumn() throws IOException, DorisException {
-        // alter table t1 add  column  c_1 varchar(200)
-        String record =
-                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n column  c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"T
IMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        boolean flag = serializer.schemaChange(recordRoot);
-        Assert.assertEquals(true, flag);
-
-        Field targetField = getField("c_1");
-        Assert.assertNotNull(targetField);
-        Assert.assertEquals("c_1", targetField.getName());
-        Assert.assertEquals("VARCHAR", targetField.getType());
-    }
-
-    @Ignore
-    @Test
-    public void testSerializeDropColumn() throws IOException, DorisException {
-        // alter table  t1 drop  column  c_1;
-        String ddl =
-                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663925897,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13422,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table    t1 drop \\\\n column  c_1\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\
\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
-        JsonNode recordRoot = objectMapper.readTree(ddl);
-        boolean flag = serializer.schemaChange(recordRoot);
-        Assert.assertEquals(true, flag);
-
-        Field targetField = getField("c_1");
-        Assert.assertNull(targetField);
-    }
-
-    private static Field getField(String column) throws DorisException {
-        // get table schema
-        Schema schema =
-                RestService.getSchema(dorisOptions, DorisReadOptions.builder().build(), LOG);
-        List<Field> properties = schema.getProperties();
-        Field targetField = null;
-        for (Field field : properties) {
-            if (column.equals(field.getName())) {
-                targetField = field;
-                break;
-            }
-        }
-        return targetField;
-    }
-
-    @Test
-    public void testGetCdcTableIdentifier() throws Exception {
-        String insert =
-                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
-        JsonNode recordRoot = objectMapper.readTree(insert);
-        String identifier = serializer.getCdcTableIdentifier(recordRoot);
-        Assert.assertEquals("test.t1", identifier);
-
-        String insertSchema =
-                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
-        String identifierSchema =
-                serializer.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
-        Assert.assertEquals("test.dbo.t1", identifierSchema);
-
-        String ddl =
-                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMEST
AMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
-        String ddlRes = serializer.getCdcTableIdentifier(objectMapper.readTree(ddl));
-        Assert.assertEquals("test.t1", ddlRes);
-    }
-
-    @Test
-    public void testGetDorisTableIdentifier() throws Exception {
-        Map<String, String> map = new HashMap<>();
-        map.put("test.dbo.t1", "test.t1");
-        serializer.setTableMapping(map);
-        String identifier = serializer.getDorisTableIdentifier("test.dbo.t1");
-        Assert.assertEquals("test.t1", identifier);
-
-        identifier = serializer.getDorisTableIdentifier("test.t1");
-        Assert.assertEquals("test.t1", identifier);
-
-        String tmp = dorisOptions.getTableIdentifier();
-        dorisOptions.setTableIdentifier(null);
-        identifier = serializer.getDorisTableIdentifier("test.t1");
-        Assert.assertNull(identifier);
-        dorisOptions.setTableIdentifier(tmp);
-    }
-
-    @Test
-    public void testSchemaChangeMultiTable() throws Exception {
-        Map<String, String> map = new HashMap<>();
-        map.put("mysql.t1", "doris.t1");
-        map.put("mysql.t2", "doris.t2");
-        serializer.setTableMapping(map);
-        String tmp = dorisOptions.getTableIdentifier();
-        dorisOptions.setTableIdentifier(null);
-        String ddl1 =
-                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMES
TAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
-        String ddl2 =
-                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t2\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMES
TAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
-        String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(600)";
-        String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(600)";
-
-        Assert.assertEquals(exceptDDL1, serializer.extractDDL(objectMapper.readTree(ddl1)));
-        Assert.assertEquals(exceptDDL2, serializer.extractDDL(objectMapper.readTree(ddl2)));
-
-        // Assert.assertEquals(exceptDDL1, serializer.extractDDLList(objectMapper.readTree(ddl1)));
-        // Assert.assertEquals(exceptDDL2, serializer.extractDDLList(objectMapper.readTree(ddl2)));
-
-        dorisOptions.setTableIdentifier(tmp);
-    }
-
-    @Test
-    @Ignore
-    public void testAutoCreateTable() throws Exception {
-        String record =
-                "{    \"source\":{        \"version\":\"1.9.7.Final\",        \"connector\":\"oracle\",        \"name\":\"oracle_logminer\",        \"ts_ms\":1696945825065,        \"snapshot\":\"true\",        \"db\":\"TESTDB\",        \"sequence\":null,        \"schema\":\"ADMIN\",        \"table\":\"PERSONS\",        \"txId\":null,        \"scn\":\"1199617\",        \"commit_scn\":null,        \"lcr_position\":null,        \"rs_id\":null,        \"ssn\":0,        \"redo_thread\":null    },    \"databaseName\":\"TESTDB\",    \"schemaName\":\"ADMIN\",    \"ddl\":\"\\n  CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n   (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t PRIMARY KEY (\\\"ID\\\") ENABLE\\n   ) ;\\n \",    \"tableChanges\":[        {            \"type\":\"CREATE\",            \"id\":\"\\\"TESTDB\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\",            \"table\":{                \"defaultCharsetName\":null,                \"primaryKeyColumnNames\":[                    \"ID\"                ],                \"columns\":[                    {                        \"name\":\"ID\",                        \"jdbcType\":2,                        \"nativeType\":null,                        \"typeName\":\"NUMBER\",                        \"typeExpression\":\"NUMBER\",                        \"charsetName\":null,                        \"length\":10,                        \"scale\":0,                        \"position\":1,                        \"optional\":false,                        \"autoIncremented\":false,                        \"generated\":false,                        \"comment\":null                    },                    {                        \"name\":\"NAME4\",                        \"jdbcType\":12,                        \"nativeType\":null,                        \"typeName\":\"VARCHAR2\",                        \"typeExpression\":\"VARCHAR2\",                        
\"charsetName\":null,                        \"length\":128,                        \"scale\":null,                        \"position\":2,                        \"optional\":false,                        \"autoIncremented\":false,                        \"generated\":false,                        \"comment\":null                    },                    {                        \"name\":\"age4\",                        \"jdbcType\":12,                        \"nativeType\":null,                        \"typeName\":\"VARCHAR2\",                        \"typeExpression\":\"VARCHAR2\",                        \"charsetName\":null,                        \"length\":128,                        \"scale\":null,                        \"position\":3,                        \"optional\":true,                        \"autoIncremented\":false,                        \"generated\":false,                        \"comment\":null                    }                ],                \"comment\":null            }        }    ]}";
-        JsonNode recordRoot = objectMapper.readTree(record);
-        dorisOptions =
-                DorisOptions.builder()
-                        .setFenodes("127.0.0.1:8030")
-                        .setTableIdentifier("")
-                        .setUsername("root")
-                        .setPassword("")
-                        .build();
-        serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
-        serializer.setSourceConnector(SourceConnector.ORACLE.connectorName);
-        TableSchema tableSchema = serializer.extractCreateTableSchema(recordRoot);
-        Assert.assertEquals("TESTDB", tableSchema.getDatabase());
-        Assert.assertEquals("PERSONS", tableSchema.getTable());
-        Assert.assertArrayEquals(new String[] {"ID"}, tableSchema.getKeys().toArray());
-        Assert.assertEquals(3, tableSchema.getFields().size());
-        Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName());
-        Assert.assertEquals("NAME4", tableSchema.getFields().get("NAME4").getName());
-        Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName());
-        serializer.setSourceConnector(SourceConnector.MYSQL.connectorName);
+        JsonDebeziumSchemaChange schemaChange = serializer.getJsonDebeziumSchemaChange();
+        schemaChange.setSchemaChangeManager(mockSchemaChangeManager);
+        serializer.serialize(recordValue);
+        DorisRecord serializeRecord = serializer.serialize(recordSchema);
+        Assert.assertNull(serializeRecord);
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeBase.java
new file mode 100644
index 0000000..d88be9a
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeBase.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.LoadConstants;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test base for data change and schema change. */
+public class TestJsonDebeziumChangeBase {
+
+    protected static DorisOptions dorisOptions;
+    protected Map<String, String> tableMapping = new HashMap<>();
+    protected boolean ignoreUpdateBefore = true;
+    protected String lineDelimiter = LoadConstants.LINE_DELIMITER_DEFAULT;
+    protected ObjectMapper objectMapper = new ObjectMapper();
+
+    @Before
+    public void setUp() {
+        dorisOptions =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:8030")
+                        .setTableIdentifier("test.t1")
+                        .setUsername("root")
+                        .setPassword("")
+                        .build();
+        this.tableMapping.put("mysql.t1", "doris.t1");
+        this.tableMapping.put("mysql.t2", "doris.t2");
+        this.tableMapping.put("test.dbo.t1", "test.t1");
+        this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+        JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
+        this.objectMapper.setNodeFactory(jsonNodeFactory);
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
new file mode 100644
index 0000000..d789807
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/** Test for JsonDebeziumDataChange. */
+public class TestJsonDebeziumDataChange extends TestJsonDebeziumChangeBase {
+
+    private JsonDebeziumDataChange dataChange;
+    private JsonDebeziumChangeContext changeContext;
+
+    @Before
+    public void setUp() {
+        super.setUp();
+        changeContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        null,
+                        null,
+                        null,
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        ignoreUpdateBefore);
+        dataChange = new JsonDebeziumDataChange(changeContext);
+    }
+
+    @Test
+    public void testSerializeInsert() throws IOException {
+        // insert into t1
+        // VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-0110:01:03');
+        String record =
+                "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\":1663923840146,\"transaction\":null}";
+        Map<String, String> valueMap = extractValueMap(record);
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+    }
+
+    @Test
+    public void testSerializeUpdate() throws IOException {
+        // update t1 set name='doris-update' WHERE id =1;
+        String record =
+                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"u\",\"ts_ms\":1663924082186,\"transaction\":null}";
+        Map<String, String> valueMap = extractValueMap(record);
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris-update", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+    }
+
+    @Test
+    public void testSerializeDelete() throws IOException {
+        String record =
+                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+        Map<String, String> valueMap = extractValueMap(record);
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris-update", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("1", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+    }
+
+    @Test
+    public void testSerializeUpdateBefore() throws IOException {
+        changeContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        null,
+                        null,
+                        null,
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        false);
+        dataChange = new JsonDebeziumDataChange(changeContext);
+
+        // update t1 set name='doris-update' WHERE id =1;
+        String record =
+                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"u\",\"ts_ms\":1663924082186,\"transaction\":null}";
+        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+        String op = extractJsonNode(recordRoot, "op");
+        DorisRecord dorisRecord = dataChange.serialize(record, recordRoot, op);
+        byte[] serializedValue = dorisRecord.getRow();
+        String row = new String(serializedValue, StandardCharsets.UTF_8);
+        String[] split = row.split("\n");
+        Map<String, String> valueMap =
+                objectMapper.readValue(split[1], new TypeReference<Map<String, String>>() {});
+
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris-update", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+
+        Map<String, String> beforeMap =
+                objectMapper.readValue(split[0], new TypeReference<Map<String, String>>() {});
+        Assert.assertEquals("doris", beforeMap.get("name"));
+    }
+
+    private Map<String, String> extractValueMap(String record) throws IOException {
+        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+        String op = extractJsonNode(recordRoot, "op");
+        DorisRecord dorisRecord = dataChange.serialize(record, recordRoot, op);
+        byte[] serializedValue = dorisRecord.getRow();
+        return objectMapper.readValue(
+                new String(serializedValue, StandardCharsets.UTF_8),
+                new TypeReference<Map<String, String>>() {});
+    }
+
+    @Test
+    public void testGetCdcTableIdentifier() throws Exception {
+        String insert =
+                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+        JsonNode recordRoot = objectMapper.readTree(insert);
+        String identifier = dataChange.getCdcTableIdentifier(recordRoot);
+        Assert.assertEquals("test.t1", identifier);
+
+        String insertSchema =
+                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+        String identifierSchema =
+                dataChange.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
+        Assert.assertEquals("test.dbo.t1", identifierSchema);
+
+        String ddl =
+                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMEST
AMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        String ddlRes = dataChange.getCdcTableIdentifier(objectMapper.readTree(ddl));
+        Assert.assertEquals("test.t1", ddlRes);
+    }
+
+    @Test
+    public void testGetDorisTableIdentifier() throws Exception {
+        String identifier = dataChange.getDorisTableIdentifier("test.dbo.t1");
+        Assert.assertEquals("test.t1", identifier);
+
+        identifier = dataChange.getDorisTableIdentifier("test.t1");
+        Assert.assertEquals("test.t1", identifier);
+
+        String tmp = dorisOptions.getTableIdentifier();
+        dorisOptions.setTableIdentifier(null);
+        identifier = dataChange.getDorisTableIdentifier("test.t1");
+        Assert.assertNull(identifier);
+        dorisOptions.setTableIdentifier(tmp);
+    }
+
+    private String extractJsonNode(JsonNode record, String key) {
+        return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode)
+                ? record.get(key).asText()
+                : null;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
new file mode 100644
index 0000000..9b003a8
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Field;
+import org.apache.doris.flink.rest.models.Schema;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Test for JsonDebeziumSchemaChangeImpl. */
+public class TestJsonDebeziumSchemaChangeImpl extends TestJsonDebeziumChangeBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(TestJsonDebeziumSchemaChangeImpl.class);
+
+    private JsonDebeziumSchemaChangeImpl schemaChange;
+    private JsonDebeziumChangeContext changeContext;
+
+    @Before
+    public void setUp() {
+        super.setUp();
+        changeContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        null,
+                        null,
+                        null,
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        ignoreUpdateBefore);
+        schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
+    }
+
+    @Test
+    public void testExtractDDL() throws IOException {
+        String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)";
+        String record =
+                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMEST
AMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        String ddl = schemaChange.extractDDL(recordRoot);
+        Assert.assertEquals(srcDDL, ddl);
+
+        String targetDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(65533)";
+        String record1 =
+                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(30000)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIME
STAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        JsonNode recordRoot1 = objectMapper.readTree(record1);
+        String ddl1 = schemaChange.extractDDL(recordRoot1);
+        Assert.assertEquals(targetDDL, ddl1);
+    }
+
+    @Test
+    public void testSchemaChangeMultiTable() throws Exception {
+        String tmp = dorisOptions.getTableIdentifier();
+        dorisOptions.setTableIdentifier(null);
+        String ddl1 =
+                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMES
TAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        String ddl2 =
+                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t2\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMES
TAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(600)";
+        String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(600)";
+
+        Assert.assertEquals(exceptDDL1, schemaChange.extractDDL(objectMapper.readTree(ddl1)));
+        Assert.assertEquals(exceptDDL2, schemaChange.extractDDL(objectMapper.readTree(ddl2)));
+
+        // Kept for reference: pre-refactor API was serializer.extractDDLList(...);
+        // the equivalent assertions now use schemaChange.extractDDL(...) above.
+
+        dorisOptions.setTableIdentifier(tmp);
+    }
+
+    @Ignore
+    @Test
+    public void testSerializeAddColumn() throws IOException, DorisException {
+        // alter table t1 add  column  c_1 varchar(200)
+        String record =
+                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n column  c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"T
IMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        boolean flag = schemaChange.schemaChange(recordRoot);
+        Assert.assertEquals(true, flag);
+
+        Field targetField = getField("c_1");
+        Assert.assertNotNull(targetField);
+        Assert.assertEquals("c_1", targetField.getName());
+        Assert.assertEquals("VARCHAR", targetField.getType());
+    }
+
+    @Ignore
+    @Test
+    public void testSerializeDropColumn() throws IOException, DorisException {
+        // alter table  t1 drop  column  c_1;
+        String ddl =
+                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663925897,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13422,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table    t1 drop \\\\n column  c_1\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\
\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        JsonNode recordRoot = objectMapper.readTree(ddl);
+        boolean flag = schemaChange.schemaChange(recordRoot);
+        Assert.assertEquals(true, flag);
+
+        Field targetField = getField("c_1");
+        Assert.assertNull(targetField);
+    }
+
+    private static Field getField(String column) throws DorisException {
+        // get table schema
+        Schema schema =
+                RestService.getSchema(dorisOptions, DorisReadOptions.builder().build(), LOG);
+        List<Field> properties = schema.getProperties();
+        Field targetField = null;
+        for (Field field : properties) {
+            if (column.equals(field.getName())) {
+                targetField = field;
+                break;
+            }
+        }
+        return targetField;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
new file mode 100644
index 0000000..adc87b1
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/** Test for JsonDebeziumSchemaChangeImplV2. */
+public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBase {
+
+    private JsonDebeziumSchemaChangeImplV2 schemaChange;
+    private JsonDebeziumChangeContext changeContext;
+
+    @Before
+    public void setUp() {
+        super.setUp();
+        String sourceTableName = null;
+        String targetDatabase = "TESTDB";
+        Map<String, String> tableProperties = new HashMap<>();
+        changeContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        sourceTableName,
+                        targetDatabase,
+                        tableProperties,
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        ignoreUpdateBefore);
+        schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
+    }
+
+    @Test
+    public void testExtractDDLListMultipleColumns() throws IOException {
+        String sql0 = "ALTER TABLE test.t1 ADD COLUMN id INT DEFAULT '10000'";
+        String sql1 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
+        String sql2 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
+        String sql3 = "ALTER TABLE test.t1 DROP COLUMN c13";
+        List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3);
+
+        Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>();
+        originFiledSchemaMap.put("c2", new FieldSchema());
+        originFiledSchemaMap.put("c555", new FieldSchema());
+        originFiledSchemaMap.put("c666", new FieldSchema());
+        originFiledSchemaMap.put("c4", new FieldSchema());
+        originFiledSchemaMap.put("c13", new FieldSchema());
+
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691033764,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23464,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop c11, drop column c3, add c12 int default 100\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"t
ypeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c12\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        schemaChange.setOriginFieldSchemaMap(originFiledSchemaMap);
+        List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+        for (int i = 0; i < ddlSQLList.size(); i++) {
+            String srcSQL = srcSqlList.get(i);
+            String targetSQL = ddlSQLList.get(i);
+            Assert.assertEquals(srcSQL, targetSQL);
+        }
+    }
+
+    @Test
+    public void testExtractDDLListCreateTable() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n  CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n   (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t\\\"c100\\\" LONG, \\n\\t\\\"c55\\\" VARCHAR2(255), \\n\\t\\\"c77\\\" VARCHAR2(255), \\n\\t PRIMARY KEY (\\\"ID\\\") ENABLE\\n   ) ;\\n \",\"tableChanges\":[{\"type\":\"CREATE\",\"id\":\"\\\"HELOWIN\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\",\"table\":{\"defaultCharsetName\":null,\"primaryKeyColumnNames\":[\"ID\"],\"columns\":[{\"name\":\"ID\",\"jdbcType\":2,\"nativeType\":null,\"typeName\":\"NUMBER\",\"typeExpression\":\"NUMBER\",\"charsetName\":null,\"length\":10,\"scale\":0,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"NAME4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"age4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c100\",\"jdbcType\":-1,\"nativeType\":null,\"typeName\":\"LONG\",\"typeExpression\":\"LONG\",\"charsetName\":null,\"length\":0,\"scale\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c55\",\"jdbcType\":12,\"nativeType\":nul
l,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c77\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null}],\"comment\":null}}]}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        schemaChange.setSourceConnector("oracle");
+        List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+        schemaChange.setSourceConnector("mysql");
+    }
+
+    @Test
+    public void testExtractDDLListTruncateTable() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944601264,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5719,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5719,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696944601,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5824,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"truncate table test_sink11\\\",\\\"tableChanges\\\":[]}\"}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListDropTable() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944747956,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5901,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5901,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696944747,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6037,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"DROP TABLE `test`.`test_sink11`\\\",\\\"tableChanges\\\":[]}\"}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListChangeColumn() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945030,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6661,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink change column c555 c777 bigint\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListModifyColumn() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945306941,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6738,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6738,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945306,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6884,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink modify column c777 tinyint default 7\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":5,\\\"typeName\\\":\\\"TINYINT\\\",\\\"typeExpression\\\":\\\"TINYINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"7\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListRenameColumn() throws IOException {
+        String record =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691034519,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23886,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c22 to c33\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c33\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testFillOriginSchema() throws IOException {
+        Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
+        srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
+        srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
+        srcFiledSchemaMap.put(
+                "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
+        srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", null));
+
+        schemaChange.setSourceConnector("mysql");
+        String columnsString =
+                "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"c1\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]},{\"name\":\"cc\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]}]";
+        JsonNode columns = objectMapper.readTree(columnsString);
+        schemaChange.fillOriginSchema(columns);
+        Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap();
+
+        Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
+                originFieldSchemaMap.entrySet().iterator();
+        for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
+            FieldSchema srcFiledSchema = entry.getValue();
+            Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
+
+            Assert.assertEquals(entry.getKey(), originField.getKey());
+            Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
+            Assert.assertEquals(
+                    srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
+            Assert.assertEquals(
+                    srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
+            Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
+        }
+    }
+
+    @Test
+    public void testBuildMysql2DorisTypeName() throws IOException {
+        String columnInfo =
+                "{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"10\",\"enumValues\":[]}";
+        schemaChange.setSourceConnector("mysql");
+        JsonNode columns = objectMapper.readTree(columnInfo);
+        String dorisTypeName = schemaChange.buildDorisTypeName(columns);
+        Assert.assertEquals("INT", dorisTypeName);
+    }
+
+    @Test
+    public void testBuildOracle2DorisTypeName() throws IOException {
+        String columnInfo =
+                "{\"name\":\"NAME\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null}";
+        schemaChange.setSourceConnector("oracle");
+        JsonNode columns = objectMapper.readTree(columnInfo);
+        String dorisTypeName = schemaChange.buildDorisTypeName(columns);
+        Assert.assertEquals("VARCHAR(384)", dorisTypeName);
+    }
+
+    @Test
+    public void testExtractDDLListRename() throws IOException {
+        String columnInfo =
+                "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1698314781,\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5331,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c3 to c333\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c333\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":10,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+        Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap();
+        JsonNode record = objectMapper.readTree(columnInfo);
+        schemaChange.setSourceConnector("mysql");
+
+        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
+        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+        schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap);
+
+        List<String> ddlList = schemaChange.extractDDLList(record);
+        Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333", ddlList.get(0));
+    }
+
+    @Test
+    public void testGetCdcTableIdentifier() throws Exception {
+        String insert =
+                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+        JsonNode recordRoot = objectMapper.readTree(insert);
+        String identifier = schemaChange.getCdcTableIdentifier(recordRoot);
+        Assert.assertEquals("test.t1", identifier);
+
+        String insertSchema =
+                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+        String identifierSchema =
+                schemaChange.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
+        Assert.assertEquals("test.dbo.t1", identifierSchema);
+
+        String ddl =
+                "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        String ddlRes = schemaChange.getCdcTableIdentifier(objectMapper.readTree(ddl));
+        Assert.assertEquals("test.t1", ddlRes);
+    }
+
+    @Test
+    public void testGetDorisTableIdentifier() throws Exception {
+        String identifier = schemaChange.getDorisTableIdentifier("test.dbo.t1");
+        Assert.assertEquals("test.t1", identifier);
+
+        identifier = schemaChange.getDorisTableIdentifier("test.t1");
+        Assert.assertEquals("test.t1", identifier);
+
+        String tmp = dorisOptions.getTableIdentifier();
+        dorisOptions.setTableIdentifier(null);
+        identifier = schemaChange.getDorisTableIdentifier("test.t1");
+        Assert.assertNull(identifier);
+        dorisOptions.setTableIdentifier(tmp);
+    }
+
+    @Test
+    public void testAutoCreateTable() throws Exception {
+        String record =
+                "{    \"source\":{        \"version\":\"1.9.7.Final\",        \"connector\":\"oracle\",        \"name\":\"oracle_logminer\",        \"ts_ms\":1696945825065,        \"snapshot\":\"true\",        \"db\":\"TESTDB\",        \"sequence\":null,        \"schema\":\"ADMIN\",        \"table\":\"PERSONS\",        \"txId\":null,        \"scn\":\"1199617\",        \"commit_scn\":null,        \"lcr_position\":null,        \"rs_id\":null,        \"ssn\":0,        \"redo_thread\":null    },    \"databaseName\":\"TESTDB\",    \"schemaName\":\"ADMIN\",    \"ddl\":\"\\n  CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n   (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t PRIMARY KEY (\\\"ID\\\") ENABLE\\n   ) ;\\n \",    \"tableChanges\":[        {            \"type\":\"CREATE\",            \"id\":\"\\\"TESTDB\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\",            \"table\":{                \"defaultCharsetName\":null,                \"primaryKeyColumnNames\":[                    \"ID\"                ],                \"columns\":[                    {                        \"name\":\"ID\",                        \"jdbcType\":2,                        \"nativeType\":null,                        \"typeName\":\"NUMBER\",                        \"typeExpression\":\"NUMBER\",                        \"charsetName\":null,                        \"length\":10,                        \"scale\":0,                        \"position\":1,                        \"optional\":false,                        \"autoIncremented\":false,                        \"generated\":false,                        \"comment\":null                    },                    {                        \"name\":\"NAME4\",                        \"jdbcType\":12,                        \"nativeType\":null,                        \"typeName\":\"VARCHAR2\",                        \"typeExpression\":\"VARCHAR2\",                        \"charsetName\":null,                        \"length\":128,                        \"scale\":null,                        \"position\":2,                        \"optional\":false,                        \"autoIncremented\":false,                        \"generated\":false,                        \"comment\":null                    },                    {                        \"name\":\"age4\",                        \"jdbcType\":12,                        \"nativeType\":null,                        \"typeName\":\"VARCHAR2\",                        \"typeExpression\":\"VARCHAR2\",                        \"charsetName\":null,                        \"length\":128,                        \"scale\":null,                        \"position\":3,                        \"optional\":true,                        \"autoIncremented\":false,                        \"generated\":false,                        \"comment\":null                    }                ],                \"comment\":null            }        }    ]}";
+        JsonNode recordRoot = objectMapper.readTree(record);
+        schemaChange.setSourceConnector(SourceConnector.ORACLE.connectorName);
+        TableSchema tableSchema = schemaChange.extractCreateTableSchema(recordRoot);
+        Assert.assertEquals("TESTDB", tableSchema.getDatabase());
+        Assert.assertEquals("PERSONS", tableSchema.getTable());
+        Assert.assertArrayEquals(new String[] {"ID"}, tableSchema.getKeys().toArray());
+        Assert.assertEquals(3, tableSchema.getFields().size());
+        Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName());
+        Assert.assertEquals("NAME4", tableSchema.getFields().get("NAME4").getName());
+        Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName());
+        schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName);
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 8987030..2f5568e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -39,6 +39,7 @@
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -68,6 +69,7 @@
     private static final String TABLE_1 = "tbl1";
     private static final String TABLE_2 = "tbl2";
     private static final String TABLE_3 = "tbl3";
+    private static final String TABLE_4 = "tbl4";
 
     private static final MySQLContainer MYSQL_CONTAINER =
             new MySQLContainer("mysql")
@@ -104,7 +106,8 @@
                         .collect(Collectors.toSet());
         String sql =
                 "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        checkResult(expected, sql, 2);
+        String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+        checkResult(expected, query1, 2);
 
         // add incremental data
         try (Connection connection =
@@ -136,7 +139,8 @@
                         .collect(Collectors.toSet());
         sql =
                 "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        checkResult(expected2, sql, 2);
+        String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+        checkResult(expected2, query2, 2);
 
         // mock schema change
         try (Connection connection =
@@ -162,19 +166,127 @@
                                 Arrays.asList("doris_1_1_1", "c1_val"))
                         .collect(Collectors.toSet());
         sql = "select * from %s.%s order by 1";
-        checkResult(expected3, sql, 2);
+        String query3 = String.format(sql, DATABASE, TABLE_1);
+        checkResult(expected3, query3, 2);
         jobClient.cancel().get();
     }
 
+    @Test
+    public void testAutoAddTable() throws Exception {
+        initializeMySQLTable();
+        initializeDorisTable();
+        JobClient jobClient = submitJob();
+        // wait 2 times checkpoint
+        Thread.sleep(20000);
+        Set<List<Object>> expected =
+                Stream.<List<Object>>of(
+                                Arrays.asList("doris_1", 1),
+                                Arrays.asList("doris_2", 2),
+                                Arrays.asList("doris_3", 3))
+                        .collect(Collectors.toSet());
+        String sql =
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
+        String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+        checkResult(expected, query1, 2);
+
+        // auto create table4
+        addTableTable_4();
+        Thread.sleep(20000);
+        Set<List<Object>> expected2 =
+                Stream.<List<Object>>of(
+                                Arrays.asList("doris_4_1", 4), Arrays.asList("doris_4_2", 4))
+                        .collect(Collectors.toSet());
+        sql = "select * from %s.%s order by 1";
+        String query2 = String.format(sql, DATABASE, TABLE_4);
+        checkResult(expected2, query2, 2);
+
+        // add incremental data
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_1_1',10)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_2_1',11)", DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_3_1',12)", DATABASE, TABLE_3));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_4_3',43)", DATABASE, TABLE_4));
+
+            statement.execute(
+                    String.format(
+                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_4_2'", DATABASE, TABLE_4));
+            statement.execute(
+                    String.format(
+                            "update %s.%s set age=41 where name='doris_4_1'", DATABASE, TABLE_4));
+        }
+
+        Thread.sleep(20000);
+        Set<List<Object>> expected3 =
+                Stream.<List<Object>>of(
+                                Arrays.asList("doris_1", 18),
+                                Arrays.asList("doris_1_1", 10),
+                                Arrays.asList("doris_2_1", 11),
+                                Arrays.asList("doris_3", 3),
+                                Arrays.asList("doris_3_1", 12),
+                                Arrays.asList("doris_4_1", 41),
+                                Arrays.asList("doris_4_3", 43))
+                        .collect(Collectors.toSet());
+        sql =
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
+        String query3 =
+                String.format(
+                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+                        TABLE_4);
+        checkResult(expected3, query3, 2);
+
+        // mock schema change
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_4));
+            statement.execute(
+                    String.format("alter table %s.%s drop column age", DATABASE, TABLE_4));
+            Thread.sleep(20000);
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s  values ('doris_4_4','c1_val')", DATABASE, TABLE_4));
+        }
+        Thread.sleep(20000);
+        Set<List<Object>> expected4 =
+                Stream.<List<Object>>of(
+                                Arrays.asList("doris_4_1", null),
+                                Arrays.asList("doris_4_3", null),
+                                Arrays.asList("doris_4_4", "c1_val"))
+                        .collect(Collectors.toSet());
+        sql = "select * from %s.%s order by 1";
+        String query4 = String.format(sql, DATABASE, TABLE_4);
+        checkResult(expected4, query4, 2);
+        jobClient.cancel().get();
+    }
+
+    private void initializeDorisTable() throws Exception {
+        try (Statement statement = connection.createStatement()) {
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
+        }
+    }
+
     public void checkResult(Set<List<Object>> expected, String query, int columnSize)
             throws Exception {
         Set<List<Object>> actual = new HashSet<>();
         try (Statement sinkStatement = connection.createStatement()) {
-            ResultSet sinkResultSet =
-                    sinkStatement.executeQuery(
-                            String.format(
-                                    query, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE,
-                                    TABLE_3));
+            ResultSet sinkResultSet = sinkStatement.executeQuery(query);
             while (sinkResultSet.next()) {
                 List<Object> row = new ArrayList<>();
                 for (int i = 1; i <= columnSize; i++) {
@@ -218,7 +330,7 @@
         Map<String, String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "tbl1|tbl2|tbl3";
+        String includingTables = "tbl.*";
         String excludingTables = "";
         DatabaseSync databaseSync = new MysqlDatabaseSync();
         databaseSync
@@ -232,6 +344,7 @@
                 .setTableConfig(tableConfig)
                 .setCreateTableOnly(false)
                 .setNewSchemaChange(true)
+                .setSingleSink(true)
                 .create();
         databaseSync.build();
         JobClient jobClient = env.executeAsync();
@@ -242,6 +355,28 @@
         return jobClient;
     }
 
+    private void addTableTable_4() throws SQLException {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE %s.%s ( \n"
+                                    + "`name` varchar(256) primary key,\n"
+                                    + "`age` int\n"
+                                    + ")",
+                            DATABASE, TABLE_4));
+
+            // mock stock data
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_4_1',4)", DATABASE, TABLE_4));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_4_2',4)", DATABASE, TABLE_4));
+        }
+    }
+
     public void initializeMySQLTable() throws Exception {
         try (Connection connection =
                         DriverManager.getConnection(