[fix](cdc) fix datetime precision and regular matching format errors after turning on single-sink (#298)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index cc5fe4b..e125a30 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -70,6 +70,8 @@
 
     /** Max size of varchar type of Doris. */
     public static final int MAX_VARCHAR_SIZE = 65533;
+    /* Max precision of datetime type of Doris. */
+    public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
 
     public static DataType toFlinkType(
             String columnName, String columnType, int precision, int scale) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e153039..e71ed51 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -80,7 +80,7 @@
     protected String tablePrefix;
     protected String tableSuffix;
     protected boolean singleSink;
-    private Map<String, String> tableMapping = new HashMap<>();
+    private final Map<String, String> tableMapping = new HashMap<>();
 
     public abstract void registerDriver() throws SQLException;
 
@@ -93,7 +93,7 @@
     /** Get the prefix of a specific tableList, for example, mysql is database, oracle is schema. */
     public abstract String getTableListPrefix();
 
-    public DatabaseSync() throws SQLException {
+    protected DatabaseSync() throws SQLException {
         registerDriver();
     }
 
@@ -315,6 +315,9 @@
                     .collect(Collectors.joining("|"));
         } else {
             // includingTablePattern and ^excludingPattern
+            if (includingTables == null) {
+                includingTables = ".*";
+            }
             String includingPattern =
                     String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables);
             if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
index 704b8b3..60a5eda 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
@@ -17,11 +17,20 @@
 
 package org.apache.doris.flink.tools.cdc.mysql;
 
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.doris.flink.catalog.doris.DorisType;
 
+import static org.apache.doris.flink.catalog.DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION;
+
 public class MysqlType {
+
+    // MySQL driver returns width of timestamp types instead of precision.
+    // 19 characters are used for zero-precision timestamps while others
+    // require 19 + precision + 1 characters with the additional character
+    // required for the decimal separator.
+    private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
     private static final String BIT = "BIT";
     private static final String BOOLEAN = "BOOLEAN";
     private static final String BOOL = "BOOL";
@@ -145,8 +154,35 @@
                 return DorisType.DATE_V2;
             case DATETIME:
             case TIMESTAMP:
-                int dtScale = length > 19 ? length - 20 : 0;
-                return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(dtScale, 6));
+                // default precision is 0
+                // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
+                if (length == null
+                        || length <= 0
+                        || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
+                    return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
+                } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
+                    // Timestamp with a fraction of seconds.
+                    // For example, 2024-01-01 01:01:01.1
+                    // The decimal point will occupy 1 character.
+                    // Thus,the length of the timestamp is 21.
+                    return String.format(
+                            "%s(%s)",
+                            DorisType.DATETIME_V2,
+                            Math.min(
+                                    length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
+                                    MAX_SUPPORTED_DATE_TIME_PRECISION));
+                } else if (length <= TimestampType.MAX_PRECISION) {
+                    // For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9.
+                    return String.format(
+                            "%s(%s)",
+                            DorisType.DATETIME_V2,
+                            Math.min(length, MAX_SUPPORTED_DATE_TIME_PRECISION));
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported length: "
+                                    + length
+                                    + " for MySQL TIMESTAMP/DATETIME types");
+                }
             case CHAR:
             case VARCHAR:
                 Preconditions.checkNotNull(length);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 11df3e0..8aca521 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
@@ -267,4 +268,51 @@
         Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName());
         schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName);
     }
+
+    @Test
+    public void testDateTimeFullOrigin() throws JsonProcessingException {
+        Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
+        srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
+        srcFiledSchemaMap.put(
+                "test_dt_0", new FieldSchema("test_dt_0", "DATETIMEV2(0)", null, null));
+        srcFiledSchemaMap.put(
+                "test_dt_1", new FieldSchema("test_dt_1", "DATETIMEV2(1)", null, null));
+        srcFiledSchemaMap.put(
+                "test_dt_3", new FieldSchema("test_dt_3", "DATETIMEV2(3)", null, null));
+        srcFiledSchemaMap.put(
+                "test_dt_6", new FieldSchema("test_dt_6", "DATETIMEV2(6)", null, null));
+        srcFiledSchemaMap.put(
+                "test_ts_0", new FieldSchema("test_ts_0", "DATETIMEV2(0)", null, null));
+        srcFiledSchemaMap.put(
+                "test_ts_1",
+                new FieldSchema("test_ts_1", "DATETIMEV2(1)", "current_timestamp", null));
+        srcFiledSchemaMap.put(
+                "test_ts_3",
+                new FieldSchema("test_ts_3", "DATETIMEV2(3)", "current_timestamp", null));
+        srcFiledSchemaMap.put(
+                "test_ts_6",
+                new FieldSchema("test_ts_6", "DATETIMEV2(6)", "current_timestamp", null));
+
+        schemaChange.setSourceConnector("mysql");
+        String columnsString =
+                "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_1\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":1,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_3\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":3,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_6\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":6,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_0\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_1\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":1,\"position\":7,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_3\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":3,\"position\":8,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_6\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":6,\"position\":9,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]}]},\"comment\":null}]}";
+        JsonNode columns = objectMapper.readTree(columnsString);
+        schemaChange.fillOriginSchema(columns);
+        Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap();
+
+        Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
+                originFieldSchemaMap.entrySet().iterator();
+        for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
+            FieldSchema srcFiledSchema = entry.getValue();
+            Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
+
+            Assert.assertEquals(entry.getKey(), originField.getKey());
+            Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
+            Assert.assertEquals(
+                    srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
+            Assert.assertEquals(
+                    srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
+            Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
+        }
+    }
 }