DRILL-7640: EVF-based JSON Loader

Builds on the JSON structure parser and several other PRs
to provide an enhanced, robust mechanism to read JSON data
into value vectors via the EVF. This is not the JSON reader,
rather it is the "V2" version of the JsonProcessor which
does the actual JSON parsing/loading work.

closes #2023
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ProjectionFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ProjectionFilter.java
index d2250ef..37f77a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ProjectionFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ProjectionFilter.java
@@ -43,6 +43,8 @@
   ProjectionFilter PROJECT_ALL = new ImplicitProjectionFilter(true);
   ProjectionFilter PROJECT_NONE = new ImplicitProjectionFilter(false);
 
+  boolean isProjected(String colName);
+
   boolean isProjected(ColumnMetadata columnSchema);
 
   ProjectionFilter mapProjection(boolean isColProjected, String colName);
@@ -81,6 +83,11 @@
     }
 
     @Override
+    public boolean isProjected(String name) {
+      return projectAll;
+    }
+
+    @Override
     public boolean isProjected(ColumnMetadata columnSchema) {
       return projectAll ? !Projections.excludeFromWildcard(columnSchema) : false;
     }
@@ -111,6 +118,11 @@
     }
 
     @Override
+    public boolean isProjected(String colName) {
+      return projectionSet.isProjected(colName);
+    }
+
+    @Override
     public boolean isProjected(ColumnMetadata columnSchema) {
       return projectionSet.enforceProjection(columnSchema, errorContext);
     }
@@ -154,6 +166,12 @@
     }
 
     @Override
