DRILL-7377: Nested schemas for dynamic EVF columns

The Result Set Loader (part of EVF) allows adding columns up-front
before reading rows (so-called "early schema.") Such schemas allow
nested columns (maps with members, repeated lists with a type, etc.)

The Result Set Loader also allows adding columns dynamically
while loading data (so-called "late schema".) Previously, the code
assumed that columns would be added top-down: first the map, then
the map's contents, etc.

Charles found a need to allow adding a nested column (a repeated
list with a declared list type.)

This patch revises the code to use the same mechanism in both the
early- and late-schema cases, allowing adding nested columns at
any time.

Testing: Added a new unit test case for the repeated list late
schema with content case.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
index e0d9e27..bf1256d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
@@ -32,6 +32,21 @@
  * <p>
  * Recursion is much easier if we can go bottom-up. But, writers
  * require top-down construction.
+ * <p>
+ * This particular class builds a column and all its contents.
+ * For example, given a map, which contains a repeated list which
+ * contains a repeated INT, this class first builds the map,
+ * then adds the repeated list, then adds the INT array. To do
+ * so, it will create a copy of the structured metadata.
+ * <p>
+ * A drawback of this approach is that the metadata objects used
+ * in the "parent" writers will be copies of, not the same as, those
+ * in the schema from which we are building the writers. At present,
+ * this is not an issue, but it is something to be aware of as uses
+ * become more sophisticated.
+ * <p>
+ * This class contrasts with the @{link ColumnBuilder} class which
+ * builds the structure within a single vector and writer.
  */
 
 public class BuildFromSchema {
@@ -49,6 +64,11 @@
     ObjectWriter add(ColumnMetadata colSchema);
   }
 
+  /**
+   * Shim used for adding a column to a tuple directly.
+   * This method will recursively invoke this builder
+   * to expand any nested content.
+   */
   private static class TupleShim implements ParentShim {
     private final TupleWriter writer;
 
@@ -63,6 +83,24 @@
     }
   }
 
+  /**
+   * Shim used when implementing the add of a column to
+   * a tuple in the result set loader. Directly calls the
+   * internal method to add a column to the "tuple state."
+   */
+  private static class TupleStateShim implements ParentShim {
+    private final TupleState state;
+
+    public TupleStateShim(TupleState state) {
+      this.state = state;
+    }
+
+    @Override
+    public ObjectWriter add(ColumnMetadata colSchema) {
+      return state.addColumn(colSchema).writer();
+    }
+  }
+
   private static class UnionShim implements ParentShim {
     private final VariantWriter writer;
 
@@ -89,6 +127,12 @@
     }
   }
 
+  private static BuildFromSchema instance = new BuildFromSchema();
+
+  private BuildFromSchema() { }
+
+  public static BuildFromSchema instance() { return instance; }
+
   /**
    * When creating a schema up front, provide the schema of the desired tuple,
    * then build vectors and writers to match. Allows up-front schema definition
@@ -105,17 +149,36 @@
     }
   }
 
-  private void buildColumn(ParentShim parent, ColumnMetadata colSchema) {
+  /**
+   * Build a column recursively. Called internally when adding a column
+   * via the addColumn() method on the tuple writer.
+   */
+
+  public ObjectWriter buildColumn(TupleState state, ColumnMetadata colSchema) {
+    return buildColumn(new TupleStateShim(state), colSchema);
+  }
+
+  /**
+   * Build the column writer, and any nested content, returning the built
+   * column writer as a generic object writer.
+   *
+   * @param parent the shim that implements the logic to add a column
+   * to a tuple, list, repeated list, or union.
+   * @param colSchema the schema of the column to add
+   * @return the object writer for the added column
+   */
+
+  private ObjectWriter buildColumn(ParentShim parent, ColumnMetadata colSchema) {
     if (colSchema.isMultiList()) {
-      buildRepeatedList(parent, colSchema);
+      return buildRepeatedList(parent, colSchema);
     } else if (colSchema.isMap()) {
-      buildMap(parent, colSchema);
+      return buildMap(parent, colSchema);
     } else if (isSingleList(colSchema)) {
-      buildSingleList(parent, colSchema);
+      return buildSingleList(parent, colSchema);
     } else if (colSchema.isVariant()) {
-      buildVariant(parent, colSchema);
+      return buildVariant(parent, colSchema);
     } else {
-      buildPrimitive(parent, colSchema);
+      return buildPrimitive(parent, colSchema);
     }
   }
 
@@ -133,13 +196,14 @@
     return colSchema.isVariant() && colSchema.isArray() && colSchema.variantSchema().isSingleType();
   }
 
