blob: 76f296e6b88834e3dab5caf244e24be68b888a86 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.avro;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.parquet.DirectWriterTest;
import static org.apache.parquet.avro.AvroTestUtil.array;
import static org.apache.parquet.avro.AvroTestUtil.field;
import static org.apache.parquet.avro.AvroTestUtil.instance;
import static org.apache.parquet.avro.AvroTestUtil.optional;
import static org.apache.parquet.avro.AvroTestUtil.optionalField;
import static org.apache.parquet.avro.AvroTestUtil.primitive;
import static org.apache.parquet.avro.AvroTestUtil.record;
public class TestArrayCompatibility extends DirectWriterTest {
public static final Configuration OLD_BEHAVIOR_CONF = new Configuration();
public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
/**
 * Populates the two shared configurations once before the tests run:
 * {@code OLD_BEHAVIOR_CONF} enables {@code ADD_LIST_ELEMENT_RECORDS} and
 * {@code NEW_BEHAVIOR_CONF} disables it.
 */
@BeforeClass
public static void setupNewBehaviorConfiguration() {
  final String listElementRecordsKey = AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
  OLD_BEHAVIOR_CONF.setBoolean(listElementRecordsKey, true);
  NEW_BEHAVIOR_CONF.setBoolean(listElementRecordsKey, false);
}
/**
 * Writes a top-level repeated int32 with no LIST annotation and expects both
 * readers to surface it as an Avro array of ints. Currently ignored.
 */
@Test
@Ignore(value="Not yet supported")
public void testUnannotatedListOfPrimitives() throws Exception {
  Path test = writeDirect(
      "message UnannotatedListOfPrimitives {" +
      " repeated int32 list_of_ints;" +
      "}",
      rc -> {
        rc.startMessage();
        rc.startField("list_of_ints", 0);
        rc.addInteger(34);
        rc.addInteger(35);
        rc.addInteger(36);
        rc.endField("list_of_ints", 0);
        rc.endMessage();
      });

  // The expected top-level record carries the Parquet message name, matching
  // the convention used by every other test in this class.
  Schema expectedSchema = record("UnannotatedListOfPrimitives",
      field("list_of_ints", array(primitive(Schema.Type.INT))));
  GenericRecord expectedRecord = instance(expectedSchema,
      "list_of_ints", Arrays.asList(34, 35, 36));

  // both should behave the same way
  assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
  assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
}
/**
 * Writes a top-level repeated group with no LIST annotation and expects both
 * readers to surface it as an Avro array of point records. Currently ignored.
 */
@Test
@Ignore(value="Not yet supported")
public void testUnannotatedListOfGroups() throws Exception {
  Path test = writeDirect(
      "message UnannotatedListOfGroups {" +
      " repeated group list_of_points {" +
      " required float x;" +
      " required float y;" +
      " }" +
      "}",
      rc -> {
        rc.startMessage();
        rc.startField("list_of_points", 0);

        // first point: (1.0, 1.0)
        rc.startGroup();
        rc.startField("x", 0);
        rc.addFloat(1.0f);
        rc.endField("x", 0);
        rc.startField("y", 1);
        rc.addFloat(1.0f);
        rc.endField("y", 1);
        rc.endGroup();

        // second point: (2.0, 2.0)
        rc.startGroup();
        rc.startField("x", 0);
        rc.addFloat(2.0f);
        rc.endField("x", 0);
        rc.startField("y", 1);
        rc.addFloat(2.0f);
        rc.endField("y", 1);
        rc.endGroup();

        rc.endField("list_of_points", 0);
        rc.endMessage();
      });

  // "?" is not a legal Avro record name, so the element record is named after
  // the repeated group, as the other tests in this class do for element records.
  // NOTE(review): confirm the final name once unannotated lists are supported.
  Schema point = record("list_of_points",
      field("x", primitive(Schema.Type.FLOAT)),
      field("y", primitive(Schema.Type.FLOAT)));
  // The top-level record carries the Parquet message name.
  Schema expectedSchema = record("UnannotatedListOfGroups",
      field("list_of_points", array(point)));
  GenericRecord expectedRecord = instance(expectedSchema,
      "list_of_points", Arrays.asList(
          instance(point, "x", 1.0f, "y", 1.0f),
          instance(point, "x", 2.0f, "y", 2.0f)));

  // both should behave the same way
  assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
  assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
}
/**
 * A LIST-annotated group whose only child is a repeated primitive named
 * {@code array} must be read as an Avro array of ints by both readers.
 */
@Test
public void testRepeatedPrimitiveInList() throws Exception {
  Path parquetFile = writeDirect(
      "message RepeatedPrimitiveInList {" +
      " required group list_of_ints (LIST) {" +
      " repeated int32 array;" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("list_of_ints", 0);

        consumer.startGroup();
        consumer.startField("array", 0);
        consumer.addInteger(34);
        consumer.addInteger(35);
        consumer.addInteger(36);
        consumer.endField("array", 0);
        consumer.endGroup();

        consumer.endField("list_of_ints", 0);
        consumer.endMessage();
      });

