[FLINK-28634][json] Add simple JsonSerDeSchema
diff --git a/flink-formats-kafka/flink-json-debezium/pom.xml b/flink-formats-kafka/flink-json-debezium/pom.xml
index 28c7b46..ab697cf 100644
--- a/flink-formats-kafka/flink-json-debezium/pom.xml
+++ b/flink-formats-kafka/flink-json-debezium/pom.xml
@@ -111,6 +111,12 @@
 		<!-- test utils dependency -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
new file mode 100644
index 0000000..fd28712
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** DeserializationSchema that deserializes a JSON String. */
+@PublicEvolving
+public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Class<T> clazz;
+    private final SerializableSupplier<ObjectMapper> mapperFactory;
+    protected transient ObjectMapper mapper;
+
+    public JsonDeserializationSchema(Class<T> clazz) {
+        this(clazz, () -> new ObjectMapper());
+    }
+
+    public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
+        this(typeInformation, () -> new ObjectMapper());
+    }
+
+    public JsonDeserializationSchema(
+            Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory) {
+        super(clazz);
+        this.clazz = clazz;
+        this.mapperFactory = mapperFactory;
+    }
+
+    public JsonDeserializationSchema(
+            TypeInformation<T> typeInformation, SerializableSupplier<ObjectMapper> mapperFactory) {
+        super(typeInformation);
+        this.clazz = typeInformation.getTypeClass();
+        this.mapperFactory = mapperFactory;
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        mapper = mapperFactory.get();
+    }
+
+    @Override
+    public T deserialize(byte[] message) throws IOException {
+        return mapper.readValue(message, clazz);
+    }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
index 55c61e1..36aa484 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
@@ -17,28 +17,19 @@
 
 package org.apache.flink.formats.json;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
-import java.io.IOException;
-
 /**
  * DeserializationSchema that deserializes a JSON String into an ObjectNode.
  *
  * <p>Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
  */
 @PublicEvolving
-public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
+public class JsonNodeDeserializationSchema extends JsonDeserializationSchema<ObjectNode> {
 
-    private static final long serialVersionUID = -1699854177598621044L;
+    private static final long serialVersionUID = 2L;
 
-    private final ObjectMapper mapper = new ObjectMapper();
-
-    @Override
-    public ObjectNode deserialize(byte[] message) throws IOException {
-        return mapper.readValue(message, ObjectNode.class);
+    public JsonNodeDeserializationSchema() {
+        super(ObjectNode.class);
     }
 }
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
new file mode 100644
index 0000000..c029fa1
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/** SerializationSchema that serializes an object to a JSON String. */
+@PublicEvolving
+public class JsonSerializationSchema<T> implements SerializationSchema<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SerializableSupplier<ObjectMapper> mapperFactory;
+
+    protected transient ObjectMapper mapper;
+
+    public JsonSerializationSchema() {
+        this(() -> new ObjectMapper());
+    }
+
+    public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory) {
+        this.mapperFactory = mapperFactory;
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        mapper = mapperFactory.get();
+    }
+
+    @Override
+    public byte[] serialize(T element) {
+        try {
+            return mapper.writeValueAsBytes(element);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(
+                    String.format("Could not serialize value '%s'.", element), e);
+        }
+    }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 741b492..2bd8dc5 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.formats.json;
 
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -37,6 +39,7 @@
         byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
 
         JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema();
+        schema.open(new DummyInitializationContext());
         ObjectNode deserializedValue = schema.deserialize(serializedValue);
 
         assertThat(deserializedValue.get("key").asInt()).isEqualTo(4);
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
new file mode 100644
index 0000000..5ed992c
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class JsonSerDeSchemaTest {
+    private static final JsonSerializationSchema<Event> SERIALIZATION_SCHEMA;
+    private static final JsonDeserializationSchema<Event> DESERIALIZATION_SCHEMA;
+    private static final String JSON = "{\"x\":34,\"y\":\"hello\"}";
+
+    static {
+        SERIALIZATION_SCHEMA = new JsonSerializationSchema<>();
+        SERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+        DESERIALIZATION_SCHEMA = new JsonDeserializationSchema<>(Event.class);
+        DESERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+    }
+
+    @Test
+    void testSrialization() throws IOException {
+        final byte[] serialized = SERIALIZATION_SCHEMA.serialize(new Event(34, "hello"));
+        assertThat(serialized).isEqualTo(JSON.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void testDeserialization() throws IOException {
+        final Event deserialized =
+                DESERIALIZATION_SCHEMA.deserialize(JSON.getBytes(StandardCharsets.UTF_8));
+        assertThat(deserialized).isEqualTo(new Event(34, "hello"));
+    }
+
+    @Test
+    void testRoundTrip() throws IOException {
+        final Event original = new Event(34, "hello");
+
+        final byte[] serialized = SERIALIZATION_SCHEMA.serialize(original);
+
+        final Event deserialized = DESERIALIZATION_SCHEMA.deserialize(serialized);
+
+        assertThat(deserialized).isEqualTo(original);
+    }
+
+    private static class Event {
+
+        private int x;
+        private String y = null;
+
+        public Event() {}
+
+        public Event(int x, String y) {
+            this.x = x;
+            this.y = y;
+        }
+
+        public int getX() {
+            return x;
+        }
+
+        public void setX(int x) {
+            this.x = x;
+        }
+
+        public String getY() {
+            return y;
+        }
+
+        public void setY(String y) {
+            this.y = y;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Event event = (Event) o;
+            return x == event.x && Objects.equals(y, event.y);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(x, y);
+        }
+    }
+}