+    public boolean isProjected(String name) {
+      ColumnMetadata providedCol = providedSchema.metadata(name);
+      return providedCol != null || !isStrict;
+    }
+
+    @Override
     public boolean isProjected(ColumnMetadata columnSchema) {
       ColumnMetadata providedCol = providedSchema.metadata(columnSchema.name());
       if (providedCol == null) {
@@ -210,6 +228,11 @@
     }
 
     @Override
+    public boolean isProjected(String name) {
+      return filter1.isProjected(name) && filter2.isProjected(name);
+    }
+
+    @Override
     public boolean isProjected(ColumnMetadata columnSchema) {
       return filter1.isProjected(columnSchema) && filter2.isProjected(columnSchema);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
index 1f05ceb..33c8de4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
@@ -93,6 +93,153 @@
   implements AbstractTupleWriter.TupleWriterListener {
 
   /**
+   * The set of columns added via the writers: includes both projected
+   * and unprojected columns. (The writer is free to add columns that the
+   * query does not project; the result set loader creates a dummy column
+   * and dummy writer, then does not project the column to the output.)
+   */
+  protected final List<ColumnState> columns = new ArrayList<>();
+
+  /**
+   * Internal writer schema that matches the column list.
+   */
+  protected final TupleMetadata schema = new TupleSchema();
+
+  /**
+   * Metadata description of the output container (for the row) or map
+   * (for map or repeated map.)
+   * <p>
+   * Rows and maps have an output schema which may differ from the internal schema.
+   * The output schema excludes unprojected columns. It also excludes
+   * columns added in an overflow row.
+   * <p>
+   * The output schema is built slightly differently for maps inside a
+   * union vs. normal top-level (or nested) maps. Maps inside a union do
+   * not defer columns because of the muddy semantics (and infrequent use)
+   * of unions.
+   */
+  protected TupleMetadata outputSchema;
+
+  private int prevHarvestIndex = -1;
+
+  protected TupleState(LoaderInternals events,
+      ResultVectorCache vectorCache,
+      ProjectionFilter projectionSet) {
+    super(events, vectorCache, projectionSet);
+  }
+
+  protected void bindOutputSchema(TupleMetadata outputSchema) {
+    this.outputSchema = outputSchema;
+  }
+
+  /**
+   * Returns an ordered set of the columns which make up the tuple.
+   * Column order is the same as that defined by the map's schema,
+   * to allow indexed access. New columns always appear at the end
+   * of the list to preserve indexes.
+   *
+   * @return ordered list of column states for the columns within
+   * this tuple
+   */
+  public List<ColumnState> columns() { return columns; }
+
+  public TupleMetadata schema() { return writer().tupleSchema(); }
+
+  public abstract AbstractTupleWriter writer();
+
+  @Override
+  public boolean isProjected(String colName) {
+    return projectionSet.isProjected(colName);
+  }
+
+  @Override
+  public ObjectWriter addColumn(TupleWriter tupleWriter, MaterializedField column) {
+    return addColumn(tupleWriter, MetadataUtils.fromField(column));
+  }
+
+  @Override
+  public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSchema) {
+    return BuildFromSchema.instance().buildColumn(this, columnSchema);
+  }
+
+  @Override
+  protected void addColumn(ColumnState colState) {
+    columns.add(colState);
+  }
+
+  public boolean hasProjections() {
+    for (final ColumnState colState : columns) {
+      if (colState.isProjected()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  protected Collection<ColumnState> columnStates() {
+    return columns;
+  }
+
+  protected void updateOutput(int curSchemaVersion) {
+
+    // Scan all columns
+    for (int i = 0; i < columns.size(); i++) {
+      final ColumnState colState = columns.get(i);
+
+      // Ignore unprojected columns
+      if (! colState.writer().isProjected()) {
+        continue;
+      }
+
+      // If this is a new column added since the last output, then we may have
+      // to add the column to this output. For the row itself, and for maps
+   // outside of unions, if the column was added after the output schema
+      // version cutoff, skip that column for now. But, if this tuple is
+      // within a union, then we always add all columns because union
+      // semantics are too muddy to play the deferred column game. Further,
+      // all columns in a map within a union must be nullable, so we know we
+      // can fill the column with nulls. (Something that is not true for
+      // normal maps.)
+
+      if (i > prevHarvestIndex && (! isVersioned() || colState.addVersion <= curSchemaVersion)) {
+        colState.buildOutput(this);
+        prevHarvestIndex = i;
+      }
+
+      // If the column is a map or a dict, then we have to recurse into it
+      // itself. If the map is inside a union, then the map's vectors
+      // already appear in the map vector, but we still must update the
+      // output schema.
+
+      if (colState.schema().isMap()) {
+        final MapState childMap = ((MapColumnState) colState).mapState();
+        childMap.updateOutput(curSchemaVersion);
+      } else if (colState.schema().isDict()) {
+        final DictState child = ((DictColumnState) colState).dictState();
+        child.updateOutput(curSchemaVersion);
+      }
+    }
+  }
+
+  public abstract int addOutputColumn(ValueVector vector, ColumnMetadata colSchema);
+
+  public TupleMetadata outputSchema() { return outputSchema; }
+
+  public void dump(HierarchicalFormatter format) {
+    format
+      .startObject(this)
+      .attributeArray("columns");
+    for (int i = 0; i < columns.size(); i++) {
+      format.element(i);
+      columns.get(i).dump(format);
+    }
+    format
+      .endArray()
+      .endObject();
+  }
+
+  /**
    * Represents a map column (either single or repeated). Includes maps that
    * are top-level, nested within other maps, or nested inside a union.
    * Schema management is a bit complex:
@@ -413,149 +560,6 @@
       return (AbstractTupleWriter) parentColumn.writer().array().tuple();
     }
   }
-
-  /**
-   * The set of columns added via the writers: includes both projected
-   * and unprojected columns. (The writer is free to add columns that the
-   * query does not project; the result set loader creates a dummy column
-   * and dummy writer, then does not project the column to the output.)
-   */
-  protected final List<ColumnState> columns = new ArrayList<>();
-
-  /**
-   * Internal writer schema that matches the column list.
-   */
-  protected final TupleMetadata schema = new TupleSchema();
-
-  /**
-   * Metadata description of the output container (for the row) or map
-   * (for map or repeated map.)
-   * <p>
-   * Rows and maps have an output schema which may differ from the internal schema.
-   * The output schema excludes unprojected columns. It also excludes
-   * columns added in an overflow row.
-   * <p>
-   * The output schema is built slightly differently for maps inside a
-   * union vs. normal top-level (or nested) maps. Maps inside a union do
-   * not defer columns because of the muddy semantics (and infrequent use)
-   * of unions.
-   */
-  protected TupleMetadata outputSchema;
-
-  private int prevHarvestIndex = -1;
-
-  protected TupleState(LoaderInternals events,
-      ResultVectorCache vectorCache,
-      ProjectionFilter projectionSet) {
-    super(events, vectorCache, projectionSet);
-  }
-
-  protected void bindOutputSchema(TupleMetadata outputSchema) {
-    this.outputSchema = outputSchema;
-  }
-
-  /**
-   * Returns an ordered set of the columns which make up the tuple.
-   * Column order is the same as that defined by the map's schema,
-   * to allow indexed access. New columns always appear at the end
-   * of the list to preserve indexes.
-   *
-   * @return ordered list of column states for the columns within
-   * this tuple
-   */
-  public List<ColumnState> columns() { return columns; }
-
-  public TupleMetadata schema() { return writer().tupleSchema(); }
-
-  public abstract AbstractTupleWriter writer();
-
-  @Override
-  public ObjectWriter addColumn(TupleWriter tupleWriter, MaterializedField column) {
-    return addColumn(tupleWriter, MetadataUtils.fromField(column));
-  }
-
-  @Override
-  public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSchema) {
-    return BuildFromSchema.instance().buildColumn(this, columnSchema);
-  }
-
-  @Override
-  protected void addColumn(ColumnState colState) {
-    columns.add(colState);
-  }
-
-  public boolean hasProjections() {
-    for (final ColumnState colState : columns) {
-      if (colState.isProjected()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  protected Collection<ColumnState> columnStates() {
-    return columns;
-  }
-
-  protected void updateOutput(int curSchemaVersion) {
-
-    // Scan all columns
-    for (int i = 0; i < columns.size(); i++) {
-      final ColumnState colState = columns.get(i);
-
-      // Ignore unprojected columns
-      if (! colState.writer().isProjected()) {
-        continue;
-      }
-
-      // If this is a new column added since the lastoutput, then we may have
-      // to add the column to this output. For the row itself, and for maps
-      // outside of unions, If the column wasadded after the output schema
-      // version cutoff, skip that column for now. But, if this tuple is
-      // within a union, then we always add all columns because union
-      // semantics are too muddy to play the deferred column game. Further,
-      // all columns in a map within a union must be nullable, so we know we
-      // can fill the column with nulls. (Something that is not true for
-      // normal maps.)
-
-      if (i > prevHarvestIndex && (! isVersioned() || colState.addVersion <= curSchemaVersion)) {
-        colState.buildOutput(this);
-        prevHarvestIndex = i;
-      }
-
-      // If the column is a map or a dict, then we have to recurse into it
-      // itself. If the map is inside a union, then the map's vectors
-      // already appear in the map vector, but we still must update the
-      // output schema.
-
-      if (colState.schema().isMap()) {
-        final MapState childMap = ((MapColumnState) colState).mapState();
-        childMap.updateOutput(curSchemaVersion);
-      } else if (colState.schema().isDict()) {
-        final DictState child = ((DictColumnState) colState).dictState();
-        child.updateOutput(curSchemaVersion);
-      }
-    }
-  }
-
-  public abstract int addOutputColumn(ValueVector vector, ColumnMetadata colSchema);
-
-  public TupleMetadata outputSchema() { return outputSchema; }
-
-  public void dump(HierarchicalFormatter format) {
-    format
-      .startObject(this)
-      .attributeArray("columns");
-    for (int i = 0; i < columns.size(); i++) {
-      format.element(i);
-      columns.get(i).dump(format);
-    }
-    format
-      .endArray()
-      .endObject();
-  }
-
   public static class DictColumnState extends BaseContainerColumnState {
     protected final DictState dictState;
     protected boolean isVersioned;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/AbstractArrayListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/AbstractArrayListener.java
new file mode 100644
index 0000000..69d7bee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/AbstractArrayListener.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.easy.json.loader.StructuredValueListener.ObjectValueListener;
+import org.apache.drill.exec.store.easy.json.parser.ArrayListener;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef;
+import org.apache.drill.exec.store.easy.json.parser.ValueListener;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+
+/**
+ * Base class for scalar and object arrays. Represents the array
+ * behavior of a field.
+ */
+public abstract class AbstractArrayListener implements ArrayListener {
+
+  protected final JsonLoaderImpl loader;
+  protected final ColumnMetadata colSchema;
+  protected final ValueListener elementListener;
+
+  public AbstractArrayListener(JsonLoaderImpl loader, ColumnMetadata colSchema, ValueListener elementListener) {
+    this.loader = loader;
+    this.colSchema = colSchema;
+    this.elementListener = elementListener;
+  }
+
+  public ValueListener elementListener() { return elementListener; }
+
+  @Override
+  public void onStart() { }
+
+  @Override
+  public void onElementStart() { }
+
+  @Override
+  public void onElementEnd() { }
+
+  @Override
+  public void onEnd() { }
+
+  @Override
+  public ValueListener element(ValueDef valueDef) {
+    throw loader.typeConversionError(colSchema, valueDef);
+  }
+
+  protected UserException typeConversionError(String jsonType) {
+    return loader.typeConversionError(colSchema, jsonType);
+  }
+
+  public static class ScalarArrayListener extends AbstractArrayListener {
+
+    public ScalarArrayListener(JsonLoaderImpl loader, ColumnMetadata colSchema, ScalarListener valueListener) {
+      super(loader, colSchema, valueListener);
+    }
+
+    @Override
+    public ValueListener element(ValueDef valueDef) {
+      return elementListener;
+    }
+  }
+
+  public static class ObjectArrayListener extends AbstractArrayListener {
+    private final ArrayWriter arrayWriter;
+
+    public ObjectArrayListener(JsonLoaderImpl loader, ArrayWriter arrayWriter, ObjectValueListener valueListener) {
+      super(loader, arrayWriter.schema(), valueListener);
+      this.arrayWriter = arrayWriter;
+    }
+
+    @Override
+    public ValueListener element(ValueDef valueDef) {
+      return elementListener;
+    }
+
+    @Override
+    public void onElementEnd() {
+      arrayWriter.save();
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/AbstractValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/AbstractValueListener.java
new file mode 100644
index 0000000..b881ae2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/AbstractValueListener.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.easy.json.parser.ArrayListener;
+import org.apache.drill.exec.store.easy.json.parser.ObjectListener;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef;
+import org.apache.drill.exec.store.easy.json.parser.ValueListener;
+
+/**
+ * Abstract base class for value (field or array element) listeners.
+ */
+public abstract class AbstractValueListener implements ValueListener {
+
+  protected final JsonLoaderImpl loader;
+
+  public AbstractValueListener(JsonLoaderImpl loader) {
+    this.loader = loader;
+  }
+
+  @Override
+  public void bind(ValueHost host) { }
+
+  @Override
+  public void onBoolean(boolean value) {
+    throw typeConversionError("Boolean");
+  }
+
+  @Override
+  public void onInt(long value) {
+    throw typeConversionError("integer");
+  }
+
+  @Override
+  public void onFloat(double value) {
+    throw typeConversionError("float");
+  }
+
+  @Override
+  public void onString(String value) {
+    throw typeConversionError("string");
+  }
+
+  @Override
+  public void onEmbeddedObject(String value) {
+    throw typeConversionError("object");
+  }
+
+  @Override
+  public ObjectListener object() {
+    throw typeConversionError("object");
+  }
+
+  @Override
+  public ArrayListener array(ValueDef valueDef) {
+    throw loader.typeConversionError(schema(), valueDef);
+  }
+
+  protected UserException typeConversionError(String jsonType) {
+    return loader.typeConversionError(schema(), jsonType);
+  }
+
+  protected abstract ColumnMetadata schema();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BigIntListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BigIntListener.java
new file mode 100644
index 0000000..a5cc33e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BigIntListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Listener for JSON integer values. Allows conversion from
+ * Boolean, double and string types. (The conversion from double
+ * is lossy, but perhaps better than failing the query.)
+ * Conversion from Boolean is the usual semantics:
+ * true = 1, false = 0.  Conversion from string uses the Java
+ * integer parsing semantics.
+ */
+public class BigIntListener extends ScalarListener {
+
+  public BigIntListener(JsonLoaderImpl loader, ScalarWriter writer) {
+    super(loader, writer);
+  }
+
+  @Override
+  public void onBoolean(boolean value) {
+    writer.setLong(value ? 1 : 0);
+  }
+
+  @Override
+  public void onInt(long value) {
+    writer.setLong(value);
+  }
+
+  @Override
+  public void onFloat(double value) {
+    writer.setLong(Math.round(value));
+  }
+
+  @Override
+  public void onString(String value) {
+    value = value.trim();
+    if (value.isEmpty()) {
+      setNull();
+    } else {
+      try {
+        writer.setLong(Long.parseLong(value));
+      } catch (NumberFormatException e) {
+        throw loader.dataConversionError(schema(), "string", value);
+      }
+    }
+  }
+
+  @Override
+  protected void setArrayNull() {
+    writer.setLong(0);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BooleanListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BooleanListener.java
new file mode 100644
index 0000000..5f7549a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/BooleanListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Listener for JSON Boolean fields. Allows conversion from numeric
+ * fields (with the usual semantics: 0 = false, ~0 = true) and from
+ * strings (using Java Boolean parsing semantics.)
+ */
+public class BooleanListener extends ScalarListener {
+
+  public BooleanListener(JsonLoaderImpl loader, ScalarWriter writer) {
+    super(loader, writer);
+  }
+
+  @Override
+  public void onBoolean(boolean value) {
+    writer.setBoolean(value);
+  }
+
+  @Override
+  public void onInt(long value) {
+    writer.setBoolean(value != 0);
+  }
+
+  @Override
+  public void onFloat(double value) {
+    writer.setBoolean(value != 0);
+  }
+
+  @Override
+  public void onString(String value) {
+    value = value.trim();
+    if (value.isEmpty()) {
+      setNull();
+    } else {
+      writer.setBoolean(Boolean.parseBoolean(value.trim()));
+    }
+  }
+
+  @Override
+  protected void setArrayNull() {
+    writer.setBoolean(false);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/DoubleListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/DoubleListener.java
new file mode 100644
index 0000000..7daa42b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/DoubleListener.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Listener for the JSON double type. Allows conversion from other
+ * types. Conversion from Boolean is the usual semantics:
+ * true = 1.0, false = 0.0. Strings are parsed using Java semantics.
+ */
+public class DoubleListener extends ScalarListener {
+
+  public DoubleListener(JsonLoaderImpl loader, ScalarWriter writer) {
+    super(loader, writer);
+  }
+
+  @Override
+  public void onBoolean(boolean value) {
+    writer.setDouble(value ? 1 : 0);
+  }
+
+  @Override
+  public void onInt(long value) {
+    writer.setDouble(value);
+  }
+
+  @Override
+  public void onFloat(double value) {
+    writer.setDouble(value);
+  }
+
+  @Override
+  public void onString(String value) {
+    value = value.trim();
+    if (value.isEmpty()) {
+      setNull();
+    } else {
+      try {
+        writer.setDouble(Double.parseDouble(value));
+      } catch (NumberFormatException e) {
+        throw loader.dataConversionError(schema(), "string", value);
+      }
+    }
+  }
+
+  @Override
+  protected void setArrayNull() {
+    writer.setDouble(0);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java
new file mode 100644
index 0000000..9d9afed
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+
+/**
+ * Enhanced second-generation JSON loader which takes an input
+ * source and creates a series of record batches using the
+ * {@link org.apache.drill.exec.physical.resultSet.ResultSetLoader
+ * ResultSetLoader} abstraction.
+ */
+public interface JsonLoader {
+
+  /**
+   * Column property specific to the JSON loader.
+   * Mode for reading Varchar columns from JSON. One of:
+   * <ul>
+   * <li>{@link #JSON_TYPED_MODE}: Read using normal typing rules
+   * (default).</li>
+   * <li>{@link #JSON_TEXT_MODE}: Like the JSON format plugin's
+   * "all-text mode", but for a single column. That JSON field is
+   * read as text regardless of the actual value. Applies only to
+   * scalars.</li>
+   * <li>{@link #JSON_LITERAL_MODE}: Causes the field, and all its
+   * children, to be read as literal JSON: the values are returned
+   * as a valid JSON string.</li>
+   * </ul>
+   */
+  String JSON_MODE = ColumnMetadata.DRILL_PROP_PREFIX + "json-mode";
+  String JSON_TEXT_MODE = "text";
+  String JSON_TYPED_MODE = "typed";
+  String JSON_LITERAL_MODE = "json";
+
+  /**
+   * Read one record of data.
+   *
+   * @return {@code true} if a record was loaded, {@code false} if EOF.
+   * @throws org.apache.drill.common.exceptions.UserException
+   * for most errors
+   * @throws RuntimeException for unexpected errors, most often due
+   * to code errors
+   */
+  boolean next();
+
+  /**
+   * Indicates that a batch is complete. Tells the loader to materialize
+   * any deferred null fields. (See {@link TupleListener} for details.)
+   */
+  void endBatch();
+
+  /**
+   * Releases resources held by this class including the input stream.
+   * Does not close the result set loader passed into this instance.
+   */
+  void close();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
new file mode 100644
index 0000000..9fd64bd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.json.parser.ErrorFactory;
+import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
+import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esri.core.geometry.JsonReader;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonToken;
+
+/**
+ * Revised JSON loader that is based on the
+ * {@link ResultSetLoader} abstraction. Uses the listener-based
+ * {@link JsonStructureParser} to walk the JSON tree in a "streaming"
+ * fashion, calling events which this class turns into vector write
+ * operations. Listeners handle options such as all text mode
+ * vs. type-specific parsing. Think of this implementation as a
+ * listener-based recursive-descent parser.
+ * <p>
+ * The JSON loader mechanism runs two state machines intertwined:
+ * <ol>
+ * <li>The actual parser (to parse each JSON object, array or scalar according
+ * to its inferred type represented by the {@code JsonStructureParser}.</li>
+ * <li>The type discovery machine, which is made complex because JSON may include
+ * long runs of nulls, represented by this class.</li>
+ * </ol>
+ *
+ * <h4>Schema Discovery</h4>
+ *
+ * Fields are discovered on the fly. Types are inferred from the first JSON token
+ * for a field. Type inference is less than perfect: it cannot handle type changes
+ * such as first seeing 10, then 12.5, or first seeing "100", then 200.
+ * <p>
+ * When a field first contains null or an empty list, "null deferral" logic
+ * adds a special state that "waits" for an actual data type to present itself.
+ * This allows the parser to handle a series of nulls, empty arrays, or arrays
+ * of nulls (when using lists) at the start of the file. If no type ever appears,
+ * the loader forces the field to "text mode", hoping that the field is scalar.
+ * <p>
+ * To slightly help the null case, if the projection list shows that a column
+ * must be an array or a map, then that information is used to guess the type
+ * of a null column.
+ * <p>
+ * The code includes a prototype mechanism to provide type hints for columns.
+ * At present, it is just used to handle nulls that are never "resolved" by the
+ * end of a batch. Would be much better to use the hints (or a full schema) to
+ * avoid the huge mass of code needed to handle nulls.
+ *
+ * <h4>Provided Schema</h4>
+ *
+ * The JSON loader accepts a provided schema which removes type ambiguities.
+ * If we have the examples above (runs of nulls, or shifting types), then the
+ * provided schema says the vector type to create; the individual column listeners
+ * attempt to convert the JSON token type to the target vector type. The result
+ * is that, if the schema provides the correct type, the loader can ride over
+ * ambiguities in the input.
+ *
+ * <h4>Comparison to Original JSON Reader</h4>
+ *
+ * This class replaces the {@code JsonReader} class used in Drill versions 1.17
+ * and before. Compared with the previous version, this implementation:
+ * <ul>
+ * <li>Materializes parse states as classes rather than as methods and
+ * boolean flags as in the prior version.</li>
+ * <li>Reports errors as {@link UserException} objects, complete with context
+ * information, rather than as generic Java exception as in the prior version.</li>
+ * <li>Moves parse options into a separate {@link JsonLoaderOptions} class.</li>
+ * <li>Iteration protocol is simpler: simply call {@link #next()} until it returns
+ * {@code false}. Errors are reported out-of-band via an exception.</li>
+ * <li>The result set loader abstraction is perfectly happy with an empty schema.
+ * For this reason, this version (unlike the original) does not make up a dummy
+ * column if the schema would otherwise be empty.</li>
+ * <li>Projection pushdown is handled by the {@link ResultSetLoader} rather than
+ * the JSON loader. This class always creates a vector writer, but the result set
+ * loader will return a dummy (no-op) writer for non-projected columns.</li>
+ * <li>Like the original version, this version "free wheels" over unprojected objects
+ * and arrays; watching only for matching brackets, but ignoring all else.</li>
+ * <li>Writes boolean values as SmallInt values, rather than as bits in the
+ * prior version.</li>
+ * <li>This version also "free-wheels" over all unprojected values. If the user
+ * finds that they have inconsistent data in some field f, then the user can
+ * project fields except f; Drill will ignore the inconsistent values in f.</li>
+ * <li>Because of this free-wheeling capability, this version does not need a
+ * "counting" reader; this same reader handles the case in which no fields are
+ * projected for {@code SELECT COUNT(*)} queries.</li>
+ * <li>Runs of null values result in a "deferred null state" that patiently
+ * waits for an actual value token to appear, and only then "realizes" a parse
+ * state for that type.</li>
+ * <li>Provides the same limited error recovery as the original version. See
+ * <a href="https://issues.apache.org/jira/browse/DRILL-4653">DRILL-4653</a>
+ * and
+ * <a href="https://issues.apache.org/jira/browse/DRILL-5953">DRILL-5953</a>.
+ * </li>
+ * </ul>
+ */
+public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
+  protected static final Logger logger = LoggerFactory.getLogger(JsonLoaderImpl.class);
+
+  /**
+   * Marker for a column whose type is still unknown: only nulls or
+   * empty values have been seen so far. The loader forces resolution
+   * of any remaining markers at the end of each batch.
+   */
+  interface NullTypeMarker {
+    void forceResolution();
+  }
+
+  private final ResultSetLoader rsLoader;
+  private final JsonLoaderOptions options;
+  private final CustomErrorContext errorContext;
+  private final TupleListener rowListener;
+  private final JsonStructureParser parser;
+  private boolean eof;
+
+  /**
+   * List of "unknown" columns (have only seen nulls or empty values)
+   * that are waiting for resolution, or forced resolution at the end
+   * of a batch. Unknown columns occur only when using dynamic type
+   * inference, and no JSON tokens have been seen which would hint
+   * at a type. Not needed when a schema is provided.
+   */
+
+  // Using a simple list. Won't perform well if we have hundreds of
+  // null fields; but then we've never seen such a pathologically bad
+  // case... Usually just one or two fields have deferred nulls.
+  private final List<NullTypeMarker> nullStates = new ArrayList<>();
+
+  public JsonLoaderImpl(ResultSetLoader rsLoader, TupleMetadata providedSchema,
+      JsonLoaderOptions options, CustomErrorContext errorContext,
+      InputStream stream) {
+    this.rsLoader = rsLoader;
+    this.options = options;
+    this.errorContext = errorContext;
+    this.rowListener = new TupleListener(this, rsLoader.writer(), providedSchema);
+    this.parser = new JsonStructureParser(stream, options, rowListener, this);
+  }
+
+  public JsonLoaderOptions options() { return options; }
+
+  @Override // JsonLoader
+  public boolean next() {
+    if (eof) {
+      return false;
+    }
+    rsLoader.startBatch();
+    RowSetLoader rowWriter = rsLoader.writer();
+    // Fill the batch until the row writer says it is full, or the
+    // parser reports end of input.
+    while (rowWriter.start()) {
+      if (parser.next()) {
+        rowWriter.save();
+      } else {
+        eof = true;
+        break;
+      }
+    }
+    return rsLoader.hasRows();
+  }
+
+  public void addNullMarker(JsonLoaderImpl.NullTypeMarker marker) {
+    nullStates.add(marker);
+  }
+
+  public void removeNullMarker(JsonLoaderImpl.NullTypeMarker marker) {
+    nullStates.remove(marker);
+  }
+
+  /**
+   * Finish reading a batch of data. We may have pending "null" columns:
+   * a column for which we've seen only nulls, or an array that has
+   * always been empty. The batch needs to finish, and needs a type,
+   * but we still don't know the type. Since we must decide on one,
+   * we do the following: guess Varchar, and switch to text mode.
+   * <p>
+   * This choice is not perfect. Switching to text mode means
+   * results will vary
+   * from run to run depending on the order that we see empty and
+   * non-empty values for this column. Plus, since the system is
+   * distributed, the decision made here may conflict with that made in
+   * some other fragment.
+   * <p>
+   * The only real solution is for the user to provide a schema.
+   * <p>
+   * Bottom line: the user is responsible for not giving Drill
+   * ambiguous data that would require Drill to predict the future.
+   */
+  @Override // JsonLoader
+  public void endBatch() {
+
+    // Make a copy. Forcing resolution will remove the
+    // element from the original list.
+    List<NullTypeMarker> copy = new ArrayList<>(nullStates);
+    for (NullTypeMarker marker : copy) {
+      marker.forceResolution();
+    }
+    assert nullStates.isEmpty();
+  }
+
+  @Override // JsonLoader
+  public void close() {
+    parser.close();
+  }
+
+  @Override // ErrorFactory
+  public RuntimeException parseError(String msg, JsonParseException e) {
+    throw buildError(
+        UserException.dataReadError(e)
+          .addContext(msg));
+  }
+
+  @Override // ErrorFactory
+  public RuntimeException ioException(IOException e) {
+    throw buildError(
+        UserException.dataReadError(e)
+          .addContext(errorContext));
+  }
+
+  @Override // ErrorFactory
+  public RuntimeException structureError(String msg) {
+    throw buildError(
+        UserException.dataReadError()
+          .message(msg));
+  }
+
+  @Override // ErrorFactory
+  public RuntimeException syntaxError(JsonParseException e) {
+    throw buildError(
+        UserException.dataReadError(e)
+          .addContext("Syntax error"));
+  }
+
+  @Override // ErrorFactory
+  public RuntimeException typeError(UnsupportedConversionError e) {
+    throw buildError(
+        UserException.validationError(e)
+          .addContext("Unsupported type conversion"));
+  }
+
+  @Override // ErrorFactory
+  public RuntimeException syntaxError(JsonToken token) {
+    throw buildError(
+        UserException.dataReadError()
+          .addContext("Syntax error on token", token.toString()));
+  }
+
+  @Override // ErrorFactory
+  public RuntimeException unrecoverableError() {
+    throw buildError(
+        UserException.dataReadError()
+          .addContext("Unrecoverable syntax error on token")
+          .addContext("Recovery attempts", parser.recoverableErrorCount()));
+  }
+
+  /**
+   * Builds a type-conversion error that reports the JSON value type,
+   * including one "[]" suffix per array dimension.
+   */
+  protected UserException typeConversionError(ColumnMetadata schema, ValueDef valueDef) {
+    StringBuilder buf = new StringBuilder()
+        .append(valueDef.type().name().toLowerCase());
+    if (valueDef.isArray()) {
+      for (int i = 0; i < valueDef.dimensions(); i++) {
+        buf.append("[]");
+      }
+    }
+    return typeConversionError(schema, buf.toString());
+  }
+
+  protected UserException typeConversionError(ColumnMetadata schema, String tokenType) {
+    return buildError(schema,
+        UserException.dataReadError()
+          .message("Type of JSON token is not compatible with its column")
+          .addContext("JSON token type", tokenType));
+  }
+
+  protected UserException dataConversionError(ColumnMetadata schema, String tokenType, String value) {
+    return buildError(schema,
+        UserException.dataReadError()
+          .message("Type of JSON token is not compatible with its column")
+          .addContext("JSON token type", tokenType)
+          .addContext("JSON token", value));
+  }
+
+  public UserException unsupportedType(ColumnMetadata schema) {
+    return buildError(schema,
+        UserException.validationError()
+          .message("JSON reader does not support the provided column type"));
+  }
+
+  public UserException unsupportedJsonTypeException(String key, JsonType jsonType) {
+    return buildError(
+        UserException.dataReadError()
+          .message("JSON reader does not support the JSON data type")
+          .addContext("Field", key)
+          .addContext("JSON type", jsonType.toString()));
+  }
+
+  public UserException unsupportedArrayException(String key, int dims) {
+    return buildError(
+        UserException.validationError()
+          .message("JSON reader does not support arrays deeper than two levels")
+          .addContext("Field", key)
+          .addContext("Array nesting", dims));
+  }
+
+  /**
+   * Adds column name and type context before delegating to
+   * {@link #buildError(UserException.Builder)}.
+   */
+  protected UserException buildError(ColumnMetadata schema, UserException.Builder builder) {
+    return buildError(builder
+        .addContext("Column", schema.name())
+        .addContext("Column type", schema.typeString()));
+  }
+
+  /**
+   * Attaches the operator error context plus the parser's current
+   * line, column and (when available) token to the error.
+   */
+  protected UserException buildError(UserException.Builder builder) {
+    builder
+      .addContext(errorContext)
+      .addContext("Line", parser.lineNumber())
+      .addContext("Position", parser.columnNumber());
+    String token = parser.token();
+    if (token != null) {
+      builder.addContext("Near token", token);
+    }
+    return builder.build(logger);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java
new file mode 100644
index 0000000..59a9faa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.store.easy.json.parser.JsonStructureOptions;
+
+/**
+ * Extends the {@link JsonStructureOptions} class, which provides
+ * JSON syntactic options, with a number of semantic options enforced
+ * at the JSON loader level.
+ */
+public class JsonLoaderOptions extends JsonStructureOptions {
+
+  /**
+   * If true, read all JSON numbers as DOUBLE; if false, integral
+   * numbers retain their own type. Presumably mirrors the JSON
+   * format plugin's "read numbers as double" option — confirm
+   * against the code that populates these options.
+   */
+  public boolean readNumbersAsDouble;
+
+  /**
+   * Drill prior to version 1.18 would read a null string
+   * array element as the string "null". Drill 1.18 and later
+   * reads the same token as a blank string. This flag forces
+   * the pre-1.18 behavior.
+   * <p>
+   * For <code>{a: [null]}</code>
+   * <ul>
+   * <li>If true: --> "null"</li>
+   * <li>if false: --> ""</li>
+   * </ul>
+   */
+  public boolean classicArrayNulls;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ListListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ListListener.java
new file mode 100644
index 0000000..05c65a4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ListListener.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.easy.json.parser.ArrayListener;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef;
+import org.apache.drill.exec.store.easy.json.parser.ValueListener;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+
+/**
+ * Listener for the List vector writer. A List in Drill is essentially
+ * a repeated Union.
+ */
+public class ListListener extends AbstractValueListener {
+
+  // Writer for the list column as a whole.
+  private final ObjectWriter writer;
+  // Shared listener returned for every array event on this value.
+  private final ListArrayListener elements;
+
+  public ListListener(JsonLoaderImpl loader, ObjectWriter listWriter) {
+    super(loader);
+    writer = listWriter;
+    elements = new ListArrayListener(loader, listWriter.array());
+  }
+
+  @Override
+  protected ColumnMetadata schema() {
+    return writer.schema();
+  }
+
+  @Override
+  public void onNull() { }
+
+  @Override
+  public ArrayListener array(ValueDef valueDef) {
+    return elements;
+  }
+
+  /**
+   * Handles the array events for the list: marks entries non-null
+   * and saves each element as it completes.
+   */
+  private static class ListArrayListener extends AbstractArrayListener {
+
+    private final ArrayWriter arrayWriter;
+
+    public ListArrayListener(JsonLoaderImpl loader, ArrayWriter arrayWriter) {
+      super(loader, arrayWriter.schema(),
+          new VariantListener(loader, arrayWriter.variant()));
+      this.arrayWriter = arrayWriter;
+    }
+
+    @Override
+    public void onElementStart() {
+      // Mark the entry non-null so that an empty list is recorded:
+      // {a: null} differs from {a: []}.
+      arrayWriter.setNull(false);
+    }
+
+    @Override
+    public ValueListener element(ValueDef valueDef) {
+      return elementListener;
+    }
+
+    @Override
+    public void onElementEnd() {
+      arrayWriter.save();
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/RepeatedListValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/RepeatedListValueListener.java
new file mode 100644
index 0000000..22bc68d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/RepeatedListValueListener.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.json.loader.AbstractArrayListener.ObjectArrayListener;
+import org.apache.drill.exec.store.easy.json.loader.AbstractArrayListener.ScalarArrayListener;
+import org.apache.drill.exec.store.easy.json.loader.StructuredValueListener.ObjectValueListener;
+import org.apache.drill.exec.store.easy.json.parser.ArrayListener;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef;
+import org.apache.drill.exec.store.easy.json.parser.ValueListener;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+
+/**
+ * Represents a JSON value that holds a RepeatedList (2D array) value.
+ * The structure is:
+ * <ul>
+ * <li>Value - {@code RepeatedListValueListener}</li>
+ * <li>Array - {@code RepeatedArrayListener}</li>
+ * <li>Value - {@code RepeatedListElementListener} or
+ * {@code ListListener}</li>
+ * <li>Array - Depends on type</li>
+ * <li>Value - Depends on type</li>
+ * <li>Object - If a repeated list of maps</li>
+ * </ul>
+ */
+public class RepeatedListValueListener extends AbstractValueListener {
+
+  // Writer for the repeated-list column as a whole.
+  private final ObjectWriter repeatedListWriter;
+  // Listener for the outer of the two array dimensions.
+  private final RepeatedArrayListener outerArrayListener;
+
+  private RepeatedListValueListener(JsonLoaderImpl loader, ObjectWriter writer,
+      ValueListener elementListener) {
+    super(loader);
+    this.repeatedListWriter = writer;
+    this.outerArrayListener = new RepeatedArrayListener(loader, writer.schema(),
+        writer.array(), elementListener);
+  }
+
+  /**
+   * Create a repeated list listener for a scalar value.
+   */
+  public static ValueListener repeatedListFor(JsonLoaderImpl loader, ObjectWriter writer) {
+    ColumnMetadata elementSchema = writer.schema().childSchema();
+     return wrapInnerArray(loader, writer,
+        new ScalarArrayListener(loader, elementSchema,
+            ScalarListener.listenerFor(loader, writer.array().entry())));
+  }
+
+  /**
+   * Create a repeated list listener for a Map.
+   */
+  public static ValueListener repeatedObjectListFor(JsonLoaderImpl loader,
+      ObjectWriter writer, TupleMetadata providedSchema) {
+    ArrayWriter outerArrayWriter = writer.array();
+    ArrayWriter innerArrayWriter = outerArrayWriter.array();
+    // Inner-to-outer: tuple listener for each map, wrapped in an object
+    // value listener, wrapped in the inner object-array listener.
+    return wrapInnerArray(loader, writer,
+        new ObjectArrayListener(loader, innerArrayWriter,
+            new ObjectValueListener(loader, outerArrayWriter.entry().schema(),
+                new TupleListener(loader, innerArrayWriter.tuple(), providedSchema))));
+  }
+
+  /**
+   * Given the inner array, wrap it to produce the repeated list.
+   */
+  private static ValueListener wrapInnerArray(JsonLoaderImpl loader, ObjectWriter writer,
+      ArrayListener innerArrayListener) {
+    return new RepeatedListValueListener(loader, writer,
+        new RepeatedListElementListener(loader,
+            writer.schema(), writer.array().array(),
+            innerArrayListener));
+  }
+
+  /**
+   * Create a repeated list listener for a variant. Here, the inner
+   * array is provided by a List (which is a repeated Union.)
+   */
+  public static ValueListener repeatedVariantListFor(JsonLoaderImpl loader,
+      ObjectWriter writer) {
+    return new RepeatedListValueListener(loader, writer,
+        new ListListener(loader, writer.array().entry()));
+  }
+
+  @Override
+  public ArrayListener array(ValueDef valueDef) {
+    return outerArrayListener;
+  }
+
+  // No-op: nothing is written for a null repeated-list value.
+  @Override
+  public void onNull() { }
+
+  @Override
+  protected ColumnMetadata schema() {
+    return repeatedListWriter.schema();
+  }
+
+  /**
+   * Represents the outer array for a repeated (2D) list
+   */
+  private static class RepeatedArrayListener extends AbstractArrayListener {
+
+    private final ArrayWriter outerArrayWriter;
+
+    public RepeatedArrayListener(JsonLoaderImpl loader,
+        ColumnMetadata colMetadata, ArrayWriter outerArrayWriter,
+        ValueListener outerValue) {
+      super(loader, colMetadata, outerValue);
+      this.outerArrayWriter = outerArrayWriter;
+    }
+
+    @Override
+    public ValueListener element(ValueDef valueDef) {
+      return elementListener;
+    }
+
+    @Override
+    public void onElementEnd() {
+      // Commit one outer element (one inner array) per element end.
+      outerArrayWriter.save();
+    }
+  }
+
+  /**
+   * Represents each item in the outer array of a RepeatedList. Such elements should
+   * only be arrays. However, Drill is forgiving if the value happens to be null, which
+   * is defined to be the same as an empty inner array.
+   */
+  private static class RepeatedListElementListener extends AbstractValueListener {
+
+    private final ColumnMetadata colMetadata;
+    private final ArrayListener innerArrayListener;
+    private final ArrayWriter innerArrayWriter;
+
+    public RepeatedListElementListener(JsonLoaderImpl loader, ColumnMetadata colMetadata,
+        ArrayWriter innerArrayWriter, ArrayListener innerArrayListener) {
+      super(loader);
+      this.colMetadata = colMetadata;
+      this.innerArrayListener = innerArrayListener;
+      this.innerArrayWriter = innerArrayWriter;
+    }
+
+    @Override
+    public ArrayListener array(ValueDef valueDef) {
+      return innerArrayListener;
+    }
+
+    @Override
+    public void onNull() {
+      // A null element is recorded as an empty inner array: save
+      // the inner writer with no values written.
+      innerArrayWriter.save();
+    }
+
+    @Override
+    protected ColumnMetadata schema() {
+      return colMetadata;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ScalarListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ScalarListener.java
new file mode 100644
index 0000000..9c7381a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ScalarListener.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.easy.json.parser.ArrayListener;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
+
+/**
+ * Base class for scalar field listeners
+ */
+public abstract class ScalarListener extends AbstractValueListener {
+
+  // Scalar writer for this column; for an array column, this is the
+  // element writer obtained from the array in listenerFor().
+  protected final ScalarWriter writer;
+  // True if the column is a (repeated) array of scalars.
+  protected final boolean isArray;
+
+  public ScalarListener(JsonLoaderImpl loader, ScalarWriter writer) {
+    super(loader);
+    this.writer = writer;
+    ColumnMetadata colSchema = writer.schema();
+    isArray = colSchema.isArray();
+  }
+
+  /**
+   * Creates the type-specific scalar listener for the given column
+   * writer. Only BIGINT, BIT, FLOAT8 and VARCHAR are currently
+   * supported; any other scalar type raises an internal error.
+   */
+  public static ScalarListener listenerFor(JsonLoaderImpl loader, ObjectWriter colWriter) {
+    // For an array column, listen on the element writer.
+    ScalarWriter writer = colWriter.type() == ObjectType.ARRAY ?
+        colWriter.array().scalar() : colWriter.scalar();
+    switch (writer.schema().type()) {
+      case BIGINT:
+        return new BigIntListener(loader, writer);
+      case BIT:
+        return new BooleanListener(loader, writer);
+      case FLOAT8:
+        return new DoubleListener(loader, writer);
+      case VARCHAR:
+        return new VarCharListener(loader, writer);
+      case DATE:
+      case FLOAT4:
+      case INT:
+      case INTERVAL:
+      case INTERVALDAY:
+      case INTERVALYEAR:
+      case SMALLINT:
+      case TIME:
+      case TIMESTAMP:
+      case VARBINARY:
+      case VARDECIMAL:
+        // TODO: Implement conversions for above
+      default:
+        throw loader.buildError(
+            UserException.internalError(null)
+              .message("Unsupported JSON reader type: %s",
+                  writer.schema().type().name()));
+    }
+  }
+
+  @Override
+  public ColumnMetadata schema() { return writer.schema(); }
+
+  @Override
+  public void onNull() {
+    setNull();
+  }
+
+  /**
+   * Writes a null value, translating the writer's "nulls not
+   * supported" error into a user-facing data-read error.
+   */
+  protected void setNull() {
+    try {
+      if (isArray) {
+        setArrayNull();
+      } else {
+        writer.setNull();
+      }
+    } catch (UnsupportedConversionError e) {
+      throw loader.buildError(schema(),
+          UserException.dataReadError()
+            .message("Null value encountered in JSON input where Drill does not allow nulls."));
+    }
+  }
+
+  // Subclass-specific handling of a null appearing inside an array.
+  protected abstract void setArrayNull();
+
+  /**
+   * A scalar column cannot accept a JSON array: always raises a
+   * type-conversion error.
+   */
+  @Override
+  public ArrayListener array(ValueDef valueDef) {
+    // If this column is itself an array, report one extra dimension
+    // so the error describes the full depth of the JSON value.
+    if (isArray) {
+      valueDef = new ValueDef(valueDef.type(), valueDef.dimensions() + 1);
+    }
+    throw loader.typeConversionError(schema(), valueDef);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/StructuredValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/StructuredValueListener.java
new file mode 100644
index 0000000..a0835a8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/StructuredValueListener.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.easy.json.loader.AbstractArrayListener.ObjectArrayListener;
+import org.apache.drill.exec.store.easy.json.loader.AbstractArrayListener.ScalarArrayListener;
+import org.apache.drill.exec.store.easy.json.parser.ArrayListener;
+import org.apache.drill.exec.store.easy.json.parser.ObjectListener;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef;
+import org.apache.drill.exec.store.easy.json.parser.ValueListener;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+/**
+ * Base class for structured value listeners: arrays and objects.
+ * Contains the concrete implementations as nested static classes.
+ */
+public abstract class StructuredValueListener extends AbstractValueListener {
+
+  private final ColumnMetadata colSchema;
+
+  public StructuredValueListener(JsonLoaderImpl loader, ColumnMetadata colSchema) {
+    super(loader);
+    this.colSchema = colSchema;
+  }
+
+  @Override
+  public ColumnMetadata schema() { return colSchema; }
+
+  // Ignore array nulls: {a: null} is the same as omitting
+  // array column a: an array of zero elements
+  @Override
+  public void onNull() { }
+
+  /**
+   * Abstract base class for array values which hold a nested array
+   * listener.
+   */
+  public static abstract class ArrayValueListener extends StructuredValueListener {
+
+    protected final AbstractArrayListener arrayListener;
+
+    public ArrayValueListener(JsonLoaderImpl loader, ColumnMetadata colSchema, AbstractArrayListener arrayListener) {
+      super(loader, colSchema);
+      this.arrayListener = arrayListener;
+    }
+
+    public AbstractArrayListener arrayListener() { return arrayListener; }
+
+    public ValueListener elementListener() { return arrayListener.elementListener(); }
+  }
+
+  /**
+   * Value listener for a scalar array (Drill repeated primitive).
+   * Maps null values for the entire array to an empty array.
+   * Maps a scalar to an array with a single value.
+   */
+  public static class ScalarArrayValueListener extends ArrayValueListener {
+
+    public ScalarArrayValueListener(JsonLoaderImpl loader, ColumnMetadata colSchema, ScalarArrayListener arrayListener) {
+      super(loader, colSchema, arrayListener);
+    }
+
+    @Override
+    public ArrayListener array(ValueDef valueDef) {
+      // Scalar arrays are strictly one-dimensional; deeper nesting
+      // would require a different column type.
+      Preconditions.checkArgument(valueDef.dimensions() == 1);
+      return arrayListener;
+    }
+
+    // A lone scalar where an array was expected is treated as a
+    // single-element array: forward the value to the element listener.
+    @Override
+    public void onBoolean(boolean value) {
+      elementListener().onBoolean(value);
+    }
+
+    @Override
+    public void onInt(long value) {
+      elementListener().onInt(value);
+    }
+
+    @Override
+    public void onFloat(double value) {
+      elementListener().onFloat(value);
+    }
+
+    @Override
+    public void onString(String value) {
+      elementListener().onString(value);
+    }
+  }
+
+  /**
+   * Value listener for object (MAP) values.
+   */
+  public static class ObjectValueListener extends StructuredValueListener {
+
+    private final ObjectListener tupleListener;
+
+    public ObjectValueListener(JsonLoaderImpl loader, ColumnMetadata colSchema, ObjectListener tupleListener) {
+      super(loader, colSchema);
+      this.tupleListener = tupleListener;
+    }
+
+    @Override
+    public ObjectListener object() {
+      return tupleListener;
+    }
+  }
+
+  /**
+   * Value listener for object array (repeated MAP) values.
+   */
+  public static class ObjectArrayValueListener extends ArrayValueListener {
+
+    public ObjectArrayValueListener(JsonLoaderImpl loader,
+        ColumnMetadata colSchema, ObjectArrayListener arrayListener) {
+      super(loader, colSchema, arrayListener);
+     }
+
+    @Override
+    public ArrayListener array(ValueDef valueDef) {
+      Preconditions.checkArgument(valueDef.dimensions() == 1);
+      // Called with a provided schema where the initial array
+      // value is empty.
+      Preconditions.checkArgument(!valueDef.type().isScalar());
+      return arrayListener;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleListener.java
new file mode 100644
index 0000000..53ad5c6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleListener.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.RepeatedListBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.json.loader.AbstractArrayListener.ObjectArrayListener;
+import org.apache.drill.exec.store.easy.json.loader.AbstractArrayListener.ScalarArrayListener;
+import org.apache.drill.exec.store.easy.json.loader.StructuredValueListener.ArrayValueListener;
+import org.apache.drill.exec.store.easy.json.loader.StructuredValueListener.ObjectArrayValueListener;
+import org.apache.drill.exec.store.easy.json.loader.StructuredValueListener.ObjectValueListener;
+import org.apache.drill.exec.store.easy.json.loader.StructuredValueListener.ScalarArrayValueListener;
+import org.apache.drill.exec.store.easy.json.parser.ObjectListener;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
+import org.apache.drill.exec.store.easy.json.parser.ValueListener;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * Accepts { name : value ... }
+ * <p>
+ * The structure parser maintains a map of known fields. Each time a
+ * field is parsed, looks up the field in the map. If not found, the parser
+ * looks ahead to find a value token, if any, and calls this class to add
+ * a new column. This class creates a column writer based either on the
+ * type provided in a provided schema, or inferred from the JSON token.
+ * <p>
+ * As it turns out, most of the semantic action occurs at the tuple level:
+ * that is where fields are defined, types inferred, and projection is
+ * computed.
+ *
+ * <h4>Nulls</h4>
+ *
+ * Much code here deals with null types, especially leading nulls, leading
+ * empty arrays, and so on. The object parser creates a parser for each
+ * value; a parser which "does the right thing" based on the data type.
+ * For example, for a Boolean, the parser recognizes {@code true},
+ * {@code false} and {@code null}.
+ * <p>
+ * But what happens if the first value for a field is {@code null}? We
+ * don't know what kind of parser to create because we don't have a schema.
+ * Instead, we have to create a temporary placeholder parser that will consume
+ * nulls, waiting for a real type to show itself. Once that type appears, the
+ * null parser can replace itself with the correct form. Each vector's
+ * "fill empties" logic will back-fill the newly created vector with nulls
+ * for prior rows.
+ * <p>
+ * Two null parsers are needed: one when we see an empty list, and one for
+ * when we only see {@code null}. The one for {@code null} must morph into
+ * the one for empty lists if we see:<br>
+ * {@code {a: null} {a: [ ]  }}<br>
+ * <p>
+ * If we get all the way through the batch, but have still not seen a type,
+ * then we have to guess. A prototype type system can tell us, otherwise we
+ * guess {@code VARCHAR}. ({@code VARCHAR} is the right choice for all-text
+ * mode, it is as good a guess as any for other cases.)
+ *
+ * <h4>Projection List Hints</h4>
+ *
+ * To help, we consult the projection list, if any, for a column. If the
+ * projection is of the form {@code a[0]}, we know the column had better
+ * be an array. Similarly, if the projection list has {@code b.c}, then
+ * {@code b} had better be an object.
+ *
+ * <h4>Array Handling</h4>
+ *
+ * The code here handles arrays in two ways. JSON normally uses the
+ * {@code LIST} type. But, that can be expensive if lists are
+ * well-behaved. So, the code here also implements arrays using the
+ * classic {@code REPEATED} types. The repeated type option is disabled
+ * by default. It can be enabled, for efficiency, if Drill ever supports
+ * a JSON schema. If an array is well-behaved, mark that column as able
+ * to use a repeated type.
+ *
+ * <h4>Ambiguous Types</h4>
+ *
+ * JSON nulls are untyped. A run of nulls does not tell us what type will
+ * eventually appear. The best solution is to provide a schema. Without a
+ * schema, the code is forgiving: defers selection of the column type until
+ * the first non-null value (or, forces a type at the end of the batch.)
+ * <p>
+ * For scalars the pattern is: <code>{a: null} {a: "foo"}</code>. Type
+ * selection happens on the value {@code "foo"}.
+ * <p>
+ * For arrays, the pattern is: <code>{a: []} {a: ["foo"]}</code>. Type
+ * selection happens on the first array element. Note that type selection
+ * must happen on the first element, even if that element is null (which,
+ * as we just said, is ambiguous.)
+ * <p>
+ * If we are forced to pick a type (because we hit the end of a batch, or
+ * we see {@code [null]}, then we pick {@code VARCHAR} as we allow any
+ * scalar to be converted to {@code VARCHAR}. This helps for a single-file
+ * query, but not if multiple fragments each make their own (inconsistent)
+ * decisions. Only a schema provides a consistent answer.
+ */
+public class TupleListener implements ObjectListener {
+
+  protected final JsonLoaderImpl loader;
+  protected final TupleWriter tupleWriter;
+  // Optional schema provided from outside the reader; null when the
+  // schema must be inferred entirely from the JSON tokens.
+  private final TupleMetadata providedSchema;
+
+  public TupleListener(JsonLoaderImpl loader, TupleWriter tupleWriter, TupleMetadata providedSchema) {
+    this.loader = loader;
+    this.tupleWriter = tupleWriter;
+    this.providedSchema = providedSchema;
+  }
+
+  public JsonLoaderImpl loader() { return loader; }
+
+  @Override
+  public void onStart() { }
+
+  @Override
+  public void onEnd() { }
+
+  /**
+   * Determine how the parser should treat the given field: ignore it
+   * (unprojected), parse it as typed values, or capture the raw JSON
+   * text per the "json-mode" property of a provided column schema.
+   */
+  @Override
+  public FieldType fieldType(String key) {
+    if (!tupleWriter.isProjected(key)) {
+      return FieldType.IGNORE;
+    }
+    ColumnMetadata providedCol = providedColumn(key);
+    if (providedCol == null) {
+      return FieldType.TYPED;
+    }
+    String mode = providedCol.property(JsonLoader.JSON_MODE);
+    if (mode == null) {
+      return FieldType.TYPED;
+    }
+    switch (mode) {
+      case JsonLoader.JSON_TEXT_MODE:
+        return FieldType.TEXT;
+      case JsonLoader.JSON_LITERAL_MODE:
+        return FieldType.JSON;
+      default:
+        return FieldType.TYPED;
+    }
+  }
+
+  /**
+   * Add a field not seen before. If a schema is provided, use the provided
+   * column schema to define the column. Else, build the column based on the
+   * look-ahead hints provided by the structure parser.
+   */
+  @Override
+  public ValueListener addField(String key, ValueDef valueDef) {
+    ColumnMetadata colSchema = providedColumn(key);
+    if (colSchema != null) {
+      return listenerFor(colSchema);
+    } else {
+      return listenerFor(key, valueDef);
+    }
+  }
+
+  public ColumnMetadata providedColumn(String key) {
+    return providedSchema == null ? null : providedSchema.metadata(key);
+  }
+
+  /**
+   * Build a column and its listener based on a provided schema.
+   */
+  private ValueListener listenerFor(ColumnMetadata colSchema) {
+    switch (colSchema.structureType()) {
+      case PRIMITIVE:
+        if (colSchema.isArray()) {
+          return scalarArrayListenerFor(colSchema);
+        } else {
+          return scalarListenerFor(colSchema);
+        }
+      case TUPLE:
+        if (colSchema.isArray()) {
+          return objectArrayListenerFor(colSchema);
+        } else {
+          return objectListenerFor(colSchema);
+        }
+      case VARIANT:
+        if (colSchema.isArray()) {
+          return variantArrayListenerFor(colSchema);
+        } else {
+          return variantListenerFor(colSchema);
+        }
+      case MULTI_ARRAY:
+        return repeatedListListenerFor(colSchema);
+      default:
+    }
+    throw loader.unsupportedType(colSchema);
+  }
+
+  /**
+   * Build a column and its listener based on a look-ahead hint.
+   */
+  protected ValueListener listenerFor(String key, ValueDef valueDef) {
+    if (!valueDef.isArray()) {
+      if (valueDef.type().isUnknown()) {
+        return unknownListenerFor(key);
+      } else if (valueDef.type().isObject()) {
+        return objectListenerFor(key, null);
+      } else {
+        return scalarListenerFor(key, valueDef.type());
+      }
+    } else if (valueDef.dimensions() == 1) {
+      if (valueDef.type().isUnknown()) {
+        return unknownArrayListenerFor(key, valueDef);
+      } else if (valueDef.type().isObject()) {
+        return objectArrayListenerFor(key, null);
+      } else {
+        return arrayListenerFor(key, valueDef.type());
+      }
+    } else if (valueDef.dimensions() == 2) {
+      if (valueDef.type().isUnknown()) {
+        return unknownArrayListenerFor(key, valueDef);
+      } else if (valueDef.type().isObject()) {
+        return repeatedListOfObjectsListenerFor(key, null);
+      } else {
+        return repeatedListListenerFor(key, valueDef);
+      }
+    } else {
+      // Arrays of three or more dimensions are not supported.
+      throw loader.unsupportedArrayException(key, valueDef.dimensions());
+    }
+  }
+
+  public ScalarListener scalarListenerFor(String key, JsonType jsonType) {
+    ColumnMetadata colSchema = MetadataUtils.newScalar(key,
+        Types.optional(scalarTypeFor(key, jsonType)));
+    return scalarListenerFor(colSchema);
+  }
+
+  // Adds the column to the tuple writer and returns the writer for the
+  // newly created column.
+  private ObjectWriter addFieldWriter(ColumnMetadata colSchema) {
+    int index = tupleWriter.addColumn(colSchema);
+    return tupleWriter.column(index);
+  }
+
+  public ScalarListener scalarListenerFor(ColumnMetadata colSchema) {
+    return ScalarListener.listenerFor(loader, addFieldWriter(colSchema));
+  }
+
+  public ObjectValueListener objectListenerFor(ColumnMetadata providedCol) {
+    return objectListenerFor(providedCol.name(), providedCol.tupleSchema());
+  }
+
+  public ObjectValueListener objectListenerFor(String key, TupleMetadata providedSchema) {
+    ColumnMetadata colSchema = MetadataUtils.newMap(key);
+    return new ObjectValueListener(loader, colSchema,
+        new TupleListener(loader, addFieldWriter(colSchema).tuple(),
+            providedSchema));
+  }
+
+  public ArrayValueListener objectArrayListenerFor(ColumnMetadata providedCol) {
+    return objectArrayListenerFor(providedCol.name(), providedCol.tupleSchema());
+  }
+
+  public ArrayValueListener objectArrayListenerFor(
+      String key, TupleMetadata providedSchema) {
+    ColumnMetadata colSchema = MetadataUtils.newMapArray(key);
+    ArrayWriter arrayWriter = addFieldWriter(colSchema).array();
+    return new ObjectArrayValueListener(loader, colSchema,
+        new ObjectArrayListener(loader, arrayWriter,
+            new ObjectValueListener(loader, colSchema,
+                new TupleListener(loader, arrayWriter.tuple(), providedSchema))));
+  }
+
+  public ArrayValueListener arrayListenerFor(String key, JsonType jsonType) {
+    ColumnMetadata colSchema = MetadataUtils.newScalar(key,
+        Types.repeated(scalarTypeFor(key, jsonType)));
+    return scalarArrayListenerFor(colSchema);
+  }
+
+  /**
+   * Convert the JSON type, obtained by looking ahead one token, to a Drill
+   * scalar type. Report an error if the JSON type does not map to a Drill
+   * type (which can occur in a context where we expect a scalar, but got
+   * an object or array.)
+   */
+  private MinorType scalarTypeFor(String key, JsonType jsonType) {
+    MinorType colType = drillTypeFor(jsonType);
+    if (colType == null) {
+      throw loader.unsupportedJsonTypeException(key, jsonType);
+    }
+    return colType;
+  }
+
+  /**
+   * Map a JSON scalar type to a Drill type, honoring the "all text mode"
+   * and "read numbers as double" reader options. Returns null for JSON
+   * types with no scalar Drill equivalent.
+   */
+  public MinorType drillTypeFor(JsonType type) {
+    if (loader.options().allTextMode) {
+      return MinorType.VARCHAR;
+    }
+    switch (type) {
+    case BOOLEAN:
+      return MinorType.BIT;
+    case FLOAT:
+      return MinorType.FLOAT8;
+    case INTEGER:
+      if (loader.options().readNumbersAsDouble) {
+        return MinorType.FLOAT8;
+      } else {
+        return MinorType.BIGINT;
+      }
+    case STRING:
+      return MinorType.VARCHAR;
+    default:
+      return null;
+    }
+  }
+
+  public ArrayValueListener scalarArrayListenerFor(ColumnMetadata colSchema) {
+    return new ScalarArrayValueListener(loader, colSchema,
+        new ScalarArrayListener(loader, colSchema,
+            scalarListenerFor(colSchema)));
+  }
+
+  /**
+   * Create a listener when we don't have type information. For the case
+   * {@code null} appears before other values.
+   */
+  private ValueListener unknownListenerFor(String key) {
+    return new UnknownFieldListener(this, key);
+  }
+
+  /**
+   * Create a listener when we don't have type information. For the case
+   * {@code []} appears before other values.
+   */
+  private ValueListener unknownArrayListenerFor(String key, ValueDef valueDef) {
+    UnknownFieldListener fieldListener = new UnknownFieldListener(this, key);
+    fieldListener.array(valueDef);
+    return fieldListener;
+  }
+
+  private ValueListener variantListenerFor(ColumnMetadata colSchema) {
+    return new VariantListener(loader, addFieldWriter(colSchema).variant());
+  }
+
+  private ValueListener variantArrayListenerFor(ColumnMetadata colSchema) {
+    return new ListListener(loader, addFieldWriter(colSchema));
+  }
+
+  private ValueListener repeatedListListenerFor(String key, ValueDef valueDef) {
+    ColumnMetadata colSchema = new RepeatedListBuilder(key)
+        .addArray(scalarTypeFor(key, valueDef.type()))
+        .buildColumn();
+    return repeatedListListenerFor(colSchema);
+  }
+
+  /**
+   * Create a RepeatedList which contains (empty) Map objects using the provided
+   * schema. The map fields are created on the fly from the provided schema.
+   */
+  private ValueListener repeatedListOfObjectsListenerFor(String key, ColumnMetadata providedCol) {
+    ColumnMetadata colSchema = new RepeatedListBuilder(key)
+        .addMapArray()
+          .resumeList()
+        .buildColumn();
+    TupleMetadata providedSchema = providedCol == null ? null
+        : providedCol.childSchema().tupleSchema();
+    return RepeatedListValueListener.repeatedObjectListFor(loader,
+        addFieldWriter(colSchema), providedSchema);
+  }
+
+  /**
+   * Create a RepeatedList which contains Unions. (Actually, this is an
+   * array of List objects internally.) The variant is variable, it makes no
+   * sense to specify a schema for the variant. Also, omitting the schema
+   * saves a large amount of complexity that will likely never be needed.
+   */
+  private ValueListener repeatedListOfVariantListenerFor(String key) {
+    ColumnMetadata colSchema = new RepeatedListBuilder(key)
+        .addList()
+          .resumeList()
+        .buildColumn();
+    return RepeatedListValueListener.repeatedVariantListFor(loader,
+        addFieldWriter(colSchema));
+  }
+
+  // Dispatch on the repeated list's child type: map array, variant, or
+  // a plain repeated scalar list.
+  private ValueListener repeatedListListenerFor(ColumnMetadata colSchema) {
+    ColumnMetadata childSchema = colSchema.childSchema();
+    if (childSchema != null) {
+      if (childSchema.isMap()) {
+        return repeatedListOfObjectsListenerFor(colSchema.name(), colSchema);
+      }
+      if (childSchema.isVariant()) {
+        return repeatedListOfVariantListenerFor(colSchema.name());
+      }
+    }
+    return RepeatedListValueListener.repeatedListFor(loader, addFieldWriter(colSchema));
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/UnknownFieldListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/UnknownFieldListener.java
new file mode 100644
index 0000000..0a2ca34
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/UnknownFieldListener.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.NullTypeMarker;
+import org.apache.drill.exec.store.easy.json.parser.ArrayListener;
+import org.apache.drill.exec.store.easy.json.parser.ObjectListener;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef;
+import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
+import org.apache.drill.exec.store.easy.json.parser.ValueListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a rather odd state: we have seen a value of one or more
+ * {@code null}s or empty arrays ({@code []}), but we have not yet seen a
+ * value that would give us a type. This listener
+ * acts as a placeholder; waiting to see the type, at which point it replaces
+ * itself with the actual typed listener. If a batch completes with only nulls
+ * for this field, then the field becomes a {@code VARCHAR} field. Drill's "fill
+ * empties" logic will back-fill nulls. All values in
+ * subsequent batches will be read in "text mode" for that one field in
+ * order to avoid a schema change.
+ * <p>
+ * Note what this listener does <i>not</i> do: it does not create a nullable
+ * int field per Drill's normal (if less than ideal) semantics. First, JSON
+ * <b>never</b> produces an int field, so nullable int is less than ideal.
+ * Second, nullable int has no basis in reality and so is a poor choice
+ * on that basis.
+ * <p>
+ * Note that we <i>cannot</i> use this class for an array that
+ * contains nulls: {@code [null]}. The null is a value that must be
+ * stored, so we must guess the type as we have no good way to count
+ * array entries except via vectors.
+ */
+public class UnknownFieldListener extends AbstractValueListener implements NullTypeMarker {
+  protected static final Logger logger = LoggerFactory.getLogger(UnknownFieldListener.class);
+
+  protected final TupleListener parentTuple;
+  protected final String key;
+  // Host that owns this listener; used to swap in the resolved listener.
+  protected ValueHost host;
+  // Non-null once we have seen an empty array ([]) for this field.
+  private UnknownArrayListener unknownArray;
+
+  public UnknownFieldListener(TupleListener parentTuple, String key) {
+    super(parentTuple.loader());
+    this.parentTuple = parentTuple;
+    this.key = key;
+    // Register so the loader can force a type at end-of-batch if no
+    // concrete value ever appears.
+    loader.addNullMarker(this);
+  }
+
+  @Override
+  public void bind(ValueHost host) {
+    this.host = host;
+  }
+
+  @Override
+  public void onNull() {
+    if (unknownArray != null) {
+      // An array, must resolve to some type.
+      resolveScalar(JsonType.NULL).onNull();
+    }
+    // Else ignore: still don't know what this is
+  }
+
+  @Override
+  public void onBoolean(boolean value) {
+    resolveScalar(JsonType.BOOLEAN).onBoolean(value);
+  }
+
+  @Override
+  public void onInt(long value) {
+    resolveScalar(JsonType.INTEGER).onInt(value);
+  }
+
+  @Override
+  public void onFloat(double value) {
+    resolveScalar(JsonType.FLOAT).onFloat(value);
+  }
+
+  @Override
+  public void onString(String value) {
+    resolveScalar(JsonType.STRING).onString(value);
+  }
+
+  @Override
+  public void onEmbeddedObject(String value) {
+    resolveScalar(JsonType.EMBEDDED_OBJECT).onEmbeddedObject(value);
+  }
+
+  @Override
+  public ObjectListener object() {
+    return resolveScalar(JsonType.OBJECT).object();
+  }
+
+  /**
+   * The column type is now known from context. Create a new, scalar
+   * column, writer and listener to replace ourself: this is the last
+   * call that this listener will receive.
+   */
+  protected ValueListener resolveScalar(JsonType type) {
+    if (unknownArray == null) {
+      return resolveTo(parentTuple.scalarListenerFor(key, type));
+    } else {
+
+      // Saw {a: []}, {a: 10}. Since we infer that 10 is a
+      // single-element array, resolve to an array, then send
+      // the value to the element.
+      return unknownArray.element(new ValueDef(type, 0));
+    }
+  }
+
+  @Override
+  protected ColumnMetadata schema() {
+    throw new IllegalStateException("Unknown column has no schema");
+  }
+
+  @Override
+  public ArrayListener array(ValueDef valueDef) {
+    if (valueDef.dimensions() > 1) {
+
+      // if 2D+ array, then we know enough to choose a Repeated list
+      return resolveToArray(valueDef).array(valueDef);
+    }
+    if (unknownArray == null) {
+      unknownArray = new UnknownArrayListener(this);
+    }
+    return unknownArray;
+  }
+
+  /**
+   * Replace this placeholder with the given concrete listener and
+   * deregister from the loader's null-marker list.
+   */
+  protected ValueListener resolveTo(ValueListener newListener) {
+    host.bindListener(newListener);
+    loader.removeNullMarker(this);
+    return newListener;
+  }
+
+  /**
+   * Called at end-of-batch when no typed value ever appeared: guess
+   * VARCHAR (or repeated VARCHAR if we saw only empty arrays).
+   */
+  @Override
+  public void forceResolution() {
+    if (unknownArray == null) {
+      logger.warn("Ambiguous type! JSON field {}" +
+          " contains all nulls. Assuming VARCHAR.", key);
+      resolveTo(parentTuple.scalarListenerFor(key, JsonType.STRING));
+    } else {
+      logger.warn("Ambiguous type! JSON array field {}" +
+          " contains all empty arrays. Assuming repeated VARCHAR.", key);
+      resolveTo(parentTuple.arrayListenerFor(key, JsonType.STRING));
+    }
+  }
+
+  public ValueListener resolveToArray(ValueDef valueDef) {
+    if (valueDef.type().isUnknown()) {
+      logger.warn("Ambiguous type! JSON array field {}" +
+          " starts with null element. Assuming repeated VARCHAR.", key);
+      valueDef = new ValueDef(JsonType.STRING, valueDef.dimensions());
+    }
+    return resolveTo(parentTuple.listenerFor(key, valueDef));
+  }
+
+  /**
+   * An unknown array within the unknown field. Represents an
+   * empty array: {@code []}. Resolves to a specific type upon
+   * presentation of the first element. If that element is
+   * {@code null}, must still choose a type to record nulls.
+   * <p>
+   * This array listener holds no element since none has been
+   * created yet; we use this only while we see empty arrays.
+   */
+  public static class UnknownArrayListener implements ArrayListener {
+
+    private final UnknownFieldListener parent;
+
+    public UnknownArrayListener(UnknownFieldListener parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public void onStart() { }
+
+    @Override
+    public void onElementStart() { }
+
+    @Override
+    public void onElementEnd() { }
+
+    @Override
+    public void onEnd() { }
+
+    /**
+     * Saw the first actual element. Swap out the field listener
+     * for a real array, then return the new element listener.
+     */
+    @Override
+    public ValueListener element(ValueDef valueDef) {
+      // The element's def describes one dimension less than the
+      // array itself: add it back before resolving.
+      ValueDef arrayDef = new ValueDef(valueDef.type(), valueDef.dimensions() + 1);
+      return parent.resolveToArray(arrayDef)
+          .array(arrayDef)
+          .element(valueDef);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/VarCharListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/VarCharListener.java
new file mode 100644
index 0000000..1e6cd31
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/VarCharListener.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Value listener for JSON string values. Allows conversion from
+ * other scalar types using the Java {@code toString()} semantics.
+ * Use the "text-mode" hint in a provided schema to get the literal
+ * JSON value.
+ */
+public class VarCharListener extends ScalarListener {
+
+  // When true, a null array element is written as the literal string
+  // "null" (the pre-EVF reader's behavior); when false, as an empty
+  // string. Only meaningful for array (repeated) columns.
+  private final boolean classicArrayNulls;
+
+  public VarCharListener(JsonLoaderImpl loader, ScalarWriter writer) {
+    super(loader, writer);
+    classicArrayNulls = isArray ? loader.options().classicArrayNulls : false;
+  }
+
+  // Non-string scalars are converted via Java toString() semantics.
+  @Override
+  public void onBoolean(boolean value) {
+    writer.setString(Boolean.toString(value));
+  }
+
+  @Override
+  public void onInt(long value) {
+    writer.setString(Long.toString(value));
+  }
+
+  @Override
+  public void onFloat(double value) {
+    writer.setString(Double.toString(value));
+  }
+
+  @Override
+  public void onString(String value) {
+    writer.setString(value);
+  }
+
+  @Override
+  protected void setArrayNull() {
+    writer.setString(classicArrayNulls ? "null" : "");
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/VariantListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/VariantListener.java
new file mode 100644
index 0000000..dda731c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/VariantListener.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.easy.json.parser.ObjectListener;
+import org.apache.drill.exec.vector.accessor.VariantWriter;
+
+/**
+ * Listener for a UNION type column which maps each JSON type to
+ * the matching Drill type within the UNION. Used only if a column
+ * is declared as UNION in the provided schema. This implementation
+ * does not have a way to convert a non-UNION column into a UNION
+ * during the scan. The reason is simple: the scan is obligated to
+ * return a consistent schema. Converting a column between types,
+ * especially after returning the first batch, will lead to an
+ * inconsistent schema and to downstream schema change failures.
+ */
+public class VariantListener extends AbstractValueListener {
+
+  private final VariantWriter writer;
+
+  public VariantListener(JsonLoaderImpl loader, VariantWriter writer) {
+    super(loader);
+    this.writer = writer;
+  }
+
+  @Override
+  public void onNull() { }
+
+  @Override
+  public void onBoolean(boolean value) {
+    writer.scalar(MinorType.BIT).setBoolean(value);
+  }
+
+  @Override
+  public void onInt(long value) {
+    writer.scalar(MinorType.BIGINT).setLong(value);
+  }
+
+  @Override
+  public void onFloat(double value) {
+    writer.scalar(MinorType.FLOAT8).setDouble(value);
+  }
+
+  @Override
+  public void onString(String value) {
+    writer.scalar(MinorType.VARCHAR).setString(value);
+  }
+
+  @Override
+  protected ColumnMetadata schema() {
+    return writer.schema();
+  }
+
+  @Override
+  public ObjectListener object() {
+    return new VariantTupleListener(loader, writer);
+  }
+
+  private static class VariantTupleListener extends TupleListener {
+
+    private final VariantWriter writer;
+
+    public VariantTupleListener(JsonLoaderImpl loader, VariantWriter writer) {
+      super(loader, writer.tuple(), null);
+      this.writer = writer;
+    }
+
+    @Override
+    public void onStart() {
+      writer.setType(MinorType.MAP);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ValueListener.java
index 6037069..94587d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ValueListener.java
@@ -120,7 +120,7 @@
    *
    * @param value the string value of the parsed token
    */
-  void onEmbedddObject(String value);
+  void onEmbeddedObject(String value);
 
   /**
    * The parser has encountered a object value for the field for the first
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ValueParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ValueParser.java
index 8a7fd77..1cd0986 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ValueParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ValueParser.java
@@ -72,7 +72,7 @@
           listener.onString(tokenizer.stringValue());
           break;
         case VALUE_EMBEDDED_OBJECT:
-          listener.onEmbedddObject(tokenizer.stringValue());
+          listener.onEmbeddedObject(tokenizer.stringValue());
         default:
           // Won't get here: the Jackson parser catches
           // errors.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
index 6183106..1e7a9a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -96,7 +96,9 @@
         .buildSchema();
 
     ProjectionFilter projSet = scanProj.readerProjection();
+    assertTrue(projSet.isProjected("a"));
     assertTrue(projSet.isProjected(readerSchema.metadata("a")));
+    assertFalse(projSet.isProjected("d"));
     assertFalse(projSet.isProjected(readerSchema.metadata("d")));
   }
 
@@ -159,8 +161,11 @@
     // projection type is used here for testing; should not be used by
     // an actual reader.
     ProjectionFilter projSet = scanProj.readerProjection();
+    assertTrue(projSet.isProjected("a"));
     assertTrue(projSet.isProjected(readerSchema.metadata("a")));
+    assertTrue(projSet.isProjected("c"));
     assertTrue(projSet.isProjected(readerSchema.metadata("c")));
+    assertFalse(projSet.isProjected("d"));
     assertFalse(projSet.isProjected(readerSchema.metadata("d")));
   }
 
@@ -204,7 +209,9 @@
         .buildSchema();
 
     ProjectionFilter projSet = scanProj.readerProjection();
+    assertTrue(projSet.isProjected("a"));
     assertTrue(projSet.isProjected(readerSchema.metadata("a")));
+    assertFalse(projSet.isProjected("c"));
     assertFalse(projSet.isProjected(readerSchema.metadata("c")));
   }
 
@@ -273,6 +280,7 @@
         .buildSchema();
 
     ProjectionFilter projSet = scanProj.readerProjection();
+    assertFalse(projSet.isProjected("a"));
     assertFalse(projSet.isProjected(readerSchema.metadata("a")));
   }
 
@@ -381,7 +389,9 @@
     assertSame(providedSchema.metadata("b"), ((UnresolvedColumn) bCol).metadata());
 
     ProjectionFilter projSet = scanProj.readerProjection();
+    assertTrue(projSet.isProjected("a"));
     assertTrue(projSet.isProjected(providedSchema.metadata("a")));
+    assertTrue(projSet.isProjected("b"));
     assertTrue(projSet.isProjected(providedSchema.metadata("b")));
   }
 
@@ -467,6 +477,7 @@
         .buildSchema();
 
     ProjectionFilter projSet = scanProj.readerProjection();
+    assertTrue(projSet.isProjected("b"));
     try {
       projSet.isProjected(readerSchema.metadata("b"));
       fail();
@@ -497,6 +508,7 @@
         .buildSchema();
 
     ProjectionFilter projSet = scanProj.readerProjection();
+    assertTrue(projSet.isProjected("b"));
     try {
       projSet.isProjected(readerSchema.metadata("b"));
       fail();
@@ -529,8 +541,11 @@
         .buildSchema();
 
     ProjectionFilter projSet = scanProj.readerProjection();
+    assertTrue(projSet.isProjected("a"));
     assertTrue(projSet.isProjected(readerSchema.metadata("a")));
+    assertFalse(projSet.isProjected("b"));
     assertFalse(projSet.isProjected(readerSchema.metadata("b")));
+    assertTrue(projSet.isProjected("c"));
     assertTrue(projSet.isProjected(readerSchema.metadata("c")));
   }
 
@@ -560,8 +575,11 @@
         .buildSchema();
 
     ProjectionFilter projSet = scanProj.readerProjection();
+    assertTrue(projSet.isProjected("a"));
     assertTrue(projSet.isProjected(readerSchema.metadata("a")));
+    assertFalse(projSet.isProjected("b"));
     assertFalse(projSet.isProjected(readerSchema.metadata("b")));
+    assertFalse(projSet.isProjected("c"));
     assertFalse(projSet.isProjected(readerSchema.metadata("c")));
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
index 1996d05..880a316 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
@@ -46,7 +46,6 @@
  * lists and repeated lists. This test verifies that it assembles the various
  * pieces correctly for the various nesting combinations.
  */
-
 @Category(RowSetTests.class)
 public class TestSchemaBuilder extends DrillTest {
 
@@ -94,7 +93,6 @@
         .add(aField);
 
     // Internal method, does not return builder itself.
-
     builder.addColumn(bCol);
 
     TupleMetadata schema = builder.buildSchema();
@@ -116,7 +114,6 @@
    * Tests creating a map within a row.
    * Also the basic map add column methods.
    */
-
   @Test
   public void testMapInRow() {
     TupleMetadata schema = new SchemaBuilder()
@@ -164,7 +161,6 @@
    * Test building a union in the top-level schema.
    * Also tests the basic union add type methods.
    */
-
   @Test
   public void testUnionInRow() {
     TupleMetadata schema = new SchemaBuilder()
@@ -205,7 +201,6 @@
   /**
    * Test building a list (of unions) in the top-level schema.
    */
-
   @Test
   public void testListInRow() {
     TupleMetadata schema = new SchemaBuilder()
@@ -252,7 +247,6 @@
   /**
    * Test building a repeated list in the top-level schema.
    */
-
   @Test
   public void testRepeatedListInRow() {
     TupleMetadata schema = new SchemaBuilder()
@@ -320,7 +314,6 @@
    * VarChar in lists, unions or repeated lists because these
    * cases are obscure and seldom (never?) used.
    */
-
   @Test
   public void testVarCharPrecision() {
     TupleMetadata schema = new SchemaBuilder()
@@ -348,7 +341,6 @@
    * broken in Drill, so we don't bother about decimals in unions,
    * lists or repeated lists, though those methods could be added.
    */
-
   @Test
   public void testDecimal() {
     TupleMetadata schema = new SchemaBuilder()
@@ -485,7 +477,6 @@
   /**
    * Verify that the map-in-map plumbing works.
    */
-
   @Test
   public void testMapInMap() {
     TupleMetadata schema = new SchemaBuilder()
@@ -512,7 +503,6 @@
   /**
    * Verify that the union-in-map plumbing works.
    */
-
   @Test
   public void testUnionInMap() {
     TupleMetadata schema = new SchemaBuilder()
@@ -538,7 +528,6 @@
   /**
    * Verify that the repeated list-in-map plumbing works.
    */
-
   @Test
   public void testRepeatedListInMap() {
     TupleMetadata schema = new SchemaBuilder()
@@ -620,7 +609,6 @@
   // and repeated lists key off of the same type code: LIST, so it is
   // ambiguous which is supported. The schema builder muddles through this
   // case, but the rest of the code might not.
-
   @Test
   public void testListInUnion() {
     TupleMetadata schema = new SchemaBuilder()
@@ -645,7 +633,6 @@
   }
 
   // Note: union-in-union not supported in Drill
-
   @Test
   public void testMapInRepeatedList() {
     TupleMetadata schema = new SchemaBuilder()
@@ -675,7 +662,6 @@
    * Test that repeated lists can be nested to provide 3D or
    * higher dimensions.
    */
-
   @Test
   public void testRepeatedListInRepeatedList() {
     TupleMetadata schema = new SchemaBuilder()
@@ -800,7 +786,7 @@
 
   @Test
   public void testStandaloneUnionBuilder() {
-    ColumnMetadata columnMetadata = new UnionBuilder("u", MinorType.VARCHAR)
+    ColumnMetadata columnMetadata = new UnionBuilder("u", MinorType.UNION)
       .addType(MinorType.INT)
       .addType(MinorType.VARCHAR)
       .buildColumn();
@@ -812,5 +798,4 @@
     assertTrue(variantMetadata.hasType(MinorType.INT));
     assertTrue(variantMetadata.hasType(MinorType.VARCHAR));
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java
new file mode 100644
index 0000000..d45b197
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.EmptyErrorContext;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetOptionBuilder;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+
+public class BaseJsonLoaderTest extends SubOperatorTest {
+
+  protected static class JsonLoaderFixture {
+
+    public ResultSetOptionBuilder rsLoaderOptions = new ResultSetOptionBuilder();
+    public TupleMetadata providedSchema;
+    public JsonLoaderOptions jsonOptions = new JsonLoaderOptions();
+    public CustomErrorContext errorContext = new EmptyErrorContext();
+    private ResultSetLoader rsLoader;
+    private JsonLoader loader;
+
+    public void open(InputStream is) {
+      rsLoader = new ResultSetLoaderImpl(fixture.allocator(), rsLoaderOptions.build());
+      loader = new JsonLoaderImpl(rsLoader, providedSchema, jsonOptions, errorContext, is);
+    }
+
+    public void open(String json) {
+      InputStream stream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
+      open(stream);
+    }
+
+    public RowSet next() {
+      if (!loader.next()) {
+        return null;
+      }
+      loader.endBatch();
+      return fixture.wrap(rsLoader.harvest());
+    }
+
+    public void close() {
+      loader.close();
+      rsLoader.close();
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestObjects.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestObjects.java
new file mode 100644
index 0000000..420c2fa
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestObjects.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+public class TestObjects extends BaseJsonLoaderTest {
+
+  @Test
+  public void testMap() {
+    String json =
+        "{a: 1, m: {b: 10, c: 20}}\n" +
+        "{a: 2, m: {b: 110}}\n" +
+        "{a: 3, m: {c: 220}}\n" +
+        "{a: 4, m: {}}\n" +
+        "{a: 5, m: null}\n" +
+        "{a: 6}\n" +
+        "{a: 7, m: {b: 710, c: 720}}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addMap("m")
+          .addNullable("b", MinorType.BIGINT)
+          .addNullable("c", MinorType.BIGINT)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1L, mapValue(10L, 20L))
+        .addRow(2L, mapValue(110L, null))
+        .addRow(3L, mapValue(null, 220L))
+        .addRow(4L, mapValue(null, null))
+        .addRow(5L, mapValue(null, null))
+        .addRow(6L, mapValue(null, null))
+        .addRow(7L, mapValue(710L, 720L))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  /**
+   * Without a schema, leading nulls or empty maps can be ambiguous.
+   * With a schema, the meaning is clear.
+   */
+  @Test
+  public void testMapWithSchema() {
+    String json =
+        "{a: 6}\n" +
+        "{a: 5, m: null}\n" +
+        "{a: 4, m: {}}\n" +
+        "{a: 2, m: {b: 110}}\n" +
+        "{a: 3, m: {c: 220}}\n" +
+        "{a: 1, m: {b: 10, c: 20}}\n" +
+        "{a: 7, m: {b: 710, c: 720}}";
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addMap("m")
+          .addNullable("b", MinorType.BIGINT)
+          .addNullable("c", MinorType.BIGINT)
+          .resumeSchema()
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(6L, mapValue(null, null))
+        .addRow(5L, mapValue(null, null))
+        .addRow(4L, mapValue(null, null))
+        .addRow(2L, mapValue(110L, null))
+        .addRow(3L, mapValue(null, 220L))
+        .addRow(1L, mapValue(10L, 20L))
+        .addRow(7L, mapValue(710L, 720L))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testMapAsJson() {
+    String json =
+        "{a: 6}\n" +
+        "{a: 5, m: null}\n" +
+        "{a: 4, m: {}}\n" +
+        "{a: 2, m: {b: 110}}\n" +
+        "{a: 3, m: {c: 220}}\n" +
+        "{a: 1, m: {b: 10, c: 20}}\n" +
+        "{a: 7, m: {b: 710, c: 720}}";
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("m", MinorType.VARCHAR)
+        .build();
+    ColumnMetadata m = schema.metadata("m");
+    m.setProperty(JsonLoader.JSON_MODE, JsonLoader.JSON_LITERAL_MODE);
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(6L, null)
+        .addRow(5L, "null")
+        .addRow(4L, "{}")
+        .addRow(2L, "{\"b\": 110}")
+        .addRow(3L, "{\"c\": 220}")
+        .addRow(1L, "{\"b\": 10, \"c\": 20}")
+        .addRow(7L, "{\"b\": 710, \"c\": 720}")
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testMapArray() {
+    String json =
+        "{a: 1, m: [{b: 10, c: 20}, {b: 11, c: 21}]}\n" +
+        "{a: 2, m: [{b: 110}]}\n" +
+        "{a: 3, m: [{c: 220}]}\n" +
+        "{a: 4, m: [{}]}\n" +
+        "{a: 5, m: [null]}\n" +
+        "{a: 6, m: []}\n" +
+        "{a: 7, m: null}\n" +
+        "{a: 8}\n" +
+        "{a: 9, m: [{b: 710, c: 720}, {b: 711, c: 721}]}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addMapArray("m")
+          .addNullable("b", MinorType.BIGINT)
+          .addNullable("c", MinorType.BIGINT)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1L, mapArray(mapValue(10L, 20L), mapValue(11L, 21L)))
+        .addRow(2L, mapArray(mapValue(110L, null)))
+        .addRow(3L, mapArray(mapValue(null, 220L)))
+        .addRow(4L, mapArray(mapValue(null, null)))
+        .addRow(5L, mapArray(mapValue(null, null)))
+        .addRow(6L, mapArray())
+        .addRow(7L, mapArray())
+        .addRow(8L, mapArray())
+        .addRow(9L, mapArray(mapValue(710L, 720L), mapValue(711L, 721L)))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  /**
+   * With a schema we don't have to infer the type of the map or its members.
+   * Instead, we can tolerate extreme ambiguity in both.
+   */
+  @Test
+  public void testMapArrayWithSchema() {
+    String json =
+        "{a:  8}\n" +
+        "{a:  7, m: null}\n" +
+        "{a:  6, m: []}\n" +
+        "{a:  5, m: [null]}\n" +
+        "{a:  4, m: [{}]}\n" +
+        "{a: 10, m: [{b: null}]}\n" +
+        "{a: 11, m: [{c: null}]}\n" +
+        "{a: 12, m: [{b: null}, {c: null}]}\n" +
+        "{a:  2, m: [{b: 110}]}\n" +
+        "{a:  3, m: [{c: 220}]}\n" +
+        "{a:  1, m: [{b: 10, c: 20}, {b: 11, c: 21}]}\n" +
+        "{a:  9, m: [{b: 710, c: 720}, {b: 711, c: 721}]}";
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addMapArray("m")
+          .addNullable("b", MinorType.BIGINT)
+          .addNullable("c", MinorType.BIGINT)
+          .resumeSchema()
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addRow( 8L, mapArray())
+        .addRow( 7L, mapArray())
+        .addRow( 6L, mapArray())
+        .addRow( 5L, mapArray(mapValue(null, null)))
+        .addRow( 4L, mapArray(mapValue(null, null)))
+        .addRow(10L, mapArray(mapValue(null, null)))
+        .addRow(11L, mapArray(mapValue(null, null)))
+        .addRow(12L, mapArray(mapValue(null, null), mapValue(null, null)))
+        .addRow( 2L, mapArray(mapValue(110L, null)))
+        .addRow( 3L, mapArray(mapValue(null, 220L)))
+        .addRow( 1L, mapArray(mapValue(10L, 20L), mapValue(11L, 21L)))
+        .addRow( 9L, mapArray(mapValue(710L, 720L), mapValue(711L, 721L)))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  /**
+   * The structure parser feels its way along looking ahead at tokens
+   * to guess types. Test the case where the member is an array containing
+   * null (so the parser does not know the type). Given a schema, we know
+   * it is a map.
+   */
+  @Test
+  public void testMapArrayWithSchemaInitialNullMember() {
+    String json =
+        "{a:  5, m: [null]}\n" +
+        "{a:  1, m: [{b: 10, c: 20}, {b: 11, c: 21}]}\n";
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addMapArray("m")
+          .addNullable("b", MinorType.BIGINT)
+          .addNullable("c", MinorType.BIGINT)
+          .resumeSchema()
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addRow( 5L, mapArray(mapValue(null, null)))
+        .addRow( 1L, mapArray(mapValue(10L, 20L), mapValue(11L, 21L)))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testObjectToScalar() {
+    String json =
+        "{a: {b: 10}} {a: 10}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    try {
+      loader.next();
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("integer"));
+    }
+    loader.close();
+  }
+
+  @Test
+  public void testObjectToArray() {
+    String json =
+        "{a: {b: 10}} {a: [10]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    try {
+      loader.next();
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("integer[]"));
+    }
+    loader.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestRepeatedList.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestRepeatedList.java
new file mode 100644
index 0000000..8a54863
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestRepeatedList.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.longArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+/**
+ * Tests repeated lists to form a 2D array of various data types.
+ */
+public class TestRepeatedList extends BaseJsonLoaderTest {
+
+  @Test
+  public void test2DScalars() {
+    String json =
+        "{a: [[1, 2], [3, 4, 5]]}\n" +
+        "{a: [[6], [7, 8]]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addArray(MinorType.BIGINT)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(objArray(
+            longArray(1L, 2L), longArray(3L, 4L, 5L)))
+        .addSingleCol(objArray(
+            longArray(6L), longArray(7L, 8L)))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void test2DUnknown() {
+    String json =
+        "{a: []} {a: [[1, 2], [3, 4, 5]]}\n";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addArray(MinorType.BIGINT)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(null)
+        .addSingleCol(objArray(
+            longArray(1L, 2L), longArray(3L, 4L, 5L)))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void test2DScalarWithSchema() {
+    String json =
+        "{a: null} {a: []} {a: [null]} {a: [[]]} {a: [[null]]}\n" +
+        "{a: [[1, 2], [3, 4, 5]]}\n";
+    TupleMetadata schema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addArray(MinorType.BIGINT)
+          .resumeSchema()
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addSingleCol(objArray())
+        .addSingleCol(objArray())
+        .addSingleCol(singleObjArray(longArray()))
+        .addSingleCol(singleObjArray(longArray()))
+        .addSingleCol(singleObjArray(longArray(0L)))
+        .addSingleCol(objArray(
+            longArray(1L, 2L), longArray(3L, 4L, 5L)))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void test2DUnknownForcedNull() {
+    String json =
+        "{a: []} {a: [[null]]}\n" +
+        "{a: [[1, 2], [3, 4, 5]]}\n";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addArray(MinorType.VARCHAR)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(null)
+        .addSingleCol(singleObjArray(strArray("")))
+        .addSingleCol(objArray(
+            strArray("1", "2"), strArray("3", "4", "5")))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // No schema: "[[]]" forces the inner array type before any scalar is
+  // seen. As with a forced null, the loader falls back to VARCHAR (see
+  // the expected schema) and converts later numbers to strings.
+  @Test
+  public void test2DUnknownForcedEmptyArray() {
+    String json =
+        "{a: []} {a: [[]]} {a: [[1, 2], [3, 4, 5]]}\n";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addArray(MinorType.VARCHAR)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        // NOTE(review): null appears to denote an empty repeated-list value
+        // for the "{a: []}" row - confirm against RowSetBuilder semantics.
+        .addSingleCol(null)
+        .addSingleCol(singleObjArray(strArray()))
+        .addSingleCol(objArray(
+            strArray("1", "2"), strArray("3", "4", "5")))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // A three-level nested array is not supported: the loader must fail
+  // with a clear "arrays deeper than two levels" error rather than
+  // produce a bogus schema. (The JSON literal previously had an extra
+  // unbalanced ']' - fixed so the test input is well-formed and the
+  // failure is unambiguously the depth check, not a syntax error.)
+  @Test
+  public void test3DScalars() {
+    String json =
+        "{a: [[[1, 2]]]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    try {
+      loader.next();
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("arrays deeper than two levels"));
+    } finally {
+      // Close in finally so the loader is released even if the
+      // assertion above fails; matches the other negative tests.
+      loader.close();
+    }
+  }
+
+  // No schema: a 2D array of objects is inferred as a repeated list of
+  // a map array; the map member "b" is inferred as nullable BIGINT per
+  // the expected schema below.
+  @Test
+  public void test2DObjects() {
+    String json =
+        "{a: [[{b: 1}, {b: 2}], [{b: 3}, {b: 4}, {b: 5}]]}\n" +
+        "{a: [[{b: 6}], [{b: 7}, {b: 8}]]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addMapArray()
+            .addNullable("b", MinorType.BIGINT)
+            .resumeList()
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(objArray(
+            objArray(mapValue(1L), mapValue(2L)),
+            objArray(mapValue(3L), mapValue(4L), mapValue(5L))))
+        .addSingleCol(objArray(
+            singleObjArray(mapValue(6L)),
+            objArray(mapValue(7L), mapValue(8L))))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // Same 2D-array-of-objects case, but with a provided schema so the
+  // type is known before the leading "[]" and "[[null]]" rows. The null
+  // element materializes as a map with a null "b" member.
+  @Test
+  public void test2DObjectsWithSchema() {
+    String json =
+        "{a: []} {a: [[null]]}\n" +
+        "{a: [[{b: 1}, {b: 2}], [{b: 3}, {b: 4}, {b: 5}]]}\n" +
+        "{a: [[{b: 6}], [{b: 7}, {b: 8}]]}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addMapArray()
+            .addNullable("b", MinorType.BIGINT)
+            .resumeList()
+          .resumeSchema()
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addSingleCol(objArray())
+        // The null inner element becomes a map whose "b" member is null.
+        .addSingleCol(singleObjArray(mapValue((Long) null)))
+        .addSingleCol(objArray(
+            objArray(mapValue(1L), mapValue(2L)),
+            objArray(mapValue(3L), mapValue(4L), mapValue(5L))))
+        .addSingleCol(objArray(
+            singleObjArray(mapValue(6L)),
+            objArray(mapValue(7L), mapValue(8L))))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // Provided schema declares a repeated list whose inner element is a
+  // LIST (union). Member types of the union accumulate as the data
+  // presents them (BIGINT, BIT, VARCHAR, FLOAT8 and a map), per the
+  // expected schema below.
+  @Test
+  public void test2DVariantWithSchema() {
+    String json =
+        "{a: []} {a: [[null]]}\n" +
+        "{a: [[true, 10], [20.5, \"foo\"]]}" +
+        "{a: [[{b: 1}, 2], [{b: 3}, \"four\", {b: 5}]]}\n";
+    TupleMetadata schema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addList()
+            .resumeList()
+          .resumeSchema()
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addRepeatedList("a")
+          .addList()
+            .addType(MinorType.BIGINT)
+            .addType(MinorType.BIT)
+            .addType(MinorType.VARCHAR)
+            .addType(MinorType.FLOAT8)
+            .addMap()
+               .addNullable("b", MinorType.BIGINT)
+              .resumeUnion()
+            .resumeList()
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(objArray())
+        // The union (LIST) member preserves the null element as-is.
+        .addSingleCol(singleObjArray(singleObjArray(null)))
+        .addSingleCol(objArray(
+            objArray(true, 10L),
+            objArray(20.5D, "foo")))
+        .addSingleCol(objArray(
+            objArray(mapValue(1L), 2L),
+            objArray(mapValue(3L), "four", mapValue(5L))))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+}
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestScalarArrays.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestScalarArrays.java
new file mode 100644
index 0000000..8d82878
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestScalarArrays.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.boolArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.longArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.doubleArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+/**
+ * Test scalar arrays. Without a schema, the first array token
+ * sets the type. With a schema, the type is known independent of
+ * the first token (if any).
+ * <p>
+ * Verifies that a single scalar is harmlessly treated as an
+ * array of one element. Verifies default type conversions.
+ * Verifies that null array elements are converted to a default
+ * value for the type (false, 0 or empty string.)
+ */
+public class TestScalarArrays extends BaseJsonLoaderTest {
+
+  // No schema: first token (boolean) fixes the array type as BIT.
+  // Nulls become the default (false); numbers/strings convert to
+  // boolean; a bare scalar acts as a one-element array.
+  @Test
+  public void testBoolean() {
+    String json =
+        "{a: [true, false, null]} {a: []} {a: null} " +
+        "{a: true} {a: false} " +
+        "{a: [0, 1.0, \"true\", \"\"]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.BIT)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(boolArray(true, false, false))
+        .addSingleCol(boolArray())
+        .addSingleCol(boolArray())
+        .addSingleCol(boolArray(true))
+        .addSingleCol(boolArray(false))
+        .addSingleCol(boolArray(false, true, true, false))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // All-text mode: booleans arrive as VARCHAR text; a null element
+  // becomes an empty string.
+  @Test
+  public void testAllTextBoolean() {
+    String json =
+        "{a: [true, false, null]} {a: []} {a: null}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.allTextMode = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(strArray("true", "false", ""))
+        .addSingleCol(strArray())
+        .addSingleCol(strArray())
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // Provided schema fixes the type as BIT up front, so the leading
+  // "[]"/"null" rows and non-boolean tokens all convert to boolean.
+  @Test
+  public void testBooleanWithSchema() {
+    String json =
+        "{a: []} {a: null} {a: [true, false]} " +
+        "{a: true} {a: false} " +
+        "{a: [0, 1.0, \"true\", \"\"]}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("a", MinorType.BIT)
+        .build();
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addSingleCol(boolArray())
+        .addSingleCol(boolArray())
+        .addSingleCol(boolArray(true, false))
+        .addSingleCol(boolArray(true))
+        .addSingleCol(boolArray(false))
+        .addSingleCol(boolArray(false, true, true, false))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // No schema: first token (integer) fixes the type as BIGINT. Nulls
+  // become 0; doubles round (2.3 -> 2); booleans map to 1/0; numeric
+  // strings parse, empty strings become 0.
+  @Test
+  public void testInt() {
+    String json =
+        "{a: [2, 4, null]} {a: []} {a: null} " +
+        "{a: 10} " +
+        "{a: [3, 2.3, true, false, \"5\", \"\"]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.BIGINT)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(longArray(2L, 4L, 0L))
+        .addSingleCol(longArray())
+        .addSingleCol(longArray())
+        .addSingleCol(longArray(10L))
+        .addSingleCol(longArray(3L, 2L, 1L, 0L, 5L, 0L))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // readNumbersAsDouble: integers load as FLOAT8 instead of BIGINT.
+  @Test
+  public void testIntAsDouble() {
+    String json =
+        "{a: [2, 4, null]} {a: []} {a: null} " +
+        "{a: 10} " +
+        "{a: [3, 2.25, true, false, \"5\", \"\"]}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.readNumbersAsDouble = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.FLOAT8)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(doubleArray(2D, 4D, 0D))
+        .addSingleCol(doubleArray())
+        .addSingleCol(doubleArray())
+        .addSingleCol(doubleArray(10D))
+        .addSingleCol(doubleArray(3D, 2.25D, 1D, 0D, 5D, 0D))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // All-text mode: integers arrive as VARCHAR text; null becomes "".
+  @Test
+  public void testAllTextInt() {
+    String json =
+        "{a: [2, 4, null]} {a: []} {a: null}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.allTextMode = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(strArray("2", "4", ""))
+        .addSingleCol(strArray())
+        .addSingleCol(strArray())
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // Provided BIGINT schema: type is known before the "[]"/"null" rows,
+  // and later doubles/booleans/strings convert to long.
+  @Test
+  public void testIntWithSchema() {
+    String json =
+        "{a: []} {a: null} {a: [2, 4, null]} " +
+        "{a: 10} " +
+        "{a: [3, 2.3, true, false, \"5\", \"\"]}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("a", MinorType.BIGINT)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addSingleCol(longArray())
+        .addSingleCol(longArray())
+        .addSingleCol(longArray(2L, 4L, 0L))
+        .addSingleCol(longArray(10L))
+        .addSingleCol(longArray(3L, 2L, 1L, 0L, 5L, 0L))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // No schema: first token (double) fixes the type as FLOAT8. Nulls
+  // become 0; ints widen; booleans map to 1/0; numeric strings parse.
+  @Test
+  public void testDouble() {
+    String json =
+        "{a: [2.25, 4.5, null]} {a: []} {a: null} " +
+        "{a: 10.125} " +
+        "{a: [3, 2.75, true, false, \"5.25\", \"\"]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.FLOAT8)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(doubleArray(2.25D, 4.5D, 0D))
+        .addSingleCol(doubleArray())
+        .addSingleCol(doubleArray())
+        // Fixed: was 10.126D, which does not match the JSON input 10.125.
+        .addSingleCol(doubleArray(10.125D))
+        .addSingleCol(doubleArray(3D, 2.75D, 1.0D, 0.0D, 5.25D, 0D))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // All-text mode: doubles arrive as VARCHAR text; null becomes "".
+  @Test
+  public void testAllTextDouble() {
+    String json =
+        "{a: [2.25, 4.5, null]} {a: []} {a: null}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.allTextMode = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(strArray("2.25", "4.5", ""))
+        .addSingleCol(strArray())
+        .addSingleCol(strArray())
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // Provided FLOAT8 schema: type is known before the "[]"/"null" rows.
+  @Test
+  public void testDoubleWithSchema() {
+    String json =
+        "{a: []} {a: null} {a: [2.25, 4.5, null]} " +
+        "{a: 10.125} " +
+        "{a: [3, 2.75, true, false, \"5.25\", \"\"]}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("a", MinorType.FLOAT8)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addSingleCol(doubleArray())
+        .addSingleCol(doubleArray())
+        .addSingleCol(doubleArray(2.25D, 4.5D, 0D))
+        // Fixed: was 10.126D, which does not match the JSON input 10.125.
+        .addSingleCol(doubleArray(10.125D))
+        .addSingleCol(doubleArray(3D, 2.75D, 1.0D, 0.0D, 5.25D, 0D))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // No schema: first token (string) fixes the type as VARCHAR. Null
+  // elements become ""; other scalars convert to their text form.
+  @Test
+  public void testString() {
+    String json =
+        "{a: [\"foo\", \"\", null]} {a: []} {a: null} " +
+        "{a: \"bar\"} " +
+        "{a: [3, 2.75, true, false]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(strArray("foo", "", ""))
+        .addSingleCol(strArray())
+        .addSingleCol(strArray())
+        .addSingleCol(strArray("bar"))
+        .addSingleCol(strArray("3", "2.75", "true", "false"))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // classicArrayNulls: a null array element becomes the literal text
+  // "null" rather than an empty string.
+  @Test
+  public void testStringClassicNulls() {
+    String json =
+        "{a: [\"foo\", \"\", null]} {a: []} {a: null}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.classicArrayNulls = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(strArray("foo", "", "null"))
+        .addSingleCol(strArray())
+        .addSingleCol(strArray())
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // Provided VARCHAR schema: type is known before the "[]"/"null" rows.
+  @Test
+  public void testStringWithSchema() {
+    String json =
+        "{a: []} {a: null} {a: [\"foo\", \"\", null]} " +
+        "{a: \"bar\"} " +
+        "{a: [3, 2.75, true, false]}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addSingleCol(strArray())
+        .addSingleCol(strArray())
+        .addSingleCol(strArray("foo", "", ""))
+        .addSingleCol(strArray("bar"))
+        .addSingleCol(strArray("3", "2.75", "true", "false"))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  /**
+   * Cannot shift to a nested array from a repeated scalar.
+   */
+  @Test
+  public void testNestedArray() {
+    String json =
+        "{a: [2, 4]} {a: [[10, 20]]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    try {
+      loader.next();
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("integer[][]"));
+    } finally {
+      loader.close();
+    }
+  }
+
+  /**
+   * Cannot shift to an object from a repeated scalar.
+   */
+  @Test
+  public void testArrayWithObject() {
+    String json =
+        "{a: [2, 4]} {a: {b: 10}}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    try {
+      loader.next();
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("object"));
+    } finally {
+      loader.close();
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestScalars.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestScalars.java
new file mode 100644
index 0000000..0b19366
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestScalars.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.resultSet.project.Projections;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+/**
+ * Tests JSON scalar handling. Without a schema, the first non-null value
+ * determines the type of the column. With a schema, then the schema determines
+ * the column type independent of the first data row.
+ * <p>
+ * In either case, all scalars perform conversion from all other scalars. If data
+ * is clean, then the conversion will never be used. If the data is messy, then
+ * the result, when combined with a schema, produces a repeatable (if perhaps still
+ * messy) result. The goal is that, with a schema, the query should not fail due
+ * to a few messy rows a billion rows in, or due to the order that the scanners
+ * see the data.
+ */
+public class TestScalars extends BaseJsonLoaderTest {
+
+  /**
+   * Test Boolean type using type inference to guess the type from the
+   * first row of data. All other types can be converted to Boolean:
+   * numbers map non-zero to true, "true"/"false" strings parse, an
+   * empty string becomes null, and unrecognized text becomes false.
+   */
+  @Test
+  public void testBoolean() {
+    String json =
+        "{a: true} {a: false} {a: null} " +
+        "{a: 1} {a: 0} " +
+        "{a: 1.0} {a: 0.0} " +
+        "{a: \"true\"} {a: \"\"} {a: \"false\"} {a: \"other\"}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIT)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(true)   // true
+        .addRow(false)  // false
+        .addRow((Boolean) null)   // null
+        .addRow(true)   // 1
+        .addRow(false)  // 0
+        .addRow(true)   // 1.0
+        .addRow(false)  // 0.0
+        .addRow(true)   // "true"
+        .addRow((Boolean) null)  // ""
+        .addRow(false)  // "false"
+        .addRow(false)  // "other"
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // All-text mode: booleans load as nullable VARCHAR; a null value
+  // stays null (contrast with null array elements, which become "").
+  @Test
+  public void testAllTextBoolean() {
+    String json =
+        "{a: true} {a: false} {a: null}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.allTextMode = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("true")   // true
+        .addRow("false")  // false
+        .addRow((String) null)    // null
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  /**
+   * Test Boolean with a provided schema which states the column type
+   * independent of the first value. Test with leading values which are
+   * not Boolean: they convert using the same rules as in
+   * {@link #testBoolean()}.
+   */
+  @Test
+  public void testBooleanWithSchema() {
+    String json =
+        "{a: 1} {a: 0} " +
+        "{a: 1.0} {a: 0.0} " +
+        "{a: \"true\"} {a: \"\"} {a: \"false\"} {a: \"other\"}" +
+        "{a: true} {a: false} {a: null}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIT)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(true)   // 1
+        .addRow(false)  // 0
+        .addRow(true)   // 1.0
+        .addRow(false)  // 0.0
+        .addRow(true)   // "true"
+        .addRow((Boolean) null)  // ""
+        .addRow(false)  // "false"
+        .addRow(false)  // "other"
+        .addRow(true)   // true
+        .addRow(false)  // false
+        .addRow((Boolean) null)   // null
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // No schema: the first integer value fixes the type as nullable
+  // BIGINT. Booleans map to 1/0; doubles round half-up (1.4 -> 1,
+  // 1.5 -> 2); an empty string becomes null; numeric strings parse.
+  @Test
+  public void testInt() {
+    String json =
+        "{a: 1} {a: 0} {a: -300} {a: null} " +
+        "{a: true} {a: false} " +
+        "{a: 1.0} {a: 1.4} {a: 1.5} {a: 0.0} " +
+        "{a: \"\"} {a: \"3\"}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1)      // 1
+        .addRow(0)      // 0
+        .addRow(-300)   // -300
+        .addRow((Long) null)   // null
+        .addRow(1)      // true
+        .addRow(0)      // false
+        .addRow(1)      // 1.0
+        .addRow(1)      // 1.4
+        .addRow(2)      // 1.5
+        .addRow(0)      // 0.0
+        .addRow((Long) null)   // ""
+        .addRow(3)      // "3"
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // readNumbersAsDouble: the same inputs load as nullable FLOAT8
+  // instead of BIGINT; conversions otherwise follow the double rules.
+  @Test
+  public void testIntAsDouble() {
+    String json =
+        "{a: 1} {a: 0} {a: -300} {a: null} " +
+        "{a: true} {a: false} " +
+        "{a: 1.0} {a: 1.5} {a: 0.0} " +
+        "{a: \"\"} {a: \"3\"}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.readNumbersAsDouble = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.FLOAT8)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1D)      // 1
+        .addRow(0D)      // 0
+        .addRow(-300D)   // -300
+        .addRow((Double) null)   // null
+        .addRow(1D)      // true
+        .addRow(0D)      // false
+        .addRow(1D)      // 1.0
+        .addRow(1.5D)    // 1.5
+        .addRow(0D)      // 0.0
+        .addRow((Double) null)   // ""
+        .addRow(3D)      // "3"
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // All-text mode: integers load as nullable VARCHAR text; a null
+  // scalar stays null.
+  @Test
+  public void testAllTextInt() {
+    String json =
+        "{a: 1} {a: 0} {a: -300} {a: null}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.allTextMode = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("1")    // 1
+        .addRow("0")    // 0
+        .addRow("-300") // -300
+        .addRow((String) null)    // null
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // Provided BIGINT schema: the type is known before the leading
+  // non-integer rows, which convert using the same rules as testInt().
+  @Test
+  public void testIntWithSchema() {
+    String json =
+        "{a: true} {a: false} " +
+        "{a: 1.0} {a: 1.4} {a: 1.5} {a: 0.0} " +
+        "{a: \"\"} {a: \"3\"} " +
+        "{a: 1} {a: 0} {a: -300} {a: null}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(1)      // true
+        .addRow(0)      // false
+        .addRow(1)      // 1.0
+        .addRow(1)      // 1.4
+        .addRow(2)      // 1.5
+        .addRow(0)      // 0.0
+        .addRow((Long) null)   // ""
+        .addRow(3)      // "3"
+        .addRow(1)      // 1
+        .addRow(0)      // 0
+        .addRow(-300)   // -300
+        .addRow((Long) null)   // null
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  /**
+   * There are limits on Drill's generosity. If no conversion exists
+   * to int, the query will fail with a descriptive error. Verifies
+   * the error's context fields (column, type, token, line) in detail
+   * here only, since the error format is shared across types.
+   */
+  @Test
+  public void testIntWithError() {
+    String json =
+        "{a: 1}\n{a: \"abc\"}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    try {
+      loader.next();
+      fail();
+    } catch (UserException e) {
+
+      // Robust check of error contents. Only need test once, error
+      // code is generic
+      String msg = e.getMessage();
+      assertTrue(msg.contains("not compatible"));
+      assertTrue(msg.contains("Column: a"));
+      assertTrue(msg.contains("Column type: BIGINT"));
+      assertTrue(msg.contains("JSON token type: string"));
+      assertTrue(msg.contains("JSON token: abc"));
+      assertTrue(msg.contains("Line: 2"));
+    } finally {
+      loader.close();
+    }
+  }
+
+  // No schema: the first double value fixes the type as nullable
+  // FLOAT8. With allowNanInf set, the non-standard NaN/Infinity
+  // tokens are accepted; booleans map to 1.0/0.0; an empty string
+  // becomes null; numeric strings parse.
+  @Test
+  public void testFloat() {
+    String json =
+        "{a: 0.0} {a: 1.0} {a: 1.25} {a: -123.125} {a: null} " +
+        "{a: -Infinity} {a: NaN} {a: Infinity} " +
+        "{a: 0} {a: 12} " +
+        "{a: true} {a: false} " +
+        "{a: \"\"} {a: \"3.75\"}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.allowNanInf = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.FLOAT8)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(0.0)      // 0.0
+        .addRow(1.0)      // 1.0
+        .addRow(1.25)     // 1.25
+        .addRow(-123.125) // -123.125
+        .addRow((Double) null)   // null
+        .addRow(Double.NEGATIVE_INFINITY) // -Inf
+        .addRow(Double.NaN) // Nan
+        .addRow(Double.POSITIVE_INFINITY) // Inf
+        .addRow(0.0)      // 0
+        .addRow(12.0)     // 12
+        .addRow(1.0)      // true
+        .addRow(0.0)      // false
+        .addRow((Double) null)   // ""
+        .addRow(3.75)     // "3.75"
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  // All-text mode with allowNanInf: doubles (including the NaN and
+  // Infinity tokens) load as their VARCHAR text form; null stays null.
+  @Test
+  public void testAllTextFloat() {
+    String json =
+        "{a: 0.0} {a: 1.0} {a: 1.25} {a: -123.125} {a: null} " +
+        "{a: -Infinity} {a: NaN} {a: Infinity}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.allTextMode = true;
+    loader.jsonOptions.allowNanInf = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("0.0")
+        .addRow("1.0")
+        .addRow("1.25")
+        .addRow("-123.125")
+        .addRow((String) null)
+        .addRow("-Infinity")
+        .addRow("NaN")
+        .addRow("Infinity")
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testFloatWithSchema() {
+    String json =
+        "{a: 0} {a: 12} " +
+        "{a: true} {a: false} " +
+        "{a: \"\"} {a: \"3.75\"} " +
+        "{a: 0.0} {a: 1.0} {a: 1.25} {a: -123.125} {a: null} " +
+        "{a: -Infinity} {a: NaN} {a: Infinity}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.FLOAT8)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.jsonOptions.allowNanInf = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(0.0)      // 0
+        .addRow(12.0)     // 12
+        .addRow(1.0)      // true
+        .addRow(0.0)      // false
+        .addRow((Double) null)   // ""
+        .addRow(3.75)     // "3.75"
+        .addRow(0.0)      // 0.0
+        .addRow(1.0)      // 1.0
+        .addRow(1.25)     // 1.25
+        .addRow(-123.125) // -123.125
+        .addRow((Double) null)   // null
+        .addRow(Double.NEGATIVE_INFINITY) // -Inf
+        .addRow(Double.NaN) // NaN
+        .addRow(Double.POSITIVE_INFINITY) // Inf
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testFloatWithError() {
+    String json =
+        "{a: 1.25}\n{a: \"abc\"}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    try {
+      loader.next();
+      fail();
+    } catch (UserException e) {
+      String msg = e.getMessage();
+      assertTrue(msg.contains("not compatible"));
+      assertTrue(msg.contains("Column type: DOUBLE"));
+    } finally {
+      loader.close();
+    }
+  }
+
+  @Test
+  public void testString() {
+    String json =
+        "{a: \"\"} {a: \"foo\"} {a: \" bar \"} {a: null} " +
+        "{a: 0} {a: 12} " +
+        "{a: true} {a: false} " +
+        "{a: 0.0} {a: 1.25}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("")       // ""
+        .addRow("foo")    // "foo"
+        .addRow(" bar ")  // " bar "
+        .addRow((String) null) // null
+        .addRow("0")      // 0
+        .addRow("12")     // 12
+        .addRow("true")   // true
+        .addRow("false")  // false
+        .addRow("0.0")    // 0.0
+        .addRow("1.25")   // 1.25
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testAllTextString() {
+    String json =
+        "{a: \"\"} {a: \"foo\"} {a: \" bar \"} {a: null}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.jsonOptions.allTextMode = true;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("")       // ""
+        .addRow("foo")    // "foo"
+        .addRow(" bar ")  // " bar "
+        .addRow((String) null) // null
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testStringWithSchema() {
+    String json =
+        "{a: 0} {a: 12} " +
+        "{a: true} {a: false} " +
+        "{a: 0.0} {a: 1.25} " +
+        "{a: \"\"} {a: \"foo\"} {a: \" bar \"} {a: null}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.VARCHAR)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addRow("0")      // 0
+        .addRow("12")     // 12
+        .addRow("true")   // true
+        .addRow("false")  // false
+        .addRow("0.0")    // 0.0
+        .addRow("1.25")   // 1.25
+        .addRow("")       // ""
+        .addRow("foo")    // "foo"
+        .addRow(" bar ")  // " bar "
+        .addRow((String) null) // null
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testProjection() {
+    String json =
+        "{a: 10, b: true}\n" +
+        "{a: 20, b: [\"what?\"]}\n" +
+        "{a: 30, b: {c: \"oh, my!\"}}";
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.rsLoaderOptions.projection(
+        Projections.parse(RowSetTestUtils.projectList("a")));
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(10)
+        .addRow(20)
+        .addRow(30)
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestUnknowns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestUnknowns.java
new file mode 100644
index 0000000..0ac5ec6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestUnknowns.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.boolArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+/**
+ * Tests the ability of the JSON reader to "wait out" a set of leading
+ * null or empty array values to wait until an actual value appears before
+ * deciding on the column type. Hitting the end of batch, or an array
+ * that contains only null values, forces resolution to VARCHAR.
+ */
+public class TestUnknowns extends BaseJsonLoaderTest {
+
+  @Test
+  public void testNullToBoolean() {
+    String json =
+        "{a: null} {a: true} {a: false} {a: true}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIT)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow((Boolean) null)
+        .addRow(true)
+        .addRow(false)
+        .addRow(true)
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testNullToString() {
+    String json =
+        "{a: null} {a: \"foo\"} {a: \"bar\"}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow((String) null)
+        .addRow("foo")
+        .addRow("bar")
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  /**
+   * Input contains all nulls. The loader will force resolve to a
+   * type, and will choose VARCHAR as all scalar types which
+   * may later appear can be converted to VARCHAR.
+   */
+  @Test
+  public void testForcedNullResolve() {
+    String json =
+        "{a: null} {a: null}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow((String) null)
+        .addRow((String) null)
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testArrayToBooleanArray() {
+    String json =
+        "{a: []} {a: [true, false]} {a: true}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.BIT)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(boolArray())
+        .addSingleCol(boolArray(true, false))
+        .addSingleCol(boolArray(true))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testArrayToBooleanScalar() {
+    String json =
+        "{a: []} {a: true} {a: [true, false]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.BIT)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(boolArray())
+        .addSingleCol(boolArray(true))
+        .addSingleCol(boolArray(true, false))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  /**
+   * Input contains only empty arrays. The loader will force resolve
+   * the array to a type, and will choose VARCHAR as all scalar types
+   * which may later appear can be converted to VARCHAR.
+   */
+  @Test
+  public void testForcedArrayResolve() {
+    String json =
+        "{a: []} {a: []}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(strArray())
+        .addSingleCol(strArray())
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testNullToArrayToBoolean() {
+    String json =
+        "{a: null} {a: []} {a: [true, false]} {a: true}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.BIT)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(boolArray())
+        .addSingleCol(boolArray())
+        .addSingleCol(boolArray(true, false))
+        .addSingleCol(boolArray(true))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testArrayToNull() {
+    String json =
+        "{a: []} {a: [null]} {a: [\"foo\"]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(strArray())
+        .addSingleCol(strArray(""))
+        .addSingleCol(strArray("foo"))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testArrayToObjectArray() {
+    String json =
+        "{a: 1, m: []}\n" +
+        "{a: 2, m: [{b: 10, c: 20}, {b: 11, c: 21}]}\n" +
+        "{a: 3, m: [{b: 110, c: 120}, {b: 111, c: 121}]}";
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addMapArray("m")
+          .addNullable("b", MinorType.BIGINT)
+          .addNullable("c", MinorType.BIGINT)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1L, mapArray())
+        .addRow(2L, mapArray(mapValue(10L, 20L), mapValue(11L, 21L)))
+        .addRow(3L, mapArray(mapValue(110L, 120L), mapValue(111L, 121L)))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestVariant.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestVariant.java
new file mode 100644
index 0000000..480aab3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/TestVariant.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+public class TestVariant extends BaseJsonLoaderTest {
+
+  @Test
+  public void testScalars() {
+    String json =
+        "{a: null} {a: true} {a: 10} {a: 10.5} {a: \"foo\"}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.UNION)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addSingleCol(null)
+        .addSingleCol(true)
+        .addSingleCol(10L)
+        .addSingleCol(10.5D)
+        .addSingleCol("foo")
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testMap() {
+    String json =
+        "{a: null} {a: 10}\n" +
+        "{a: {b: 10, c: \"foo\"}}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.UNION)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    // The RowSetBuilder can add scalar types to a union,
+    // but not structured types. So, we have to declare the
+    // structured types up front.
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addUnion("a")
+          .addType(MinorType.BIGINT)
+          .addMap()
+            .addNullable("b", MinorType.BIGINT)
+            .addNullable("c", MinorType.VARCHAR)
+            .resumeUnion()
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(null)
+        .addSingleCol(10L)
+        .addSingleCol(mapValue(10L, "foo"))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testScalarList() {
+    String json =
+        "{a: null} {a: []}\n" +
+        // All scalar types
+        "{a: [null, true, 10, 10.5, \"foo\"]}\n" +
+        // One more to ensure that indexes are synced
+        "{a: [false, 20]}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.LIST)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addList("a")
+          .addType(MinorType.BIT)
+          .addType(MinorType.BIGINT)
+          .addType(MinorType.FLOAT8)
+          .addType(MinorType.VARCHAR)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(null)
+        .addSingleCol(objArray())
+        .addSingleCol(objArray(null, true, 10L, 10.5D, "foo"))
+        .addSingleCol(objArray(false, 20L))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+
+  @Test
+  public void testObjectList() {
+    String json =
+        "{a: null} {a: []} {a: [null, 10]}\n" +
+        "{a: [{b:  10, c:  20}, {b: 110, c: 120}]}\n" +
+        "{a: [{b: 210, c: 220}, {b: 310, c: 320}]}";
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("a", MinorType.LIST)
+        .build();
+
+    JsonLoaderFixture loader = new JsonLoaderFixture();
+    loader.providedSchema = schema;
+    loader.open(json);
+    RowSet results = loader.next();
+    assertNotNull(results);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addList("a")
+          .addType(MinorType.BIGINT)
+          .addMap()
+            .addNullable("b", MinorType.BIGINT)
+            .addNullable("c", MinorType.BIGINT)
+            .resumeUnion()
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(null)
+        .addSingleCol(objArray())
+        .addSingleCol(objArray(null, 10L))
+        .addSingleCol(objArray(
+            objArray(10L, 20L), objArray(110L, 120L)))
+        .addSingleCol(objArray(
+            objArray(210L, 220L), objArray(310L, 320L)))
+        .build();
+    RowSetUtilities.verify(expected, results);
+    assertNull(loader.next());
+    loader.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
index a3c3e08..c77807d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
@@ -141,7 +141,7 @@
     }
 
     @Override
-    public void onEmbedddObject(String value) {
+    public void onEmbeddedObject(String value) {
       this.value = value;
       valueCount++;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
index cf0ea11..8e287e4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
@@ -76,6 +76,13 @@
    * to construct BigDecimals of the desired precision.
    */
   private MathContext scale = new MathContext(3);
+  /**
+   * Floats and doubles do not compare exactly. This delta is used
+   * by JUnit for such comparisons. This is not a general solution;
+   * it assumes that tests won't create values that require more than
+   * three digits of precision.
+   */
+  private final double delta = 0.001;
 
   /**
    * Tests can skip the first n rows.
@@ -332,6 +339,16 @@
         assertEquals(label + " - byte lengths differ", expected.length, actual.length);
         assertTrue(label, Arrays.areEqual(expected, actual));
         break;
+
+      // Double must be handled specially since BigDecimal cannot handle
+      // INF or NAN double values.
+      case DOUBLE:
+        assertEquals(label, ec.getDouble(), ac.getDouble(), delta);
+        break;
+
+      // repeated_contains is claimed to return a boolean,
+      // actually returns a count, but in a bit field. To test
+      // this function, we must treat BIT as an integer.
       default:
         assertEquals(label, getScalar(ec), getScalar(ac));
     }
@@ -352,8 +369,7 @@
     }
   }
 
-  private void verifyArray(String label, ArrayReader ea,
-      ArrayReader aa) {
+  private void verifyArray(String label, ArrayReader ea, ArrayReader aa) {
     assertEquals(label + " - array element type", ea.entryType(), aa.entryType());
     assertEquals(label + " - array length", ea.size(), aa.size());
     int i = 0;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index f1c8935..684b202 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -215,6 +215,14 @@
     return array;
   }
 
+  public static boolean[] boolArray(Boolean... elements) {
+    boolean[] array = new boolean[elements.length];
+    for (int i = 0; i < elements.length; i++) {
+      array[i] = elements[i];
+    }
+    return array;
+  }
+
   public static String[] strArray(String... elements) {
     return elements;
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
index aef21d6..181fba6 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
@@ -87,6 +87,8 @@
    */
   String EXCLUDE_FROM_WILDCARD = DRILL_PROP_PREFIX + "special";
 
+  int DEFAULT_ARRAY_SIZE = 10;
+
   /**
    * Rough characterization of Drill types into metadata categories.
    * Various aspects of Drill's type system are very, very messy.
@@ -95,27 +97,23 @@
    * the messy type system while staying close to the underlying
    * implementation.
    */
-
   enum StructureType {
 
     /**
      * Primitive column (all types except List, Map and Union.)
      * Includes (one-dimensional) arrays of those types.
      */
-
     PRIMITIVE,
 
     /**
      * Map or repeated map. Also describes the row as a whole.
      */
-
     TUPLE,
 
     /**
      * Union or (non-repeated) list. (A non-repeated list is,
      * essentially, a repeated union.)
      */
-
     VARIANT,
 
     /**
@@ -131,7 +129,6 @@
      * a separate category for 1D lists. But, again, that is not how
      * the code has evolved.
      */
-
     MULTI_ARRAY,
 
     /**
@@ -140,8 +137,6 @@
     DICT
   }
 
-  int DEFAULT_ARRAY_SIZE = 10;
-
   StructureType structureType();
 
   /**
@@ -149,7 +144,6 @@
    *
    * @return the tuple schema
    */
-
   TupleMetadata tupleSchema();
 
   /**
@@ -157,7 +151,6 @@
    *
    * @return the variant schema
    */
-
   VariantMetadata variantSchema();
 
   /**
@@ -173,7 +166,6 @@
    *
    * @return the description of the (n-1) st dimension.
    */
-
   ColumnMetadata childSchema();
   MaterializedField schema();
   MaterializedField emptySchema();
@@ -196,7 +188,6 @@
    *
    * @return true if the column is of type LIST of UNIONs
    */
-
   boolean isMultiList();
 
   /**
@@ -204,7 +195,6 @@
    * if they have the same name, type and structure (ignoring internal structure
    * such as offset vectors.)
    */
-
   boolean isEquivalent(ColumnMetadata other);
 
   /**
@@ -213,7 +203,6 @@
    *
    * @param width the expected column width
    */
-
   void setExpectedWidth(int width);
 
   /**
@@ -223,7 +212,6 @@
    * @return the expected column width of the each data value. Does not include
    * "overhead" space such as for the null-value vector or offset vector
    */
-
   int expectedWidth();
 
   /**
@@ -233,7 +221,6 @@
    * @param childCount the expected average array cardinality. Defaults to
    * 1 for non-array columns, 10 for array columns
    */
-
   void setExpectedElementCount(int childCount);
 
   /**
@@ -243,7 +230,6 @@
    * @return the expected value cardinality per value (per-row for top-level
    * columns, per array element for arrays within lists)
    */
-
   int expectedElementCount();
 
   void setFormat(String value);
@@ -294,7 +280,6 @@
    *
    * @return empty clone of this column
    */
-
   ColumnMetadata cloneEmpty();
 
   int precision();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
index 49fc901..4c654a9 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
@@ -136,6 +136,10 @@
     return new MapColumnMetadata(name, DataMode.REQUIRED, (TupleSchema) schema);
   }
 
+  public static MapColumnMetadata newMap(String name) {
+    return newMap(name, new TupleSchema());
+  }
+
   public static DictColumnMetadata newDict(MaterializedField field) {
     return new DictColumnMetadata(field, fromFields(field.getChildren()));
   }
@@ -162,15 +166,15 @@
   }
 
   public static VariantColumnMetadata newVariant(MaterializedField field, VariantSchema schema) {
-    return new VariantColumnMetadata(field, schema);
+    return VariantColumnMetadata.unionOf(field, schema);
   }
 
   public static VariantColumnMetadata newVariant(String name, DataMode cardinality) {
     switch (cardinality) {
     case OPTIONAL:
-      return new VariantColumnMetadata(name, MinorType.UNION, new VariantSchema());
+      return VariantColumnMetadata.union(name);
     case REPEATED:
-      return new VariantColumnMetadata(name, MinorType.LIST, new VariantSchema());
+      return VariantColumnMetadata.list(name);
     default:
       throw new IllegalArgumentException();
     }
@@ -184,6 +188,10 @@
     return new MapColumnMetadata(name, DataMode.REPEATED, (TupleSchema) schema);
   }
 
+  public static ColumnMetadata newMapArray(String name) {
+    return newMapArray(name, new TupleSchema());
+  }
+
   public static DictColumnMetadata newDictArray(String name) {
     return new DictColumnMetadata(name, DataMode.REPEATED);
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
index 895e170..a7d7f37 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
@@ -27,11 +27,11 @@
  * here. It would certainly be cleaner to have a single field, with the
  * number of dimensions as a property, but that is not how Drill evolved.
  * <p/>
- * Class can be created with and without parent container.
- * In the first case, column is added to the parent container during creation
- * and all <tt>resumeXXX</tt> methods return qualified parent container.
- * In the second case column is created without parent container as standalone entity.
- * All <tt>resumeXXX</tt> methods do not produce any action and return null.
+ * An instance can be created with and without parent container.
+ * In the first case, a column is added to the parent container during creation
+ * and all {@code resumeXXX} methods return qualified parent container.
+ * In the second case column is created without parent container as a stand-alone entity.
+ * The {@code resumeXXX} methods do not produce any action and return null.
  * To access built column {@link #buildColumn()} should be used.
  */
 public class RepeatedListBuilder implements SchemaContainer {
@@ -56,7 +56,6 @@
   public MapBuilder addMapArray() {
     // Existing code uses the repeated list name as the name of
     // the vector within the list.
-
     return new MapBuilder(this, name, DataMode.REPEATED);
   }
 
@@ -69,15 +68,17 @@
   public RepeatedListBuilder addArray(MinorType type) {
     // Existing code uses the repeated list name as the name of
     // the vector within the list.
-
-    addColumn(MetadataUtils.newScalar(name, type, DataMode.REPEATED));
+    if (type == MinorType.UNION) {
+      addColumn(VariantColumnMetadata.list(name));
+    } else {
+      addColumn(MetadataUtils.newScalar(name, type, DataMode.REPEATED));
+    }
     return this;
   }
 
   public RepeatedListBuilder addArray(MinorType type, int width) {
     // Existing code uses the repeated list name as the name of
     // the vector within the list.
-
     TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
         .setMinorType(type)
         .setMode(DataMode.REPEATED)
@@ -91,7 +92,6 @@
   public RepeatedListBuilder addArray(MinorType type, int precision, int scale) {
     // Existing code uses the repeated list name as the name of
     // the vector within the list.
-
     TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
         .setMinorType(type)
         .setMode(DataMode.REPEATED)
@@ -103,6 +103,10 @@
     return this;
   }
 
+  public UnionBuilder addList() {
+    return new UnionBuilder(this, name, MinorType.LIST);
+  }
+
   public RepeatedListColumnMetadata buildColumn() {
     return MetadataUtils.newRepeatedList(name, child);
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListColumnMetadata.java
index b573151..9abcd7d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListColumnMetadata.java
@@ -53,7 +53,7 @@
 
   public void childSchema(ColumnMetadata childMetadata) {
     Preconditions.checkState(childSchema == null);
-    Preconditions.checkArgument(childMetadata.mode() == DataMode.REPEATED);
+    Preconditions.checkArgument(childMetadata.isArray());
     childSchema = childMetadata;
   }
 
@@ -95,7 +95,6 @@
 
     // If there is no child, then we don't know the
     // dimensionality.
-
     return childSchema == null ? UNKNOWN_DIMENSIONS
         : childSchema.dimensions() + 1;
   }
@@ -104,5 +103,4 @@
   public String typeString() {
     return "ARRAY<" + childSchema.typeString() + ">";
   }
-
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java
index d09c810..f5c9771 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java
@@ -89,7 +89,7 @@
   }
 
   public VariantColumnMetadata buildColumn() {
-    return new VariantColumnMetadata(name, type, union);
+    return VariantColumnMetadata.variantOf(name, type, union);
   }
 
   public void build() {
@@ -113,8 +113,18 @@
     return (UnionBuilder) parent;
   }
 
+  public RepeatedListBuilder resumeList() {
+    build();
+    return (RepeatedListBuilder) parent;
+  }
+
   public DictBuilder resumeDict() {
     build();
     return (DictBuilder) parent;
   }
+
+  public RepeatedListBuilder resumeRepeatedList() {
+    build();
+    return (RepeatedListBuilder) parent;
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/VariantColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/VariantColumnMetadata.java
index c87e25b..c1ee98e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/VariantColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/VariantColumnMetadata.java
@@ -81,12 +81,50 @@
     this.variantSchema = variantSchema;
   }
 
-  public VariantColumnMetadata(String name, MinorType type, VariantSchema variantSchema) {
-    super(name, type, DataMode.OPTIONAL);
-    this.variantSchema = variantSchema == null ? new VariantSchema() : variantSchema;
+  private VariantColumnMetadata(String name, MinorType type, DataMode mode,
+      VariantSchema variantSchema) {
+    super(name, type, mode);
+    this.variantSchema = variantSchema;
     this.variantSchema.bind(this);
   }
 
+  public static VariantColumnMetadata union(String name) {
+    return unionOf(name, null);
+  }
+
+  public static VariantColumnMetadata unionOf(String name, VariantSchema variantSchema) {
+    return new VariantColumnMetadata(name, MinorType.UNION, DataMode.OPTIONAL,
+        variantSchemaFor(variantSchema));
+  }
+
+  public static VariantColumnMetadata unionOf(MaterializedField schema, VariantSchema variantSchema) {
+    return new VariantColumnMetadata(schema, variantSchemaFor(variantSchema));
+  }
+
+  public static VariantColumnMetadata list(String name) {
+    return new VariantColumnMetadata(name, MinorType.LIST, DataMode.OPTIONAL, new VariantSchema());
+  }
+
+  public static VariantColumnMetadata listOf(String name, VariantSchema variantSchema) {
+    return new VariantColumnMetadata(name, MinorType.LIST, DataMode.OPTIONAL,
+        variantSchemaFor(variantSchema));
+  }
+
+  public static VariantColumnMetadata variantOf(String name, MinorType type, VariantSchema variantSchema) {
+    switch (type) {
+      case UNION:
+        return unionOf(name, variantSchema);
+      case LIST:
+        return listOf(name, variantSchema);
+      default:
+        throw new IllegalArgumentException(type.name());
+    }
+  }
+
+  private static VariantSchema variantSchemaFor(VariantSchema variantSchema) {
+    return variantSchema == null ? new VariantSchema() : variantSchema;
+  }
+
   @Override
   public StructureType structureType() {
     return StructureType.VARIANT;
@@ -96,11 +134,13 @@
   public boolean isVariant() { return true; }
 
   @Override
-  public boolean isArray() { return type() == MinorType.LIST; }
+  public boolean isArray() {
+    return super.isArray() || type() == MinorType.LIST;
+  }
 
   @Override
   public ColumnMetadata cloneEmpty() {
-    return new VariantColumnMetadata(name, type, variantSchema.cloneEmpty());
+    return new VariantColumnMetadata(name, type, mode, new VariantSchema());
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/VariantSchema.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/VariantSchema.java
index 179ff12..8261419 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/VariantSchema.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/VariantSchema.java
@@ -42,7 +42,7 @@
     String name = Types.typeKey(type);
     switch (type) {
     case LIST:
-      return new VariantColumnMetadata(name, type, null);
+      return VariantColumnMetadata.list(name);
     case MAP:
       // Although maps do not have a bits vector, when used in a
       // union the map must be marked as optional since the union as a
@@ -175,8 +175,7 @@
     //
     // Make up a synthetic union column to be used when building
     // a reader.
-
-    return new VariantColumnMetadata("$data", MinorType.UNION, this);
+    return VariantColumnMetadata.unionOf("$data", this);
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
index a5b611d..4ec9772 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
@@ -67,6 +67,12 @@
   }
 
   /**
+   * Reports whether the given column is projected. Useful for
+   * clients that can simply skip over unprojected columns.
+   */
+  boolean isProjected(String columnName);
+
+  /**
    * Add a column to the tuple (row or map) that backs this writer. Support for
    * this operation depends on whether the client code has registered a listener
    * to implement the addition. Throws an exception if no listener is
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 4401b6c..9cb674e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -101,13 +101,11 @@
  *     then that row's values are discarded. Then, the batch is ended.</li>
  * </ul>
  */
-
 public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
 
   /**
    * Generic object wrapper for the tuple writer.
    */
-
   public static class TupleObjectWriter extends AbstractObjectWriter {
 
     protected final AbstractTupleWriter tupleWriter;
@@ -141,12 +139,13 @@
    * tuple writer. If no listener is bound, then an attempt to add a column
    * throws an exception.
    */
-
   public static interface TupleWriterListener {
 
     ObjectWriter addColumn(TupleWriter tuple, ColumnMetadata column);
 
     ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);
+
+    boolean isProjected(String columnName);
   }
 
   /**
@@ -154,7 +153,6 @@
    * on the call to <tt>nextElement().</tt> The increment
    * is done at the tuple level, not the column level.
    */
-
   static class MemberWriterIndex implements ColumnWriterIndex {
     private final ColumnWriterIndex baseIndex;
 
@@ -184,7 +182,7 @@
   protected final List<AbstractObjectWriter> writers;
   protected ColumnWriterIndex vectorIndex;
   protected ColumnWriterIndex childIndex;
-  protected AbstractTupleWriter.TupleWriterListener listener;
+  protected TupleWriterListener listener;
   protected State state = State.IDLE;
 
   protected AbstractTupleWriter(TupleMetadata schema, List<AbstractObjectWriter> writers) {
@@ -225,7 +223,6 @@
    *
    * @param colWriter the column writer to add
    */
-
   public int addColumnWriter(AbstractObjectWriter colWriter) {
     assert writers.size() == tupleSchema.size();
     final int colIndex = tupleSchema.addColumn(colWriter.schema());
@@ -241,6 +238,12 @@
   }
 
   @Override
+  public boolean isProjected(String columnName) {
+    return listener == null ? true
+        : listener.isProjected(columnName);
+  }
+
+  @Override
   public int addColumn(ColumnMetadata column) {
     verifyAddColumn(column.name());
     return addColumnWriter(
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
index 30ab11e..c7a4995 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
@@ -346,6 +346,8 @@
   public void setObject(Object value) {
     if (value == null) {
       setNull();
+    } else if (value instanceof Boolean) {
+      scalar(MinorType.BIT).setBoolean((Boolean) value);
     } else if (value instanceof Integer) {
       scalar(MinorType.INT).setInt((Integer) value);
     } else if (value instanceof Long) {