Fix projection of map arrays and nested maps (#2510)

Fixes the rather complex area in EVF2 that handles
projection of nested maps and map arrays to properly pass
along the offset vector for map arrays, and set the value
count for nested maps.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/OutputBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/OutputBatchBuilder.java
index f18d9c0..683722c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/OutputBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/OutputBatchBuilder.java
@@ -73,7 +73,7 @@
  * result set loader presents the full table schema to the reader,
  * but actually writes only the projected columns. Suppose we
  * have:<pre><code>
- * SELECT t3, C, B, t1,, A ...
+ * SELECT t3, C, B, t1, A ...
  * </code></pre>
  * Then the abbreviated table schema looks like this:<pre><code>
  * [ 1 | 3 ]</code></pre>
@@ -93,6 +93,7 @@
  * be cleared by the caller.</li>
  * <li>For implicit and null columns, the output vector is identical
  * to the input vector.</li>
+ * </ul>
  */
 public class OutputBatchBuilder {
 
@@ -150,10 +151,11 @@
    * a separate implementation.
    */
   private static class MapBuilder {
-    ColumnMetadata outputCol;
-    TupleMetadata mapSchema;
-    List<MapSource> sourceMaps;
-    Object memberSources[];
+    private final ColumnMetadata outputCol;
+    private final TupleMetadata mapSchema;
+    private final List<MapSource> sourceMaps;
+    private final Object memberSources[];
+    private final List<MapVector> mapVectors = new ArrayList<>();
 
     private MapBuilder(ColumnMetadata outputCol, List<MapSource> sourceMaps) {
       this.outputCol = outputCol;
@@ -207,7 +209,9 @@
         }
         mapVector.putChild(outputCol.name(), outputVector);
       }
-
+      if (mapVector instanceof MapVector) {
+        mapVectors.add((MapVector) mapVector);
+      }
       return mapVector;
     }
 
@@ -221,7 +225,10 @@
                   .metadata(source.offset).tupleSchema(),
                 (AbstractMapVector) getMember(source)));
       }
-      return new MapBuilder(outputCol, childMaps).build(allocator);
+      MapBuilder builder = new MapBuilder(outputCol, childMaps);
+      ValueVector vector = builder.build(allocator);
+      mapVectors.addAll(builder.mapVectors);
+      return vector;
     }
 
     public ValueVector getMember(VectorSource source) {
@@ -260,7 +267,12 @@
       int outputIndex = outputSchema.index(col.name());
       Preconditions.checkState(outputIndex >= 0);
       VectorSource vectorSource = new VectorSource(source, i);
-      if (col.isMap()) {
+
+      // If the column is a regular map, then projection may select
+      // some but not all columns. If the columns is a map array, then
+      // SQL projection cannot identify which columns are wanted. Include
+      // the entire map.
+      if (col.isMap() && !col.isArray()) {
         if (vectorSources[outputIndex] == null) {
           vectorSources[outputIndex] = new ArrayList<VectorSource>();
         }
@@ -283,14 +295,8 @@
     for (int i = 0; i < outputSchema.size(); i++) {
       ColumnMetadata outputCol = outputSchema.metadata(i);
       ValueVector outputVector;
-      if (outputCol.isMap()) {
+      if (outputCol.isMap() && !outputCol.isArray()) {
         outputVector = buildTopMap(outputCol, (List<VectorSource>) vectorSources[i]);
-
-        // Map vectors are a nuisance: they carry their own value could which
-        // must be set separately from the underling data vectors.
-        if (outputVector instanceof MapVector) {
-          mapVectors.add((MapVector) outputVector);
-        }
       } else {
         outputVector = getVector((VectorSource) vectorSources[i]);
       }
@@ -308,7 +314,12 @@
               sources.get(source.source).schema.metadata(source.offset).tupleSchema(),
               (AbstractMapVector) getVector(source)));
     }
-    return new MapBuilder(outputCol, sourceMaps).build(outputContainer.getAllocator());
+    MapBuilder builder = new MapBuilder(outputCol, sourceMaps);
+    ValueVector vector = builder.build(outputContainer.getAllocator());
+    // Map vectors are a nuisance: they carry their own value count which
+    // must be set separately from the underling data vectors.
+    mapVectors.addAll(builder.mapVectors);
+    return vector;
   }
 
   public ValueVector getVector(VectorSource source) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
index ff83b1d..ab1ba16 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.scan.v3.lifecycle;
 
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
 
 import java.util.Arrays;
@@ -270,4 +271,57 @@
 
     RowSetUtilities.verify(expected, fixture.wrap(output));
   }
