| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.json; |
| |
| import org.apache.avro.Schema; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.nifi.avro.AvroTypeUtil; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; |
| import org.apache.nifi.schema.inference.TimeValueInference; |
| import org.apache.nifi.serialization.MalformedRecordException; |
| import org.apache.nifi.serialization.SimpleRecordSchema; |
| import org.apache.nifi.serialization.record.DataType; |
| import org.apache.nifi.serialization.record.MapRecord; |
| import org.apache.nifi.serialization.record.Record; |
| import org.apache.nifi.serialization.record.RecordField; |
| import org.apache.nifi.serialization.record.RecordFieldType; |
| import org.apache.nifi.serialization.record.RecordSchema; |
| import org.apache.nifi.serialization.record.type.ChoiceDataType; |
| import org.apache.nifi.util.EqualsWrapper; |
| import org.apache.nifi.util.MockComponentLog; |
| import org.junit.Assert; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.math.BigDecimal; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.mock; |
| |
| public class TestJsonTreeRowRecordReader { |
| private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); |
| private final String timeFormat = RecordFieldType.TIME.getDefaultFormat(); |
| private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); |
| |
| private List<RecordField> getDefaultFields() { |
| return getFields(RecordFieldType.DOUBLE.getDataType()); |
| } |
| |
| private List<RecordField> getFields(final DataType balanceDataType) { |
| final List<RecordField> fields = new ArrayList<>(); |
| fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); |
| fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("balance", balanceDataType)); |
| fields.add(new RecordField("address", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("city", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("state", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("zipCode", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("country", RecordFieldType.STRING.getDataType())); |
| return fields; |
| } |
| |
| private RecordSchema getAccountSchema() { |
| final List<RecordField> accountFields = new ArrayList<>(); |
| accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType())); |
| accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); |
| |
| final RecordSchema accountSchema = new SimpleRecordSchema(accountFields); |
| return accountSchema; |
| } |
| |
| |
| @Test |
| public void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException { |
| final File schemaFile = new File("src/test/resources/json/choice-of-string-or-array-record.avsc"); |
| final File jsonFile = new File("src/test/resources/json/choice-of-string-or-array-record.json"); |
| |
| final Schema avroSchema = new Schema.Parser().parse(schemaFile); |
| final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); |
| |
| try (final InputStream fis = new FileInputStream(jsonFile); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final Record record = reader.nextRecord(); |
| final Object[] fieldsArray = record.getAsArray("fields"); |
| assertEquals(2, fieldsArray.length); |
| |
| final Object firstElement = fieldsArray[0]; |
| assertTrue(firstElement instanceof Record); |
| assertEquals("string", ((Record) firstElement).getAsString("type")); |
| |
| final Object secondElement = fieldsArray[1]; |
| assertTrue(secondElement instanceof Record); |
| final Object[] typeArray = ((Record) secondElement).getAsArray("type"); |
| assertEquals(1, typeArray.length); |
| |
| final Object firstType = typeArray[0]; |
| assertTrue(firstType instanceof Record); |
| final Record firstTypeRecord = (Record) firstType; |
| assertEquals("string", firstTypeRecord.getAsString("type")); |
| } |
| |
| } |
| |
| @Test |
| @Ignore("Intended only for manual testing to determine performance before/after modifications") |
| public void testPerformanceOnLocalFile() throws IOException, MalformedRecordException { |
| final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList()); |
| |
| final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/prov/16812193969219289"); |
| final byte[] data = Files.readAllBytes(file.toPath()); |
| |
| final ComponentLog logger = mock(ComponentLog.class); |
| |
| int recordCount = 0; |
| final int iterations = 1000; |
| |
| for (int j = 0; j < 5; j++) { |
| final long start = System.nanoTime(); |
| for (int i = 0; i < iterations; i++) { |
| try (final InputStream in = new ByteArrayInputStream(data); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat)) { |
| while (reader.nextRecord() != null) { |
| recordCount++; |
| } |
| } |
| } |
| final long nanos = System.nanoTime() - start; |
| final long millis = TimeUnit.NANOSECONDS.toMillis(nanos); |
| System.out.println("Took " + millis + " millis to read " + recordCount + " records"); |
| } |
| } |
| |
| @Test |
| @Ignore("Intended only for manual testing to determine performance before/after modifications") |
| public void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordException { |
| final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList()); |
| |
| final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/1.prov.json"); |
| final byte[] data = Files.readAllBytes(file.toPath()); |
| |
| final ComponentLog logger = mock(ComponentLog.class); |
| |
| int recordCount = 0; |
| final int iterations = 1_000_000; |
| |
| for (int j = 0; j < 5; j++) { |
| final long start = System.nanoTime(); |
| for (int i = 0; i < iterations; i++) { |
| try (final InputStream in = new ByteArrayInputStream(data); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat)) { |
| while (reader.nextRecord() != null) { |
| recordCount++; |
| } |
| } |
| } |
| final long nanos = System.nanoTime() - start; |
| final long millis = TimeUnit.NANOSECONDS.toMillis(nanos); |
| System.out.println("Took " + millis + " millis to read " + recordCount + " records"); |
| } |
| } |
| |
| @Test |
| public void testChoiceOfRecordTypes() throws IOException, MalformedRecordException { |
| final Schema avroSchema = new Schema.Parser().parse(new File("src/test/resources/json/record-choice.avsc")); |
| final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/elements-for-record-choice.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) { |
| |
| // evaluate first record |
| final Record firstRecord = reader.nextRecord(); |
| assertNotNull(firstRecord); |
| final RecordSchema firstOuterSchema = firstRecord.getSchema(); |
| assertEquals(Arrays.asList("id", "child"), firstOuterSchema.getFieldNames()); |
| assertEquals("1234", firstRecord.getValue("id")); |
| |
| // record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types |
| assertTrue(firstOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE); |
| final List<DataType> firstSubTypes = ((ChoiceDataType) firstOuterSchema.getDataType("child").get()).getPossibleSubTypes(); |
| assertEquals(2, firstSubTypes.size()); |
| assertEquals(2L, firstSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count()); |
| |
| // child record should have a schema with "id" as the only field |
| final Object childObject = firstRecord.getValue("child"); |
| assertTrue(childObject instanceof Record); |
| final Record firstChildRecord = (Record) childObject; |
| final RecordSchema firstChildSchema = firstChildRecord.getSchema(); |
| |
| assertEquals(Arrays.asList("id"), firstChildSchema.getFieldNames()); |
| |
| // evaluate second record |
| final Record secondRecord = reader.nextRecord(); |
| assertNotNull(secondRecord); |
| |
| final RecordSchema secondOuterSchema = secondRecord.getSchema(); |
| assertEquals(Arrays.asList("id", "child"), secondOuterSchema.getFieldNames()); |
| assertEquals("1234", secondRecord.getValue("id")); |
| |
| // record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types |
| assertTrue(secondOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE); |
| final List<DataType> secondSubTypes = ((ChoiceDataType) secondOuterSchema.getDataType("child").get()).getPossibleSubTypes(); |
| assertEquals(2, secondSubTypes.size()); |
| assertEquals(2L, secondSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count()); |
| |
| // child record should have a schema with "name" as the only field |
| final Object secondChildObject = secondRecord.getValue("child"); |
| assertTrue(secondChildObject instanceof Record); |
| final Record secondChildRecord = (Record) secondChildObject; |
| final RecordSchema secondChildSchema = secondChildRecord.getSchema(); |
| |
| assertEquals(Arrays.asList("name"), secondChildSchema.getFieldNames()); |
| |
| assertNull(reader.nextRecord()); |
| } |
| |
| } |
| |
| @Test |
| public void testReadArray() throws IOException, MalformedRecordException { |
| final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); |
| |
| final Object[] secondRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testReadOneLinePerJSON() throws IOException, MalformedRecordException { |
| final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-oneline.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); |
| |
| final Object[] secondRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testReadMultilineJSON() throws IOException, MalformedRecordException { |
| final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10)); |
| final RecordSchema schema = new SimpleRecordSchema(fields); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiline.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); |
| |
| final Object[] secondRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {2, "Jane Doe", BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testReadMultilineArrays() throws IOException, MalformedRecordException { |
| final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiarray.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); |
| |
| final Object[] secondRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); |
| |
| final Object[] thirdRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My Street", "My City", "ME", "11111", "USA"}, thirdRecordValues); |
| |
| final Object[] fourthRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testReadMixedJSON() throws IOException, MalformedRecordException { |
| final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-mixed.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); |
| |
| final Object[] secondRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); |
| |
| final Object[] thirdRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My Street", "My City", "ME", "11111", "USA"}, thirdRecordValues); |
| |
| final Object[] fourthRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues); |
| |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException { |
| final List<RecordField> fields = new ArrayList<>(); |
| fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); |
| fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); |
| final RecordSchema schema = new SimpleRecordSchema(fields); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final Record schemaValidatedRecord = reader.nextRecord(true, true); |
| assertEquals(1, schemaValidatedRecord.getValue("id")); |
| assertEquals("John Doe", schemaValidatedRecord.getValue("name")); |
| assertNull(schemaValidatedRecord.getValue("balance")); |
| } |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final Record rawRecord = reader.nextRecord(false, false); |
| assertEquals(1, rawRecord.getValue("id")); |
| assertEquals("John Doe", rawRecord.getValue("name")); |
| assertEquals(4750.89, rawRecord.getValue("balance")); |
| assertEquals("123 My Street", rawRecord.getValue("address")); |
| assertEquals("My City", rawRecord.getValue("city")); |
| assertEquals("MS", rawRecord.getValue("state")); |
| assertEquals("11111", rawRecord.getValue("zipCode")); |
| assertEquals("USA", rawRecord.getValue("country")); |
| } |
| } |
| |
| |
| @Test |
| public void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException { |
| final List<RecordField> fields = new ArrayList<>(); |
| fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); |
| final RecordSchema schema = new SimpleRecordSchema(fields); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final Record schemaValidatedRecord = reader.nextRecord(true, true); |
| assertEquals("1", schemaValidatedRecord.getValue("id")); // will be coerced into a STRING as per the schema |
| assertEquals("John Doe", schemaValidatedRecord.getValue("name")); |
| assertNull(schemaValidatedRecord.getValue("balance")); |
| |
| assertEquals(2, schemaValidatedRecord.getRawFieldNames().size()); |
| } |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final Record rawRecord = reader.nextRecord(false, false); |
| assertEquals(1, rawRecord.getValue("id")); // will return raw value of (int) 1 |
| assertEquals("John Doe", rawRecord.getValue("name")); |
| assertEquals(4750.89, rawRecord.getValue("balance")); |
| assertEquals("123 My Street", rawRecord.getValue("address")); |
| assertEquals("My City", rawRecord.getValue("city")); |
| assertEquals("MS", rawRecord.getValue("state")); |
| assertEquals("11111", rawRecord.getValue("zipCode")); |
| assertEquals("USA", rawRecord.getValue("country")); |
| |
| assertEquals(8, rawRecord.getRawFieldNames().size()); |
| } |
| } |
| |
| @Test |
| public void testDateCoercedFromString() throws IOException, MalformedRecordException { |
| final String dateField = "date"; |
| final List<RecordField> recordFields = Collections.singletonList(new RecordField(dateField, RecordFieldType.DATE.getDataType())); |
| final RecordSchema schema = new SimpleRecordSchema(recordFields); |
| |
| final String date = "2000-01-01"; |
| final String datePattern = "yyyy-MM-dd"; |
| final String json = String.format("{ \"%s\": \"%s\" }", dateField, date); |
| for (final boolean coerceTypes : new boolean[] {true, false}) { |
| try (final InputStream in = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern, timeFormat, timestampFormat)) { |
| |
| final Record record = reader.nextRecord(coerceTypes, false); |
| final Object value = record.getValue(dateField); |
| assertTrue("With coerceTypes set to " + coerceTypes + ", value is not a Date", value instanceof java.sql.Date); |
| assertEquals(date, value.toString()); |
| } |
| } |
| } |
| |
| @Test |
| public void testTimestampCoercedFromString() throws IOException, MalformedRecordException { |
| final List<RecordField> recordFields = Collections.singletonList(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType())); |
| final RecordSchema schema = new SimpleRecordSchema(recordFields); |
| |
| for (final boolean coerceTypes : new boolean[] {true, false}) { |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/timestamp.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) { |
| |
| final Record record = reader.nextRecord(coerceTypes, false); |
| final Object value = record.getValue("timestamp"); |
| assertTrue("With coerceTypes set to " + coerceTypes + ", value is not a Timestamp", value instanceof java.sql.Timestamp); |
| } |
| } |
| } |
| |
| @Test |
| public void testSingleJsonElement() throws IOException, MalformedRecordException { |
| final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecordException { |
| // Wraps default fields by Choice data type to test mapping to a Choice type. |
| final List<RecordField> choiceFields = getDefaultFields().stream() |
| .map(f -> new RecordField(f.getFieldName(), RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList()); |
| final RecordSchema schema = new SimpleRecordSchema(choiceFields); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| final List<RecordField> fields = schema.getFields(); |
| for (int i = 0; i < schema.getFields().size(); i++) { |
| assertTrue(fields.get(i).getDataType() instanceof ChoiceDataType); |
| final ChoiceDataType choiceDataType = (ChoiceDataType) fields.get(i).getDataType(); |
| assertEquals(expectedTypes.get(i), choiceDataType.getPossibleSubTypes().get(0).getFieldType()); |
| } |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testElementWithNestedData() throws IOException, MalformedRecordException { |
| final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); |
| final List<RecordField> fields = getDefaultFields(); |
| fields.add(new RecordField("account", accountType)); |
| fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); |
| final RecordSchema schema = new SimpleRecordSchema(fields); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| final Object[] allButLast = Arrays.copyOfRange(firstRecordValues, 0, firstRecordValues.length - 1); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", "123 My Street", "My City", "MS", "11111", "USA"}, allButLast); |
| |
| final Object last = firstRecordValues[firstRecordValues.length - 1]; |
| assertTrue(Record.class.isAssignableFrom(last.getClass())); |
| final Record record = (Record) last; |
| assertEquals(42, record.getValue("id")); |
| assertEquals(4750.89, record.getValue("balance")); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testElementWithNestedArray() throws IOException, MalformedRecordException { |
| final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); |
| final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); |
| |
| final List<RecordField> fields = getDefaultFields(); |
| fields.add(new RecordField("accounts", accountsType)); |
| fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); |
| final RecordSchema schema = new SimpleRecordSchema(fields); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] { |
| "id", "name", "address", "city", "state", "zipCode", "country", "accounts"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| final Object[] nonArrayValues = Arrays.copyOfRange(firstRecordValues, 0, firstRecordValues.length - 1); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", "123 My Street", "My City", "MS", "11111", "USA"}, nonArrayValues); |
| |
| final Object lastRecord = firstRecordValues[firstRecordValues.length - 1]; |
| assertTrue(Object[].class.isAssignableFrom(lastRecord.getClass())); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException { |
| final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, |
| RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); |
| |
| final Object[] secondRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", null}, secondRecordValues); |
| |
| final Object[] thirdRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {3, "Jake Doe", 4751.89, "124 My Street", "My City", "MS", "11111", "USA"}, thirdRecordValues); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException { |
| final Map<String, DataType> overrides = new HashMap<>(); |
| overrides.put("address2", RecordFieldType.STRING.getDataType()); |
| |
| final List<RecordField> fields = getDefaultFields(); |
| fields.add(new RecordField("address2", RecordFieldType.STRING.getDataType())); |
| final RecordSchema schema = new SimpleRecordSchema(fields); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING, |
| RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA", null}, firstRecordValues); |
| |
| final Object[] secondRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", null, null}, secondRecordValues); |
| |
| final Object[] thirdRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {3, "Jake Doe", 4751.89, "124 My Street", "My City", "MS", "11111", "USA", "Apt. #12"}, thirdRecordValues); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException { |
| final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-optional-balance.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final List<String> fieldNames = schema.getFieldNames(); |
| final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); |
| assertEquals(expectedFieldNames, fieldNames); |
| |
| final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); |
| final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING, |
| RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING}); |
| assertEquals(expectedTypes, dataTypes); |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); |
| |
| final Object[] secondRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {2, "Jane Doe", null, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); |
| |
| final Object[] thirdRecordValues = reader.nextRecord().getValues(); |
| Assert.assertArrayEquals(new Object[] {3, "Jimmy Doe", null, "321 Your Street", "Your City", "NY", "33333", "USA"}, thirdRecordValues); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| |
| @Test |
| public void testReadUnicodeCharacters() throws IOException, MalformedRecordException { |
| |
| final List<RecordField> fromFields = new ArrayList<>(); |
| fromFields.add(new RecordField("id", RecordFieldType.LONG.getDataType())); |
| fromFields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); |
| final RecordSchema fromSchema = new SimpleRecordSchema(fromFields); |
| final DataType fromType = RecordFieldType.RECORD.getRecordDataType(fromSchema); |
| |
| final List<RecordField> fields = new ArrayList<>(); |
| fields.add(new RecordField("created_at", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("id", RecordFieldType.LONG.getDataType())); |
| fields.add(new RecordField("unicode", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("from", fromType)); |
| final RecordSchema schema = new SimpleRecordSchema(fields); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/json-with-unicode.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| final Object[] firstRecordValues = reader.nextRecord().getValues(); |
| |
| final Object secondValue = firstRecordValues[1]; |
| assertTrue(secondValue instanceof Long); |
| assertEquals(832036744985577473L, secondValue); |
| |
| final Object unicodeValue = firstRecordValues[2]; |
| assertEquals("\u3061\u3083\u6ce3\u304d\u305d\u3046", unicodeValue); |
| |
| assertNull(reader.nextRecord()); |
| } |
| } |
| |
| @Test |
| public void testIncorrectSchema() throws IOException, MalformedRecordException { |
| final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); |
| final List<RecordField> fields = getDefaultFields(); |
| fields.add(new RecordField("account", accountType)); |
| fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); |
| |
| final RecordSchema schema = new SimpleRecordSchema(fields); |
| |
| try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account-wrong-field-type.json")); |
| final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { |
| |
| reader.nextRecord().getValues(); |
| Assert.fail("Was able to read record with invalid schema."); |
| |
| } catch (final MalformedRecordException mre) { |
| final String msg = mre.getCause().getMessage(); |
| assertTrue(msg.contains("account.balance")); |
| assertTrue(msg.contains("true")); |
| assertTrue(msg.contains("Double")); |
| assertTrue(msg.contains("Boolean")); |
| } |
| } |
| |
| @Test |
| public void testMergeOfSimilarRecords() throws Exception { |
| // GIVEN |
| String jsonPath = "src/test/resources/json/similar-records.json"; |
| |
| RecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()), |
| new RecordField("booleanOrString", RecordFieldType.CHOICE.getChoiceDataType( |
| RecordFieldType.BOOLEAN.getDataType(), |
| RecordFieldType.STRING.getDataType() |
| )), |
| new RecordField("string", RecordFieldType.STRING.getDataType()) |
| )); |
| |
| List<Object> expected = Arrays.asList( |
| new MapRecord(expectedSchema, new HashMap<String, Object>(){{ |
| put("integer", 1); |
| put("boolean", true); |
| put("booleanOrString", true); |
| }}), |
| new MapRecord(expectedSchema, new HashMap<String, Object>(){{ |
| put("integer", 2); |
| put("string", "stringValue2"); |
| put("booleanOrString", "booleanOrStringValue2"); |
| }}) |
| ); |
| |
| // WHEN |
| // THEN |
| testReadRecords(jsonPath, expected); |
| } |
| |
| @Test |
| public void testChoiceOfEmbeddedSimilarRecords() throws Exception { |
| // GIVEN |
| String jsonPath = "src/test/resources/json/choice-of-embedded-similar-records.json"; |
| |
| SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) |
| )); |
| SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("string", RecordFieldType.STRING.getDataType()) |
| )); |
| RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType( |
| RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1), |
| RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2) |
| )) |
| )); |
| |
| List<Object> expected = Arrays.asList( |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{ |
| put("integer", 1); |
| put("boolean", true); |
| }})); |
| }}), |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new MapRecord(expectedRecordSchema2, new HashMap<String, Object>(){{ |
| put("integer", 2); |
| put("string", "stringValue2"); |
| }})); |
| }}) |
| ); |
| |
| // WHEN |
| // THEN |
| testReadRecords(jsonPath, expected); |
| } |
| |
| @Test |
| public void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception { |
| // GIVEN |
| String jsonPath = "src/test/resources/json/choice-of-embedded-arrays-and-single-records.json"; |
| |
| SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()) |
| )); |
| SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) |
| )); |
| SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("string", RecordFieldType.STRING.getDataType()) |
| )); |
| SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("string", RecordFieldType.STRING.getDataType()) |
| )); |
| RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType( |
| RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1), |
| RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3), |
| RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)), |
| RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4)) |
| )) |
| )); |
| |
| List<Object> expected = Arrays.asList( |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{ |
| put("integer", 1); |
| }})); |
| }}), |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new Object[]{ |
| new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{ |
| put("integer", 21); |
| put("boolean", true); |
| }}), |
| new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{ |
| put("integer", 22); |
| put("boolean", false); |
| }}) |
| }); |
| }}), |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new MapRecord(expectedRecordSchema3, new HashMap<String, Object>(){{ |
| put("integer", 3); |
| put("string", "stringValue3"); |
| }})); |
| }}), |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new Object[]{ |
| new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ |
| put("integer", 41); |
| put("string", "stringValue41"); |
| }}), |
| new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ |
| put("integer", 42); |
| put("string", "stringValue42"); |
| }}) |
| }); |
| }}) |
| ); |
| |
| // WHEN |
| // THEN |
| testReadRecords(jsonPath, expected); |
| } |
| |
| @Test |
| public void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception { |
| // GIVEN |
| String jsonPath = "src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json"; |
| |
| SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) |
| )); |
| SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) |
| )); |
| SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("string", RecordFieldType.STRING.getDataType()) |
| )); |
| SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("string", RecordFieldType.STRING.getDataType()), |
| new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) |
| )); |
| RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType( |
| RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1), |
| RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3), |
| RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)), |
| RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4)) |
| )) |
| )); |
| |
| List<Object> expected = Arrays.asList( |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{ |
| put("integer", 1); |
| put("boolean", false); |
| }})); |
| }}), |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new Object[]{ |
| new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{ |
| put("integer", 21); |
| put("boolean", true); |
| }}), |
| new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{ |
| put("integer", 22); |
| put("boolean", false); |
| }}) |
| }); |
| }}), |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new MapRecord(expectedRecordSchema3, new HashMap<String, Object>(){{ |
| put("integer", 3); |
| put("string", "stringValue3"); |
| }})); |
| }}), |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new Object[]{ |
| new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ |
| put("integer", 41); |
| put("string", "stringValue41"); |
| }}), |
| new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ |
| put("integer", 42); |
| put("string", "stringValue42"); |
| }}), |
| new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ |
| put("integer", 43); |
| put("boolean", false); |
| }}) |
| }); |
| }}) |
| ); |
| |
| // WHEN |
| // THEN |
| testReadRecords(jsonPath, expected); |
| } |
| |
| @Test |
| public void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception { |
| // GIVEN |
| String jsonPath = "src/test/resources/json/choice-of-different-arrays-with-extra-fields.json"; |
| |
| SimpleRecordSchema recordSchema1 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) |
| )); |
| SimpleRecordSchema recordSchema2 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("string", RecordFieldType.STRING.getDataType()) |
| )); |
| |
| RecordSchema recordChoiceSchema = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType( |
| RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema1)), |
| RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema2)) |
| )) |
| )); |
| |
| RecordSchema schema = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("dataCollection", RecordFieldType.ARRAY.getArrayDataType( |
| RecordFieldType.RECORD.getRecordDataType(recordChoiceSchema) |
| ) |
| ))); |
| |
| SimpleRecordSchema expectedChildSchema1 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) |
| )); |
| SimpleRecordSchema expectedChildSchema2 = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("integer", RecordFieldType.INT.getDataType()), |
| new RecordField("string", RecordFieldType.STRING.getDataType()) |
| )); |
| RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList( |
| new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType( |
| RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema1)), |
| RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema2)) |
| )) |
| )); |
| |
| // Since the actual arrays have records with either (INT, BOOLEAN, STRING) or (INT, STRING, STRING) |
| // while the explicit schema defines only (INT, BOOLEAN) and (INT, STRING) we can't tell which record schema to chose |
| // so we take the first one (INT, BOOLEAN) - as best effort - for both cases |
| SimpleRecordSchema expectedSelectedRecordSchemaForRecordsInBothArrays = expectedChildSchema1; |
| |
| List<Object> expected = Arrays.asList( |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new Object[]{ |
| new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{ |
| put("integer", 11); |
| put("boolean", true); |
| put("extraString", "extraStringValue11"); |
| }}), |
| new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{ |
| put("integer", 12); |
| put("boolean", false); |
| put("extraString", "extraStringValue12"); |
| }}) |
| }); |
| }}), |
| new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ |
| put("record", new Object[]{ |
| new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{ |
| put("integer", 21); |
| put("extraString", "extraStringValue21"); |
| put("string", "stringValue21"); |
| }}), |
| new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{ |
| put("integer", 22); |
| put("extraString", "extraStringValue22"); |
| put("string", "stringValue22"); |
| }}) |
| }); |
| }}) |
| ); |
| |
| // WHEN |
| // THEN |
| testReadRecords(jsonPath, schema, expected); |
| } |
| |
| private void testReadRecords(String jsonPath, List<Object> expected) throws IOException, MalformedRecordException { |
| // GIVEN |
| final File jsonFile = new File(jsonPath); |
| |
| try ( |
| InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile)); |
| ) { |
| RecordSchema schema = inferSchema(jsonStream); |
| |
| // WHEN |
| // THEN |
| testReadRecords(jsonStream, schema, expected); |
| } |
| } |
| |
| private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException { |
| // GIVEN |
| final File jsonFile = new File(jsonPath); |
| |
| try ( |
| InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile)); |
| ) { |
| // WHEN |
| // THEN |
| testReadRecords(jsonStream, schema, expected); |
| } |
| } |
| |
| private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException { |
| // GIVEN |
| try ( |
| JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat); |
| ) { |
| // WHEN |
| List<Object> actual = new ArrayList<>(); |
| Record record; |
| while ((record = reader.nextRecord()) != null) { |
| List<Object> dataCollection = Arrays.asList((Object[]) record.getValue("dataCollection")); |
| actual.addAll(dataCollection); |
| } |
| |
| // THEN |
| List<Function<Object, Object>> propertyProviders = Arrays.asList( |
| _object -> ((Record)_object).getSchema(), |
| _object -> Arrays.stream(((Record)_object).getValues()).map(value -> { |
| if (value != null && value.getClass().isArray()) { |
| return Arrays.asList((Object[]) value); |
| } else { |
| return value; |
| } |
| }).collect(Collectors.toList()) |
| ); |
| |
| List<EqualsWrapper<Object>> wrappedExpected = EqualsWrapper.wrapList(expected, propertyProviders); |
| List<EqualsWrapper<Object>> wrappedActual = EqualsWrapper.wrapList(actual, propertyProviders); |
| |
| assertEquals(wrappedExpected, wrappedActual); |
| } |
| } |
| |
| private RecordSchema inferSchema(InputStream jsonStream) throws IOException { |
| RecordSchema schema = new InferSchemaAccessStrategy<>( |
| (__, inputStream) -> new JsonRecordSource(inputStream), |
| new JsonSchemaInference(new TimeValueInference(null, null, null)), |
| mock(ComponentLog.class) |
| ).getSchema(Collections.emptyMap(), jsonStream, null); |
| |
| jsonStream.reset(); |
| |
| return schema; |
| } |
| } |