[improve] Schema change no longer requires the source and target db/table names to match (#87)

* Optimize schema change handling
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index c3fe987..bd685ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -25,6 +25,7 @@
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -56,14 +57,23 @@
     private static final String OP_UPDATE = "u"; // update
     private static final String OP_DELETE = "d"; // delete
 
-    private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+COLUMN\\s+([^\\s]+).*";
+    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; // e.g. ALTER TABLE db.tbl ADD COLUMN c1 INT
+    private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?+(?!(?:INDEX|KEY|PRIMARY|UNIQUE|FOREIGN|CONSTRAINT|FULLTEXT|SPATIAL|PARTITION|CHECK)\\b)([^\\s]+)(\\s+([^\\s]+))?.*"; // groups: 1=ADD|DROP, 3=column, 5=type; index/key/constraint/partition clauses never match
     private final Pattern addDropDDLPattern;
     private DorisOptions dorisOptions;
     private ObjectMapper objectMapper = new ObjectMapper();
+    private final String database; // Doris target database, parsed from the table identifier
+    private final String table; // Doris target table, parsed from the table identifier
+    // table name of the CDC upstream, formatted as db.tbl; null/blank disables per-table DDL filtering
+    private final String sourceTableName;
 
-    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern) {
+    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName) {
         this.dorisOptions = dorisOptions;
         this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
+        String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
+        this.database = tableInfo[0];
+        this.table = tableInfo[1];
+        this.sourceTableName = sourceTableName;
     }
 
     @Override
@@ -97,8 +107,16 @@
     public boolean schemaChange(JsonNode recordRoot) {
         boolean status = false;
         try{
-            boolean doSchemaChange = checkSchemaChange(recordRoot);
-            status = doSchemaChange && execSchemaChange(recordRoot);
+            if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){
+                return false;
+            }
+            String ddl = extractDDL(recordRoot);
+            if(StringUtils.isNullOrWhitespaceOnly(ddl)){
+                LOG.info("ddl can not do schema change:{}", recordRoot);
+                return false;
+            }
+            boolean doSchemaChange = checkSchemaChange(ddl);
+            status = doSchemaChange && execSchemaChange(ddl);
             LOG.info("schema change status:{}", status);
         }catch (Exception ex){
             LOG.warn("schema change error :", ex);
@@ -106,6 +124,16 @@
         return status;
     }
 
+    /**
+     * Returns true when the record's source "db.table" equals sourceTableName; a multi-table CDC job also sees other tables' DDL.
+     */
+    protected boolean checkTable(JsonNode recordRoot) {
+        String db = extractDatabase(recordRoot);
+        String tbl = extractTable(recordRoot);
+        String dbTbl = db + "." + tbl;
+        return sourceTableName.equals(dbTbl);
+    }
+
     private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
         if(delete){
             valueMap.put(DORIS_DELETE_SIGN, "1");
@@ -114,11 +142,9 @@
         }
     }
 
-    private boolean checkSchemaChange(JsonNode record) throws IOException {
-        String database = extractDatabase(record);
-        String table = extractTable(record);
+    private boolean checkSchemaChange(String ddl) throws IOException {
         String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database, table);
-        Map<String,Object> param = buildRequestParam(record);
+        Map<String,Object> param = buildRequestParam(ddl);
         if(param.size() != 2){
             return false;
         }
@@ -139,27 +165,21 @@
      * "columnName" : "column"
      * }
      */
-    private Map<String, Object> buildRequestParam(JsonNode record) throws JsonProcessingException {
+    protected Map<String, Object> buildRequestParam(String ddl) {
         Map<String,Object> params = new HashMap<>();
-        String ddl = extractDDL(record);
-        if(ddl == null){
-            return params;
-        }
         Matcher matcher = addDropDDLPattern.matcher(ddl);
         if(matcher.find()){
             String op = matcher.group(1);
-            String col = matcher.group(2);
+            String col = matcher.group(3);
             params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
             params.put("columnName", col);
         }
         return params;
     }
 
-    private boolean execSchemaChange(JsonNode record) throws IOException {
-        String extractDDL = extractDDL(record);
+    private boolean execSchemaChange(String ddl) throws IOException {
         Map<String, String> param = new HashMap<>();
-        param.put("stmt", extractDDL);
-        String database = extractDatabase(record);
+        param.put("stmt", ddl);
         String requestUrl = String.format(SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database);
         HttpPost httpPost = new HttpPost(requestUrl);
         httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
@@ -169,15 +189,20 @@
         return success;
     }
 
-    private String extractDatabase(JsonNode record) {
-        return extractJsonNode(record.get("source"), "db");
+    protected String extractDatabase(JsonNode record) {
+        if(record.get("source").has("schema")){
+            // prefer source.schema when present (presumably schema-based sources, e.g. Postgres); else source.db
+            return extractJsonNode(record.get("source"), "schema");
+        }else{
+            return extractJsonNode(record.get("source"), "db");
+        }
     }
 
-    private String extractTable(JsonNode record) {
+    protected String extractTable(JsonNode record) {
         return extractJsonNode(record.get("source"), "table");
     }
 
-    private boolean handleResponse(HttpUriRequest request) throws IOException {
+    private boolean handleResponse(HttpUriRequest request) {
         try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
             CloseableHttpResponse response = httpclient.execute(request);
             final int statusCode = response.getStatusLine().getStatusCode();
@@ -215,20 +240,26 @@
         return recordMap != null ? recordMap : new HashMap<>();
     }
 
-    @VisibleForTesting
     public String extractDDL(JsonNode record) throws JsonProcessingException {
         String historyRecord = extractJsonNode(record, "historyRecord");
         if (Objects.isNull(historyRecord)) {
             return null;
         }
         String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
+        LOG.debug("received debezium ddl :{}", ddl);
         if (!Objects.isNull(ddl)) {
             //filter add/drop operation
-            if (addDropDDLPattern.matcher(ddl).matches()) {
+            Matcher matcher = addDropDDLPattern.matcher(ddl);
+            if(matcher.find()){
+                String op = matcher.group(1);
+                String col = matcher.group(3);
+                String type = matcher.group(5);
+                type = type == null ? "" : type;
+                ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type);
+                LOG.info("parse ddl:{}", ddl);
                 return ddl;
             }
         }
-        LOG.info("parse ddl:{}", ddl);
         return null;
     }
 
@@ -246,6 +277,7 @@
     public static class Builder {
         private DorisOptions dorisOptions;
         private Pattern addDropDDLPattern;
+        private String sourceTableName;
 
         public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
             this.dorisOptions = dorisOptions;
@@ -257,8 +289,13 @@
             return this;
         }
 
+        public JsonDebeziumSchemaSerializer.Builder setSourceTableName(String sourceTableName) {
+            this.sourceTableName = sourceTableName;
+            return this;
+        }
+
         public JsonDebeziumSchemaSerializer build() {
-            return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern);
+            return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName);
         }
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 0d35ae3..ed5c37f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -100,8 +100,8 @@
 
     @Test
     public void testExtractDDL() throws IOException {
-        String srcDDL = "alter table t1 add \n column  c_1 varchar(200)";
-        String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n column  c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\
\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+        String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(200)";
+        String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n    c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\
\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
         JsonNode recordRoot = objectMapper.readTree(record);
         String ddl = serializer.extractDDL(recordRoot);
         Assert.assertEquals(srcDDL, ddl);