[Bug] fix can not create table problem (#252)
Co-authored-by: wudi <>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 29d9cee..25d06ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -93,6 +93,9 @@
private SchemaChangeManager schemaChangeManager;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
+ // create table properties
+ private Map<String, String> tableProperties;
+ private String targetDatabase;
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
@@ -134,24 +137,19 @@
String sourceTableName,
boolean newSchemaChange,
DorisExecutionOptions executionOptions,
- Map<String, String> tableMapping) {
+ Map<String, String> tableMapping,
+ Map<String, String> tableProperties,
+ String targetDatabase) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
this.tableMapping = tableMapping;
+ this.tableProperties = tableProperties;
+ this.targetDatabase = targetDatabase;
}
@Override
public DorisRecord serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
-
- //Filter out table records that are not in tableMapping
- String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
- String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
- if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
- LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
- return null;
- }
-
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
@@ -166,6 +164,15 @@
if (newSchemaChange && firstLoad) {
initOriginFieldSchema(recordRoot);
}
+
+ //Filter out table records that are not in tableMapping
+ String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+ String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
+ if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
+ LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
+ return null;
+ }
+
Map<String, Object> valueMap;
switch (op) {
case OP_READ:
@@ -313,11 +320,16 @@
@VisibleForTesting
public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
+ if(sourceConnector == null){
+ sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
+ }
+
String dorisTable = getCreateTableIdentifier(record);
JsonNode tableChange = extractTableChange(record);
JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
JsonNode columns = tableChange.get("table").get("columns");
- String tblComment = tableChange.get("table").get("comment").asText();
+ JsonNode comment = tableChange.get("table").get("comment");
+ String tblComment = comment == null ? "" : comment.asText();
Map<String, FieldSchema> field = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(field, column);
@@ -333,6 +345,7 @@
tableSchema.setKeys(pkList);
tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
tableSchema.setTableComment(tblComment);
+ tableSchema.setProperties(tableProperties);
String[] split = dorisTable.split("\\.");
Preconditions.checkArgument(split.length == 2);
@@ -402,9 +415,8 @@
}
public String getCreateTableIdentifier(JsonNode record){
- String db = extractJsonNode(record.get("source"), "db");
String table = extractJsonNode(record.get("source"), "table");
- return db + "." + table;
+ return targetDatabase + "." + table;
}
public String getDorisTableIdentifier(String cdcTableIdentifier){
@@ -657,6 +669,8 @@
private boolean newSchemaChange;
private DorisExecutionOptions executionOptions;
private Map<String, String> tableMapping;
+ private Map<String, String> tableProperties;
+ private String targetDatabase;
public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
@@ -688,9 +702,19 @@
return this;
}
+ public Builder setTableProperties(Map<String, String> tableProperties) {
+ this.tableProperties = tableProperties;
+ return this;
+ }
+
+ public Builder setTargetDatabase(String targetDatabase) {
+ this.targetDatabase = targetDatabase;
+ return this;
+ }
+
public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
- executionOptions, tableMapping);
+ executionOptions, tableMapping, tableProperties, targetDatabase);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 02ab034..fe5357e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -242,6 +242,8 @@
.setNewSchemaChange(newSchemaChange)
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
+ .setTableProperties(tableConfig)
+ .setTargetDatabase(database)
.build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index a3e01d3..a86b2ab 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -122,9 +122,7 @@
.username(config.get(MySqlSourceOptions.USERNAME))
.password(config.get(MySqlSourceOptions.PASSWORD))
.databaseList(databaseName)
- .tableList(tableName)
- //default open add newly table
- .scanNewlyAddedTableEnabled(true);
+ .tableList(tableName);
config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
config