+
+  @Test
+  public void testMapArray() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .addMapArray("m")
+          .add("x", MinorType.INT)
+          .add("y", MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+
+    final VectorContainer input = fixture.rowSetBuilder(schema)
+        .addRow("barney", mapArray(mapValue(1, "betty"), mapValue(2, "bambam")))
+        .addRow("fred", mapArray(mapValue(1, "wilma"), mapValue(2, "pebbles")))
+        .build()
+        .container();
+
+    final OutputBatchBuilder builder = new OutputBatchBuilder(schema,
+        Collections.singletonList(new BatchSource(schema, input)),
+        fixture.allocator());
+    builder.load(input.getRecordCount());
+    VectorContainer output = builder.outputContainer();
+
+    RowSetUtilities.verify(fixture.wrap(input), fixture.wrap(output));
+  }
+
+  @Test
+  public void testNestedMap() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .addMap("m")
+          .add("x", MinorType.INT)
+          .addMap("y")
+            .add("p", MinorType.INT)
+            .add("q", MinorType.VARCHAR)
+            .resumeMap()
+          .resumeSchema()
+        .buildSchema();
+
+    final VectorContainer input = fixture.rowSetBuilder(schema)
+        .addRow("barney", mapValue(1, mapValue(10, "betty")))
+        .addRow("fred", mapValue(2, mapValue(20, "wilma")))
+        .build()
+        .container();
+
+    final OutputBatchBuilder builder = new OutputBatchBuilder(schema,
+        Collections.singletonList(new BatchSource(schema, input)),
+        fixture.allocator());
+    builder.load(input.getRecordCount());
+    VectorContainer output = builder.outputContainer();
+
+    RowSetUtilities.verify(fixture.wrap(input), fixture.wrap(output));
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 24ecc13..9676f44 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -44,7 +44,6 @@
   /**
    * Width of each fixed-width value.
    */
-
   public static final int VALUE_WIDTH = 1;
 
   /**
@@ -53,7 +52,6 @@
    * enforced when the vector is used to hold values in a repeated
    * vector.
    */
-
   public static final int MAX_CAPACITY = MAX_BUFFER_SIZE / VALUE_WIDTH;
 
   /**
@@ -62,7 +60,6 @@
    * the maximum item count. This is the limit enforced when the
    * vector is used to hold required or nullable values.
    */
-
   public static final int MAX_COUNT = Math.min(MAX_ROW_COUNT, MAX_CAPACITY);
 
   /**
@@ -70,7 +67,6 @@
    * values that either fit in the maximum overall vector size, or that
    * is no larger than the maximum vector item count.
    */
-
   public static final int NET_MAX_SIZE = VALUE_WIDTH * MAX_COUNT;
 
   private final FieldReader reader = new BitReaderImpl(BitVector.this);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
index b836b25..994b6db 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
@@ -91,13 +91,11 @@
  *     are passed to both.</li>
  * </ul>
  */
-
 public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
 
   /**
    * Object representation of an array writer.
    */
-
   public static class ArrayObjectWriter extends AbstractObjectWriter {
 
     protected final AbstractArrayWriter arrayWriter;
@@ -130,7 +128,6 @@
    * Keeps track of the current offset in terms of value positions.
    * Forwards overflow events to the base index.
    */
-
   public class ArrayElementWriterIndex implements ColumnWriterIndex {
 
     private int elementIndex;