[fix][cdc] fix uid conflicts during multi-database synchronization. (#382)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 632edcc..a1f511a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -129,12 +129,14 @@
tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets"));
}
Set<String> bucketsTable = new HashSet<>();
+ Set<String> targetDbSet = new HashSet<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
String targetDb = database;
// Synchronize multiple databases using the src database name
if (StringUtils.isNullOrWhitespaceOnly(targetDb)) {
targetDb = schema.getDatabaseName();
+ targetDbSet.add(targetDb);
}
if (StringUtils.isNullOrWhitespaceOnly(database)
&& !dorisSystem.databaseExists(targetDb)) {
@@ -177,15 +179,35 @@
int sinkParallel =
sinkConfig.getInteger(
DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
+ String uidName = getUidName(targetDbSet, dbTbl);
sideOutput
.sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1))
.setParallelism(sinkParallel)
- .name(dbTbl.f1)
- .uid(dbTbl.f1);
+ .name(uidName)
+ .uid(uidName);
}
}
}
+ /**
+ * @param targetDbSet The set of target databases.
+ * @param dbTbl The database-table tuple.
+ * @return The UID of the DataStream.
+ */
+ public String getUidName(Set<String> targetDbSet, Tuple2<String, String> dbTbl) {
+ String uidName;
+ // Determine whether to proceed with multi-database synchronization.
+ // if yes, the UID is composed of `dbname_tablename`, otherwise it is composed of
+ // `tablename`.
+ if (targetDbSet.size() > 1) {
+ uidName = dbTbl.f0 + "_" + dbTbl.f1;
+ } else {
+ uidName = dbTbl.f1;
+ }
+
+ return uidName;
+ }
+
private DorisConnectionOptions getDorisConnectionOptions() {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);