[Bug] fix "cannot create table" problem (#252)

Co-authored-by: wudi <>
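
This change fixes table auto-creation in the CDC database-sync path:

- JsonDebeziumSchemaSerializer now carries the create-table properties
  (tableProperties) and the target Doris database (targetDatabase), both set
  through its builder and wired up from DatabaseSync.
- The tableMapping filter in serialize() is applied after the schema-change
  branch, so DDL records are no longer dropped before they can be handled.
- extractCreateTableSchema() resolves sourceConnector lazily, tolerates a
  missing table comment (the old code called asText() on a node that can be
  null), and attaches tableProperties to the generated TableSchema.
- getCreateTableIdentifier() builds the identifier from the configured target
  database instead of the source database name.
- MysqlDatabaseSync no longer force-enables scanNewlyAddedTableEnabled.
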
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 29d9cee..25d06ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -93,6 +93,9 @@
     private SchemaChangeManager schemaChangeManager;
     // <cdc db.schema.table, doris db.table>
     private Map<String, String> tableMapping;
+    // create table properties
+    private Map<String, String> tableProperties;
+    private String targetDatabase;
 
     public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
             Pattern pattern,
@@ -134,24 +137,19 @@
             String sourceTableName,
             boolean newSchemaChange,
             DorisExecutionOptions executionOptions,
-            Map<String, String> tableMapping) {
+            Map<String, String> tableMapping,
+            Map<String, String> tableProperties,
+            String targetDatabase) {
         this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
         this.tableMapping = tableMapping;
+        this.tableProperties = tableProperties;
+        this.targetDatabase = targetDatabase;
     }
 
     @Override
     public DorisRecord serialize(String record) throws IOException {
         LOG.debug("received debezium json data {} :", record);
         JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
-
-        //Filter out table records that are not in tableMapping
-        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
-        String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
-        if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
-            LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
-            return null;
-        }
-
         String op = extractJsonNode(recordRoot, "op");
         if (Objects.isNull(op)) {
             // schema change ddl
@@ -166,6 +164,15 @@
         if (newSchemaChange && firstLoad) {
             initOriginFieldSchema(recordRoot);
         }
+
+        // Filter out table records that are not in tableMapping
+        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+        String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
+        if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
+            LOG.warn("filter table {} because it is not in tableMapping, record detail is {}", cdcTableIdentifier, record);
+            return null;
+        }
+
         Map<String, Object> valueMap;
         switch (op) {
             case OP_READ:
@@ -313,11 +320,16 @@
 
     @VisibleForTesting
     public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
+        if (sourceConnector == null) {
+            sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
+        }
+
         String dorisTable = getCreateTableIdentifier(record);
         JsonNode tableChange =  extractTableChange(record);
         JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
         JsonNode columns = tableChange.get("table").get("columns");
-        String tblComment = tableChange.get("table").get("comment").asText();
+        JsonNode comment = tableChange.get("table").get("comment");
+        String tblComment = comment == null ? "" : comment.asText();
         Map<String, FieldSchema> field = new LinkedHashMap<>();
         for (JsonNode column : columns) {
             buildFieldSchema(field, column);
@@ -333,6 +345,7 @@
         tableSchema.setKeys(pkList);
         tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
         tableSchema.setTableComment(tblComment);
+        tableSchema.setProperties(tableProperties);
 
         String[] split = dorisTable.split("\\.");
         Preconditions.checkArgument(split.length == 2);
@@ -402,9 +415,8 @@
     }
 
     public String getCreateTableIdentifier(JsonNode record){
-        String db = extractJsonNode(record.get("source"), "db");
         String table = extractJsonNode(record.get("source"), "table");
-        return db + "." + table;
+        return targetDatabase + "." + table;
     }
 
     public String getDorisTableIdentifier(String cdcTableIdentifier){
@@ -657,6 +669,8 @@
         private boolean newSchemaChange;
         private DorisExecutionOptions executionOptions;
         private Map<String, String> tableMapping;
+        private Map<String, String> tableProperties;
+        private String targetDatabase;
 
         public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
             this.dorisOptions = dorisOptions;
@@ -688,9 +702,19 @@
             return this;
         }
 
+        public Builder setTableProperties(Map<String, String> tableProperties) {
+            this.tableProperties = tableProperties;
+            return this;
+        }
+
+        public Builder setTargetDatabase(String targetDatabase) {
+            this.targetDatabase = targetDatabase;
+            return this;
+        }
+
         public JsonDebeziumSchemaSerializer build() {
             return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
-                    executionOptions, tableMapping);
+                    executionOptions, tableMapping, tableProperties, targetDatabase);
         }
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 02ab034..fe5357e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -242,6 +242,8 @@
                         .setNewSchemaChange(newSchemaChange)
                         .setExecutionOptions(executionOptions)
                         .setTableMapping(tableMapping)
+                        .setTableProperties(tableConfig)
+                        .setTargetDatabase(database)
                         .build())
                 .setDorisOptions(dorisBuilder.build());
         return builder.build();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index a3e01d3..a86b2ab 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -122,9 +122,7 @@
                 .username(config.get(MySqlSourceOptions.USERNAME))
                 .password(config.get(MySqlSourceOptions.PASSWORD))
                 .databaseList(databaseName)
-                .tableList(tableName)
-                //default open add newly table
-                .scanNewlyAddedTableEnabled(true);
+                .tableList(tableName);
 
         config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
         config
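
For reference, a minimal sketch of how the new builder options fit together, in the spirit of the
DatabaseSync wiring above. Everything not shown in this patch - the import paths, the builder()
factory, the property key, and the database name - is an assumption for illustration only:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;

    public class SerializerWiringSketch {

        public static JsonDebeziumSchemaSerializer build(DorisOptions dorisOptions,
                                                         DorisExecutionOptions executionOptions,
                                                         Map<String, String> tableMapping) {
            // Example create-table properties; the key/value below is only an illustration.
            Map<String, String> tableProperties = new HashMap<>();
            tableProperties.put("replication_num", "1");

            return JsonDebeziumSchemaSerializer.builder()
                    .setDorisOptions(dorisOptions)
                    .setNewSchemaChange(true)
                    .setExecutionOptions(executionOptions)
                    .setTableMapping(tableMapping)
                    // New in this patch: copied onto the generated TableSchema via setProperties().
                    .setTableProperties(tableProperties)
                    // New in this patch: used as the database part of the CREATE TABLE identifier.
                    .setTargetDatabase("doris_db")
                    .build();
        }
    }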