[FLINK-17461][formats][json] Support JSON serialization and deserialization schema for RowData type
This closes #11944
diff --git a/flink-formats-kafka/flink-json-debezium/pom.xml b/flink-formats-kafka/flink-json-debezium/pom.xml
index 3bc69aa..bff7fc1 100644
--- a/flink-formats-kafka/flink-json-debezium/pom.xml
+++ b/flink-formats-kafka/flink-json-debezium/pom.xml
@@ -69,15 +69,15 @@
<type>test-jar</type>
</dependency>
- <!-- TODO This could be dropped if we change JsonRowFormatFactoryTest -->
+ <!-- JSON RowData schema test dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
- <!-- TODO This could be dropped if we change JsonRowFormatFactoryTest -->
+ <!-- JSON RowData schema test dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
new file mode 100644
index 0000000..974b8dd
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from JSON to Flink Table/SQL internal data structure {@link RowData}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads
+ * the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
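+ *
+ * <p>A minimal usage sketch (the schema and literals below are illustrative only; see
+ * {@code JsonRowDataSerDeSchemaTest} for complete examples):
+ * <pre>{@code
+ * RowType rowType = (RowType) DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING())).getLogicalType();
+ * JsonRowDataDeserializationSchema schema = new JsonRowDataDeserializationSchema(
+ *     rowType, new RowDataTypeInfo(rowType), false, false);
+ * RowData row = schema.deserialize("{\"name\":\"flink\"}".getBytes(StandardCharsets.UTF_8));
+ * }</pre>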
+ */
+@Internal
+public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ /** Flag indicating whether to fail if a field is missing. */
+ private final boolean failOnMissingField;
+
+ /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+ private final boolean ignoreParseErrors;
+
+ /** TypeInformation of the produced {@link RowData}. */
+ private final TypeInformation<RowData> resultTypeInfo;
+
+ /**
+ * Runtime converter that converts {@link JsonNode}s into
+ * objects of Flink SQL internal data structures.
+ */
+ private final DeserializationRuntimeConverter runtimeConverter;
+
+ /** Object mapper for parsing the JSON. */
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ public JsonRowDataDeserializationSchema(
+ RowType rowType,
+ TypeInformation<RowData> resultTypeInfo,
+ boolean failOnMissingField,
+ boolean ignoreParseErrors) {
+ if (ignoreParseErrors && failOnMissingField) {
+ throw new IllegalArgumentException(
+ "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
+ }
+ this.resultTypeInfo = checkNotNull(resultTypeInfo);
+ this.failOnMissingField = failOnMissingField;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.runtimeConverter = createRowConverter(checkNotNull(rowType));
+ }
+
+ @Override
+ public RowData deserialize(byte[] message) throws IOException {
+ try {
+ final JsonNode root = objectMapper.readTree(message);
+ return (RowData) runtimeConverter.convert(root);
+ } catch (Throwable t) {
+ if (ignoreParseErrors) {
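+ // a null return value signals the connector to skip the corrupted record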
+ return null;
+ }
+ throw new IOException(format("Failed to deserialize JSON '%s'.", new String(message)), t);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return resultTypeInfo;
+ }
+
+ // -------------------------------------------------------------------------------------
+ // Runtime Converters
+ // -------------------------------------------------------------------------------------
+
+ /**
+ * Runtime converter that converts {@link JsonNode}s into objects of Flink Table & SQL
+ * internal data structures.
+ */
+ @FunctionalInterface
+ private interface DeserializationRuntimeConverter extends Serializable {
+ Object convert(JsonNode jsonNode);
+ }
+
+ /**
+ * Creates a runtime converter which is null safe.
+ */
+ private DeserializationRuntimeConverter createConverter(LogicalType type) {
+ return wrapIntoNullableConverter(createNotNullConverter(type));
+ }
+
+ /**
+ * Creates a runtime converter which assumes the input object is not null.
+ */
+ private DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return jsonNode -> null;
+ case BOOLEAN:
+ return this::convertToBoolean;
+ case TINYINT:
+ return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+ case SMALLINT:
+ return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ return this::convertToInt;
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ return this::convertToLong;
+ case DATE:
+ return this::convertToDate;
+ case TIME_WITHOUT_TIME_ZONE:
+ return this::convertToTime;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return this::convertToTimestamp;
+ case FLOAT:
+ return this::convertToFloat;
+ case DOUBLE:
+ return this::convertToDouble;
+ case CHAR:
+ case VARCHAR:
+ return this::convertToString;
+ case BINARY:
+ case VARBINARY:
+ return this::convertToBytes;
+ case DECIMAL:
+ return createDecimalConverter((DecimalType) type);
+ case ARRAY:
+ return createArrayConverter((ArrayType) type);
+ case MAP:
+ case MULTISET:
+ return createMapConverter((MapType) type);
+ case ROW:
+ return createRowConverter((RowType) type);
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ private boolean convertToBoolean(JsonNode jsonNode) {
+ if (jsonNode.isBoolean()) {
+ // avoid redundant toString and parseBoolean, for better performance
+ return jsonNode.asBoolean();
+ } else {
+ return Boolean.parseBoolean(jsonNode.asText().trim());
+ }
+ }
+
+ private int convertToInt(JsonNode jsonNode) {
+ if (jsonNode.canConvertToInt()) {
+ // avoid redundant toString and parseInt, for better performance
+ return jsonNode.asInt();
+ } else {
+ return Integer.parseInt(jsonNode.asText().trim());
+ }
+ }
+
+ private long convertToLong(JsonNode jsonNode) {
+ if (jsonNode.canConvertToLong()) {
+ // avoid redundant toString and parseLong, for better performance
+ return jsonNode.asLong();
+ } else {
+ return Long.parseLong(jsonNode.asText().trim());
+ }
+ }
+
+ private double convertToDouble(JsonNode jsonNode) {
+ if (jsonNode.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return jsonNode.asDouble();
+ } else {
+ return Double.parseDouble(jsonNode.asText().trim());
+ }
+ }
+
+ private float convertToFloat(JsonNode jsonNode) {
+ if (jsonNode.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return (float) jsonNode.asDouble();
+ } else {
+ return Float.parseFloat(jsonNode.asText().trim());
+ }
+ }
+
+ private int convertToDate(JsonNode jsonNode) {
+ LocalDate date = ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
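+ // DATE is internally represented as the number of days since the epoch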
+ return (int) date.toEpochDay();
+ }
+
+ private int convertToTime(JsonNode jsonNode) {
+ // according to RFC 3339 every full-time must have a timezone;
+ // until we have full timezone support, we only support UTC;
+ // users can parse their time as string as a workaround
+ TemporalAccessor parsedTime = RFC3339_TIME_FORMAT.parse(jsonNode.asText());
+
+ ZoneOffset zoneOffset = parsedTime.query(TemporalQueries.offset());
+ LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
+
+ if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 || localTime.getNano() != 0) {
+ throw new JsonParseException(
+ "Invalid time format. Only a time in UTC timezone without milliseconds is supported yet.");
+ }
+
+ // get number of milliseconds of the day
+ return localTime.toSecondOfDay() * 1000;
+ }
+
+ private TimestampData convertToTimestamp(JsonNode jsonNode) {
+ // according to RFC 3339 every date-time must have a timezone;
+ // until we have full timezone support, we only support UTC;
+ // users can parse their time as string as a workaround
+ TemporalAccessor parsedTimestamp = RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+
+ ZoneOffset zoneOffset = parsedTimestamp.query(TemporalQueries.offset());
+
+ if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
+ throw new JsonParseException(
+ "Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. " +
+ "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ }
+
+ LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
+ LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
+
+ return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime));
+ }
+
+ private StringData convertToString(JsonNode jsonNode) {
+ return StringData.fromString(jsonNode.asText());
+ }
+
+ private byte[] convertToBytes(JsonNode jsonNode) {
+ try {
+ return jsonNode.binaryValue();
+ } catch (IOException e) {
+ throw new JsonParseException("Unable to deserialize byte array.", e);
+ }
+ }
+
+ private DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+ final int precision = decimalType.getPrecision();
+ final int scale = decimalType.getScale();
+ return jsonNode -> {
+ BigDecimal bigDecimal;
+ if (jsonNode.isBigDecimal()) {
+ bigDecimal = jsonNode.decimalValue();
+ } else {
+ bigDecimal = new BigDecimal(jsonNode.asText());
+ }
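+ // fromBigDecimal enforces the declared precision and scale (it returns null if the value does not fit)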
+ return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+ };
+ }
+
+ private DeserializationRuntimeConverter createArrayConverter(ArrayType arrayType) {
+ DeserializationRuntimeConverter elementConverter = createConverter(arrayType.getElementType());
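+ // resolve the element's internal conversion class so the created array has the proper component type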
+ final Class<?> elementClass = LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+ return jsonNode -> {
+ final ArrayNode node = (ArrayNode) jsonNode;
+ final Object[] array = (Object[]) Array.newInstance(elementClass, node.size());
+ for (int i = 0; i < node.size(); i++) {
+ final JsonNode innerNode = node.get(i);
+ array[i] = elementConverter.convert(innerNode);
+ }
+ return new GenericArrayData(array);
+ };
+ }
+
+ private DeserializationRuntimeConverter createMapConverter(MapType mapType) {
+ LogicalType keyType = mapType.getKeyType();
+ if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+ throw new UnsupportedOperationException(
+ "JSON format doesn't support non-string as key type of map. " +
+ "The map type is: " + mapType.asSummaryString());
+ }
+ final DeserializationRuntimeConverter keyConverter = createConverter(keyType);
+ final DeserializationRuntimeConverter valueConverter = createConverter(mapType.getValueType());
+
+ return jsonNode -> {
+ Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
+ Map<Object, Object> result = new HashMap<>();
+ while (fields.hasNext()) {
+ Map.Entry<String, JsonNode> entry = fields.next();
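+ // JSON object keys are plain strings; wrap them in a TextNode so the key converter can be reused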
+ Object key = keyConverter.convert(TextNode.valueOf(entry.getKey()));
+ Object value = valueConverter.convert(entry.getValue());
+ result.put(key, value);
+ }
+ return new GenericMapData(result);
+ };
+ }
+
+ private DeserializationRuntimeConverter createRowConverter(RowType rowType) {
+ final DeserializationRuntimeConverter[] fieldConverters = rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .map(this::createConverter)
+ .toArray(DeserializationRuntimeConverter[]::new);
+ final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+ return jsonNode -> {
+ ObjectNode node = (ObjectNode) jsonNode;
+ int arity = fieldNames.length;
+ GenericRowData row = new GenericRowData(arity);
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ JsonNode field = node.get(fieldName);
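+ // node.get(fieldName) returns null for a missing field; convertField decides whether to fail or return null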
+ Object convertedField = convertField(fieldConverters[i], fieldName, field);
+ row.setField(i, convertedField);
+ }
+ return row;
+ };
+ }
+
+ private Object convertField(
+ DeserializationRuntimeConverter fieldConverter,
+ String fieldName,
+ JsonNode field) {
+ if (field == null) {
+ if (failOnMissingField) {
+ throw new JsonParseException(
+ "Could not find field with name '" + fieldName + "'.");
+ } else {
+ return null;
+ }
+ } else {
+ return fieldConverter.convert(field);
+ }
+ }
+
+ private DeserializationRuntimeConverter wrapIntoNullableConverter(
+ DeserializationRuntimeConverter converter) {
+ return jsonNode -> {
+ if (jsonNode == null || jsonNode.isNull()) {
+ return null;
+ }
+ try {
+ return converter.convert(jsonNode);
+ } catch (Throwable t) {
+ if (!ignoreParseErrors) {
+ throw t;
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Exception which refers to parse errors in converters.
+ */
+ private static final class JsonParseException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public JsonParseException(String message) {
+ super(message);
+ }
+
+ public JsonParseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
new file mode 100644
index 0000000..7c17738
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+
+/**
+ * Serialization schema that serializes an object of the Flink internal data structure into JSON bytes.
+ *
+ * <p>Serializes the input Flink object into a JSON string and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Resulting <code>byte[]</code> messages can be deserialized using {@link JsonRowDataDeserializationSchema}.
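+ *
+ * <p>A minimal round-trip sketch (illustrative; mirrors the pattern used in
+ * {@code JsonRowDataSerDeSchemaTest}, where {@code rowType} and {@code rowData} are assumed given):
+ * <pre>{@code
+ * JsonRowDataSerializationSchema serializer = new JsonRowDataSerializationSchema(rowType);
+ * byte[] json = serializer.serialize(rowData); // e.g. {"name":"flink"}
+ * }</pre>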
+ */
+@Internal
+public class JsonRowDataSerializationSchema implements SerializationSchema<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ /** The converter that converts internal data structures to {@link JsonNode}. */
+ private final SerializationRuntimeConverter runtimeConverter;
+
+ /** Object mapper that is used to create output JSON objects. */
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ /** Reusable object node. */
+ private transient ObjectNode node;
+
+ public JsonRowDataSerializationSchema(RowType rowType) {
+ this.runtimeConverter = createConverter(rowType);
+ }
+
+ @Override
+ public byte[] serialize(RowData row) {
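+ // the reused node is transient, so it must be recreated after this schema instance is deserialized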
+ if (node == null) {
+ node = mapper.createObjectNode();
+ }
+
+ try {
+ runtimeConverter.convert(mapper, node, row);
+ return mapper.writeValueAsBytes(node);
+ } catch (Throwable t) {
+ throw new RuntimeException("Could not serialize row '" + row + "'. " +
+ "Make sure that the schema matches the input.", t);
+ }
+ }
+
+ // --------------------------------------------------------------------------------
+ // Runtime Converters
+ // --------------------------------------------------------------------------------
+
+ /**
+ * Runtime converter that converts objects of Flink Table & SQL internal data structures
+ * to corresponding {@link JsonNode}s.
+ */
+ private interface SerializationRuntimeConverter extends Serializable {
+ JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value);
+ }
+
+ /**
+ * Creates a runtime converter which is null safe.
+ */
+ private SerializationRuntimeConverter createConverter(LogicalType type) {
+ return wrapIntoNullableConverter(createNotNullConverter(type));
+ }
+
+ /**
+ * Creates a runtime converter which assumes the input object is not null.
+ */
+ private SerializationRuntimeConverter createNotNullConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return (mapper, reuse, value) -> mapper.getNodeFactory().nullNode();
+ case BOOLEAN:
+ return (mapper, reuse, value) -> mapper.getNodeFactory().booleanNode((boolean) value);
+ case TINYINT:
+ return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((byte) value);
+ case SMALLINT:
+ return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((short) value);
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((int) value);
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((long) value);
+ case FLOAT:
+ return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((float) value);
+ case DOUBLE:
+ return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((double) value);
+ case CHAR:
+ case VARCHAR:
+ // value is an instance of StringData
+ return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(value.toString());
+ case BINARY:
+ case VARBINARY:
+ return (mapper, reuse, value) -> mapper.getNodeFactory().binaryNode((byte[]) value);
+ case DATE:
+ return createDateConverter();
+ case TIME_WITHOUT_TIME_ZONE:
+ return createTimeConverter();
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return createTimestampConverter();
+ case DECIMAL:
+ return createDecimalConverter();
+ case ARRAY:
+ return createArrayConverter((ArrayType) type);
+ case MAP:
+ case MULTISET:
+ return createMapConverter((MapType) type);
+ case ROW:
+ return createRowConverter((RowType) type);
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Not support to parse type: " + type);
+ }
+ }
+
+ private SerializationRuntimeConverter createDecimalConverter() {
+ return (mapper, reuse, value) -> {
+ BigDecimal bd = ((DecimalData) value).toBigDecimal();
+ return mapper.getNodeFactory().numberNode(bd);
+ };
+ }
+
+ private SerializationRuntimeConverter createDateConverter() {
+ return (mapper, reuse, value) -> {
+ int days = (int) value;
+ LocalDate date = LocalDate.ofEpochDay(days);
+ return mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format(date));
+ };
+ }
+
+ private SerializationRuntimeConverter createTimeConverter() {
+ return (mapper, reuse, value) -> {
+ int millisecond = (int) value;
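+ // TIME is internally an int holding milliseconds of the day; the sub-second part is dropped here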
+ LocalTime time = LocalTime.ofSecondOfDay(millisecond / 1000L);
+ return mapper.getNodeFactory().textNode(RFC3339_TIME_FORMAT.format(time));
+ };
+ }
+
+ private SerializationRuntimeConverter createTimestampConverter() {
+ return (mapper, reuse, value) -> {
+ TimestampData timestamp = (TimestampData) value;
+ return mapper.getNodeFactory()
+ .textNode(RFC3339_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
+ };
+ }
+
+ private SerializationRuntimeConverter createArrayConverter(ArrayType type) {
+ final LogicalType elementType = type.getElementType();
+ final SerializationRuntimeConverter elementConverter = createConverter(elementType);
+ return (mapper, reuse, value) -> {
+ ArrayNode node;
+
+ // reuse could be a NullNode if last record is null.
+ if (reuse == null || reuse.isNull()) {
+ node = mapper.createArrayNode();
+ } else {
+ node = (ArrayNode) reuse;
+ node.removeAll();
+ }
+
+ ArrayData array = (ArrayData) value;
+ int numElements = array.size();
+ for (int i = 0; i < numElements; i++) {
+ Object element = ArrayData.get(array, i, elementType);
+ node.add(elementConverter.convert(mapper, null, element));
+ }
+
+ return node;
+ };
+ }
+
+ private SerializationRuntimeConverter createMapConverter(MapType type) {
+ LogicalType keyType = type.getKeyType();
+ if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+ throw new UnsupportedOperationException(
+ "JSON format doesn't support non-string as key type of map. " +
+ "The map type is: " + type.asSummaryString());
+ }
+ final LogicalType valueType = type.getValueType();
+ final SerializationRuntimeConverter valueConverter = createConverter(valueType);
+ return (mapper, reuse, object) -> {
+ ObjectNode node;
+ // reuse could be a NullNode if last record is null.
+ if (reuse == null || reuse.isNull()) {
+ node = mapper.createObjectNode();
+ } else {
+ node = (ObjectNode) reuse;
+ }
+
+ MapData map = (MapData) object;
+ ArrayData keyArray = map.keyArray();
+ ArrayData valueArray = map.valueArray();
+ int numElements = map.size();
+ for (int i = 0; i < numElements; i++) {
+ String fieldName = keyArray.getString(i).toString(); // key must be string
+ Object value = ArrayData.get(valueArray, i, valueType);
+ node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), value));
+ }
+
+ return node;
+ };
+ }
+
+ private SerializationRuntimeConverter createRowConverter(RowType type) {
+ final String[] fieldNames = type.getFieldNames().toArray(new String[0]);
+ final LogicalType[] fieldTypes = type.getFields().stream()
+ .map(RowType.RowField::getType)
+ .toArray(LogicalType[]::new);
+ final SerializationRuntimeConverter[] fieldConverters = Arrays.stream(fieldTypes)
+ .map(this::createConverter)
+ .toArray(SerializationRuntimeConverter[]::new);
+ final int fieldCount = type.getFieldCount();
+
+ return (mapper, reuse, value) -> {
+ ObjectNode node;
+ // reuse could be a NullNode if last record is null.
+ if (reuse == null || reuse.isNull()) {
+ node = mapper.createObjectNode();
+ } else {
+ node = (ObjectNode) reuse;
+ }
+ RowData row = (RowData) value;
+ for (int i = 0; i < fieldCount; i++) {
+ String fieldName = fieldNames[i];
+ Object field = RowData.get(row, i, fieldTypes[i]);
+ node.set(fieldName, fieldConverters[i].convert(mapper, node.get(fieldName), field));
+ }
+ return node;
+ };
+ }
+
+ private SerializationRuntimeConverter wrapIntoNullableConverter(
+ SerializationRuntimeConverter converter) {
+ return (mapper, reuse, object) -> {
+ if (object == null) {
+ return mapper.getNodeFactory().nullNode();
+ }
+
+ return converter.convert(mapper, reuse, object);
+ };
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
new file mode 100644
index 0000000..dcfae9c
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link JsonRowDataDeserializationSchema} and {@link JsonRowDataSerializationSchema}.
+ */
+public class JsonRowDataSerDeSchemaTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testSerDe() throws Exception {
+ byte tinyint = 'c';
+ short smallint = 128;
+ int intValue = 45536;
+ float floatValue = 33.333F;
+ long bigint = 1238123899121L;
+ String name = "asdlkjasjkdla998y1122";
+ byte[] bytes = new byte[1024];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ BigDecimal decimal = new BigDecimal("123.456789");
+ Double[] doubles = new Double[]{1.1, 2.2, 3.3};
+ LocalDate date = LocalDate.parse("1990-10-14");
+ LocalTime time = LocalTime.parse("12:12:43");
+ Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
+ Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789");
+
+ Map<String, Long> map = new HashMap<>();
+ map.put("flink", 123L);
+
+ Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+ Map<String, Integer> innerMap = new HashMap<>();
+ innerMap.put("key", 234);
+ nestedMap.put("inner_map", innerMap);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
+
+ // Root
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("bool", true);
+ root.put("tinyint", tinyint);
+ root.put("smallint", smallint);
+ root.put("int", intValue);
+ root.put("bigint", bigint);
+ root.put("float", floatValue);
+ root.put("name", name);
+ root.put("bytes", bytes);
+ root.put("decimal", decimal);
+ root.set("doubles", doubleNode);
+ root.put("date", "1990-10-14");
+ root.put("time", "12:12:43Z");
+ root.put("timestamp3", "1990-10-14T12:12:43.123Z");
+ root.put("timestamp9", "1990-10-14T12:12:43.123456789Z");
+ root.putObject("map").put("flink", 123);
+ root.putObject("map2map").putObject("inner_map").put("key", 234);
+
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+ DataType dataType = ROW(
+ FIELD("bool", BOOLEAN()),
+ FIELD("tinyint", TINYINT()),
+ FIELD("smallint", SMALLINT()),
+ FIELD("int", INT()),
+ FIELD("bigint", BIGINT()),
+ FIELD("float", FLOAT()),
+ FIELD("name", STRING()),
+ FIELD("bytes", BYTES()),
+ FIELD("decimal", DECIMAL(9, 6)),
+ FIELD("doubles", ARRAY(DOUBLE())),
+ FIELD("date", DATE()),
+ FIELD("time", TIME(0)),
+ FIELD("timestamp3", TIMESTAMP(3)),
+ FIELD("timestamp9", TIMESTAMP(9)),
+ FIELD("map", MAP(STRING(), BIGINT())),
+ FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
+ RowType schema = (RowType) dataType.getLogicalType();
+ RowDataTypeInfo resultTypeInfo = new RowDataTypeInfo(schema);
+
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ schema, resultTypeInfo, false, false);
+
+ Row expected = new Row(16);
+ expected.setField(0, true);
+ expected.setField(1, tinyint);
+ expected.setField(2, smallint);
+ expected.setField(3, intValue);
+ expected.setField(4, bigint);
+ expected.setField(5, floatValue);
+ expected.setField(6, name);
+ expected.setField(7, bytes);
+ expected.setField(8, decimal);
+ expected.setField(9, doubles);
+ expected.setField(10, date);
+ expected.setField(11, time);
+ expected.setField(12, timestamp3.toLocalDateTime());
+ expected.setField(13, timestamp9.toLocalDateTime());
+ expected.setField(14, map);
+ expected.setField(15, nestedMap);
+
+ RowData rowData = deserializationSchema.deserialize(serializedJson);
+ Row actual = convertToExternal(rowData, dataType);
+ assertEquals(expected, actual);
+
+ // test serialization
+ JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(schema);
+
+ byte[] actualBytes = serializationSchema.serialize(rowData);
+ assertEquals(new String(serializedJson), new String(actualBytes));
+ }
+
+ /**
+ * Tests the deserialization slow path,
+ * e.g. converting to a string and using {@link Double#parseDouble(String)}.
+ */
+ @Test
+ public void testSlowDeserialization() throws Exception {
+ Random random = new Random();
+ boolean bool = random.nextBoolean();
+ int integer = random.nextInt();
+ long bigint = random.nextLong();
+ double doubleValue = random.nextDouble();
+ float floatValue = random.nextFloat();
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("bool", String.valueOf(bool));
+ root.put("int", String.valueOf(integer));
+ root.put("bigint", String.valueOf(bigint));
+ root.put("double1", String.valueOf(doubleValue));
+ root.put("double2", new BigDecimal(doubleValue));
+ root.put("float1", String.valueOf(floatValue));
+ root.put("float2", new BigDecimal(floatValue));
+
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+ DataType dataType = ROW(
+ FIELD("bool", BOOLEAN()),
+ FIELD("int", INT()),
+ FIELD("bigint", BIGINT()),
+ FIELD("double1", DOUBLE()),
+ FIELD("double2", DOUBLE()),
+ FIELD("float1", FLOAT()),
+ FIELD("float2", FLOAT())
+ );
+ RowType rowType = (RowType) dataType.getLogicalType();
+
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ rowType, new RowDataTypeInfo(rowType), false, false);
+
+ Row expected = new Row(7);
+ expected.setField(0, bool);
+ expected.setField(1, integer);
+ expected.setField(2, bigint);
+ expected.setField(3, doubleValue);
+ expected.setField(4, doubleValue);
+ expected.setField(5, floatValue);
+ expected.setField(6, floatValue);
+
+ RowData rowData = deserializationSchema.deserialize(serializedJson);
+ Row actual = convertToExternal(rowData, dataType);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testSerDeMultiRows() throws Exception {
+ RowType rowType = (RowType) ROW(
+ FIELD("f1", INT()),
+ FIELD("f2", BOOLEAN()),
+ FIELD("f3", STRING())
+ ).getLogicalType();
+
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ rowType, new RowDataTypeInfo(rowType), false, false);
+ JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(rowType);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // the first row
+ {
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("f1", 1);
+ root.put("f2", true);
+ root.put("f3", "str");
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ RowData rowData = deserializationSchema.deserialize(serializedJson);
+ byte[] actual = serializationSchema.serialize(rowData);
+ assertEquals(new String(serializedJson), new String(actual));
+ }
+
+ // the second row
+ {
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("f1", 10);
+ root.put("f2", false);
+ root.put("f3", "newStr");
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ RowData rowData = deserializationSchema.deserialize(serializedJson);
+ byte[] actual = serializationSchema.serialize(rowData);
+ assertEquals(new String(serializedJson), new String(actual));
+ }
+ }
+
+ @Test
+ public void testSerDeMultiRowsWithNullValues() throws Exception {
+ String[] jsons = new String[] {
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"metrics\":{\"k1\":10.01,\"k2\":\"invalid\"}}",
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\", \"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"}, " +
+ "\"ids\":[1, 2, 3]}",
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"metrics\":{}}",
+ };
+
+ String[] expected = new String[] {
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null,\"metrics\":{\"k1\":10.01,\"k2\":null}}",
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"}," +
+ "\"ids\":[1,2,3],\"metrics\":null}",
+ "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null,\"metrics\":{}}",
+ };
+
+ RowType rowType = (RowType) ROW(
+ FIELD("svt", STRING()),
+ FIELD("ops", ROW(FIELD("id", STRING()))),
+ FIELD("ids", ARRAY(INT())),
+ FIELD("metrics", MAP(STRING(), DOUBLE()))
+ ).getLogicalType();
+
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ rowType, new RowDataTypeInfo(rowType), false, true);
+ JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(rowType);
+
+ for (int i = 0; i < jsons.length; i++) {
+ String json = jsons[i];
+ RowData row = deserializationSchema.deserialize(json.getBytes());
+ String result = new String(serializationSchema.serialize(row));
+ assertEquals(expected[i], result);
+ }
+ }
+
+ @Test
+ public void testDeserializationMissingNode() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // Root
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("id", 123123123);
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+ DataType dataType = ROW(FIELD("name", STRING()));
+ RowType schema = (RowType) dataType.getLogicalType();
+
+ // pass on missing field
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ schema, new RowDataTypeInfo(schema), false, false);
+
+ Row expected = new Row(1);
+ Row actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
+ assertEquals(expected, actual);
+
+ // fail on missing field
+ deserializationSchema = new JsonRowDataDeserializationSchema(
+ schema, new RowDataTypeInfo(schema), true, false);
+
+ thrown.expect(IOException.class);
+ thrown.expectMessage("Failed to deserialize JSON '{\"id\":123123123}'");
+ deserializationSchema.deserialize(serializedJson);
+
+ // ignore on parse error
+ deserializationSchema = new JsonRowDataDeserializationSchema(
+ schema, new RowDataTypeInfo(schema), false, true);
+ actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
+ assertEquals(expected, actual);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled");
+ // failOnMissingField and ignoreParseErrors both enabled
+ //noinspection ConstantConditions
+ new JsonRowDataDeserializationSchema(
+ schema, new RowDataTypeInfo(schema), true, true);
+ }
+
+ @Test
+ public void testJsonParse() throws Exception {
+ for (TestSpec spec : testData) {
+ testIgnoreParseErrors(spec);
+ if (spec.errorMessage != null) {
+ testParseErrors(spec);
+ }
+ }
+ }
+
+ private void testIgnoreParseErrors(TestSpec spec) throws Exception {
+ // the field that fails to parse should be null and no exception should be thrown
+ JsonRowDataDeserializationSchema ignoreErrorsSchema = new JsonRowDataDeserializationSchema(
+ spec.rowType, new RowDataTypeInfo(spec.rowType), false, true);
+ Row expected;
+ if (spec.expected != null) {
+ expected = spec.expected;
+ } else {
+ expected = new Row(1);
+ }
+ RowData rowData = ignoreErrorsSchema.deserialize(spec.json.getBytes());
+ Row actual = convertToExternal(rowData, fromLogicalToDataType(spec.rowType));
+ assertEquals("Test Ignore Parse Error: " + spec.json,
+ expected,
+ actual);
+ }
+
+ private void testParseErrors(TestSpec spec) throws Exception {
+ // expect exception if parse error is not ignored
+ JsonRowDataDeserializationSchema failingSchema = new JsonRowDataDeserializationSchema(
+ spec.rowType, new RowDataTypeInfo(spec.rowType), false, false);
+
+ thrown.expectMessage(spec.errorMessage);
+ failingSchema.deserialize(spec.json.getBytes());
+ }
+
+ private static List<TestSpec> testData = Arrays.asList(
+ TestSpec
+ .json("{\"id\": \"trueA\"}")
+ .rowType(ROW(FIELD("id", BOOLEAN())))
+ .expect(Row.of(false)),
+
+ TestSpec
+ .json("{\"id\": true}")
+ .rowType(ROW(FIELD("id", BOOLEAN())))
+ .expect(Row.of(true)),
+
+ TestSpec
+ .json("{\"id\":\"abc\"}")
+ .rowType(ROW(FIELD("id", INT())))
+ .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
+
+ TestSpec
+ .json("{\"id\":112.013}")
+ .rowType(ROW(FIELD("id", BIGINT())))
+ .expect(Row.of(112L)),
+
+ TestSpec
+ .json("{\"id\":\"long\"}")
+ .rowType(ROW(FIELD("id", BIGINT())))
+ .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'"),
+
+ TestSpec
+ .json("{\"id\":\"112.013.123\"}")
+ .rowType(ROW(FIELD("id", FLOAT())))
+ .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
+
+ TestSpec
+ .json("{\"id\":\"112.013.123\"}")
+ .rowType(ROW(FIELD("id", DOUBLE())))
+ .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
+
+ TestSpec
+ .json("{\"id\":\"18:00:243\"}")
+ .rowType(ROW(FIELD("id", TIME())))
+ .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'"),
+
+ TestSpec
+ .json("{\"id\":\"20191112\"}")
+ .rowType(ROW(FIELD("id", DATE())))
+ .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'"),
+
+ TestSpec
+ .json("{\"id\":\"2019-11-12 18:00:12\"}")
+ .rowType(ROW(FIELD("id", TIMESTAMP(0))))
+ .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12 18:00:12\"}'"),
+
+ TestSpec
+ .json("{\"id\":\"abc\"}")
+ .rowType(ROW(FIELD("id", DECIMAL(10, 3))))
+ .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
+
+ TestSpec
+ .json("{\"row\":{\"id\":\"abc\"}}")
+ .rowType(ROW(FIELD("row", ROW(FIELD("id", BOOLEAN())))))
+ .expect(Row.of(new Row(1)))
+ .expectErrorMessage("Failed to deserialize JSON '{\"row\":{\"id\":\"abc\"}}'"),
+
+ TestSpec
+ .json("{\"array\":[123, \"abc\"]}")
+ .rowType(ROW(FIELD("array", ARRAY(INT()))))
+ .expect(Row.of((Object) new Integer[]{123, null}))
+ .expectErrorMessage("Failed to deserialize JSON '{\"array\":[123, \"abc\"]}'"),
+
+ TestSpec
+ .json("{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}")
+ .rowType(ROW(FIELD("map", MAP(STRING(), INT()))))
+ .expect(Row.of(createHashMap("key1", 123, "key2", null)))
+ .expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'")
+ );
+
+ private static Map<String, Integer> createHashMap(String k1, Integer v1, String k2, Integer v2) {
+ Map<String, Integer> map = new HashMap<>();
+ map.put(k1, v1);
+ map.put(k2, v2);
+ return map;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Row convertToExternal(RowData rowData, DataType dataType) {
+ return (Row) DataFormatConverters.getConverterForDataType(dataType).toExternal(rowData);
+ }
+
+ private static class TestSpec {
+ private final String json;
+ private RowType rowType;
+ private Row expected;
+ private String errorMessage;
+
+ private TestSpec(String json) {
+ this.json = json;
+ }
+
+ public static TestSpec json(String json) {
+ return new TestSpec(json);
+ }
+
+ TestSpec expect(Row row) {
+ this.expected = row;
+ return this;
+ }
+
+ TestSpec rowType(DataType rowType) {
+ this.rowType = (RowType) rowType.getLogicalType();
+ return this;
+ }
+
+ TestSpec expectErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ return this;
+ }
+ }
+}