-  private void buildPrimitive(ParentShim parent, ColumnMetadata colSchema) {
-    parent.add(colSchema);
+  private ObjectWriter buildPrimitive(ParentShim parent, ColumnMetadata colSchema) {
+    return parent.add(colSchema);
   }
 
-  private void buildMap(ParentShim parent, ColumnMetadata colSchema) {
+  private ObjectWriter buildMap(ParentShim parent, ColumnMetadata colSchema) {
     final ObjectWriter colWriter = parent.add(colSchema.cloneEmpty());
     expandMap(colWriter, colSchema);
+    return colWriter;
   }
 
   private void expandMap(ObjectWriter colWriter, ColumnMetadata colSchema) {
@@ -162,9 +226,10 @@
    * @param colSchema the schema of the variant (LIST or UNION) column
    */
 
-  private void buildVariant(ParentShim parent, ColumnMetadata colSchema) {
+  private ObjectWriter buildVariant(ParentShim parent, ColumnMetadata colSchema) {
     final ObjectWriter colWriter = parent.add(colSchema.cloneEmpty());
     expandVariant(colWriter, colSchema);
+    return colWriter;
   }
 
   private void expandVariant(ObjectWriter colWriter, ColumnMetadata colSchema) {
@@ -182,13 +247,14 @@
     }
   }
 
-  private void buildSingleList(ParentShim parent, ColumnMetadata colSchema) {
+  private ObjectWriter buildSingleList(ParentShim parent, ColumnMetadata colSchema) {
     final ColumnMetadata seed = colSchema.cloneEmpty();
     final ColumnMetadata subtype = colSchema.variantSchema().listSubtype();
     seed.variantSchema().addType(subtype.cloneEmpty());
     seed.variantSchema().becomeSimple();
     final ObjectWriter listWriter = parent.add(seed);
     expandColumn(listWriter, subtype);
+    return listWriter;
   }
 
   /**
@@ -200,14 +266,16 @@
    * @param colSchema schema definition of the array
    */
 
-  private void buildRepeatedList(ParentShim parent, ColumnMetadata colSchema) {
+  private ObjectWriter buildRepeatedList(ParentShim parent, ColumnMetadata colSchema) {
     final ColumnMetadata seed = colSchema.cloneEmpty();
-    final RepeatedListWriter listWriter = (RepeatedListWriter) parent.add(seed).array();
+    final ObjectWriter objWriter = parent.add(seed);
+    final RepeatedListWriter listWriter = (RepeatedListWriter) objWriter.array();
     final ColumnMetadata elements = colSchema.childSchema();
     if (elements != null) {
       final RepeatedListShim listShim = new RepeatedListShim(listWriter);
       buildColumn(listShim, elements);
     }
+    return objWriter;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
index 466214e..57b07fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
@@ -78,7 +78,13 @@
  * projected (not in the list), then creates a dummy writer. Issues an error if
  * the column is projected, but the implied projection type is incompatible with
  * the actual type. (Such as trying to project an INT as x[0].)
+ * <p>
+ * This class builds the internal structure of a vector. If building a "container"
+ * vector (map, list, repeated list or union), this class expects the container
+ * to be added empty, then the members to be added one by one. See
+ * {@link BuildFromSchema} for the class that builds up a compound structure.
  */
+
 public class ColumnBuilder {
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 2cfdeec..4d70065 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -310,7 +310,7 @@
       // won't be if known up front.
 
       logger.debug("Schema: " + options.schema.toString());
-      new BuildFromSchema().buildTuple(rootWriter, options.schema);
+      BuildFromSchema.instance().buildTuple(rootWriter, options.schema);
     }
 
     // If we want to project nothing, then we do, in fact, have a
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
index 0c6c6af..9d8f798 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
@@ -21,7 +21,6 @@
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
 import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
 import org.apache.drill.exec.physical.resultSet.impl.ColumnState.BaseContainerColumnState;
@@ -490,19 +489,7 @@
 
   @Override
   public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSchema) {
-
-    // Verify name is not a (possibly case insensitive) duplicate.
-
-    final TupleMetadata tupleSchema = schema();
-    final String colName = columnSchema.name();
-    if (tupleSchema.column(colName) != null) {
-      throw UserException
-        .validationError()
-        .message("Duplicate column name: ", colName)
-        .build(ResultSetLoaderImpl.logger);
-    }
-
-    return addColumn(columnSchema).writer();
+    return BuildFromSchema.instance().buildColumn(this, columnSchema);
   }
 
   @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java
index 880b2d8..2010cc3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java
@@ -17,22 +17,17 @@
  */
 package org.apache.drill.exec.physical.resultSet.impl;
 
-import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
-import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray;
-import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.util.Arrays;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.ColumnMetadata.StructureType;
@@ -48,14 +43,19 @@
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.writer.RepeatedListWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests repeated list support. Repeated lists add another layer of dimensionality
@@ -173,6 +173,9 @@
   public void test2DLateSchemaIncremental() {
     final TupleMetadata schema = new SchemaBuilder()
         .add("id", MinorType.INT)
+        .addRepeatedList("list1")
+          .addArray(MinorType.VARCHAR)
+          .resumeSchema()
         .addRepeatedList("list2")
           .addArray(MinorType.VARCHAR)
           .resumeSchema()
@@ -199,7 +202,7 @@
     // Sanity check of writer structure
 
     assertEquals(2, writer.size());
-    final ObjectWriter listObj = writer.column("list2");
+    final ObjectWriter listObj = writer.column("list1");
     assertEquals(ObjectType.ARRAY, listObj.type());
     final ArrayWriter listWriter = listObj.array();
 
@@ -221,7 +224,7 @@
     // Define the inner type.
 
     final RepeatedListWriter listWriterImpl = (RepeatedListWriter) listWriter;
-    listWriterImpl.defineElement(MaterializedField.create("list2", Types.repeated(MinorType.VARCHAR)));
+    listWriterImpl.defineElement(MaterializedField.create("list1", Types.repeated(MinorType.VARCHAR)));
 
     // Sanity check of completed structure
 
@@ -234,17 +237,45 @@
     // Write values
 
     writer
-        .addRow(5, objArray(strArray("a", "b"), strArray("c", "d")));
+        .addRow(5, objArray(strArray("a1", "b1"), strArray("c1", "d1")));
+
+    // Add the second list, with a complete type
+
+    writer.addColumn(schema.metadata(2));
+
+    // Sanity check of writer structure
+
+    assertEquals(3, writer.size());
+    final ObjectWriter list2Obj = writer.column("list2");
+    assertEquals(ObjectType.ARRAY, list2Obj.type());
+    final ArrayWriter list2Writer = list2Obj.array();
+    assertEquals(ObjectType.ARRAY, list2Writer.entryType());
+    final ArrayWriter inner2Writer = list2Writer.array();
+    assertEquals(ObjectType.SCALAR, inner2Writer.entryType());
+    final ScalarWriter str2Writer = inner2Writer.scalar();
+    assertEquals(ValueType.STRING, str2Writer.valueType());
+
+    // Write values
+
+    writer
+        .addRow(6,
+            objArray(strArray("a2", "b2"), strArray("c2", "d2")),
+            objArray(strArray("w2", "x2"), strArray("y2", "z2")));
+
+    // Add the second list, with a complete type
 
     // Verify the values.
     // (Relies on the row set level repeated list tests having passed.)
 
     final RowSet expected = fixture.rowSetBuilder(schema)
-        .addRow(1, objArray())
-        .addRow(2, objArray())
-        .addRow(3, objArray())
-        .addRow(4, objArray(objArray(), null))
-        .addRow(5, objArray(strArray("a", "b"), strArray("c", "d")))
+        .addRow(1, objArray(), objArray())
+        .addRow(2, objArray(), objArray())
+        .addRow(3, objArray(), objArray())
+        .addRow(4, objArray(objArray(), null), objArray())
+        .addRow(5, objArray(strArray("a1", "b1"), strArray("c1", "d1")), objArray())
+        .addRow(6,
+            objArray(strArray("a2", "b2"), strArray("c2", "d2")),
+            objArray(strArray("w2", "x2"), strArray("y2", "z2")))
         .build();
 
     RowSetUtilities.verify(expected, fixture.wrap(rsLoader.harvest()));
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 659d9da..948b8c7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -32,6 +33,8 @@
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.drill.exec.vector.accessor.VariantWriter;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation for a writer for a tuple (a row or a map.) Provides access to each
@@ -143,6 +146,8 @@
     ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);
   }
 
+  protected static final Logger logger = LoggerFactory.getLogger(AbstractTupleWriter.class);
+
   protected final TupleMetadata tupleSchema;
   protected final List<AbstractObjectWriter> writers;
   protected ColumnWriterIndex vectorIndex;
@@ -205,20 +210,29 @@
 
   @Override
   public int addColumn(ColumnMetadata column) {
-    if (listener == null) {
-      throw new UnsupportedOperationException("addColumn");
-    }
+    verifyAddColumn(column.name());
     final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, column);
     return addColumnWriter(colWriter);
   }
 
   @Override
   public int addColumn(MaterializedField field) {
+    verifyAddColumn(field.getName());
+    final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field);
+    return addColumnWriter(colWriter);
+  }
+
+  private void verifyAddColumn(String colName) {
     if (listener == null) {
       throw new UnsupportedOperationException("addColumn");
     }
-    final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field);
-    return addColumnWriter(colWriter);
+
+    if (tupleSchema().column(colName) != null) {
+      throw UserException
+        .validationError()
+        .message("Duplicate column name: ", colName)
+        .build(logger);
+    }
   }
 
   @Override
@@ -429,6 +443,8 @@
     this.listener = listener;
   }
 
+  public TupleWriterListener listener() { return listener; }
+
   @Override
   public void bindListener(ColumnWriterListener listener) { }