[fix](cdc) single sink add tableprefix and tablesuffix (#301)
Currently, when single-sink is enabled and CDC automatically creates a table, it cannot automatically obtain the suffix and suffix.
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 370bca7..5bea2d9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.writer.serializer;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
@@ -66,6 +67,8 @@
// create table properties
private Map<String, String> tableProperties;
private String targetDatabase;
+ private String targetTablePrefix;
+ private String targetTableSuffix;
private JsonDebeziumDataChange dataChange;
private JsonDebeziumSchemaChange schemaChange;
@@ -109,11 +112,15 @@
DorisExecutionOptions executionOptions,
Map<String, String> tableMapping,
Map<String, String> tableProperties,
- String targetDatabase) {
+ String targetDatabase,
+ String targetTablePrefix,
+ String targetTableSuffix) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
this.tableMapping = tableMapping;
this.tableProperties = tableProperties;
this.targetDatabase = targetDatabase;
+ this.targetTablePrefix = targetTablePrefix;
+ this.targetTableSuffix = targetTableSuffix;
init();
}
@@ -128,8 +135,9 @@
objectMapper,
pattern,
lineDelimiter,
- ignoreUpdateBefore);
-
+ ignoreUpdateBefore,
+ targetTablePrefix,
+ targetTableSuffix);
this.schemaChange =
newSchemaChange
? new JsonDebeziumSchemaChangeImplV2(changeContext)
@@ -180,6 +188,8 @@
private Map<String, String> tableMapping;
private Map<String, String> tableProperties;
private String targetDatabase;
+ private String targetTablePrefix = "";
+ private String targetTableSuffix = "";
public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
@@ -221,6 +231,20 @@
return this;
}
+ public Builder setTargetTablePrefix(String tablePrefix) {
+ if (!StringUtils.isNullOrWhitespaceOnly(tablePrefix)) {
+ this.targetTablePrefix = tablePrefix;
+ }
+ return this;
+ }
+
+ public Builder setTargetTableSuffix(String tableSuffix) {
+ if (!StringUtils.isNullOrWhitespaceOnly(tableSuffix)) {
+ this.targetTableSuffix = tableSuffix;
+ }
+ return this;
+ }
+
public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(
dorisOptions,
@@ -230,7 +254,9 @@
executionOptions,
tableMapping,
tableProperties,
- targetDatabase);
+ targetDatabase,
+ targetTablePrefix,
+ targetTableSuffix);
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index 9c59f14..a7253d2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -38,6 +38,8 @@
private final Pattern pattern;
private final String lineDelimiter;
private final boolean ignoreUpdateBefore;
+ private String targetTablePrefix;
+ private String targetTableSuffix;
public JsonDebeziumChangeContext(
DorisOptions dorisOptions,
@@ -48,7 +50,9 @@
ObjectMapper objectMapper,
Pattern pattern,
String lineDelimiter,
- boolean ignoreUpdateBefore) {
+ boolean ignoreUpdateBefore,
+ String targetTablePrefix,
+ String targetTableSuffix) {
this.dorisOptions = dorisOptions;
this.tableMapping = tableMapping;
this.sourceTableName = sourceTableName;
@@ -58,6 +62,8 @@
this.pattern = pattern;
this.lineDelimiter = lineDelimiter;
this.ignoreUpdateBefore = ignoreUpdateBefore;
+ this.targetTablePrefix = targetTablePrefix;
+ this.targetTableSuffix = targetTableSuffix;
}
public DorisOptions getDorisOptions() {
@@ -95,4 +101,12 @@
public boolean isIgnoreUpdateBefore() {
return ignoreUpdateBefore;
}
+
+ public String getTargetTablePrefix() {
+ return targetTablePrefix;
+ }
+
+ public String getTargetTableSuffix() {
+ return targetTableSuffix;
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 5604da7..b3a90e6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -70,6 +70,8 @@
// create table properties
private final Map<String, String> tableProperties;
private String targetDatabase;
+ private String targetTablePrefix;
+ private String targetTableSuffix;
public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE);
@@ -81,6 +83,14 @@
this.tableProperties = changeContext.getTableProperties();
this.tableMapping = changeContext.getTableMapping();
this.objectMapper = changeContext.getObjectMapper();
+ this.targetTablePrefix =
+ changeContext.getTargetTablePrefix() == null
+ ? ""
+ : changeContext.getTargetTablePrefix();
+ this.targetTableSuffix =
+ changeContext.getTargetTableSuffix() == null
+ ? ""
+ : changeContext.getTargetTableSuffix();
}
@Override
@@ -253,7 +263,7 @@
private String getCreateTableIdentifier(JsonNode record) {
String table = extractJsonNode(record.get("source"), "table");
- return targetDatabase + "." + table;
+ return targetDatabase + "." + targetTablePrefix + table + targetTableSuffix;
}
private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index b6bb5e5..e153039 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -288,6 +288,8 @@
.setTableMapping(tableMapping)
.setTableProperties(tableConfig)
.setTargetDatabase(database)
+ .setTargetTablePrefix(tablePrefix)
+ .setTargetTableSuffix(tableSuffix)
.build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
index d789807..8900339 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
@@ -48,7 +48,9 @@
objectMapper,
null,
lineDelimiter,
- ignoreUpdateBefore);
+ ignoreUpdateBefore,
+ "",
+ "");
dataChange = new JsonDebeziumDataChange(changeContext);
}
@@ -109,7 +111,9 @@
objectMapper,
null,
lineDelimiter,
- false);
+ false,
+ "",
+ "");
dataChange = new JsonDebeziumDataChange(changeContext);
// update t1 set name='doris-update' WHERE id =1;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
index 9b003a8..c8997a1 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
@@ -55,7 +55,9 @@
objectMapper,
null,
lineDelimiter,
- ignoreUpdateBefore);
+ ignoreUpdateBefore,
+ "",
+ "");
schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index c63267d..0ce60d3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -58,7 +58,9 @@
objectMapper,
null,
lineDelimiter,
- ignoreUpdateBefore);
+ ignoreUpdateBefore,
+ "",
+ "");
schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 242f93f..3390f75 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -72,7 +72,7 @@
private static final String TABLE_4 = "tbl4";
private static final MySQLContainer MYSQL_CONTAINER =
- new MySQLContainer("mysql")
+ new MySQLContainer("mysql:8.0")
.withDatabaseName(DATABASE)
.withUsername(MYSQL_USER)
.withPassword(MYSQL_PASSWD);