  Schema intArray = array(Schema.create(Schema.Type.INT));
  Schema avroSchema = record("RepeatedPrimitiveInList",
      field("list_of_ints", intArray));
  GenericRecord avroRecord = instance(avroSchema,
      "list_of_ints", Arrays.asList(34, 35, 36));

  // the flag must not matter here: old and new readers agree
  assertReaderContains(oldBehaviorReader(parquetFile), avroSchema, avroRecord);
  assertReaderContains(newBehaviorReader(parquetFile), avroSchema, avroRecord);
}
/**
 * Covers a list with no intermediate element layer: the repeated group has
 * more than one field, so it must itself be the element type for both readers.
 */
@Test
public void testMultiFieldGroupInList() throws Exception {
  Path parquetFile = writeDirect(
      "message MultiFieldGroupInList {" +
      " optional group locations (LIST) {" +
      " repeated group element {" +
      " required double latitude;" +
      " required double longitude;" +
      " }" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("locations", 0);

        consumer.startGroup();
        consumer.startField("element", 0);

        // first location: (0.0, 0.0)
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(0.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();

        // second location: (0.0, 180.0)
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(180.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();

        consumer.endField("element", 0);
        consumer.endGroup();

        consumer.endField("locations", 0);
        consumer.endMessage();
      });

  Schema locationSchema = record("element",
      field("latitude", primitive(Schema.Type.DOUBLE)),
      field("longitude", primitive(Schema.Type.DOUBLE)));

  Schema avroSchema = record("MultiFieldGroupInList",
      optionalField("locations", array(locationSchema)));
  GenericRecord avroRecord = instance(avroSchema,
      "locations", Arrays.asList(
          instance(locationSchema, "latitude", 0.0, "longitude", 0.0),
          instance(locationSchema, "latitude", 0.0, "longitude", 180.0)));

  // old and new readers are expected to agree
  assertReaderContains(oldBehaviorReader(parquetFile), avroSchema, avroRecord);
  assertReaderContains(newBehaviorReader(parquetFile), avroSchema, avroRecord);
}
/**
 * Covers older non-Avro data whose list structure is ambiguous: the repeated
 * group has a single field, so it could be either the element record or a
 * synthetic wrapper. The old and new readers resolve this differently.
 */
@Test
public void testSingleFieldGroupInList() throws Exception {
  Path parquetFile = writeDirect(
      "message SingleFieldGroupInList {" +
      " optional group single_element_groups (LIST) {" +
      " repeated group single_element_group {" +
      " required int64 count;" +
      " }" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("single_element_groups", 0);

        consumer.startGroup();
        consumer.startField("single_element_group", 0); // array contents begin

        consumer.startGroup();
        consumer.startField("count", 0);
        consumer.addLong(1234L);
        consumer.endField("count", 0);
        consumer.endGroup();

        consumer.startGroup();
        consumer.startField("count", 0);
        consumer.addLong(2345L);
        consumer.endField("count", 0);
        consumer.endGroup();

        consumer.endField("single_element_group", 0); // array contents end
        consumer.endGroup();

        consumer.endField("single_element_groups", 0);
        consumer.endMessage();
      });

  // Storage alone cannot tell whether this is a list of single-field records
  // or whether the single_element_group layer is synthetic.

  // old behavior: the repeated group is taken to be the element type
  Schema countRecordSchema = record("single_element_group",
      field("count", primitive(Schema.Type.LONG)));
  Schema schemaForOld = record("SingleFieldGroupInList",
      optionalField("single_element_groups", array(countRecordSchema)));
  GenericRecord recordForOld = instance(schemaForOld,
      "single_element_groups", Arrays.asList(
          instance(countRecordSchema, "count", 1234L),
          instance(countRecordSchema, "count", 2345L)));
  assertReaderContains(oldBehaviorReader(parquetFile), schemaForOld, recordForOld);

  // new behavior: single_element_group is taken to be synthetic (in spec)
  Schema schemaForNew = record("SingleFieldGroupInList",
      optionalField("single_element_groups", array(primitive(Schema.Type.LONG))));
  GenericRecord recordForNew = instance(schemaForNew,
      "single_element_groups", Arrays.asList(1234L, 2345L));
  assertReaderContains(newBehaviorReader(parquetFile), schemaForNew, recordForNew);
}
/**
 * Same ambiguous single-field layout as the schema-less variant, but the Avro
 * schema is stored in the file metadata, so both readers are expected to
 * produce a list of single-field records.
 */
