[FLINK-28621][core] Add central Jackson mapper factory methods
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
index fd28712..cc244b0 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,11 +38,11 @@
protected transient ObjectMapper mapper;
public JsonDeserializationSchema(Class<T> clazz) {
- this(clazz, () -> new ObjectMapper());
+ this(clazz, JacksonMapperFactory::createObjectMapper);
}
public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
- this(typeInformation, () -> new ObjectMapper());
+ this(typeInformation, JacksonMapperFactory::createObjectMapper);
}
public JsonDeserializationSchema(
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 805b299..9a57bac 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -26,6 +26,7 @@
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -97,7 +98,7 @@
@Override
public void open(InitializationContext context) throws Exception {
objectMapper =
- new ObjectMapper()
+ JacksonMapperFactory.createObjectMapper()
.configure(
JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
true);
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 6a8c619..c8b7f73 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -23,6 +23,7 @@
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -86,7 +87,7 @@
@Override
public void open(InitializationContext context) throws Exception {
mapper =
- new ObjectMapper()
+ JacksonMapperFactory.createObjectMapper()
.configure(
JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
encodeDecimalAsPlainNumber);
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index b2b7e6d..dd4a9bb 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -30,6 +30,7 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -121,7 +122,7 @@
@Override
public void open(InitializationContext context) throws Exception {
- objectMapper = new ObjectMapper();
+ objectMapper = JacksonMapperFactory.createObjectMapper();
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
index fe41232..d776185 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -97,7 +98,7 @@
@SuppressWarnings("unchecked")
public static <T> TypeInformation<T> convert(String jsonSchema) {
Preconditions.checkNotNull(jsonSchema, "JSON schema");
- final ObjectMapper mapper = new ObjectMapper();
+ final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
mapper.getFactory()
.enable(JsonParser.Feature.ALLOW_COMMENTS)
.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES)
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index e789307..f185d21 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -28,6 +28,7 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -96,7 +97,7 @@
@Override
public void open(InitializationContext context) throws Exception {
- mapper = new ObjectMapper();
+ mapper = JacksonMapperFactory.createObjectMapper();
}
/** Builder for {@link JsonRowSerializationSchema}. */
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 9075152..e6b2a3e 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.formats.json;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -34,7 +35,7 @@
@Test
void testDeserialize() throws IOException {
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
ObjectNode initialValue = mapper.createObjectNode();
initialValue.put("key", 4).put("value", "world");
byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 888be3b..883c3f0 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -31,6 +31,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -83,6 +84,8 @@
*/
class JsonRowDataSerDeSchemaTest {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
@Test
void testSerDe() throws Exception {
byte tinyint = 'c';
@@ -115,11 +118,10 @@
innerMap.put("key", 234);
nestedMap.put("inner_map", innerMap);
- ObjectMapper objectMapper = new ObjectMapper();
- ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
+ ArrayNode doubleNode = OBJECT_MAPPER.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("bool", true);
root.put("tinyint", tinyint);
root.put("smallint", smallint);
@@ -139,7 +141,7 @@
root.putObject("multiSet").put("element", 2);
root.putObject("map2map").putObject("inner_map").put("key", 234);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
DataType dataType =
ROW(
@@ -220,8 +222,7 @@
double doubleValue = random.nextDouble();
float floatValue = random.nextFloat();
- ObjectMapper objectMapper = new ObjectMapper();
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("bool", String.valueOf(bool));
root.put("int", String.valueOf(integer));
root.put("bigint", String.valueOf(bigint));
@@ -230,7 +231,7 @@
root.put("float1", String.valueOf(floatValue));
root.put("float2", new BigDecimal(floatValue));
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
DataType dataType =
ROW(
@@ -296,11 +297,9 @@
true);
open(serializationSchema);
- ObjectMapper objectMapper = new ObjectMapper();
-
// the first row
{
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("f1", 1);
root.put("f2", true);
root.put("f3", "str");
@@ -312,7 +311,7 @@
ObjectNode row = root.putObject("f6");
row.put("f1", "this is row1");
row.put("f2", 12);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
RowData rowData = deserializationSchema.deserialize(serializedJson);
byte[] actual = serializationSchema.serialize(rowData);
assertThat(serializedJson).containsExactly(actual);
@@ -320,7 +319,7 @@
// the second row
{
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("f1", 10);
root.put("f2", false);
root.put("f3", "newStr");
@@ -332,7 +331,7 @@
ObjectNode row = root.putObject("f6");
row.put("f1", "this is row2");
row.putNull("f2");
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
RowData rowData = deserializationSchema.deserialize(serializedJson);
byte[] actual = serializationSchema.serialize(rowData);
assertThat(serializedJson).containsExactly(actual);
@@ -419,12 +418,10 @@
@Test
void testDeserializationMissingField() throws Exception {
- ObjectMapper objectMapper = new ObjectMapper();
-
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("id", 123123123);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
DataType dataType = ROW(FIELD("name", STRING()));
RowType schema = (RowType) dataType.getLogicalType();
@@ -504,14 +501,12 @@
true);
open(serializationSchema);
- ObjectMapper objectMapper = new ObjectMapper();
-
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("timestamp3", "1990-10-14 12:12:43.123");
root.put("timestamp9", "1990-10-14 12:12:43.123456789");
root.put("timestamp_with_local_timezone3", "1990-10-14 12:12:43.123Z");
root.put("timestamp_with_local_timezone9", "1990-10-14 12:12:43.123456789Z");
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
RowData rowData = deserializationSchema.deserialize(serializedJson);
byte[] actual = serializationSchema.serialize(rowData);
assertThat(serializedJson).containsExactly(actual);
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index deee53a..81e370c 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,6 +53,8 @@
/** Tests for the {@link JsonRowDeserializationSchema}. */
public class JsonRowDeserializationSchemaTest {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
@Rule public ExpectedException thrown = ExpectedException.none();
/** Tests simple deserialization using type information. */
@@ -73,10 +76,8 @@
innerMap.put("key", 234);
nestedMap.put("inner_map", innerMap);
- ObjectMapper objectMapper = new ObjectMapper();
-
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("id", id);
root.put("name", name);
root.put("bytes", bytes);
@@ -89,7 +90,7 @@
root.putObject("map").put("flink", 123);
root.putObject("map2map").putObject("inner_map").put("key", 234);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
JsonRowDeserializationSchema deserializationSchema =
new JsonRowDeserializationSchema.Builder(
@@ -149,10 +150,8 @@
};
final String[] strings = new String[] {"one", "two", "three"};
- final ObjectMapper objectMapper = new ObjectMapper();
-
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("id", id.longValue());
root.putNull("idOrNull");
root.put("name", name);
@@ -164,7 +163,7 @@
root.putArray("strings").add("one").add("two").add("three");
root.putObject("nested").put("booleanField", true).put("decimalField", 12);
- final byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ final byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
JsonRowDeserializationSchema deserializationSchema =
new JsonRowDeserializationSchema.Builder(
@@ -212,12 +211,10 @@
/** Tests deserialization with non-existing field name. */
@Test
public void testMissingNode() throws Exception {
- ObjectMapper objectMapper = new ObjectMapper();
-
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("id", 123123123);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
TypeInformation<Row> rowTypeInformation =
Types.ROW_NAMED(new String[] {"name"}, Types.STRING);