[Improve](cdc) Fix default value in cdc (#305)

1. Currently, creating a table does not support default value.
2. Currently, when the schema changes, quotation marks will not be added to the default value.
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 45266e5..86745fa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -246,12 +246,20 @@
         if (isKey && DorisType.STRING.equals(fieldType)) {
             fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
         }
-        sql.append(identifier(field.getName()))
-                .append(" ")
-                .append(fieldType)
-                .append(" COMMENT '")
-                .append(quoteComment(field.getComment()))
-                .append("',");
+        sql.append(identifier(field.getName())).append(" ").append(fieldType);
+
+        if (field.getDefaultValue() != null) {
+            sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue()));
+        }
+        sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
+    }
+
+    public static String quoteDefaultValue(String defaultValue) {
+        // DEFAULT current_timestamp not need quote
+        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+            return defaultValue;
+        }
+        return "'" + defaultValue + "'";
     }
 
     public static String quoteComment(String comment) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 580b990..4c29c34 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -110,8 +110,8 @@
                         DorisSystem.quoteTableIdentifier(tableIdentifier),
                         DorisSystem.identifier(name),
                         type);
-        if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
-            addDDL = addDDL + " DEFAULT " + defaultValue;
+        if (defaultValue != null) {
+            addDDL = addDDL + " DEFAULT " + DorisSystem.quoteDefaultValue(defaultValue);
         }
         if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
             addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) + "'";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index b3a90e6..08e8f05 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -369,17 +369,11 @@
         if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
             return null;
         }
-        // Due to historical reasons, doris needs to add quotes to
-        // the default value of the new column
-        // For example in mysql: alter table add column c1 int default 100
-        // In Doris: alter table add column c1 int default '100'
-        if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
-            return defaultValue;
-        } else if (defaultValue.equals("1970-01-01 00:00:00")) {
+        if (defaultValue.equals("1970-01-01 00:00:00")) {
             // TODO: The default value of setting the current time in CDC is 1970-01-01 00:00:00
             return "current_timestamp";
         }
-        return "'" + defaultValue + "'";
+        return defaultValue;
     }
 
     @VisibleForTesting
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 1ad09d7..c525202 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -59,17 +59,19 @@
                 String fieldName = rs.getString("COLUMN_NAME");
                 String comment = rs.getString("REMARKS");
                 String fieldType = rs.getString("TYPE_NAME");
+                String defaultValue = rs.getString("COLUMN_DEF");
                 Integer precision = rs.getInt("COLUMN_SIZE");
-
                 if (rs.wasNull()) {
                     precision = null;
                 }
+
                 Integer scale = rs.getInt("DECIMAL_DIGITS");
                 if (rs.wasNull()) {
                     scale = null;
                 }
                 String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
-                fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
+                fields.put(
+                        fieldName, new FieldSchema(fieldName, dorisTypeStr, defaultValue, comment));
             }
         }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
index b577832..17a71f8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
@@ -33,15 +33,15 @@
 
     @Before
     public void setUp() {
-        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
-        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
-        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", null, ""));
+        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", null, ""));
 
-        updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
-        updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
-        updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
-        updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", "", ""));
-        updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)", "", ""));
+        updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+        updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", null, ""));
+        updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", null, ""));
+        updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", null, ""));
+        updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)", null, ""));
     }
 
     @Test
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 977f8da..5b553fc 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -113,6 +113,18 @@
         Assert.assertEquals(
                 "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT 'comment \"\\'sdf\\''",
                 addColumnDDL);
+
+        field = new FieldSchema("col", "int", "10", "comment \"'sdf'");
+        addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT '10' COMMENT 'comment \"\\'sdf\\''",
+                addColumnDDL);
+
+        field = new FieldSchema("col", "int", "current_timestamp", "comment \"'sdf'");
+        addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT current_timestamp COMMENT 'comment \"\\'sdf\\''",
+                addColumnDDL);
     }
 
     @Test
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 0ce60d3..11df3e0 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -154,7 +154,7 @@
         srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
         srcFiledSchemaMap.put(
                 "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
-        srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", null));
+        srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null));
 
         schemaChange.setSourceConnector("mysql");
         String columnsString =