[fix](cdc) fix datetime precision and regular matching format errors after turning on single-sink (#298)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index cc5fe4b..e125a30 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -70,6 +70,8 @@
/** Max size of varchar type of Doris. */
public static final int MAX_VARCHAR_SIZE = 65533;
+ /* Max precision of datetime type of Doris. */
+ public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
public static DataType toFlinkType(
String columnName, String columnType, int precision, int scale) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e153039..e71ed51 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -80,7 +80,7 @@
protected String tablePrefix;
protected String tableSuffix;
protected boolean singleSink;
- private Map<String, String> tableMapping = new HashMap<>();
+ private final Map<String, String> tableMapping = new HashMap<>();
public abstract void registerDriver() throws SQLException;
@@ -93,7 +93,7 @@
/** Get the prefix of a specific tableList, for example, mysql is database, oracle is schema. */
public abstract String getTableListPrefix();
- public DatabaseSync() throws SQLException {
+ protected DatabaseSync() throws SQLException {
registerDriver();
}
@@ -315,6 +315,9 @@
.collect(Collectors.joining("|"));
} else {
// includingTablePattern and ^excludingPattern
+ if (includingTables == null) {
+ includingTables = ".*";
+ }
String includingPattern =
String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables);
if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
index 704b8b3..60a5eda 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
@@ -17,11 +17,20 @@
package org.apache.doris.flink.tools.cdc.mysql;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;
import org.apache.doris.flink.catalog.doris.DorisType;
+import static org.apache.doris.flink.catalog.DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION;
+
public class MysqlType {
+
+ // MySQL driver returns width of timestamp types instead of precision.
+ // 19 characters are used for zero-precision timestamps while others
+ // require 19 + precision + 1 characters with the additional character
+ // required for the decimal separator.
+ private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
private static final String BIT = "BIT";
private static final String BOOLEAN = "BOOLEAN";
private static final String BOOL = "BOOL";
@@ -145,8 +154,35 @@
return DorisType.DATE_V2;
case DATETIME:
case TIMESTAMP:
- int dtScale = length > 19 ? length - 20 : 0;
- return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(dtScale, 6));
+ // default precision is 0
+ // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
+ if (length == null
+ || length <= 0
+ || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
+ return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
+ } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
+ // Timestamp with a fraction of seconds.
+ // For example, 2024-01-01 01:01:01.1
+ // The decimal point will occupy 1 character.
+ // Thus,the length of the timestamp is 21.
+ return String.format(
+ "%s(%s)",
+ DorisType.DATETIME_V2,
+ Math.min(
+ length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
+ MAX_SUPPORTED_DATE_TIME_PRECISION));
+ } else if (length <= TimestampType.MAX_PRECISION) {
+ // For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9.
+ return String.format(
+ "%s(%s)",
+ DorisType.DATETIME_V2,
+ Math.min(length, MAX_SUPPORTED_DATE_TIME_PRECISION));
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported length: "
+ + length
+ + " for MySQL TIMESTAMP/DATETIME types");
+ }
case CHAR:
case VARCHAR:
Preconditions.checkNotNull(length);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 11df3e0..8aca521 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
@@ -267,4 +268,51 @@
Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName());
schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName);
}
+
+ @Test
+ public void testDateTimeFullOrigin() throws JsonProcessingException {
+ Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
+ srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
+ srcFiledSchemaMap.put(
+ "test_dt_0", new FieldSchema("test_dt_0", "DATETIMEV2(0)", null, null));
+ srcFiledSchemaMap.put(
+ "test_dt_1", new FieldSchema("test_dt_1", "DATETIMEV2(1)", null, null));
+ srcFiledSchemaMap.put(
+ "test_dt_3", new FieldSchema("test_dt_3", "DATETIMEV2(3)", null, null));
+ srcFiledSchemaMap.put(
+ "test_dt_6", new FieldSchema("test_dt_6", "DATETIMEV2(6)", null, null));
+ srcFiledSchemaMap.put(
+ "test_ts_0", new FieldSchema("test_ts_0", "DATETIMEV2(0)", null, null));
+ srcFiledSchemaMap.put(
+ "test_ts_1",
+ new FieldSchema("test_ts_1", "DATETIMEV2(1)", "current_timestamp", null));
+ srcFiledSchemaMap.put(
+ "test_ts_3",
+ new FieldSchema("test_ts_3", "DATETIMEV2(3)", "current_timestamp", null));
+ srcFiledSchemaMap.put(
+ "test_ts_6",
+ new FieldSchema("test_ts_6", "DATETIMEV2(6)", "current_timestamp", null));
+
+ schemaChange.setSourceConnector("mysql");
+ String columnsString =
+ "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_1\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":1,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_3\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":3,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_6\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":6,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_0\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_1\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":1,\"position\":7,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_3\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":3,\"position\":8,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_6\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":6,\"position\":9,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]}]},\"comment\":null}]}";
+ JsonNode columns = objectMapper.readTree(columnsString);
+ schemaChange.fillOriginSchema(columns);
+ Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap();
+
+ Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
+ originFieldSchemaMap.entrySet().iterator();
+ for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
+ FieldSchema srcFiledSchema = entry.getValue();
+ Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
+
+ Assert.assertEquals(entry.getKey(), originField.getKey());
+ Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
+ Assert.assertEquals(
+ srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
+ Assert.assertEquals(
+ srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
+ Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
+ }
+ }
}