[fix](cdc) add Oracle table name validation (#320)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
index f3da962..4cc9098 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -23,6 +23,7 @@
import java.util.Map;
public class TableSchema {
+ public static final String DORIS_TABLE_REGEX = "^[a-zA-Z][a-zA-Z0-9-_]*$";
private String database;
private String table;
private String tableComment;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index ef2e7ac..8ca66e4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -32,6 +32,8 @@
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
@@ -118,6 +120,15 @@
if (!isSyncNeeded(tableName)) {
continue;
}
+ // Oracle allows table names to contain special characters such as /, #, $,
+ // etc., as in 'A/B'.
+ // However, Doris does not support tables with these characters.
+ if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) {
+ throw new CreateTableException(
+ String.format(
+ "The table name %s is invalid. Table names in Doris must match the regex pattern %s. Please consider renaming the table or use the 'excluding-tables' option to filter it out.",
+ tableName, TableSchema.DORIS_TABLE_REGEX));
+ }
SourceSchema sourceSchema =
new OracleSchema(
metaData, databaseName, schemaName, tableName, tableComment);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index da96f08..f4d6ba3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -125,7 +125,7 @@
String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
Preconditions.checkNotNull(databaseName, "database-name in sqlserver is required");
- Preconditions.checkNotNull(databaseName, "schema-name in sqlserver is required");
+ Preconditions.checkNotNull(schemaName, "schema-name in sqlserver is required");
String tableName = config.get(JdbcSourceOptions.TABLE_NAME);
String hostname = config.get(JdbcSourceOptions.HOSTNAME);