[fix](cdc)fix the issue caused by Oracle table names containing slash (#355)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
index 86d6336..b421aff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -21,8 +21,10 @@
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.List;
/**
* JdbcSourceSchema is a subclass of SourceSchema, used to build metadata about jdbc-related
@@ -38,7 +40,15 @@
String tableComment)
throws Exception {
super(databaseName, schemaName, tableName, tableComment);
- fields = new LinkedHashMap<>();
+ fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
+ primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, tableName);
+ }
+
+ public LinkedHashMap<String, FieldSchema> getColumnInfo(
+ DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
+ throws SQLException {
+ LinkedHashMap<String, FieldSchema> fields = new LinkedHashMap<>();
+ //
try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
@@ -57,14 +67,21 @@
fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
}
}
+ return fields;
+ }
- primaryKeys = new ArrayList<>();
+ public List<String> getPrimaryKeys(
+ DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
+ throws SQLException {
+ List<String> primaryKeys = new ArrayList<>();
try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
primaryKeys.add(fieldName);
}
}
+
+ return primaryKeys;
}
public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index f3a8c8b..945c839 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -32,8 +32,6 @@
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
-import org.apache.doris.flink.catalog.doris.TableSchema;
-import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
@@ -120,15 +118,6 @@
if (!isSyncNeeded(tableName)) {
continue;
}
- // Oracle allows table names to contain special characters such as /, #, $,
- // etc., as in 'A/B'.
- // However, Doris does not support tables with these characters.
- if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) {
- throw new CreateTableException(
- String.format(
- "The table name %s is invalid. Table names in Doris must match the regex pattern %s. Please consider renaming the table or use the 'excluding-tables' option to filter it out.",
- tableName, TableSchema.DORIS_TABLE_REGEX));
- }
SourceSchema sourceSchema =
new OracleSchema(
metaData, databaseName, schemaName, tableName, tableComment);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
index e059181..71e4477 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
@@ -17,9 +17,12 @@
package org.apache.doris.flink.tools.cdc.oracle;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
public class OracleSchema extends JdbcSourceSchema {
@@ -42,4 +45,16 @@
public String getCdcTableName() {
return schemaName + "\\." + tableName;
}
+
+ @Override
+ public LinkedHashMap<String, FieldSchema> getColumnInfo(
+ DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
+ throws SQLException {
+ // Oracle permits table names to include special characters such as /,
+ // etc., as in 'A/B'.
+ // When attempting to fetch column information for `A/B` via JDBC,
+ // it may throw an ORA-01424 error.
+ // Hence, we substitute `/` with '_' to address the issue.
+ return super.getColumnInfo(metaData, databaseName, schemaName, tableName.replace("/", "_"));
+ }
}