[feature] multiple tables to one for DatabaseSync (#208)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 6a390ea..8a8b3db 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -103,6 +103,8 @@
String tableSuffix = params.get("table-suffix");
String includingTables = params.get("including-tables");
String excludingTables = params.get("excluding-tables");
+ String multiToOneOrigin = params.get("multi-to-one-origin");
+ String multiToOneTarget = params.get("multi-to-one-target");
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
boolean useNewSchemaChange = params.has("use-new-schema-change");
@@ -112,7 +114,7 @@
Configuration sinkConfig = Configuration.fromMap(sinkMap);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
+ databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables,multiToOneOrigin,multiToOneTarget, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db"));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index fcd0f4c..99c45eb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -33,6 +33,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +59,7 @@
protected TableNameConverter converter;
protected Pattern includingPattern;
protected Pattern excludingPattern;
+ protected Map<Pattern, String> multiToOneRulesPattern;
protected Map<String, String> tableConfig;
protected Configuration sinkConfig;
protected boolean ignoreDefaultValue;
@@ -67,6 +69,8 @@
private boolean newSchemaChange;
protected String includingTables;
protected String excludingTables;
+ protected String multiToOneOrigin;
+ protected String multiToOneTarget;
public abstract void registerDriver() throws SQLException;
@@ -82,16 +86,19 @@
public void create(StreamExecutionEnvironment env, String database, Configuration config,
String tablePrefix, String tableSuffix, String includingTables,
- String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig,
+ String excludingTables,String multiToOneOrigin,String multiToOneTarget, boolean ignoreDefaultValue, Configuration sinkConfig,
Map<String, String> tableConfig, boolean createTableOnly, boolean useNewSchemaChange) {
this.env = env;
this.config = config;
this.database = database;
- this.converter = new TableNameConverter(tablePrefix, tableSuffix);
this.includingTables = includingTables;
this.excludingTables = excludingTables;
+ this.multiToOneOrigin = multiToOneOrigin;
+ this.multiToOneTarget = multiToOneTarget;
this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
+ this.multiToOneRulesPattern = multiToOneRulesParser(multiToOneOrigin,multiToOneTarget);
+ this.converter = new TableNameConverter(tablePrefix, tableSuffix,multiToOneRulesPattern);
this.ignoreDefaultValue = ignoreDefaultValue;
this.sinkConfig = sinkConfig;
this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
@@ -118,7 +125,7 @@
List<String> dorisTables = new ArrayList<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
- String dorisTable = converter.convert(schema.getTableName());
+ String dorisTable=converter.convert(schema.getTableName());
if (!dorisSystem.tableExists(database, dorisTable)) {
TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
//set doris target database
@@ -126,7 +133,9 @@
dorisSchema.setTable(dorisTable);
dorisSystem.createTable(dorisSchema);
}
- dorisTables.add(dorisTable);
+ if(!dorisTables.contains(dorisTable)){
+ dorisTables.add(dorisTable);
+ }
}
if(createTableOnly){
System.out.println("Create table finished.");
@@ -139,7 +148,6 @@
for (String table : dorisTables) {
OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
-
int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
}
@@ -245,11 +253,36 @@
LOG.debug("table {} is synchronized? {}", tableName, sync);
return sync;
}
+ /**
+ * Filter table that many tables merge to one
+ */
+ protected HashMap<Pattern,String> multiToOneRulesParser(String multiToOneOrigin,String multiToOneTarget){
+ if(StringUtils.isNullOrWhitespaceOnly(multiToOneOrigin) || StringUtils.isNullOrWhitespaceOnly(multiToOneTarget)){
+ return null;
+ }
+ HashMap<Pattern,String> multiToOneRulesPattern= new HashMap<>();
+ String[] origins = multiToOneOrigin.split("\\|");
+ String[] targets = multiToOneTarget.split("\\|");
+ if(origins.length!=targets.length){
+ System.out.println("param error : multi to one params length are not equal,please check your params.");
+ System.exit(1);
+ }
+ try {
+ for (int i = 0; i < origins.length; i++) {
+ multiToOneRulesPattern.put(Pattern.compile(origins[i]),targets[i]);
+ }
+ } catch (Exception e) {
+ System.out.println("param error : Your regular expression is incorrect,please check.");
+ System.exit(1);
+ }
+ return multiToOneRulesPattern;
+ }
public static class TableNameConverter implements Serializable {
private static final long serialVersionUID = 1L;
private final String prefix;
private final String suffix;
+ private Map<Pattern,String> multiToOneRulesPattern;
TableNameConverter(){
this("","");
@@ -260,8 +293,33 @@
this.suffix = suffix == null ? "" : suffix;
}
+ TableNameConverter(String prefix, String suffix,Map<Pattern, String> multiToOneRulesPattern) {
+ this.prefix = prefix == null ? "" : prefix;
+ this.suffix = suffix == null ? "" : suffix;
+ this.multiToOneRulesPattern = multiToOneRulesPattern;
+ }
+
public String convert(String tableName) {
- return prefix + tableName + suffix;
+ if(multiToOneRulesPattern==null){
+ return prefix + tableName + suffix;
+ }
+
+ String target=null;
+
+ for (Map.Entry<Pattern, String> patternStringEntry : multiToOneRulesPattern.entrySet()) {
+ if(patternStringEntry.getKey().matcher(tableName).matches()){
+ target=patternStringEntry.getValue();
+ }
+ }
+ /**
+ * If multiToOneRulesPattern is not null and target is not assigned,
+ * then the synchronization task contains both multi to one and one to one ,
+ * prefixes and suffixes are added to common one-to-one mapping tables
+ * */
+ if(target==null){
+ return prefix + tableName + suffix;
+ }
+ return target;
}
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 1a205b1..875fb4c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -48,7 +48,8 @@
mysqlConfig.put("hostname","127.0.0.1");
mysqlConfig.put("port","3306");
mysqlConfig.put("username","root");
- mysqlConfig.put("password","");
+// mysqlConfig.put("password","");
+ mysqlConfig.put("password","12345678");
Configuration config = Configuration.fromMap(mysqlConfig);
Map<String,String> sinkConfig = new HashMap<>();
@@ -63,12 +64,15 @@
Map<String,String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
- String includingTables = "tbl1|tbl2|tbl3";
+// String includingTables = "tbl1|tbl2|tbl3";
+ String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
+ String multiToOneOrigin="a_.*|b_.*";
+ String multiToOneTarget="a|b";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 3a2a39e..9b6277f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -69,12 +69,14 @@
Map<String,String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
- String includingTables = "test.*";
+ String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
+ String multiToOneOrigin="a_.*|b_.*";
+ String multiToOneTarget="a|b";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new OracleDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
databaseSync.build();
env.execute(String.format("Oracle-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index cf5e1d8..87fa871 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -72,12 +72,14 @@
Map<String,String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
- String includingTables = "testcdc";
+ String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
+ String multiToOneOrigin="a_.*|b_.*";
+ String multiToOneTarget="a|b";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new PostgresDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
databaseSync.build();
env.execute(String.format("Postgres-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 7251a7f..d247500 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -51,7 +51,7 @@
sourceConfig.put("hostname","127.0.0.1");
sourceConfig.put("port","1433");
sourceConfig.put("username","sa");
- sourceConfig.put("password","123456");
+ sourceConfig.put("password","Passw@rd");
// sourceConfig.put("debezium.database.tablename.case.insensitive","false");
// sourceConfig.put("scan.incremental.snapshot.enabled","true");
// sourceConfig.put("debezium.include.schema.changes","false");
@@ -70,14 +70,16 @@
Map<String,String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
- String includingTables = "products_test";
+ String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
+ String multiToOneOrigin="a_.*|b_.*";
+ String multiToOneTarget="a|b";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new SqlServerDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
databaseSync.build();
- env.execute(String.format("Postgres-Doris Database Sync: %s", database));
+ env.execute(String.format("SqlServer-Doris Database Sync: %s", database));
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
new file mode 100644
index 0000000..daab90b
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.junit.Test;
+import java.util.Arrays;
+
+/**
+ * Unit tests for the {@link DatabaseSync}.
+ **/
+public class DatabaseSyncTest {
+ @Test
+ public void multiToOneRulesParserTest() throws Exception{
+ String[][] testCase = {
+ {"a_.*|b_.*","a|b"} // Normal condition
+// ,{"a_.*|b_.*","a|b|c"} // Unequal length
+// ,{"",""} // Null value
+// ,{"***....","a"} // Abnormal regular expression
+ };
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ Arrays.stream(testCase).forEach(arr->{
+ databaseSync.multiToOneRulesParser(arr[0], arr[1]);
+ });
+ }
+}