fix debeziumSourcefunction ddl parse (#201)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index 38ae229..b2d88c6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -49,6 +49,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -197,7 +198,7 @@
String ddlSql = ddlSqlList.get(i);
boolean doSchemaChange = checkSchemaChange(ddlSchema);
status = doSchemaChange && execSchemaChange(ddlSql);
- LOG.info("schema change status:{}", status);
+ LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
}
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
@@ -220,12 +221,14 @@
@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
- JsonNode historyRecord = objectMapper.readTree(extractJsonNode(record, "historyRecord"));
+ JsonNode historyRecord = extractHistoryRecord(record);
JsonNode tableChanges = historyRecord.get("tableChanges");
- JsonNode tableChange = tableChanges.get(0);
String ddl = extractJsonNode(historyRecord, "ddl");
+ if(Objects.isNull(tableChanges) || Objects.isNull(ddl)){
+ return new ArrayList<>();
+ }
LOG.debug("received debezium ddl :{}", ddl);
-
+ JsonNode tableChange = tableChanges.get(0);
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (Objects.isNull(tableChange)|| !tableChange.get("type").asText().equals("ALTER") || !matcher.find()) {
return null;
@@ -388,12 +391,17 @@
return recordMap != null ? recordMap : new HashMap<>();
}
- public String extractDDL(JsonNode record) throws JsonProcessingException {
- String historyRecord = extractJsonNode(record, "historyRecord");
- if (Objects.isNull(historyRecord)) {
- return null;
+ private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
+ if(record.has("historyRecord")){
+ return objectMapper.readTree(record.get("historyRecord").asText());
}
- String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
+ //The ddl passed by some scenes will not be included in the historyRecord, such as DebeziumSourceFunction
+ return record;
+ }
+
+ public String extractDDL(JsonNode record) throws JsonProcessingException {
+ JsonNode historyRecord = extractHistoryRecord(record);
+ String ddl = extractJsonNode(historyRecord, "ddl");
LOG.debug("received debezium ddl :{}", ddl);
if (!Objects.isNull(ddl)) {
//filter add/drop operation
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index bfd974f..0049579 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -162,6 +162,7 @@
.tableList(schemaName + "." + tableName)
.username(username)
.password(password)
+ .includeSchemaChanges(true)
.startupOptions(startupOptions)
.deserializer(schema)
.debeziumProperties(debeziumProperties)