fix decimal(65,18) loss of precision (#105)

* fix decimal(65,18) loss of precision
* Fix decimal 0 missing problem
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index bd685ef..60fca85 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -19,8 +19,10 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
@@ -74,12 +76,16 @@
         this.database = tableInfo[0];
         this.table = tableInfo[1];
         this.sourceTableName = sourceTableName;
+        // Prevent loss of decimal data precision
+        this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+        JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
+        this.objectMapper.setNodeFactory(jsonNodeFactory);
     }
 
     @Override
     public byte[] serialize(String record) throws IOException {
         LOG.debug("received debezium json data {} :", record);
-        JsonNode recordRoot = objectMapper.readTree(record);
+        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
         String op = extractJsonNode(recordRoot, "op");
         if (Objects.isNull(op)) {
             //schema change ddl