@Test
public void testSingleFieldGroupInListWithSchema() throws Exception {
  Schema countRecordSchema = record("single_element_group",
      field("count", primitive(Schema.Type.LONG)));

  Schema avroSchema = record("SingleFieldGroupInList",
      optionalField("single_element_groups",
          array(countRecordSchema)));

  // embed the Avro schema in the file's key/value metadata
  Map<String, String> extraMetadata = new HashMap<>();
  extraMetadata.put(AvroWriteSupport.AVRO_SCHEMA, avroSchema.toString());

  Path parquetFile = writeDirect(
      "message SingleFieldGroupInList {" +
      " optional group single_element_groups (LIST) {" +
      " repeated group single_element_group {" +
      " required int64 count;" +
      " }" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("single_element_groups", 0);

        consumer.startGroup();
        consumer.startField("single_element_group", 0); // array contents begin

        consumer.startGroup();
        consumer.startField("count", 0);
        consumer.addLong(1234L);
        consumer.endField("count", 0);
        consumer.endGroup();

        consumer.startGroup();
        consumer.startField("count", 0);
        consumer.addLong(2345L);
        consumer.endField("count", 0);
        consumer.endGroup();

        consumer.endField("single_element_group", 0); // array contents end
        consumer.endGroup();

        consumer.endField("single_element_groups", 0);
        consumer.endMessage();
      },
      extraMetadata);

  GenericRecord avroRecord = instance(avroSchema,
      "single_element_groups", Arrays.asList(
          instance(countRecordSchema, "count", 1234L),
          instance(countRecordSchema, "count", 2345L)));

  // the stored schema settles the ambiguity, so both readers must agree
  assertReaderContains(oldBehaviorReader(parquetFile), avroSchema, avroRecord);
  assertReaderContains(newBehaviorReader(parquetFile), avroSchema, avroRecord);
}
/**
 * Three-level list ({@code list} / {@code element}) with an optional element
 * group, including one null element. The old reader keeps the {@code list}
 * layer as a record; the new reader unwraps it into nullable elements.
 */
@Test
public void testNewOptionalGroupInList() throws Exception {
  Path parquetFile = writeDirect(
      "message NewOptionalGroupInList {" +
      " optional group locations (LIST) {" +
      " repeated group list {" +
      " optional group element {" +
      " required double latitude;" +
      " required double longitude;" +
      " }" +
      " }" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("locations", 0);

        consumer.startGroup();
        consumer.startField("list", 0); // array contents begin

        // first entry: non-null element (0.0, 0.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(0.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        // second entry: null element (the element field is left out)
        consumer.startGroup(); // array level
        consumer.endGroup(); // array level

        // third entry: non-null element (0.0, 180.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(180.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        consumer.endField("list", 0); // array contents end
        consumer.endGroup();

        consumer.endField("locations", 0);
        consumer.endMessage();
      });

  Schema locationSchema = record("element",
      field("latitude", primitive(Schema.Type.DOUBLE)),
      field("longitude", primitive(Schema.Type.DOUBLE)));

  // old behavior: the repeated "list" group is taken to be the element type
  Schema wrapperSchema = record("list", optionalField("element", locationSchema));
  Schema schemaForOld = record("NewOptionalGroupInList",
      optionalField("locations", array(wrapperSchema)));
  GenericRecord recordForOld = instance(schemaForOld,
      "locations", Arrays.asList(
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 0.0)),
          instance(wrapperSchema),
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 180.0))));
  assertReaderContains(oldBehaviorReader(parquetFile), schemaForOld, recordForOld);

  // new behavior: the repeated "list" group is taken to be synthetic (in spec)
  Schema schemaForNew = record("NewOptionalGroupInList",
      optionalField("locations", array(optional(locationSchema))));
  GenericRecord recordForNew = instance(schemaForNew,
      "locations", Arrays.asList(
          instance(locationSchema, "latitude", 0.0, "longitude", 0.0),
          null,
          instance(locationSchema, "latitude", 0.0, "longitude", 180.0)));
  assertReaderContains(newBehaviorReader(parquetFile), schemaForNew, recordForNew);
}
/**
 * Three-level list ({@code list} / {@code element}) with a required element
 * group. The old reader keeps the {@code list} layer as a record; the new
 * reader unwraps it into non-null elements.
 */
