[feature] multiple tables to one for DatabaseSync (#208)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 6a390ea..8a8b3db 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -103,6 +103,8 @@
         String tableSuffix = params.get("table-suffix");
         String includingTables = params.get("including-tables");
         String excludingTables = params.get("excluding-tables");
+        String multiToOneOrigin = params.get("multi-to-one-origin");
+        String multiToOneTarget = params.get("multi-to-one-target");
         boolean createTableOnly = params.has("create-table-only");
         boolean ignoreDefaultValue = params.has("ignore-default-value");
         boolean useNewSchemaChange = params.has("use-new-schema-change");
@@ -112,7 +114,7 @@
         Configuration sinkConfig = Configuration.fromMap(sinkMap);
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
+        databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables,multiToOneOrigin,multiToOneTarget, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
         databaseSync.build();
         if(StringUtils.isNullOrWhitespaceOnly(jobName)){
             jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db"));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index fcd0f4c..99c45eb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -33,6 +33,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +59,7 @@
     protected TableNameConverter converter;
     protected Pattern includingPattern;
     protected Pattern excludingPattern;
+    protected Map<Pattern, String> multiToOneRulesPattern;
     protected Map<String, String> tableConfig;
     protected Configuration sinkConfig;
     protected boolean ignoreDefaultValue;
@@ -67,6 +69,8 @@
     private boolean newSchemaChange;
     protected String includingTables;
     protected String excludingTables;
+    protected String multiToOneOrigin;
+    protected String multiToOneTarget;
 
     public abstract void registerDriver() throws SQLException;
 
@@ -82,16 +86,19 @@
 
     public void create(StreamExecutionEnvironment env, String database, Configuration config,
                        String tablePrefix, String tableSuffix, String includingTables,
-                       String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig,
+                       String excludingTables,String multiToOneOrigin,String multiToOneTarget, boolean ignoreDefaultValue, Configuration sinkConfig,
             Map<String, String> tableConfig, boolean createTableOnly, boolean useNewSchemaChange) {
         this.env = env;
         this.config = config;
         this.database = database;
-        this.converter = new TableNameConverter(tablePrefix, tableSuffix);
         this.includingTables = includingTables;
         this.excludingTables = excludingTables;
+        this.multiToOneOrigin = multiToOneOrigin;
+        this.multiToOneTarget = multiToOneTarget;
         this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
         this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
+        this.multiToOneRulesPattern = multiToOneRulesParser(multiToOneOrigin,multiToOneTarget);
+        this.converter = new TableNameConverter(tablePrefix, tableSuffix,multiToOneRulesPattern);
         this.ignoreDefaultValue = ignoreDefaultValue;
         this.sinkConfig = sinkConfig;
         this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
@@ -118,7 +125,7 @@
         List<String> dorisTables = new ArrayList<>();
         for (SourceSchema schema : schemaList) {
             syncTables.add(schema.getTableName());
-            String dorisTable = converter.convert(schema.getTableName());
+            String dorisTable=converter.convert(schema.getTableName());
             if (!dorisSystem.tableExists(database, dorisTable)) {
                 TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
                 //set doris target database
@@ -126,7 +133,9 @@
                 dorisSchema.setTable(dorisTable);
                 dorisSystem.createTable(dorisSchema);
             }
-            dorisTables.add(dorisTable);
+            if(!dorisTables.contains(dorisTable)){
+                dorisTables.add(dorisTable);
+            }
         }
         if(createTableOnly){
             System.out.println("Create table finished.");
@@ -139,7 +148,6 @@
         for (String table : dorisTables) {
             OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
             DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
-
             int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
             sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
         }
@@ -245,11 +253,36 @@
         LOG.debug("table {} is synchronized? {}", tableName, sync);
         return sync;
     }
+    /** Pairs each '|'-separated regex in multiToOneOrigin with the same-position table name in multiToOneTarget.
+     *  Returns null if either argument is null or whitespace-only; prints an error and exits the JVM on a count mismatch or an invalid regex.
+     *  Rules are kept in declaration order: Pattern hashes by identity, so a plain HashMap would iterate them in an arbitrary order. */
+    protected HashMap<Pattern,String> multiToOneRulesParser(String multiToOneOrigin,String multiToOneTarget){
+        if(StringUtils.isNullOrWhitespaceOnly(multiToOneOrigin) || StringUtils.isNullOrWhitespaceOnly(multiToOneTarget)){
+            return null; // feature not configured; TableNameConverter.convert treats null as "no rules"
+        }
+        HashMap<Pattern,String> multiToOneRulesPattern= new java.util.LinkedHashMap<>(); // insertion-ordered; still a HashMap
+        String[] origins = multiToOneOrigin.split("\\|");
+        String[] targets = multiToOneTarget.split("\\|");
+        if(origins.length!=targets.length){ // every regex needs exactly one target
+            System.out.println("param error : multi to one params length are not equal,please check your params.");
+            System.exit(1);
+        }
+        try {
+            for (int i = 0; i < origins.length; i++) {
+                multiToOneRulesPattern.put(Pattern.compile(origins[i]),targets[i]); // i-th regex -> i-th target
+            }
+        } catch (Exception e) { // Pattern.compile rejects malformed regexes with PatternSyntaxException
+            System.out.println("param error : Your regular expression is incorrect,please check.");
+            System.exit(1);
+        }
+        return multiToOneRulesPattern;
+    }
 
     public static class TableNameConverter implements Serializable {
         private static final long serialVersionUID = 1L;
         private final String prefix;
         private final String suffix;
+        private Map<Pattern,String> multiToOneRulesPattern;
 
         TableNameConverter(){
             this("","");
@@ -260,8 +293,33 @@
             this.suffix = suffix == null ? "" : suffix;
         }
 
+        TableNameConverter(String prefix, String suffix,Map<Pattern, String> multiToOneRulesPattern) {
+            this.prefix = prefix == null ? "" : prefix;
+            this.suffix = suffix == null ? "" : suffix;
+            this.multiToOneRulesPattern = multiToOneRulesPattern;
+        }
+
         public String convert(String tableName) {
-            return prefix + tableName + suffix;
+            if(multiToOneRulesPattern==null){
+                return prefix + tableName + suffix;
+            }
+
+            String target=null;
+
+            for (Map.Entry<Pattern, String> patternStringEntry : multiToOneRulesPattern.entrySet()) {
+                if(patternStringEntry.getKey().matcher(tableName).matches()){
+                    target=patternStringEntry.getValue();
+                }
+            }
+            /**
+             * No multi-to-one rule matched this table, so it is synced one-to-one.
+             * A job may mix merged (multi-to-one) and ordinary (one-to-one) tables;
+             * the prefix and suffix are applied only to the one-to-one tables.
+             * */
+            if(target==null){
+                return prefix + tableName + suffix;
+            }
+            return target;
         }
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 1a205b1..875fb4c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -48,7 +48,8 @@
         mysqlConfig.put("hostname","127.0.0.1");
         mysqlConfig.put("port","3306");
         mysqlConfig.put("username","root");
-        mysqlConfig.put("password","");
+//        mysqlConfig.put("password","");
+        mysqlConfig.put("password","12345678");
         Configuration config = Configuration.fromMap(mysqlConfig);
 
         Map<String,String> sinkConfig = new HashMap<>();
@@ -63,12 +64,15 @@
         Map<String,String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "tbl1|tbl2|tbl3";
+//        String includingTables = "tbl1|tbl2|tbl3";
+        String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
+        String multiToOneOrigin="a_.*|b_.*";
+        String multiToOneTarget="a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
         databaseSync.build();
         env.execute(String.format("MySQL-Doris Database Sync: %s", database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 3a2a39e..9b6277f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -69,12 +69,14 @@
         Map<String,String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "test.*";
+        String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
+        String multiToOneOrigin="a_.*|b_.*";
+        String multiToOneTarget="a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new OracleDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
         databaseSync.build();
         env.execute(String.format("Oracle-Doris Database Sync: %s", database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index cf5e1d8..87fa871 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -72,12 +72,14 @@
         Map<String,String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "testcdc";
+        String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
+        String multiToOneOrigin="a_.*|b_.*";
+        String multiToOneTarget="a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new PostgresDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
         databaseSync.build();
         env.execute(String.format("Postgres-Doris Database Sync: %s", database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 7251a7f..d247500 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -51,7 +51,7 @@
         sourceConfig.put("hostname","127.0.0.1");
         sourceConfig.put("port","1433");
         sourceConfig.put("username","sa");
-        sourceConfig.put("password","123456");
+        sourceConfig.put("password","Passw@rd");
 //        sourceConfig.put("debezium.database.tablename.case.insensitive","false");
 //        sourceConfig.put("scan.incremental.snapshot.enabled","true");
 //        sourceConfig.put("debezium.include.schema.changes","false");
@@ -70,14 +70,16 @@
         Map<String,String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "products_test";
+        String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
+        String multiToOneOrigin="a_.*|b_.*";
+        String multiToOneTarget="a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new SqlServerDatabaseSync();
-        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+        databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
         databaseSync.build();
-        env.execute(String.format("Postgres-Doris Database Sync: %s", database));
+        env.execute(String.format("SqlServer-Doris Database Sync: %s", database));
 
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
new file mode 100644
index 0000000..daab90b
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.junit.Test;
+import java.util.Arrays;
+
+/**
+ * Smoke test for {@link DatabaseSync#multiToOneRulesParser}: it only checks that valid input parses without throwing (no assertions on the result).
+ **/
+public class DatabaseSyncTest {
+    @Test
+    public void multiToOneRulesParserTest() throws Exception{
+        String[][] testCase = {
+                {"a_.*|b_.*","a|b"} // Normal condition: two regexes paired with two targets
+//                ,{"a_.*|b_.*","a|b|c"} // Unequal length: the parser calls System.exit(1) here, which would end the test JVM
+//                ,{"",""} // Blank input: the parser returns null for null/whitespace-only arguments
+//                ,{"***....","a"} // Abnormal regular expression: Pattern.compile fails and the parser calls System.exit(1)
+        };
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        Arrays.stream(testCase).forEach(arr->{
+            databaseSync.multiToOneRulesParser(arr[0], arr[1]); // each row is {multiToOneOrigin, multiToOneTarget}
+        });
+    }
+}