fix debeziumSourcefunction ddl parse (#201)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index 38ae229..b2d88c6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -49,6 +49,7 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -197,7 +198,7 @@
                 String ddlSql = ddlSqlList.get(i);
                 boolean doSchemaChange = checkSchemaChange(ddlSchema);
                 status = doSchemaChange && execSchemaChange(ddlSql);
-                LOG.info("schema change status:{}", status);
+                LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
             }
         } catch (Exception ex) {
             LOG.warn("schema change error :", ex);
@@ -220,12 +221,14 @@
 
     @VisibleForTesting
     public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
-        JsonNode historyRecord = objectMapper.readTree(extractJsonNode(record, "historyRecord"));
+        JsonNode historyRecord = extractHistoryRecord(record);
         JsonNode tableChanges = historyRecord.get("tableChanges");
-        JsonNode tableChange = tableChanges.get(0);
         String ddl = extractJsonNode(historyRecord, "ddl");
+        if(Objects.isNull(tableChanges) || Objects.isNull(ddl)){
+            return new ArrayList<>();
+        }
         LOG.debug("received debezium ddl :{}", ddl);
-
+        JsonNode tableChange = tableChanges.get(0);
         Matcher matcher = addDropDDLPattern.matcher(ddl);
         if (Objects.isNull(tableChange)|| !tableChange.get("type").asText().equals("ALTER") || !matcher.find()) {
             return null;
@@ -388,12 +391,17 @@
         return recordMap != null ? recordMap : new HashMap<>();
     }
 
-    public String extractDDL(JsonNode record) throws JsonProcessingException {
-        String historyRecord = extractJsonNode(record, "historyRecord");
-        if (Objects.isNull(historyRecord)) {
-            return null;
+    private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
+        if(record.has("historyRecord")){
+            return objectMapper.readTree(record.get("historyRecord").asText());
         }
-        String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
+        //The ddl passed by some scenes will not be included in the historyRecord, such as DebeziumSourceFunction
+        return record;
+    }
+
+    public String extractDDL(JsonNode record) throws JsonProcessingException {
+        JsonNode historyRecord = extractHistoryRecord(record);
+        String ddl = extractJsonNode(historyRecord, "ddl");
         LOG.debug("received debezium ddl :{}", ddl);
         if (!Objects.isNull(ddl)) {
             //filter add/drop operation
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index bfd974f..0049579 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -162,6 +162,7 @@
                     .tableList(schemaName + "." + tableName)
                     .username(username)
                     .password(password)
+                    .includeSchemaChanges(true)
                     .startupOptions(startupOptions)
                     .deserializer(schema)
                     .debeziumProperties(debeziumProperties)