@Test
public void testNewRequiredGroupInList() throws Exception {
  Path parquetFile = writeDirect(
      "message NewRequiredGroupInList {" +
      " optional group locations (LIST) {" +
      " repeated group list {" +
      " required group element {" +
      " required double latitude;" +
      " required double longitude;" +
      " }" +
      " }" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("locations", 0);

        consumer.startGroup();
        consumer.startField("list", 0); // array contents begin

        // first entry: (0.0, 180.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(180.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        // second entry: (0.0, 0.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(0.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        consumer.endField("list", 0); // array contents end
        consumer.endGroup();

        consumer.endField("locations", 0);
        consumer.endMessage();
      });

  Schema locationSchema = record("element",
      field("latitude", primitive(Schema.Type.DOUBLE)),
      field("longitude", primitive(Schema.Type.DOUBLE)));

  // old behavior: the repeated "list" group is taken to be the element type
  Schema wrapperSchema = record("list", field("element", locationSchema));
  Schema schemaForOld = record("NewRequiredGroupInList",
      optionalField("locations", array(wrapperSchema)));
  GenericRecord recordForOld = instance(schemaForOld,
      "locations", Arrays.asList(
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 180.0)),
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 0.0))));
  assertReaderContains(oldBehaviorReader(parquetFile), schemaForOld, recordForOld);

  // new behavior: the repeated "list" group is taken to be synthetic (in spec)
  Schema schemaForNew = record("NewRequiredGroupInList",
      optionalField("locations", array(locationSchema)));
  GenericRecord recordForNew = instance(schemaForNew,
      "locations", Arrays.asList(
          instance(locationSchema, "latitude", 0.0, "longitude", 180.0),
          instance(locationSchema, "latitude", 0.0, "longitude", 0.0)));
  assertReaderContains(newBehaviorReader(parquetFile), schemaForNew, recordForNew);
}
/**
 * The repeated group is named {@code array}; both readers are expected to
 * treat that group as the element record rather than as a synthetic layer.
 */
@Test
public void testAvroCompatOptionalGroupInList() throws Exception {
  Path parquetFile = writeDirect(
      "message AvroCompatOptionalGroupInList {" +
      " optional group locations (LIST) {" +
      " repeated group array {" +
      " optional group element {" +
      " required double latitude;" +
      " required double longitude;" +
      " }" +
      " }" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("locations", 0);

        consumer.startGroup();
        consumer.startField("array", 0); // array contents begin

        // first entry: (0.0, 180.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(180.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        // second entry: (0.0, 0.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(0.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        consumer.endField("array", 0); // array contents end
        consumer.endGroup();

        consumer.endField("locations", 0);
        consumer.endMessage();
      });

  Schema locationSchema = record("element",
      field("latitude", primitive(Schema.Type.DOUBLE)),
      field("longitude", primitive(Schema.Type.DOUBLE)));

  // the repeated "array" group is the element type
  Schema wrapperSchema = record("array", optionalField("element", locationSchema));
  Schema avroSchema = record("AvroCompatOptionalGroupInList",
      optionalField("locations", array(wrapperSchema)));
  GenericRecord avroRecord = instance(avroSchema,
      "locations", Arrays.asList(
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 180.0)),
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 0.0))));

  // both readers should recognize the "array" name
  assertReaderContains(oldBehaviorReader(parquetFile), avroSchema, avroRecord);
  assertReaderContains(newBehaviorReader(parquetFile), avroSchema, avroRecord);
}
/**
 * Writes a list whose repeated group is named {@code array} and reads it back
 * with an explicit Avro read schema that treats that group as synthetic. Both
 * the old- and new-behavior readers must follow the supplied schema.
 */
