[FLINK-20470][json] MissingNode can't be casted to ObjectNode when deserializing JSON
This closes #14316
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 78a3247..0a2fb5e 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -89,7 +89,7 @@
this.failOnMissingField = failOnMissingField;
this.ignoreParseErrors = ignoreParseErrors;
this.runtimeConverter = new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
- .createRowConverter(checkNotNull(rowType));
+ .createConverter(checkNotNull(rowType));
this.timestampFormat = timestampFormat;
boolean hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
if (hasDecimalType) {
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
index 25df3df..65feec3 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
@@ -106,7 +106,7 @@
/**
* Creates a runtime converter which is null safe.
*/
- private JsonToRowDataConverter createConverter(LogicalType type) {
+ public JsonToRowDataConverter createConverter(LogicalType type) {
return wrapIntoNullableConverter(createNotNullConverter(type));
}
@@ -368,7 +368,7 @@
private JsonToRowDataConverter wrapIntoNullableConverter(
JsonToRowDataConverter converter) {
return jsonNode -> {
- if (jsonNode == null || jsonNode.isNull()) {
+ if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
return null;
}
try {
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index d324e85..b49326b 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -351,6 +351,17 @@
@Test
public void testDeserializationMissingNode() throws Exception {
+ DataType dataType = ROW(FIELD("name", STRING()));
+ RowType schema = (RowType) dataType.getLogicalType();
+
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ schema, InternalTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601);
+ RowData rowData = deserializationSchema.deserialize("".getBytes());
+ assertEquals(null, rowData);
+ }
+
+ @Test
+ public void testDeserializationMissingField() throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
// Root