[FLINK-28621][core] Add central Jackson mapper factory methods
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
index fd28712..cc244b0 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -37,11 +38,11 @@
     protected transient ObjectMapper mapper;
 
     public JsonDeserializationSchema(Class<T> clazz) {
-        this(clazz, () -> new ObjectMapper());
+        this(clazz, JacksonMapperFactory::createObjectMapper);
     }
 
     public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
-        this(typeInformation, () -> new ObjectMapper());
+        this(typeInformation, JacksonMapperFactory::createObjectMapper);
     }
 
     public JsonDeserializationSchema(
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 805b299..9a57bac 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -26,6 +26,7 @@
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -97,7 +98,7 @@
     @Override
     public void open(InitializationContext context) throws Exception {
         objectMapper =
-                new ObjectMapper()
+                JacksonMapperFactory.createObjectMapper()
                         .configure(
                                 JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
                                 true);
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 6a8c619..c8b7f73 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -23,6 +23,7 @@
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -86,7 +87,7 @@
     @Override
     public void open(InitializationContext context) throws Exception {
         mapper =
-                new ObjectMapper()
+                JacksonMapperFactory.createObjectMapper()
                         .configure(
                                 JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
                                 encodeDecimalAsPlainNumber);
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index b2b7e6d..dd4a9bb 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -30,6 +30,7 @@
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -121,7 +122,7 @@
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         if (hasDecimalType) {
             objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
         }
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
index fe41232..d776185 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -97,7 +98,7 @@
     @SuppressWarnings("unchecked")
     public static <T> TypeInformation<T> convert(String jsonSchema) {
         Preconditions.checkNotNull(jsonSchema, "JSON schema");
-        final ObjectMapper mapper = new ObjectMapper();
+        final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         mapper.getFactory()
                 .enable(JsonParser.Feature.ALLOW_COMMENTS)
                 .enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES)
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index e789307..f185d21 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -28,6 +28,7 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.WrappingRuntimeException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -96,7 +97,7 @@
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        mapper = new ObjectMapper();
+        mapper = JacksonMapperFactory.createObjectMapper();
     }
 
     /** Builder for {@link JsonRowSerializationSchema}. */
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 9075152..e6b2a3e 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.formats.json;
 
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -34,7 +35,7 @@
 
     @Test
     void testDeserialize() throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         ObjectNode initialValue = mapper.createObjectNode();
         initialValue.put("key", 4).put("value", "world");
         byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 888be3b..883c3f0 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -31,6 +31,7 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -83,6 +84,8 @@
  */
 class JsonRowDataSerDeSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Test
     void testSerDe() throws Exception {
         byte tinyint = 'c';
@@ -115,11 +118,10 @@
         innerMap.put("key", 234);
         nestedMap.put("inner_map", innerMap);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-        ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
+        ArrayNode doubleNode = OBJECT_MAPPER.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
 
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("bool", true);
         root.put("tinyint", tinyint);
         root.put("smallint", smallint);
@@ -139,7 +141,7 @@
         root.putObject("multiSet").put("element", 2);
         root.putObject("map2map").putObject("inner_map").put("key", 234);
 
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         DataType dataType =
                 ROW(
@@ -220,8 +222,7 @@
         double doubleValue = random.nextDouble();
         float floatValue = random.nextFloat();
 
-        ObjectMapper objectMapper = new ObjectMapper();
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("bool", String.valueOf(bool));
         root.put("int", String.valueOf(integer));
         root.put("bigint", String.valueOf(bigint));
@@ -230,7 +231,7 @@
         root.put("float1", String.valueOf(floatValue));
         root.put("float2", new BigDecimal(floatValue));
 
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         DataType dataType =
                 ROW(
@@ -296,11 +297,9 @@
                         true);
         open(serializationSchema);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // the first row
         {
-            ObjectNode root = objectMapper.createObjectNode();
+            ObjectNode root = OBJECT_MAPPER.createObjectNode();
             root.put("f1", 1);
             root.put("f2", true);
             root.put("f3", "str");
@@ -312,7 +311,7 @@
             ObjectNode row = root.putObject("f6");
             row.put("f1", "this is row1");
             row.put("f2", 12);
-            byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+            byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
             RowData rowData = deserializationSchema.deserialize(serializedJson);
             byte[] actual = serializationSchema.serialize(rowData);
             assertThat(serializedJson).containsExactly(actual);
@@ -320,7 +319,7 @@
 
         // the second row
         {
-            ObjectNode root = objectMapper.createObjectNode();
+            ObjectNode root = OBJECT_MAPPER.createObjectNode();
             root.put("f1", 10);
             root.put("f2", false);
             root.put("f3", "newStr");
@@ -332,7 +331,7 @@
             ObjectNode row = root.putObject("f6");
             row.put("f1", "this is row2");
             row.putNull("f2");
-            byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+            byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
             RowData rowData = deserializationSchema.deserialize(serializedJson);
             byte[] actual = serializationSchema.serialize(rowData);
             assertThat(serializedJson).containsExactly(actual);
@@ -419,12 +418,10 @@
 
     @Test
     void testDeserializationMissingField() throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", 123123123);
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         DataType dataType = ROW(FIELD("name", STRING()));
         RowType schema = (RowType) dataType.getLogicalType();
@@ -504,14 +501,12 @@
                         true);
         open(serializationSchema);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("timestamp3", "1990-10-14 12:12:43.123");
         root.put("timestamp9", "1990-10-14 12:12:43.123456789");
         root.put("timestamp_with_local_timezone3", "1990-10-14 12:12:43.123Z");
         root.put("timestamp_with_local_timezone9", "1990-10-14 12:12:43.123456789Z");
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
         RowData rowData = deserializationSchema.deserialize(serializedJson);
         byte[] actual = serializationSchema.serialize(rowData);
         assertThat(serializedJson).containsExactly(actual);
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index deee53a..81e370c 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,6 +53,8 @@
 /** Tests for the {@link JsonRowDeserializationSchema}. */
 public class JsonRowDeserializationSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Rule public ExpectedException thrown = ExpectedException.none();
 
     /** Tests simple deserialization using type information. */
@@ -73,10 +76,8 @@
         innerMap.put("key", 234);
         nestedMap.put("inner_map", innerMap);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", id);
         root.put("name", name);
         root.put("bytes", bytes);
@@ -89,7 +90,7 @@
         root.putObject("map").put("flink", 123);
         root.putObject("map2map").putObject("inner_map").put("key", 234);
 
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         JsonRowDeserializationSchema deserializationSchema =
                 new JsonRowDeserializationSchema.Builder(
@@ -149,10 +150,8 @@
                 };
         final String[] strings = new String[] {"one", "two", "three"};
 
-        final ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", id.longValue());
         root.putNull("idOrNull");
         root.put("name", name);
@@ -164,7 +163,7 @@
         root.putArray("strings").add("one").add("two").add("three");
         root.putObject("nested").put("booleanField", true).put("decimalField", 12);
 
-        final byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        final byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         JsonRowDeserializationSchema deserializationSchema =
                 new JsonRowDeserializationSchema.Builder(
@@ -212,12 +211,10 @@
     /** Tests deserialization with non-existing field name. */
     @Test
     public void testMissingNode() throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", 123123123);
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         TypeInformation<Row> rowTypeInformation =
                 Types.ROW_NAMED(new String[] {"name"}, Types.STRING);