@Test
public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
  Path test = writeDirect(
      "message AvroCompatOptionalGroupInListWithSchema {" +
      " optional group locations (LIST) {" +
      " repeated group array {" +
      " optional group element {" +
      " required double latitude;" +
      " required double longitude;" +
      " }" +
      " }" +
      " }" +
      "}",
      rc -> {
        rc.startMessage();
        rc.startField("locations", 0);

        rc.startGroup();
        rc.startField("array", 0); // start writing array contents

        // write a non-null element
        rc.startGroup(); // array level
        rc.startField("element", 0);
        rc.startGroup();
        rc.startField("latitude", 0);
        rc.addDouble(0.0);
        rc.endField("latitude", 0);
        rc.startField("longitude", 1);
        rc.addDouble(180.0);
        rc.endField("longitude", 1);
        rc.endGroup();
        rc.endField("element", 0);
        rc.endGroup(); // array level

        // write a second non-null element
        rc.startGroup(); // array level
        rc.startField("element", 0);
        rc.startGroup();
        rc.startField("latitude", 0);
        rc.addDouble(0.0);
        rc.endField("latitude", 0);
        rc.startField("longitude", 1);
        rc.addDouble(0.0);
        rc.endField("longitude", 1);
        rc.endGroup();
        rc.endField("element", 0);
        rc.endGroup(); // array level

        rc.endField("array", 0); // finished writing array contents
        rc.endGroup();

        rc.endField("locations", 0);
        rc.endMessage();
      });

  Schema location = record("element",
      field("latitude", primitive(Schema.Type.DOUBLE)),
      field("longitude", primitive(Schema.Type.DOUBLE)));

  Schema newSchema = record("AvroCompatOptionalGroupInListWithSchema",
      optionalField("locations", array(optional(location))));
  GenericRecord newRecord = instance(newSchema,
      "locations", Arrays.asList(
          instance(location, "latitude", 0.0, "longitude", 180.0),
          instance(location, "latitude", 0.0, "longitude", 0.0)));

  // Both should use the schema structure that is provided. The helpers copy
  // OLD_BEHAVIOR_CONF / NEW_BEHAVIOR_CONF, so the old-behavior case sets the
  // flag explicitly instead of relying on the default of a bare Configuration.
  assertReaderContains(oldBehaviorReader(test, newSchema), newSchema, newRecord);
  assertReaderContains(newBehaviorReader(test, newSchema), newSchema, newRecord);
}
/**
 * Nested lists where both the outer repeated group and the inner repeated
 * primitive are named {@code array}. Both readers are expected to produce an
 * array of int arrays, including an empty inner list.
 */
@Test
public void testAvroCompatListInList() throws Exception {
  Path test = writeDirect(
      "message AvroCompatListInList {" +
      " optional group listOfLists (LIST) {" +
      " repeated group array (LIST) {" +
      " repeated int32 array;" +
      " }" +
      " }" +
      "}",
      rc -> {
        rc.startMessage();
        // field name matches the schema's top-level field (was "locations")
        rc.startField("listOfLists", 0);

        rc.startGroup();
        rc.startField("array", 0); // start writing array contents

        rc.startGroup();
        rc.startField("array", 0); // start writing inner array contents

        // write [34, 35, 36]
        rc.addInteger(34);
        rc.addInteger(35);
        rc.addInteger(36);

        rc.endField("array", 0); // finished writing inner array contents
        rc.endGroup();

        // write an empty list
        rc.startGroup();
        rc.endGroup();

        rc.startGroup();
        rc.startField("array", 0); // start writing inner array contents

        // write [32, 33, 34]
        rc.addInteger(32);
        rc.addInteger(33);
        rc.addInteger(34);

        rc.endField("array", 0); // finished writing inner array contents
        rc.endGroup();

        rc.endField("array", 0); // finished writing array contents
        rc.endGroup();

        rc.endField("listOfLists", 0);
        rc.endMessage();
      });

  Schema listOfLists = array(array(primitive(Schema.Type.INT)));
  Schema oldSchema = record("AvroCompatListInList",
      optionalField("listOfLists", listOfLists));

  GenericRecord oldRecord = instance(oldSchema,
      "listOfLists", Arrays.asList(
          Arrays.asList(34, 35, 36),
          Arrays.asList(),
          Arrays.asList(32, 33, 34)));

  // both should detect the "array" name
  assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
  assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
}
/**
 * Nested lists using {@code _tuple}-suffixed names for the repeated layers.
 * Both readers are expected to produce an array of int arrays, including an
 * empty inner list.
 */
