DRILL-8375: Support for non-projected complex vectors

Allows the readers using the EVF layer to write to
UNION and LIST vectors whether or not those vectors
are projected by the query. EVF creates "dummy" vectors
and writers for non-projected columns. This feature
previously worked for other column types: this PR adds
support for UNION and LIST.
diff --git a/.gitignore b/.gitignore
index 99e56d4..b5b1ae8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,4 @@
 .*.html
 venv/
 tools/venv/
+.vscode/*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java
index 71a465b..089ec17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java
@@ -173,7 +173,14 @@
   }
 
   public ColumnHandle insert(ColumnMetadata col) {
-    return insert(insertPoint++, col);
+    switch (projType) {
+    case SOME:
+      return insert(columns.size(), col);
+    case ALL:
+      return insert(insertPoint++, col);
+    default:
+      throw new IllegalArgumentException("No projection, should not have materialized: " + col.name());
+    }
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
index 84f2826..1df1390 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
@@ -48,7 +48,6 @@
  * This class contrasts with the @{link ColumnBuilder} class which
  * builds the structure within a single vector and writer.
  */
-
 public class BuildFromSchema {
 
   /**
@@ -59,7 +58,6 @@
    * but have slightly different semantics. This shim wraps
    * the semantics so the builder code is simpler.
    */
-
   private interface ParentShim {
     ObjectWriter add(ColumnMetadata colSchema);
   }
@@ -140,7 +138,6 @@
    *
    * @param schema desired tuple schema to be materialized
    */
-
   public void buildTuple(TupleWriter writer, TupleMetadata schema) {
     final ParentShim tupleShim = new TupleShim(writer);
     for (int i = 0; i < schema.size(); i++) {
@@ -157,7 +154,6 @@
    * @param colSchema the schema of the column to add
    * @return the object writer for the added column
    */
-
   public ObjectWriter buildColumn(TupleState state, ColumnMetadata colSchema) {
     return buildColumn(new TupleStateShim(state), colSchema);
   }
@@ -171,7 +167,6 @@
    * @param colSchema the schema of the column to add
    * @return the object writer for the added column
    */
-
   private ObjectWriter buildColumn(ParentShim parent, ColumnMetadata colSchema) {
     if (colSchema.isMultiList()) {
       return buildRepeatedList(parent, colSchema);
@@ -197,7 +192,6 @@
    * @return true if the column is of type LIST with a single
    * element type
    */
-
   private boolean isSingleList(ColumnMetadata colSchema) {
     return colSchema.isVariant() && colSchema.isArray() && colSchema.variantSchema().isSingleType();
   }
@@ -231,7 +225,6 @@
    * the common behavior
    * @param colSchema the schema of the variant (LIST or UNION) column
    */
-
   private ObjectWriter buildVariant(ParentShim parent, ColumnMetadata colSchema) {
     final ObjectWriter colWriter = parent.add(colSchema.cloneEmpty());
     expandVariant(colWriter, colSchema);
@@ -271,7 +264,6 @@
    * @param parent tuple writer for the tuple that holds the array
    * @param colSchema schema definition of the array
    */
-
   private ObjectWriter buildRepeatedList(ParentShim parent, ColumnMetadata colSchema) {
     final ObjectWriter objWriter = parent.add(colSchema.cloneEmpty());
     final RepeatedListWriter listWriter = (RepeatedListWriter) objWriter.array();
@@ -292,7 +284,6 @@
    * @param colWriter the writer for the (possibly structured) column
    * @param colSchema the schema definition for the column
    */
-
   private void expandColumn(ObjectWriter colWriter, ColumnMetadata colSchema) {
 
     if (colSchema.isMultiList()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
index 45f4b1f..bb02d56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
@@ -156,8 +156,7 @@
     } else if (columnSchema.isArray()) {
       vectorState = new RepeatedVectorState(colWriter.array(), (RepeatedValueVector) vector);
     } else if (columnSchema.isNullable()) {
-      vectorState = new NullableVectorState(
-          colWriter, (NullableVector) vector);
+      vectorState = new NullableVectorState(colWriter, (NullableVector) vector);
     } else {
       vectorState = SimpleVectorState.vectorState(columnSchema,
             colWriter.events(), vector);
@@ -319,7 +318,14 @@
     // have content that varies from batch to batch. Only the leaf
     // vectors can be cached.
     assert columnSchema.variantSchema().size() == 0;
-    final UnionVector vector = new UnionVector(columnSchema.schema(), parent.loader().allocator(), null);
+    final UnionVector vector;
+    if (parent.projection().projection(columnSchema).isProjected || allowCreation(parent)) {
+      vector = new UnionVector(columnSchema.schema(), parent.loader().allocator(), null);
+    } else {
+
+      // Column is not projected. No materialized backing for the column.
+      vector = null;
+    }
 
     // Then the union writer.
     final UnionWriterImpl unionWriter = new UnionWriterImpl(columnSchema, vector, null);
@@ -330,7 +336,8 @@
 
     // Create the manager for the columns within the union.
     final UnionState unionState = new UnionState(parent.loader(),
-        parent.vectorCache().childCache(columnSchema.name()));
+        parent.vectorCache().childCache(columnSchema.name()),
+        vector == null ? ProjectionFilter.PROJECT_NONE : ProjectionFilter.PROJECT_ALL);
 
     // Bind the union state to the union writer to handle column additions.
     unionWriter.bindListener(unionState);
@@ -372,7 +379,10 @@
    */
   private ColumnState buildSimpleList(ContainerState parent, ColumnMetadata columnSchema) {
 
-    // The variant must have the one and only type.
+    final ProjectionFilter projFilter = parent.projection();
+    final ProjResult projResult = projFilter.projection(columnSchema);
+
+   // The variant must have the one and only type.
     assert columnSchema.variantSchema().size() == 1;
     assert columnSchema.variantSchema().isSimple();
 
@@ -386,9 +396,16 @@
     listState.setSubColumn(memberState);
 
     // Create the list vector. Contains a single type.
-    final ListVector listVector = new ListVector(columnSchema.schema().cloneEmpty(),
-        parent.loader().allocator(), null);
-    listVector.setChildVector(memberState.vector());
+    final ListVector listVector;
+    if (projResult.isProjected) {
+      listVector= new ListVector(columnSchema.schema().cloneEmpty(),
+          parent.loader().allocator(), null);
+      listVector.setChildVector(memberState.vector());
+    } else {
+
+      // Column is not projected. No materialized backing for the column.
+      listVector = null;
+    }
 
     // Create the list writer: an array of the one type.
     final ListWriterImpl listWriter = new ListWriterImpl(columnSchema,
@@ -396,8 +413,13 @@
     final AbstractObjectWriter listObjWriter = new ArrayObjectWriter(listWriter);
 
     // Create the list vector state that tracks the list vector lifecycle.
-    final ListVectorState vectorState = new ListVectorState(listWriter,
-        memberState.writer().events(), listVector);
+    final VectorState vectorState;
+    if (listVector == null) {
+      vectorState = new NullVectorState();
+    } else {
+      vectorState= new ListVectorState(listWriter,
+          memberState.writer().events(), listVector);
+    }
 
     // Assemble it all into a union column state.
     return new UnionColumnState(parent.loader(),
@@ -468,17 +490,24 @@
     // the element type after creating the repeated writer itself.
     assert columnSchema.childSchema() == null;
 
+    final ProjectionFilter projFilter = parent.projection();
+    final ProjResult projResult = projFilter.projection(columnSchema);
+
     // Build the repeated vector.
-    final RepeatedListVector vector = new RepeatedListVector(
+    final RepeatedListVector vector;
+    if (projResult.isProjected) {
+      vector = new RepeatedListVector(
         columnSchema.emptySchema(), parent.loader().allocator(), null);
+    } else {
+      vector = null;
+    }
 
     // No inner type yet. The result set loader builds the subtype
     // incrementally because it might be complex (a map or another
     // repeated list.) To start, use a dummy to avoid need for if-statements
     // everywhere.
     final ColumnMetadata dummyElementSchema = new PrimitiveColumnMetadata(
-        MaterializedField.create(columnSchema.name(),
-            Types.repeated(MinorType.NULL)));
+        MaterializedField.create(columnSchema.name(), Types.repeated(MinorType.NULL)));
     final AbstractObjectWriter dummyElement = ColumnWriterFactory.buildDummyColumnWriter(dummyElementSchema);
 
     // Create the list writer: an array of arrays.
@@ -486,9 +515,7 @@
         columnSchema, vector, dummyElement);
 
     // Create the list vector state that tracks the list vector lifecycle.
-
-    final RepeatedListVectorState vectorState = new RepeatedListVectorState(
-        arrayWriter, vector);
+    final VectorState vectorState = new RepeatedListVectorState(arrayWriter, vector);
 
     // Build the container that tracks the array contents
     final RepeatedListState listState = new RepeatedListState(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/NullVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/NullVectorState.java
index 041679c..9f2b7f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/NullVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/NullVectorState.java
@@ -24,7 +24,6 @@
  * Do-nothing vector state for a map column which has no actual vector
  * associated with it.
  */
-
 public class NullVectorState implements VectorState {
 
   /**
@@ -32,7 +31,6 @@
    * allocate or roll-over, but where we do want to at least track
    * the vector itself. (Used for map and union pseudo-vectors.)
    */
-
   public static class UnmanagedVectorState extends NullVectorState {
     ValueVector vector;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedListState.java
index f12cbfe..6bf3d85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedListState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedListState.java
@@ -37,7 +37,7 @@
 /**
  * Represents the internal state of a RepeatedList vector. The repeated list
  * is wrapped in a repeated list "column state" that manages the column as a
- * whole. The repeated list acts as a container which the <tt>RepeatedListState<tt>
+ * whole. The repeated list acts as a container which the {@code RepeatedListState}
  * implements. At the vector level, we track the repeated list vector, but
  * only perform operations on its associated offset vector.
  */
@@ -48,12 +48,12 @@
    */
   public static class RepeatedListColumnState extends BaseContainerColumnState {
 
-    private final RepeatedListState listState;
+    private final ContainerState listState;
 
     public RepeatedListColumnState(LoaderInternals loader,
         AbstractObjectWriter writer,
-        RepeatedListVectorState vectorState,
-        RepeatedListState listState) {
+        VectorState vectorState,
+        ContainerState listState) {
       super(loader, writer, vectorState);
       this.listState = listState;
       listState.bindColumnState(this);
@@ -80,14 +80,19 @@
 
     private final ArrayWriter arrayWriter;
     private final RepeatedListVector vector;
-    private final OffsetVectorState offsetsState;
+    private final VectorState offsetsState;
 
     public RepeatedListVectorState(AbstractObjectWriter arrayWriter, RepeatedListVector vector) {
       this.vector = vector;
       this.arrayWriter = arrayWriter.array();
-      offsetsState = new OffsetVectorState(
-          arrayWriter.events(), vector.getOffsetVector(),
-          this.arrayWriter.entryType() == null ? null : arrayWriter.events());
+      if (vector == null) {
+        offsetsState = new NullVectorState();
+      } else {
+        offsetsState = new OffsetVectorState(
+            arrayWriter.events(),
+            vector.getOffsetVector(),
+            this.arrayWriter.entryType() == null ? null : arrayWriter.events());
+      }
     }
 
     /**
@@ -99,7 +104,9 @@
      * of the repeated list
      */
     public void updateChildWriter(AbstractObjectWriter childWriter) {
-      offsetsState.setChildWriter(childWriter.events());
+      if (offsetsState instanceof OffsetVectorState) {
+        ((OffsetVectorState) offsetsState).setChildWriter(childWriter.events());
+      }
     }
 
     @SuppressWarnings("unchecked")
@@ -175,7 +182,9 @@
     // vector.
     final RepeatedListVectorState vectorState = (RepeatedListVectorState) parentColumn.vectorState();
     final RepeatedListVector listVector = vectorState.vector;
-    listVector.setChildVector(childState.vector());
+    if (listVector != null) {
+      listVector.setChildVector(childState.vector());
+    }
 
     // The repeated list's offset vector state needs to know the offset
     // of the inner vector. Bind that information now that we have
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index dd5973c..571c4d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -284,7 +284,6 @@
     columnBuilder = new ColumnBuilder();
 
     // Determine the root vector cache
-
     ResultVectorCache vectorCache;
     if (options.vectorCache == null) {
       vectorCache = new NullResultVectorCacheImpl(allocator);
@@ -293,20 +292,18 @@
     }
 
     // Build the row set model depending on whether a schema is provided.
-
     rootState = new RowState(this, vectorCache);
     rootWriter = rootState.rootWriter();
 
     // If no schema, columns will be added incrementally as they
     // are discovered. Start with an empty model.
-
     if (options.schema != null) {
 
       // Schema provided. Populate a model (and create vectors) for the
       // provided schema. The schema can be extended later, but normally
       // won't be if known up front.
 
-      logger.debug("Schema: " + options.schema);
+      logger.debug("Schema: " + options.schema.toString());
       BuildFromSchema.instance().buildTuple(rootWriter, options.schema);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/UnionState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/UnionState.java
index 02cc90c..ec431e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/UnionState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/UnionState.java
@@ -25,7 +25,6 @@
 import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
 import org.apache.drill.exec.physical.resultSet.impl.ColumnState.BaseContainerColumnState;
 import org.apache.drill.exec.physical.resultSet.impl.SingleVectorState.FixedWidthVectorState;
-import org.apache.drill.exec.physical.resultSet.impl.SingleVectorState.SimpleVectorState;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.exec.record.metadata.VariantSchema;
@@ -95,12 +94,16 @@
   public static class UnionVectorState implements VectorState {
 
     private final UnionVector vector;
-    private final SimpleVectorState typesVectorState;
+    private final VectorState typesVectorState;
 
     public UnionVectorState(UnionVector vector, UnionWriterImpl unionWriter) {
       this.vector = vector;
-      typesVectorState = new FixedWidthVectorState(
-          ((UnionVectorShim) unionWriter.shim()).typeWriter(), vector.getTypeVector());
+      if (vector == null) {
+        typesVectorState = new NullVectorState();
+      } else {
+        typesVectorState = new FixedWidthVectorState(
+            ((UnionVectorShim) unionWriter.shim()).typeWriter(), vector.getTypeVector());
+      }
     }
 
     @Override
@@ -133,7 +136,7 @@
     public UnionVector vector() { return vector; }
 
     @Override
-    public boolean isProjected() { return true; }
+    public boolean isProjected() { return vector != null; }
 
     @Override
     public void dump(HierarchicalFormatter format) {
@@ -147,11 +150,10 @@
    * vectors in the union,
    * and matches the set of child writers in the union writer.
    */
-
   private final Map<MinorType, ColumnState> columns = new HashMap<>();
 
-  public UnionState(LoaderInternals events, ResultVectorCache vectorCache) {
-    super(events, vectorCache);
+  public UnionState(LoaderInternals events, ResultVectorCache vectorCache, ProjectionFilter projectionSet) {
+    super(events, vectorCache, projectionSet);
   }
 
   public UnionWriterImpl writer() {
@@ -183,7 +185,13 @@
   protected void addColumn(ColumnState colState) {
     assert ! columns.containsKey(colState.schema().type());
     columns.put(colState.schema().type(), colState);
-    vector().addType(colState.vector());
+    if (vector() == null) {
+      if (colState.vector() != null) {
+        throw new IllegalStateException("Attempt to add a materialized vector to an unprojected vector");
+      }
+    } else {
+      vector().addType(colState.vector());
+    }
   }
 
   @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderUnprojected.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderUnprojected.java
new file mode 100644
index 0000000..60abdf1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderUnprojected.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.physical.resultSet.project.Projections;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+/**
+ * Verify the correct functioning of the "dummy" columns created
+ * for unprojected columns.
+ */
+public class TestResultSetLoaderUnprojected  extends SubOperatorTest {
+
+  @Test
+  public void testScalar()
+  {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("a");
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .projection(Projections.parse(selection))
+        .readerSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(2, actualSchema.size());
+    assertEquals("a", actualSchema.column(0).getName());
+    assertEquals("b", actualSchema.column(1).getName());
+    assertTrue(rootWriter.column("a").isProjected());
+    assertFalse(rootWriter.column("b").isProjected());
+    rsLoader.startBatch();
+    for (int i = 1; i < 3; i++) {
+      rootWriter.start();
+      rootWriter.scalar(0).setInt(i);
+      rootWriter.scalar(1).setInt(i * 5);
+      rootWriter.save();
+    }
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1)
+        .addRow(2)
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testScalarArray()
+  {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("a");
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addArray("b", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .projection(Projections.parse(selection))
+        .readerSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(2, actualSchema.size());
+    assertEquals("a", actualSchema.column(0).getName());
+    assertEquals("b", actualSchema.column(1).getName());
+    assertTrue(rootWriter.column("a").isProjected());
+    assertFalse(rootWriter.column("b").isProjected());
+    rsLoader.startBatch();
+    for (int i = 1; i < 3; i++) {
+      rootWriter.start();
+      rootWriter.scalar(0).setInt(i);
+      for (int j = 0; j < 3; j++) {
+        ArrayWriter aw = rootWriter.array(1);
+        ScalarWriter sw = rootWriter.array(1).scalar();
+        sw.setInt(i * 5 + j);
+        aw.save();
+      }
+      rootWriter.save();
+    }
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1)
+        .addRow(2)
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testMap()
+  {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("a");
+    TupleMetadata schema = new SchemaBuilder()
+        .addMap("a")
+          .add("foo", MinorType.INT)
+          .resumeSchema()
+        .addMap("b")
+          .add("foo", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .projection(Projections.parse(selection))
+        .readerSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(2, actualSchema.size());
+    assertEquals("a", actualSchema.column(0).getName());
+    assertEquals("b", actualSchema.column(1).getName());
+    assertTrue(rootWriter.column("a").isProjected());
+    assertFalse(rootWriter.column("b").isProjected());
+    rsLoader.startBatch();
+    for (int i = 1; i < 3; i++) {
+      rootWriter.start();
+      rootWriter.tuple(0).scalar("foo").setInt(i);
+      rootWriter.tuple(1).scalar("foo").setInt(i * 5);
+      rootWriter.save();
+    }
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("a")
+          .add("foo", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(RowSetUtilities.mapValue(1))
+        .addSingleCol(RowSetUtilities.mapValue(2))
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testMapElements()
+  {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("a.foo");
+    TupleMetadata schema = new SchemaBuilder()
+        .addMap("a")
+          .add("foo", MinorType.INT)
+          .add("bar", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .projection(Projections.parse(selection))
+        .readerSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(1, actualSchema.size());
+    assertEquals("a", actualSchema.metadata(0).name());
+    assertEquals(2, actualSchema.metadata(0).tupleSchema().size());
+    assertEquals("foo", actualSchema.metadata(0).tupleSchema().metadata(0).name());
+    assertEquals("bar", actualSchema.metadata(0).tupleSchema().metadata(1).name());
+    assertTrue(rootWriter.column("a").isProjected());
+    assertTrue(rootWriter.tuple("a").column(0).isProjected());
+    assertFalse(rootWriter.tuple("a").column(1).isProjected());
+    rsLoader.startBatch();
+    for (int i = 1; i < 3; i++) {
+      rootWriter.start();
+      TupleWriter aWriter = rootWriter.tuple(0);
+      aWriter.scalar("foo").setInt(i);
+      aWriter.scalar("bar").setInt(i * 5);
+      rootWriter.save();
+    }
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("a")
+          .add("foo", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(RowSetUtilities.mapValue(1))
+        .addSingleCol(RowSetUtilities.mapValue(2))
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testMapArray()
+  {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("a");
+    TupleMetadata schema = new SchemaBuilder()
+        .addMapArray("a")
+          .add("foo", MinorType.INT)
+          .resumeSchema()
+        .addMapArray("b")
+          .add("foo", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .projection(Projections.parse(selection))
+        .readerSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(2, actualSchema.size());
+    assertEquals("a", actualSchema.column(0).getName());
+    assertEquals("b", actualSchema.column(1).getName());
+    assertTrue(rootWriter.column("a").isProjected());
+    assertFalse(rootWriter.column("b").isProjected());
+    rsLoader.startBatch();
+    for (int i = 0; i < 2; i++) {
+      rootWriter.start();
+      ArrayWriter aWriter = rootWriter.array(0);
+      ArrayWriter bWriter = rootWriter.array(1);
+      for (int j = 0; j < 2; j++) {
+        aWriter.tuple().scalar(0).setInt(i * 2 + j + 1);
+        bWriter.tuple().scalar(0).setInt((i * 2 + j) * 5);
+        aWriter.save();
+        bWriter.save();
+      }
+      rootWriter.save();
+    }
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMapArray("a")
+          .add("foo", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(
+            RowSetUtilities.mapArray(
+                RowSetUtilities.mapValue(1),
+                RowSetUtilities.mapValue(2)))
+        .addSingleCol(
+            RowSetUtilities.mapArray(
+                RowSetUtilities.mapValue(3),
+                RowSetUtilities.mapValue(4)))
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testVariant()
+  {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("a");
+    TupleMetadata schema = new SchemaBuilder()
+        .addUnion("a")
+          .resumeSchema()
+        .addUnion("b")
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .projection(Projections.parse(selection))
+        .readerSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(2, actualSchema.size());
+    assertEquals("a", actualSchema.column(0).getName());
+    assertEquals("b", actualSchema.column(1).getName());
+    assertTrue(rootWriter.column("a").isProjected());
+    assertFalse(rootWriter.column("b").isProjected());
+    rsLoader.startBatch();
+    rootWriter.start();
+    rootWriter.variant(0).scalar(MinorType.INT).setInt(1);
+    rootWriter.variant(1).scalar(MinorType.INT).setInt(5);
+    rootWriter.save();
+    rootWriter.start();
+    rootWriter.variant(0).scalar(MinorType.VARCHAR).setString("2");
+    rootWriter.variant(1).scalar(MinorType.VARCHAR).setString("10");
+    rootWriter.save();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addUnion("a")
+          .addType(MinorType.INT)
+          .addType(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1)
+        .addRow("2")
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testList()
+  {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("a");
+    TupleMetadata schema = new SchemaBuilder()
+        .addList("a")
+          .addType(MinorType.INT)
+          .resumeSchema()
+        .addList("b")
+          .addType(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .projection(Projections.parse(selection))
+        .readerSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(2, actualSchema.size());
+    assertEquals("a", actualSchema.column(0).getName());
+    assertEquals("b", actualSchema.column(1).getName());
+    assertTrue(rootWriter.column("a").isProjected());
+    assertFalse(rootWriter.column("b").isProjected());
+    rsLoader.startBatch();
+    ArrayWriter aw = rootWriter.array(0);
+    ScalarWriter swa = aw.scalar();
+    ArrayWriter bw = rootWriter.array(1);
+    ScalarWriter swb = bw.scalar();
+    for (int i = 1; i < 3; i++) {
+      rootWriter.start();
+      for (int j = 0; j < 3; j++) {
+        swa.setInt(i * 10 + j);
+        swb.setInt(i * 100 + j);
+        aw.save();
+        bw.save();
+      }
+      rootWriter.save();
+    }
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addList("a")
+          .addType(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(RowSetUtilities.listValue(10, 11, 12))
+        .addSingleCol(RowSetUtilities.listValue(20, 21, 22))
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void test2DList()
+  {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("a");
+    TupleMetadata schema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addArray(MinorType.INT)
+          .resumeSchema()
+        .addRepeatedList("b")
+          .addArray(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .projection(Projections.parse(selection))
+        .readerSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(2, actualSchema.size());
+    assertEquals("a", actualSchema.column(0).getName());
+    assertEquals("b", actualSchema.column(1).getName());
+    assertTrue(rootWriter.column("a").isProjected());
+    assertFalse(rootWriter.column("b").isProjected());
+    rsLoader.startBatch();
+    ArrayWriter aw = rootWriter.array(0);
+    ArrayWriter aw2 = aw.array();
+    ScalarWriter swa = aw2.scalar();
+    ArrayWriter bw = rootWriter.array(1);
+    ArrayWriter bw2 = bw.array();
+    ScalarWriter swb = bw2.scalar();
+    for (int i = 1; i < 3; i++) {
+      rootWriter.start();
+      for (int j = 0; j < 3; j++) {
+        for (int k = 0; k < 3; k++) {
+          swa.setInt(i * 10 + j * 3 + k);
+          swb.setInt(i * 100 + j * 30 + k);
+          aw2.save();
+          bw2.save();
+        }
+        aw.save();
+        bw.save();
+      }
+      rootWriter.save();
+    }
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addArray(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(RowSetUtilities.listValue(
+            RowSetUtilities.listValue(10, 11, 12),
+            RowSetUtilities.listValue(13, 14, 15),
+            RowSetUtilities.listValue(16, 17, 18)))
+        .addSingleCol(RowSetUtilities.listValue(
+            RowSetUtilities.listValue(20, 21, 22),
+            RowSetUtilities.listValue(23, 24, 25),
+            RowSetUtilities.listValue(26, 27, 28)))
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+}
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index fa01bc8..79e799a 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -882,7 +882,7 @@
     <profile>
       <id>hadoop-2</id>
       <properties>
-        <jdbc-all-jar.maxsize>54300000</jdbc-all-jar.maxsize>
+        <jdbc-all-jar.maxsize>54400000</jdbc-all-jar.maxsize>
       </properties>
     </profile>
   </profiles>
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
index 0f78f55..d80b2fe 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
@@ -79,9 +79,9 @@
    * columns may be available only when explicitly requested. For example,
    * the log reader has a "_raw" column which includes the entire input
    * line before parsing. This column can be requested explicitly:<br>
-   * <tt>SELECT foo, bar, _raw FROM ...</tt><br>
+   * {@code SELECT foo, bar, _raw FROM ...}<br>
    * but the column will <i>not</i> be included when using the wildcard:<br>
-   * <tt>SELECT * FROM ...</tt>
+   * {@code SELECT * FROM ...}
    * <p>
    * Marking a column (either in the provided schema or the reader schema)
    * will prevent that column from appearing in a wildcard expansion.
@@ -192,25 +192,25 @@
   StructureType structureType();
 
   /**
-   * Schema for <tt>TUPLE</tt> columns.
+   * Schema for {@code TUPLE} columns.
    *
    * @return the tuple schema
    */
   TupleMetadata tupleSchema();
 
   /**
-   * Schema for <tt>VARIANT</tt> columns.
+   * Schema for {@code VARIANT} columns.
    *
    * @return the variant schema
    */
   VariantMetadata variantSchema();
 
   /**
-   * Schema of inner dimension for <tt>MULTI_ARRAY<tt> columns.
+   * Schema of inner dimension for <code>MULTI_ARRAY</code> columns.
    * If an array is 3D, the outer column represents all 3 dimensions.
-   * <tt>outer.childSchema()</tt> gives another <tt>MULTI_ARRAY</tt>
+   * {@code outer.childSchema()} gives another {@code MULTI_ARRAY}
    * for the inner 2D array.
-   * <tt>outer.childSchema().childSchema()</tt> gives a column
+   * {@code outer.childSchema().childSchema()} gives a column
    * of some other type (but repeated) for the 1D array.
    * <p>
    * Sorry for the mess, but it is how the code works and we are not
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
index 994b6db..76541cc 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
@@ -32,6 +32,7 @@
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.drill.exec.vector.accessor.VariantWriter;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter.DummyOffsetVectorWriter;
 
 /**
  * Writer for an array-valued column. This writer appends values: once a value
@@ -177,7 +178,8 @@
   public static abstract class BaseArrayWriter extends AbstractArrayWriter {
 
     public BaseArrayWriter(ColumnMetadata schema, UInt4Vector offsetVector, AbstractObjectWriter elementObjWriter) {
-      super(schema, elementObjWriter, new OffsetVectorWriterImpl(offsetVector));
+      super(schema, elementObjWriter,
+          offsetVector == null ? new DummyOffsetVectorWriter() : new OffsetVectorWriterImpl(offsetVector));
     }
 
     @Override
@@ -328,7 +330,7 @@
   public boolean nullable() { return false; }
 
   @Override
-  public boolean isProjected() { return true; }
+  public boolean isProjected() { return offsetsWriter.isProjected(); }
 
   @Override
   public void setNull() {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
index 2b22efc..2124ca4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
@@ -125,15 +125,14 @@
         final ScalarObjectWriter scalarWriter = new ScalarObjectWriter(
             new DummyScalarWriter(schema));
         switch (schema.mode()) {
-        case OPTIONAL:
-        case REQUIRED:
-          return scalarWriter;
-        case REPEATED:
-          return new ArrayObjectWriter(
-              new DummyArrayWriter(schema,
-                scalarWriter));
-        default:
-          throw new UnsupportedOperationException(schema.mode().toString());
+          case OPTIONAL:
+          case REQUIRED:
+            return scalarWriter;
+          case REPEATED:
+            return new ArrayObjectWriter(
+                new DummyArrayWriter(schema, scalarWriter));
+          default:
+            throw new UnsupportedOperationException(schema.mode().toString());
       }
     }
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java
index f3e2590..7f341fa 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java
@@ -22,7 +22,6 @@
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
-import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim;
 import com.google.common.base.Preconditions;
 
 /**
@@ -32,7 +31,6 @@
  * the case that a list may eventually hold a union, but at present
  * it holds nothing.
  */
-
 public class EmptyListShim implements UnionShim {
 
   private UnionWriterImpl writer;
@@ -116,7 +114,6 @@
    * @param colWriter the column writer returned from the listener
    * @return the same column writer
    */
-
   private AbstractObjectWriter doAddMember(AbstractObjectWriter colWriter) {
     // Something went terribly wrong if the check below fails.
     Preconditions.checkState(writer.shim() != this);
@@ -146,4 +143,7 @@
   public void dump(HierarchicalFormatter format) {
     format.startObject(this).endObject();
   }
+
+  @Override
+  public boolean isProjected() { return true; }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ListWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ListWriterImpl.java
index cb26daa..34aa551 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ListWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ListWriterImpl.java
@@ -20,8 +20,9 @@
 import java.lang.reflect.Array;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ColumnAccessors.UInt1ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.writer.dummy.DummyScalarWriter;
 import org.apache.drill.exec.vector.complex.ListVector;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -32,16 +33,19 @@
  * their indexes since the contents of lists can change dynamically,
  * and auto-increment is meaningful only for scalar arrays.
  */
-
 public class ListWriterImpl extends ObjectArrayWriter {
 
   private final ListVector vector;
-  private final UInt1ColumnWriter isSetWriter;
+  private final AbstractScalarWriterImpl isSetWriter;
 
   public ListWriterImpl(ColumnMetadata schema, ListVector vector, AbstractObjectWriter memberWriter) {
-    super(schema, vector.getOffsetVector(), memberWriter);
+    super(schema, vector == null ? null : vector.getOffsetVector(), memberWriter);
     this.vector = vector;
-    isSetWriter = new UInt1ColumnWriter(vector.getBitsVector());
+    if (vector == null) {
+      isSetWriter = new DummyScalarWriter(null);
+    } else {
+      isSetWriter = new UInt1ColumnWriter(vector.getBitsVector());
+    }
     elementIndex = new ArrayElementWriterIndex();
   }
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
index 3245323..e183ca9 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
@@ -32,7 +32,6 @@
 /**
  * Writer for a Drill Map type. Maps are actually tuples, just like rows.
  */
-
 public abstract class MapWriter extends AbstractTupleWriter {
 
   /**
@@ -40,7 +39,6 @@
    * rather, this writer is a holder for the columns within the map, and those
    * columns are what is written.
    */
-
   protected static class SingleMapWriter extends MapWriter {
     private final MapVector mapVector;
 
@@ -83,7 +81,6 @@
    * Since the map is an array, it has an associated offset vector, which the
    * parent array writer is responsible for maintaining.
    */
-
   protected static class ArrayMapWriter extends MapWriter {
 
     protected ArrayMapWriter(ColumnMetadata schema, List<AbstractObjectWriter> writers) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java
index a845828..6a58f12 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java
@@ -60,33 +60,33 @@
     }
   }
 
-  public static ObjectDictWriter.DictObjectWriter buildDict(ColumnMetadata metadata, DictVector vector,
+  public static DictObjectWriter buildDict(ColumnMetadata metadata, DictVector vector,
                                                             List<AbstractObjectWriter> keyValueWriters) {
     DictEntryWriter.DictEntryObjectWriter entryObjectWriter =
         DictEntryWriter.buildDictEntryWriter(metadata, keyValueWriters, vector);
     DictWriter objectDictWriter;
-    if (vector != null) {
-      objectDictWriter = new ObjectDictWriter(metadata, vector.getOffsetVector(), entryObjectWriter);
-    } else {
+    if (vector == null) {
       objectDictWriter = new DummyDictWriter(metadata, entryObjectWriter);
+    } else {
+      objectDictWriter = new ObjectDictWriter(metadata, vector.getOffsetVector(), entryObjectWriter);
     }
-    return new ObjectDictWriter.DictObjectWriter(objectDictWriter);
+    return new DictObjectWriter(objectDictWriter);
   }
 
   public static ArrayObjectWriter buildDictArray(ColumnMetadata metadata, RepeatedDictVector vector,
                                                  List<AbstractObjectWriter> keyValueWriters) {
     final DictVector dataVector;
-    if (vector != null) {
-      dataVector = (DictVector) vector.getDataVector();
-    } else {
+    if (vector == null) {
       dataVector = null;
+    } else {
+      dataVector = (DictVector) vector.getDataVector();
     }
     ObjectDictWriter.DictObjectWriter dictWriter = buildDict(metadata, dataVector, keyValueWriters);
     AbstractArrayWriter arrayWriter;
-    if (vector != null) {
-      arrayWriter = new ObjectArrayWriter(metadata, vector.getOffsetVector(), dictWriter);
-    } else {
+    if (vector == null) {
       arrayWriter = new DummyArrayWriter(metadata, dictWriter);
+    } else {
+      arrayWriter = new ObjectArrayWriter(metadata, vector.getOffsetVector(), dictWriter);
     }
     return new ArrayObjectWriter(arrayWriter);
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/RepeatedListWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/RepeatedListWriter.java
index 36d25fb..728785d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/RepeatedListWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/RepeatedListWriter.java
@@ -42,7 +42,6 @@
  * list is a dummy, to be replaced by the real one once it is discovered
  * by reading data (or by parsing a schema.)
  */
-
 public class RepeatedListWriter extends ObjectArrayWriter {
 
   public interface ArrayListener {
@@ -63,7 +62,7 @@
   public static AbstractObjectWriter buildRepeatedList(ColumnMetadata schema,
       RepeatedListVector vector, AbstractObjectWriter elementWriter) {
     AbstractArrayWriter arrayWriter = new RepeatedListWriter(schema,
-        vector.getOffsetVector(),
+        vector == null ? null : vector.getOffsetVector(),
         elementWriter);
     return new ArrayObjectWriter(arrayWriter);
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
index cf7ef28..c0f4afc 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
@@ -182,6 +182,8 @@
         setByteObjectArray((Byte[]) array);
       } else if (memberClassName.equals(Boolean.class.getName())) {
         setBooleanObjectArray((Boolean[]) array);
+      } else if (memberClassName.equals(Object.class.getName())) {
+        setObjectArray((Object[]) array);
       } else {
         throw new IllegalArgumentException( "Unknown Java array type: " + memberClassName );
       }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java
index a6a1e6c..29db57d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java
@@ -22,7 +22,6 @@
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
-import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim;
 import com.google.common.base.Preconditions;
 
 /**
@@ -33,7 +32,6 @@
  * to present a uniform variant interface for a list that holds zero
  * one (this case) or many types.
  */
-
 public class SimpleListShim implements UnionShim {
 
   private UnionWriterImpl writer;
@@ -209,4 +207,9 @@
     colWriter.dump(format);
     format.endObject();
   }
+
+  @Override
+  public boolean isProjected() {
+    return true;
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionMemberShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionMemberShim.java
new file mode 100644
index 0000000..0cea513
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionMemberShim.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.UnionVector;
+
+/**
+ * Retrieves the member vectors for a union vector.
+ */
+interface UnionMemberShim {
+
+  ValueVector getMember(MinorType type);
+  boolean isProjected();
+
+  class UnionMemberShimImpl implements UnionMemberShim {
+    private final UnionVector vector;
+
+    protected UnionMemberShimImpl(UnionVector vector) {
+      this.vector = vector;
+    }
+
+    @Override
+    public ValueVector getMember(MinorType type) {
+      return vector.getMember(type);
+    }
+
+    @Override
+    public boolean isProjected() { return true; }
+  }
+
+  class DummyUnionMemberShim implements UnionMemberShim {
+
+    @Override
+    public ValueVector getMember(MinorType type) {
+      return null;
+    }
+
+    @Override
+    public boolean isProjected() { return false; }
+  }
+
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionShim.java
new file mode 100644
index 0000000..53ca8d5
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionShim.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+
+/**
+ * Unions are overly complex. They can evolve from no type, to a single type,
+ * to multiple types. The kind of vector used in these cases differ. This
+ * shim acts as a facade between the writer and the underlying vector, allowing
+ * the writer to remain constant while the vector (and its shim) evolves.
+ */
+public interface UnionShim extends WriterEvents {
+  void bindWriter(UnionWriterImpl writer);
+  void setNull();
+  boolean hasType(MinorType type);
+
+  /**
+   * Return an existing writer for the given type, or create a new one
+   * if needed.
+   *
+   * @param type desired variant type
+   * @return a writer for that type
+   */
+  ObjectWriter member(MinorType type);
+  void setType(MinorType type);
+  @Override
+  int lastWriteIndex();
+  @Override
+  int rowStartIndex();
+  AbstractObjectWriter addMember(ColumnMetadata colSchema);
+  AbstractObjectWriter addMember(MinorType type);
+  void addMember(AbstractObjectWriter colWriter);
+  boolean isProjected();
+
+  public abstract class AbstractUnionShim implements UnionShim {
+
+    protected final AbstractObjectWriter variants[];
+
+    public AbstractUnionShim() {
+      variants = new AbstractObjectWriter[MinorType.values().length];
+    }
+
+    public AbstractUnionShim(AbstractObjectWriter variants[]) {
+      if (variants == null) {
+        this.variants = new AbstractObjectWriter[MinorType.values().length];
+      } else {
+        this.variants = variants;
+      }
+    }
+
+    @Override
+    public boolean hasType(MinorType type) {
+      return variants[type.ordinal()] != null;
+    }
+
+    /**
+     * Performs just the work of adding a vector to the list of existing
+     * variants. Called when adding a type via the writer, but also when
+     * the result set loader promotes a list from single type to a union,
+     * and provides this shim with the writer from the single-list shim.
+     * In the latter case, the writer is already initialized and is already
+     * part of the metadata for this list; so we don't want to call the
+     * list's {@code addMember()} and repeat those operations.
+     *
+     * @param colWriter the column (type) writer to add
+     */
+    public void addMemberWriter(AbstractObjectWriter colWriter) {
+      final MinorType type = colWriter.schema().type();
+      assert variants[type.ordinal()] == null;
+      variants[type.ordinal()] = colWriter;
+    }
+  }
+}
+
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
index f9d0996..002fdde 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
@@ -17,15 +17,19 @@
  */
 package org.apache.drill.exec.vector.accessor.writer;
 
+import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.VariantWriter.VariantWriterListener;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractFixedWidthWriter.BaseFixedWidthWriter;
-import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim;
+import org.apache.drill.exec.vector.accessor.writer.UnionMemberShim.DummyUnionMemberShim;
+import org.apache.drill.exec.vector.accessor.writer.UnionMemberShim.UnionMemberShimImpl;
+import org.apache.drill.exec.vector.accessor.writer.dummy.DummyScalarWriter;
 import org.apache.drill.exec.vector.complex.UnionVector;
 
 /**
@@ -55,7 +59,10 @@
       // which will create the member metadata. This means that the type
       // will already be in the variant schema by the time we add the
       // writer to the variant writer in a few steps from now.
-      final ValueVector memberVector = shim.vector.getMember(type);
+      //
+      // When a variant is unprojected, the member vector will be null,
+      // which will cause a dummy writer to be created.
+      final ValueVector memberVector = shim.unionMemberShim.getMember(type);
       final ColumnMetadata memberSchema = shim.writer.variantSchema().addType(type);
       return ColumnWriterFactory.buildColumnWriter(memberSchema, memberVector);
     }
@@ -66,7 +73,7 @@
     }
   }
 
-  private final UnionVector vector;
+  private final UnionMemberShim unionMemberShim;
   private final AbstractObjectWriter variants[];
   private UnionWriterImpl writer;
 
@@ -75,18 +82,23 @@
    * says which union member holds the value for each row. The type vector
    * can also indicate if the value is null.
    */
-  private final BaseScalarWriter typeWriter;
+  private final AbstractScalarWriterImpl typeWriter;
 
   public UnionVectorShim(UnionVector vector) {
-    this.vector = vector;
+    this.unionMemberShim = vector == null ? new DummyUnionMemberShim() : new UnionMemberShimImpl(vector);
     typeWriter = ColumnWriterFactory.newWriter(vector.getTypeVector());
     variants = new AbstractObjectWriter[MinorType.values().length];
   }
 
   public UnionVectorShim(UnionVector vector,
       AbstractObjectWriter variants[]) {
-    this.vector = vector;
-    typeWriter = ColumnWriterFactory.newWriter(vector.getTypeVector());
+    if (vector == null) {
+      this.unionMemberShim = new DummyUnionMemberShim();
+      this.typeWriter = new DummyScalarWriter(new PrimitiveColumnMetadata("$type", MinorType.UINT1, DataMode.REQUIRED));
+    } else {
+      this.unionMemberShim = new UnionMemberShimImpl(vector);
+      this.typeWriter = ColumnWriterFactory.newWriter(vector.getTypeVector());
+    }
     if (variants == null) {
       this.variants = new AbstractObjectWriter[MinorType.values().length];
     } else {
@@ -273,7 +285,6 @@
     }
   }
 
-
   /**
    * Return the writer for the types vector. To be used only by the row set
    * loader overflow logic; never by the application (which is why the method
@@ -308,4 +319,8 @@
     typeWriter.dump(format);
     format.endObject();
   }
-}
\ No newline at end of file
+
+  @Override
+  public boolean isProjected() { return unionMemberShim.isProjected(); }
+}
+
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
index 98f0798..6ee4738 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
@@ -39,34 +39,22 @@
 
 /**
  * Writer to a union vector.
+ * <p>
+ * A union vector has three attributes: null flag, type and value.
+ * The union vector holds the type: a bundle of other vectors hold
+ * the value. The type says which of the other vectors to consult
+ * to write the value. If a column is null, then we consult no other
+ * vectors. If all columns (thus far) are null, then there are no
+ * associated data vectors.
+ * <p>
+ * The protocol is to first set the type. Doing so creates the
+ * associated data vector, if it does not yet exist. This highlights the
+ * poor design of this vector: if we have even one value of a given type,
+ * we must have a vector that holds values for all rows, then we ignore
+ * the unwanted values.
  */
-
 public class UnionWriterImpl implements VariantWriter, WriterEvents {
 
-  public interface UnionShim extends WriterEvents {
-    void bindWriter(UnionWriterImpl writer);
-    void setNull();
-    boolean hasType(MinorType type);
-
-    /**
-     * Return an existing writer for the given type, or create a new one
-     * if needed.
-     *
-     * @param type desired variant type
-     * @return a writer for that type
-     */
-
-    ObjectWriter member(MinorType type);
-    void setType(MinorType type);
-    @Override
-    int lastWriteIndex();
-    @Override
-    int rowStartIndex();
-    AbstractObjectWriter addMember(ColumnMetadata colSchema);
-    AbstractObjectWriter addMember(MinorType type);
-    void addMember(AbstractObjectWriter colWriter);
-  }
-
   public static class VariantObjectWriter extends AbstractObjectWriter {
 
     private final UnionWriterImpl writer;
@@ -102,7 +90,6 @@
    * need to implement the same methods, so we can't just implement these
    * methods on the union writer itself.
    */
-
   private class ElementPositions implements WriterPosition {
 
     @Override
@@ -234,7 +221,6 @@
    *
    * @param writer the column writer to add
    */
-
   protected void addMember(AbstractObjectWriter writer) {
     final MinorType type = writer.schema().type();
 
@@ -270,7 +256,7 @@
   }
 
   @Override
-  public boolean isProjected() { return true; }
+  public boolean isProjected() { return shim.isProjected(); }
 
   @Override
   public void startWrite() {