[FLINK-33191][Connector/Kafka] Remove dependency on Flink Shaded
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/JacksonMapperFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/JacksonMapperFactory.java
new file mode 100644
index 0000000..c9301c7
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/JacksonMapperFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.util;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+/** Factory for Jackson mappers. */
+public final class JacksonMapperFactory {
+
+ public static ObjectMapper createObjectMapper() {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ registerModules(objectMapper);
+ return objectMapper;
+ }
+
+ public static ObjectMapper createObjectMapper(JsonFactory jsonFactory) {
+ final ObjectMapper objectMapper = new ObjectMapper(jsonFactory);
+ registerModules(objectMapper);
+ return objectMapper;
+ }
+
+ private static void registerModules(ObjectMapper mapper) {
+ mapper.registerModule(new JavaTimeModule())
+ .registerModule(new Jdk8Module().configureAbsentsAsNulls(true))
+ .disable(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS)
+ .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ private JacksonMapperFactory() {}
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index e2b428e..970bad1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -20,13 +20,12 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
-import org.apache.flink.util.jackson.JacksonMapperFactory;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
index e764c86..d61b7f8 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
@@ -18,17 +18,16 @@
package org.apache.flink.connector.kafka.source.reader.deserializer;
+import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.jackson.JacksonMapperFactory;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.serialization.StringDeserializer;
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
index ddbcf1c..a5abb5e 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
@@ -17,13 +17,12 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
-import org.apache.flink.util.jackson.JacksonMapperFactory;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
diff --git a/pom.xml b/pom.xml
index 96e4521..607916d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
<zookeeper.version>3.5.9</zookeeper.version>
<confluent.version>7.2.2</confluent.version>
- <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
+ <jackson-bom.version>2.15.2</jackson-bom.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.9.1</junit5.version>
<assertj.version>3.23.1</assertj.version>
@@ -80,13 +80,25 @@
</properties>
<dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-jackson</artifactId>
- <version>2.13.4-16.1</version>
- </dependency>
-
<!-- Root dependencies for all projects -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <!-- Java 8 Date/time -->
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ </dependency>
+ <dependency>
+ <!-- Java 8 Datatypes -->
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jdk8</artifactId>
+ </dependency>
<!-- Logging API -->
<dependency>