[FLINK-28634][json] Add simple JsonSerDeSchema
diff --git a/flink-formats-kafka/flink-json-debezium/pom.xml b/flink-formats-kafka/flink-json-debezium/pom.xml
index 28c7b46..ab697cf 100644
--- a/flink-formats-kafka/flink-json-debezium/pom.xml
+++ b/flink-formats-kafka/flink-json-debezium/pom.xml
@@ -111,6 +111,12 @@
<!-- test utils dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
new file mode 100644
index 0000000..fd28712
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** DeserializationSchema that deserializes a JSON String. */
+@PublicEvolving
+public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Class<T> clazz;
+ private final SerializableSupplier<ObjectMapper> mapperFactory;
+ protected transient ObjectMapper mapper;
+
+ public JsonDeserializationSchema(Class<T> clazz) {
+ this(clazz, () -> new ObjectMapper());
+ }
+
+ public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
+ this(typeInformation, () -> new ObjectMapper());
+ }
+
+ public JsonDeserializationSchema(
+ Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory) {
+ super(clazz);
+ this.clazz = clazz;
+ this.mapperFactory = mapperFactory;
+ }
+
+ public JsonDeserializationSchema(
+ TypeInformation<T> typeInformation, SerializableSupplier<ObjectMapper> mapperFactory) {
+ super(typeInformation);
+ this.clazz = typeInformation.getTypeClass();
+ this.mapperFactory = mapperFactory;
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ mapper = mapperFactory.get();
+ }
+
+ @Override
+ public T deserialize(byte[] message) throws IOException {
+ return mapper.readValue(message, clazz);
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
index 55c61e1..36aa484 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
@@ -17,28 +17,19 @@
package org.apache.flink.formats.json;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import java.io.IOException;
-
/**
* DeserializationSchema that deserializes a JSON String into an ObjectNode.
*
* <p>Fields can be accessed by calling objectNode.get(<name>).as(<type>)
*/
@PublicEvolving
-public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
+public class JsonNodeDeserializationSchema extends JsonDeserializationSchema<ObjectNode> {
- private static final long serialVersionUID = -1699854177598621044L;
+ private static final long serialVersionUID = 2L;
- private final ObjectMapper mapper = new ObjectMapper();
-
- @Override
- public ObjectNode deserialize(byte[] message) throws IOException {
- return mapper.readValue(message, ObjectNode.class);
+ public JsonNodeDeserializationSchema() {
+ super(ObjectNode.class);
}
}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
new file mode 100644
index 0000000..c029fa1
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/** SerializationSchema that serializes an object to a JSON String. */
+@PublicEvolving
+public class JsonSerializationSchema<T> implements SerializationSchema<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final SerializableSupplier<ObjectMapper> mapperFactory;
+
+ protected transient ObjectMapper mapper;
+
+ public JsonSerializationSchema() {
+ this(() -> new ObjectMapper());
+ }
+
+ public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory) {
+ this.mapperFactory = mapperFactory;
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ mapper = mapperFactory.get();
+ }
+
+ @Override
+ public byte[] serialize(T element) {
+ try {
+ return mapper.writeValueAsBytes(element);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(
+ String.format("Could not serialize value '%s'.", element), e);
+ }
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 741b492..2bd8dc5 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -17,6 +17,8 @@
package org.apache.flink.formats.json;
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -37,6 +39,7 @@
byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema();
+ schema.open(new DummyInitializationContext());
ObjectNode deserializedValue = schema.deserialize(serializedValue);
assertThat(deserializedValue.get("key").asInt()).isEqualTo(4);
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
new file mode 100644
index 0000000..5ed992c
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class JsonSerDeSchemaTest {
+ private static final JsonSerializationSchema<Event> SERIALIZATION_SCHEMA;
+ private static final JsonDeserializationSchema<Event> DESERIALIZATION_SCHEMA;
+ private static final String JSON = "{\"x\":34,\"y\":\"hello\"}";
+
+ static {
+ SERIALIZATION_SCHEMA = new JsonSerializationSchema<>();
+ SERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+ DESERIALIZATION_SCHEMA = new JsonDeserializationSchema<>(Event.class);
+ DESERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+ }
+
+ @Test
+ void testSrialization() throws IOException {
+ final byte[] serialized = SERIALIZATION_SCHEMA.serialize(new Event(34, "hello"));
+ assertThat(serialized).isEqualTo(JSON.getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Test
+ void testDeserialization() throws IOException {
+ final Event deserialized =
+ DESERIALIZATION_SCHEMA.deserialize(JSON.getBytes(StandardCharsets.UTF_8));
+ assertThat(deserialized).isEqualTo(new Event(34, "hello"));
+ }
+
+ @Test
+ void testRoundTrip() throws IOException {
+ final Event original = new Event(34, "hello");
+
+ final byte[] serialized = SERIALIZATION_SCHEMA.serialize(original);
+
+ final Event deserialized = DESERIALIZATION_SCHEMA.deserialize(serialized);
+
+ assertThat(deserialized).isEqualTo(original);
+ }
+
+ private static class Event {
+
+ private int x;
+ private String y = null;
+
+ public Event() {}
+
+ public Event(int x, String y) {
+ this.x = x;
+ this.y = y;
+ }
+
+ public int getX() {
+ return x;
+ }
+
+ public void setX(int x) {
+ this.x = x;
+ }
+
+ public String getY() {
+ return y;
+ }
+
+ public void setY(String y) {
+ this.y = y;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Event event = (Event) o;
+ return x == event.x && Objects.equals(y, event.y);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(x, y);
+ }
+ }
+}