[FLINK-17461][formats][json] Support JSON serialization and deseriazation schema for RowData type

This closes #11944
diff --git a/flink-formats-kafka/flink-json-debezium/pom.xml b/flink-formats-kafka/flink-json-debezium/pom.xml
index 3bc69aa..bff7fc1 100644
--- a/flink-formats-kafka/flink-json-debezium/pom.xml
+++ b/flink-formats-kafka/flink-json-debezium/pom.xml
@@ -69,15 +69,15 @@
 			<type>test-jar</type>
 		</dependency>
 
-		<!-- TODO This could be dropped if we change JsonRowFormatFactoryTest -->
+		<!-- JSON RowData schema test dependency -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
-		<!-- TODO This could be dropped if we change JsonRowFormatFactoryTest -->
+		<!-- JSON RowData schema test dependency -->
 		<dependency>
 			<groupId>org.scala-lang</groupId>
 			<artifactId>scala-compiler</artifactId>
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
new file mode 100644
index 0000000..974b8dd
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from JSON to Flink Table/SQL internal data structure {@link RowData}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads
+ * the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+@Internal
+public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> {
+	private static final long serialVersionUID = 1L;
+
+	/** Flag indicating whether to fail if a field is missing. */
+	private final boolean failOnMissingField;
+
+	/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+	private final boolean ignoreParseErrors;
+
+	/** TypeInformation of the produced {@link RowData}. **/
+	private final TypeInformation<RowData> resultTypeInfo;
+
+	/**
+	 * Runtime converter that converts {@link JsonNode}s into
+	 * objects of Flink SQL internal data structures. **/
+	private final DeserializationRuntimeConverter runtimeConverter;
+
+	/** Object mapper for parsing the JSON. */
+	private final ObjectMapper objectMapper = new ObjectMapper();
+
+	public JsonRowDataDeserializationSchema(
+			RowType rowType,
+			TypeInformation<RowData> resultTypeInfo,
+			boolean failOnMissingField,
+			boolean ignoreParseErrors) {
+		if (ignoreParseErrors && failOnMissingField) {
+			throw new IllegalArgumentException(
+				"JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
+		}
+		this.resultTypeInfo = checkNotNull(resultTypeInfo);
+		this.failOnMissingField = failOnMissingField;
+		this.ignoreParseErrors = ignoreParseErrors;
+		this.runtimeConverter = createRowConverter(checkNotNull(rowType));
+	}
+
+	@Override
+	public RowData deserialize(byte[] message) throws IOException {
+		try {
+			final JsonNode root = objectMapper.readTree(message);
+			return (RowData) runtimeConverter.convert(root);
+		} catch (Throwable t) {
+			if (ignoreParseErrors) {
+				return null;
+			}
+			throw new IOException(format("Failed to deserialize JSON '%s'.", new String(message)), t);
+		}
+	}
+
+	@Override
+	public boolean isEndOfStream(RowData nextElement) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<RowData> getProducedType() {
+		return resultTypeInfo;
+	}
+
+	// -------------------------------------------------------------------------------------
+	// Runtime Converters
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * Runtime converter that converts {@link JsonNode}s into objects of Flink Table & SQL
+	 * internal data structures.
+	 */
+	@FunctionalInterface
+	private interface DeserializationRuntimeConverter extends Serializable {
+		Object convert(JsonNode jsonNode);
+	}
+
+	/**
+	 * Creates a runtime converter which is null safe.
+	 */
+	private DeserializationRuntimeConverter createConverter(LogicalType type) {
+		return wrapIntoNullableConverter(createNotNullConverter(type));
+	}
+
+	/**
+	 * Creates a runtime converter which assuming input object is not null.
+	 */
+	private DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
+		switch (type.getTypeRoot()) {
+			case NULL:
+				return jsonNode -> null;
+			case BOOLEAN:
+				return this::convertToBoolean;
+			case TINYINT:
+				return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+			case SMALLINT:
+				return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+			case INTEGER:
+			case INTERVAL_YEAR_MONTH:
+				return this::convertToInt;
+			case BIGINT:
+			case INTERVAL_DAY_TIME:
+				return this::convertToLong;
+			case DATE:
+				return this::convertToDate;
+			case TIME_WITHOUT_TIME_ZONE:
+				return this::convertToTime;
+			case TIMESTAMP_WITH_TIME_ZONE:
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+				return this::convertToTimestamp;
+			case FLOAT:
+				return this::convertToFloat;
+			case DOUBLE:
+				return this::convertToDouble;
+			case CHAR:
+			case VARCHAR:
+				return this::convertToString;
+			case BINARY:
+			case VARBINARY:
+				return this::convertToBytes;
+			case DECIMAL:
+				return createDecimalConverter((DecimalType) type);
+			case ARRAY:
+				return createArrayConverter((ArrayType) type);
+			case MAP:
+			case MULTISET:
+				return createMapConverter((MapType) type);
+			case ROW:
+				return createRowConverter((RowType) type);
+			case RAW:
+			default:
+				throw new UnsupportedOperationException("Unsupported type: " + type);
+		}
+	}
+
+	private boolean convertToBoolean(JsonNode jsonNode) {
+		if (jsonNode.isBoolean()) {
+			// avoid redundant toString and parseBoolean, for better performance
+			return jsonNode.asBoolean();
+		} else {
+			return Boolean.parseBoolean(jsonNode.asText().trim());
+		}
+	}
+
+	private int convertToInt(JsonNode jsonNode) {
+		if (jsonNode.canConvertToInt()) {
+			// avoid redundant toString and parseInt, for better performance
+			return jsonNode.asInt();
+		} else {
+			return Integer.parseInt(jsonNode.asText().trim());
+		}
+	}
+
+	private long convertToLong(JsonNode jsonNode) {
+		if (jsonNode.canConvertToLong()) {
+			// avoid redundant toString and parseLong, for better performance
+			return jsonNode.asLong();
+		} else {
+			return Long.parseLong(jsonNode.asText().trim());
+		}
+	}
+
+	private double convertToDouble(JsonNode jsonNode) {
+		if (jsonNode.isDouble()) {
+			// avoid redundant toString and parseDouble, for better performance
+			return jsonNode.asDouble();
+		} else {
+			return Double.parseDouble(jsonNode.asText().trim());
+		}
+	}
+
+	private float convertToFloat(JsonNode jsonNode) {
+		if (jsonNode.isDouble()) {
+			// avoid redundant toString and parseDouble, for better performance
+			return (float) jsonNode.asDouble();
+		} else {
+			return Float.parseFloat(jsonNode.asText().trim());
+		}
+	}
+
+	private int convertToDate(JsonNode jsonNode) {
+		LocalDate date = ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+		return (int) date.toEpochDay();
+	}
+
+	private int convertToTime(JsonNode jsonNode) {
+		// according to RFC 3339 every full-time must have a timezone;
+		// until we have full timezone support, we only support UTC;
+		// users can parse their time as string as a workaround
+		TemporalAccessor parsedTime = RFC3339_TIME_FORMAT.parse(jsonNode.asText());
+
+		ZoneOffset zoneOffset = parsedTime.query(TemporalQueries.offset());
+		LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
+
+		if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 || localTime.getNano() != 0) {
+			throw new JsonParseException(
+				"Invalid time format. Only a time in UTC timezone without milliseconds is supported yet.");
+		}
+
+		// get number of milliseconds of the day
+		return localTime.toSecondOfDay() * 1000;
+	}
+
+	private TimestampData convertToTimestamp(JsonNode jsonNode) {
+		// according to RFC 3339 every date-time must have a timezone;
+		// until we have full timezone support, we only support UTC;
+		// users can parse their time as string as a workaround
+		TemporalAccessor parsedTimestamp = RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+
+		ZoneOffset zoneOffset = parsedTimestamp.query(TemporalQueries.offset());
+
+		if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
+			throw new JsonParseException(
+				"Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. " +
+					"Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+		}
+
+		LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
+		LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
+
+		return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime));
+	}
+
+	private StringData convertToString(JsonNode jsonNode) {
+		return StringData.fromString(jsonNode.asText());
+	}
+
+	private byte[] convertToBytes(JsonNode jsonNode) {
+		try {
+			return jsonNode.binaryValue();
+		} catch (IOException e) {
+			throw new JsonParseException("Unable to deserialize byte array.", e);
+		}
+	}
+
+	private DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+		final int precision = decimalType.getPrecision();
+		final int scale = decimalType.getScale();
+		return jsonNode -> {
+			BigDecimal bigDecimal;
+			if (jsonNode.isBigDecimal()) {
+				bigDecimal = jsonNode.decimalValue();
+			} else {
+				bigDecimal = new BigDecimal(jsonNode.asText());
+			}
+			return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+		};
+	}
+
+	private DeserializationRuntimeConverter createArrayConverter(ArrayType arrayType) {
+		DeserializationRuntimeConverter elementConverter = createConverter(arrayType.getElementType());
+		final Class<?> elementClass = LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+		return jsonNode -> {
+			final ArrayNode node = (ArrayNode) jsonNode;
+			final Object[] array = (Object[]) Array.newInstance(elementClass, node.size());
+			for (int i = 0; i < node.size(); i++) {
+				final JsonNode innerNode = node.get(i);
+				array[i] = elementConverter.convert(innerNode);
+			}
+			return new GenericArrayData(array);
+		};
+	}
+
+	private DeserializationRuntimeConverter createMapConverter(MapType mapType) {
+		LogicalType keyType = mapType.getKeyType();
+		if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+			throw new UnsupportedOperationException(
+				"JSON format doesn't support non-string as key type of map. " +
+				"The map type is: " + mapType.asSummaryString());
+		}
+		final DeserializationRuntimeConverter keyConverter = createConverter(keyType);
+		final DeserializationRuntimeConverter valueConverter = createConverter(mapType.getValueType());
+
+		return jsonNode -> {
+			Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
+			Map<Object, Object> result = new HashMap<>();
+			while (fields.hasNext()) {
+				Map.Entry<String, JsonNode> entry = fields.next();
+				Object key = keyConverter.convert(TextNode.valueOf(entry.getKey()));
+				Object value = valueConverter.convert(entry.getValue());
+				result.put(key, value);
+			}
+			return new GenericMapData(result);
+		};
+	}
+
+	private DeserializationRuntimeConverter createRowConverter(RowType rowType) {
+		final DeserializationRuntimeConverter[] fieldConverters = rowType.getFields().stream()
+			.map(RowType.RowField::getType)
+			.map(this::createConverter)
+			.toArray(DeserializationRuntimeConverter[]::new);
+		final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+		return jsonNode -> {
+			ObjectNode node = (ObjectNode) jsonNode;
+			int arity = fieldNames.length;
+			GenericRowData row = new GenericRowData(arity);
+			for (int i = 0; i < arity; i++) {
+				String fieldName = fieldNames[i];
+				JsonNode field = node.get(fieldName);
+				Object convertedField = convertField(fieldConverters[i], fieldName, field);
+				row.setField(i, convertedField);
+			}
+			return row;
+		};
+	}
+
+	private Object convertField(
+			DeserializationRuntimeConverter fieldConverter,
+			String fieldName,
+			JsonNode field) {
+		if (field == null) {
+			if (failOnMissingField) {
+				throw new JsonParseException(
+					"Could not find field with name '" + fieldName + "'.");
+			} else {
+				return null;
+			}
+		} else {
+			return fieldConverter.convert(field);
+		}
+	}
+
+	private DeserializationRuntimeConverter wrapIntoNullableConverter(
+			DeserializationRuntimeConverter converter) {
+		return jsonNode -> {
+			if (jsonNode == null || jsonNode.isNull()) {
+				return null;
+			}
+			try {
+				return converter.convert(jsonNode);
+			} catch (Throwable t) {
+				if (!ignoreParseErrors) {
+					throw t;
+				}
+				return null;
+			}
+		};
+	}
+
+	/**
+	 * Exception which refers to parse errors in converters.
+	 * */
+	private static final class JsonParseException extends RuntimeException {
+		private static final long serialVersionUID = 1L;
+
+		public JsonParseException(String message) {
+			super(message);
+		}
+
+		public JsonParseException(String message, Throwable cause) {
+			super(message, cause);
+		}
+	}
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
new file mode 100644
index 0000000..7c17738
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+
+/**
+ * Serialization schema that serializes an object of Flink internal data structure into a JSON bytes.
+ *
+ * <p>Serializes the input Flink object into a JSON string and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link JsonRowDataDeserializationSchema}.
+ */
+@Internal
+public class JsonRowDataSerializationSchema implements SerializationSchema<RowData> {
+	private static final long serialVersionUID = 1L;
+
+	/** The converter that converts internal data formats to JsonNode. */
+	private final SerializationRuntimeConverter runtimeConverter;
+
+	/** Object mapper that is used to create output JSON objects. */
+	private final ObjectMapper mapper = new ObjectMapper();
+
+	/** Reusable object node. */
+	private transient ObjectNode node;
+
+	public JsonRowDataSerializationSchema(RowType rowType) {
+		this.runtimeConverter = createConverter(rowType);
+	}
+
+	@Override
+	public byte[] serialize(RowData row) {
+		if (node == null) {
+			node = mapper.createObjectNode();
+		}
+
+		try {
+			runtimeConverter.convert(mapper, node, row);
+			return mapper.writeValueAsBytes(node);
+		} catch (Throwable t) {
+			throw new RuntimeException("Could not serialize row '" + row + "'. " +
+				"Make sure that the schema matches the input.", t);
+		}
+	}
+
+	// --------------------------------------------------------------------------------
+	// Runtime Converters
+	// --------------------------------------------------------------------------------
+
+	/**
+	 * Runtime converter that converts objects of Flink Table & SQL internal data structures
+	 * to corresponding {@link JsonNode}s.
+	 */
+	private interface SerializationRuntimeConverter extends Serializable {
+		JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value);
+	}
+
+	/**
+	 * Creates a runtime converter which is null safe.
+	 */
+	private SerializationRuntimeConverter createConverter(LogicalType type) {
+		return wrapIntoNullableConverter(createNotNullConverter(type));
+	}
+
+	/**
+	 * Creates a runtime converter which assuming input object is not null.
+	 */
+	private SerializationRuntimeConverter createNotNullConverter(LogicalType type) {
+		switch (type.getTypeRoot()) {
+			case NULL:
+				return (mapper, reuse, value) -> mapper.getNodeFactory().nullNode();
+			case BOOLEAN:
+				return (mapper, reuse, value) -> mapper.getNodeFactory().booleanNode((boolean) value);
+			case TINYINT:
+				return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((byte) value);
+			case SMALLINT:
+				return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((short) value);
+			case INTEGER:
+			case INTERVAL_YEAR_MONTH:
+				return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((int) value);
+			case BIGINT:
+			case INTERVAL_DAY_TIME:
+				return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((long) value);
+			case FLOAT:
+				return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((float) value);
+			case DOUBLE:
+				return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((double) value);
+			case CHAR:
+			case VARCHAR:
+				// value is BinaryString
+				return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(value.toString());
+			case BINARY:
+			case VARBINARY:
+				return (mapper, reuse, value) -> mapper.getNodeFactory().binaryNode((byte[]) value);
+			case DATE:
+				return createDateConverter();
+			case TIME_WITHOUT_TIME_ZONE:
+				return createTimeConverter();
+			case TIMESTAMP_WITH_TIME_ZONE:
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+				return createTimestampConverter();
+			case DECIMAL:
+				return createDecimalConverter();
+			case ARRAY:
+				return createArrayConverter((ArrayType) type);
+			case MAP:
+			case MULTISET:
+				return createMapConverter((MapType) type);
+			case ROW:
+				return createRowConverter((RowType) type);
+			case RAW:
+			default:
+				throw new UnsupportedOperationException("Not support to parse type: " + type);
+		}
+	}
+
+	private SerializationRuntimeConverter createDecimalConverter() {
+		return (mapper, reuse, value) -> {
+			BigDecimal bd = ((DecimalData) value).toBigDecimal();
+			return mapper.getNodeFactory().numberNode(bd);
+		};
+	}
+
+	private SerializationRuntimeConverter createDateConverter() {
+		return (mapper, reuse, value) -> {
+			int days = (int) value;
+			LocalDate date = LocalDate.ofEpochDay(days);
+			return mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format(date));
+		};
+	}
+
+	private SerializationRuntimeConverter createTimeConverter() {
+		return (mapper, reuse, value) -> {
+			int millisecond = (int) value;
+			LocalTime time = LocalTime.ofSecondOfDay(millisecond / 1000L);
+			return mapper.getNodeFactory().textNode(RFC3339_TIME_FORMAT.format(time));
+		};
+	}
+
+	private SerializationRuntimeConverter createTimestampConverter() {
+		return (mapper, reuse, value) -> {
+			TimestampData timestamp = (TimestampData) value;
+			return mapper.getNodeFactory()
+				.textNode(RFC3339_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
+		};
+	}
+
+	private SerializationRuntimeConverter createArrayConverter(ArrayType type) {
+		final LogicalType elementType = type.getElementType();
+		final SerializationRuntimeConverter elementConverter = createConverter(elementType);
+		return (mapper, reuse, value) -> {
+			ArrayNode node;
+
+			// reuse could be a NullNode if last record is null.
+			if (reuse == null || reuse.isNull()) {
+				node = mapper.createArrayNode();
+			} else {
+				node = (ArrayNode) reuse;
+				node.removeAll();
+			}
+
+			ArrayData array = (ArrayData) value;
+			int numElements = array.size();
+			for (int i = 0; i < numElements; i++) {
+				Object element = ArrayData.get(array, i, elementType);
+				node.add(elementConverter.convert(mapper, null, element));
+			}
+
+			return node;
+		};
+	}
+
+	private SerializationRuntimeConverter createMapConverter(MapType type) {
+		LogicalType keyType = type.getKeyType();
+		if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+			throw new UnsupportedOperationException(
+				"JSON format doesn't support non-string as key type of map. " +
+					"The map type is: " + type.asSummaryString());
+		}
+		final LogicalType valueType = type.getValueType();
+		final SerializationRuntimeConverter valueConverter = createConverter(valueType);
+		return (mapper, reuse, object) -> {
+			ObjectNode node;
+			// reuse could be a NullNode if last record is null.
+			if (reuse == null || reuse.isNull()) {
+				node = mapper.createObjectNode();
+			} else {
+				node = (ObjectNode) reuse;
+			}
+
+			MapData map = (MapData) object;
+			ArrayData keyArray = map.keyArray();
+			ArrayData valueArray = map.valueArray();
+			int numElements = map.size();
+			for (int i = 0; i < numElements; i++) {
+				String fieldName = keyArray.getString(i).toString(); // key must be string
+				Object value = ArrayData.get(valueArray, i, valueType);
+				node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), value));
+			}
+
+			return node;
+		};
+	}
+
+	private SerializationRuntimeConverter createRowConverter(RowType type) {
+		final String[] fieldNames = type.getFieldNames().toArray(new String[0]);
+		final LogicalType[] fieldTypes = type.getFields().stream()
+			.map(RowType.RowField::getType)
+			.toArray(LogicalType[]::new);
+		final SerializationRuntimeConverter[] fieldConverters = Arrays.stream(fieldTypes)
+			.map(this::createConverter)
+			.toArray(SerializationRuntimeConverter[]::new);
+		final int fieldCount = type.getFieldCount();
+
+		return (mapper, reuse, value) -> {
+			ObjectNode node;
+			// reuse could be a NullNode if last record is null.
+			if (reuse == null || reuse.isNull()) {
+				node = mapper.createObjectNode();
+			} else {
+				node = (ObjectNode) reuse;
+			}
+			RowData row = (RowData) value;
+			for (int i = 0; i < fieldCount; i++) {
+				String fieldName = fieldNames[i];
+				Object field = RowData.get(row, i, fieldTypes[i]);
+				node.set(fieldName, fieldConverters[i].convert(mapper, node.get(fieldName), field));
+			}
+			return node;
+		};
+	}
+
+	private SerializationRuntimeConverter wrapIntoNullableConverter(
+			SerializationRuntimeConverter converter) {
+		return (mapper, reuse, object) -> {
+			if (object == null) {
+				return mapper.getNodeFactory().nullNode();
+			}
+
+			return converter.convert(mapper, reuse, object);
+		};
+	}
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
new file mode 100644
index 0000000..dcfae9c
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link JsonRowDataDeserializationSchema} and {@link JsonRowDataSerializationSchema}.
+ */
+public class JsonRowDataSerDeSchemaTest {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testSerDe() throws Exception {
+		byte tinyint = 'c';
+		short smallint = 128;
+		int intValue = 45536;
+		float floatValue = 33.333F;
+		long bigint = 1238123899121L;
+		String name = "asdlkjasjkdla998y1122";
+		byte[] bytes = new byte[1024];
+		ThreadLocalRandom.current().nextBytes(bytes);
+		BigDecimal decimal = new BigDecimal("123.456789");
+		Double[] doubles = new Double[]{1.1, 2.2, 3.3};
+		LocalDate date = LocalDate.parse("1990-10-14");
+		LocalTime time = LocalTime.parse("12:12:43");
+		Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
+		Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789");
+
+		Map<String, Long> map = new HashMap<>();
+		map.put("flink", 123L);
+
+		Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+		Map<String, Integer> innerMap = new HashMap<>();
+		innerMap.put("key", 234);
+		nestedMap.put("inner_map", innerMap);
+
+		ObjectMapper objectMapper = new ObjectMapper();
+		ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
+
+		// Root
+		ObjectNode root = objectMapper.createObjectNode();
+		root.put("bool", true);
+		root.put("tinyint", tinyint);
+		root.put("smallint", smallint);
+		root.put("int", intValue);
+		root.put("bigint", bigint);
+		root.put("float", floatValue);
+		root.put("name", name);
+		root.put("bytes", bytes);
+		root.put("decimal", decimal);
+		root.set("doubles", doubleNode);
+		root.put("date", "1990-10-14");
+		root.put("time", "12:12:43Z");
+		root.put("timestamp3", "1990-10-14T12:12:43.123Z");
+		root.put("timestamp9", "1990-10-14T12:12:43.123456789Z");
+		root.putObject("map").put("flink", 123);
+		root.putObject("map2map").putObject("inner_map").put("key", 234);
+
+		byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+		DataType dataType = ROW(
+			FIELD("bool", BOOLEAN()),
+			FIELD("tinyint", TINYINT()),
+			FIELD("smallint", SMALLINT()),
+			FIELD("int", INT()),
+			FIELD("bigint", BIGINT()),
+			FIELD("float", FLOAT()),
+			FIELD("name", STRING()),
+			FIELD("bytes", BYTES()),
+			FIELD("decimal", DECIMAL(9, 6)),
+			FIELD("doubles", ARRAY(DOUBLE())),
+			FIELD("date", DATE()),
+			FIELD("time", TIME(0)),
+			FIELD("timestamp3", TIMESTAMP(3)),
+			FIELD("timestamp9", TIMESTAMP(9)),
+			FIELD("map", MAP(STRING(), BIGINT())),
+			FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
+		RowType schema = (RowType) dataType.getLogicalType();
+		RowDataTypeInfo resultTypeInfo = new RowDataTypeInfo(schema);
+
+		JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+			schema, resultTypeInfo, false, false);
+
+		Row expected = new Row(16);
+		expected.setField(0, true);
+		expected.setField(1, tinyint);
+		expected.setField(2, smallint);
+		expected.setField(3, intValue);
+		expected.setField(4, bigint);
+		expected.setField(5, floatValue);
+		expected.setField(6, name);
+		expected.setField(7, bytes);
+		expected.setField(8, decimal);
+		expected.setField(9, doubles);
+		expected.setField(10, date);
+		expected.setField(11, time);
+		expected.setField(12, timestamp3.toLocalDateTime());
+		expected.setField(13, timestamp9.toLocalDateTime());
+		expected.setField(14, map);
+		expected.setField(15, nestedMap);
+
+		RowData rowData = deserializationSchema.deserialize(serializedJson);
+		Row actual = convertToExternal(rowData, dataType);
+		assertEquals(expected, actual);
+
+		// test serialization
+		JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(schema);
+
+		byte[] actualBytes = serializationSchema.serialize(rowData);
+		assertEquals(new String(serializedJson), new String(actualBytes));
+	}
+
+	/**
+	 * Tests the deserialization slow path,
+	 * e.g. convert into string and use {@link Double#parseDouble(String)}.
+	 */
+	@Test
+	public void testSlowDeserialization() throws Exception {
+		Random random = new Random();
+		boolean bool = random.nextBoolean();
+		int integer = random.nextInt();
+		long bigint = random.nextLong();
+		double doubleValue = random.nextDouble();
+		float floatValue = random.nextFloat();
+
+		ObjectMapper objectMapper = new ObjectMapper();
+		ObjectNode root = objectMapper.createObjectNode();
+		root.put("bool", String.valueOf(bool));
+		root.put("int", String.valueOf(integer));
+		root.put("bigint", String.valueOf(bigint));
+		root.put("double1", String.valueOf(doubleValue));
+		root.put("double2", new BigDecimal(doubleValue));
+		root.put("float1", String.valueOf(floatValue));
+		root.put("float2", new BigDecimal(floatValue));
+
+		byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+		DataType dataType = ROW(
+			FIELD("bool", BOOLEAN()),
+			FIELD("int", INT()),
+			FIELD("bigint", BIGINT()),
+			FIELD("double1", DOUBLE()),
+			FIELD("double2", DOUBLE()),
+			FIELD("float1", FLOAT()),
+			FIELD("float2", FLOAT())
+		);
+		RowType rowType = (RowType) dataType.getLogicalType();
+
+		JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+			rowType, new RowDataTypeInfo(rowType), false, false);
+
+		Row expected = new Row(7);
+		expected.setField(0, bool);
+		expected.setField(1, integer);
+		expected.setField(2, bigint);
+		expected.setField(3, doubleValue);
+		expected.setField(4, doubleValue);
+		expected.setField(5, floatValue);
+		expected.setField(6, floatValue);
+
+		RowData rowData = deserializationSchema.deserialize(serializedJson);
+		Row actual = convertToExternal(rowData, dataType);
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testSerDeMultiRows() throws Exception {
+		RowType rowType = (RowType) ROW(
+			FIELD("f1", INT()),
+			FIELD("f2", BOOLEAN()),
+			FIELD("f3", STRING())
+		).getLogicalType();
+
+		JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+			rowType, new RowDataTypeInfo(rowType), false, false);
+		JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(rowType);
+
+		ObjectMapper objectMapper = new ObjectMapper();
+
+		// the first row
+		{
+			ObjectNode root = objectMapper.createObjectNode();
+			root.put("f1", 1);
+			root.put("f2", true);
+			root.put("f3", "str");
+			byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+			RowData rowData = deserializationSchema.deserialize(serializedJson);
+			byte[] actual = serializationSchema.serialize(rowData);
+			assertEquals(new String(serializedJson), new String(actual));
+		}
+
+		// the second row
+		{
+			ObjectNode root = objectMapper.createObjectNode();
+			root.put("f1", 10);
+			root.put("f2", false);
+			root.put("f3", "newStr");
+			byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+			RowData rowData = deserializationSchema.deserialize(serializedJson);
+			byte[] actual = serializationSchema.serialize(rowData);
+			assertEquals(new String(serializedJson), new String(actual));
+		}
+	}
+
+	@Test
+	public void testSerDeMultiRowsWithNullValues() throws Exception {
+		String[] jsons = new String[] {
+			"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"metrics\":{\"k1\":10.01,\"k2\":\"invalid\"}}",
+			"{\"svt\":\"2020-02-24T12:58:09.209+0800\", \"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"}, " +
+				"\"ids\":[1, 2, 3]}",
+			"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"metrics\":{}}",
+		};
+
+		String[] expected = new String[] {
+			"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null,\"metrics\":{\"k1\":10.01,\"k2\":null}}",
+			"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"}," +
+				"\"ids\":[1,2,3],\"metrics\":null}",
+			"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null,\"metrics\":{}}",
+		};
+
+		RowType rowType = (RowType) ROW(
+			FIELD("svt", STRING()),
+			FIELD("ops", ROW(FIELD("id", STRING()))),
+			FIELD("ids", ARRAY(INT())),
+			FIELD("metrics", MAP(STRING(), DOUBLE()))
+		).getLogicalType();
+
+		JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+			rowType, new RowDataTypeInfo(rowType), false, true);
+		JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(rowType);
+
+		for (int i = 0; i < jsons.length; i++) {
+			String json = jsons[i];
+			RowData row = deserializationSchema.deserialize(json.getBytes());
+			String result = new String(serializationSchema.serialize(row));
+			assertEquals(expected[i], result);
+		}
+	}
+
+	@Test
+	public void testDeserializationMissingNode() throws Exception {
+		ObjectMapper objectMapper = new ObjectMapper();
+
+		// Root
+		ObjectNode root = objectMapper.createObjectNode();
+		root.put("id", 123123123);
+		byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+		DataType dataType = ROW(FIELD("name", STRING()));
+		RowType schema = (RowType) dataType.getLogicalType();
+
+		// pass on missing field
+		JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+			schema, new RowDataTypeInfo(schema), false, false);
+
+		Row expected = new Row(1);
+		Row actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
+		assertEquals(expected, actual);
+
+		// fail on missing field
+		deserializationSchema = deserializationSchema = new JsonRowDataDeserializationSchema(
+			schema, new RowDataTypeInfo(schema), true, false);
+
+		thrown.expect(IOException.class);
+		thrown.expectMessage("Failed to deserialize JSON '{\"id\":123123123}'");
+		deserializationSchema.deserialize(serializedJson);
+
+		// ignore on parse error
+		deserializationSchema = new JsonRowDataDeserializationSchema(
+			schema, new RowDataTypeInfo(schema), false, true);
+		actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
+		assertEquals(expected, actual);
+
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled");
+		// failOnMissingField and ignoreParseErrors both enabled
+		//noinspection ConstantConditions
+		new JsonRowDataDeserializationSchema(
+			schema, new RowDataTypeInfo(schema), true, true);
+	}
+
+	@Test
+	public void testJsonParse() throws Exception {
+		for (TestSpec spec : testData) {
+			testIgnoreParseErrors(spec);
+			if (spec.errorMessage != null) {
+				testParseErrors(spec);
+			}
+		}
+	}
+
+	private void testIgnoreParseErrors(TestSpec spec) throws Exception {
+		// the parsing field should be null and no exception is thrown
+		JsonRowDataDeserializationSchema ignoreErrorsSchema = new JsonRowDataDeserializationSchema(
+			spec.rowType,  new RowDataTypeInfo(spec.rowType), false, true);
+		Row expected;
+		if (spec.expected != null) {
+			expected = spec.expected;
+		} else {
+			expected = new Row(1);
+		}
+		RowData rowData = ignoreErrorsSchema.deserialize(spec.json.getBytes());
+		Row actual = convertToExternal(rowData, fromLogicalToDataType(spec.rowType));
+		assertEquals("Test Ignore Parse Error: " + spec.json,
+			expected,
+			actual);
+	}
+
+	private void testParseErrors(TestSpec spec) throws Exception {
+		// expect exception if parse error is not ignored
+		JsonRowDataDeserializationSchema failingSchema = new JsonRowDataDeserializationSchema(
+			spec.rowType,  new RowDataTypeInfo(spec.rowType), false, false);
+
+		thrown.expectMessage(spec.errorMessage);
+		failingSchema.deserialize(spec.json.getBytes());
+	}
+
+	private static List<TestSpec> testData = Arrays.asList(
+		TestSpec
+			.json("{\"id\": \"trueA\"}")
+			.rowType(ROW(FIELD("id", BOOLEAN())))
+			.expect(Row.of(false)),
+
+		TestSpec
+			.json("{\"id\": true}")
+			.rowType(ROW(FIELD("id", BOOLEAN())))
+			.expect(Row.of(true)),
+
+		TestSpec
+			.json("{\"id\":\"abc\"}")
+			.rowType(ROW(FIELD("id", INT())))
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
+
+		TestSpec
+			.json("{\"id\":112.013}")
+			.rowType(ROW(FIELD("id", BIGINT())))
+			.expect(Row.of(112L)),
+
+		TestSpec
+			.json("{\"id\":\"long\"}")
+			.rowType(ROW(FIELD("id", BIGINT())))
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'"),
+
+		TestSpec
+			.json("{\"id\":\"112.013.123\"}")
+			.rowType(ROW(FIELD("id", FLOAT())))
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
+
+		TestSpec
+			.json("{\"id\":\"112.013.123\"}")
+			.rowType(ROW(FIELD("id", DOUBLE())))
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
+
+		TestSpec
+			.json("{\"id\":\"18:00:243\"}")
+			.rowType(ROW(FIELD("id", TIME())))
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'"),
+
+		TestSpec
+			.json("{\"id\":\"20191112\"}")
+			.rowType(ROW(FIELD("id", DATE())))
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'"),
+
+		TestSpec
+			.json("{\"id\":\"2019-11-12 18:00:12\"}")
+			.rowType(ROW(FIELD("id", TIMESTAMP(0))))
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12 18:00:12\"}'"),
+
+		TestSpec
+			.json("{\"id\":\"abc\"}")
+			.rowType(ROW(FIELD("id", DECIMAL(10, 3))))
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
+
+		TestSpec
+			.json("{\"row\":{\"id\":\"abc\"}}")
+			.rowType(ROW(FIELD("row", ROW(FIELD("id", BOOLEAN())))))
+			.expect(Row.of(new Row(1)))
+			.expectErrorMessage("Failed to deserialize JSON '{\"row\":{\"id\":\"abc\"}}'"),
+
+		TestSpec
+			.json("{\"array\":[123, \"abc\"]}")
+			.rowType(ROW(FIELD("array", ARRAY(INT()))))
+			.expect(Row.of((Object) new Integer[]{123, null}))
+			.expectErrorMessage("Failed to deserialize JSON '{\"array\":[123, \"abc\"]}'"),
+
+		TestSpec
+			.json("{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}")
+			.rowType(ROW(FIELD("map", MAP(STRING(), INT()))))
+			.expect(Row.of(createHashMap("key1", 123, "key2", null)))
+			.expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'")
+
+
+	);
+
+	private static Map<String, Integer> createHashMap(String k1, Integer v1, String k2, Integer v2) {
+		Map<String, Integer> map = new HashMap<>();
+		map.put(k1, v1);
+		map.put(k2, v2);
+		return map;
+	}
+
+	@SuppressWarnings("unchecked")
+	private static Row convertToExternal(RowData rowData, DataType dataType) {
+		return (Row) DataFormatConverters.getConverterForDataType(dataType).toExternal(rowData);
+	}
+
+	private static class TestSpec {
+		private final String json;
+		private RowType rowType;
+		private Row expected;
+		private String errorMessage;
+
+		private TestSpec(String json) {
+			this.json = json;
+		}
+
+		public static TestSpec json(String json) {
+			return new TestSpec(json);
+		}
+
+		TestSpec expect(Row row) {
+			this.expected = row;
+			return this;
+		}
+
+		TestSpec rowType(DataType rowType) {
+			this.rowType = (RowType) rowType.getLogicalType();
+			return this;
+		}
+
+		TestSpec expectErrorMessage(String errorMessage) {
+			this.errorMessage = errorMessage;
+			return this;
+		}
+	}
+}