@Test
public void testThriftCompatListInList() throws Exception {
  Path test = writeDirect(
      "message ThriftCompatListInList {" +
      " optional group listOfLists (LIST) {" +
      " repeated group listOfLists_tuple (LIST) {" +
      " repeated int32 listOfLists_tuple_tuple;" +
      " }" +
      " }" +
      "}",
      rc -> {
        rc.startMessage();
        // field name matches the schema's top-level field (was "locations")
        rc.startField("listOfLists", 0);

        rc.startGroup();
        rc.startField("listOfLists_tuple", 0); // start writing array contents

        rc.startGroup();
        rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents

        // write [34, 35, 36]
        rc.addInteger(34);
        rc.addInteger(35);
        rc.addInteger(36);

        rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
        rc.endGroup();

        // write an empty list
        rc.startGroup();
        rc.endGroup();

        rc.startGroup();
        rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents

        // write [32, 33, 34]
        rc.addInteger(32);
        rc.addInteger(33);
        rc.addInteger(34);

        rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
        rc.endGroup();

        rc.endField("listOfLists_tuple", 0); // finished writing array contents
        rc.endGroup();

        rc.endField("listOfLists", 0);
        rc.endMessage();
      });

  Schema listOfLists = array(array(primitive(Schema.Type.INT)));
  Schema oldSchema = record("ThriftCompatListInList",
      optionalField("listOfLists", listOfLists));

  GenericRecord oldRecord = instance(oldSchema,
      "listOfLists", Arrays.asList(
          Arrays.asList(34, 35, 36),
          Arrays.asList(),
          Arrays.asList(32, 33, 34)));

  // both should detect the "_tuple" names
  assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
  assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
}
/**
 * The repeated group is named {@code locations_tuple}; both readers are
 * expected to treat that group as the element record. Despite "Required" in
 * the test name, the element group in the written schema is declared optional.
 */
@Test
public void testThriftCompatRequiredGroupInList() throws Exception {
  Path parquetFile = writeDirect(
      "message ThriftCompatRequiredGroupInList {" +
      " optional group locations (LIST) {" +
      " repeated group locations_tuple {" +
      " optional group element {" +
      " required double latitude;" +
      " required double longitude;" +
      " }" +
      " }" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("locations", 0);

        consumer.startGroup();
        consumer.startField("locations_tuple", 0); // array contents begin

        // first entry: (0.0, 180.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(180.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        // second entry: (0.0, 0.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(0.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        consumer.endField("locations_tuple", 0); // array contents end
        consumer.endGroup();

        consumer.endField("locations", 0);
        consumer.endMessage();
      });

  Schema locationSchema = record("element",
      field("latitude", primitive(Schema.Type.DOUBLE)),
      field("longitude", primitive(Schema.Type.DOUBLE)));

  // the repeated "locations_tuple" group is the element type
  Schema wrapperSchema = record("locations_tuple", optionalField("element", locationSchema));
  Schema avroSchema = record("ThriftCompatRequiredGroupInList",
      optionalField("locations", array(wrapperSchema)));
  GenericRecord avroRecord = instance(avroSchema,
      "locations", Arrays.asList(
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 180.0)),
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 0.0))));

  // both readers should recognize the "_tuple" suffix
  assertReaderContains(oldBehaviorReader(parquetFile), avroSchema, avroRecord);
  assertReaderContains(newBehaviorReader(parquetFile), avroSchema, avroRecord);
}
/**
 * The repeated group is named {@code bag}. The old reader is expected to keep
 * {@code bag} as the element record, while the new reader is expected to
 * treat it as a synthetic layer and expose nullable location elements.
 */
@Test
public void testHiveCompatOptionalGroupInList() throws Exception {
  Path parquetFile = writeDirect(
      "message HiveCompatOptionalGroupInList {" +
      " optional group locations (LIST) {" +
      " repeated group bag {" +
      " optional group element {" +
      " required double latitude;" +
      " required double longitude;" +
      " }" +
      " }" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("locations", 0);

        consumer.startGroup();
        consumer.startField("bag", 0); // array contents begin

        // first entry: (0.0, 180.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(180.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        // second entry: (0.0, 0.0)
        consumer.startGroup(); // array level
        consumer.startField("element", 0);
        consumer.startGroup();
        consumer.startField("latitude", 0);
        consumer.addDouble(0.0);
        consumer.endField("latitude", 0);
        consumer.startField("longitude", 1);
        consumer.addDouble(0.0);
        consumer.endField("longitude", 1);
        consumer.endGroup();
        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        consumer.endField("bag", 0); // array contents end
        consumer.endGroup();

        consumer.endField("locations", 0);
        consumer.endMessage();
      });

  Schema locationSchema = record("element",
      field("latitude", primitive(Schema.Type.DOUBLE)),
      field("longitude", primitive(Schema.Type.DOUBLE)));

  // old behavior: the repeated "bag" group is taken to be the element type
  Schema wrapperSchema = record("bag", optionalField("element", locationSchema));
  Schema schemaForOld = record("HiveCompatOptionalGroupInList",
      optionalField("locations", array(wrapperSchema)));
  GenericRecord recordForOld = instance(schemaForOld,
      "locations", Arrays.asList(
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 180.0)),
          instance(wrapperSchema, "element",
              instance(locationSchema, "latitude", 0.0, "longitude", 0.0))));
  assertReaderContains(oldBehaviorReader(parquetFile), schemaForOld, recordForOld);

  // new behavior: the repeated "bag" group is taken to be synthetic
  Schema schemaForNew = record("HiveCompatOptionalGroupInList",
      optionalField("locations", array(optional(locationSchema))));
  GenericRecord recordForNew = instance(schemaForNew,
      "locations", Arrays.asList(
          instance(locationSchema, "latitude", 0.0, "longitude", 180.0),
          instance(locationSchema, "latitude", 0.0, "longitude", 0.0)));
  assertReaderContains(newBehaviorReader(parquetFile), schemaForNew, recordForNew);
}
/**
 * Three-level list whose element struct itself has a single field that is
 * also called {@code element}. Checks both the converted Avro schema and the
 * records read back under old and new behavior, then repeats the read with
 * float-to-double promoted read schemas.
 */
