[FLINK-20470][json] MissingNode can't be casted to ObjectNode when deserializing JSON

This closes #14316
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 78a3247..0a2fb5e 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -89,7 +89,7 @@
 		this.failOnMissingField = failOnMissingField;
 		this.ignoreParseErrors = ignoreParseErrors;
 		this.runtimeConverter = new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
-			.createRowConverter(checkNotNull(rowType));
+			.createConverter(checkNotNull(rowType));
 		this.timestampFormat = timestampFormat;
 		boolean hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
 		if (hasDecimalType) {
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
index 25df3df..65feec3 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
@@ -106,7 +106,7 @@
 	/**
 	 * Creates a runtime converter which is null safe.
 	 */
-	private JsonToRowDataConverter createConverter(LogicalType type) {
+	public JsonToRowDataConverter createConverter(LogicalType type) {
 		return wrapIntoNullableConverter(createNotNullConverter(type));
 	}
 
@@ -368,7 +368,7 @@
 	private JsonToRowDataConverter wrapIntoNullableConverter(
 		JsonToRowDataConverter converter) {
 		return jsonNode -> {
-			if (jsonNode == null || jsonNode.isNull()) {
+			if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
 				return null;
 			}
 			try {
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index d324e85..b49326b 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -351,6 +351,17 @@
 
 	@Test
 	public void testDeserializationMissingNode() throws Exception {
+		DataType dataType = ROW(FIELD("name", STRING()));
+		RowType schema = (RowType) dataType.getLogicalType();
+
+		JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+			schema, InternalTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601);
+		RowData rowData = deserializationSchema.deserialize("".getBytes());
+		assertEquals(null, rowData);
+	}
+
+	@Test
+	public void testDeserializationMissingField() throws Exception {
 		ObjectMapper objectMapper = new ObjectMapper();
 
 		// Root