DRILL-7359: Add support for DICT type in RowSet Framework

closes #1870
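
Adds DICT support throughout the row set machinery: the scan-time projection
framework (ExplicitSchemaProjection, the new ResolvedDictColumn and the
ResolvedDict tuples), the projection sets (new readDictProjection() and
ProjectedDictColumn), the result set loader (ColumnBuilder plus the new
DictState/DictVectorState families in TupleState), and the single- and
hyper-batch reader/writer model builders. Call sites are also moved from
ColumnMetadata.mapSchema() to tupleSchema(), which covers both MAP and DICT
columns.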
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index c0638b5..f80bd80 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -813,7 +813,7 @@
       default:
         if (componentType.isStruct()) {
           // for the case when nested type is struct, it should be placed into repeated map
-          return MetadataUtils.newMapArray(name, childColumnMetadata.mapSchema());
+          return MetadataUtils.newMapArray(name, childColumnMetadata.tupleSchema());
         } else {
           // otherwise creates column metadata with repeated data mode
           return new PrimitiveColumnMetadata(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
index 1bbff77..8975a0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -27,6 +27,7 @@
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.complex.DictVector;
 
 /**
  * Perform a schema projection for the case of an explicit list of
@@ -78,6 +79,18 @@
     }
   }
 
+  private void resolveDictValueColumn(ResolvedTuple outputTuple,
+      RequestedColumn inputCol, TupleMetadata readerSchema) {
+    int tableColIndex = readerSchema.index(DictVector.FIELD_VALUE_NAME);
+    if (tableColIndex == -1) {
+      resolveNullColumn(outputTuple, inputCol);
+    } else {
+      resolveTableColumn(outputTuple, inputCol,
+          readerSchema.metadata(tableColIndex),
+          tableColIndex);
+    }
+  }
+
   private void resolveTableColumn(ResolvedTuple outputTuple,
       RequestedColumn requestedCol,
       ColumnMetadata column, int sourceIndex) {
@@ -88,7 +101,11 @@
     // that x is a map.
 
     if (requestedCol.isTuple()) {
-      resolveMap(outputTuple, requestedCol, column, sourceIndex);
+      if (column.isDict()) {
+        resolveDict(outputTuple, requestedCol, column, sourceIndex);
+      } else {
+        resolveMap(outputTuple, requestedCol, column, sourceIndex);
+      }
     }
 
     // Is the requested column implied to be an array?
@@ -132,7 +149,7 @@
     ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple,
         column.schema(), sourceIndex);
     resolveTuple(mapCol.members(), requestedCol.mapProjection(),
-        column.mapSchema());
+        column.tupleSchema());
 
     // If the projection is simple, then just project the map column
     // as is. A projection is simple if all map columns from the table
@@ -158,6 +175,35 @@
     }
   }
 
+  private void resolveDict(ResolvedTuple outputTuple,
+                          RequestedColumn requestedCol, ColumnMetadata column,
+                          int sourceIndex) {
+
+    // If the actual column isn't a dict, then the request is invalid.
+
+    if (!column.isDict()) {
+      throw UserException
+          .validationError()
+          .message("Project list implies a dict column, but actual column is not a dict")
+          .addContext("Projected column:", requestedCol.fullName())
+          .addContext("Table column:", column.name())
+          .addContext("Type:", column.type().name())
+          .addContext(scanProj.context())
+          .build(logger);
+    }
+
+    ResolvedDictColumn dictColumn = new ResolvedDictColumn(outputTuple, column.schema(), sourceIndex);
+    resolveDictTuple(dictColumn.members(), requestedCol.mapProjection(), column.tupleSchema());
+
+    // As for maps: if the projection is simple, project the dict column
+    // as is; otherwise add the resolved dict so only its members are projected.
+    if (dictColumn.members().isSimpleProjection()) {
+      outputTuple.removeChild(dictColumn.members());
+      projectTableColumn(outputTuple, requestedCol, column, sourceIndex);
+    } else {
+      outputTuple.add(dictColumn);
+    }
+  }
+
   private void resolveTuple(ResolvedTuple mapTuple,
       RequestedTuple requestedTuple, TupleMetadata mapSchema) {
     for (RequestedColumn col : requestedTuple.projections()) {
@@ -165,6 +211,13 @@
     }
   }
 
+  private void resolveDictTuple(ResolvedTuple mapTuple,
+      RequestedTuple requestedTuple, TupleMetadata mapSchema) {
+    for (RequestedColumn col : requestedTuple.projections()) {
+      resolveDictValueColumn(mapTuple, col, mapSchema);
+    }
+  }
+
   private void resolveArray(ResolvedTuple outputTuple,
       RequestedColumn requestedCol, ColumnMetadata column,
       int sourceIndex) {
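
For orientation, a hedged sketch of how an explicit dict projection resolves
(the SQL form `d['a']` is an illustrative assumption; the names come from the
patch above):

    // A projection such as d['a'] arrives as a RequestedColumn for `d` whose
    // mapProjection() holds a single member, `a`. Because `a` names a key
    // rather than a child field, resolveDictTuple() resolves every requested
    // member against the dict's one `value` column:
    for (RequestedColumn col : requestedTuple.projections()) {
      // Looks up DictVector.FIELD_VALUE_NAME in the dict's schema, not
      // col.name(): keys are matched at read time, not at schema time.
      resolveDictValueColumn(mapTuple, col, mapSchema);
    }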
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
index 636e981..41c575b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
@@ -95,7 +95,7 @@
     if (outputSchema != null) {
       ColumnMetadata colSchema = outputSchema.metadata(mapName);
       if (colSchema != null) {
-        builder.setOutputSchema(colSchema.mapSchema());
+        builder.setOutputSchema(colSchema.tupleSchema());
       }
     }
     return builder.build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedDictColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedDictColumn.java
new file mode 100644
index 0000000..cc75117
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedDictColumn.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedDict;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedSingleDict;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedDictArray;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class ResolvedDictColumn extends ResolvedColumn {
+
+  private final MaterializedField schema;
+  private final ResolvedTuple parent;
+  private final ResolvedDict members;
+
+  public ResolvedDictColumn(ResolvedTuple parent, String name) {
+    super(parent, -1);
+    schema = MaterializedField.create(name,
+        Types.required(MinorType.DICT));
+    this.parent = parent;
+    members = new ResolvedSingleDict(this);
+    parent.addChild(members);
+  }
+
+  public ResolvedDictColumn(ResolvedTuple parent,
+                           MaterializedField schema, int sourceIndex) {
+    super(parent, sourceIndex);
+    this.schema = schema;
+    this.parent = parent;
+
+    assert schema.getType().getMinorType() == MinorType.DICT;
+    if (schema.getType().getMode() == DataMode.REPEATED) {
+      members = new ResolvedDictArray(this);
+    } else {
+      members = new ResolvedSingleDict(this);
+    }
+    parent.addChild(members);
+  }
+
+  public ResolvedTuple parent() {
+    return parent;
+  }
+
+  @Override
+  public String name() {
+    return schema.getName();
+  }
+
+  public ResolvedTuple members() {
+    return members;
+  }
+
+  @Override
+  public void project(ResolvedTuple dest) {
+    dest.addVector(members.buildVector());
+  }
+
+  @Override
+  public MaterializedField schema() {
+    return schema;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
index 8628873..5b8294c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
@@ -28,7 +28,9 @@
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
 import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
@@ -293,6 +295,136 @@
     }
   }
 
+  public static abstract class ResolvedDict extends ResolvedTuple {
+
+    protected final ResolvedDictColumn parentColumn;
+
+    public ResolvedDict(ResolvedDictColumn parentColumn) {
+      super(parentColumn.parent().nullBuilder == null
+          ? null : parentColumn.parent().nullBuilder.newChild(parentColumn.name()));
+      this.parentColumn = parentColumn;
+    }
+
+    public abstract ValueVector buildVector();
+
+    @Override
+    public String name() {
+      return parentColumn.name();
+    }
+  }
+
+  public static class ResolvedSingleDict extends ResolvedDict {
+
+    protected DictVector inputVector;
+    protected DictVector outputVector;
+
+    public ResolvedSingleDict(ResolvedDictColumn parentColumn) {
+      super(parentColumn);
+    }
+
+    @Override
+    public void addVector(ValueVector vector) {
+      outputVector.putChild(vector.getField().getName(), vector);
+    }
+
+    @Override
+    public ValueVector vector(int index) {
+      assert inputVector != null;
+      return inputVector.getChildByOrdinal(index);
+    }
+
+    @Override
+    public ValueVector buildVector() {
+      if (parentColumn.sourceIndex() != -1) {
+        ResolvedTuple parentTuple = parentColumn.parent();
+        inputVector = (DictVector) parentTuple.vector(parentColumn.sourceIndex());
+      }
+      MaterializedField colSchema = parentColumn.schema();
+      MaterializedField dictField = MaterializedField.create(colSchema.getName(), colSchema.getType());
+      outputVector = new DictVector(dictField, parentColumn.parent().allocator(), null);
+      buildColumns();
+      return outputVector;
+    }
+
+    @Override
+    public BufferAllocator allocator() {
+      return outputVector.getAllocator();
+    }
+
+    @Override
+    public void setRowCount(int rowCount) {
+      outputVector.getMutator().setValueCount(rowCount);
+      cascadeRowCount(rowCount);
+    }
+
+    @Override
+    public int innerCardinality(int outerCardinality) {
+      return outerCardinality;
+    }
+  }
+
+  public static class ResolvedDictArray extends ResolvedDict {
+
+    protected RepeatedDictVector inputVector;
+    protected RepeatedDictVector outputVector;
+
+    private int valueCount;
+
+    public ResolvedDictArray(ResolvedDictColumn parentColumn) {
+      super(parentColumn);
+    }
+
+    @Override
+    public void addVector(ValueVector vector) {
+      ((DictVector) outputVector.getDataVector()).putChild(vector.getField().getName(), vector);
+    }
+
+    @Override
+    public ValueVector vector(int index) {
+      assert inputVector != null;
+      return ((DictVector) inputVector.getDataVector()).getChildByOrdinal(index);
+    }
+
+    @Override
+    public RepeatedDictVector buildVector() {
+      if (parentColumn.sourceIndex() != -1) {
+        ResolvedTuple parentTuple = parentColumn.parent();
+        inputVector = (RepeatedDictVector) parentTuple.vector(parentColumn.sourceIndex());
+      }
+      MaterializedField colSchema = parentColumn.schema();
+      MaterializedField dictField = MaterializedField.create(colSchema.getName(), colSchema.getType());
+      outputVector = new RepeatedDictVector(dictField, parentColumn.parent().allocator(), null);
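+      // Dict arrays are always resolved from an actual table column, so
+      // inputVector is expected to be non-null here (sourceIndex != -1).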
+      valueCount = inputVector.getOffsetVector().getAccessor().getValueCount();
+      buildColumns();
+      return outputVector;
+    }
+
+    @Override
+    public BufferAllocator allocator() {
+      return outputVector.getAllocator();
+    }
+
+    @Override
+    public void setRowCount(int rowCount) {
+      cascadeRowCount(valueCount);
+    }
+
+    @Override
+    public int innerCardinality(int outerCardinality) {
+      return valueCount;
+    }
+  }
+
   protected final List<ResolvedColumn> members = new ArrayList<>();
   protected final NullColumnBuilder nullBuilder;
   protected List<ResolvedTuple> children;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java
index be3f3f6..ec12e0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java
@@ -73,7 +73,7 @@
   }
 
   protected TypeConverter childConverter(ColumnMetadata outputSchema) {
-    TupleMetadata childSchema = outputSchema == null ? null : outputSchema.mapSchema();
+    TupleMetadata childSchema = outputSchema == null ? null : outputSchema.tupleSchema();
     return typeConverter == null ? null :
       typeConverter.childConverter(childSchema);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
index 016cc63..7fdec52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
@@ -36,6 +36,11 @@
   }
 
   @Override
+  public ColumnReadProjection readDictProjection(ColumnMetadata col) {
+    return readProjection(col);
+  }
+
+  @Override
   public void setErrorContext(CustomErrorContext errorContext) { }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
index cfa82dd..daf2f1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
@@ -21,11 +21,13 @@
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
 import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumnImpl;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.TupleProjectionType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+import org.apache.drill.exec.vector.complex.DictVector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,16 +53,20 @@
     if (reqCol == null) {
       return new UnprojectedReadColumn(col);
     }
+
+    return getReadProjection(col, reqCol);
+  }
+
+  private ColumnReadProjection getReadProjection(ColumnMetadata col, RequestedColumn reqCol) {
     ColumnMetadata outputSchema = outputSchema(col);
     validateProjection(reqCol, outputSchema == null ? col : outputSchema);
-    if (!col.isMap()) {
+    if (!col.isMap() && !col.isDict()) {
 
       // Non-map column.
 
       ColumnConversionFactory conv = conversion(col, outputSchema);
       return new ProjectedReadColumn(col, reqCol, outputSchema, conv);
-    }
-    else {
+    } else {
 
       // Maps are tuples. Create a tuple projection and wrap it in
       // a column projection.
@@ -84,7 +90,11 @@
 
         mapProjection = new ExplicitProjectionSet(reqCol.mapProjection(), childConverter);
       }
-      return new ProjectedMapColumn(col, reqCol, outputSchema, mapProjection);
+      if (col.isMap()) {
+        return new ProjectedMapColumn(col, reqCol, outputSchema, mapProjection);
+      } else {
+        return new ProjectedDictColumn(col, reqCol, outputSchema, mapProjection);
+      }
     }
   }
 
@@ -110,5 +120,21 @@
   }
 
   @Override
+  public ColumnReadProjection readDictProjection(ColumnMetadata col) {
+    // Unlike a MAP, the requested projection contains a key value rather than
+    // a nested field's name, so the DICT's members are created somewhat artificially.
+
+    assert DictVector.fieldNames.contains(col.name());
+    if (col.name().equals(DictVector.FIELD_KEY_NAME)) {
+      // This field is considered not projected but its
+      // vector and writer will be instantiated later.
+      return new UnprojectedReadColumn(col);
+    }
+
+    RequestedColumn reqCol = new RequestedColumnImpl(requestedProj, col.name()); // this is the 'value' column
+    return getReadProjection(col, reqCol);
+  }
+
+  @Override
   public boolean isEmpty() { return requestedProj.projections().isEmpty(); }
 }
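
The net effect of readDictProjection(), as a sketch (the key and value member
names are the DictVector field constants):

    // For a dict accessed by key:
    //   key   -> UnprojectedReadColumn; its vector is still created later
    //            (see ColumnBuilder.allowCreation()) so the dict's paired
    //            key/value vectors stay consistent.
    //   value -> wrapped in a synthetic RequestedColumnImpl and resolved
    //            as an ordinary projected column.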
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java
new file mode 100644
index 0000000..9f4eecf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.exec.physical.resultSet.ProjectionSet;
+import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
+import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+
+public class ProjectedDictColumn extends ProjectedReadColumn {
+
+  private final ProjectionSet tupleProjection;
+
+  public ProjectedDictColumn(ColumnMetadata readSchema,
+                            RequestedTuple.RequestedColumn requestedCol, ColumnMetadata outputSchema,
+                            ProjectionSet tupleProjection) {
+    super(readSchema, requestedCol, outputSchema, null);
+    this.tupleProjection = tupleProjection;
+  }
+
+  @Override
+  public ProjectionSet mapProjection() {
+    return tupleProjection;
+  }
+
+  @Override
+  public ProjectionType projectionType() {
+    return super.projectionType().isArray() ? ProjectionType.DICT_ARRAY : ProjectionType.ARRAY;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
index 640a371..1da5a2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
@@ -43,7 +43,7 @@
     } else if (isSpecial(outputSchema)) {
       return new UnprojectedReadColumn(col);
     }
-    if (col.isMap()) {
+    if (col.isMap() || col.isDict()) {
       return new ProjectedMapColumn(col, null, outputSchema,
           new WildcardProjectionSet(childConverter(outputSchema), isStrict));
 
@@ -53,6 +53,11 @@
     }
   }
 
+  @Override
+  public ColumnReadProjection readDictProjection(ColumnMetadata col) {
+    return readProjection(col);
+  }
+
   // Wildcard means use whatever schema is provided by the reader,
   // so the projection itself is non-empty even if the reader has no
   // columns.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java
index 298c0e6..208defd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java
@@ -102,5 +102,6 @@
 
   void setErrorContext(CustomErrorContext errorContext);
   ColumnReadProjection readProjection(ColumnMetadata col);
+  ColumnReadProjection readDictProjection(ColumnMetadata col);
   boolean isEmpty();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
index bf1256d..54d3c0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
@@ -177,6 +177,8 @@
       return buildSingleList(parent, colSchema);
     } else if (colSchema.isVariant()) {
       return buildVariant(parent, colSchema);
+    } else if (colSchema.isDict()) {
+      return buildDict(parent, colSchema);
     } else {
       return buildPrimitive(parent, colSchema);
     }
@@ -208,9 +210,9 @@
 
   private void expandMap(ObjectWriter colWriter, ColumnMetadata colSchema) {
     if (colSchema.isArray()) {
-      buildTuple(colWriter.array().tuple(), colSchema.mapSchema());
+      buildTuple(colWriter.array().tuple(), colSchema.tupleSchema());
     } else {
-      buildTuple(colWriter.tuple(), colSchema.mapSchema());
+      buildTuple(colWriter.tuple(), colSchema.tupleSchema());
     }
   }
 
@@ -280,7 +282,7 @@
 
   /**
    * We've just built a writer for column. If the column is structured
-   * (AKA "complex", meaning a map or list or array), then we need to
+   * (AKA "complex", meaning a map, list, array or dict), then we need to
    * build writer for the components of the column. We do that recursively
    * here.
    *
@@ -300,8 +302,24 @@
       assert false;
     } else if (colSchema.isVariant()) {
       expandVariant(colWriter, colSchema);
+    } else if (colSchema.isDict()) {
+      expandDict(colWriter, colSchema);
     // } else {
       // Nothing to expand for primitives
     }
   }
+
+  private ObjectWriter buildDict(ParentShim parent, ColumnMetadata colSchema) {
+    final ObjectWriter colWriter = parent.add(colSchema.cloneEmpty());
+    expandDict(colWriter, colSchema);
+    return colWriter;
+  }
+
+  private void expandDict(ObjectWriter colWriter, ColumnMetadata colSchema) {
+    if (colSchema.isArray()) {
+      buildTuple(colWriter.array().dict().tuple(), colSchema.tupleSchema());
+    } else {
+      buildTuple(colWriter.dict().tuple(), colSchema.tupleSchema());
+    }
+  }
 }
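
As a usage sketch, a provided schema declaring a DICT column that buildDict()
and expandDict() above would expand into writers. The addDict(...).value(...)
builder chain is an assumption based on Drill's SchemaBuilder/DictBuilder of
the same vintage, so treat it as illustrative:

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    // Hedged sketch: one DICT column mapping VARCHAR keys to INT values.
    TupleMetadata schema = new SchemaBuilder()
        .add("id", MinorType.INT)
        .addDict("d", MinorType.VARCHAR)   // key type (assumed signature)
          .value(MinorType.INT)            // value type (assumed method)
          .resumeSchema()
        .buildSchema();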
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
index 57b07fa..3f8abce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
@@ -50,11 +50,14 @@
 import org.apache.drill.exec.vector.accessor.writer.EmptyListShim;
 import org.apache.drill.exec.vector.accessor.writer.ListWriterImpl;
 import org.apache.drill.exec.vector.accessor.writer.MapWriter;
+import org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter;
 import org.apache.drill.exec.vector.accessor.writer.RepeatedListWriter;
 import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl;
 import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.VariantObjectWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
 import org.apache.drill.exec.vector.complex.ListVector;
 import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -99,8 +102,15 @@
    */
   public ColumnState buildColumn(ContainerState parent, ColumnMetadata columnSchema) {
 
-    ColumnReadProjection colProj = parent.projectionSet().readProjection(columnSchema);
+    ColumnReadProjection colProj;
+    if (parent instanceof TupleState.DictState) {
+      colProj = parent.projectionSet().readDictProjection(columnSchema);
+    } else {
+      colProj = parent.projectionSet().readProjection(columnSchema);
+    }
     switch (colProj.providedSchema().structureType()) {
+    case DICT:
+      return buildDict(parent, colProj);
     case TUPLE:
       return buildMap(parent, colProj);
     case VARIANT:
@@ -134,7 +144,7 @@
     ColumnMetadata columnSchema = colProj.providedSchema();
 
     ValueVector vector;
-    if (!colProj.isProjected()) {
+    if (!colProj.isProjected() && !allowCreation(parent)) {
 
       // Column is not projected. No materialized backing for the column.
 
@@ -180,6 +190,21 @@
   }
 
   /**
+   * Checks whether this is the special case in which the vector, writer and
+   * column state should be created for a primitive field even though the field
+   * itself is not projected. This is needed when a {@code DICT}'s {@code value}
+   * is accessed by key: the {@code DICT}'s {@code keys} field is not projected,
+   * but it must still be initialized to ensure the dict vector is constructed
+   * properly (a {@code DICT} must have both {@code keys} and {@code values}
+   * vectors, as they are paired).
+   *
+   * @param parent container containing the primitive
+   * @return {@code true} if the parent is {@code DICT} and its {@code value} is accessed by key
+   */
+  private boolean allowCreation(ContainerState parent) {
+    return parent instanceof TupleState.DictState && !parent.projectionSet().isEmpty();
+  }
+
+  /**
    * Build a new map (single or repeated) column. Except for maps nested inside
    * of unions, no map vector is created
    * here, instead we create a tuple state to hold the columns, and defer the
@@ -198,7 +223,7 @@
     // calls.
 
     assert columnSchema.isMap();
-    assert columnSchema.mapSchema().size() == 0;
+    assert columnSchema.tupleSchema().isEmpty();
 
     // Create the vector, vector state and writer.
 
@@ -223,7 +248,7 @@
       // have content that varies from batch to batch. Only the leaf
       // vectors can be cached.
 
-      assert columnSchema.mapSchema().isEmpty();
+      assert columnSchema.tupleSchema().isEmpty();
       vector = new MapVector(columnSchema.schema(), parent.loader().allocator(), null);
       vectorState = new MapVectorState(vector, new NullVectorState());
     }
@@ -256,7 +281,7 @@
       // have content that varies from batch to batch. Only the leaf
       // vectors can be cached.
 
-      assert columnSchema.mapSchema().isEmpty();
+      assert columnSchema.tupleSchema().isEmpty();
       mapVector = new RepeatedMapVector(mapColSchema.schema(),
           parent.loader().allocator(), null);
       offsetVector = mapVector.getOffsetVector();
@@ -534,4 +559,138 @@
     return new RepeatedListColumnState(parent.loader(),
         arrayWriter, vectorState, listState);
   }
+
+  private ColumnState buildDict(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
+
+    // When dynamically adding columns, must add the (empty)
+    // dict by itself, then add columns to the dict via separate
+    // calls (the same way as is done for MAP).
+
+    assert columnSchema.isDict();
+    assert columnSchema.tupleSchema().isEmpty();
+
+    // Create the vector, vector state and writer.
+
+    if (columnSchema.isArray()) {
+      return buildDictArray(parent, colProj);
+    } else {
+      return buildSingleDict(parent, colProj);
+    }
+  }
+
+  private ColumnState buildDictArray(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
+
+    // Create the dict's offset vector.
+
+    RepeatedDictVector repeatedDictVector;
+    UInt4Vector offsetVector;
+    if (!colProj.isProjected()) {
+      repeatedDictVector = null;
+      offsetVector = null;
+    } else {
+
+      // Creating the dict vector will create its contained vectors if we
+      // give it a materialized field with children. So, instead pass a clone
+      // without children so we can add them.
+
+      final ColumnMetadata dictColMetadata = columnSchema.cloneEmpty();
+
+      // Don't get the dict vector from the vector cache. Dict vectors may
+      // have content that varies from batch to batch. Only the leaf
+      // vectors can be cached.
+
+      assert columnSchema.tupleSchema().isEmpty();
+      repeatedDictVector = new RepeatedDictVector(dictColMetadata.schema(),
+          parent.loader().allocator(), null);
+      offsetVector = repeatedDictVector.getOffsetVector();
+    }
+
+    // Create the writer using the offset vector
+
+    final AbstractObjectWriter writer = ObjectDictWriter.buildDictArray(
+        columnSchema, repeatedDictVector, new ArrayList<>());
+
+    // Wrap the offset vector in a vector state
+
+    VectorState offsetVectorState;
+    VectorState dictOffsetVectorState;
+    if (!colProj.isProjected()) {
+      offsetVectorState = new NullVectorState();
+      dictOffsetVectorState = new NullVectorState();
+    } else {
+      AbstractArrayWriter arrayWriter = (AbstractArrayWriter) writer.array();
+      offsetVectorState = new OffsetVectorState(
+          arrayWriter.offsetWriter(),
+          offsetVector,
+          writer.array().entry().events());
+      dictOffsetVectorState = new OffsetVectorState(
+          ((AbstractArrayWriter) arrayWriter.array()).offsetWriter(),
+          ((DictVector) repeatedDictVector.getDataVector()).getOffsetVector(),
+          writer.array().entry().dict().entry().events());
+    }
+    final VectorState mapVectorState =
+        new TupleState.DictArrayVectorState(repeatedDictVector, offsetVectorState, dictOffsetVectorState);
+
+    // Assemble it all into the column state.
+
+    final TupleState.DictArrayState dictArrayState = new TupleState.DictArrayState(parent.loader(),
+        parent.vectorCache().childCache(columnSchema.name()),
+        colProj.mapProjection());
+    return new TupleState.DictColumnState(
+        dictArrayState, writer, mapVectorState, parent.isVersioned());
+  }
+
+  private ColumnState buildSingleDict(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
+
+    // Create the dict's offset vector.
+
+    DictVector dictVector;
+    UInt4Vector offsetVector;
+    if (!colProj.isProjected()) {
+      dictVector = null;
+      offsetVector = null;
+    } else {
+
+      // Creating the dict vector will create its contained vectors if we
+      // give it a materialized field with children. So, instead pass a clone
+      // without children so we can add them.
+
+      final ColumnMetadata dictColMetadata = columnSchema.cloneEmpty();
+
+      // Don't get the dict vector from the vector cache. Dict vectors may
+      // have content that varies from batch to batch. Only the leaf
+      // vectors can be cached.
+
+      assert columnSchema.tupleSchema().isEmpty();
+      dictVector = new DictVector(dictColMetadata.schema(), parent.loader().allocator(), null);
+      offsetVector = dictVector.getOffsetVector();
+    }
+
+    // Create the writer using the offset vector
+
+    final AbstractObjectWriter writer = ObjectDictWriter.buildDict(columnSchema, dictVector, new ArrayList<>());
+
+    // Wrap the offset vector in a vector state
+
+    VectorState offsetVectorState;
+    if (!colProj.isProjected()) {
+      offsetVectorState = new NullVectorState();
+    } else {
+      offsetVectorState = new OffsetVectorState(
+          (((AbstractArrayWriter) writer.dict()).offsetWriter()),
+          offsetVector,
+          writer.dict().entry().events());
+    }
+    final VectorState mapVectorState = new TupleState.SingleDictVectorState(dictVector, offsetVectorState);
+
+    // Assemble it all into the column state.
+
+    final TupleState.SingleDictState dictState = new TupleState.SingleDictState(
+        parent.loader(), parent.vectorCache().childCache(columnSchema.name()),
+        colProj.mapProjection());
+    return new TupleState.DictColumnState(
+        dictState, writer, mapVectorState, parent.isVersioned());
+  }
 }
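
Condensed from buildSingleDict() above, the vector/writer pairing for a
projected single dict (allocator and columnSchema stand in for the values the
loader supplies):

    // Create the vector from a child-less clone of the schema, then build
    // the writer over it; the key and value child vectors are added as
    // columns are added. The writer's offset writer is later bound to the
    // vector's offset vector through an OffsetVectorState.
    DictVector dictVector = new DictVector(
        columnSchema.cloneEmpty().schema(), allocator, null);
    AbstractObjectWriter writer =
        ObjectDictWriter.buildDict(columnSchema, dictVector, new ArrayList<>());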
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
index 9d8f798..b2b4d4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
@@ -38,6 +38,8 @@
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
 
 /**
  * Represents the loader state for a tuple: a row or a map. This is "state" in
@@ -142,7 +144,7 @@
       } else {
         outputSchema = schema();
       }
-      mapState.bindOutputSchema(outputSchema.mapSchema());
+      mapState.bindOutputSchema(outputSchema.tupleSchema());
     }
 
     public MapState mapState() { return mapState; }
@@ -230,7 +232,9 @@
 
     @Override
     public void dump(HierarchicalFormatter format) {
-      // TODO
+      format.startObject(this)
+          .attribute("field", mapVector != null ? mapVector.getField() : "null")
+          .endObject();
     }
   }
 
@@ -539,7 +543,7 @@
         prevHarvestIndex = i;
       }
 
-      // If the column is a map, then we have to recurse into the map
+      // If the column is a map or a dict, then we have to recurse into it
       // itself. If the map is inside a union, then the map's vectors
       // already appear in the map vector, but we still must update the
       // output schema.
@@ -547,6 +551,9 @@
       if (colState.schema().isMap()) {
         final MapState childMap = ((MapColumnState) colState).mapState();
         childMap.updateOutput(curSchemaVersion);
+      } else if (colState.schema().isDict()) {
+        final DictState child = ((DictColumnState) colState).dictState();
+        child.updateOutput(curSchemaVersion);
       }
     }
   }
@@ -567,4 +574,224 @@
       .endArray()
       .endObject();
   }
+
+  public static class DictColumnState extends BaseContainerColumnState {
+    protected final DictState dictState;
+    protected boolean isVersioned;
+    protected final ColumnMetadata outputSchema;
+
+    public DictColumnState(DictState dictState,
+                          AbstractObjectWriter writer,
+                          VectorState vectorState,
+                          boolean isVersioned) {
+      super(dictState.loader(), writer, vectorState);
+      this.dictState = dictState;
+      dictState.bindColumnState(this);
+      this.isVersioned = isVersioned;
+      if (isVersioned) {
+        outputSchema = schema().cloneEmpty();
+      } else {
+        outputSchema = schema();
+      }
+      dictState.bindOutputSchema(outputSchema.tupleSchema());
+    }
+
+    @Override
+    public void buildOutput(TupleState tupleState) {
+      outputIndex = tupleState.addOutputColumn(vector(), outputSchema());
+    }
+
+    public DictState dictState() {
+      return dictState;
+    }
+
+    @Override
+    public ContainerState container() {
+      return dictState;
+    }
+
+    @Override
+    public boolean isProjected() {
+      return dictState.hasProjections();
+    }
+
+    public boolean isVersioned() {
+      return isVersioned;
+    }
+
+    @Override
+    public ColumnMetadata outputSchema() { return outputSchema; }
+  }
+
+  public static abstract class DictState extends MapState {
+
+    public DictState(LoaderInternals events,
+                    ResultVectorCache vectorCache,
+                    ProjectionSet projectionSet) {
+      super(events, vectorCache, projectionSet);
+    }
+
+    @Override
+    public void bindColumnState(ColumnState colState) {
+      super.bindColumnState(colState);
+      writer().bindListener(this);
+    }
+
+    @Override
+    protected boolean isVersioned() {
+      return ((DictColumnState) parentColumn).isVersioned();
+    }
+
+    @Override
+    public void dump(HierarchicalFormatter format) {
+      format.startObject(this)
+          .attribute("column", parentColumn.schema().name())
+          .attribute("cardinality", innerCardinality())
+          .endObject();
+    }
+  }
+
+  public static class SingleDictState extends DictState {
+
+    public SingleDictState(LoaderInternals events,
+                          ResultVectorCache vectorCache,
+                          ProjectionSet projectionSet) {
+      super(events, vectorCache, projectionSet);
+    }
+
+    @Override
+    public AbstractTupleWriter writer() {
+      return (AbstractTupleWriter) parentColumn.writer().dict().tuple();
+    }
+  }
+
+  public static class DictArrayState extends DictState {
+
+    public DictArrayState(LoaderInternals events,
+                         ResultVectorCache vectorCache,
+                         ProjectionSet projectionSet) {
+      super(events, vectorCache, projectionSet);
+    }
+
+    @Override
+    public int addOutputColumn(ValueVector vector, ColumnMetadata colSchema) {
+      final RepeatedDictVector repeatedDictVector = parentColumn.vector();
+      DictVector dictVector = (DictVector) repeatedDictVector.getDataVector();
+      if (isVersioned()) {
+        dictVector.putChild(colSchema.name(), vector);
+      }
+      final int index = outputSchema.addColumn(colSchema);
+      assert dictVector.size() == outputSchema.size();
+      assert dictVector.getField().getChildren().size() == outputSchema.size();
+      return index;
+    }
+
+    @Override
+    public AbstractTupleWriter writer() {
+      return (AbstractTupleWriter) parentColumn.writer().array().dict().tuple();
+    }
+  }
+
+  public static abstract class DictVectorState<T extends ValueVector> implements VectorState {
+
+    protected final T vector;
+    protected final VectorState offsets;
+
+    public DictVectorState(T vector, VectorState offsets) {
+      this.vector = vector;
+      this.offsets = offsets;
+    }
+
+    @Override
+    public T vector() {
+      return vector;
+    }
+
+    @Override
+    public int allocate(int cardinality) {
+      return offsets.allocate(cardinality);
+    }
+
+    @Override
+    public void rollover(int cardinality) {
+      offsets.rollover(cardinality);
+    }
+
+    @Override
+    public void harvestWithLookAhead() {
+      offsets.harvestWithLookAhead();
+    }
+
+    @Override
+    public void startBatchWithLookAhead() {
+      offsets.startBatchWithLookAhead();
+    }
+
+    @Override
+    public void close() {
+      offsets.close();
+    }
+
+    public VectorState offsetVectorState() {
+      return offsets;
+    }
+
+    @Override
+    public boolean isProjected() {
+      return offsets.isProjected();
+    }
+
+    @Override
+    public void dump(HierarchicalFormatter format) {
+      format.startObject(this)
+          .attribute("field", vector != null ? vector.getField() : "null")
+          .endObject();
+    }
+  }
+
+  public static class SingleDictVectorState extends DictVectorState<DictVector> {
+
+    public SingleDictVectorState(DictVector vector, VectorState offsets) {
+      super(vector, offsets);
+    }
+  }
+
+  public static class DictArrayVectorState extends DictVectorState<RepeatedDictVector> {
+
+    // offsets for the data vector
+    private final VectorState dictOffsets;
+
+    public DictArrayVectorState(RepeatedDictVector vector, VectorState offsets, VectorState dictOffsets) {
+      super(vector, offsets);
+      this.dictOffsets = dictOffsets;
+    }
+
+    @Override
+    public int allocate(int cardinality) {
+      return offsets.allocate(cardinality);
+    }
+
+    @Override
+    public void rollover(int cardinality) {
+      super.rollover(cardinality);
+      dictOffsets.rollover(cardinality);
+    }
+
+    @Override
+    public void harvestWithLookAhead() {
+      super.harvestWithLookAhead();
+      dictOffsets.harvestWithLookAhead();
+    }
+
+    @Override
+    public void startBatchWithLookAhead() {
+      super.startBatchWithLookAhead();
+      dictOffsets.startBatchWithLookAhead();
+    }
+
+    @Override
+    public void close() {
+      super.close();
+      dictOffsets.close();
+    }
+  }
 }
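
Note on the two offset layers behind DictArrayVectorState: a repeated dict
carries one offset vector on the RepeatedDictVector (row to its range of
dicts) and a second on the contained DictVector (dict to its range of
key/value entries), which is why rollover, harvest and close are forwarded to
both `offsets` and `dictOffsets`.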
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/MetadataProvider.java
index 49c50be..915b800 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/MetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/MetadataProvider.java
@@ -85,8 +85,9 @@
         return new ArraySchemaCreator((RepeatedListColumnMetadata) colMetadata);
       case VARIANT:
         return new VariantSchemaCreator((VariantSchema) colMetadata.variantSchema());
+      case DICT:
       case TUPLE:
-        return new MetadataCreator((TupleSchema) colMetadata.mapSchema());
+        return new MetadataCreator((TupleSchema) colMetadata.tupleSchema());
       default:
         throw new UnsupportedOperationException();
       }
@@ -187,8 +188,9 @@
         return new ArraySchemaRetrieval(colMetadata);
       case VARIANT:
         return new VariantSchemaRetrieval((VariantSchema) colMetadata.variantSchema());
+      case DICT:
       case TUPLE:
-        return new MetadataRetrieval(colMetadata.mapSchema());
+        return new MetadataRetrieval(colMetadata.tupleSchema());
       default:
         throw new UnsupportedOperationException();
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java
index 533d6c1..b698aac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java
@@ -37,11 +37,13 @@
 import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
 import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
 import org.apache.drill.exec.vector.accessor.reader.ArrayReaderImpl;
+import org.apache.drill.exec.vector.accessor.reader.DictReaderImpl;
 import org.apache.drill.exec.vector.accessor.reader.MapReader;
 import org.apache.drill.exec.vector.accessor.reader.UnionReaderImpl;
 import org.apache.drill.exec.vector.accessor.reader.VectorAccessor;
 import org.apache.drill.exec.vector.accessor.reader.VectorAccessors;
 import org.apache.drill.exec.vector.accessor.reader.VectorAccessors.BaseHyperVectorAccessor;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
 /**
  * Base reader builder for a hyper-batch. The semantics of hyper-batches are
@@ -151,6 +153,8 @@
 
   protected AbstractObjectReader buildVectorReader(VectorAccessor va, ColumnMetadata metadata) {
     switch(metadata.type()) {
+    case DICT:
+      return buildDict(va, metadata);
     case MAP:
       return buildMap(va, metadata.mode(), metadata);
     case UNION:
@@ -162,6 +166,28 @@
     }
   }
 
+  private AbstractObjectReader buildDict(VectorAccessor va, ColumnMetadata metadata) {
+    boolean isArray = metadata.isArray();
+
+    ValueVector vector = va.vector();
+    VectorAccessor dictAccessor;
+    if (isArray) {
+      ValueVector dictVector = ((RepeatedValueVector) vector).getDataVector();
+      dictAccessor = new VectorAccessors.SingleVectorAccessor(dictVector);
+    } else {
+      dictAccessor = va;
+    }
+
+    List<AbstractObjectReader> readers = buildMapMembers(dictAccessor, metadata.tupleSchema());
+    AbstractObjectReader reader = DictReaderImpl.build(metadata, dictAccessor, readers);
+
+    if (!isArray) {
+      return reader;
+    }
+
+    return ArrayReaderImpl.buildTuple(metadata, va, reader);
+  }
+
   private AbstractObjectReader buildMap(VectorAccessor va, DataMode mode, ColumnMetadata metadata) {
 
     boolean isArray = mode == DataMode.REPEATED;
@@ -171,7 +197,7 @@
     AbstractObjectReader mapReader = MapReader.build(
         metadata,
         isArray ? null : va,
-        buildMapMembers(va, metadata.mapSchema()));
+        buildMapMembers(va, metadata.tupleSchema()));
 
     // Single map
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseWriterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseWriterBuilder.java
index b503ecb..c8a0f97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseWriterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseWriterBuilder.java
@@ -32,12 +32,15 @@
 import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
 import org.apache.drill.exec.vector.accessor.writer.ListWriterImpl;
 import org.apache.drill.exec.vector.accessor.writer.MapWriter;
+import org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter;
 import org.apache.drill.exec.vector.accessor.writer.RepeatedListWriter;
 import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.VariantObjectWriter;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
 import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.UnionVector;
 
@@ -75,19 +78,32 @@
   private AbstractObjectWriter buildVectorWriter(ValueVector vector, VectorDescrip descrip) {
     final MajorType type = vector.getField().getType();
     switch (type.getMinorType()) {
-    case MAP:
-      return MapWriter.buildMapWriter(descrip.metadata,
-          (AbstractMapVector) vector,
-          buildMap((AbstractMapVector) vector, descrip));
+      case DICT:
+        return buildDict(vector, descrip);
+      case MAP:
+        return MapWriter.buildMapWriter(descrip.metadata,
+            (AbstractMapVector) vector,
+            buildMap((AbstractMapVector) vector, descrip));
 
-    case UNION:
-      return buildUnion((UnionVector) vector, descrip);
+      case UNION:
+        return buildUnion((UnionVector) vector, descrip);
 
-    case LIST:
-      return buildList(vector, descrip);
+      case LIST:
+        return buildList(vector, descrip);
 
-    default:
-      return ColumnWriterFactory.buildColumnWriter(descrip.metadata, conversionFactory, vector);
+      default:
+        return ColumnWriterFactory.buildColumnWriter(descrip.metadata, conversionFactory, vector);
+    }
+  }
+
+  private AbstractObjectWriter buildDict(ValueVector vector, VectorDescrip descrip) {
+    if (vector.getField().getType().getMode() == DataMode.REPEATED) {
+      ValueVector dataVector = ((RepeatedDictVector) vector).getDataVector();
+      List<AbstractObjectWriter> writers = buildMap((AbstractMapVector) dataVector, descrip);
+      return ObjectDictWriter.buildDictArray(descrip.metadata, (RepeatedDictVector) vector, writers);
+    } else {
+      List<AbstractObjectWriter> writers = buildMap((AbstractMapVector) vector, descrip);
+      return ObjectDictWriter.buildDict(descrip.metadata, (DictVector) vector, writers);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BuildVectorsFromMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BuildVectorsFromMetadata.java
index 6bae853..ca2802e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BuildVectorsFromMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BuildVectorsFromMetadata.java
@@ -28,7 +28,9 @@
 import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
 import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.UnionVector;
 
@@ -68,6 +70,8 @@
 
   private ValueVector buildVector(ColumnMetadata metadata) {
     switch (metadata.structureType()) {
+    case DICT:
+      return buildDict(metadata);
     case TUPLE:
       return buildMap(metadata);
     case VARIANT:
@@ -107,7 +111,7 @@
     // without children so we can add the children as we add vectors.
 
     final AbstractMapVector mapVector = (AbstractMapVector) TypeHelper.getNewVector(schema.emptySchema(), allocator, null);
-    populateMap(mapVector, schema.mapSchema(), false);
+    populateMap(mapVector, schema.tupleSchema(), false);
     return mapVector;
   }
 
@@ -155,7 +159,7 @@
         // for now.
 
         populateMap((AbstractMapVector) childVector,
-            variantSchema.member(MinorType.MAP).mapSchema(),
+            variantSchema.member(MinorType.MAP).tupleSchema(),
             true);
         break;
       default:
@@ -181,4 +185,31 @@
       populateUnion(vector.fullPromoteToUnion(), variantSchema);
     }
   }
+
+  private ValueVector buildDict(ColumnMetadata metadata) {
+    if (metadata.isArray()) {
+      return buildRepeatedDict(metadata);
+    } else {
+      return buildMap(metadata);
+    }
+  }
+
+  private RepeatedDictVector buildRepeatedDict(ColumnMetadata schema) {
+
+    // Creating the repeated dict vector will create its contained vectors if we
+    // give it a materialized field with children. So, instead pass a clone
+    // without children so we can add the children as we add vectors.
+
+    final RepeatedDictVector repeatedDictVector = (RepeatedDictVector) TypeHelper.getNewVector(schema.emptySchema(), allocator, null);
+    populateDict(repeatedDictVector, schema.tupleSchema());
+    return repeatedDictVector;
+  }
+
+  private void populateDict(RepeatedDictVector vector, TupleMetadata dictMetadata) {
+    final DictVector dataVector = (DictVector) vector.getDataVector();
+    for (int i = 0; i < dictMetadata.size(); i++) {
+      final ColumnMetadata childSchema = dictMetadata.metadata(i);
+      dataVector.putChild(childSchema.name(), buildVector(childSchema));
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
index 0d330d6..5f0680b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
@@ -39,13 +39,16 @@
 import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
 import org.apache.drill.exec.vector.accessor.reader.AbstractScalarReader;
 import org.apache.drill.exec.vector.accessor.reader.ArrayReaderImpl;
+import org.apache.drill.exec.vector.accessor.reader.DictReaderImpl;
 import org.apache.drill.exec.vector.accessor.reader.MapReader;
 import org.apache.drill.exec.vector.accessor.reader.UnionReaderImpl;
 import org.apache.drill.exec.vector.accessor.reader.VectorAccessor;
 import org.apache.drill.exec.vector.accessor.reader.VectorAccessors.SingleVectorAccessor;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
 import org.apache.drill.exec.vector.complex.ListVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.drill.exec.vector.complex.UnionVector;
 
 /**
@@ -119,22 +122,48 @@
     final MajorType type = va.type();
 
     switch(type.getMinorType()) {
-    case MAP:
-      return buildMap((AbstractMapVector) vector, va, type.getMode(), descrip);
-    case UNION:
-      return buildUnion((UnionVector) vector, va, descrip);
-    case LIST:
-      return buildList(vector, va, descrip);
-    case LATE:
+      case DICT:
+        return buildDict(vector, va, descrip);
+      case MAP:
+        return buildMap((AbstractMapVector) vector, va, type.getMode(), descrip);
+      case UNION:
+        return buildUnion((UnionVector) vector, va, descrip);
+      case LIST:
+        return buildList(vector, va, descrip);
+      case LATE:
 
-      // Occurs for a list with no type: a list of nulls.
+        // Occurs for a list with no type: a list of nulls.
 
-      return AbstractScalarReader.nullReader(descrip.metadata);
-    default:
-      return buildScalarReader(va, descrip.metadata);
+        return AbstractScalarReader.nullReader(descrip.metadata);
+      default:
+        return buildScalarReader(va, descrip.metadata);
     }
   }
 
+  private AbstractObjectReader buildDict(ValueVector vector, VectorAccessor va, VectorDescrip descrip) {
+
+    boolean isArray = descrip.metadata.isArray();
+
+    DictVector dictVector;
+    VectorAccessor dictAccessor;
+    if (isArray) {
+      dictVector = (DictVector) ((RepeatedValueVector) vector).getDataVector();
+      dictAccessor = new SingleVectorAccessor(dictVector);
+    } else {
+      dictVector = (DictVector) vector;
+      dictAccessor = va;
+    }
+
+    List<AbstractObjectReader> readers = buildMapMembers(dictVector, descrip.childProvider());
+    AbstractObjectReader reader = DictReaderImpl.build(descrip.metadata, dictAccessor, readers);
+
+    if (!isArray) {
+      return reader;
+    }
+
+    return ArrayReaderImpl.buildTuple(descrip.metadata, va, reader);
+  }
+
   private AbstractObjectReader buildMap(AbstractMapVector vector, VectorAccessor va, DataMode mode, VectorDescrip descrip) {
 
     final boolean isArray = mode == DataMode.REPEATED;
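
For orientation, buildDict composes readers the same way buildMap does for maps: the DictReaderImpl always sits over the single DictVector, and the repeated case adds an array reader around it. A sketch of the composition, inferred from the method above rather than an API guarantee:

    //   DICT          -> DictReaderImpl.build(metadata, dictAccessor, members)
    //   REPEATED DICT -> ArrayReaderImpl.buildTuple(metadata, va, dictReader),
    //                    where dictAccessor wraps the inner DictVector
    //                    unwrapped from the RepeatedValueVector.
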
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SingleSchemaInference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SingleSchemaInference.java
index 0ce4f49..8de2d68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SingleSchemaInference.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SingleSchemaInference.java
@@ -31,7 +31,9 @@
 import org.apache.drill.exec.record.metadata.VariantSchema;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
 import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.UnionVector;
 
@@ -67,6 +69,8 @@
   private ColumnMetadata inferVector(ValueVector vector) {
     final MaterializedField field = vector.getField();
     switch (field.getType().getMinorType()) {
+    case DICT:
+      return MetadataUtils.newDict(field, inferDictSchema(vector));
     case MAP:
       return MetadataUtils.newMap(field, inferMapSchema((AbstractMapVector) vector));
     case LIST:
@@ -83,6 +87,20 @@
     }
   }
 
+  private TupleSchema inferDictSchema(ValueVector vector) {
+    final List<ColumnMetadata> columns = new ArrayList<>();
+    DictVector dictVector;
+    if (vector.getField().getType().getMode() == DataMode.REPEATED) {
+      dictVector = (DictVector) ((RepeatedDictVector) vector).getDataVector();
+    } else {
+      dictVector = (DictVector) vector;
+    }
+    for (int i = 0; i < dictVector.size(); i++) {
+      columns.add(inferVector(dictVector.getChildByOrdinal(i)));
+    }
+    return MetadataUtils.fromColumns(columns);
+  }
+
   private TupleSchema inferMapSchema(AbstractMapVector vector) {
     final List<ColumnMetadata> columns = new ArrayList<>();
     for (int i = 0; i < vector.size(); i++) {
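
The inference above unwraps a REPEATED dict to reach the inner DictVector before recursing into its children. The expected result for a repeated dict vector, stated with the metadata predicates this patch uses elsewhere (a sketch; the assertions mirror the new tests below):

    ColumnMetadata md = inferVector(repeatedDictVector);
    // md.isDict()             == true
    // md.isArray()            == true  (REPEATED mode)
    // md.tupleSchema().size() == 2     (key and value columns)
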
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/VectorAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/VectorAllocator.java
index 17b2fd5..ae403fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/VectorAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/VectorAllocator.java
@@ -29,6 +29,7 @@
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 
 /**
@@ -78,6 +79,12 @@
       } else {
         allocateMap((AbstractMapVector) vector, metadata, valueCount, mdProvider);
       }
+    } else if (type.getMinorType() == MinorType.DICT) {
+      if (type.getMode() == DataMode.REPEATED) {
+        allocateDictArray((RepeatedDictVector) vector, metadata, valueCount, mdProvider);
+      } else {
+        allocateMap((AbstractMapVector) vector, metadata, valueCount, mdProvider);
+      }
     } else {
       allocatePrimitive(vector, metadata, valueCount);
     }
@@ -98,9 +105,16 @@
     allocateMap(vector, metadata, expectedValueCount, mdProvider);
   }
 
+  private void allocateDictArray(RepeatedDictVector vector, ColumnMetadata metadata,
+                                 int valueCount, MetadataProvider mdProvider) {
+    vector.getOffsetVector().allocateNew(valueCount);
+    final int expectedValueCount = valueCount * metadata.expectedElementCount();
+    allocateMap((AbstractMapVector) vector.getDataVector(), metadata, expectedValueCount, mdProvider);
+  }
+
   private void allocateMap(AbstractMapVector vector, ColumnMetadata metadata, int valueCount, MetadataProvider mdProvider) {
     final MetadataProvider mapProvider = mdProvider.childProvider(metadata);
-    final TupleMetadata mapSchema = metadata.mapSchema();
+    final TupleMetadata mapSchema = metadata.tupleSchema();
     assert mapSchema != null;
     int i = 0;
     for (final ValueVector child : vector) {
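
allocateDictArray sizes the offset vector for the row count, then sizes the inner DictVector for the expected number of dict entries. For example, with valueCount = 4096 rows and metadata.expectedElementCount() = 10 dicts per row, the inner vector is allocated for 40960 entries before allocateMap sizes each of its children (these numbers are illustrative only).
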
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java
index 7db2887..363f51d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java
@@ -17,9 +17,9 @@
  */
 package org.apache.drill.exec.physical.resultSet.project;
 
-import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 
 /**
  * Specifies the type of projection obtained by parsing the
@@ -79,14 +79,22 @@
    * Combination of array and map hints.
    */
 
-  TUPLE_ARRAY;  // x[0].y
+  TUPLE_ARRAY,  // x[0].y
+
+  DICT, // x[0] or x['key'] (depends on key type)
+
+  DICT_ARRAY; // x[0][42] or x[0]['key'] (depends on key type)
 
   public boolean isTuple() {
     return this == ProjectionType.TUPLE || this == ProjectionType.TUPLE_ARRAY;
   }
 
   public boolean isArray() {
-    return this == ProjectionType.ARRAY || this == ProjectionType.TUPLE_ARRAY;
+    return this == ARRAY || this == TUPLE_ARRAY || this == DICT_ARRAY;
+  }
+
+  public boolean isDict() {
+    return this == DICT || this == DICT_ARRAY;
   }
 
   /**
@@ -99,17 +107,12 @@
   }
 
   public static ProjectionType typeFor(MajorType majorType) {
+    boolean repeated = Types.isRepeated(majorType);
     if (majorType.getMinorType() == MinorType.MAP) {
-      if (majorType.getMode() == DataMode.REPEATED) {
-        return TUPLE_ARRAY;
-      } else {
-        return TUPLE;
-      }
-    }
-    if (majorType.getMode() == DataMode.REPEATED) {
-      return ARRAY;
-    }
-    if (majorType.getMinorType() == MinorType.LIST) {
+      return repeated ? TUPLE_ARRAY : TUPLE;
+    } else if (majorType.getMinorType() == MinorType.DICT) {
+      return repeated ? DICT_ARRAY : DICT;
+    } else if (repeated || majorType.getMinorType() == MinorType.LIST) {
       return ARRAY;
     }
     return SCALAR;
@@ -143,13 +146,17 @@
 
     switch (this) {
     case ARRAY:
-      return readType == ARRAY || readType == TUPLE_ARRAY;
+      return readType == ARRAY || readType == TUPLE_ARRAY
+          || readType == DICT // the actual key type should be validated later
+          || readType == DICT_ARRAY;
     case TUPLE_ARRAY:
-      return readType == TUPLE_ARRAY;
+      return readType == TUPLE_ARRAY || readType == DICT_ARRAY;
     case SCALAR:
       return readType == SCALAR;
     case TUPLE:
-      return readType == TUPLE || readType == TUPLE_ARRAY;
+      return readType == TUPLE || readType == TUPLE_ARRAY || readType == DICT || readType == DICT_ARRAY;
+    case DICT:
+      return readType == DICT || readType == DICT_ARRAY;
     case UNPROJECTED:
     case GENERAL:
     case WILDCARD:
@@ -169,6 +176,8 @@
       return "tuple (a.x)";
     case TUPLE_ARRAY:
       return "tuple array (a[n].x)";
+    case DICT:
+      return "dict (a['key'])";
     case WILDCARD:
       return "wildcard (*)";
     default:
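
With the refactor above, typeFor reduces to a small mapping. Illustrative calls (Types.required and Types.repeated are existing helpers in org.apache.drill.common.types.Types; the expected results follow directly from the code):

    ProjectionType.typeFor(Types.required(MinorType.DICT));  // DICT
    ProjectionType.typeFor(Types.repeated(MinorType.DICT));  // DICT_ARRAY
    ProjectionType.typeFor(Types.repeated(MinorType.MAP));   // TUPLE_ARRAY
    ProjectionType.typeFor(Types.repeated(MinorType.INT));   // ARRAY
    ProjectionType.typeFor(Types.required(MinorType.INT));   // SCALAR
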
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java
index e0bc18f..55a252d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java
@@ -73,6 +73,11 @@
   @Override
   public boolean isTuple() { return type.isTuple(); }
 
+  @Override
+  public boolean isDict() {
+    return type.isDict();
+  }
+
   public RequestedTuple asTuple() {
     if (members == null) {
       members = new RequestedTupleImpl(this);
@@ -177,6 +182,10 @@
       return "map column";
     case TUPLE_ARRAY:
       return "repeated map";
+    case DICT:
+      return "dict column";
+    case DICT_ARRAY:
+      return "repeated dict column";
     case WILDCARD:
       return "wildcard";
     default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java
index d0455d0..d9b3e1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java
@@ -75,6 +75,7 @@
     boolean isSimple();
     boolean isArray();
     boolean isTuple();
+    boolean isDict();
     String fullName();
     RequestedTuple mapProjection();
     boolean nameEquals(String target);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 7eceb56..0a8620f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -212,7 +212,7 @@
             ? new ArrayList<>()
             : new ArrayList<>(parentNames);
         currentNames.add(columnMetadata.name());
-        result.addAll(getColumnPaths(columnMetadata.mapSchema(), currentNames));
+        result.addAll(getColumnPaths(columnMetadata.tupleSchema(), currentNames));
       } else {
         result.add(Collections.singletonList(columnMetadata.name()));
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
index 1d63738..a672d19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
@@ -339,7 +339,7 @@
           // if column is a map / struct, recursively scan nested columns
           if (column.isMap()) {
             List<Records.Column> mapRecords =
-              columns(schemaPath, table, column.mapSchema(), columnName, currentIndex, true);
+              columns(schemaPath, table, column.tupleSchema(), columnName, currentIndex, true);
             records.addAll(mapRecords);
           }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
index 0b6e0ee..0c02123 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
@@ -136,6 +136,7 @@
     }
 
     ValueVector v;
+    MajorType finalType = null;
     if (vector instanceof DictVector) {
       v = ((DictVector) vector).getValues();
       if (addToBreadCrumb) {
@@ -147,6 +148,7 @@
         depth = 0;
         builder.setDict(depth);
       }
+      finalType = ((DictVector) vector).getLastPathType();
     } else if (vector instanceof AbstractContainerVector) {
       String fieldName = null;
       if (seg.isNamed()) {
@@ -176,7 +178,7 @@
         if(addToBreadCrumb) {
           builder.intermediateType(v.getField().getType());
         }
-        builder.finalType(v.getField().getType());
+        builder.finalType(finalType != null ? finalType : v.getField().getType());
       } else {
         builder.finalType(v.getField().getType().toBuilder().setMode(DataMode.OPTIONAL).build());
       }
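
The FieldIdUtil change threads the dict's value type through to the final path type. A sketch of the intended effect (an assumption based on getLastPathType, which this patch reads from DictVector):

    // Given a column d of type DICT<INT, VARCHAR> and a path such as d[1]:
    //   before: builder.finalType(...) reported the DICT container's type
    //   after:  it reports getLastPathType(), i.e. the dict's value type
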
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
index 3255df6..a3f428d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
@@ -132,7 +132,7 @@
     PathSegment child = column.getChild();
     if (child != null && child.isNamed()) {
       String name = column.getNameSegment().getPath();
-      ColumnMetadata childMetadata = columnMetadata.mapSchema().metadata(name);
+      ColumnMetadata childMetadata = columnMetadata.tupleSchema().metadata(name);
       writeColumnToMapWriter(writer.map(name), child, childMetadata, allTextMode);
     } else {
       writeSingleOrArrayColumn(columnMetadata, writer, allTextMode);
@@ -181,9 +181,13 @@
   private static void writeColumn(ColumnMetadata columnMetadata,
       BaseWriter.MapWriter fieldWriter, boolean allTextMode) {
     switch (columnMetadata.structureType()) {
+      case DICT:
+        writeSchemaColumns(
+            columnMetadata.tupleSchema(), fieldWriter.dict(columnMetadata.name()), allTextMode);
+        break;
       case TUPLE:
         writeSchemaColumns(
-            columnMetadata.mapSchema(), fieldWriter.map(columnMetadata.name()), allTextMode);
+            columnMetadata.tupleSchema(), fieldWriter.map(columnMetadata.name()), allTextMode);
         break;
       case MULTI_ARRAY:
         writeArrayColumn(columnMetadata.childSchema(), fieldWriter.list(columnMetadata.name()), allTextMode);
@@ -213,8 +217,11 @@
   private static void writeArrayColumn(ColumnMetadata columnMetadata,
       BaseWriter.ListWriter fieldWriter, boolean allTextMode) {
     switch (columnMetadata.structureType()) {
+      case DICT:
+        writeSchemaColumns(columnMetadata.tupleSchema(), fieldWriter.dict(), allTextMode);
+        break;
       case TUPLE:
-        writeSchemaColumns(columnMetadata.mapSchema(), fieldWriter.map(), allTextMode);
+        writeSchemaColumns(columnMetadata.tupleSchema(), fieldWriter.map(), allTextMode);
         break;
       case MULTI_ARRAY:
         writeArrayColumn(columnMetadata.childSchema(), fieldWriter.list(), allTextMode);
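
The new DICT branches mirror the TUPLE ones: a dict column is opened with fieldWriter.dict(name) (or fieldWriter.dict() inside a list) and its key and value columns are written by recursing over tupleSchema(). A minimal schema that would reach the new case, using the same SchemaBuilder calls as the tests below:

    TupleMetadata schema = new SchemaBuilder()
        .addDict("d", MinorType.VARCHAR)
          .value(MinorType.INT)
          .resumeSchema()
        .buildSchema();
    // writeColumn(...) for "d" now takes the DICT case and recurses over
    // schema.metadata("d").tupleSchema().
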
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
index ed7ba0e..a59065d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
@@ -91,7 +91,7 @@
     assertFalse(mCol.isProjected());
 
     ColumnReadProjection bCol = mCol.mapProjection().readProjection(
-        readSchema.metadata("m").mapSchema().metadata("b"));
+        readSchema.metadata("m").tupleSchema().metadata("b"));
     assertFalse(bCol.isProjected());
   }
 
@@ -135,7 +135,7 @@
     assertTrue(mCol.isProjected());
 
     ColumnReadProjection bCol = mCol.mapProjection().readProjection(
-        readSchema.metadata("m").mapSchema().metadata("b"));
+        readSchema.metadata("m").tupleSchema().metadata("b"));
     assertTrue(bCol.isProjected());
   }
 
@@ -209,7 +209,7 @@
           .add("h", MinorType.VARCHAR)
           .resumeSchema()
         .buildSchema();
-    TupleMetadata mReadSchema = readSchema.metadata("m").mapSchema();
+    TupleMetadata mReadSchema = readSchema.metadata("m").tupleSchema();
     mReadSchema.metadata("f").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
 
     TupleMetadata outputSchema = new SchemaBuilder()
@@ -219,7 +219,7 @@
           .add("g", MinorType.VARCHAR)
           .resumeSchema()
         .buildSchema();
-    TupleMetadata mOutputSchema = outputSchema.metadata("m").mapSchema();
+    TupleMetadata mOutputSchema = outputSchema.metadata("m").tupleSchema();
     mOutputSchema.metadata("g").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
 
     TypeConverter converter = TypeConverter.builder()
@@ -315,8 +315,8 @@
           .add("e", MinorType.INT)
           .resumeSchema()
         .buildSchema();
-    TupleMetadata mReadSchema = readSchema.metadata("m").mapSchema();
-    TupleMetadata m2ReadSchema = readSchema.metadata("m2").mapSchema();
+    TupleMetadata mReadSchema = readSchema.metadata("m").tupleSchema();
+    TupleMetadata m2ReadSchema = readSchema.metadata("m2").tupleSchema();
 
     TupleMetadata outputSchema = new SchemaBuilder()
         .addMap("m")
@@ -324,7 +324,7 @@
           .resumeSchema()
         .buildSchema();
     outputSchema.setBooleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, true);
-    TupleMetadata mOutputSchema = outputSchema.metadata("m").mapSchema();
+    TupleMetadata mOutputSchema = outputSchema.metadata("m").tupleSchema();
 
     TypeConverter converter = TypeConverter.builder()
         .providedSchema(outputSchema)
@@ -415,9 +415,9 @@
     ColumnMetadata m1Schema = readSchema.metadata("m1");
     ColumnMetadata m2Schema = readSchema.metadata("m2");
     ColumnMetadata m3Schema = readSchema.metadata("m3");
-    TupleMetadata m1ReadSchema = m1Schema.mapSchema();
-    TupleMetadata m2ReadSchema = m2Schema.mapSchema();
-    TupleMetadata m3ReadSchema = m3Schema.mapSchema();
+    TupleMetadata m1ReadSchema = m1Schema.tupleSchema();
+    TupleMetadata m2ReadSchema = m2Schema.tupleSchema();
+    TupleMetadata m3ReadSchema = m3Schema.tupleSchema();
 
     // Project one member of map m1, all of m2, none of m3
 
@@ -476,7 +476,7 @@
         .buildSchema();
 
     ColumnMetadata m1Schema = readSchema.metadata("m1");
-    TupleMetadata m1ReadSchema = m1Schema.mapSchema();
+    TupleMetadata m1ReadSchema = m1Schema.tupleSchema();
 
     // Project one member of map1, all of map2, none of map3
 
@@ -515,7 +515,7 @@
         .buildSchema();
 
     ColumnMetadata mSchema = readSchema.metadata("m");
-    TupleMetadata mReadSchema = mSchema.mapSchema();
+    TupleMetadata mReadSchema = mSchema.tupleSchema();
 
     TupleMetadata outputSchema = new SchemaBuilder()
         .addMap("m")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDictArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDictArray.java
new file mode 100644
index 0000000..dc62928
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDictArray.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.map;
+import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.validate.BatchValidator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test dict array support in the result set loader.
+ */
+@Category(RowSetTests.class)
+public class TestResultSetLoaderDictArray extends SubOperatorTest {
+
+  @Test
+  public void testBasics() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDictArray("d", MinorType.INT)
+          .value(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Verify structure and schema
+
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(2, actualSchema.size());
+    assertTrue(actualSchema.metadata(1).isArray());
+    assertTrue(actualSchema.metadata(1).isDict());
+    assertEquals(2, actualSchema.metadata("d").tupleSchema().size());
+    assertEquals(2, actualSchema.column("d").getChildren().size());
+    DictWriter dictWriter = rootWriter.array("d").dict();
+    assertSame(actualSchema.metadata("d").tupleSchema(), dictWriter.schema().tupleSchema());
+
+    // Write a couple of rows with arrays.
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(10, objArray(
+            map(110, "d1.1", 111, "d1.2", 112, "d1.3"),
+            map(120, "d2.2"))
+        )
+        .addRow(20, objArray())
+        .addRow(30, objArray(
+            map(310, "d3.1", 311, "d3.2", 313, "d3.4", 317, "d3.9"),
+            map(320, "d4.2"),
+            map(332, "d5.1", 339, "d5.5", 337, "d5.6"))
+        );
+
+    // Verify the batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    RepeatedDictVector repeatedDictVector = (RepeatedDictVector) actual.container().getValueVector(1).getValueVector();
+    MaterializedField dictArrayField = repeatedDictVector.getField(); // RepeatedDictVector has a single child field: the DictVector
+    assertEquals(1, dictArrayField.getChildren().size());
+    DictVector dictVector = (DictVector) repeatedDictVector.getDataVector();
+    Iterator<MaterializedField> iter = dictVector.getField().getChildren().iterator();
+    assertTrue(dictWriter.keyWriter().schema().schema().isEquivalent(iter.next()));
+    assertTrue(dictWriter.valueWriter().scalar().schema().schema().isEquivalent(iter.next()));
+
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, objArray(
+            map(110, "d1.1", 111, "d1.2", 112, "d1.3"),
+            map(120, "d2.2"))
+        )
+        .addRow(20, objArray())
+        .addRow(30, objArray(
+            map(310, "d3.1", 311, "d3.2", 313, "d3.4", 317, "d3.9"),
+            map(320, "d4.2"),
+            map(332, "d5.1", 339, "d5.5", 337, "d5.6"))
+        )
+        .build();
+    RowSetUtilities.verify(expected, actual);
+
+    rsLoader.close();
+  }
+
+  @Test
+  public void testArrayValue() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDictArray("d", MinorType.INT)
+          .repeatedValue(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Write a couple of rows
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(10, objArray(
+            map(110, strArray("d1.1.1", "d1.1.2"), 111, strArray("d1.1.3", "d1.1.4"), 112, strArray("d1.1.5", "d1.1.6")),
+            map(120, strArray("d1.2.1", "d1.2.2"))))
+        .addRow(20, objArray())
+        .addRow(30, objArray(
+            map(310, strArray("d3.1.1", "d3.2.2"), 311, strArray("d3.1.3", "d3.2.4", "d3.1.5", "d3.1.6")),
+            map(320, strArray(), 321, strArray("d3.2.2")),
+            map(330, strArray("d3.3.1", "d1.2.2"))));
+
+    // Verify the batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, objArray(
+            map(110, strArray("d1.1.1", "d1.1.2"), 111, strArray("d1.1.3", "d1.1.4"), 112, strArray("d1.1.5", "d1.1.6")),
+            map(120, strArray("d1.2.1", "d1.2.2"))))
+        .addRow(20, objArray())
+        .addRow(30, objArray(
+            map(310, strArray("d3.1.1", "d3.2.2"), 311, strArray("d3.1.3", "d3.2.4", "d3.1.5", "d3.1.6")),
+            map(320, strArray(), 321, strArray("d3.2.2")),
+            map(330, strArray("d3.3.1", "d1.2.2"))))
+        .build();
+    RowSetUtilities.verify(expected, actual);
+
+    rsLoader.close();
+  }
+
+  @Test
+  public void testScalarValue() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDictArray("d", MinorType.VARCHAR)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Write a couple of rows
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(10, objArray(
+            map("a", 1, "b", 2, "d", 4),
+            map("a", 2, "c", 3, "d", 1, "e", 4)
+        ))
+        .addRow(20, objArray())
+        .addRow(30, objArray(
+            map("a", 2, "c", 4, "d", 5, "e", 6, "f", 11),
+            map("a", 1, "d", 6, "c", 3),
+            map("b", 2, "a", 3))
+        );
+
+    // Verify the batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, objArray(
+            map("a", 1, "b", 2, "d", 4),
+            map("a", 2, "c", 3, "d", 1, "e", 4)
+        ))
+        .addRow(20, objArray())
+        .addRow(30, objArray(
+            map("a", 2, "c", 4, "d", 5, "e", 6, "f", 11),
+            map("a", 1, "d", 6, "c", 3),
+            map("b", 2, "a", 3))
+        )
+        .build();
+    RowSetUtilities.verify(expected, actual);
+
+    rsLoader.close();
+  }
+
+  @Test
+  public void testDictValue() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDictArray("d", MinorType.VARCHAR)
+          .dictValue()
+            .key(MinorType.VARCHAR)
+            .value(MinorType.VARCHAR)
+            .resumeDict()
+        .resumeSchema()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Write a couple of rows
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(10, objArray(
+            map("a", map("a", "a1", "b", "a2", "c", "a3"), "b", map("d", "a4"), "c", map()),
+            map("b", map("b", "a2"))
+        ))
+        .addRow(20, objArray())
+        .addRow(30, objArray(
+            map("a", map("a", "b1", "b", "b1")),
+            map("b", map("e", "b2"), "a", map("h", "b1", "g", "b3"), "c", map("a", "b4")),
+            map("b", map("a", "b3", "c", "c3"), "a", map())));
+
+    // Verify the batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, objArray(
+            map("a", map("a", "a1", "b", "a2", "c", "a3"), "b", map("d", "a4"), "c", map()),
+            map("b", map("b", "a2"))
+        ))
+        .addRow(20, objArray())
+        .addRow(30, objArray(
+            map("a", map("a", "b1", "b", "b1")),
+            map("b", map("e", "b2"), "a", map("h", "b1", "g", "b3"), "c", map("a", "b4")),
+            map("b", map("a", "b3", "c", "c3"), "a", map())))
+        .build();
+    RowSetUtilities.verify(expected, actual);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test that memory is released if the loader is closed with an active
+   * batch (that is, before the batch is harvested).
+   */
+  @Test
+  public void testCloseWithoutHarvest() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addDictArray("d", MinorType.INT)
+          .value(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    ArrayWriter arrayWriter = rootWriter.array("d");
+    DictWriter dictWriter = arrayWriter.dict();
+    rsLoader.startBatch();
+    for (int i = 0; i < 40; i++) {
+      rootWriter.start();
+      for (int j = 0; j < 3; j++) {
+        dictWriter.keyWriter().setInt(i);
+        dictWriter.valueWriter().scalar().setString("b-" + i);
+        arrayWriter.save();
+      }
+      rootWriter.save();
+    }
+
+    // Don't harvest the batch. Allocator will complain if the
+    // loader does not release memory.
+
+    rsLoader.close();
+  }
+
+  @Test
+  public void testKeyOverflow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addDictArray("d", MinorType.VARCHAR)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte[] key = new byte[523];
+    Arrays.fill(key, (byte) 'X');
+
+    int arraySize = 3; // number of dicts in each row
+    int dictSize = 1; // number of entries in each dict
+
+    // Number of rows should be driven by vector size.
+    // Our row count should include the overflow row
+
+    ArrayWriter arrayDictWriter = rootWriter.array(0);
+    DictWriter dictWriter = arrayDictWriter.dict();
+    ScalarWriter keyWriter = dictWriter.keyWriter();
+    ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+    int expectedCount = ValueVector.MAX_BUFFER_SIZE / (key.length * dictSize * arraySize);
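+    // For example: with key.length = 523, dictSize = 1 and arraySize = 3,
+    // each row stores 3 * 1 * 523 = 1569 bytes of key data, so the batch
+    // fills after roughly MAX_BUFFER_SIZE / 1569 rows, plus the overflow row.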
+    {
+      int count = 0;
+      while (! rootWriter.isFull()) {
+        rootWriter.start();
+        for (int i = 0; i < arraySize; i++) {
+          for (int j = 0; j < dictSize; j++) {
+            keyWriter.setBytes(key, key.length);
+            valueWriter.setInt(0); // acts as a placeholder, the actual value is not important
+            dictWriter.save(); // not necessary for scalars, just for completeness
+          }
+          arrayDictWriter.save();
+        }
+        rootWriter.save();
+        count++;
+      }
+
+      assertEquals(expectedCount + 1, count);
+
+      // Loader's row count should include only "visible" rows
+
+      assertEquals(expectedCount, rootWriter.rowCount());
+
+      // Total count should include invisible and look-ahead rows.
+
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+      // Result should exclude the overflow row
+
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      result.clear();
+    }
+
+    // Next batch should start with the overflow row
+
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      result.clear();
+    }
+
+    rsLoader.close();
+  }
+
+  @Test
+  public void testValueOverflow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addDictArray("d", MinorType.INT)
+          .value(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte[] value = new byte[523];
+    Arrays.fill(value, (byte) 'X');
+
+    int arraySize = 2; // number of dicts in each row; kept constant across rows to make the expected row count easy to compute
+    int dictSize = 4; // number of entries in each dict
+
+    // Number of rows should be driven by vector size.
+    // Our row count should include the overflow row
+
+    ArrayWriter arrayDictWriter = rootWriter.array(0);
+    DictWriter dictWriter = arrayDictWriter.dict();
+    ScalarWriter keyWriter = dictWriter.keyWriter();
+    ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+    int expectedCount = ValueVector.MAX_BUFFER_SIZE / (value.length * dictSize * arraySize);
+    {
+      int count = 0;
+      while (! rootWriter.isFull()) {
+        rootWriter.start();
+        for (int i = 0; i < arraySize; i++) {
+          for (int j = 0; j < dictSize; j++) {
+            keyWriter.setInt(0); // acts as a placeholder, the actual value is not important
+            valueWriter.setBytes(value, value.length);
+            dictWriter.save(); // not necessary for scalars, just for completeness
+          }
+          arrayDictWriter.save();
+        }
+        rootWriter.save();
+        count++;
+      }
+
+      assertEquals(expectedCount + 1, count);
+
+      // Loader's row count should include only "visible" rows
+
+      assertEquals(expectedCount, rootWriter.rowCount());
+
+      // Total count should include invisible and look-ahead rows.
+
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+      // Result should exclude the overflow row
+
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      result.clear();
+    }
+
+    // Next batch should start with the overflow row
+
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      result.clear();
+    }
+
+    rsLoader.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDicts.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDicts.java
new file mode 100644
index 0000000..3d1af87
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDicts.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.map;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.validate.BatchValidator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.DictColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+
+/**
+ * Test (non-array) dict support in the result set loader and related classes.
+ */
+@Category(RowSetTests.class)
+public class TestResultSetLoaderDicts extends SubOperatorTest {
+
+  @Test
+  public void testBasics() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDict("d", MinorType.INT)
+          .value(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertFalse(rsLoader.isProjectionEmpty());
+    final RowSetLoader rootWriter = rsLoader.writer();
+
+    // Verify structure and schema
+
+    assertEquals(4, rsLoader.schemaVersion());
+    final TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertEquals(2, actualSchema.size());
+    assertTrue(actualSchema.metadata(1).isDict());
+    assertEquals(2, actualSchema.metadata("d").tupleSchema().size());
+    assertEquals(2, actualSchema.column("d").getChildren().size());
+
+    rsLoader.startBatch();
+
+    // Write a row the way that real clients do.
+
+    final ScalarWriter aWriter = rootWriter.scalar("a");
+    final DictWriter dictWriter = rootWriter.dict("d");
+    final ScalarWriter keyWriter = dictWriter.keyWriter();
+    final ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+    rootWriter.start();
+    aWriter.setInt(10);
+
+    keyWriter.setInt(110);
+    valueWriter.setString("fred");
+    dictWriter.save();
+    keyWriter.setInt(111);
+    valueWriter.setString("george");
+    dictWriter.save();
+
+    rootWriter.save();
+
+    // Write another using the test-time conveniences
+
+    rootWriter.addRow(20, map(210, "barney", 211, "bart", 212, "jerry"));
+
+    // Harvest the batch
+
+    final RowSet actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(4, rsLoader.schemaVersion());
+    assertEquals(2, actual.rowCount());
+    final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
+    assertEquals(2, dictVector.getAccessor().getValueCount());
+
+    // Validate data
+
+    final SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, map(110, "fred", 111, "george"))
+        .addRow(20, map(210, "barney", 211, "bart", 212, "jerry"))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  /**
+   * Test adding a dict to a loader after writing the first row.
+   */
+  @Test
+  public void testDictAddition() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(1, rsLoader.schemaVersion());
+    final RowSetLoader rootWriter = rsLoader.writer();
+
+    // Start without the dict, then add one after the first row.
+
+    rsLoader.startBatch();
+    rootWriter.addRow(10);
+
+    MaterializedField dictField = SchemaBuilder.columnSchema("d", MinorType.DICT, DataMode.REQUIRED);
+    dictField.addChild(SchemaBuilder.columnSchema(DictVector.FIELD_KEY_NAME, MinorType.VARCHAR, DataMode.REQUIRED));
+    dictField.addChild(SchemaBuilder.columnSchema(DictVector.FIELD_VALUE_NAME, MinorType.VARCHAR, DataMode.REQUIRED));
+    DictColumnMetadata dictMetadata = MetadataUtils.newDict(dictField);
+
+    final int dictIndex = rootWriter.addColumn(dictMetadata);
+    final DictWriter dictWriter = rootWriter.dict(dictIndex);
+
+    // Ensure metadata was added
+
+    final TupleMetadata actualSchema = rootWriter.tupleSchema();
+    assertTrue(actualSchema.metadata(1).isDict());
+    assertEquals(2, actualSchema.metadata("d").tupleSchema().size());
+    assertEquals(2, actualSchema.column("d").getChildren().size());
+    assertEquals(2, actualSchema.size());
+    assertEquals(2, dictWriter.schema().tupleSchema().size());
+
+    rootWriter.addRow(20, map("name", "fred", "lastname", "smith"))
+        .addRow(30, map("name", "barney", "lastname", "johnson"));
+
+    final RowSet actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(4, rsLoader.schemaVersion());
+    assertEquals(3, actual.rowCount());
+
+    final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
+    assertEquals(2, dictVector.getField().getChildren().size());
+
+    // Validate first batch
+
+    final TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDict("d", MinorType.VARCHAR)
+          .value(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(10, map())
+        .addRow(20, map("name", "fred", "lastname", "smith"))
+        .addRow(30, map("name", "barney", "lastname", "johnson"))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  /**
+   * Create dict with map value. Then, add columns to the map
+   * on the fly. Use required, variable-width columns since
+   * those require the most processing and are most likely to
+   * fail if anything is out of place.
+   */
+  @Test
+  public void testMapValueRequiredFields() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDict("d", MinorType.VARCHAR)
+          .mapValue()
+            .add("b", MinorType.VARCHAR)
+            .resumeDict()
+          .resumeSchema()
+        .buildSchema();
+    final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(5, rsLoader.schemaVersion());
+    final RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    rootWriter.addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")));
+
+    // Validate first batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(5, rsLoader.schemaVersion());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+
+    // Now add columns in the second batch.
+
+    rsLoader.startBatch();
+    rootWriter.addRow(20, map("a2", mapValue("c11"), "b2", mapValue("c12"), "c2", mapValue("c13")));
+
+    final DictWriter dictWriter = rootWriter.dict("d");
+    final TupleWriter nestedMapWriter = dictWriter.valueWriter().tuple();
+    nestedMapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    rootWriter.addRow(30, map("a3", mapValue("c21", "d21")));
+
+    // And another set while the write proceeds.
+
+    nestedMapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    rootWriter.addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")));
+
+    // Validate second batch
+
+    actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(7, rsLoader.schemaVersion());
+
+    final TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDict("d", MinorType.VARCHAR)
+          .mapValue()
+            .add("b", MinorType.VARCHAR)
+            .add("c", MinorType.VARCHAR)
+            .add("d", MinorType.VARCHAR)
+            .resumeDict()
+          .resumeSchema()
+        .buildSchema();
+    expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(20, map("a2", mapValue("c11", "", ""), "b2", mapValue("c12", "", ""), "c2", mapValue("c13", "", "")))
+        .addRow(30, map("a3", mapValue("c21", "d21", "")))
+        .addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  /**
+   * Create dict with map value. Then, add columns to the map
+   * on the fly. Use nullable, variable-width columns since
+   * those require the most processing and are most likely to
+   * fail if anything is out of place.
+   */
+  @Test
+  public void testMapValueNullableFields() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDict("d", MinorType.VARCHAR)
+          .mapValue()
+          .addNullable("b", MinorType.VARCHAR)
+          .resumeDict()
+          .resumeSchema()
+        .buildSchema();
+    final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(5, rsLoader.schemaVersion());
+    final RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    rootWriter.addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")));
+
+    // Validate first batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(5, rsLoader.schemaVersion());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+
+    // Now add columns in the second batch.
+
+    rsLoader.startBatch();
+    rootWriter.addRow(20, map("a2", mapValue("c11"), "b2", mapValue("c12"), "c2", mapValue("c13")));
+
+    final DictWriter dictWriter = rootWriter.dict("d");
+    final TupleWriter nestedMapWriter = dictWriter.valueWriter().tuple();
+    nestedMapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.OPTIONAL));
+
+    rootWriter.addRow(30, map("a3", mapValue("c21", "d21")));
+
+    // And another set while the write proceeds.
+
+    nestedMapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL));
+
+    rootWriter.addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")));
+
+    // Validate second batch
+
+    actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(7, rsLoader.schemaVersion());
+
+    final TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+          .addDict("d", MinorType.VARCHAR)
+          .mapValue()
+            .addNullable("b", MinorType.VARCHAR)
+            .addNullable("c", MinorType.VARCHAR)
+            .addNullable("d", MinorType.VARCHAR)
+            .resumeDict()
+          .resumeSchema()
+        .buildSchema();
+    expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(20, map("a2", mapValue("c11", null, null), "b2", mapValue("c12", null, null), "c2", mapValue("c13", null, null)))
+        .addRow(30, map("a3", mapValue("c21", "d21", null)))
+        .addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testArrayValue() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDict("d", MinorType.INT)
+          .repeatedValue(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    final RowSetLoader rootWriter = rsLoader.writer();
+
+    // Write some rows
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(10, map(
+            1, intArray(110, 120, 130),
+            2, intArray(111, 121)
+        ))
+        .addRow(20, map())
+        .addRow(30, map(
+            1, intArray(),
+            2, intArray(310, 320),
+            3, intArray(311, 321, 331, 341),
+            4, intArray(312, 322, 332)
+        ));
+
+    // Validate first batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, map(
+            1, intArray(110, 120, 130),
+            2, intArray(111, 121)
+        ))
+        .addRow(20, map())
+        .addRow(30, map(
+            1, intArray(),
+            2, intArray(310, 320),
+            3, intArray(311, 321, 331, 341),
+            4, intArray(312, 322, 332)
+        ))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+
+    // Add more rows in the second batch.
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(40, map(1, intArray(410, 420)))
+        .addRow(50, map(1, intArray(510), 2, intArray(511, 531)));
+
+    // Validate the second batch.
+
+    actual = fixture.wrap(rsLoader.harvest());
+    expected = fixture.rowSetBuilder(actual.schema())
+        .addRow(40, map(1, intArray(410, 420)))
+        .addRow(50, map(1, intArray(510), 2, intArray(511, 531)))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testDictValue() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addDict("d", MinorType.INT)
+          .dictValue()
+            .key(MinorType.INT)
+            .nullableValue(MinorType.VARCHAR)
+            .resumeDict()
+          .resumeSchema()
+        .buildSchema();
+    final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    final RowSetLoader rootWriter = rsLoader.writer();
+
+    // Write some rows
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(10, map(
+            1, map(1, "a", 2, "b", 4, "c"),
+            2, map(2, "a2", 1, "c2")
+        ))
+        .addRow(20, map())
+        .addRow(30, map(
+            1, map(),
+            2, map(1, "a3"),
+            3, map(2, "b4", 4, "n4", 1, null),
+            4, map(3, "m5", 1, "a5", 2, "c5", 8, "m5", 21, "h5")
+        ));
+
+    // Validate first batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, map(
+            1, map(1, "a", 2, "b", 4, "c"),
+            2, map(2, "a2", 1, "c2")
+        ))
+        .addRow(20, map())
+        .addRow(30, map(
+            1, map(),
+            2, map(1, "a3"),
+            3, map(2, "b4", 4, "n4", 1, null),
+            4, map(3, "m5", 1, "a5", 2, "c5", 8, "m5", 21, "h5")
+        ))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+
+    // Add more rows in the second batch.
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(40, map(
+            1, map(1, "j6", 0, "k6"))
+        )
+        .addRow(50, map(
+            1, map(2, "l7"),
+            2, map(1, "o8", 5, "p8", 7, "u8")
+        ));
+
+    // Validate the second batch.
+
+    actual = fixture.wrap(rsLoader.harvest());
+    expected = fixture.rowSetBuilder(actual.schema())
+        .addRow(40, map(
+            1, map(1, "j6", 0, "k6"))
+        )
+        .addRow(50, map(
+            1, map(2, "l7"),
+            2, map(1, "o8", 5, "p8", 7, "u8")
+        ))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testKeyOverflow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addDict("d", MinorType.VARCHAR)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte[] key = new byte[523];
+    Arrays.fill(key, (byte) 'X');
+
+    int dictSize = 4; // number of entries in each dict
+
+    // Number of rows should be driven by vector size.
+    // Our row count should include the overflow row
+
+    DictWriter dictWriter = rootWriter.dict(0);
+    ScalarWriter keyWriter = dictWriter.keyWriter();
+    ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+    int expectedCount = ValueVector.MAX_BUFFER_SIZE / (key.length * dictSize);
+    {
+      int count = 0;
+      while (! rootWriter.isFull()) {
+        rootWriter.start();
+        for (int i = 0; i < dictSize; i++) {
+          keyWriter.setBytes(key, key.length);
+          valueWriter.setInt(0); // acts as a placeholder, the actual value is not important
+          dictWriter.save(); // not necessary for scalars, just for completeness
+        }
+        rootWriter.save();
+        count++;
+      }
+
+      assertEquals(expectedCount + 1, count);
+
+      // Loader's row count should include only "visible" rows
+
+      assertEquals(expectedCount, rootWriter.rowCount());
+
+      // Total count should include invisible and look-ahead rows.
+
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+      // Result should exclude the overflow row
+
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      result.clear();
+    }
+
+    // Next batch should start with the overflow row
+
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      result.clear();
+    }
+
+    rsLoader.close();
+  }
+
+  @Test
+  public void testValueOverflow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addDict("d", MinorType.INT)
+          .value(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte[] value = new byte[523];
+    Arrays.fill(value, (byte) 'X');
+
+    int dictSize = 4; // number of entries in each dict
+
+    // Number of rows should be driven by vector size.
+    // Our row count should include the overflow row
+
+    DictWriter dictWriter = rootWriter.dict(0);
+    ScalarWriter keyWriter = dictWriter.keyWriter();
+    ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+    int expectedCount = ValueVector.MAX_BUFFER_SIZE / (value.length * dictSize);
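+    // Same capacity arithmetic as in testKeyOverflow, but here it is the
+    // VARCHAR value vector (4 * 523 = 2092 bytes per row) that fills up and
+    // triggers overflow.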
+    {
+      int count = 0;
+      while (! rootWriter.isFull()) {
+        rootWriter.start();
+        for (int i = 0; i < dictSize; i++) {
+          keyWriter.setInt(0); // acts as a placeholder, the actual value is not important
+          valueWriter.setBytes(value, value.length);
+          dictWriter.save(); // not necessary for scalars, just for completeness
+        }
+        rootWriter.save();
+        count++;
+      }
+
+      assertEquals(expectedCount + 1, count);
+
+      // Loader's row count should include only "visible" rows
+
+      assertEquals(expectedCount, rootWriter.rowCount());
+
+      // Total count should include invisible and look-ahead rows.
+
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+      // Result should exclude the overflow row
+
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      result.clear();
+    }
+
+    // Next batch should start with the overflow row
+
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      result.clear();
+    }
+
+    rsLoader.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMapArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMapArray.java
index 08529cd..bfb94f5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMapArray.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMapArray.java
@@ -85,11 +85,11 @@
     assertEquals(2, actualSchema.size());
     assertTrue(actualSchema.metadata(1).isArray());
     assertTrue(actualSchema.metadata(1).isMap());
-    assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+    assertEquals(2, actualSchema.metadata("m").tupleSchema().size());
     assertEquals(2, actualSchema.column("m").getChildren().size());
     TupleWriter mapWriter = rootWriter.array("m").tuple();
-    assertSame(actualSchema.metadata("m").mapSchema(), mapWriter.schema().mapSchema());
-    assertSame(mapWriter.tupleSchema(), mapWriter.schema().mapSchema());
+    assertSame(actualSchema.metadata("m").tupleSchema(), mapWriter.schema().tupleSchema());
+    assertSame(mapWriter.tupleSchema(), mapWriter.schema().tupleSchema());
     assertSame(mapWriter.tupleSchema().metadata(0), mapWriter.scalar(0).schema());
     assertSame(mapWriter.tupleSchema().metadata(1), mapWriter.scalar(1).schema());
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMaps.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMaps.java
index 630796d..4030f1e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMaps.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMaps.java
@@ -83,7 +83,7 @@
     final TupleMetadata actualSchema = rootWriter.tupleSchema();
     assertEquals(3, actualSchema.size());
     assertTrue(actualSchema.metadata(1).isMap());
-    assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+    assertEquals(2, actualSchema.metadata("m").tupleSchema().size());
     assertEquals(2, actualSchema.column("m").getChildren().size());
 
     rsLoader.startBatch();
@@ -246,7 +246,7 @@
     // Ensure metadata was added
 
     assertTrue(mapWriter.tupleSchema().size() == 1);
-    assertSame(mapWriter.tupleSchema(), mapWriter.schema().mapSchema());
+    assertSame(mapWriter.tupleSchema(), mapWriter.schema().tupleSchema());
     assertSame(mapWriter.tupleSchema().metadata(colIndex), mapWriter.scalar(colIndex).schema());
 
     rootWriter
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
index 84392cf..75f4880 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.resultSet.impl;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.map;
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
 import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
 import static org.junit.Assert.assertEquals;
@@ -42,6 +43,7 @@
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.DictWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -228,7 +230,7 @@
     TupleWriter m1Writer = rootWriter.tuple("m1");
     assertTrue(m1Md.isMap());
     assertTrue(m1Writer.isProjected());
-    assertEquals(2, m1Md.mapSchema().size());
+    assertEquals(2, m1Md.tupleSchema().size());
     assertTrue(m1Writer.column("a").isProjected());
     assertTrue(m1Writer.column("b").isProjected());
 
@@ -236,7 +238,7 @@
     TupleWriter m2Writer = rootWriter.tuple("m2");
     assertTrue(m2Md.isMap());
     assertTrue(m2Writer.isProjected());
-    assertEquals(2, m2Md.mapSchema().size());
+    assertEquals(2, m2Md.tupleSchema().size());
     assertFalse(m2Writer.column("c").isProjected());
     assertTrue(m2Writer.column("d").isProjected());
 
@@ -244,7 +246,7 @@
     TupleWriter m3Writer = rootWriter.tuple("m3");
     assertTrue(m3Md.isMap());
     assertFalse(m3Writer.isProjected());
-    assertEquals(2, m3Md.mapSchema().size());
+    assertEquals(2, m3Md.tupleSchema().size());
     assertFalse(m3Writer.column("e").isProjected());
     assertFalse(m3Writer.column("f").isProjected());
 
@@ -305,7 +307,7 @@
     TupleWriter m1Writer = rootWriter.tuple("m1");
     assertTrue(m1Md.isMap());
     assertTrue(m1Writer.isProjected());
-    assertEquals(2, m1Md.mapSchema().size());
+    assertEquals(2, m1Md.tupleSchema().size());
     assertTrue(m1Writer.column("a").isProjected());
     assertTrue(m1Writer.column("b").isProjected());
 
@@ -595,4 +597,102 @@
       assertTrue(e.getErrorType() == ErrorType.VALIDATION);
     }
   }
+
+  @Test
+  public void testDictProjection() {
+
+    final String dictName1 = "d1";
+    final String dictName2 = "d2";
+
+    // There is no test for the case of obtaining a value by key, as this is not as simple
+    // a projection as in the case of a map: a value corresponding to the key must be looked up
+    // (the functionality is currently present in DictReader) and the final column schema must be
+    // changed from the dict structure with `key` and `value` children to a simple `value`.
+    List<SchemaPath> selection = RowSetTestUtils.projectList(dictName1);
+    TupleMetadata schema = new SchemaBuilder()
+        .addDict(dictName1, MinorType.VARCHAR)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .addDict(dictName2, MinorType.VARCHAR)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(ProjectionSetFactory.build(selection))
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Verify the projected columns
+
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    ColumnMetadata dictMetadata1 = actualSchema.metadata(dictName1);
+    DictWriter dictWriter1 = rootWriter.dict(dictName1);
+    assertTrue(dictMetadata1.isDict());
+    assertTrue(dictWriter1.isProjected());
+    assertEquals(2, dictMetadata1.tupleSchema().size());
+    assertTrue(dictWriter1.keyWriter().isProjected());
+    assertTrue(dictWriter1.valueWriter().isProjected());
+
+    ColumnMetadata dictMetadata2 = actualSchema.metadata(dictName2);
+    DictWriter dictWriter2 = rootWriter.dict(dictName2);
+    assertTrue(dictMetadata2.isDict());
+    assertFalse(dictWriter2.isProjected());
+    assertEquals(2, dictMetadata2.tupleSchema().size());
+    assertFalse(dictWriter2.keyWriter().isProjected());
+    assertFalse(dictWriter2.valueWriter().isProjected());
+
+    // Write a couple of rows.
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(map( "a", 1, "b", 2), map( "c", 3, "d", 4))
+        .addRow(map("a", 11, "b", 12), map("c", 13, "d", 14));
+
+    // Verify. Only the projected columns appear in the result set.
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addDict(dictName1, MinorType.VARCHAR)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(map( "a", 1, "b", 2))
+        .addRow(map("a", 11, "b", 12))
+        .build();
+    RowSetUtilities.verify(expected, fixture.wrap(rsLoader.harvest()));
+    rsLoader.close();
+  }
+
+  @Test
+  public void testDictStringKeyAccess() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("col.a"); // the same as col['a'], but number is expected in brackets ([])
+    TupleMetadata schema = new SchemaBuilder()
+        .addDict("col", MinorType.VARCHAR)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(ProjectionSetFactory.build(selection))
+        .setSchema(schema)
+        .build();
+    new ResultSetLoaderImpl(fixture.allocator(), options); // no validation error
+  }
+
+  @Test
+  public void testDictNumericKeyAccess() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("col[0]");
+    TupleMetadata schema = new SchemaBuilder()
+        .addDict("col", MinorType.INT)
+          .value(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(ProjectionSetFactory.build(selection))
+        .setSchema(schema)
+        .build();
+    new ResultSetLoaderImpl(fixture.allocator(), options); // no validation error
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java
index 59be129..c8c2aa0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java
@@ -28,11 +28,14 @@
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.DictWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
 import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
 import org.apache.drill.exec.vector.accessor.writer.MapWriter;
+import org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
 import org.apache.drill.test.SubOperatorTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -137,7 +140,7 @@
     // Create the writers
 
     {
-      TupleMetadata mapSchema = schema.metadata("m1").mapSchema();
+      TupleMetadata mapSchema = schema.metadata("m1").tupleSchema();
       List<AbstractObjectWriter> members = new ArrayList<>();
       members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("a"), null, null));
       members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("b"), null, null));
@@ -145,7 +148,7 @@
     }
 
     {
-      TupleMetadata mapSchema = schema.metadata("m2").mapSchema();
+      TupleMetadata mapSchema = schema.metadata("m2").tupleSchema();
       List<AbstractObjectWriter> members = new ArrayList<>();
       members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("c"), null, null));
       writers.add(MapWriter.buildMapWriter(schema.metadata("m2"), null, members));
@@ -185,4 +188,83 @@
     rootWriter.saveRow();
     rootWriter.endWrite();
   }
+
+  @Test
+  public void testDummyDict() {
+
+    final String dictName = "d";
+    final String dictArrayName = "da";
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addDict(dictName, MinorType.INT)
+          .repeatedValue(MinorType.VARCHAR)
+          .resumeSchema()
+        .addDictArray(dictArrayName, MinorType.VARCHAR)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+
+    List<AbstractObjectWriter> writers = new ArrayList<>();
+
+    final String keyFieldName = DictVector.FIELD_KEY_NAME;
+    final String valueFieldName = DictVector.FIELD_VALUE_NAME;
+
+    // Create key and value writers for dict
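+    // (buildColumnWriter is given a null vector here, which yields a dummy
+    // writer; the unprojected-writer assertions below rely on this)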
+
+    ColumnMetadata dictMetadata = schema.metadata(dictName);
+    TupleMetadata dictSchema = dictMetadata.tupleSchema();
+    List<AbstractObjectWriter> dictFields = new ArrayList<>();
+    dictFields.add(ColumnWriterFactory.buildColumnWriter(dictSchema.metadata(keyFieldName), null, null));
+    dictFields.add(ColumnWriterFactory.buildColumnWriter(dictSchema.metadata(valueFieldName), null, null));
+    writers.add(ObjectDictWriter.buildDict(dictMetadata, null, dictFields));
+
+    // Create key and value writers for dict array
+
+    ColumnMetadata dictArrayMetadata = schema.metadata(dictArrayName);
+    TupleMetadata dictArraySchema = dictArrayMetadata.tupleSchema();
+    List<AbstractObjectWriter> dictArrayFields = new ArrayList<>();
+    dictArrayFields.add(ColumnWriterFactory.buildColumnWriter(dictArraySchema.metadata(keyFieldName), null, null));
+    dictArrayFields.add(ColumnWriterFactory.buildColumnWriter(dictArraySchema.metadata(valueFieldName), null, null));
+    writers.add(ObjectDictWriter.buildDictArray(dictArrayMetadata, null, dictArrayFields));
+
+    AbstractTupleWriter rootWriter = new RootWriterFixture(schema, writers);
+
+    // Events are ignored.
+
+    rootWriter.startWrite();
+    rootWriter.startRow();
+
+    // Nothing is projected
+
+    DictWriter dictWriter = rootWriter.dict(dictName);
+    assertFalse(dictWriter.isProjected());
+    assertFalse(dictWriter.keyWriter().isProjected());
+    assertFalse(dictWriter.valueWriter().array().scalar().isProjected());
+
+    DictWriter dictWriter1 = rootWriter.array(dictArrayName).dict();
+    assertFalse(dictWriter1.isProjected());
+    assertFalse(dictWriter1.keyWriter().isProjected());
+    assertFalse(dictWriter1.valueWriter().scalar().isProjected());
+
+    // Dummy columns seem real.
+
+    rootWriter.dict(dictName).keyWriter().setInt(20);
+    rootWriter.dict(0).valueWriter().array().scalar().setString("foo");
+
+    // Dummy array dict seems real.
+
+    rootWriter.array(dictArrayName).dict().keyWriter().setString("foo");
+    rootWriter.array(dictArrayName).dict().valueWriter().scalar().setInt(30);
+    rootWriter.array(dictArrayName).save();
+    rootWriter.array(1).dict().keyWriter().setString("bar");
+    rootWriter.array(1).dict().valueWriter().scalar().setInt(40);
+    rootWriter.array(1).save();
+
+    // More ignored events.
+
+    rootWriter.restartRow();
+    rootWriter.saveRow();
+    rootWriter.endWrite();
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
index d128fcd..4057f33 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
@@ -80,7 +80,7 @@
     final TupleMetadata actualSchema = rootWriter.tupleSchema();
     assertEquals(3, actualSchema.size());
     assertTrue(actualSchema.metadata(1).isMap());
-    assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+    assertEquals(2, actualSchema.metadata("m").tupleSchema().size());
     assertEquals(2, actualSchema.column("m").getChildren().size());
 
     // Write a row the way that clients will do.
@@ -230,11 +230,11 @@
     assertEquals(2, actualSchema.size());
     assertTrue(actualSchema.metadata(1).isArray());
     assertTrue(actualSchema.metadata(1).isMap());
-    assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+    assertEquals(2, actualSchema.metadata("m").tupleSchema().size());
     assertEquals(2, actualSchema.column("m").getChildren().size());
     TupleWriter mapWriter = rootWriter.array("m").tuple();
-    assertSame(actualSchema.metadata("m").mapSchema(), mapWriter.schema().mapSchema());
-    assertSame(mapWriter.tupleSchema(), mapWriter.schema().mapSchema());
+    assertSame(actualSchema.metadata("m").tupleSchema(), mapWriter.schema().tupleSchema());
+    assertSame(mapWriter.tupleSchema(), mapWriter.schema().tupleSchema());
     assertSame(mapWriter.tupleSchema().metadata(0), mapWriter.scalar(0).schema());
     assertSame(mapWriter.tupleSchema().metadata(1), mapWriter.scalar(1).schema());
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
index 4f4d0fc..975811e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
@@ -18,7 +18,9 @@
 package org.apache.drill.exec.physical.rowSet;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.map;
 import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
@@ -26,7 +28,9 @@
 import static org.junit.Assert.fail;
 
 import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Map;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -38,13 +42,18 @@
 import org.apache.drill.exec.vector.VectorOverflowException;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.KeyAccessor;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleReader;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.DictReader;
+import org.apache.drill.exec.vector.complex.DictVector;
 import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSetUtilities;
@@ -584,6 +593,502 @@
     RowSetUtilities.verify(expected, actual);
   }
 
+  @Test
+  public void testDictStructure() {
+    final String dictName = "d";
+
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("id", MinorType.INT)
+        .addDict(dictName, MinorType.INT)
+          .value(MinorType.VARCHAR) // required varchar value
+          .resumeSchema()
+        .buildSchema();
+    final ExtendableRowSet rowSet = fixture.rowSet(schema);
+    final RowSetWriter writer = rowSet.writer();
+
+    // Dict
+    // Pick out components and lightly test. (Assumes structure
+    // tested earlier is still valid, so no need to exhaustively
+    // test again.)
+
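+    // A DICT column presents itself to writers and readers as an array of
+    // (key, value) entries: its ObjectType is ARRAY and each row holds a
+    // variable number of entries delimited by an offsets vector.
+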
+    assertEquals(ObjectType.ARRAY, writer.column(dictName).type());
+    assertTrue(writer.column(dictName).schema().isDict());
+
+    final ScalarWriter idWriter = writer.column(0).scalar();
+    final DictWriter dictWriter = writer.column(1).dict();
+
+    assertEquals(ValueType.INTEGER, dictWriter.keyType());
+    assertEquals(ObjectType.SCALAR, dictWriter.valueType());
+
+    final ScalarWriter keyWriter = dictWriter.keyWriter();
+    final ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+    assertEquals(ValueType.INTEGER, keyWriter.valueType());
+    assertEquals(ValueType.STRING, valueWriter.valueType());
+
+    // Write data
+    idWriter.setInt(1);
+
+    keyWriter.setInt(11);
+    valueWriter.setString("a");
+    dictWriter.save(); // Advance to next entry position
+    keyWriter.setInt(12);
+    valueWriter.setString("b");
+    dictWriter.save();
+    writer.save();
+
+    idWriter.setInt(2);
+
+    keyWriter.setInt(21);
+    valueWriter.setString("c");
+    dictWriter.save();
+    writer.save();
+
+    idWriter.setInt(3);
+
+    keyWriter.setInt(31);
+    valueWriter.setString("d");
+    dictWriter.save();
+    keyWriter.setInt(32);
+    valueWriter.setString("e");
+    dictWriter.save();
+    writer.save();
+
+    // Finish the row set and get a reader.
+
+    final SingleRowSet actual = writer.done();
+    final RowSetReader reader = actual.reader();
+
+    // Verify reader structure
+
+    assertEquals(ObjectType.ARRAY, reader.column(dictName).type());
+
+    final DictReader dictReader = reader.dict(1);
+    assertEquals(ObjectType.ARRAY, dictReader.type());
+
+    assertEquals(ValueType.INTEGER, dictReader.keyColumnType());
+    assertEquals(ObjectType.SCALAR, dictReader.valueColumnType());
+
+    // Row 1: get value reader with its position set to entry corresponding to a key
+
+    assertTrue(reader.next());
+    assertFalse(dictReader.isNull()); // dict itself is not null
+
+    dictReader.getAsString(); // smoke-test the string form; the content is verified via getObject() below
+
+    final KeyAccessor keyAccessor = dictReader.keyAccessor();
+    final ScalarReader valueReader = dictReader.valueReader().scalar();
+
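+    // keyAccessor.find() does a linear scan over the current row's entries
+    // and, when it returns true, leaves the value reader positioned at the
+    // matching entry (see the generated KeyAccessors further down).
+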
+    assertTrue(keyAccessor.find(12));
+    assertEquals("b", valueReader.getString());
+    assertTrue(keyAccessor.find(11));
+    assertEquals("a", valueReader.getString());
+
+    // compare entire dict
+    Map<Object, Object> map = map(11, "a", 12, "b");
+    assertEquals(map, dictReader.getObject());
+
+    // Row 2
+
+    assertTrue(reader.next());
+    assertFalse(keyAccessor.find(22)); // the dict does not contain an entry with the key
+    assertTrue(keyAccessor.find(21));
+    assertEquals("c", valueReader.getString());
+
+    map = map(21, "c");
+    assertEquals(map, dictReader.getObject());
+
+    // Row 3
+
+    assertTrue(reader.next());
+
+    assertTrue(keyAccessor.find(31));
+    assertEquals("d", valueReader.getString());
+    assertFalse(keyAccessor.find(33));
+    assertTrue(keyAccessor.find(32));
+    assertEquals("e", valueReader.getString());
+
+    map = map(31, "d", 32, "e");
+    assertEquals(map, dictReader.getObject());
+
+    assertFalse(reader.next());
+
+    // Verify that the dict accessor's value count was set.
+
+    final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
+    assertEquals(3, dictVector.getAccessor().getValueCount());
+
+    final SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(1, map(11, "a", 12, "b"))
+        .addRow(2, map(21, "c"))
+        .addRow(3, map(31, "d", 32, "e"))
+        .build();
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  @Test
+  public void testDictStructureMapValue() {
+    final String dictName = "d";
+    final int bScale = 1;
+
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("id", MinorType.INT)
+        .addDict(dictName, MinorType.INT)
+          .mapValue()
+            .add("a", MinorType.INT)
+            .add("b", MinorType.VARDECIMAL, 8, bScale)
+            .resumeDict()
+          .resumeSchema()
+        .buildSchema();
+    final ExtendableRowSet rowSet = fixture.rowSet(schema);
+    final RowSetWriter writer = rowSet.writer();
+
+    // Dict with Map value
+
+    assertEquals(ObjectType.ARRAY, writer.column(dictName).type());
+
+    final ScalarWriter idWriter = writer.scalar(0);
+    final DictWriter dictWriter = writer.column(1).dict();
+
+    assertEquals(ValueType.INTEGER, dictWriter.keyType());
+    assertEquals(ObjectType.TUPLE, dictWriter.valueType());
+
+    final ScalarWriter keyWriter = dictWriter.keyWriter();
+    final TupleWriter valueWriter = dictWriter.valueWriter().tuple();
+
+    assertEquals(ValueType.INTEGER, keyWriter.valueType());
+
+    ScalarWriter aWriter = valueWriter.scalar("a");
+    ScalarWriter bWriter = valueWriter.scalar("b");
+    assertEquals(ValueType.INTEGER, aWriter.valueType());
+    assertEquals(ValueType.DECIMAL, bWriter.valueType());
+
+    // Write data
+
+    idWriter.setInt(1);
+
+    keyWriter.setInt(11);
+    aWriter.setInt(10);
+    bWriter.setDecimal(BigDecimal.valueOf(1));
+    dictWriter.save(); // advance to next entry position
+
+    keyWriter.setInt(12);
+    aWriter.setInt(11);
+    bWriter.setDecimal(BigDecimal.valueOf(2));
+    dictWriter.save();
+
+    writer.save();
+
+    idWriter.setInt(2);
+
+    keyWriter.setInt(21);
+    aWriter.setInt(20);
+    bWriter.setDecimal(BigDecimal.valueOf(3));
+    dictWriter.save();
+
+    writer.save();
+
+    idWriter.setInt(3);
+
+    keyWriter.setInt(31);
+    aWriter.setInt(30);
+    bWriter.setDecimal(BigDecimal.valueOf(4));
+    dictWriter.save();
+
+    keyWriter.setInt(32);
+    aWriter.setInt(31);
+    bWriter.setDecimal(BigDecimal.valueOf(5));
+    dictWriter.save();
+
+    keyWriter.setInt(33);
+    aWriter.setInt(32);
+    bWriter.setDecimal(BigDecimal.valueOf(6));
+    dictWriter.save();
+
+    writer.save();
+
+    // Finish the row set and get a reader.
+
+    final SingleRowSet actual = writer.done();
+    final RowSetReader reader = actual.reader();
+
+    // Verify reader structure
+
+    assertEquals(ObjectType.ARRAY, reader.column(dictName).type());
+
+    final DictReader dictReader = reader.dict(1);
+    assertEquals(ObjectType.ARRAY, dictReader.type());
+
+    assertEquals(ValueType.INTEGER, dictReader.keyColumnType());
+    assertEquals(ObjectType.TUPLE, dictReader.valueColumnType());
+
+    final KeyAccessor keyAccessor = dictReader.keyAccessor();
+    final TupleReader valueReader = dictReader.valueReader().tuple();
+
+    // Row 1: get value reader with its position set to entry corresponding to a key
+
+    assertTrue(reader.next());
+    assertFalse(dictReader.isNull()); // dict itself is not null
+
+    assertTrue(keyAccessor.find(12));
+    assertEquals(11, valueReader.scalar("a").getInt());
+    assertEquals(BigDecimal.valueOf(2.0), valueReader.scalar("b").getDecimal());
+
+    // MapReader#getObject() returns a List containing the value of each column
+    // rather than a mapping of column name to its value, hence a List is expected for the dict's value.
+    Map<Object, Object> map = map(
+        11, Arrays.asList(10, BigDecimal.valueOf(1.0)),
+        12, Arrays.asList(11, BigDecimal.valueOf(2.0))
+    );
+    assertEquals(map, dictReader.getObject());
+
+    // Row 2
+
+    assertTrue(reader.next());
+    assertFalse(keyAccessor.find(222));
+    assertTrue(keyAccessor.find(21));
+    assertEquals(Arrays.asList(20, BigDecimal.valueOf(3.0)), valueReader.getObject());
+
+    map = map(21, Arrays.asList(20, BigDecimal.valueOf(3.0)));
+    assertEquals(map, dictReader.getObject());
+
+    // Row 3
+
+    assertTrue(reader.next());
+
+    assertTrue(keyAccessor.find(32));
+    assertFalse(valueReader.isNull());
+    assertEquals(31, valueReader.scalar("a").getInt());
+    assertEquals(BigDecimal.valueOf(5.0), valueReader.scalar("b").getDecimal());
+
+    assertTrue(keyAccessor.find(31));
+    assertEquals(30, valueReader.scalar("a").getInt());
+    assertEquals(BigDecimal.valueOf(4.0), valueReader.scalar("b").getDecimal());
+
+    assertFalse(keyAccessor.find(404));
+
+    map = map(
+        31, Arrays.asList(30, BigDecimal.valueOf(4.0)),
+        32, Arrays.asList(31, BigDecimal.valueOf(5.0)),
+        33, Arrays.asList(32, BigDecimal.valueOf(6.0))
+    );
+    assertEquals(map, dictReader.getObject());
+
+    assertFalse(reader.next());
+
+    // Verify that the dict accessor's value count was set.
+
+    final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
+    assertEquals(3, dictVector.getAccessor().getValueCount());
+
+    final SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(1, map(
+            11, objArray(10, BigDecimal.valueOf(1.0)),
+            12, objArray(11, BigDecimal.valueOf(2.0))
+        ))
+        .addRow(2, map(21, objArray(20, BigDecimal.valueOf(3.0))))
+        .addRow(3, map(
+            31, objArray(30, BigDecimal.valueOf(4.0)),
+            32, objArray(31, BigDecimal.valueOf(5.0)),
+            33, objArray(32, BigDecimal.valueOf(6.0))
+        ))
+        .build();
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  @Test
+  public void testRepeatedDictStructure() {
+    final String dictName = "d";
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("id", MinorType.INT)
+        .addDictArray(dictName, MinorType.FLOAT4)
+          .value(MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    final ExtendableRowSet rowSet = fixture.rowSet(schema);
+    final RowSetWriter writer = rowSet.writer();
+
+    // Repeated dict
+
+    assertEquals(ObjectType.ARRAY, writer.column(dictName).type());
+
+    final ScalarWriter idWriter = writer.scalar(0);
+
+    final ArrayWriter dictArrayWriter = writer.column(1).array();
+    assertEquals(ObjectType.ARRAY, dictArrayWriter.entryType());
+
+    DictWriter dictWriter = dictArrayWriter.dict();
+
+    assertEquals(ValueType.DOUBLE, dictWriter.keyType());
+    assertEquals(ObjectType.SCALAR, dictWriter.valueType());
+
+    final ScalarWriter keyWriter = dictWriter.keyWriter();
+    final ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+    assertEquals(ValueType.DOUBLE, keyWriter.valueType());
+    assertEquals(ValueType.STRING, valueWriter.valueType());
+
+    // Write data
+
+    idWriter.setInt(1);
+
+    keyWriter.setDouble(1);
+    valueWriter.setString("a");
+    dictWriter.save(); // advance to next entry position
+    keyWriter.setDouble(2);
+    valueWriter.setString("b");
+    dictWriter.save();
+    dictArrayWriter.save(); // advance to next array position
+
+    keyWriter.setDouble(3);
+    valueWriter.setString("c");
+    dictWriter.save();
+    dictArrayWriter.save();
+
+    writer.save(); // advance to next row
+
+    idWriter.setInt(2);
+
+    keyWriter.setDouble(11);
+    valueWriter.setString("d");
+    dictWriter.save();
+    keyWriter.setDouble(12);
+    valueWriter.setString("e");
+    dictWriter.save();
+    dictArrayWriter.save();
+
+    writer.save();
+
+    idWriter.setInt(3);
+
+    keyWriter.setDouble(21);
+    valueWriter.setString("f");
+    dictWriter.save();
+    keyWriter.setDouble(22);
+    valueWriter.setString("g");
+    dictWriter.save();
+    keyWriter.setDouble(23);
+    valueWriter.setString("h");
+    dictWriter.save();
+    dictArrayWriter.save();
+
+    keyWriter.setDouble(24);
+    valueWriter.setString("i");
+    dictWriter.save();
+    keyWriter.setDouble(25);
+    valueWriter.setString("j");
+    dictWriter.save();
+    keyWriter.setDouble(26.5);
+    valueWriter.setString("k");
+    dictWriter.save();
+    keyWriter.setDouble(27);
+    valueWriter.setString("l");
+    dictWriter.save();
+    keyWriter.setDouble(28);
+    valueWriter.setString("m");
+    dictWriter.save();
+    dictArrayWriter.save();
+
+    writer.save();
+
+    // Finish the row set and get a reader.
+
+    final SingleRowSet actual = writer.done();
+    final RowSetReader reader = actual.reader();
+
+    // Verify reader structure
+
+    assertEquals(ObjectType.ARRAY, reader.column(dictName).type());
+
+    final ArrayReader dictArrayReader = reader.array(1);
+    assertEquals(ObjectType.ARRAY, dictArrayReader.entryType());
+
+    final DictReader dictReader = dictArrayReader.entry().dict();
+    assertEquals(ValueType.DOUBLE, dictReader.keyColumnType());
+    assertEquals(ObjectType.SCALAR, dictReader.valueColumnType());
+
+    final KeyAccessor keyAccessor = dictReader.keyAccessor();
+    final ScalarReader valueReader = dictReader.valueReader().scalar();
+
+    // Row 1
+
+    assertTrue(reader.next());
+    assertFalse(dictArrayReader.isNull()); // array is not null
+
+    assertTrue(dictArrayReader.next());
+    assertFalse(dictArrayReader.isNull()); // first dict is not null
+
+    assertTrue(keyAccessor.find(2.0f));
+    assertEquals("b", valueReader.getObject());
+    assertTrue(keyAccessor.find(1.0f));
+    assertEquals("a", valueReader.getObject());
+    assertFalse(keyAccessor.find(1.1f)); // no entry for given key
+
+    assertTrue(dictArrayReader.next());
+
+    assertTrue(keyAccessor.find(3.0f));
+    assertEquals("c", valueReader.getObject());
+    assertFalse(keyAccessor.find(1.0f));
+
+    assertEquals(Arrays.asList(map(1.0, "a", 2.0, "b"), map(3.0, "c")), dictArrayReader.getObject());
+
+    // Row 2
+
+    assertTrue(reader.next());
+
+    assertTrue(dictArrayReader.next());
+
+    assertTrue(keyAccessor.find(11.0f));
+    assertEquals("d", valueReader.getString());
+    assertFalse(keyAccessor.find(1.0f));
+    assertTrue(keyAccessor.find(12.0f));
+    assertEquals("e", valueReader.getString());
+
+    // Row 3: use explicit positioning
+
+    assertTrue(reader.next());
+    dictArrayReader.setPosn(1);
+    assertTrue(keyAccessor.find(24.0f));
+    assertEquals("i", valueReader.getString());
+    assertTrue(keyAccessor.find(26.5f));
+    assertEquals("k", valueReader.getString());
+    assertTrue(keyAccessor.find(28.0f));
+    assertEquals("m", valueReader.getString());
+    assertFalse(keyAccessor.find(35.0f));
+    assertTrue(keyAccessor.find(27.0f));
+    assertEquals("l", valueReader.getString());
+
+    Map<Object, Object> element1 = map(24.0, "i", 25.0, "j", 26.5, "k", 27.0, "l", 28.0, "m");
+    assertEquals(element1, dictReader.getObject());
+
+    dictArrayReader.setPosn(0);
+    assertTrue(keyAccessor.find(23.0f));
+    assertEquals("h", valueReader.getObject());
+    assertTrue(keyAccessor.find(21.0f));
+    assertEquals("f", valueReader.getObject());
+    assertFalse(keyAccessor.find(23.05f));
+
+    Map<Object, Object> element0 = map(21.0, "f", 22.0, "g", 23.0, "h");
+    assertEquals(element0, dictReader.getObject());
+
+    assertEquals(Arrays.asList(element0, element1), dictArrayReader.getObject());
+
+    assertFalse(reader.next());
+
+    // Verify that the dict accessor's value count was set.
+
+    final RepeatedDictVector vector = (RepeatedDictVector) actual.container().getValueVector(1).getValueVector();
+    assertEquals(3, vector.getAccessor().getValueCount());
+
+    final SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(1, objArray(map(1.0f, "a", 2.0f, "b"), map(3.0f, "c")))
+        .addRow(2, objArray(singleObjArray(map(11.0f, "d", 12.0f, "e"))))
+        .addRow(3, objArray(
+            map(21.0f, "f", 22.0f, "g", 23.0f, "h"),
+            map(24.0f, "i", 25.0f, "j", 26.5f, "k", 27.0f, "l", 28.0f, "m")))
+        .build();
+    RowSetUtilities.verify(expected, actual);
+  }
+
   /**
    * Test an array of ints (as an example fixed-width type)
    * at the top level of a schema.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
index 673050a..1996d05 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
@@ -135,7 +135,7 @@
     assertTrue(m.isMap());
     assertEquals(DataMode.REQUIRED, m.mode());
 
-    TupleMetadata mapSchema = m.mapSchema();
+    TupleMetadata mapSchema = m.tupleSchema();
     assertNotNull(mapSchema);
     assertEquals(4, mapSchema.size());
 
@@ -281,6 +281,40 @@
   }
 
   /**
+   * Tests creating a dict within a row.
+   * Also tests the basic methods for adding the dict's key and value columns.
+   */
+  @Test
+  public void testDictInRow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addDict("d", MinorType.VARCHAR)
+          .nullableValue(MinorType.FLOAT8)
+          .resumeSchema()
+        .buildSchema();
+
+    assertEquals(1, schema.size());
+
+    ColumnMetadata d = schema.metadata(0);
+    assertEquals("d", d.name());
+    assertTrue(d.isDict());
+    assertEquals(DataMode.REQUIRED, d.mode());
+
+    TupleMetadata dictSchema = d.tupleSchema();
+    assertNotNull(dictSchema);
+    assertEquals(2, dictSchema.size());
+
+    ColumnMetadata keyMetadata = dictSchema.metadata(0);
+    assertEquals("key", keyMetadata.name());
+    assertEquals(MinorType.VARCHAR, keyMetadata.type());
+    assertEquals(DataMode.REQUIRED, keyMetadata.mode());
+
+    ColumnMetadata valueMetadata = dictSchema.metadata(1);
+    assertEquals("value", valueMetadata.name());
+    assertEquals(MinorType.FLOAT8, valueMetadata.type());
+    assertEquals(DataMode.OPTIONAL, valueMetadata.mode());
+  }
+
+  /**
    * Test methods to provide a width (precision) for VarChar
    * columns. The schema builder does not provide shortcuts for
    * VarChar in lists, unions or repeated lists because these
@@ -304,7 +338,7 @@
 
     assertEquals(21, schema.metadata("a").precision());
     assertEquals(22, schema.metadata("b").precision());
-    TupleMetadata mapSchema = schema.metadata("m").mapSchema();
+    TupleMetadata mapSchema = schema.metadata("m").tupleSchema();
     assertEquals(23, mapSchema.metadata("c").precision());
     assertEquals(24, mapSchema.metadata("d").precision());
   }
@@ -343,7 +377,7 @@
     assertEquals(7, c.precision());
     assertEquals(4, c.scale());
 
-    ColumnMetadata d = schema.metadata("m").mapSchema().metadata("d");
+    ColumnMetadata d = schema.metadata("m").tupleSchema().metadata("d");
     assertEquals(DataMode.OPTIONAL, d.mode());
     assertEquals(8, d.precision());
     assertEquals(1, d.scale());
@@ -395,13 +429,13 @@
     assertEquals(38, g.precision());
     assertEquals(4, g.scale());
 
-    ColumnMetadata d = schema.metadata("m").mapSchema().metadata("d");
+    ColumnMetadata d = schema.metadata("m").tupleSchema().metadata("d");
     assertEquals(MinorType.VARDECIMAL, d.type());
     assertEquals(DataMode.OPTIONAL, d.mode());
     assertEquals(8, d.precision());
     assertEquals(1, d.scale());
 
-    ColumnMetadata f = schema.metadata("m").mapSchema().metadata("f");
+    ColumnMetadata f = schema.metadata("m").tupleSchema().metadata("f");
     assertEquals(MinorType.VARDECIMAL, f.type());
     assertEquals(DataMode.REQUIRED, f.mode());
     assertEquals(38, f.precision());
@@ -463,8 +497,8 @@
           .resumeSchema()
         .buildSchema();
 
-    TupleMetadata m1Schema = schema.metadata("m1").mapSchema();
-    TupleMetadata m2Schema = m1Schema.metadata("m2").mapSchema();
+    TupleMetadata m1Schema = schema.metadata("m1").tupleSchema();
+    TupleMetadata m2Schema = m1Schema.metadata("m2").tupleSchema();
 
     ColumnMetadata a = m2Schema.metadata(0);
     assertEquals("a", a.name());
@@ -490,7 +524,7 @@
           .resumeSchema()
         .buildSchema();
 
-    TupleMetadata m1Schema = schema.metadata("m1").mapSchema();
+    TupleMetadata m1Schema = schema.metadata("m1").tupleSchema();
     VariantMetadata uSchema = m1Schema.metadata("u").variantSchema();
 
     assertTrue(uSchema.hasType(MinorType.INT));
@@ -516,7 +550,7 @@
           .resumeSchema()
         .buildSchema();
 
-    TupleMetadata m1Schema = schema.metadata("m1").mapSchema();
+    TupleMetadata m1Schema = schema.metadata("m1").tupleSchema();
 
     ColumnMetadata r = m1Schema.metadata(0);
     assertEquals("r", r.name());
@@ -550,7 +584,7 @@
     ColumnMetadata mapType = variant.member(MinorType.MAP);
     assertNotNull(mapType);
 
-    TupleMetadata mapSchema = mapType.mapSchema();
+    TupleMetadata mapSchema = mapType.tupleSchema();
     assertEquals(2, mapSchema.size());
 
     assertTrue(variant.hasType(MinorType.FLOAT8));
@@ -626,7 +660,7 @@
     ColumnMetadata list = schema.metadata("x");
     ColumnMetadata mapCol = list.childSchema();
     assertTrue(mapCol.isMap());
-    TupleMetadata mapSchema = mapCol.mapSchema();
+    TupleMetadata mapSchema = mapCol.tupleSchema();
 
     ColumnMetadata a = mapSchema.metadata("a");
     assertEquals(MinorType.INT, a.type());
@@ -715,7 +749,7 @@
     assertTrue(columnMetadata.isNullable());
     assertEquals("m1", columnMetadata.name());
 
-    TupleMetadata schema = columnMetadata.mapSchema();
+    TupleMetadata schema = columnMetadata.tupleSchema();
 
     ColumnMetadata col0 = schema.metadata(0);
     assertEquals("b", col0.name());
@@ -727,7 +761,7 @@
     assertTrue(col1.isMap());
     assertFalse(col1.isNullable());
 
-    ColumnMetadata child = col1.mapSchema().metadata(0);
+    ColumnMetadata child = col1.tupleSchema().metadata(0);
     assertEquals("v", child.name());
     assertEquals(MinorType.VARCHAR, child.type());
     assertTrue(child.isNullable());
@@ -751,7 +785,7 @@
     assertTrue(child.isArray());
     assertTrue(child.isMap());
 
-    TupleMetadata mapSchema = child.mapSchema();
+    TupleMetadata mapSchema = child.tupleSchema();
 
     ColumnMetadata col0 = mapSchema.metadata(0);
     assertEquals("v", col0.name());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
index 6502d83..d6237e1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
@@ -67,7 +67,7 @@
     // Generic checks
 
     assertEquals(ColumnMetadata.StructureType.PRIMITIVE, col.structureType());
-    assertNull(col.mapSchema());
+    assertNull(col.tupleSchema());
     assertTrue(field.isEquivalent(col.schema()));
     assertEquals(field.getName(), col.name());
     assertEquals(field.getType().getMinorType(), col.type());
@@ -160,7 +160,7 @@
     ColumnMetadata col = MetadataUtils.fromField(field);
 
     assertEquals(ColumnMetadata.StructureType.PRIMITIVE, col.structureType());
-    assertNull(col.mapSchema());
+    assertNull(col.tupleSchema());
     assertFalse(col.isNullable());
     assertFalse(col.isArray());
     assertTrue(col.isVariableWidth());
@@ -271,9 +271,9 @@
     ColumnMetadata col = MetadataUtils.fromField(field);
 
     assertTrue(col instanceof MapColumnMetadata);
-    assertNotNull(col.mapSchema());
-    assertEquals(0, col.mapSchema().size());
-    assertSame(col, col.mapSchema().parent());
+    assertNotNull(col.tupleSchema());
+    assertEquals(0, col.tupleSchema().size());
+    assertSame(col, col.tupleSchema().parent());
 
     MapColumnMetadata mapCol = (MapColumnMetadata) col;
     assertNull(mapCol.parentTuple());
@@ -301,8 +301,8 @@
     ColumnMetadata col = MetadataUtils.fromField(field);
 
     assertTrue(col instanceof MapColumnMetadata);
-    assertNotNull(col.mapSchema());
-    assertEquals(0, col.mapSchema().size());
+    assertNotNull(col.tupleSchema());
+    assertEquals(0, col.tupleSchema().size());
 
     assertFalse(col.isNullable());
     assertTrue(col.isArray());
@@ -548,15 +548,15 @@
 
     MaterializedField fieldA = SchemaBuilder.columnSchema("a", MinorType.MAP, DataMode.REQUIRED);
     ColumnMetadata colA = root.add(fieldA);
-    TupleMetadata mapA = colA.mapSchema();
+    TupleMetadata mapA = colA.tupleSchema();
 
     MaterializedField fieldB = SchemaBuilder.columnSchema("b.x", MinorType.MAP, DataMode.REQUIRED);
     ColumnMetadata colB = mapA.add(fieldB);
-    TupleMetadata mapB = colB.mapSchema();
+    TupleMetadata mapB = colB.tupleSchema();
 
     MaterializedField fieldC = SchemaBuilder.columnSchema("c.y", MinorType.MAP, DataMode.REQUIRED);
     ColumnMetadata colC = mapB.add(fieldC);
-    TupleMetadata mapC = colC.mapSchema();
+    TupleMetadata mapC = colC.tupleSchema();
 
     MaterializedField fieldD = SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED);
     ColumnMetadata colD = mapC.add(fieldD);
@@ -602,7 +602,7 @@
     // Copying should be deep.
 
     TupleMetadata root2 = root.copy();
-    assertEquals(2, root2.metadata(0).mapSchema().metadata(0).mapSchema().metadata(0).mapSchema().size());
+    assertEquals(2, root2.metadata(0).tupleSchema().metadata(0).tupleSchema().metadata(0).tupleSchema().size());
     assert(root.isEquivalent(root2));
 
     // Generate a materialized field and compare.
@@ -640,11 +640,11 @@
 
     // Get the parts.
 
-    TupleMetadata mapA = colA.mapSchema();
+    TupleMetadata mapA = colA.tupleSchema();
     ColumnMetadata colB = mapA.metadata("b.x");
-    TupleMetadata mapB = colB.mapSchema();
+    TupleMetadata mapB = colB.tupleSchema();
     ColumnMetadata colC = mapB.metadata("c.y");
-    TupleMetadata mapC = colC.mapSchema();
+    TupleMetadata mapC = colC.tupleSchema();
     ColumnMetadata colD = mapC.metadata("d");
     ColumnMetadata colE = mapC.metadata("e");
 
@@ -800,7 +800,7 @@
     assertTrue(union.hasType(MinorType.LIST));
 
     ColumnMetadata mapCol = union.member(MinorType.MAP);
-    TupleMetadata mapSchema = mapCol.mapSchema();
+    TupleMetadata mapSchema = mapCol.tupleSchema();
     assertEquals(2, mapSchema.size());
 
     ColumnMetadata listCol = union.member(MinorType.LIST);
@@ -933,4 +933,58 @@
     schema.removeProperty("missing_prop");
     assertEquals(1, schema.properties().size());
   }
+
+  @Test
+  public void testDictColumn() {
+    MaterializedField field = SchemaBuilder.columnSchema("d", MinorType.DICT, DataMode.REQUIRED);
+    ColumnMetadata col = MetadataUtils.fromField(field);
+
+    assertTrue(col instanceof DictColumnMetadata);
+    assertNotNull(col.tupleSchema());
+    assertEquals(0, col.tupleSchema().size());
+    assertSame(col, col.tupleSchema().parent());
+
+    DictColumnMetadata dictCol = (DictColumnMetadata) col;
+    assertNull(dictCol.parentTuple());
+
+    assertEquals(ColumnMetadata.StructureType.DICT, col.structureType());
+    assertFalse(col.isNullable());
+    assertFalse(col.isArray());
+    assertFalse(col.isVariableWidth());
+    assertTrue(col.isDict());
+    assertFalse(col.isVariant());
+
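+    // Size hints are ignored for a non-repeated DICT column: the expected
+    // width stays 0 and the expected element count stays 1 no matter what
+    // is set.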
+    assertEquals(0, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(0, col.expectedWidth());
+
+    assertEquals(1, col.expectedElementCount());
+    col.setExpectedElementCount(2);
+    assertEquals(1, col.expectedElementCount());
+  }
+
+  @Test
+  public void testRepeatedDictColumn() {
+    MaterializedField field = SchemaBuilder.columnSchema("da", MinorType.DICT, DataMode.REPEATED);
+    ColumnMetadata col = MetadataUtils.fromField(field);
+
+    assertTrue(col instanceof DictColumnMetadata);
+    assertNotNull(col.tupleSchema());
+    assertEquals(0, col.tupleSchema().size());
+
+    assertFalse(col.isNullable());
+    assertTrue(col.isArray());
+    assertFalse(col.isVariableWidth());
+    assertTrue(col.isDict());
+    assertFalse(col.isVariant());
+
+    assertEquals(0, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(0, col.expectedWidth());
+
+    assertEquals(ColumnMetadata.DEFAULT_ARRAY_SIZE, col.expectedElementCount());
+
+    col.setExpectedElementCount(2);
+    assertEquals(2, col.expectedElementCount());
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
index c17e20c..5a634fd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -766,6 +766,23 @@
   }
 
   @Test
+  public void testMapSchemaArrayValueGetByKeyElementByIndex() throws Exception {
+    String sql = "select map_array_value['key1'][3] element from dfs.`%s`";
+
+    TestBuilder testBuilder = testBuilder()
+        .sqlQuery(sql, generateMapSchema().getFileName())
+        .unOrdered()
+        .baselineColumns("element");
+
+    for (int i = 0; i < RECORD_COUNT; i++) {
+      double val = (double) (i + 1) * 3;
+      testBuilder.baselineValues(val);
+    }
+
+    testBuilder.go();
+  }
+
+  @Test
   public void testMapSchemaValueInFilter() throws Exception {
     String sql = "select map_field['key1'] val from dfs.`%s` where map_field['key1'] < %d";
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index f5b9cd7..54e8252 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -642,6 +642,17 @@
   }
 
   @Test
+  public void testDictArrayTypeOf() throws Exception {
+    String query = "select typeof(map_array) as type from cp.`store/parquet/complex/map/parquet/000000_0.parquet` limit 1";
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("type")
+        .baselineValuesForSingleColumn("ARRAY<DICT<BIGINT,INT>>")
+        .go();
+  }
+
+  @Test
   public void testDictTypeOf() throws Exception {
     String query = "select typeof(map_array[0]) as type from cp.`store/parquet/complex/map/parquet/000000_0.parquet` limit 1";
     testBuilder()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index b1c196b..4b6d35a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -21,6 +21,8 @@
 import static org.junit.Assert.assertTrue;
 
 import java.math.BigDecimal;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -262,4 +264,21 @@
   public static BigDecimal dec(String value) {
     return new BigDecimal(value);
   }
+
+  /**
+   * Convenience method to build a map from a flat key-value sequence.
+   *
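+   * <p>For example, {@code map("a", 1, "b", 2)} produces a map
+   * {@code {a=1, b=2}}, preserving insertion order.</p>
+   *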
+   * @param entry key-value sequence
+   * @return map containing key-value pairs from passed sequence
+   */
+  public static Map<Object, Object> map(Object... entry) {
+    assert entry.length % 2 == 0 : "Array length should be even.";
+
+    // LinkedHashMap is chosen to preserve entry order
+    Map<Object, Object> map = new LinkedHashMap<>();
+    for (int i = 0; i < entry.length; i += 2) {
+      map.put(entry[i], entry[i + 1]);
+    }
+    return map;
+  }
 }
diff --git a/exec/vector/src/main/codegen/templates/KeyAccessors.java b/exec/vector/src/main/codegen/templates/KeyAccessors.java
new file mode 100644
index 0000000..39b3b33
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/KeyAccessors.java
@@ -0,0 +1,106 @@
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/accessor/KeyAccessors.java" />
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Period;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * This class is generated using Freemarker and the ${.template_name} template.
+ */
+public class KeyAccessors {
+
+  <#list [
+      {"name": "Boolean", "javaType": "boolean"},
+      {"name": "Integer", "javaType": "int"},
+      {"name": "Long", "javaType": "long"},
+      {"name": "Double", "javaType": "double"},
+      {"name": "String", "javaType": "String"},
+      {"name": "Bytes", "javaType": "byte[]"},
+      {"name": "Decimal", "javaType": "BigDecimal"},
+      {"name": "Period", "javaType": "Period"},
+      {"name": "Date", "javaType": "LocalDate"},
+      {"name": "Time", "javaType": "LocalTime"},
+      {"name": "Timestamp", "javaType": "Instant"}
+    ] as valueType>
+
+  public static class ${valueType.name}KeyAccessor extends AbstractKeyAccessor {
+
+    public ${valueType.name}KeyAccessor(DictReader dictReader, ScalarReader keyReader) {
+      super(dictReader, keyReader);
+    }
+
+    @Override
+    public boolean find(${valueType.javaType} key) {
+      dictReader.rewind();
+      while (dictReader.next()) {
+        <#if valueType.name == "Bytes">
+        if (Arrays.equals(key, keyReader.getBytes())) {
+        <#elseif valueType.name == "Integer">
+        if (key == keyReader.getInt()) {
+        <#elseif valueType.name == "Boolean" || valueType.name == "Long" || valueType.name == "Double">
+        if (key == keyReader.get${valueType.name}()) {
+        <#else>
+        if (key.equals(keyReader.get${valueType.name}())) { // key is non-nullable, so calling equals() is safe
+        </#if>
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+  </#list>
+
+  public static KeyAccessor getAccessor(DictReader dictReader, ScalarReader keyReader) {
+    switch (keyReader.valueType()) {
+      case BOOLEAN:
+        return new BooleanKeyAccessor(dictReader, keyReader);
+      case BYTES:
+        return new BytesKeyAccessor(dictReader, keyReader);
+      case DECIMAL:
+        return new DecimalKeyAccessor(dictReader, keyReader);
+      case DOUBLE:
+        return new DoubleKeyAccessor(dictReader, keyReader);
+      case INTEGER:
+        return new IntegerKeyAccessor(dictReader, keyReader);
+      case LONG:
+        return new LongKeyAccessor(dictReader, keyReader);
+      case PERIOD:
+        return new PeriodKeyAccessor(dictReader, keyReader);
+      case STRING:
+        return new StringKeyAccessor(dictReader, keyReader);
+      case DATE:
+        return new DateKeyAccessor(dictReader, keyReader);
+      case TIME:
+        return new TimeKeyAccessor(dictReader, keyReader);
+      case TIMESTAMP:
+        return new TimestampKeyAccessor(dictReader, keyReader);
+      default:
+        throw new IllegalStateException("Unexpected key type: " + keyReader.valueType());
+    }
+  }
+}
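
For orientation, the generated accessors are selected and used roughly as follows (a hypothetical sketch; dictReader and keyReader stand for an already-bound DictReader and its key ScalarReader):

    // Pick the accessor matching the dict's key value type,
    // then probe for a concrete key before reading the value.
    KeyAccessor accessor = KeyAccessors.getAccessor(dictReader, keyReader);
    if (accessor.find("someKey")) {
      Object value = dictReader.valueReader().getObject();
    }
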
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
index 8d7f9d0..99bcdb3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
@@ -153,7 +153,7 @@
   }
 
   @Override
-  public TupleMetadata mapSchema() { return null; }
+  public TupleMetadata tupleSchema() { return null; }
 
   @Override
   public VariantMetadata variantSchema() { return null; }
@@ -279,9 +279,9 @@
       buf.append(", variant: ")
          .append(variantSchema().toString());
     }
-    if (mapSchema() != null) {
+    if (tupleSchema() != null) {
       buf.append(", schema: ")
-         .append(mapSchema().toString());
+         .append(tupleSchema().toString());
     }
     if (hasProperties()) {
       buf.append(", properties: ")
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
index 1ae0007..6f79336 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
@@ -84,7 +84,7 @@
   }
 
   @Override
-  public TupleMetadata mapSchema() {
+  public TupleMetadata tupleSchema() {
     return schema;
   }
 
@@ -123,7 +123,7 @@
     }
     builder.append(getStringType())
         .append("<").append(
-            mapSchema().toMetadataList().stream()
+            tupleSchema().toMetadataList().stream()
               .map(ColumnMetadata::columnString)
               .collect(Collectors.joining(", "))
         )
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
index c8bc0fd..a56193e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
@@ -118,7 +118,12 @@
      * the code has evolved.
      */
 
-    MULTI_ARRAY
+    MULTI_ARRAY,
+
+    /**
+     * Dict or repeated dict.
+     */
+    DICT
   }
 
   int DEFAULT_ARRAY_SIZE = 10;
@@ -131,7 +136,7 @@
    * @return the tuple schema
    */
 
-  TupleMetadata mapSchema();
+  TupleMetadata tupleSchema();
 
   /**
    * Schema for <tt>VARIANT</tt> columns.
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictBuilder.java
new file mode 100644
index 0000000..f5af894
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictBuilder.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record.metadata;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.complex.DictVector;
+
+/**
+ * Internal structure for building a dict. Dict is an array of key-value pairs
+ * with defined types for key and value (key and value fields are defined within {@link TupleSchema}).
+ * The key must be a {@link org.apache.drill.common.types.TypeProtos.DataMode#REQUIRED} primitive,
+ * while the value can be primitive or complex.
+ * <p>The column is added to the parent container when {@code build()} is called,
+ * and all <tt>resumeXXX</tt> methods return the qualified parent container.</p>
+ *
+ * @see DictVector
+ */
+public class DictBuilder implements SchemaContainer {
+
+  /**
+   * Schema containing key and value fields' definition.
+   */
+  private final TupleSchema schema = new TupleSchema();
+  private final SchemaContainer parent;
+  private final String name;
+  private final TypeProtos.DataMode mode;
+
+  public DictBuilder(SchemaContainer parent, String name, TypeProtos.DataMode mode) {
+    this.parent = parent;
+    this.name = name;
+    this.mode = mode;
+  }
+
+  @Override
+  public void addColumn(ColumnMetadata column) {
+    // Dict does not support a complex key, so this method is used
+    // only to nest a complex value.
+    if (!DictVector.FIELD_VALUE_NAME.equals(column.name())) {
+      String message = String.format(
+          "Expected column with name '%s'. Found: '%s'.", DictVector.FIELD_VALUE_NAME, column.name());
+      throw new IllegalArgumentException(message);
+    }
+
+    if (isFieldSet(column.name())) {
+      String message = String.format("Field '%s' is already defined in dict.", column.name());
+      throw new IllegalArgumentException(message);
+    }
+
+    schema.addColumn(column);
+  }
+
+  public DictBuilder key(TypeProtos.MinorType type) {
+    TypeProtos.MajorType keyType = Types.withMode(type, TypeProtos.DataMode.REQUIRED);
+    return key(keyType);
+  }
+
+  /**
+   * Use this method to set types with width or scale and precision,
+   * e.g. {@link org.apache.drill.common.types.TypeProtos.MinorType#VARDECIMAL} with scale and precision or
+   * {@link org.apache.drill.common.types.TypeProtos.MinorType#VARCHAR} etc.
+   *
+   * @param type desired type for key
+   * @return {@code this} builder
+   * @throws IllegalStateException if key field is already set
+   * @throws IllegalArgumentException if {@code type} is not supported (either complex or nullable)
+   */
+  public DictBuilder key(TypeProtos.MajorType type) {
+    final String fieldName = DictVector.FIELD_KEY_NAME;
+    if (isFieldSet(fieldName)) {
+      throw new IllegalStateException(String.format("Filed '%s' is already defined.", fieldName));
+    }
+
+    if (!isSupportedKeyType(type)) {
+      throw new IllegalArgumentException(
+          String.format("'%s' in dict should be non-nullable primitive. Found: %s", fieldName, type));
+    }
+
+    addField(fieldName, type);
+    return this;
+  }
+
+  /**
+   * Checks if the field identified by name was already set.
+   *
+   * @param name name of the field
+   * @return {@code true} if the schema contains a field with the given {@code name}; {@code false} otherwise.
+   */
+  private boolean isFieldSet(String name) {
+    return schema.index(name) != -1;
+  }
+
+  public DictBuilder value(TypeProtos.MinorType type) {
+    return value(type, TypeProtos.DataMode.REQUIRED);
+  }
+
+  public DictBuilder nullableValue(TypeProtos.MinorType type) {
+    return value(type, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  public DictBuilder repeatedValue(TypeProtos.MinorType type) {
+    return value(type, TypeProtos.DataMode.REPEATED);
+  }
+
+  private DictBuilder value(TypeProtos.MinorType type, TypeProtos.DataMode mode) {
+    TypeProtos.MajorType valueType = Types.withMode(type, mode);
+    return value(valueType);
+  }
+
+  /**
+   * Define non-complex value type. For complex types use {@link #mapValue()}, {@link #mapArrayValue()} etc.
+   *
+   * @param type desired non-complex type for value.
+   * @return {@code this} builder
+   * @throws IllegalStateException if value is already set
+   * @throws IllegalArgumentException if {@code type} is either {@code MAP},
+   *                                  {@code LIST}, {@code DICT} or {@code UNION}.
+   * @see #mapValue() method to define value as {@code MAP}
+   * @see #mapArrayValue() method to define value as {@code REPEATED MAP}
+   * @see #listValue() method to define value as {@code LIST}
+   * @see #unionValue() method to define value as {@code UNION}
+   * @see #dictValue() method to define value as {@code DICT}
+   * @see #dictArrayValue() method to define value as {@code REPEATED DICT}
+   */
+  public DictBuilder value(TypeProtos.MajorType type) {
+    final String fieldName = DictVector.FIELD_VALUE_NAME;
+    if (isFieldSet(fieldName)) {
+      throw new IllegalStateException(String.format("Field '%s' is already defined.", fieldName));
+    }
+
+    if (Types.isComplex(type) || Types.isUnion(type)) {
+      String msg = String.format("Complex type found %s when defining '%s'. " +
+          "Use mapValue(), listValue() etc. in case of complex value type.", fieldName, type);
+      throw new IllegalArgumentException(msg);
+    }
+
+    addField(fieldName, type);
+    return this;
+  }
+
+  /**
+   * Adds field (either key or value) after validation to the schema.
+   *
+   * @param name name of the field
+   * @param type type of the field
+   */
+  private void addField(String name, TypeProtos.MajorType type) {
+    // Repeated fields are built directly; the ColumnBuilder path below
+    // handles required/optional fields carrying precision, scale or width.
+    if (Types.isRepeated(type)) {
+      schema.add(SchemaBuilder.columnSchema(name, type.getMinorType(), type.getMode()));
+      return;
+    }
+
+    ColumnBuilder builder = new ColumnBuilder(name, type.getMinorType())
+        .setMode(type.getMode());
+
+    if (type.hasScale()) {
+      builder.setPrecisionAndScale(type.getPrecision(), type.getScale());
+    } else if (type.hasPrecision()) {
+      builder.setPrecision(type.getPrecision());
+    }
+    if (type.hasWidth()) {
+      builder.setWidth(type.getWidth());
+    }
+
+    schema.add(builder.build());
+  }
+
+  public MapBuilder mapValue() {
+    return new MapBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.DataMode.REQUIRED);
+  }
+
+  public MapBuilder mapArrayValue() {
+    return new MapBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.DataMode.REPEATED);
+  }
+
+  public DictBuilder dictValue() {
+    return new DictBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.DataMode.REQUIRED);
+  }
+
+  public DictBuilder dictArrayValue() {
+    return new DictBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.DataMode.REPEATED);
+  }
+
+  public UnionBuilder unionValue() {
+    return new UnionBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.MinorType.UNION);
+  }
+
+  public UnionBuilder listValue() {
+    return new UnionBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.MinorType.LIST);
+  }
+
+  public RepeatedListBuilder repeatedListValue() {
+    return new RepeatedListBuilder(this, DictVector.FIELD_VALUE_NAME);
+  }
+
+  public DictColumnMetadata buildColumn() {
+    validateKeyValuePresent();
+    return new DictColumnMetadata(name, mode, schema);
+  }
+
+  private void validateKeyValuePresent() {
+    for (String fieldName : DictVector.fieldNames) {
+      ColumnMetadata columnMetadata = schema.metadata(fieldName);
+      if (columnMetadata == null) {
+        throw new IllegalStateException(String.format("Field %s is absent in DICT.", fieldName));
+      }
+    }
+  }
+
+  public void build() {
+    if (parent != null) {
+      parent.addColumn(buildColumn());
+    }
+  }
+
+  public SchemaBuilder resumeSchema() {
+    build();
+    return (SchemaBuilder) parent;
+  }
+
+  public MapBuilder resumeMap() {
+    build();
+    return (MapBuilder) parent;
+  }
+
+  public RepeatedListBuilder resumeList() {
+    build();
+    return (RepeatedListBuilder) parent;
+  }
+
+  public UnionBuilder resumeUnion() {
+    build();
+    return (UnionBuilder) parent;
+  }
+
+  public DictBuilder resumeDict() {
+    build();
+    return (DictBuilder) parent;
+  }
+
+  private boolean isSupportedKeyType(TypeProtos.MajorType type) {
+    return !Types.isComplex(type)
+        && !Types.isUnion(type)
+        && type.getMode() == TypeProtos.DataMode.REQUIRED;
+  }
+}
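
A minimal sketch of the intended fluent usage, relying on the addDict methods added to SchemaBuilder later in this patch (the column name "d" is illustrative):

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    // DICT column "d" with a required INT key and a required VARCHAR value.
    TupleMetadata schema = new SchemaBuilder()
        .addDict("d", MinorType.INT)
            .value(MinorType.VARCHAR)
            .resumeSchema()
        .buildSchema();
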
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
index 967792e..50957e8 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
@@ -36,10 +36,10 @@
    * the children) of the materialized field provided.
    *
    * @param schema the schema to use
-   * @param mapSchema parent schema
+   * @param tupleSchema parent schema
    */
-  DictColumnMetadata(MaterializedField schema, TupleSchema mapSchema) {
-    super(schema, mapSchema);
+  DictColumnMetadata(MaterializedField schema, TupleSchema tupleSchema) {
+    super(schema, tupleSchema);
   }
 
   public DictColumnMetadata(DictColumnMetadata from) {
@@ -69,4 +69,9 @@
   protected String getStringType() {
     return "MAP";
   }
+
+  @Override
+  public StructureType structureType() {
+    return StructureType.DICT;
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java
index 6c8a2fe..6f10bb0 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java
@@ -144,6 +144,22 @@
     return tupleBuilder.addRepeatedList(this, name);
   }
 
+  public DictBuilder addDict(String name, MinorType keyType) {
+    return tupleBuilder.addDict(this, name).key(keyType);
+  }
+
+  public DictBuilder addDict(String name, MajorType keyType) {
+    return tupleBuilder.addDict(this, name).key(keyType);
+  }
+
+  public DictBuilder addDictArray(String name, MinorType keyType) {
+    return tupleBuilder.addDictArray(this, name).key(keyType);
+  }
+
+  public DictBuilder addDictArray(String name, MajorType keyType) {
+    return tupleBuilder.addDictArray(this, name).key(keyType);
+  }
+
   public MapColumnMetadata buildColumn() {
     return new MapColumnMetadata(memberName, mode, tupleBuilder.schema());
   }
@@ -176,4 +192,9 @@
     build();
     return (UnionBuilder) parent;
   }
+
+  public DictBuilder resumeDict() {
+    build();
+    return (DictBuilder) parent;
+  }
 }
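
With these additions a dict can be nested inside a map; a sketch under the same assumptions as the builder example above:

    // MAP column "m" containing a DICT "d" with BIGINT keys and nullable FLOAT8 values.
    TupleMetadata schema = new SchemaBuilder()
        .addMap("m")
            .addDict("d", MinorType.BIGINT)
                .nullableValue(MinorType.FLOAT8)
                .resumeMap()
            .resumeSchema()
        .buildSchema();
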
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
index 43b38c6..05936a3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
@@ -17,13 +17,16 @@
  */
 package org.apache.drill.exec.record.metadata;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.complex.DictVector;
 
 public class MetadataUtils {
 
@@ -38,7 +41,7 @@
   /**
    * Create a column metadata object that holds the given
    * {@link MaterializedField}. The type of the object will be either a
-   * primitive or map column, depending on the field's type. The logic
+   * primitive, map or dict column, depending on the field's type. The logic
    * here mimics the code as written, which is very messy in some places.
    *
    * @param field the materialized field to wrap
@@ -90,6 +93,8 @@
   public static ColumnMetadata fromView(MaterializedField field) {
     if (field.getType().getMinorType() == MinorType.MAP) {
       return new MapColumnMetadata(field, null);
+    } else if (field.getType().getMinorType() == MinorType.DICT) {
+      return newDict(field);
     } else {
       return new PrimitiveColumnMetadata(field);
     }
@@ -138,8 +143,25 @@
     return new DictColumnMetadata(field, fromFields(field.getChildren()));
   }
 
+  public static DictColumnMetadata newDict(MaterializedField field, TupleSchema schema) {
+    validateDictChildren(schema.toFieldList());
+    return new DictColumnMetadata(field.getName(), field.getDataMode(), schema);
+  }
+
+  private static void validateDictChildren(List<MaterializedField> entryFields) {
+    Collection<String> children = entryFields.stream()
+        .map(MaterializedField::getName)
+        .collect(Collectors.toList());
+    String message = "DICT does not contain %s.";
+    if (!children.contains(DictVector.FIELD_KEY_NAME)) {
+      throw new IllegalStateException(String.format(message, DictVector.FIELD_KEY_NAME));
+    } else if (!children.contains(DictVector.FIELD_VALUE_NAME)) {
+      throw new IllegalStateException(String.format(message, DictVector.FIELD_VALUE_NAME));
+    }
+  }
+
   public static DictColumnMetadata newDict(String name, TupleMetadata schema) {
-    return new DictColumnMetadata(name, DataMode.OPTIONAL, (TupleSchema) schema);
+    return new DictColumnMetadata(name, DataMode.REQUIRED, (TupleSchema) schema);
   }
 
   public static VariantColumnMetadata newVariant(MaterializedField field, VariantSchema schema) {
@@ -167,13 +189,13 @@
 
   public static PrimitiveColumnMetadata newScalar(String name, MinorType type,
       DataMode mode) {
-    assert type != MinorType.MAP && type != MinorType.UNION && type != MinorType.LIST;
+    assert isScalar(type);
     return new PrimitiveColumnMetadata(name, type, mode);
   }
 
   public static PrimitiveColumnMetadata newScalar(String name, MajorType type) {
     MinorType minorType = type.getMinorType();
-    assert minorType != MinorType.MAP && minorType != MinorType.UNION && minorType != MinorType.LIST;
+    assert isScalar(minorType);
     return new PrimitiveColumnMetadata(name, type);
   }
 
@@ -205,4 +227,10 @@
     return new PrimitiveColumnMetadata(field);
   }
 
+  private static boolean isScalar(MinorType type) {
+    return !(type == MinorType.MAP
+        || type == MinorType.UNION
+        || type == MinorType.LIST
+        || type == MinorType.DICT);
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
index c02da5e..895e170 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
@@ -60,6 +60,12 @@
     return new MapBuilder(this, name, DataMode.REPEATED);
   }
 
+  public DictBuilder addDictArray() {
+    // Existing code uses the repeated list name as the name of
+    // the vector within the list.
+    return new DictBuilder(this, name, DataMode.REPEATED);
+  }
+
   public RepeatedListBuilder addArray(MinorType type) {
     // Existing code uses the repeated list name as the name of
     // the vector within the list.
@@ -127,6 +133,11 @@
     return (MapBuilder) parent;
   }
 
+  public DictBuilder resumeDict() {
+    build();
+    return (DictBuilder) parent;
+  }
+
   @Override
   public void addColumn(ColumnMetadata column) {
     assert child == null;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
index 96eb863..264f3ad 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
@@ -182,6 +182,22 @@
     return tupleBuilder.addMapArray(this, name);
   }
 
+  public DictBuilder addDict(String name, MinorType keyType) {
+    return tupleBuilder.addDict(this, name).key(keyType);
+  }
+
+  public DictBuilder addDict(String name, MajorType keyType) {
+    return tupleBuilder.addDict(this, name).key(keyType);
+  }
+
+  public DictBuilder addDictArray(String name, MinorType keyType) {
+    return tupleBuilder.addDictArray(this, name).key(keyType);
+  }
+
+  public DictBuilder addDictArray(String name, MajorType keyType) {
+    return tupleBuilder.addDictArray(this, name).key(keyType);
+  }
+
   public UnionBuilder addUnion(String name) {
     return tupleBuilder.addUnion(this, name);
   }
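
A repeated dict is declared analogously through addDictArray; sketch:

    // Repeated DICT "da" with VARCHAR keys and INT values.
    TupleMetadata schema = new SchemaBuilder()
        .addDictArray("da", MinorType.VARCHAR)
            .value(MinorType.INT)
            .resumeSchema()
        .buildSchema();
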
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleBuilder.java
index 7ce2607..33514d5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleBuilder.java
@@ -132,6 +132,14 @@
     return new MapBuilder(parent, name, DataMode.REPEATED);
   }
 
+  public DictBuilder addDict(SchemaContainer parent, String name) {
+    return new DictBuilder(parent, name, DataMode.REQUIRED);
+  }
+
+  public DictBuilder addDictArray(SchemaContainer parent, String name) {
+    return new DictBuilder(parent, name, DataMode.REPEATED);
+  }
+
   public UnionBuilder addUnion(SchemaContainer parent, String name) {
     return new UnionBuilder(parent, name, MinorType.UNION);
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
index cb43c64..9650d5e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
@@ -66,8 +66,9 @@
     this.parentMap = parentMap;
   }
 
-  public TupleMetadata copy() {
-    TupleMetadata tuple = new TupleSchema();
+  @Override
+  public TupleSchema copy() {
+    TupleSchema tuple = new TupleSchema();
     for (ColumnMetadata md : this) {
       tuple.addColumn(md.copy());
     }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java
index eee9b3a..d09c810 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java
@@ -83,6 +83,11 @@
     return new RepeatedListBuilder(this, Types.typeKey(MinorType.LIST));
   }
 
+  public DictBuilder addDict() {
+    checkType(MinorType.DICT);
+    return new DictBuilder(this, Types.typeKey(MinorType.DICT), DataMode.OPTIONAL);
+  }
+
   public VariantColumnMetadata buildColumn() {
     return new VariantColumnMetadata(name, type, union);
   }
@@ -107,4 +112,9 @@
     build();
     return (UnionBuilder) parent;
   }
-}
\ No newline at end of file
+
+  public DictBuilder resumeDict() {
+    build();
+    return (DictBuilder) parent;
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/AbstractKeyAccessor.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/AbstractKeyAccessor.java
new file mode 100644
index 0000000..97731ba
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/AbstractKeyAccessor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.Period;
+
+import java.math.BigDecimal;
+
+public abstract class AbstractKeyAccessor implements KeyAccessor {
+
+  private static final String UNSUPPORTED_MESSAGE_TEMPLATE = "%s does not support a key of type %s.";
+
+  protected final DictReader dictReader;
+  protected final ScalarReader keyReader;
+
+  protected AbstractKeyAccessor(DictReader dictReader, ScalarReader keyReader) {
+    this.dictReader = dictReader;
+    this.keyReader = keyReader;
+  }
+
+  @Override
+  public boolean find(boolean key) {
+    throw unsupported("boolean");
+  }
+
+  @Override
+  public boolean find(int key) {
+    throw unsupported("int");
+  }
+
+  @Override
+  public boolean find(BigDecimal key) {
+    throw unsupported("BigDecimal");
+  }
+
+  @Override
+  public boolean find(double key) {
+    throw unsupported("double");
+  }
+
+  @Override
+  public boolean find(long key) {
+    throw unsupported("long");
+  }
+
+  @Override
+  public boolean find(String key) {
+    throw unsupported("String");
+  }
+
+  @Override
+  public boolean find(byte[] key) {
+    throw unsupported("byte[]");
+  }
+
+  @Override
+  public boolean find(Period key) {
+    throw unsupported("Period");
+  }
+
+  @Override
+  public boolean find(LocalDate key) {
+    throw unsupported("LocalDate");
+  }
+
+  @Override
+  public boolean find(LocalTime key) {
+    throw unsupported("LocalTime");
+  }
+
+  @Override
+  public boolean find(Instant key) {
+    throw unsupported("Instant");
+  }
+
+  private UnsupportedOperationException unsupported(String type) {
+    return new UnsupportedOperationException(String.format(
+        UNSUPPORTED_MESSAGE_TEMPLATE, this.getClass().getName(), type));
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java
index df0c6ad..297e615 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java
@@ -54,7 +54,7 @@
  * without having to first retrieve the variant (although the indirect
  * route is, of course, available.)
  *
- * @see {@link ArrayReader}
+ * @see ArrayReader
  */
 
 public interface ArrayWriter extends ColumnWriter {
@@ -87,6 +87,7 @@
   TupleWriter tuple();
   ArrayWriter array();
   VariantWriter variant();
+  DictWriter dict();
 
   /**
    * When the array contains a tuple or an array, call <tt>save()</tt>
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictReader.java
new file mode 100644
index 0000000..0c2fb0c
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+public interface DictReader extends ArrayReader {
+
+  KeyAccessor keyAccessor();
+
+  ObjectReader valueReader();
+
+  /**
+   * Number of entries in the dict.
+   * @return the number of entries
+   */
+  @Override
+  int size();
+
+  ValueType keyColumnType();
+
+  ObjectType valueColumnType();
+}
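
Reading layers key lookup on top of the usual array-reader protocol. A minimal sketch (hypothetical; rowReader stands for a TupleReader positioned on the current row, with a dict column "d"):

    DictReader dict = rowReader.dict("d");

    // Probe for a single key via the key accessor...
    if (dict.keyAccessor().find("a")) {
      Object value = dict.valueReader().getObject();
    }

    // ...or materialize all entries at once (the implementation returns a Map).
    @SuppressWarnings("unchecked")
    Map<Object, Object> all = (Map<Object, Object>) dict.getObject();
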
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictWriter.java
new file mode 100644
index 0000000..0f085df
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+/**
+ * Physically, the writer is an array writer with a special tuple writer as its element.
+ * Its {@link #type()} returns {@code ARRAY} and {@link #entryType()} returns {@code TUPLE}.
+ * To determine whether the column is logically a dict, use {@code schema().isDict()}.
+ *
+ * @see org.apache.drill.exec.vector.accessor.writer.DictEntryWriter
+ * @see org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter
+ * @see org.apache.drill.exec.vector.complex.DictVector
+ */
+public interface DictWriter extends ArrayWriter {
+
+  /**
+   * Returns scalar type of the key field.
+   * @return type of the key
+   */
+  ValueType keyType();
+
+  /**
+   * Returns object type of the value field.
+   * @return type of the value
+   */
+  ObjectType valueType();
+
+  /**
+   * Returns the writer associated with key field.
+   * @return key writer
+   */
+  ScalarWriter keyWriter();
+
+  /**
+   * Returns the writer associated with value field.
+   * @return value writer
+   */
+  ObjectWriter valueWriter();
+}
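
Writing mirrors the array-writer protocol: populate the key and value writers, then save() each entry. A minimal sketch (hypothetical; rowWriter stands for a TupleWriter for the current row, with a dict column "d" keyed by INT and holding VARCHAR values):

    DictWriter dict = rowWriter.dict("d");

    // Write two entries, (1, "a") and (2, "b"), committing each with save().
    dict.keyWriter().setInt(1);
    dict.valueWriter().scalar().setString("a");
    dict.save();
    dict.keyWriter().setInt(2);
    dict.valueWriter().scalar().setString("b");
    dict.save();
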
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/KeyAccessor.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/KeyAccessor.java
new file mode 100644
index 0000000..d3dbed7
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/KeyAccessor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.Period;
+
+import java.math.BigDecimal;
+
+public interface KeyAccessor {
+
+  boolean find(boolean key);
+  boolean find(int key);
+  boolean find(BigDecimal key);
+  boolean find(double key);
+  boolean find(long key);
+  boolean find(String key);
+  boolean find(byte[] key);
+  boolean find(Period key);
+  boolean find(LocalDate key);
+  boolean find(LocalTime key);
+  boolean find(Instant key);
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java
index c9244b5..aff4b5b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java
@@ -34,6 +34,7 @@
   TupleReader tuple();
   ArrayReader array();
   VariantReader variant();
+  DictReader dict();
 
   /**
    * Gets the reader as a generic type, for dynamic
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
index 5d8077c..ed3a888 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
@@ -48,6 +48,7 @@
   TupleWriter tuple();
   ArrayWriter array();
   VariantWriter variant();
+  DictWriter dict();
 
   /**
    * Generic version of the above, for dynamic handling of
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleReader.java
index 16e90ea..e6ac2ce 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleReader.java
@@ -66,4 +66,6 @@
   ArrayReader array(String colName);
   VariantReader variant(int colIndex);
   VariantReader variant(String colName);
+  DictReader dict(int colIndex);
+  DictReader dict(String colName);
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
index 2f5c5df..de2a374 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
@@ -113,6 +113,10 @@
 
   VariantWriter variant(String colName);
 
+  DictWriter dict(int colIndex);
+
+  DictWriter dict(String colName);
+
   ObjectType type(int colIndex);
 
   ObjectType type(String colName);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java
index 126fbe8..7a6e6e2 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java
@@ -20,6 +20,7 @@
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.exec.vector.accessor.DictReader;
 import org.apache.drill.exec.vector.accessor.ObjectReader;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
@@ -46,6 +47,11 @@
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public DictReader dict() {
+    throw new UnsupportedOperationException();
+  }
+
   public abstract ReaderEvents events();
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
index 77fae8f..0c699cd 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
@@ -23,6 +23,7 @@
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
+import org.apache.drill.exec.vector.accessor.DictReader;
 import org.apache.drill.exec.vector.accessor.ObjectReader;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
@@ -38,11 +39,12 @@
 
   public static class TupleObjectReader extends AbstractObjectReader {
 
-    private final AbstractTupleReader tupleReader;
+    protected final AbstractTupleReader tupleReader;
 
     public TupleObjectReader(AbstractTupleReader tupleReader) {
       this.tupleReader = tupleReader;
     }
+
     @Override
     public TupleReader tuple() {
       return tupleReader;
@@ -65,10 +67,10 @@
     public ColumnReader reader() { return tupleReader; }
   }
 
-  private final AbstractObjectReader readers[];
+  protected final AbstractObjectReader[] readers;
   protected NullStateReader nullStateReader;
 
-  protected AbstractTupleReader(AbstractObjectReader readers[]) {
+  protected AbstractTupleReader(AbstractObjectReader[] readers) {
     this.readers = readers;
   }
 
@@ -112,7 +114,8 @@
   public ObjectReader column(String colName) {
     int index = tupleSchema().index(colName);
     if (index == -1) {
-      return null; }
+      return null;
+    }
     return readers[index];
   }
 
@@ -167,6 +170,16 @@
   }
 
   @Override
+  public DictReader dict(int colIndex) {
+    return column(colIndex).dict();
+  }
+
+  @Override
+  public DictReader dict(String colName) {
+    return column(colName).dict();
+  }
+
+  @Override
   public void reposition() {
     for (AbstractObjectReader reader : readers) {
       reader.events().reposition();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
index fbafa8c..474b8d7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
@@ -130,7 +130,6 @@
      * Given a 0-based index relative to the current array, return an absolute offset
      * vector location for the array value.
      *
-     * @param index 0-based index into the current array
      * @return absolute offset vector location for the array value
      */
 
@@ -169,12 +168,12 @@
     public int logicalIndex() { return position; }
   }
 
+  protected final AbstractObjectReader elementReader;
+  protected ElementReaderIndex elementIndex;
+  protected NullStateReader nullStateReader;
   private final ColumnMetadata schema;
   private final VectorAccessor arrayAccessor;
   private final OffsetVectorReader offsetReader;
-  private final AbstractObjectReader elementReader;
-  protected ElementReaderIndex elementIndex;
-  protected NullStateReader nullStateReader;
 
   public ArrayReaderImpl(ColumnMetadata schema, VectorAccessor va,
       AbstractObjectReader elementReader) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictEntryReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictEntryReader.java
new file mode 100644
index 0000000..de9f193
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictEntryReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.reader;
+
+import java.util.List;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
+
+/**
+ * Reader for a Dict entry. The entry is a special kind of tuple.
+ */
+public class DictEntryReader extends AbstractTupleReader {
+
+  private final ColumnMetadata schema;
+  private final VectorAccessor accessor;
+
+  protected DictEntryReader(ColumnMetadata schema, VectorAccessor accessor,
+                            AbstractObjectReader[] readers) {
+    super(readers);
+    assert readers.length == 2;
+    this.schema = schema;
+    this.accessor = accessor;
+  }
+
+  public static TupleObjectReader build(ColumnMetadata schema,
+                                        VectorAccessor accessor,
+                                        AbstractObjectReader[] readers) {
+    DictEntryReader entryReader = new DictEntryReader(schema, accessor, readers);
+    entryReader.bindNullState(NullStateReaders.REQUIRED_STATE_READER);
+    return new TupleObjectReader(entryReader);
+  }
+
+  public static TupleObjectReader build(ColumnMetadata schema,
+                                        VectorAccessor accessor,
+                                        List<AbstractObjectReader> readers) {
+    AbstractObjectReader[] readerArray = new AbstractObjectReader[readers.size()];
+    return build(schema, accessor, readers.toArray(readerArray));
+  }
+
+  public AbstractObjectReader keyReader() {
+    return (AbstractObjectReader) column(0);
+  }
+
+  public AbstractObjectReader valueReader() {
+    return (AbstractObjectReader) column(1);
+  }
+
+  @Override
+  public void bindIndex(ColumnReaderIndex index) {
+    if (accessor != null) {
+      accessor.bind(index);
+    }
+    super.bindIndex(index);
+  }
+
+  @Override
+  public ColumnMetadata schema() {
+    return schema;
+  }
+
+  @Override
+  public TupleMetadata tupleSchema() {
+    return schema.tupleSchema();
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictReaderImpl.java
new file mode 100644
index 0000000..780e243
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictReaderImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.reader;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.KeyAccessor;
+import org.apache.drill.exec.vector.accessor.KeyAccessors;
+import org.apache.drill.exec.vector.accessor.ObjectReader;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ScalarReader;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.DictReader;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DictReaderImpl extends ArrayReaderImpl implements DictReader, ReaderEvents {
+
+  public static class DictObjectReader extends ArrayObjectReader {
+
+    public DictObjectReader(DictReaderImpl dictReader) {
+      super(dictReader);
+    }
+
+    @Override
+    public DictReader dict() {
+      return (DictReader) array();
+    }
+  }
+
+  private final ScalarReader keyReader;
+  private final AbstractObjectReader valueReader;
+  private final KeyAccessor keyAccessor;
+
+  public DictReaderImpl(ColumnMetadata metadata, VectorAccessor va, AbstractTupleReader.TupleObjectReader entryObjectReader) {
+    super(metadata, va, entryObjectReader);
+    DictEntryReader reader = (DictEntryReader) entryObjectReader.reader();
+    this.keyReader = reader.keyReader().scalar();
+    this.valueReader = reader.valueReader();
+    keyAccessor = KeyAccessors.getAccessor(this, keyReader);
+  }
+
+  public static DictObjectReader build(ColumnMetadata schema, VectorAccessor dictAccessor,
+                                       List<AbstractObjectReader> readers) {
+    AbstractTupleReader.TupleObjectReader entryReader = DictEntryReader.build(schema, dictAccessor, readers);
+    DictReaderImpl dictReader = new DictReaderImpl(schema, dictAccessor, entryReader);
+    dictReader.bindNullState(NullStateReaders.REQUIRED_STATE_READER);
+    return new DictObjectReader(dictReader);
+  }
+
+  @Override
+  public KeyAccessor keyAccessor() {
+    return keyAccessor;
+  }
+
+  @Override
+  public ObjectReader valueReader() {
+    return valueReader;
+  }
+
+  @Override
+  public ValueType keyColumnType() {
+    return keyReader.valueType();
+  }
+
+  @Override
+  public ObjectType valueColumnType() {
+    return valueReader.type();
+  }
+
+  @Override
+  public Map<Object, Object> getObject() {
+    rewind();
+    Map<Object, Object> map = new HashMap<>();
+    while (next()) {
+      map.put(keyReader.getObject(), valueReader.getObject());
+    }
+    return map;
+  }
+
+  @Override
+  public String getAsString() {
+    rewind();
+    StringBuilder buf = new StringBuilder();
+    buf.append("{");
+    boolean comma = false;
+    while (next()) {
+      if (comma) {
+        buf.append(", ");
+      }
+      buf.append(keyReader.getAsString())
+          .append(':')
+          .append(valueReader.getAsString());
+      comma = true;
+    }
+    buf.append("}");
+    return buf.toString();
+  }
+}
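
For reference, given the two entries written in the writer sketch above, getObject() materializes them into a Java map, while getAsString() renders them in brace notation as {1:"a", 2:"b"} (the exact quoting of keys and values follows each underlying reader's getAsString()).
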
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/MapReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/MapReader.java
index 7329391..d1cf58f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/MapReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/MapReader.java
@@ -40,12 +40,12 @@
 
   private final VectorAccessor mapAccessor;
 
-  protected MapReader(ColumnMetadata schema, AbstractObjectReader readers[]) {
+  protected MapReader(ColumnMetadata schema, AbstractObjectReader[] readers) {
     this(schema, null, readers);
   }
 
   protected MapReader(ColumnMetadata schema,
-      VectorAccessor mapAccessor, AbstractObjectReader readers[]) {
+      VectorAccessor mapAccessor, AbstractObjectReader[] readers) {
     super(readers);
     this.schema = schema;
     this.mapAccessor = mapAccessor;
@@ -53,7 +53,7 @@
 
   public static TupleObjectReader build(ColumnMetadata schema,
       VectorAccessor mapAccessor,
-      AbstractObjectReader readers[]) {
+      AbstractObjectReader[] readers) {
     MapReader mapReader = new MapReader(schema, mapAccessor, readers);
     mapReader.bindNullState(NullStateReaders.REQUIRED_STATE_READER);
     return new TupleObjectReader(mapReader);
@@ -62,7 +62,7 @@
   public static AbstractObjectReader build(ColumnMetadata schema,
       VectorAccessor mapAccessor,
       List<AbstractObjectReader> readers) {
-    AbstractObjectReader readerArray[] = new AbstractObjectReader[readers.size()];
+    AbstractObjectReader[] readerArray = new AbstractObjectReader[readers.size()];
     return build(schema, mapAccessor, readers.toArray(readerArray));
   }
 
@@ -78,5 +78,5 @@
   public ColumnMetadata schema() { return schema; }
 
   @Override
-  public TupleMetadata tupleSchema() { return schema.mapSchema(); }
+  public TupleMetadata tupleSchema() { return schema.tupleSchema(); }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
index f1ea09a..0385091 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
@@ -25,6 +25,7 @@
 public class NullStateReaders {
 
   public static final RequiredStateReader REQUIRED_STATE_READER = new RequiredStateReader();
+  public static final NullStateReader NULL_STATE_READER = new AlwaysNullStateReader();
 
   private NullStateReaders() { }
 
@@ -190,4 +191,15 @@
     }
   }
 
+  protected static class AlwaysNullStateReader implements NullStateReader {
+
+    @Override
+    public void bindIndex(ColumnReaderIndex rowIndex) {
+    }
+
+    @Override
+    public boolean isNull() {
+      return true;
+    }
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
index 1dfc65a..a1720ba 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
@@ -81,10 +81,10 @@
   private final VectorAccessor unionAccessor;
   private final VectorAccessor typeAccessor;
   private final UInt1ColumnReader typeReader;
-  private final AbstractObjectReader variants[];
+  private final AbstractObjectReader[] variants;
   protected NullStateReader nullStateReader;
 
-  public UnionReaderImpl(ColumnMetadata schema, VectorAccessor va, AbstractObjectReader variants[]) {
+  public UnionReaderImpl(ColumnMetadata schema, VectorAccessor va, AbstractObjectReader[] variants) {
     this.schema = schema;
     this.unionAccessor = va;
     typeReader = new UInt1ColumnReader();
@@ -115,6 +115,7 @@
       NullStateReader nullReader;
       MinorType type = MinorType.values()[i];
       switch(type) {
+      case DICT:
       case MAP:
       case LIST:
         nullReader = new NullStateReaders.ComplexMemberStateReader(typeReader, type);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
index 18702ac..b836b25 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
@@ -24,6 +24,7 @@
 import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.DictWriter;
 import org.apache.drill.exec.vector.accessor.ObjectReader;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
@@ -99,7 +100,7 @@
 
   public static class ArrayObjectWriter extends AbstractObjectWriter {
 
-    private final AbstractArrayWriter arrayWriter;
+    protected final AbstractArrayWriter arrayWriter;
 
     public ArrayObjectWriter(AbstractArrayWriter arrayWriter) {
       this.arrayWriter = arrayWriter;
@@ -319,6 +320,11 @@
   }
 
   @Override
+  public DictWriter dict() {
+    return elementObjWriter.dict();
+  }
+
+  @Override
   public int size() { return elementIndex.arraySize(); }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
index d2b65ff..58ba6c2 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
@@ -21,6 +21,7 @@
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ObjectReader;
+import org.apache.drill.exec.vector.accessor.DictWriter;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
@@ -61,6 +62,11 @@
   }
 
   @Override
+  public DictWriter dict() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public abstract WriterEvents events();
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 246b4c1..7b5309c 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -28,6 +28,7 @@
 import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.DictWriter;
 import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
@@ -109,7 +110,7 @@
 
   public static class TupleObjectWriter extends AbstractObjectWriter {
 
-    private final AbstractTupleWriter tupleWriter;
+    protected final AbstractTupleWriter tupleWriter;
 
     public TupleObjectWriter(AbstractTupleWriter tupleWriter) {
       this.tupleWriter = tupleWriter;
@@ -148,6 +149,35 @@
     ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);
   }
 
+  /**
+   * Wrap the outer index to avoid incrementing the array index
+   * on the call to <tt>nextElement()</tt>. The increment
+   * is done at the tuple level, not the column level.
+   */
+
+  static class MemberWriterIndex implements ColumnWriterIndex {
+    private final ColumnWriterIndex baseIndex;
+
+    MemberWriterIndex(ColumnWriterIndex baseIndex) {
+      this.baseIndex = baseIndex;
+    }
+
+    @Override public int rowStartIndex() { return baseIndex.rowStartIndex(); }
+    @Override public int vectorIndex() { return baseIndex.vectorIndex(); }
+    @Override public void nextElement() { }
+    @Override public void prevElement() { }
+    @Override public void rollover() { }
+
+    @Override public ColumnWriterIndex outerIndex() {
+      return baseIndex.outerIndex();
+    }
+
+    @Override
+    public String toString() {
+      return "[" + getClass().getSimpleName() + " baseIndex = " + baseIndex.toString() + "]";
+    }
+  }
+
   protected static final Logger logger = LoggerFactory.getLogger(AbstractTupleWriter.class);
 
   protected final TupleMetadata tupleSchema;
@@ -431,6 +461,16 @@
   }
 
   @Override
+  public DictWriter dict(int colIndex) {
+    return column(colIndex).dict();
+  }
+
+  @Override
+  public DictWriter dict(String colName) {
+    return column(colName).dict();
+  }
+
+  @Override
   public ObjectType type(int colIndex) {
     return column(colIndex).type();
   }
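
With the dict(int) / dict(String) accessors above, a dict column can be reached directly from its enclosing tuple writer. A minimal sketch, again assuming a hypothetical dict column "d" with an INT key and a VARCHAR value:

    // Sketch only; rowWriter, "d", and the key/value types are assumptions.
    void writeDictColumn(TupleWriter rowWriter) {
      DictWriter dict = rowWriter.dict("d");
      dict.keyWriter().setInt(1);
      dict.valueWriter().scalar().setString("a");
      dict.save();                                 // first entry in this row's dict
      dict.keyWriter().setInt(2);
      dict.valueWriter().scalar().setString("b");
      dict.save();                                 // second entry
    }
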
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/DictEntryWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/DictEntryWriter.java
new file mode 100644
index 0000000..120b1ea
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/DictEntryWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.complex.DictVector;
+
+/**
+ * Writer for a dict entry. The entry is a special kind of tuple
+ * with two fixed columns: the key (ordinal 0) and the value (ordinal 1).
+ */
+public class DictEntryWriter extends AbstractTupleWriter {
+
+  public static class DictEntryObjectWriter extends TupleObjectWriter {
+
+    public DictEntryObjectWriter(DictEntryWriter entryWriter) {
+      super(entryWriter);
+    }
+
+    void setKeyValue(Object key, Object value) {
+      tupleWriter.set(0, key);
+      tupleWriter.set(1, value);
+    }
+
+    @Override
+    public void dump(HierarchicalFormatter format) {
+      format.startObject(this)
+          .attribute("dictEntryWriter");
+      tupleWriter.dump(format);
+      format.endObject();
+    }
+  }
+
+  private static class DummyDictEntryWriter extends DictEntryWriter {
+
+    DummyDictEntryWriter(ColumnMetadata schema, List<AbstractObjectWriter> writers) {
+      super(schema, writers);
+    }
+
+    @Override
+    public boolean isProjected() {
+      return false;
+    }
+
+    @Override
+    public void copy(ColumnReader from) {
+    }
+  }
+
+  public static DictEntryObjectWriter buildDictEntryWriter(ColumnMetadata schema,
+                                                           List<AbstractObjectWriter> keyValueWriters,
+                                                           DictVector vector) {
+    DictEntryWriter dictEntryWriter;
+    if (vector != null) {
+      dictEntryWriter = new DictEntryWriter(schema, keyValueWriters);
+    } else {
+      dictEntryWriter = new DummyDictEntryWriter(schema, keyValueWriters);
+    }
+    return new DictEntryObjectWriter(dictEntryWriter);
+  }
+
+  private final ColumnMetadata dictColumnSchema;
+
+  public DictEntryWriter(ColumnMetadata schema, List<AbstractObjectWriter> writers) {
+    super(schema.tupleSchema(), writers);
+    dictColumnSchema = schema;
+  }
+
+  @Override
+  public void bindIndex(ColumnWriterIndex index) {
+
+    // As with a repeated map, the provided index is an array element
+    // index. Wrap it in an index that does not increment the element
+    // position on each write, so that writing both the key and the
+    // value of an entry does not advance the index twice. The index
+    // is advanced at the array level instead.
+
+    bindIndex(index, new MemberWriterIndex(index));
+  }
+
+  @Override
+  public boolean isProjected() {
+    return true;
+  }
+
+  @Override
+  public ColumnMetadata schema() {
+    return dictColumnSchema;
+  }
+
+  @Override
+  public void setObject(Object value) {
+    if (value instanceof Map.Entry) {
+      Map.Entry<?, ?> entry = (Map.Entry<?, ?>) value;
+      set(0, entry.getKey());
+      set(1, entry.getValue());
+    } else {
+      super.setObject(value);
+    }
+  }
+}
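
Because setObject above recognizes Map.Entry, a single entry can also be written through the generic object path. A sketch, assuming dict is a DictWriter whose element writer is the DictEntryObjectWriter built above (java.util.AbstractMap.SimpleEntry stands in for any Map.Entry implementation):

    // Sketch only; the writer setup is assumed.
    void writeOneEntry(DictWriter dict) {
      dict.entry().setObject(new AbstractMap.SimpleEntry<>("a", 1)); // key -> column 0, value -> column 1
      dict.save();
    }
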
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
index 38d6a5a..3245323 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
@@ -36,41 +36,6 @@
 public abstract class MapWriter extends AbstractTupleWriter {
 
   /**
-   * Wrap the outer index to avoid incrementing the array index
-   * on the call to <tt>nextElement().</tt> For maps, the increment
-   * is done at the map level, not the column level.
-   */
-
-  private static class MemberWriterIndex implements ColumnWriterIndex {
-    private final ColumnWriterIndex baseIndex;
-
-    private MemberWriterIndex(ColumnWriterIndex baseIndex) {
-      this.baseIndex = baseIndex;
-    }
-
-    @Override public int rowStartIndex() { return baseIndex.rowStartIndex(); }
-    @Override public int vectorIndex() { return baseIndex.vectorIndex(); }
-    @Override public void nextElement() { }
-    @Override public void prevElement() { }
-    @Override public void rollover() { }
-
-    @Override public ColumnWriterIndex outerIndex() {
-      return baseIndex.outerIndex();
-    }
-
-    @Override
-    public String toString() {
-      return new StringBuilder()
-        .append("[")
-        .append(getClass().getSimpleName())
-        .append(" baseIndex = ")
-        .append(baseIndex.toString())
-        .append("]")
-        .toString();
-    }
-  }
-
-  /**
    * Writer for a single (non-array) map. Clients don't really "write" maps;
    * rather, this writer is a holder for the columns within the map, and those
    * columns are what is written.
@@ -172,7 +137,7 @@
   protected final ColumnMetadata mapColumnSchema;
 
   protected MapWriter(ColumnMetadata schema, List<AbstractObjectWriter> writers) {
-    super(schema.mapSchema(), writers);
+    super(schema.tupleSchema(), writers);
     mapColumnSchema = schema;
   }
 
@@ -224,7 +189,7 @@
   }
 
   public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, AbstractMapVector vector) {
-    assert schema.mapSchema().size() == 0;
+    assert schema.tupleSchema().size() == 0;
     return buildMapWriter(schema, vector, new ArrayList<AbstractObjectWriter>());
   }
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java
new file mode 100644
index 0000000..a845828
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
+import org.apache.drill.exec.vector.accessor.writer.dummy.DummyDictWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
+
+import java.lang.reflect.Array;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents a dict writer as an array writer whose element writer
+ * is a special dict entry writer.
+ */
+public class ObjectDictWriter extends ObjectArrayWriter implements DictWriter {
+
+  public static class DictObjectWriter extends AbstractArrayWriter.ArrayObjectWriter {
+
+    public DictObjectWriter(DictWriter dictWriter) {
+      super((AbstractArrayWriter) dictWriter);
+    }
+
+    @Override
+    public DictWriter dict() {
+      return (DictWriter) arrayWriter;
+    }
+
+    @Override
+    public void dump(HierarchicalFormatter format) {
+      format.startObject(this)
+          .attribute("dictWriter");
+      arrayWriter.dump(format);
+      format.endObject();
+    }
+  }
+
+  public static ObjectDictWriter.DictObjectWriter buildDict(ColumnMetadata metadata, DictVector vector,
+                                                            List<AbstractObjectWriter> keyValueWriters) {
+    DictEntryWriter.DictEntryObjectWriter entryObjectWriter =
+        DictEntryWriter.buildDictEntryWriter(metadata, keyValueWriters, vector);
+    DictWriter objectDictWriter;
+    if (vector != null) {
+      objectDictWriter = new ObjectDictWriter(metadata, vector.getOffsetVector(), entryObjectWriter);
+    } else {
+      objectDictWriter = new DummyDictWriter(metadata, entryObjectWriter);
+    }
+    return new ObjectDictWriter.DictObjectWriter(objectDictWriter);
+  }
+
+  public static ArrayObjectWriter buildDictArray(ColumnMetadata metadata, RepeatedDictVector vector,
+                                                 List<AbstractObjectWriter> keyValueWriters) {
+    final DictVector dataVector;
+    if (vector != null) {
+      dataVector = (DictVector) vector.getDataVector();
+    } else {
+      dataVector = null;
+    }
+    ObjectDictWriter.DictObjectWriter dictWriter = buildDict(metadata, dataVector, keyValueWriters);
+    AbstractArrayWriter arrayWriter;
+    if (vector != null) {
+      arrayWriter = new ObjectArrayWriter(metadata, vector.getOffsetVector(), dictWriter);
+    } else {
+      arrayWriter = new DummyArrayWriter(metadata, dictWriter);
+    }
+    return new ArrayObjectWriter(arrayWriter);
+  }
+
+  public static final int FIELD_KEY_ORDINAL = 0;
+  public static final int FIELD_VALUE_ORDINAL = 1;
+
+  private final DictEntryWriter.DictEntryObjectWriter entryObjectWriter;
+
+  public ObjectDictWriter(ColumnMetadata schema, UInt4Vector offsetVector,
+                           DictEntryWriter.DictEntryObjectWriter entryObjectWriter) {
+    super(schema, offsetVector, entryObjectWriter);
+    this.entryObjectWriter = entryObjectWriter;
+  }
+
+  @Override
+  public ValueType keyType() {
+    return tuple().scalar(FIELD_KEY_ORDINAL).valueType();
+  }
+
+  @Override
+  public ObjectType valueType() {
+    return tuple().type(FIELD_VALUE_ORDINAL);
+  }
+
+  @Override
+  public ScalarWriter keyWriter() {
+    return tuple().scalar(FIELD_KEY_ORDINAL);
+  }
+
+  @Override
+  public ObjectWriter valueWriter() {
+    return tuple().column(FIELD_VALUE_ORDINAL);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setObject(Object object) {
+    if (object == null) {
+      return;
+    }
+
+    if (object instanceof Map) {
+      Map<Object, Object> map = (Map<Object, Object>) object;
+      for (Map.Entry<Object, Object> entry : map.entrySet()) {
+        entryObjectWriter.setKeyValue(entry.getKey(), entry.getValue());
+        save();
+      }
+    } else {
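+      // Treat the object as a flat array of alternating key/value pairs.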
+      int size = Array.getLength(object);
+      assert size % 2 == 0;
+      for (int i = 0; i < size; i += 2) {
+        Object key = Array.get(object, i);
+        Object value = Array.get(object, i + 1);
+        entryObjectWriter.setKeyValue(key, value);
+        save();
+      }
+    }
+  }
+}
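
setObject thus accepts a whole dict in either of the two shapes handled above: a Map, or a flat array of alternating keys and values. A sketch, with dictColumn standing in for the ObjectWriter of a dict column:

    // Sketch only; dictColumn is an assumed ObjectWriter for a DICT column.
    void writeWholeDict(ObjectWriter dictColumn) {
      Map<Object, Object> map = new HashMap<>();   // java.util.HashMap
      map.put("a", 1);
      map.put("b", 2);
      dictColumn.setObject(map);                   // one saved entry per map entry

      dictColumn.setObject(new Object[] {"a", 1, "b", 2}); // same content as a flat key/value array
    }
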
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyDictWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyDictWriter.java
new file mode 100644
index 0000000..7e38470
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyDictWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer.dummy;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.writer.DictEntryWriter;
+import org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter;
+
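+/**
+ * Dict writer for an unprojected dict column. Exposes the full
+ * {@link DictWriter} API but, like the other dummy writers, discards
+ * anything written to it.
+ */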
+public class DummyDictWriter extends DummyArrayWriter implements DictWriter {
+
+  public DummyDictWriter(ColumnMetadata schema, DictEntryWriter.DictEntryObjectWriter entryObjectWriter) {
+    super(schema, entryObjectWriter);
+  }
+
+  @Override
+  public ValueType keyType() {
+    return tuple().scalar(ObjectDictWriter.FIELD_KEY_ORDINAL).valueType();
+  }
+
+  @Override
+  public ObjectType valueType() {
+    return tuple().type(ObjectDictWriter.FIELD_VALUE_ORDINAL);
+  }
+
+  @Override
+  public ScalarWriter keyWriter() {
+    return tuple().scalar(ObjectDictWriter.FIELD_KEY_ORDINAL);
+  }
+
+  @Override
+  public ObjectWriter valueWriter() {
+    return tuple().column(ObjectDictWriter.FIELD_VALUE_ORDINAL);
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/DictVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/DictVector.java
index 2fadebc..9c86946 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/DictVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/DictVector.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -44,6 +45,7 @@
  * it may have 2 children only, named {@link #FIELD_KEY_NAME} and {@link #FIELD_VALUE_NAME}.
  * The {@link #FIELD_KEY_NAME} can be of primitive type only and its values should not be {@code null},
  * while the other, {@link #FIELD_VALUE_NAME}, field can be either of primitive or complex type.
+ * The value field can hold {@code null} values.
  *
  * <p>This vector has it's own {@link org.apache.drill.exec.vector.complex.reader.FieldReader} and
  * {@link org.apache.drill.exec.vector.complex.writer.FieldWriter} to ensure data is read and written correctly.
@@ -255,7 +257,12 @@
 
   @Override
   MajorType getLastPathType() {
-    return valueType;
+    if (Types.isRepeated(valueType)) {
+      return valueType;
+    }
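+    // A dict value may be absent or null for a given key, so report a
+    // non-repeated value type as OPTIONAL.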
+    return valueType.toBuilder()
+        .setMode(TypeProtos.DataMode.OPTIONAL)
+        .build();
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedDictReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedDictReaderImpl.java
index 7ebac29..22e2183 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedDictReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedDictReaderImpl.java
@@ -138,6 +138,6 @@
 
   @Override
   public String getTypeString() {
-    return "ARRAY<" + reader.getTypeString() + '>';
+    return "ARRAY<" + reader().getTypeString() + '>';
   }
 }
diff --git a/exec/vector/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java b/exec/vector/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
index 0a6bb78..2e41986 100644
--- a/exec/vector/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
+++ b/exec/vector/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
@@ -240,7 +240,7 @@
     assertTrue(map.isMap());
     assertEquals(TypeProtos.DataMode.REQUIRED, map.mode());
 
-    TupleMetadata mapSchema = map.mapSchema();
+    TupleMetadata mapSchema = map.tupleSchema();
     assertFalse(mapSchema.metadata("m1").isNullable());
     assertTrue(mapSchema.metadata("m2").isNullable());
   }
@@ -259,7 +259,7 @@
     ColumnMetadata mapArray = schema.metadata("ma");
     assertTrue(mapArray.isArray());
     assertTrue(mapArray.isMap());
-    TupleMetadata mapSchema = mapArray.mapSchema();
+    TupleMetadata mapSchema = mapArray.tupleSchema();
     assertFalse(mapSchema.metadata("m1").isNullable());
     assertTrue(mapSchema.metadata("m2").isNullable());
   }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
index 3f3e40b..4643c31 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
@@ -50,7 +50,7 @@
     while (!colPath.isLastPath() && colMetadata != null) {
       if (colMetadata.isDict()) {
         // get dict's value field metadata
-        colMetadata = colMetadata.mapSchema().metadata(0).mapSchema().metadata(1);
+        colMetadata = colMetadata.tupleSchema().metadata(0).tupleSchema().metadata(1);
         break;
       }
       if (!colMetadata.isMap()) {
@@ -58,7 +58,7 @@
         break;
       }
       colPath = (PathSegment.NameSegment) colPath.getChild();
-      colMetadata = colMetadata.mapSchema().metadata(colPath.getPath());
+      colMetadata = colMetadata.tupleSchema().metadata(colPath.getPath());
     }
     return colMetadata;
   }
@@ -94,7 +94,7 @@
         throw new DrillRuntimeException(String.format("Expected map or dict, but was %s", colMetadata.majorType()));
       }
 
-      schema = colMetadata.mapSchema();
+      schema = colMetadata.tupleSchema();
       colPath = (PathSegment.NameSegment) colPath.getChild();
     }