@Test
public void testListOfSingleElementStructsWithElementField()
    throws Exception {
  Path parquetFile = writeDirect(
      "message ListOfSingleElementStructsWithElementField {" +
      " optional group list_of_structs (LIST) {" +
      " repeated group list {" +
      " required group element {" +
      " required float element;" +
      " }" +
      " }" +
      " }" +
      "}",
      consumer -> {
        consumer.startMessage();
        consumer.startField("list_of_structs", 0);

        consumer.startGroup();
        consumer.startField("list", 0); // array contents begin

        // first entry: struct with element = 33.0
        consumer.startGroup(); // array level
        consumer.startField("element", 0);

        // the struct's own "element" field
        consumer.startGroup();
        consumer.startField("element", 0);
        consumer.addFloat(33.0F);
        consumer.endField("element", 0);
        consumer.endGroup();

        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        // second entry: struct with element = 34.0
        consumer.startGroup(); // array level
        consumer.startField("element", 0);

        // the struct's own "element" field
        consumer.startGroup();
        consumer.startField("element", 0);
        consumer.addFloat(34.0F);
        consumer.endField("element", 0);
        consumer.endGroup();

        consumer.endField("element", 0);
        consumer.endGroup(); // array level

        consumer.endField("list", 0); // array contents end
        consumer.endGroup();

        consumer.endField("list_of_structs", 0);
        consumer.endMessage();
      });

  Schema floatStruct = record("element",
      field("element", primitive(Schema.Type.FLOAT)));

  // old behavior: the repeated "list" group is taken to be the element type
  Schema floatWrapper = record("list",
      field("element", floatStruct));
  Schema schemaForOld = record("ListOfSingleElementStructsWithElementField",
      optionalField("list_of_structs", array(floatWrapper)));
  GenericRecord recordForOld = instance(schemaForOld,
      "list_of_structs", Arrays.asList(
          instance(floatWrapper, "element",
              instance(floatStruct, "element", 33.0F)),
          instance(floatWrapper, "element",
              instance(floatStruct, "element", 34.0F))));

  // read the Parquet schema from the footer and check the old conversion
  final MessageType parquetSchema;
  try (ParquetFileReader footerReader = ParquetFileReader.open(new Configuration(), parquetFile)) {
    parquetSchema = footerReader.getFileMetaData().getSchema();
    Assert.assertEquals("Converted schema should assume 2-layer structure",
        schemaForOld,
        new AvroSchemaConverter(OLD_BEHAVIOR_CONF).convert(parquetSchema));
  }

  // the old reader should use the 2-layer structure
  assertReaderContains(oldBehaviorReader(parquetFile), schemaForOld, recordForOld);

  // new behavior: the repeated "list" group is taken to be synthetic
  Schema schemaForNew = record("ListOfSingleElementStructsWithElementField",
      optionalField("list_of_structs", array(floatStruct)));
  GenericRecord recordForNew = instance(schemaForNew,
      "list_of_structs", Arrays.asList(
          instance(floatStruct, "element", 33.0F),
          instance(floatStruct, "element", 34.0F)));

  // check the new conversion, then the new reader
  Assert.assertEquals("Converted schema should assume 3-layer structure",
      schemaForNew,
      new AvroSchemaConverter(NEW_BEHAVIOR_CONF).convert(parquetSchema));
  assertReaderContains(newBehaviorReader(parquetFile), schemaForNew, recordForNew);

  // repeat with compatible nested read schemas (float promoted to double)
  Schema doubleStruct = record("element",
      field("element", primitive(Schema.Type.DOUBLE)));

  Schema doubleWrapper = record("list",
      field("element", doubleStruct));
  Schema doubleSchemaForOld = record(
      "ListOfSingleElementStructsWithElementField",
      optionalField("list_of_structs", array(doubleWrapper)));
  GenericRecord doubleRecordForOld = instance(doubleSchemaForOld,
      "list_of_structs", Arrays.asList(
          instance(doubleWrapper, "element",
              instance(doubleStruct, "element", 33.0)),
          instance(doubleWrapper, "element",
              instance(doubleStruct, "element", 34.0))));
  assertReaderContains(oldBehaviorReader(parquetFile, doubleSchemaForOld),
      doubleSchemaForOld, doubleRecordForOld);

  Schema doubleSchemaForNew = record(
      "ListOfSingleElementStructsWithElementField",
      optionalField("list_of_structs", array(doubleStruct)));
  GenericRecord doubleRecordForNew = instance(doubleSchemaForNew,
      "list_of_structs", Arrays.asList(
          instance(doubleStruct, "element", 33.0),
          instance(doubleStruct, "element", 34.0)));
  assertReaderContains(newBehaviorReader(parquetFile, doubleSchemaForNew),
      doubleSchemaForNew, doubleRecordForNew);
}
/**
 * Opens {@code path} with the shared {@code OLD_BEHAVIOR_CONF} configuration.
 *
 * @param path the Parquet file to read
 * @return a new reader over {@code path}
 * @throws IOException if the reader cannot be created
 */
