[connector-jdbc][bugfix] fix oracle create table comment special string bug (#7012)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 3775f9f..510c28b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -355,11 +355,18 @@
throw new UnsupportedOperationException();
}
+ protected List<String> getCreateTableSqls(TablePath tablePath, CatalogTable table) {
+ return Collections.singletonList(getCreateTableSql(tablePath, table));
+ }
+
protected void createTableInternal(TablePath tablePath, CatalogTable table)
throws CatalogException {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
try {
- executeInternal(dbUrl, getCreateTableSql(tablePath, table));
+ final List<String> createTableSqlList = getCreateTableSqls(tablePath, table);
+ for (String sql : createTableSqlList) {
+ executeInternal(dbUrl, sql);
+ }
} catch (Exception e) {
throw new CatalogException(
String.format("Failed creating table %s", tablePath.getFullName()), e);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 1a3ac81..b51369e 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -117,22 +117,11 @@
@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
- return new OracleCreateTableSqlBuilder(table).build(tablePath);
+ return new OracleCreateTableSqlBuilder(table).build(tablePath).get(0);
}
- @Override
- protected void createTableInternal(TablePath tablePath, CatalogTable table)
- throws CatalogException {
- String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
- try {
- String createTableSQL = getCreateTableSql(tablePath, table);
- for (String sql : createTableSQL.split(";")) {
- executeInternal(dbUrl, sql);
- }
- } catch (Exception e) {
- // fallback to super
- super.createTableInternal(tablePath, table);
- }
+ protected List<String> getCreateTableSqls(TablePath tablePath, CatalogTable table) {
+ return new OracleCreateTableSqlBuilder(table).build(tablePath);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
index f5717c7..6afbfcf 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
@@ -27,6 +27,7 @@
import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -45,7 +46,8 @@
this.fieldIde = catalogTable.getOptions().get("fieldIde");
}
- public String build(TablePath tablePath) {
+ public List<String> build(TablePath tablePath) {
+ List<String> sqls = new ArrayList<>();
StringBuilder createTableSql = new StringBuilder();
createTableSql
.append("CREATE TABLE ")
@@ -66,7 +68,7 @@
createTableSql.append(String.join(",\n", columnSqls));
createTableSql.append("\n)");
-
+ sqls.add(createTableSql.toString());
List<String> commentSqls =
columns.stream()
.filter(column -> StringUtils.isNotBlank(column.getComment()))
@@ -75,13 +77,8 @@
buildColumnCommentSql(
column, tablePath.getSchemaAndTableName("\"")))
.collect(Collectors.toList());
-
- if (!commentSqls.isEmpty()) {
- createTableSql.append(";\n");
- createTableSql.append(String.join(";\n", commentSqls));
- }
-
- return createTableSql.toString();
+ sqls.addAll(commentSqls);
+ return sqls;
}
private String buildColumnSql(Column column) {
@@ -134,7 +131,7 @@
columnCommentSql
.append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\""))
.append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
- .append(column.getComment())
+ .append(column.getComment().replace("'", "''"))
.append("'");
return columnCommentSql.toString();
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
index f4f3010..717f72e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;
@@ -221,6 +222,8 @@
@Test
public void testCatalog() {
TablePath tablePathOracle = TablePath.of("XE", "TESTUSER", "E2E_TABLE_SOURCE_LOWER");
+ TablePath tablePathOracleCreateTablePath =
+ TablePath.of("XE", "TESTUSER", "E2E_TABLE_SOURCE_LOWER_AUTO");
OracleCatalog oracleCatalog =
new OracleCatalog(
"Oracle",
@@ -230,9 +233,29 @@
jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())),
SCHEMA);
oracleCatalog.open();
+ catalog.executeSql(
+ tablePathOracle,
+ "comment on column E2E_TABLE_SOURCE_LOWER.CHAR_10_COL is '\"#¥%……&*();;'',,..``````//''@特殊注释''\\''\"'");
Assertions.assertTrue(oracleCatalog.tableExists(tablePathOracle));
+ Assertions.assertEquals(
+ oracleCatalog
+ .getTable(tablePathOracle)
+ .getTableSchema()
+ .getColumns()
+ .get(1)
+ .getComment(),
+ "\"#¥%……&*();;',,..``````//'@特殊注释'\\'\"");
oracleCatalog.truncateTable(tablePathOracle, true);
Assertions.assertFalse(oracleCatalog.isExistsData(tablePathOracle));
+ // create table with comment
+ Assertions.assertFalse(oracleCatalog.tableExists(tablePathOracleCreateTablePath));
+ oracleCatalog.createTable(
+ tablePathOracleCreateTablePath, oracleCatalog.getTable(tablePathOracle), true);
+ Assertions.assertTrue(oracleCatalog.tableExists(tablePathOracleCreateTablePath));
+ final CatalogTable table = oracleCatalog.getTable(tablePathOracleCreateTablePath);
+ Assertions.assertEquals(
+ table.getTableSchema().getColumns().get(1).getComment(),
+ "\"#¥%……&*();;',,..``````//'@特殊注释'\\'\"");
oracleCatalog.close();
}
}