// blob: 81e370c0fd36aebf4fcd0e0bae3150f1b0cf44da
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.json;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.util.jackson.JacksonMapperFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.flink.formats.utils.DeserializationSchemaMatcher.whenDeserializedWith;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.internal.matchers.ThrowableCauseMatcher.hasCause;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
/** Tests for the {@link JsonRowDeserializationSchema}. */
public class JsonRowDeserializationSchemaTest {
/** Jackson mapper shared by the tests to build JSON object nodes and serialize them to bytes. */
private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
/** JUnit rule through which a test can declare the exception it expects to be thrown. */
@Rule public ExpectedException thrown = ExpectedException.none();
/**
 * Checks that a JSON object is turned into the expected {@link Row} when the schema is built
 * from a named row {@link TypeInformation}.
 *
 * <p>Date, time and timestamp values are each read twice: once as the {@code java.sql} type and
 * once as the corresponding {@code java.time} local type.
 */
@Test
public void testTypeInfoDeserialization() throws Exception {
    // Values shared by the JSON input and the expected row.
    final long id = 1238123899121L;
    final String name = "asdlkjasjkdla998y1122";
    final byte[] payload = new byte[1024];
    ThreadLocalRandom.current().nextBytes(payload);

    final Date sqlDate = Date.valueOf("1990-10-14");
    final Time sqlTime = Time.valueOf("12:12:43");
    final Timestamp sqlTimestamp = Timestamp.valueOf("1990-10-14 12:12:43");

    final Map<String, Long> flatMap = new HashMap<>();
    flatMap.put("flink", 123L);
    final Map<String, Integer> innerMap = new HashMap<>();
    innerMap.put("key", 234);
    final Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
    nestedMap.put("inner_map", innerMap);

    // JSON input document.
    final ObjectNode json = OBJECT_MAPPER.createObjectNode();
    json.put("id", id);
    json.put("name", name);
    json.put("bytes", payload);
    json.put("date1", "1990-10-14");
    json.put("date2", "1990-10-14");
    json.put("time1", "12:12:43Z");
    json.put("time2", "12:12:43Z");
    json.put("timestamp1", "1990-10-14T12:12:43Z");
    json.put("timestamp2", "1990-10-14T12:12:43Z");
    final ObjectNode mapNode = json.putObject("map");
    mapNode.put("flink", 123);
    final ObjectNode innerMapNode = json.putObject("map2map").putObject("inner_map");
    innerMapNode.put("key", 234);
    final byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(json);

    // Schema: field names and field types are listed in the same order.
    final String[] fieldNames = {
        "id",
        "name",
        "bytes",
        "date1",
        "date2",
        "time1",
        "time2",
        "timestamp1",
        "timestamp2",
        "map",
        "map2map"
    };
    final TypeInformation<Row> rowType =
            Types.ROW_NAMED(
                    fieldNames,
                    Types.LONG,
                    Types.STRING,
                    Types.PRIMITIVE_ARRAY(Types.BYTE),
                    Types.SQL_DATE,
                    Types.LOCAL_DATE,
                    Types.SQL_TIME,
                    Types.LOCAL_TIME,
                    Types.SQL_TIMESTAMP,
                    Types.LOCAL_DATE_TIME,
                    Types.MAP(Types.STRING, Types.LONG),
                    Types.MAP(Types.STRING, Types.MAP(Types.STRING, Types.INT)));
    final JsonRowDeserializationSchema schema =
            new JsonRowDeserializationSchema.Builder(rowType).build();

    // Expected row, one value per schema field.
    final Row expected =
            Row.of(
                    id,
                    name,
                    payload,
                    sqlDate,
                    sqlDate.toLocalDate(),
                    sqlTime,
                    sqlTime.toLocalTime(),
                    sqlTimestamp,
                    sqlTimestamp.toLocalDateTime(),
                    flatMap,
                    nestedMap);

    assertThat(serializedJson, whenDeserializedWith(schema).equalsTo(expected));
}
/**
 * Checks that a JSON object is turned into the expected {@link Row} when the schema is given
 * as a JSON schema string instead of type information.
 */
@Test
public void testSchemaDeserialization() throws Exception {
    // Values shared by the JSON input and the expected row.
    final BigDecimal id = BigDecimal.valueOf(1238123899121L);
    final String name = "asdlkjasjkdla998y1122";
    final byte[] payload = new byte[1024];
    ThreadLocalRandom.current().nextBytes(payload);
    final BigDecimal[] numbers = {
        BigDecimal.valueOf(1), BigDecimal.valueOf(2), BigDecimal.valueOf(3)
    };
    final String[] strings = {"one", "two", "three"};

    // JSON input document.
    final ObjectNode json = OBJECT_MAPPER.createObjectNode();
    json.put("id", id.longValue());
    json.putNull("idOrNull");
    json.put("name", name);
    json.put("date", "1990-10-14");
    json.put("time", "12:12:43Z");
    json.put("timestamp", "1990-10-14T12:12:43Z");
    json.put("bytes", payload);
    json.putArray("numbers").add(1).add(2).add(3);
    json.putArray("strings").add("one").add("two").add("three");
    json.putObject("nested").put("booleanField", true).put("decimalField", 12);
    final byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(json);

    // JSON schema describing the document above.
    final String jsonSchema =
            "{"
                    + " type: 'object',"
                    + " properties: {"
                    + " id: { type: 'integer' },"
                    + " idOrNull: { type: ['integer', 'null'] },"
                    + " name: { type: 'string' },"
                    + " date: { type: 'string', format: 'date' },"
                    + " time: { type: 'string', format: 'time' },"
                    + " timestamp: { type: 'string', format: 'date-time' },"
                    + " bytes: { type: 'string', contentEncoding: 'base64' },"
                    + " numbers: { type: 'array', items: { type: 'integer' } },"
                    + " strings: { type: 'array', items: { type: 'string' } },"
                    + " nested: { "
                    + " type: 'object',"
                    + " properties: { "
                    + " booleanField: { type: 'boolean' },"
                    + " decimalField: { type: 'number' }"
                    + " }"
                    + " }"
                    + " }"
                    + "}";
    final JsonRowDeserializationSchema schema =
            new JsonRowDeserializationSchema.Builder(jsonSchema).build();

    // Expected row; the nested object is expected as a nested two-field row.
    final Row expectedNested = Row.of(true, BigDecimal.valueOf(12));
    final Row expectedRow =
            Row.of(
                    id,
                    null,
                    name,
                    Date.valueOf("1990-10-14"),
                    Time.valueOf("12:12:43"),
                    Timestamp.valueOf("1990-10-14 12:12:43"),
                    payload,
                    numbers,
                    strings,
                    expectedNested);

    assertThat(serializedJson, whenDeserializedWith(schema).equalsTo(expectedRow));
}
/**
 * Tests deserialization of a JSON object that lacks the field named in the schema, under each
 * builder configuration: default, {@code failOnMissingField}, {@code ignoreParseErrors}, and
 * both options together.
 */
@Test
public void testMissingNode() throws Exception {
    // The input only carries "id", while the schema asks for "name".
    final ObjectNode json = OBJECT_MAPPER.createObjectNode();
    json.put("id", 123123123);
    final byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(json);

    final TypeInformation<Row> rowType = Types.ROW_NAMED(new String[] {"name"}, Types.STRING);
    final Row emptyRow = new Row(1);

    // Default configuration: the missing field is simply left unset.
    final JsonRowDeserializationSchema defaultSchema =
            new JsonRowDeserializationSchema.Builder(rowType).build();
    assertThat(serializedJson, whenDeserializedWith(defaultSchema).equalsTo(emptyRow));

    // failOnMissingField: deserialization fails, caused by an IllegalStateException.
    final JsonRowDeserializationSchema strictSchema =
            new JsonRowDeserializationSchema.Builder(rowType).failOnMissingField().build();
    assertThat(
            serializedJson,
            whenDeserializedWith(strictSchema)
                    .failsWithException(hasCause(instanceOf(IllegalStateException.class))));

    // ignore-parse-errors ignores missing field exception too
    final JsonRowDeserializationSchema lenientSchema =
            new JsonRowDeserializationSchema.Builder(rowType).ignoreParseErrors().build();
    assertThat(serializedJson, whenDeserializedWith(lenientSchema).equalsTo(emptyRow));

    // Enabling both options is expected to be rejected with an IllegalArgumentException.
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(
            "JSON format doesn't support failOnMissingField and ignoreParseErrors are both true");
    new JsonRowDeserializationSchema.Builder(rowType)
            .failOnMissingField()
            .ignoreParseErrors()
            .build();
}
/**
 * Verifies that building a schema from three field names but only one field type is rejected
 * with an {@link IllegalArgumentException}.
 */
@Test
public void testNumberOfFieldNamesAndTypesMismatch() {
    final String[] fieldNames = {"one", "two", "three"};
    boolean rejected = false;
    try {
        // Creating the row type stays inside the try block so a failure there counts as well.
        new JsonRowDeserializationSchema.Builder(Types.ROW_NAMED(fieldNames, Types.LONG))
                .build();
    } catch (IllegalArgumentException expected) {
        rejected = true;
    }
    if (!rejected) {
        Assert.fail("Did not throw expected Exception");
    }
}
/**
 * Runs every entry of {@code testData} with parse errors ignored and, for entries that declare
 * an error message, once more with parse errors reported.
 */
@Test
public void testJsonParse() {
    testData.forEach(
            spec -> {
                testIgnoreParseErrors(spec);
                if (spec.errorMessage == null) {
                    // No failure is expected for this spec in strict mode.
                    return;
                }
                testParseErrors(spec);
            });
}
/**
 * Deserializes the spec's JSON with {@code ignoreParseErrors} enabled and asserts that the
 * expected row is produced without an exception.
 *
 * @param spec the scenario to run; when it declares no expected row, a one-field row whose
 *     field is unset is expected instead
 */
private void testIgnoreParseErrors(TestSpec spec) {
    // the parsing field should be null and no exception is thrown
    final JsonRowDeserializationSchema ignoreErrorsSchema =
            new JsonRowDeserializationSchema.Builder(spec.rowTypeInformation)
                    .ignoreParseErrors()
                    .build();
    final Row expected = spec.expected != null ? spec.expected : new Row(1);
    assertThat(
            "Test Ignore Parse Error: " + spec.json,
            // Encode explicitly as UTF-8 so the input bytes do not depend on the platform
            // default charset of the machine running the test.
            spec.json.getBytes(StandardCharsets.UTF_8),
            whenDeserializedWith(ignoreErrorsSchema).equalsTo(expected));
}
/**
 * Deserializes the spec's JSON with a default schema (parse errors are not ignored) and asserts
 * that it fails with an exception whose message contains the spec's error message.
 *
 * @param spec the scenario to run; its {@code errorMessage} is the expected message fragment
 */
private void testParseErrors(TestSpec spec) {
    // expect exception if parse error is not ignored
    final JsonRowDeserializationSchema failingSchema =
            new JsonRowDeserializationSchema.Builder(spec.rowTypeInformation).build();
    assertThat(
            "Test Parse Error: " + spec.json,
            // Encode explicitly as UTF-8 so the input bytes do not depend on the platform
            // default charset of the machine running the test.
            spec.json.getBytes(StandardCharsets.UTF_8),
            whenDeserializedWith(failingSchema)
                    .failsWithException(hasMessage(containsString(spec.errorMessage))));
}
/**
 * Parsing scenarios consumed by {@code testJsonParse}.
 *
 * <p>Each spec names the JSON input and the row type to read it with. A spec may declare the
 * row expected when parse errors are ignored, the message fragment expected when they are not,
 * or both. The reference is {@code final} because the list is a shared fixture that is never
 * reassigned.
 */
private static final List<TestSpec> testData =
        Arrays.asList(
                // BOOLEAN targets
                TestSpec.json("{\"id\": \"trueA\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.BOOLEAN))
                        .expect(Row.of(false)),
                TestSpec.json("{\"id\": true}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.BOOLEAN))
                        .expect(Row.of(true)),
                // Numeric targets
                TestSpec.json("{\"id\":\"abc\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.INT))
                        .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
                TestSpec.json("{\"id\":112.013}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.LONG))
                        .expect(Row.of(112L)),
                // STRING targets: non-string JSON values are expected as their JSON text
                TestSpec.json("{\"id\":true}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.STRING))
                        .expect(Row.of("true")),
                TestSpec.json("{\"id\":123.234}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.STRING))
                        .expect(Row.of("123.234")),
                TestSpec.json("{\"id\":1234567}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.STRING))
                        .expect(Row.of("1234567")),
                TestSpec.json("{\"id\":\"string field\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.STRING))
                        .expect(Row.of("string field")),
                TestSpec.json("{\"id\":[\"array data1\",\"array data2\",123,234.345]}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.STRING))
                        .expect(Row.of("[\"array data1\",\"array data2\",123,234.345]")),
                TestSpec.json("{\"id\":{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.STRING))
                        .expect(Row.of("{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}")),
                // Unparsable text for numeric, temporal and decimal targets
                TestSpec.json("{\"id\":\"long\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.LONG))
                        .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'"),
                TestSpec.json("{\"id\":\"112.013.123\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.FLOAT))
                        .expectErrorMessage(
                                "Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
                TestSpec.json("{\"id\":\"112.013.123\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.DOUBLE))
                        .expectErrorMessage(
                                "Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
                TestSpec.json("{\"id\":\"18:00:243\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.SQL_TIME))
                        .expectErrorMessage(
                                "Failed to deserialize JSON '{\"id\":\"18:00:243\"}'"),
                TestSpec.json("{\"id\":\"20191112\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.SQL_DATE))
                        .expectErrorMessage(
                                "Failed to deserialize JSON '{\"id\":\"20191112\"}'"),
                TestSpec.json("{\"id\":\"2019-11-12 18:00:12\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.SQL_TIMESTAMP))
                        .expectErrorMessage(
                                "Failed to deserialize JSON '{\"id\":\"2019-11-12 18:00:12\"}'"),
                TestSpec.json("{\"id\":\"abc\"}")
                        .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.BIG_DEC))
                        .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
                // Composite targets: only the offending nested value is expected to be null
                TestSpec.json("{\"row\":{\"id\":\"abc\"}}")
                        .typeInfo(
                                Types.ROW_NAMED(
                                        new String[] {"row"},
                                        Types.ROW_NAMED(new String[] {"id"}, Types.INT)))
                        .expect(Row.of(new Row(1)))
                        .expectErrorMessage(
                                "Failed to deserialize JSON '{\"row\":{\"id\":\"abc\"}}'"),
                TestSpec.json("{\"array\":[123, \"abc\"]}")
                        .typeInfo(
                                Types.ROW_NAMED(
                                        new String[] {"array"}, Types.OBJECT_ARRAY(Types.INT)))
                        .expect(Row.of((Object) new Integer[] {123, null}))
                        .expectErrorMessage(
                                "Failed to deserialize JSON '{\"array\":[123, \"abc\"]}'"),
                TestSpec.json("{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}")
                        .typeInfo(
                                Types.ROW_NAMED(
                                        new String[] {"map"},
                                        Types.MAP(Types.STRING, Types.INT)))
                        .expect(Row.of(createHashMap("key1", 123, "key2", null)))
                        .expectErrorMessage(
                                "Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'"),
                // Decimal with many fraction digits is expected to keep its exact value
                TestSpec.json("{\"id\":1,\"factor\":799.929496989092949698}")
                        .typeInfo(
                                Types.ROW_NAMED(
                                        new String[] {"id", "factor"},
                                        Types.INT,
                                        Types.BIG_DEC))
                        .expect(Row.of(1, new BigDecimal("799.929496989092949698"))));
/**
 * Builds a mutable {@link HashMap} from two key/value pairs, inserted in argument order.
 *
 * @param k1 first key
 * @param v1 value for the first key, may be {@code null}
 * @param k2 second key
 * @param v2 value for the second key, may be {@code null}
 * @return a new map holding both entries; if the keys are equal the second value wins
 */
private static Map<String, Integer> createHashMap(
        String k1, Integer v1, String k2, Integer v2) {
    final String[] keys = {k1, k2};
    final Integer[] values = {v1, v2};
    final Map<String, Integer> result = new HashMap<>();
    for (int i = 0; i < keys.length; i++) {
        result.put(keys[i], values[i]);
    }
    return result;
}
/**
 * Fluent description of one JSON parsing scenario: the JSON input, the row type used to read it
 * and the outcome expected by {@code testJsonParse}.
 */
private static class TestSpec {

    /** JSON text that is fed to the deserialization schema. */
    private final String json;

    /** Row type handed to the schema builder; unset until {@link #typeInfo} is called. */
    private @Nullable TypeInformation<Row> rowTypeInformation;

    /** Row expected when parse errors are ignored; {@code null} if none was declared. */
    private @Nullable Row expected;

    /**
     * Fragment expected in the failure message when parse errors are not ignored; {@code null}
     * if no failure is expected.
     */
    private @Nullable String errorMessage;

    /** Starts a new spec for the given JSON text. */
    public static TestSpec json(String json) {
        return new TestSpec(json);
    }

    private TestSpec(String json) {
        this.json = json;
    }

    /** Sets the row type used to deserialize the JSON and returns this spec. */
    TestSpec typeInfo(TypeInformation<Row> rowTypeInformation) {
        this.rowTypeInformation = rowTypeInformation;
        return this;
    }

    /** Sets the row expected when parse errors are ignored and returns this spec. */
    TestSpec expect(Row row) {
        this.expected = row;
        return this;
    }

    /** Sets the expected failure message fragment and returns this spec. */
    TestSpec expectErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
        return this;
    }
}
}