[fix](cdc) single sink add tableprefix and tablesuffix (#301)

Currently, when single-sink is enabled and CDC automatically creates a table, it cannot automatically obtain the suffix and suffix.
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 370bca7..5bea2d9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -18,6 +18,7 @@
 package org.apache.doris.flink.sink.writer.serializer;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -66,6 +67,8 @@
     // create table properties
     private Map<String, String> tableProperties;
     private String targetDatabase;
+    private String targetTablePrefix;
+    private String targetTableSuffix;
     private JsonDebeziumDataChange dataChange;
     private JsonDebeziumSchemaChange schemaChange;
 
@@ -109,11 +112,15 @@
             DorisExecutionOptions executionOptions,
             Map<String, String> tableMapping,
             Map<String, String> tableProperties,
-            String targetDatabase) {
+            String targetDatabase,
+            String targetTablePrefix,
+            String targetTableSuffix) {
         this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
         this.tableMapping = tableMapping;
         this.tableProperties = tableProperties;
         this.targetDatabase = targetDatabase;
+        this.targetTablePrefix = targetTablePrefix;
+        this.targetTableSuffix = targetTableSuffix;
         init();
     }
 
@@ -128,8 +135,9 @@
                         objectMapper,
                         pattern,
                         lineDelimiter,
-                        ignoreUpdateBefore);
-
+                        ignoreUpdateBefore,
+                        targetTablePrefix,
+                        targetTableSuffix);
         this.schemaChange =
                 newSchemaChange
                         ? new JsonDebeziumSchemaChangeImplV2(changeContext)
@@ -180,6 +188,8 @@
         private Map<String, String> tableMapping;
         private Map<String, String> tableProperties;
         private String targetDatabase;
+        private String targetTablePrefix = "";
+        private String targetTableSuffix = "";
 
         public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
             this.dorisOptions = dorisOptions;
@@ -221,6 +231,20 @@
             return this;
         }
 
+        public Builder setTargetTablePrefix(String tablePrefix) {
+            if (!StringUtils.isNullOrWhitespaceOnly(tablePrefix)) {
+                this.targetTablePrefix = tablePrefix;
+            }
+            return this;
+        }
+
+        public Builder setTargetTableSuffix(String tableSuffix) {
+            if (!StringUtils.isNullOrWhitespaceOnly(tableSuffix)) {
+                this.targetTableSuffix = tableSuffix;
+            }
+            return this;
+        }
+
         public JsonDebeziumSchemaSerializer build() {
             return new JsonDebeziumSchemaSerializer(
                     dorisOptions,
@@ -230,7 +254,9 @@
                     executionOptions,
                     tableMapping,
                     tableProperties,
-                    targetDatabase);
+                    targetDatabase,
+                    targetTablePrefix,
+                    targetTableSuffix);
         }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index 9c59f14..a7253d2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -38,6 +38,8 @@
     private final Pattern pattern;
     private final String lineDelimiter;
     private final boolean ignoreUpdateBefore;
+    private String targetTablePrefix;
+    private String targetTableSuffix;
 
     public JsonDebeziumChangeContext(
             DorisOptions dorisOptions,
@@ -48,7 +50,9 @@
             ObjectMapper objectMapper,
             Pattern pattern,
             String lineDelimiter,
-            boolean ignoreUpdateBefore) {
+            boolean ignoreUpdateBefore,
+            String targetTablePrefix,
+            String targetTableSuffix) {
         this.dorisOptions = dorisOptions;
         this.tableMapping = tableMapping;
         this.sourceTableName = sourceTableName;
@@ -58,6 +62,8 @@
         this.pattern = pattern;
         this.lineDelimiter = lineDelimiter;
         this.ignoreUpdateBefore = ignoreUpdateBefore;
+        this.targetTablePrefix = targetTablePrefix;
+        this.targetTableSuffix = targetTableSuffix;
     }
 
     public DorisOptions getDorisOptions() {
@@ -95,4 +101,12 @@
     public boolean isIgnoreUpdateBefore() {
         return ignoreUpdateBefore;
     }
+
+    public String getTargetTablePrefix() {
+        return targetTablePrefix;
+    }
+
+    public String getTargetTableSuffix() {
+        return targetTableSuffix;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 5604da7..b3a90e6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -70,6 +70,8 @@
     // create table properties
     private final Map<String, String> tableProperties;
     private String targetDatabase;
+    private String targetTablePrefix;
+    private String targetTableSuffix;
 
     public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
         this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE);
@@ -81,6 +83,14 @@
         this.tableProperties = changeContext.getTableProperties();
         this.tableMapping = changeContext.getTableMapping();
         this.objectMapper = changeContext.getObjectMapper();
+        this.targetTablePrefix =
+                changeContext.getTargetTablePrefix() == null
+                        ? ""
+                        : changeContext.getTargetTablePrefix();
+        this.targetTableSuffix =
+                changeContext.getTargetTableSuffix() == null
+                        ? ""
+                        : changeContext.getTargetTableSuffix();
     }
 
     @Override
@@ -253,7 +263,7 @@
 
     private String getCreateTableIdentifier(JsonNode record) {
         String table = extractJsonNode(record.get("source"), "table");
-        return targetDatabase + "." + table;
+        return targetDatabase + "." + targetTablePrefix + table + targetTableSuffix;
     }
 
     private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index b6bb5e5..e153039 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -288,6 +288,8 @@
                                 .setTableMapping(tableMapping)
                                 .setTableProperties(tableConfig)
                                 .setTargetDatabase(database)
+                                .setTargetTablePrefix(tablePrefix)
+                                .setTargetTableSuffix(tableSuffix)
                                 .build())
                 .setDorisOptions(dorisBuilder.build());
         return builder.build();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
index d789807..8900339 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
@@ -48,7 +48,9 @@
                         objectMapper,
                         null,
                         lineDelimiter,
-                        ignoreUpdateBefore);
+                        ignoreUpdateBefore,
+                        "",
+                        "");
         dataChange = new JsonDebeziumDataChange(changeContext);
     }
 
@@ -109,7 +111,9 @@
                         objectMapper,
                         null,
                         lineDelimiter,
-                        false);
+                        false,
+                        "",
+                        "");
         dataChange = new JsonDebeziumDataChange(changeContext);
 
         // update t1 set name='doris-update' WHERE id =1;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
index 9b003a8..c8997a1 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
@@ -55,7 +55,9 @@
                         objectMapper,
                         null,
                         lineDelimiter,
-                        ignoreUpdateBefore);
+                        ignoreUpdateBefore,
+                        "",
+                        "");
         schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index c63267d..0ce60d3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -58,7 +58,9 @@
                         objectMapper,
                         null,
                         lineDelimiter,
-                        ignoreUpdateBefore);
+                        ignoreUpdateBefore,
+                        "",
+                        "");
         schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 242f93f..3390f75 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -72,7 +72,7 @@
     private static final String TABLE_4 = "tbl4";
 
     private static final MySQLContainer MYSQL_CONTAINER =
-            new MySQLContainer("mysql")
+            new MySQLContainer("mysql:8.0")
                     .withDatabaseName(DATABASE)
                     .withUsername(MYSQL_USER)
                     .withPassword(MYSQL_PASSWD);