PARQUET-651: Improve Avro's isElementType check.

The Avro implementation needs to check whether the read schema passed by
the user (or automatically converted from the file schema) expects an
extra 1-field layer in the returned records, which matches the previous
behavior of Avro when reading a 3-level list. Before this commit, the
check tested the structure of the expected list element type against the
repeated group's schema; if the two matched, Avro assumed that the user
expected the extra layer. However, for records that happened to match
(1-field records with a field named "element"), the check could be wrong
and would cause exceptions later.
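
To illustrate, the file schema below (taken from the test case added in
this patch) defeats the old name-based check: the expected 3-level
element is a 1-field record named "element", so its field names contain
the repeated group's only field name, and the check wrongly reported
the repeated group itself as the element type:

    message ListOfSingleElementStructsWithElementField {
      optional group list_of_structs (LIST) {
        repeated group list {
          required group element {
            required float element;
          }
        }
      }
    }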

This commit updates the check to convert the file's element schema to
Avro and test that converted schema for read compatibility with the
schema passed by the user. This checks the entire tree from the element
down and gets the answer right based on the element and its children,
not just the field names on the element.
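
As a sketch, the new test reduces to Avro's built-in reader/writer
compatibility check (available since Avro 1.7.2). The wrapper class and
method name below are illustrative only; the patch inlines this logic
in AvroRecordConverter.isElementType:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaCompatibility;

    class ElementTypeCheck {
      // True when data written with the schema converted from the
      // repeated group (the 2-level interpretation of the file) can be
      // read with the user's element schema. Record names, the nested
      // field tree, and type promotions (e.g. float to double) all
      // factor into the result.
      static boolean matchesRepeatedType(Schema elementSchema,
                                         Schema schemaFromRepeated) {
        return SchemaCompatibility
            .checkReaderWriterCompatibility(elementSchema, schemaFromRepeated)
            .getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
      }
    }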

Author: Ryan Blue <blue@apache.org>

Closes #352 from rdblue/PARQUET-651-improve-is-element-type-check and squashes the following commits:

ad9c1ee [Ryan Blue] PARQUET-651: Undo accidental default setting change.
1efa248 [Ryan Blue] PARQUET-651: Improve Avro's isElementType check.
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 10bb29b..c0d6dc2 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -42,9 +42,10 @@
 import org.apache.avro.Conversion;
 import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.AvroIgnore;
 import org.apache.avro.reflect.AvroName;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.Stringable;
 import org.apache.avro.specific.SpecificData;
@@ -59,6 +60,8 @@
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
+import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility;
 import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 
@@ -827,6 +830,14 @@
     }
   }
 
+  // Converter used to test whether a requested schema is a 2-level schema.
+  // It converts the file's repeated type assuming the file uses 2-level
+  // lists, and the result is compared with the requested element type.
+  // The conversion should always assume 2-level lists because 2-level and
+  // 3-level structures cannot be mixed in one list.
+  private static final AvroSchemaConverter CONVERTER =
+      new AvroSchemaConverter(true);
+
   /**
    * Returns whether the given type is the element type of a list or is a
    * synthetic group with one field that is the element type. This is
@@ -849,13 +860,11 @@
       return true;
     } else if (elementSchema != null &&
         elementSchema.getType() == Schema.Type.RECORD) {
-      Set<String> fieldNames = new HashSet<String>();
-      for (Schema.Field field : elementSchema.getFields()) {
-        fieldNames.add(field.name());
+      Schema schemaFromRepeated = CONVERTER.convert(repeatedType.asGroupType());
+      if (checkReaderWriterCompatibility(elementSchema, schemaFromRepeated)
+          .getType() == COMPATIBLE) {
+        return true;
       }
-      // The repeated type must be the element type because it matches the
-      // structure of the Avro element's schema.
-      return fieldNames.contains(repeatedType.asGroupType().getFieldName(0));
     }
     return false;
   }
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 6b9b94c..70b6525 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -64,6 +64,17 @@
     this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
   }
 
+  /**
+   * Constructor used by {@link AvroRecordConverter#isElementType}, which always
+   * uses the 2-level list conversion.
+   *
+   * @param assumeRepeatedIsListElement whether to assume 2-level lists
+   */
+  AvroSchemaConverter(boolean assumeRepeatedIsListElement) {
+    this.assumeRepeatedIsListElement = assumeRepeatedIsListElement;
+    this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
+  }
+
   public AvroSchemaConverter(Configuration conf) {
     this.assumeRepeatedIsListElement = conf.getBoolean(
         ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
@@ -220,6 +231,10 @@
     return convertFields(parquetSchema.getName(), parquetSchema.getFields());
   }
 
+  Schema convert(GroupType parquetSchema) {
+    return convertFields(parquetSchema.getName(), parquetSchema.getFields());
+  }
+
   private Schema convertFields(String name, List<Type> parquetFields) {
     List<Schema.Field> fields = new ArrayList<Schema.Field>();
     for (Type parquetType : parquetFields) {
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
index 29264f0..aa577ab 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
@@ -27,6 +27,8 @@
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -44,10 +46,13 @@
 
 public class TestArrayCompatibility extends DirectWriterTest {
 
+  public static final Configuration OLD_BEHAVIOR_CONF = new Configuration();
   public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
 
   @BeforeClass
   public static void setupNewBehaviorConfiguration() {
+    OLD_BEHAVIOR_CONF.setBoolean(
+        AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, true);
     NEW_BEHAVIOR_CONF.setBoolean(
         AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
   }
@@ -1035,9 +1040,143 @@
     assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
   }
 
+  @Test
+  public void testListOfSingleElementStructsWithElementField()
+      throws Exception {
+    Path test = writeDirect(
+        "message ListOfSingleElementStructsWithElementField {" +
+            "  optional group list_of_structs (LIST) {" +
+            "    repeated group list {" +
+            "      required group element {" +
+            "        required float element;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_structs", 0);
+
+            rc.startGroup();
+            rc.startField("list", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            // the inner element field
+            rc.startGroup();
+            rc.startField("element", 0);
+            rc.addFloat(33.0F);
+            rc.endField("element", 0);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            // the inner element field
+            rc.startGroup();
+            rc.startField("element", 0);
+            rc.addFloat(34.0F);
+            rc.endField("element", 0);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("list", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("list_of_structs", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema structWithElementField = record("element",
+        field("element", primitive(Schema.Type.FLOAT)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("list",
+        field("element", structWithElementField));
+    Schema oldSchema = record("ListOfSingleElementStructsWithElementField",
+        optionalField("list_of_structs", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "list_of_structs", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(structWithElementField, "element", 33.0F)),
+            instance(elementRecord, "element",
+                instance(structWithElementField, "element", 34.0F))));
+
+    // check the schema
+    ParquetFileReader reader = ParquetFileReader
+        .open(new Configuration(), test);
+    MessageType fileSchema = reader.getFileMetaData().getSchema();
+    Assert.assertEquals("Converted schema should assume 2-layer structure",
+        oldSchema,
+        new AvroSchemaConverter(OLD_BEHAVIOR_CONF).convert(fileSchema));
+
+    // both should default to the 2-layer structure
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    Schema newSchema = record("ListOfSingleElementStructsWithElementField",
+        optionalField("list_of_structs", array(structWithElementField)));
+    GenericRecord newRecord = instance(newSchema,
+        "list_of_structs", Arrays.asList(
+            instance(structWithElementField, "element", 33.0F),
+            instance(structWithElementField, "element", 34.0F)));
+
+    // check the schema
+    Assert.assertEquals("Converted schema should assume 3-layer structure",
+        newSchema,
+        new AvroSchemaConverter(NEW_BEHAVIOR_CONF).convert(fileSchema));
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+
+    // check that this works with compatible nested schemas
+
+    Schema structWithDoubleElementField = record("element",
+        field("element", primitive(Schema.Type.DOUBLE)));
+
+    Schema doubleElementRecord = record("list",
+        field("element", structWithDoubleElementField));
+    Schema oldDoubleSchema = record(
+        "ListOfSingleElementStructsWithElementField",
+        optionalField("list_of_structs", array(doubleElementRecord)));
+    GenericRecord oldDoubleRecord = instance(oldDoubleSchema,
+        "list_of_structs", Arrays.asList(
+            instance(doubleElementRecord, "element",
+                instance(structWithDoubleElementField, "element", 33.0)),
+            instance(doubleElementRecord, "element",
+                instance(structWithDoubleElementField, "element", 34.0))));
+    assertReaderContains(oldBehaviorReader(test, oldDoubleSchema),
+        oldDoubleSchema, oldDoubleRecord);
+
+    Schema newDoubleSchema = record(
+        "ListOfSingleElementStructsWithElementField",
+        optionalField("list_of_structs", array(structWithDoubleElementField)));
+    GenericRecord newDoubleRecord = instance(newDoubleSchema,
+        "list_of_structs", Arrays.asList(
+            instance(structWithDoubleElementField, "element", 33.0),
+            instance(structWithDoubleElementField, "element", 34.0)));
+    assertReaderContains(newBehaviorReader(test, newDoubleSchema),
+        newDoubleSchema, newDoubleRecord);
+  }
+
   public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
       Path path) throws IOException {
-    return new AvroParquetReader<T>(path);
+    return new AvroParquetReader<T>(OLD_BEHAVIOR_CONF, path);
+  }
+
+  public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
+      Path path, Schema expectedSchema) throws IOException {
+    Configuration conf = new Configuration(OLD_BEHAVIOR_CONF);
+    AvroReadSupport.setAvroReadSchema(conf, expectedSchema);
+    return new AvroParquetReader<T>(conf, path);
   }
 
   public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
@@ -1045,6 +1184,13 @@
     return new AvroParquetReader<T>(NEW_BEHAVIOR_CONF, path);
   }
 
+  public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
+      Path path, Schema expectedSchema) throws IOException {
+    Configuration conf = new Configuration(NEW_BEHAVIOR_CONF);
+    AvroReadSupport.setAvroReadSchema(conf, expectedSchema);
+    return new AvroParquetReader<T>(conf, path);
+  }
+
   public <T extends IndexedRecord> void assertReaderContains(
       AvroParquetReader<T> reader, Schema expectedSchema, T... expectedRecords)
       throws IOException {
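
For completeness, the read pattern that the new test helper overloads
rely on, supplying an explicit Avro read schema before opening the
reader, looks roughly like the following self-contained sketch (the
class and method names here are placeholders, not part of the patch):

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.avro.AvroReadSupport;

    class ReadWithExplicitSchema {
      static GenericRecord readFirst(Path path, Schema readSchema)
          throws IOException {
        Configuration conf = new Configuration();
        // Resolve records against this schema; isElementType then
        // decides 2-level vs. 3-level for each list in the file.
        AvroReadSupport.setAvroReadSchema(conf, readSchema);
        try (AvroParquetReader<GenericRecord> reader =
            new AvroParquetReader<GenericRecord>(conf, path)) {
          return reader.read();
        }
      }
    }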