fix decimal(65,18) loss of precision (#105)
* fix decimal(65,18) loss of precision
* Fix decimal 0 missing problem
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index bd685ef..60fca85 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -19,8 +19,10 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.HttpGetWithEntity;
@@ -74,12 +76,16 @@
this.database = tableInfo[0];
this.table = tableInfo[1];
this.sourceTableName = sourceTableName;
+ // Prevent loss of decimal data precision
+ this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+ JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
+ this.objectMapper.setNodeFactory(jsonNodeFactory);
}
@Override
public byte[] serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
- JsonNode recordRoot = objectMapper.readTree(record);
+ JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
//schema change ddl