public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
    Path path) throws IOException {
  AvroParquetReader<T> reader = new AvroParquetReader<>(OLD_BEHAVIOR_CONF, path);
  return reader;
}
/**
 * Opens {@code path} with a copy of {@code OLD_BEHAVIOR_CONF} on which
 * {@code expectedSchema} has been set as the Avro read schema.
 *
 * @param path the Parquet file to read
 * @param expectedSchema the Avro read schema to request
 * @return a new reader over {@code path}
 * @throws IOException if the reader cannot be created
 */
public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
    Path path, Schema expectedSchema) throws IOException {
  // copy so the shared configuration is not modified
  Configuration readConf = new Configuration(OLD_BEHAVIOR_CONF);
  AvroReadSupport.setAvroReadSchema(readConf, expectedSchema);
  AvroParquetReader<T> reader = new AvroParquetReader<>(readConf, path);
  return reader;
}
/**
 * Opens {@code path} with the shared {@code NEW_BEHAVIOR_CONF} configuration.
 *
 * @param path the Parquet file to read
 * @return a new reader over {@code path}
 * @throws IOException if the reader cannot be created
 */
public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
    Path path) throws IOException {
  AvroParquetReader<T> reader = new AvroParquetReader<>(NEW_BEHAVIOR_CONF, path);
  return reader;
}
/**
 * Opens {@code path} with a copy of {@code NEW_BEHAVIOR_CONF} on which
 * {@code expectedSchema} has been set as the Avro read schema.
 *
 * @param path the Parquet file to read
 * @param expectedSchema the Avro read schema to request
 * @return a new reader over {@code path}
 * @throws IOException if the reader cannot be created
 */
public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
    Path path, Schema expectedSchema) throws IOException {
  // copy so the shared configuration is not modified
  Configuration readConf = new Configuration(NEW_BEHAVIOR_CONF);
  AvroReadSupport.setAvroReadSchema(readConf, expectedSchema);
  AvroParquetReader<T> reader = new AvroParquetReader<>(readConf, path);
  return reader;
}
/**
 * Asserts that {@code reader} yields exactly {@code expectedRecords}, in
 * order, each carrying {@code expectedSchema}, and nothing after them. The
 * reader is closed before this method returns, whether or not the
 * assertions pass.
 *
 * @param reader the reader to drain; closed by this method
 * @param expectedSchema the schema every returned record must report
 * @param expectedRecords the records expected, in read order
 * @throws IOException if reading from or closing the reader fails
 */
public <T extends IndexedRecord> void assertReaderContains(
    AvroParquetReader<T> reader, Schema expectedSchema, T... expectedRecords)
    throws IOException {
  final String expectedCount = expectedRecords.length +
      " record" + (expectedRecords.length == 1 ? "" : "s");

  // try-with-resources releases the underlying file even when an assertion fails
  try (AvroParquetReader<T> openReader = reader) {
    for (T expectedRecord : expectedRecords) {
      T actualRecord = openReader.read();
      // report a short file as an assertion failure rather than an NPE below
      Assert.assertNotNull("Should contain " + expectedCount, actualRecord);
      Assert.assertEquals("Should match expected schema",
          expectedSchema, actualRecord.getSchema());
      Assert.assertEquals("Should match the expected record",
          expectedRecord, actualRecord);
    }
    Assert.assertNull("Should only contain " + expectedCount,
        openReader.read());
  }
}
}