[Improve](cdc) Fix default value in cdc (#305)
1. Currently, creating a table does not support default value.
2. Currently, when the schema changes, quotation marks will not be added to the default value.
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 45266e5..86745fa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -246,12 +246,20 @@
if (isKey && DorisType.STRING.equals(fieldType)) {
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
}
- sql.append(identifier(field.getName()))
- .append(" ")
- .append(fieldType)
- .append(" COMMENT '")
- .append(quoteComment(field.getComment()))
- .append("',");
+ sql.append(identifier(field.getName())).append(" ").append(fieldType);
+
+ if (field.getDefaultValue() != null) {
+ sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue()));
+ }
+ sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
+ }
+
+ public static String quoteDefaultValue(String defaultValue) {
+ // DEFAULT current_timestamp not need quote
+ if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+ return defaultValue;
+ }
+ return "'" + defaultValue + "'";
}
public static String quoteComment(String comment) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 580b990..4c29c34 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -110,8 +110,8 @@
DorisSystem.quoteTableIdentifier(tableIdentifier),
DorisSystem.identifier(name),
type);
- if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
- addDDL = addDDL + " DEFAULT " + defaultValue;
+ if (defaultValue != null) {
+ addDDL = addDDL + " DEFAULT " + DorisSystem.quoteDefaultValue(defaultValue);
}
if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) + "'";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index b3a90e6..08e8f05 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -369,17 +369,11 @@
if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
return null;
}
- // Due to historical reasons, doris needs to add quotes to
- // the default value of the new column
- // For example in mysql: alter table add column c1 int default 100
- // In Doris: alter table add column c1 int default '100'
- if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
- return defaultValue;
- } else if (defaultValue.equals("1970-01-01 00:00:00")) {
+ if (defaultValue.equals("1970-01-01 00:00:00")) {
// TODO: The default value of setting the current time in CDC is 1970-01-01 00:00:00
return "current_timestamp";
}
- return "'" + defaultValue + "'";
+ return defaultValue;
}
@VisibleForTesting
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 1ad09d7..c525202 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -59,17 +59,19 @@
String fieldName = rs.getString("COLUMN_NAME");
String comment = rs.getString("REMARKS");
String fieldType = rs.getString("TYPE_NAME");
+ String defaultValue = rs.getString("COLUMN_DEF");
Integer precision = rs.getInt("COLUMN_SIZE");
-
if (rs.wasNull()) {
precision = null;
}
+
Integer scale = rs.getInt("DECIMAL_DIGITS");
if (rs.wasNull()) {
scale = null;
}
String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
- fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
+ fields.put(
+ fieldName, new FieldSchema(fieldName, dorisTypeStr, defaultValue, comment));
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
index b577832..17a71f8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
@@ -33,15 +33,15 @@
@Before
public void setUp() {
- originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
- originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
- originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+ originFieldSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+ originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", null, ""));
+ originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", null, ""));
- updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
- updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
- updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
- updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", "", ""));
- updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)", "", ""));
+ updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+ updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", null, ""));
+ updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", null, ""));
+ updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", null, ""));
+ updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)", null, ""));
}
@Test
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 977f8da..5b553fc 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -113,6 +113,18 @@
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT 'comment \"\\'sdf\\''",
addColumnDDL);
+
+ field = new FieldSchema("col", "int", "10", "comment \"'sdf'");
+ addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT '10' COMMENT 'comment \"\\'sdf\\''",
+ addColumnDDL);
+
+ field = new FieldSchema("col", "int", "current_timestamp", "comment \"'sdf'");
+ addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT current_timestamp COMMENT 'comment \"\\'sdf\\''",
+ addColumnDDL);
}
@Test
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 0ce60d3..11df3e0 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -154,7 +154,7 @@
srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
srcFiledSchemaMap.put(
"test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
- srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", null));
+ srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null));
schemaChange.setSourceConnector("mysql");
String columnsString =