DRILL-7454: Convert Avro to EVF

1. Replaced old format implementation with EVF.
2. Updated, added and improved performance for Avro tests.
3. Code refactoring.

closes #1951
diff --git a/common/src/test/java/org/apache/drill/test/BaseTest.java b/common/src/test/java/org/apache/drill/test/BaseTest.java
index adf0d5e..6256b5e 100644
--- a/common/src/test/java/org/apache/drill/test/BaseTest.java
+++ b/common/src/test/java/org/apache/drill/test/BaseTest.java
@@ -34,7 +34,7 @@
      */
     ProtobufPatcher.patch();
     /*
-     * Some libraries, such as Hadoop or HBase, depend on incompatible versions of Guava.
+     * Some libraries, such as Hadoop, HBase, Iceberg depend on incompatible versions of Guava.
      * This code adds back some methods to so that the libraries can work with single Guava version.
      */
     GuavaPatcher.patch();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedReader.java
index a9df2e0..b033685 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedReader.java
@@ -19,8 +19,8 @@
 
 /**
  * Extended version of a record reader which uses a size-aware batch mutator.
- * Use this for all new readers. Replaces the original {@link RecordReader}
- * interface.
+ * Use this for all new readers. Replaces the original
+ * {@link org.apache.drill.exec.store.RecordReader} interface.
  * <p>
  * This interface is used to create readers that work with the projection
  * mechanism to provide services for handling projection, setting up the result
@@ -33,12 +33,12 @@
  * <ul>
  * <li>Constructor: allocate no resources. Obtain a reference to a reader-specific
  * schema and projection manager.</li>
- * <li>{@link open()}: Use the provided {@link SchemaNegotiator} to configure the
+ * <li>{@link #open(SchemaNegotiator)}: Use the provided {@link SchemaNegotiator} to configure the
  * scanner framework for this reader by specifying a schema (if known), desired
  * row counts and other configuration options. Call {@link SchemaNegotiator#build()}
- * to obtain a {@link ResultSetLoader} to use to capture the rows that the reader
- * reads.</li>
- * <li>{@link next()}: called for each batch. The batch is written using the
+ * to obtain a {@link org.apache.drill.exec.physical.resultSet.RowSetLoader}
+ * to use to capture the rows that the reader reads.</li>
+ * <li>{@link #next()}: called for each batch. The batch is written using the
  * result set loader obtained above. The scanner framework handles details of
  * tracking version changes, handling overflow, limiting record counts, and
  * so on. Return <tt>true</tt> to indicate a batch is available, <tt>false</tt>
@@ -48,7 +48,7 @@
  * <tt>next()</tt> returns </tt>false</tt>.</li>
  * <p>
  * If an error occurs, the reader can throw a {@link RuntimeException}
- * from any method. A {@link UserException} is preferred to provide
+ * from any method. A <tt>UserException</tt> is preferred to provide
  * detailed information about the source of the problem.
  */
 
@@ -59,7 +59,7 @@
    * to <tt>next()</tt>. Allocate resources here, not in the constructor.
    * Example: open files, allocate buffers, etc.
    *
-   * @param schemaNegotiator mechanism to negotiate select and table
+   * @param negotiator mechanism to negotiate select and table
    * schemas, then create the row set reader used to load data into
    * value vectors
    *
@@ -93,7 +93,7 @@
    * <tt>next()</tt> should be called again, <tt>false</tt> to indicate
    * that EOF was reached
    *
-   * @throws RutimeException (<tt>UserException</tt> preferred) if an
+   * @throws RuntimeException (<tt>UserException</tt> preferred) if an
    * error occurs that should fail the query.
    */
 
@@ -106,7 +106,7 @@
    * <tt>open()</tt> returns normally; will not be called if <tt>open()</tt>
    * throws an exception.
    *
-   * @throws RutimeException (<tt>UserException</tt> preferred) if an
+   * @throws RuntimeException (<tt>UserException</tt> preferred) if an
    * error occurs that should fail the query.
    */
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index 9acc830..edc6acf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -232,7 +232,7 @@
      * per batch for this scan. Readers can adjust this, but the adjustment is capped
      * at the value specified here
      *
-     * @param scanBatchSize maximum records per batch
+     * @param batchRecordLimit maximum records per batch
      */
 
     public void setBatchRecordLimit(int batchRecordLimit) {
@@ -250,7 +250,7 @@
      * nullable int. This type is used for all missing columns. (Readers
      * that need per-column control need a different mechanism.)
      *
-     * @param nullType
+     * @param nullType the type to use for null columns
      */
 
     public void setNullType(MajorType nullType) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
index 5f0680b..3220efa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
@@ -31,7 +31,6 @@
 import org.apache.drill.exec.physical.resultSet.model.ReaderBuilder;
 import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
 import org.apache.drill.exec.physical.rowSet.IndirectRowIndex;
-import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetReaderImpl;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -57,7 +56,8 @@
  * <p>
  * Derived classes handle the details of the various kinds of readers.
  * Today there is a single subclass that builds (test-time)
- * {@link RowSet} objects. The idea, however, is that we may eventually
+ * {@link org.apache.drill.exec.physical.rowSet.RowSet} objects.
+ * The idea, however, is that we may eventually
  * want to create a "result set reader" for use in internal operators,
  * in parallel to the "result set loader". The result set reader would
  * handle a stream of incoming batches. The extant RowSet class handles
@@ -200,7 +200,7 @@
 
   private AbstractObjectReader buildUnion(UnionVector vector, VectorAccessor unionAccessor, VectorDescrip descrip) {
     final MetadataProvider provider = descrip.childProvider();
-    final AbstractObjectReader variants[] = new AbstractObjectReader[MinorType.values().length];
+    final AbstractObjectReader[] variants = new AbstractObjectReader[MinorType.values().length];
     int i = 0;
     for (final MinorType type : vector.getField().getType().getSubTypeList()) {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java
index 9e0d783..3e91466 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java
@@ -274,14 +274,12 @@
 
       member.projectAllElements();
       return;
-    } else if (member.hasIndex(index)) {
-      throw UserException
-        .validationError()
-        .message("Duplicate array index in project list: %s[%d]",
-            member.fullName(), index)
-        .build(logger);
     }
-    member.addIndex(index);
+
+    // Allow duplicate indexes. Example: z[0], z[0]['orange']
+    if (!member.hasIndex(index)) {
+      member.addIndex(index);
+    }
 
     // Drills SQL parser does not support map arrays: a[0].c
     // But, the SchemaPath does support them, so no harm in
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExtendableRelDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExtendableRelDataType.java
deleted file mode 100644
index f41ad2a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExtendableRelDataType.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.logical;
-
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.drill.exec.planner.types.ExtendableRelDataTypeHolder;
-import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl;
-
-/**
- * RelDataType for non-dynamic table structure which
- * may be extended by adding partitions or implicit columns.
- */
-public class ExtendableRelDataType extends RelDataTypeDrillImpl {
-
-  public ExtendableRelDataType(ExtendableRelDataTypeHolder holder, RelDataTypeFactory typeFactory) {
-    super(holder, typeFactory);
-  }
-
-  @Override
-  protected void generateTypeString(StringBuilder sb, boolean withDetail) {
-    sb.append("(ExtendableRelDataType").append(getFieldNames()).append(")");
-  }
-
-  @Override
-  public boolean isDynamicStruct() {
-    return false;
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
index 9ad751d..08dcc47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
@@ -18,8 +18,6 @@
 package org.apache.drill.exec.planner.sql.conversion;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -29,25 +27,19 @@
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.validate.SelectScope;
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
 import org.apache.calcite.sql.validate.SqlValidatorImpl;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.util.Static;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 class DrillValidator extends SqlValidatorImpl {
 
-  private final OptionManager sessionOptions;
-
   DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
-                 RelDataTypeFactory typeFactory, SqlConformance conformance, OptionManager sessionOptions) {
+                 RelDataTypeFactory typeFactory, SqlConformance conformance) {
     super(opTab, catalogReader, typeFactory, conformance);
-    this.sessionOptions = sessionOptions;
   }
 
   @Override
@@ -79,31 +71,6 @@
     return SqlValidatorUtil.getAlias(node, ordinal);
   }
 
-  /**
-   * Checks that specified expression is not implicit column and
-   * adds it to a select list, ensuring that its alias does not
-   * clash with any existing expressions on the list.
-   * <p>
-   * This method may be used when {@link RelDataType#isDynamicStruct}
-   * method returns false. Each column from table row type except
-   * the implicit is added into specified list, aliases and fieldList.
-   * In the opposite case when {@link RelDataType#isDynamicStruct}
-   * returns true, only dynamic star is added into specified
-   * list, aliases and fieldList.
-   */
-  @Override
-  protected void addToSelectList(List<SqlNode> list,
-                                 Set<String> aliases,
-                                 List<Map.Entry<String, RelDataType>> fieldList,
-                                 SqlNode exp,
-                                 SelectScope scope,
-                                 final boolean includeSystemVars) {
-    if (!ColumnExplorer.initImplicitFileColumns(sessionOptions)
-        .containsKey(SqlValidatorUtil.getAlias(exp, -1))) {
-      super.addToSelectList(list, aliases, fieldList, exp, scope, includeSystemVars);
-    }
-  }
-
   @Override
   protected void inferUnknownTypes(RelDataType inferredType, SqlValidatorScope scope, SqlNode node) {
     // calls validateQuery() for SqlSelect to be sure that temporary table name will be changed
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
index 522a8fb..03ed970 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
@@ -63,6 +63,8 @@
 import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.util.Utilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class responsible for managing:
@@ -73,7 +75,7 @@
  * <ul/>
  */
 public class SqlConverter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SqlConverter.class);
+  private static final Logger logger = LoggerFactory.getLogger(SqlConverter.class);
 
   private final JavaTypeFactory typeFactory;
   private final SqlParser.Config parserConfig;
@@ -137,7 +139,7 @@
         this::getDefaultSchema);
     this.opTab = new ChainedSqlOperatorTable(Arrays.asList(context.getDrillOperatorTable(), catalog));
     this.costFactory = (settings.useDefaultCosting()) ? null : new DrillCostBase.DrillCostFactory();
-    this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance(), session.getOptions());
+    this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance());
     validator.setIdentifierExpansion(true);
     cluster = null;
   }
@@ -158,7 +160,7 @@
     this.catalog = catalog;
     this.opTab = parent.opTab;
     this.planner = parent.planner;
-    this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance(), parent.session.getOptions());
+    this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance());
     this.temporarySchema = parent.temporarySchema;
     this.session = parent.session;
     this.drillConfig = parent.drillConfig;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/ExtendableRelDataTypeHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/ExtendableRelDataTypeHolder.java
deleted file mode 100644
index ce024e2..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/ExtendableRelDataTypeHolder.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.types;
-
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.List;
-
-/**
- * Holder for list of RelDataTypeField which may be expanded by implicit columns.
- */
-public class ExtendableRelDataTypeHolder extends AbstractRelDataTypeHolder {
-  private final List<String> implicitColumnNames;
-
-  public ExtendableRelDataTypeHolder(List<RelDataTypeField> fields, List<String> implicitColumnNames) {
-    super(fields);
-    this.implicitColumnNames = implicitColumnNames;
-  }
-
-  /**
-   * Returns RelDataTypeField field with specified name.
-   * If field is implicit and absent in the fields list, it will be added.
-   *
-   * @param typeFactory RelDataTypeFactory which will be used
-   *                    for the creation of RelDataType for new fields.
-   * @param fieldName   name of the field.
-   * @return RelDataTypeField field
-   */
-  public RelDataTypeField getField(RelDataTypeFactory typeFactory, String fieldName) {
-
-    /* First check if this field name exists in our field list */
-    for (RelDataTypeField f : fields) {
-      if (fieldName.equalsIgnoreCase(f.getName())) {
-        return f;
-      }
-    }
-    RelDataTypeField newField = null;
-
-    if (isImplicitField(fieldName)) {
-      // This implicit field does not exist in our field list, add it
-      newField = new RelDataTypeFieldImpl(
-          fieldName,
-          fields.size(),
-          typeFactory.createTypeWithNullability(
-              typeFactory.createSqlType(SqlTypeName.VARCHAR), true));
-      fields.add(newField);
-    }
-    return newField;
-  }
-
-  /**
-   * Checks that specified field is implicit.
-   *
-   * @param fieldName name of the field which should be checked
-   * @return {@code true} if specified filed is implicit
-   */
-  private boolean isImplicitField(String fieldName) {
-    for (String implicitColumn : implicitColumnNames) {
-      if (implicitColumn.equalsIgnoreCase(fieldName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
new file mode 100644
index 0000000..6213015
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.FsInput;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.stream.IntStream;
+
+public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(AvroBatchReader.class);
+
+  private Path filePath;
+  private long endPosition;
+  private DataFileReader<GenericRecord> reader;
+  private ResultSetLoader loader;
+  private List<ColumnConverter> converters;
+  // re-use container instance
+  private GenericRecord record = null;
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    FileSplit split = negotiator.split();
+    filePath = split.getPath();
+
+    // Avro files are splittable, define reading start / end positions
+    long startPosition = split.getStart();
+    endPosition = startPosition + split.getLength();
+
+    logger.debug("Processing Avro file: {}, start position: {}, end position: {}",
+      filePath, startPosition, endPosition);
+
+    reader = prepareReader(split, negotiator.fileSystem(),
+      negotiator.userName(), negotiator.context().getFragmentContext().getQueryUserName());
+
+    logger.debug("Avro file schema: {}", reader.getSchema());
+    TupleMetadata schema = AvroSchemaUtil.convert(reader.getSchema());
+    logger.debug("Avro file converted schema: {}", schema);
+    negotiator.setTableSchema(schema, true);
+    loader = negotiator.build();
+    converters = ColumnConvertersUtil.initConverters(schema, loader.writer());
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    RowSetLoader rowWriter = loader.writer();
+    while (!rowWriter.isFull()) {
+      if (!nextLine(rowWriter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    if (reader == null) {
+      return;
+    }
+
+    try {
+      reader.close();
+    } catch (IOException e) {
+      logger.warn("Error closing Avro reader: {}", e.getMessage(), e);
+    } finally {
+      reader = null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    long currentPosition = -1L;
+    try {
+      if (reader != null) {
+        currentPosition = reader.tell();
+      }
+    } catch (IOException e) {
+      logger.trace("Unable to obtain Avro reader position: {}", e.getMessage(), e);
+    }
+    return "AvroBatchReader[File=" + filePath
+      + ", Position=" + currentPosition
+      + "]";
+  }
+
+  /**
+   * Initialized Avro data reader based on given file system and file path.
+   * Moves reader to the sync point from where to start reading the data.
+   *
+   * @param fileSplit file split
+   * @param fs file system
+   * @param opUserName name of the user whom to impersonate while reading the data
+   * @param queryUserName name of the user who issues the query
+   * @return Avro file reader
+   */
+  private DataFileReader<GenericRecord> prepareReader(FileSplit fileSplit, FileSystem fs, String opUserName, String queryUserName) {
+    try {
+      UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      DataFileReader<GenericRecord> reader = ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericRecord>>) () ->
+        new DataFileReader<>(new FsInput(fileSplit.getPath(), fs.getConf()), new GenericDatumReader<GenericRecord>()));
+
+      // move to sync point from where to read the file
+      reader.sync(fileSplit.getStart());
+      return reader;
+    } catch (IOException | InterruptedException e) {
+      throw UserException.dataReadError(e)
+        .message("Error preparing Avro reader")
+        .addContext("Reader", this)
+        .build(logger);
+    }
+  }
+
+  private boolean nextLine(RowSetLoader rowWriter) {
+    try {
+      if (!reader.hasNext() || reader.pastSync(endPosition)) {
+        return false;
+      }
+      record = reader.next(record);
+    } catch (IOException e) {
+      throw UserException.dataReadError(e)
+        .addContext("Reader", this)
+        .build(logger);
+    }
+
+    Schema schema = record.getSchema();
+
+    if (Schema.Type.RECORD != schema.getType()) {
+      throw UserException.dataReadError()
+        .message("Root object must be record type. Found: %s", schema.getType())
+        .addContext("Reader", this)
+        .build(logger);
+    }
+
+    rowWriter.start();
+    IntStream.range(0, rowWriter.size())
+      .forEach(i -> converters.get(i).convert(record.get(i)));
+    rowWriter.save();
+
+    return true;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
deleted file mode 100644
index 10d90d4..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericContainer;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.planner.logical.ExtendableRelDataType;
-import org.apache.drill.exec.planner.types.ExtendableRelDataTypeHolder;
-import org.apache.drill.exec.store.ColumnExplorer;
-import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
-public class AvroDrillTable extends DrillTable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroDrillTable.class);
-
-  private final DataFileReader<GenericContainer> reader;
-  private final SchemaConfig schemaConfig;
-  private ExtendableRelDataTypeHolder holder;
-
-  public AvroDrillTable(String storageEngineName,
-                       FileSystemPlugin plugin,
-                       SchemaConfig schemaConfig,
-                       FormatSelection selection) {
-    super(storageEngineName, plugin, schemaConfig.getUserName(), selection);
-    List<Path> asFiles = selection.getAsFiles();
-    Path path = asFiles.get(0);
-    this.schemaConfig = schemaConfig;
-    try {
-      reader = new DataFileReader<>(new FsInput(path, plugin.getFsConf()), new GenericDatumReader<GenericContainer>());
-    } catch (IOException e) {
-      throw UserException.dataReadError(e).build(logger);
-    }
-  }
-
-  @Override
-  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-    // ExtendableRelDataTypeHolder is reused to preserve previously added implicit columns
-    if (holder == null) {
-      List<RelDataType> typeList = Lists.newArrayList();
-      List<String> fieldNameList = Lists.newArrayList();
-
-      // adds partition columns to RowType since they always present in star queries
-      List<String> partitions =
-          ColumnExplorer.getPartitionColumnNames(((FormatSelection) getSelection()).getSelection(), schemaConfig);
-      for (String partitionName : partitions) {
-        fieldNameList.add(partitionName);
-        typeList.add(typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true));
-      }
-
-      // adds non-partition table columns to RowType
-      Schema schema = reader.getSchema();
-      for (Field field : schema.getFields()) {
-        fieldNameList.add(field.name());
-        typeList.add(getNullableRelDataTypeFromAvroType(typeFactory, field.schema()));
-      }
-
-      holder = new ExtendableRelDataTypeHolder(
-          typeFactory.createStructType(typeList, fieldNameList).getFieldList(),
-          ColumnExplorer.getImplicitColumnsNames(schemaConfig));
-    }
-
-    return new ExtendableRelDataType(holder, typeFactory);
-  }
-
-  private RelDataType getNullableRelDataTypeFromAvroType(
-      RelDataTypeFactory typeFactory, Schema fieldSchema) {
-    LogicalType logicalType = fieldSchema.getLogicalType();
-    String logicalTypeName = logicalType != null ? logicalType.getName() : "";
-    RelDataType relDataType = null;
-    switch (fieldSchema.getType()) {
-    case ARRAY:
-      RelDataType eleType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getElementType());
-      relDataType = typeFactory.createArrayType(eleType, -1);
-      break;
-    case BOOLEAN:
-      relDataType = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
-      break;
-    case BYTES:
-      switch (logicalTypeName) {
-        case "decimal":
-          relDataType = typeFactory.createSqlType(SqlTypeName.DECIMAL);
-          break;
-        default:
-          relDataType = typeFactory.createSqlType(SqlTypeName.BINARY);
-      }
-      break;
-    case DOUBLE:
-      relDataType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
-      break;
-    case FIXED:
-      switch (logicalTypeName) {
-        case "decimal":
-          relDataType = typeFactory.createSqlType(SqlTypeName.DECIMAL);
-          break;
-        default:
-          logger.error("{} type not supported", fieldSchema.getType());
-          throw UserException.unsupportedError().message("FIXED type not supported yet").build(logger);
-      }
-      break;
-    case FLOAT:
-      relDataType = typeFactory.createSqlType(SqlTypeName.FLOAT);
-      break;
-    case INT:
-      switch (logicalTypeName) {
-        case "date":
-          relDataType = typeFactory.createSqlType(SqlTypeName.DATE);
-          break;
-        case "time-millis":
-          relDataType = typeFactory.createSqlType(SqlTypeName.TIME);
-          break;
-        default:
-          relDataType = typeFactory.createSqlType(SqlTypeName.INTEGER);
-      }
-      break;
-    case LONG:
-      switch (logicalTypeName) {
-        case "date":
-          relDataType = typeFactory.createSqlType(SqlTypeName.DATE);
-          break;
-        case "time-micros":
-          relDataType = typeFactory.createSqlType(SqlTypeName.TIME);
-          break;
-        case "timestamp-millis":
-        case "timestamp-micros":
-          relDataType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
-          break;
-        default:
-          relDataType = typeFactory.createSqlType(SqlTypeName.BIGINT);
-      }
-      break;
-    case MAP:
-      RelDataType valueType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getValueType());
-      RelDataType keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
-      relDataType = typeFactory.createMapType(keyType, valueType);
-      break;
-    case NULL:
-      relDataType = typeFactory.createSqlType(SqlTypeName.NULL);
-      break;
-    case RECORD:
-//      List<String> fieldNameList = Lists.newArrayList();
-//      List<RelDataType> fieldRelDataTypeList = Lists.newArrayList();
-//      for(Field field : fieldSchema.getFields()) {
-//        fieldNameList.add(field.name());
-//        fieldRelDataTypeList.add(getNullableRelDataTypeFromAvroType(typeFactory, field.schema()));
-//      }
-//      relDataType = typeFactory.createStructType(fieldRelDataTypeList, fieldNameList);
-
-      //TODO This has to be mapped to struct type but because of calcite issue,
-      //for now mapping it to map type.
-      keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
-      valueType = typeFactory.createSqlType(SqlTypeName.ANY);
-      relDataType = typeFactory.createMapType(keyType, valueType);
-      break;
-    case ENUM:
-    case STRING:
-      relDataType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
-      break;
-    case UNION:
-      RelDataType optinalType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getTypes().get(1));
-      relDataType = typeFactory.createTypeWithNullability(optinalType, true);
-      break;
-    }
-    return relDataType;
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
index 2551ba5..6e8a9a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
@@ -17,23 +17,38 @@
  */
 package org.apache.drill.exec.store.avro;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 import org.apache.drill.common.logical.FormatPluginConfig;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
 /**
  * Format plugin config for Avro data files.
  */
-@JsonTypeName("avro")
+@JsonTypeName(AvroFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class AvroFormatConfig implements FormatPluginConfig {
 
+  public List<String> extensions = Collections.singletonList("avro");
+
   @Override
   public int hashCode() {
-    return 101; // XXX - WHAT IS THIS SUPPOSED TO BE?
+    return Objects.hash(extensions);
   }
 
   @Override
-  public boolean equals(Object obj) {
-    return obj instanceof AvroFormatConfig;
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AvroFormatConfig that = (AvroFormatConfig) o;
+    return Objects.equals(extensions, that.extensions);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index dc2046a..952fb1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -17,117 +17,62 @@
  */
 package org.apache.drill.exec.store.avro;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
-import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 /**
  * Format plugin for Avro data files.
  */
 public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
 
-  private final AvroFormatMatcher matcher;
+  public static final String DEFAULT_NAME = "avro";
 
-  public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
-                          StoragePluginConfig storagePluginConfig) {
-    this(name, context, fsConf, storagePluginConfig, new AvroFormatConfig());
+  public AvroFormatPlugin(String name,
+                          DrillbitContext context,
+                          Configuration fsConf,
+                          StoragePluginConfig storageConfig,
+                          AvroFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
   }
 
-  public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
-    super(name, context, fsConf, config, formatPluginConfig, true, false, true, false, Lists.newArrayList("avro"), "avro");
-    this.matcher = new AvroFormatMatcher(this);
+  private static EasyFormatConfig easyConfig(Configuration fsConf, AvroFormatConfig formatConfig) {
+    EasyFormatConfig config = new EasyFormatConfig();
+    config.readable = true;
+    config.writable = false;
+    config.blockSplittable = true;
+    config.compressible = false;
+    config.supportsProjectPushdown = true;
+    config.extensions = formatConfig.extensions;
+    config.fsConf = fsConf;
+    config.defaultName = DEFAULT_NAME;
+    config.readerOperatorType = CoreOperatorType.AVRO_SUB_SCAN_VALUE;
+    config.useEnhancedScan = true;
+    return config;
   }
 
   @Override
-  public boolean supportsPushDown() {
-    return true;
+  protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+    FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
+    builder.setReaderFactory(new AvroReaderFactory());
+    initScanBuilder(builder, scan);
+    builder.setNullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
   }
 
-  @Override
-  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) throws ExecutionSetupException {
-    return new AvroRecordReader(context, fileWork.getPath(), fileWork.getStart(), fileWork.getLength(), dfs, columns,
-      userName);
-  }
-
-  @Override
-  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
-    throw new UnsupportedOperationException("unimplemented");
-  }
-
-  @Override
-  public int getReaderOperatorType() {
-    return CoreOperatorType.AVRO_SUB_SCAN_VALUE;
-  }
-
-  @Override
-  public int getWriterOperatorType() {
-    throw new UnsupportedOperationException("unimplemented");
-  }
-
-  @Override
-  public FormatMatcher getMatcher() {
-    return this.matcher;
-  }
-
-  @Override
-  public boolean supportsStatistics() {
-    return false;
-  }
-
-  @Override
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
-    throw new UnsupportedOperationException("unimplemented");
-  }
-
-  @Override
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
-    throw new UnsupportedOperationException("unimplemented");
-  }
-
-  private static class AvroFormatMatcher extends BasicFormatMatcher {
-
-    public AvroFormatMatcher(AvroFormatPlugin plugin) {
-      super(plugin, ImmutableList.of(Pattern.compile(".*\\.avro$")), ImmutableList.<MagicString>of());
-    }
+  private static class AvroReaderFactory extends FileScanFramework.FileReaderFactory {
 
     @Override
-    public DrillTable isReadable(DrillFileSystem fs,
-        FileSelection selection, FileSystemPlugin fsPlugin,
-        String storageEngineName, SchemaConfig schemaConfig) throws IOException {
-      if (isFileReadable(fs, selection.getFirstPath(fs))) {
-        return new AvroDrillTable(storageEngineName, fsPlugin, schemaConfig,
-            new FormatSelection(plugin.getConfig(), selection));
-      }
-      return null;
+    public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
+      return new AvroBatchReader();
     }
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
deleted file mode 100644
index 34a035c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericContainer;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.util.Utf8;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.vector.complex.DictVector;
-import org.apache.drill.exec.vector.complex.fn.FieldSelection;
-import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-
-import io.netty.buffer.DrillBuf;
-import org.joda.time.DateTimeConstants;
-
-/**
- * A RecordReader implementation for Avro data files.
- *
- * @see org.apache.drill.exec.store.RecordReader
- */
-public class AvroRecordReader extends AbstractRecordReader {
-
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroRecordReader.class);
-
-  private final Path hadoop;
-  private final long start;
-  private final long end;
-  private final FieldSelection fieldSelection;
-  private final OptionManager optionManager;
-  private DrillBuf buffer;
-  private VectorContainerWriter writer;
-
-  private DataFileReader<GenericContainer> reader = null;
-  private FileSystem fs;
-
-  private final String opUserName;
-  private final String queryUserName;
-
-  private static final int DEFAULT_BATCH_SIZE = 4096;
-
-
-  public AvroRecordReader(final FragmentContext fragmentContext,
-                          final Path inputPath,
-                          final long start,
-                          final long length,
-                          final FileSystem fileSystem,
-                          final List<SchemaPath> projectedColumns,
-                          final String userName) {
-    hadoop = inputPath;
-    this.start = start;
-    this.end = start + length;
-    buffer = fragmentContext.getManagedBuffer();
-    this.fs = fileSystem;
-    this.opUserName = userName;
-    this.queryUserName = fragmentContext.getQueryUserName();
-    setColumns(projectedColumns);
-    this.fieldSelection = FieldSelection.getFieldSelection(projectedColumns);
-    optionManager = fragmentContext.getOptions();
-  }
-
-  private DataFileReader<GenericContainer> getReader(final Path hadoop, final FileSystem fs) throws ExecutionSetupException {
-    try {
-      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
-      return ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericContainer>>) () ->
-              new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>()));
-    } catch (IOException | InterruptedException e) {
-      throw new ExecutionSetupException(
-        String.format("Error in creating avro reader for file: %s", hadoop), e);
-    }
-  }
-
-  @Override
-  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
-    writer = new VectorContainerWriter(output);
-
-    try {
-      reader = getReader(hadoop, fs);
-      logger.debug("Processing file : {}, start position : {}, end position : {} ", hadoop, start, end);
-      reader.sync(this.start);
-    } catch (IOException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  @Override
-  public int next() {
-    final Stopwatch watch = Stopwatch.createStarted();
-
-    if (reader == null) {
-      throw new IllegalStateException("Avro reader is not open.");
-    }
-    if (!reader.hasNext()) {
-      return 0;
-    }
-
-    int recordCount = 0;
-    writer.allocate();
-    writer.reset();
-
-    try {
-      for (GenericContainer container = null;
-           recordCount < DEFAULT_BATCH_SIZE && reader.hasNext() && !reader.pastSync(end);
-           recordCount++) {
-        writer.setPosition(recordCount);
-        container = reader.next(container);
-        processRecord(container, container.getSchema());
-      }
-
-      writer.setValueCount(recordCount);
-
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e);
-    }
-
-    logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
-    return recordCount;
-  }
-
-  private void processRecord(final GenericContainer container, final Schema schema) {
-
-    final Schema.Type type = schema.getType();
-
-    switch (type) {
-      case RECORD:
-        process(container, schema, null, new MapOrListWriterImpl(writer.rootAsMap()), fieldSelection);
-        break;
-      default:
-        throw new DrillRuntimeException("Root object must be record type. Found: " + type);
-    }
-  }
-
-  private void process(final Object value, final Schema schema, final String fieldName, MapOrListWriterImpl writer, FieldSelection fieldSelection) {
-    if (value == null) {
-      return;
-    }
-    final Schema.Type type = schema.getType();
-
-    switch (type) {
-      case RECORD:
-        // list field of MapOrListWriter will be non null when we want to store array of maps/records.
-        MapOrListWriterImpl _writer = writer;
-
-        for (final Schema.Field field : schema.getFields()) {
-          if (field.schema().getType() == Schema.Type.RECORD ||
-              (field.schema().getType() == Schema.Type.UNION &&
-              field.schema().getTypes().get(0).getType() == Schema.Type.NULL &&
-              field.schema().getTypes().get(1).getType() == Schema.Type.RECORD)) {
-              _writer = (MapOrListWriterImpl) writer.map(field.name());
-          }
-
-          process(((GenericRecord) value).get(field.name()), field.schema(), field.name(), _writer, fieldSelection.getChild(field.name()));
-
-        }
-        break;
-      case ARRAY:
-        assert fieldName != null;
-        final GenericArray<?> array = (GenericArray<?>) value;
-        Schema elementSchema = array.getSchema().getElementType();
-        Type elementType = elementSchema.getType();
-        if (elementType == Schema.Type.RECORD) {
-          writer = (MapOrListWriterImpl) writer.list(fieldName).listoftmap(fieldName);
-        } else if (elementType == Schema.Type.MAP) {
-          writer = (MapOrListWriterImpl) writer.list(fieldName);
-          writer.listOfDict();
-        } else {
-          writer = (MapOrListWriterImpl) writer.list(fieldName);
-        }
-        for (final Object o : array) {
-          writer.start();
-          process(o, elementSchema, fieldName, writer, fieldSelection.getChild(fieldName));
-          writer.end();
-        }
-        break;
-      case UNION:
-        // currently supporting only nullable union (optional fields) like ["null", "some-type"].
-        if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
-          throw new UnsupportedOperationException("Avro union type must be of the format : [\"null\", \"some-type\"]");
-        }
-        process(value, schema.getTypes().get(1), fieldName, writer, fieldSelection);
-        break;
-      case MAP:
-        @SuppressWarnings("unchecked")
-        Map<Object, Object> map = (Map<Object, Object>) value;
-        // key type in Avro MAP is assumed to be string
-        Schema keySchema = Schema.create(Type.STRING);
-        Schema valueSchema = schema.getValueType();
-
-        writer = (MapOrListWriterImpl) writer.dict(fieldName);
-        BaseWriter.DictWriter dictWriter = (BaseWriter.DictWriter) writer.map;
-
-        dictWriter.start();
-        for (Entry<Object, Object> entry : map.entrySet()) {
-          dictWriter.startKeyValuePair();
-          processPrimitive(entry.getKey(), keySchema, DictVector.FIELD_KEY_NAME, writer);
-          process(entry.getValue(), valueSchema, DictVector.FIELD_VALUE_NAME, writer, FieldSelection.ALL_VALID);
-          dictWriter.endKeyValuePair();
-        }
-        dictWriter.end();
-        break;
-      case FIXED:
-      case ENUM:  // Enum symbols are strings
-      case NULL:  // Treat null type as a primitive
-      default:
-        assert fieldName != null;
-
-        if (writer.isMapWriter()) {
-          if (fieldSelection.isNeverValid()) {
-            break;
-          }
-        }
-
-        processPrimitive(value, schema, fieldName, writer);
-        break;
-    }
-
-  }
-
-  private void processPrimitive(final Object value, final Schema schema, final String fieldName,
-                                final MapOrListWriterImpl writer) {
-    LogicalType logicalType = schema.getLogicalType();
-    String logicalTypeName = logicalType != null ? logicalType.getName() : "";
-
-    if (value == null) {
-      return;
-    }
-
-    switch (schema.getType()) {
-      case STRING:
-        byte[] binary;
-        final int length;
-        if (value instanceof Utf8) {
-          binary = ((Utf8) value).getBytes();
-          length = ((Utf8) value).getByteLength();
-        } else {
-          binary = value.toString().getBytes(Charsets.UTF_8);
-          length = binary.length;
-        }
-        ensure(length);
-        buffer.setBytes(0, binary);
-        writer.varChar(fieldName).writeVarChar(0, length, buffer);
-        break;
-      case INT:
-        switch (logicalTypeName) {
-          case "date":
-            writer.date(fieldName).writeDate((int) value * (long) DateTimeConstants.MILLIS_PER_DAY);
-            break;
-          case "time-millis":
-            writer.time(fieldName).writeTime((Integer) value);
-            break;
-          default:
-            writer.integer(fieldName).writeInt((Integer) value);
-        }
-        break;
-      case LONG:
-        switch (logicalTypeName) {
-          case "date":
-            writer.date(fieldName).writeDate((Long) value);
-            break;
-          case "time-micros":
-            writer.time(fieldName).writeTime((int) ((long) value / 1000));
-            break;
-          case "timestamp-millis":
-            writer.timeStamp(fieldName).writeTimeStamp((Long) value);
-            break;
-          case "timestamp-micros":
-            writer.timeStamp(fieldName).writeTimeStamp((long) value / 1000);
-            break;
-          default:
-            writer.bigInt(fieldName).writeBigInt((Long) value);
-        }
-        break;
-      case FLOAT:
-        writer.float4(fieldName).writeFloat4((Float) value);
-        break;
-      case DOUBLE:
-        writer.float8(fieldName).writeFloat8((Double) value);
-        break;
-      case BOOLEAN:
-        writer.bit(fieldName).writeBit((Boolean) value ? 1 : 0);
-        break;
-      case BYTES:
-        final ByteBuffer buf = (ByteBuffer) value;
-        length = buf.remaining();
-        ensure(length);
-        buffer.setBytes(0, buf);
-        switch (logicalTypeName) {
-          case "decimal":
-            ParquetReaderUtility.checkDecimalTypeEnabled(optionManager);
-            LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
-            writer.varDecimal(fieldName, decimalType.getPrecision(), decimalType.getScale())
-                .writeVarDecimal(0, length, buffer, decimalType.getPrecision(), decimalType.getScale());
-            break;
-          default:
-            writer.binary(fieldName).writeVarBinary(0, length, buffer);
-        }
-        break;
-      case NULL:
-        // Nothing to do for null type
-        break;
-      case ENUM:
-        final String symbol = value.toString();
-        final byte[] b;
-        try {
-          b = symbol.getBytes(Charsets.UTF_8.name());
-        } catch (UnsupportedEncodingException e) {
-          throw new DrillRuntimeException("Unable to read enum value for field: " + fieldName, e);
-        }
-        ensure(b.length);
-        buffer.setBytes(0, b);
-        writer.varChar(fieldName).writeVarChar(0, b.length, buffer);
-        break;
-      case FIXED:
-        GenericFixed genericFixed = (GenericFixed) value;
-        switch (logicalTypeName) {
-          case "decimal":
-            ParquetReaderUtility.checkDecimalTypeEnabled(optionManager);
-            LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
-            writer.varDecimal(fieldName, decimalType.getPrecision(), decimalType.getScale())
-                .writeVarDecimal(new BigDecimal(new BigInteger(genericFixed.bytes()), decimalType.getScale()));
-            break;
-          default:
-            throw new UnsupportedOperationException("Unimplemented type: " + schema.getType().toString());
-        }
-        break;
-      default:
-        throw new DrillRuntimeException("Unhandled Avro type: " + schema.getType().toString());
-    }
-  }
-
-  private boolean selected(SchemaPath field) {
-    if (isStarQuery()) {
-      return true;
-    }
-    for (final SchemaPath sp : getColumns()) {
-      if (sp.contains(field)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void ensure(final int length) {
-    buffer = buffer.reallocIfNeeded(length);
-  }
-
-  @Override
-  public void close() {
-    if (reader != null) {
-      try {
-        reader.close();
-      } catch (IOException e) {
-        logger.warn("Error closing Avro reader", e);
-      } finally {
-        reader = null;
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    long currentPosition = -1L;
-    try {
-      if (reader != null) {
-        currentPosition = reader.tell();
-      }
-    } catch (IOException e) {
-      logger.trace("Unable to obtain reader position.", e);
-    }
-    return "AvroRecordReader[File=" + hadoop
-        + ", Position=" + currentPosition
-        + "]";
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSchemaUtil.java
new file mode 100644
index 0000000..53f08d9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSchemaUtil.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.DictBuilder;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.RepeatedListBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class that provides methods to interact with Avro schema.
+ */
+public class AvroSchemaUtil {
+
+  private static final Logger logger = LoggerFactory.getLogger(AvroSchemaUtil.class);
+
+  public static final String AVRO_LOGICAL_TYPE_PROPERTY = "avro_logical_type";
+
+  public static final String DECIMAL_LOGICAL_TYPE = "decimal";
+  public static final String TIMESTAMP_MICROS_LOGICAL_TYPE = "timestamp-micros";
+  public static final String TIMESTAMP_MILLIS_LOGICAL_TYPE = "timestamp-millis";
+  public static final String DATE_LOGICAL_TYPE = "date";
+  public static final String TIME_MICROS_LOGICAL_TYPE = "time-micros";
+  public static final String TIME_MILLIS_LOGICAL_TYPE = "time-millis";
+  public static final String DURATION_LOGICAL_TYPE = "duration";
+
+  /**
+   * Converts Avro schema into Drill metadata description of the schema.
+   *
+   * @param schema Avro schema
+   * @return metadata description of the schema
+   * @throws UserException if schema contains unsupported types
+   */
+  public static TupleMetadata convert(Schema schema) {
+    return SchemaConverter.INSTANCE.convert(schema);
+  }
+
+  /**
+   * Avro represents nullable type as union of null and another schema: ["null", "some-type"].
+   * This method extracts non-nullable schema for given union schema.
+   *
+   * @param schema Avro schema
+   * @param columnName column name
+   * @return non-nullable Avro schema
+   * @throws UserException if given schema is not a union or represents complex union
+   */
+  public static Schema extractSchemaFromNullable(Schema schema, String columnName) {
+    if (!schema.isUnion()) {
+      throw UserException.validationError()
+        .message("Expected union type, but received: %s", schema.getType())
+        .addContext("Column", columnName)
+        .build(logger);
+    }
+    List<Schema> unionSchemas = schema.getTypes();
+
+    // exclude all schemas with null type
+    List<Schema> nonNullSchemas = unionSchemas.stream()
+      .filter(unionSchema -> !Schema.Type.NULL.equals(unionSchema.getType()))
+      .collect(Collectors.toList());
+
+    // if original schema has two elements and only one non-nullable schema, this is simple nullable type
+    if (unionSchemas.size() == 2 && nonNullSchemas.size() == 1) {
+      return nonNullSchemas.get(0);
+    } else {
+      return throwUnsupportedErrorForType("complex union", columnName);
+    }
+  }
+
+  private static <T> T throwUnsupportedErrorForType(String type, String columnName) {
+    throw UserException.unsupportedError()
+      .message("'%s' type is not supported", type)
+      .addContext("Column", columnName)
+      .build(logger);
+  }
+
+  /**
+   * Class is responsible for converting Avro schema into Drill metadata description of the schema.
+   * It does not hold state and thus is thread-safe.
+   */
+  private static class SchemaConverter {
+
+    private static final SchemaConverter INSTANCE = new SchemaConverter();
+
+    TupleMetadata convert(Schema schema) {
+      /*
+        Avro allows to reference types by name, sometimes reference can be done to the type under construction.
+        For example, a linked-list of 64-bit values:
+        {
+          "type": "record",
+          "name": "LongList",
+          "fields" : [
+             {"name": "value", "type": "long"},             // each element has a long
+             {"name": "next", "type": ["null", "LongList"]} // optional next element
+           ]
+        }
+
+        Since we cannot build record type which is not constructed yet, when such situation is detected,
+        record type is set to Drill Map without columns, columns will be detected when reading actual data.
+
+        `typeNamesUnderConstruction` is a holder to store record type names under construction to detect
+         reference to the types which are not yet constructed.
+       */
+      Set<String> typeNamesUnderConstruction = new HashSet<>();
+      TupleSchema tupleSchema = new TupleSchema();
+
+      // add current record type to the set of types under construction
+      typeNamesUnderConstruction.add(schema.getFullName());
+
+      List<Schema.Field> fields = schema.getFields();
+      fields.stream()
+        .map(field -> convert(field, typeNamesUnderConstruction))
+        .forEach(tupleSchema::add);
+      return tupleSchema;
+    }
+
+    private ColumnMetadata convert(Schema.Field field, Set<String> typeNamesUnderConstruction) {
+      Schema fieldSchema = field.schema();
+      return defineColumn(field.name(), fieldSchema, TypeProtos.DataMode.REQUIRED, typeNamesUnderConstruction);
+    }
+
+    private ColumnMetadata defineColumn(String name, Schema fieldSchema,
+                                        TypeProtos.DataMode mode,
+                                        Set<String> typeNamesUnderConstruction) {
+      String logicalTypeName = getLogicalTypeName(fieldSchema);
+      switch (fieldSchema.getType()) {
+        case INT:
+          switch (logicalTypeName) {
+            case DATE_LOGICAL_TYPE:
+              return initField(name, TypeProtos.MinorType.DATE, mode);
+            case TIME_MILLIS_LOGICAL_TYPE:
+              return initField(name, TypeProtos.MinorType.TIME, mode);
+            default:
+              return initField(name, TypeProtos.MinorType.INT, mode);
+          }
+        case LONG:
+          switch (logicalTypeName) {
+            case TIMESTAMP_MICROS_LOGICAL_TYPE:
+            case TIMESTAMP_MILLIS_LOGICAL_TYPE:
+              ColumnMetadata timestampColumn = initField(name, TypeProtos.MinorType.TIMESTAMP, mode);
+              // add avro logical type property to know how to convert timestamp value
+              timestampColumn.setProperty(AVRO_LOGICAL_TYPE_PROPERTY, logicalTypeName);
+              return timestampColumn;
+            case TIME_MICROS_LOGICAL_TYPE:
+              return initField(name, TypeProtos.MinorType.TIME, mode);
+            default:
+              return initField(name, TypeProtos.MinorType.BIGINT, mode);
+          }
+        case FLOAT:
+          return initField(name, TypeProtos.MinorType.FLOAT4, mode);
+        case DOUBLE:
+          return initField(name, TypeProtos.MinorType.FLOAT8, mode);
+        case FIXED:
+          if (DURATION_LOGICAL_TYPE.equals(logicalTypeName)) {
+            return initField(name, TypeProtos.MinorType.INTERVAL, mode);
+          }
+          // fall through
+        case BYTES:
+          if (DECIMAL_LOGICAL_TYPE.equals(logicalTypeName)) {
+            LogicalTypes.Decimal decimalLogicalType = (LogicalTypes.Decimal) fieldSchema.getLogicalType();
+            TypeProtos.MajorType majorType = Types.withPrecisionAndScale(TypeProtos.MinorType.VARDECIMAL,
+              mode, decimalLogicalType.getPrecision(), decimalLogicalType.getScale());
+            return initField(name, majorType);
+          } else {
+            return initField(name, TypeProtos.MinorType.VARBINARY, mode);
+          }
+        case BOOLEAN:
+          return initField(name, TypeProtos.MinorType.BIT, mode);
+        case ENUM:
+        case STRING:
+          return initField(name, TypeProtos.MinorType.VARCHAR, mode);
+        case NULL:
+          return initField(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+        case UNION:
+          Schema schema = extractSchemaFromNullable(fieldSchema, name);
+          TypeProtos.DataMode nullableMode =
+            TypeProtos.DataMode.REPEATED == mode ? TypeProtos.DataMode.REPEATED : TypeProtos.DataMode.OPTIONAL;
+          return defineColumn(name, schema, nullableMode, typeNamesUnderConstruction);
+        case RECORD:
+          MapBuilder recordBuilder = new MapBuilder(null, name, mode);
+          String typeName = fieldSchema.getFullName();
+          // if type name is not under construction, proceed adding columns
+          // otherwise leave type as Drill Map without columns
+          if (typeNamesUnderConstruction.add(typeName)) {
+            fieldSchema.getFields().stream()
+              .map(field -> convert(field, typeNamesUnderConstruction))
+              .forEach(recordBuilder::addColumn);
+            // remove record type name since it was constructed
+            typeNamesUnderConstruction.remove(typeName);
+          }
+          return recordBuilder.buildColumn();
+        case ARRAY:
+          Schema elementSchema = fieldSchema.getElementType();
+          boolean hasNestedArray = elementSchema.isUnion()
+            ? Schema.Type.ARRAY == extractSchemaFromNullable(elementSchema, name).getType()
+            : Schema.Type.ARRAY == elementSchema.getType();
+
+          if (hasNestedArray) {
+            RepeatedListBuilder builder = new RepeatedListBuilder(null, name);
+            builder.addColumn(defineColumn(name, elementSchema, TypeProtos.DataMode.REQUIRED, typeNamesUnderConstruction));
+            return builder.buildColumn();
+          }
+          return defineColumn(name, elementSchema, TypeProtos.DataMode.REPEATED, typeNamesUnderConstruction);
+        case MAP:
+          DictBuilder dictBuilder = new DictBuilder(null, name, mode);
+          // per Avro specification Map key is always of varchar type
+          dictBuilder.key(TypeProtos.MinorType.VARCHAR);
+          Schema valueSchema = fieldSchema.getValueType();
+          ColumnMetadata valueColumn = defineColumn(DictVector.FIELD_VALUE_NAME, valueSchema,
+            TypeProtos.DataMode.REQUIRED, typeNamesUnderConstruction);
+          dictBuilder.addColumn(valueColumn);
+          return dictBuilder.buildColumn();
+        default:
+          return throwUnsupportedErrorForType(fieldSchema.getType().getName(), name);
+      }
+    }
+
+    /**
+     * Identifies logical type name for the column if present.
+     * Column schema can have logical type set as object or through property.
+     *
+     * @param schema Avro column schema
+     * @return logical type name if present, empty string otherwise
+     */
+    private String getLogicalTypeName(Schema schema) {
+      String name = schema.getLogicalType() != null
+        ? schema.getLogicalType().getName()
+        : schema.getProp(LogicalType.LOGICAL_TYPE_PROP);
+      return name == null ? "" : name;
+    }
+
+    private ColumnMetadata initField(String name, TypeProtos.MinorType minorType, TypeProtos.DataMode mode) {
+      TypeProtos.MajorType majorType = Types.withMode(minorType, mode);
+      return initField(name, majorType);
+    }
+
+    private ColumnMetadata initField(String name, TypeProtos.MajorType majorType) {
+      MaterializedField materializedField = MaterializedField.create(name, majorType);
+      return MetadataUtils.fromField(materializedField);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
deleted file mode 100644
index aee5a1e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.Types;
-
-/**
- * Utility class for working with Avro data types.
- */
-public final class AvroTypeHelper {
-
-  // XXX - Decide what to do about Avro's NULL type
-  /*
-  public static final MajorType MAJOR_TYPE_NULL_OPTIONAL      = Types.optional(MinorType.NULL);
-  public static final MajorType MAJOR_TYPE_NULL_REQUIRED      = Types.required(MinorType.NULL);
-  public static final MajorType MAJOR_TYPE_NULL_REPEATED      = Types.repeated(MinorType.NULL);
-  */
-  public static final MajorType MAJOR_TYPE_BOOL_OPTIONAL      = Types.optional(MinorType.UINT1);
-  public static final MajorType MAJOR_TYPE_BOOL_REQUIRED      = Types.required(MinorType.UINT1);
-  public static final MajorType MAJOR_TYPE_BOOL_REPEATED      = Types.repeated(MinorType.UINT1);
-  public static final MajorType MAJOR_TYPE_INT_OPTIONAL       = Types.optional(MinorType.INT);
-  public static final MajorType MAJOR_TYPE_INT_REQUIRED       = Types.required(MinorType.INT);
-  public static final MajorType MAJOR_TYPE_INT_REPEATED       = Types.repeated(MinorType.INT);
-  public static final MajorType MAJOR_TYPE_BIGINT_OPTIONAL    = Types.optional(MinorType.BIGINT);
-  public static final MajorType MAJOR_TYPE_BIGINT_REQUIRED    = Types.required(MinorType.BIGINT);
-  public static final MajorType MAJOR_TYPE_BIGINT_REPEATED    = Types.repeated(MinorType.BIGINT);
-  public static final MajorType MAJOR_TYPE_FLOAT4_OPTIONAL    = Types.optional(MinorType.FLOAT4);
-  public static final MajorType MAJOR_TYPE_FLOAT4_REQUIRED    = Types.required(MinorType.FLOAT4);
-  public static final MajorType MAJOR_TYPE_FLOAT4_REPEATED    = Types.repeated(MinorType.FLOAT4);
-  public static final MajorType MAJOR_TYPE_FLOAT8_OPTIONAL    = Types.optional(MinorType.FLOAT8);
-  public static final MajorType MAJOR_TYPE_FLOAT8_REQUIRED    = Types.required(MinorType.FLOAT8);
-  public static final MajorType MAJOR_TYPE_FLOAT8_REPEATED    = Types.repeated(MinorType.FLOAT8);
-  public static final MajorType MAJOR_TYPE_VARBINARY_OPTIONAL = Types.optional(MinorType.VARBINARY);
-  public static final MajorType MAJOR_TYPE_VARBINARY_REQUIRED = Types.required(MinorType.VARBINARY);
-  public static final MajorType MAJOR_TYPE_VARBINARY_REPEATED = Types.repeated(MinorType.VARBINARY);
-  public static final MajorType MAJOR_TYPE_VARCHAR_OPTIONAL   = Types.optional(MinorType.VARCHAR);
-  public static final MajorType MAJOR_TYPE_VARCHAR_REQUIRED   = Types.required(MinorType.VARCHAR);
-  public static final MajorType MAJOR_TYPE_VARCHAR_REPEATED   = Types.repeated(MinorType.VARCHAR);
-
-
-  private static final String UNSUPPORTED = "Unsupported type: %s [%s]";
-
-  private AvroTypeHelper() { }
-
-  /**
-   * Maintains a mapping between Avro types and Drill types. Given an Avro data
-   * type, this method will return the corresponding Drill field major type.
-   *
-   * @param field   Avro field
-   * @return        Major type or null if no corresponding type
-   */
-  public static MajorType getFieldMajorType(final Field field, final DataMode mode) {
-    return getFieldMajorType(field.schema().getType(), mode);
-  }
-
-  /**
-   * Maintains a mapping between Avro types and Drill types. Given an Avro data
-   * type, this method will return the corresponding Drill field major type.
-   *
-   * @param type Avro type
-   * @param mode Data mode
-   * @return     Drill major type or null if no corresponding type
-   */
-  public static MajorType getFieldMajorType(final Type type, final DataMode mode) {
-
-    switch (type) {
-      case MAP:
-      case RECORD:
-      case ENUM:
-      case UNION:
-        throw new UnsupportedOperationException("Complex types are unimplemented");
-      case NULL:
-        /*
-        switch (mode) {
-          case OPTIONAL:
-            return MAJOR_TYPE_NULL_OPTIONAL;
-          case REQUIRED:
-            return MAJOR_TYPE_NULL_REQUIRED;
-          case REPEATED:
-            return MAJOR_TYPE_NULL_REPEATED;
-        }
-        break;
-        */
-        throw new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
-      case ARRAY:
-        break;
-      case BOOLEAN:
-        switch (mode) {
-          case OPTIONAL:
-            return MAJOR_TYPE_BOOL_OPTIONAL;
-          case REQUIRED:
-            return MAJOR_TYPE_BOOL_REQUIRED;
-          case REPEATED:
-            return MAJOR_TYPE_BOOL_REPEATED;
-        }
-        break;
-      case INT:
-        switch (mode) {
-          case OPTIONAL:
-            return MAJOR_TYPE_INT_OPTIONAL;
-          case REQUIRED:
-            return MAJOR_TYPE_INT_REQUIRED;
-          case REPEATED:
-            return MAJOR_TYPE_INT_REPEATED;
-        }
-        break;
-      case LONG:
-        switch (mode) {
-          case OPTIONAL:
-            return MAJOR_TYPE_BIGINT_OPTIONAL;
-          case REQUIRED:
-            return MAJOR_TYPE_BIGINT_REQUIRED;
-          case REPEATED:
-            return MAJOR_TYPE_BIGINT_REPEATED;
-        }
-        break;
-      case FLOAT:
-        switch (mode) {
-          case OPTIONAL:
-            return MAJOR_TYPE_FLOAT4_OPTIONAL;
-          case REQUIRED:
-            return MAJOR_TYPE_FLOAT4_REQUIRED;
-          case REPEATED:
-            return MAJOR_TYPE_FLOAT4_REPEATED;
-        }
-        break;
-      case DOUBLE:
-        switch (mode) {
-          case OPTIONAL:
-            return MAJOR_TYPE_FLOAT8_OPTIONAL;
-          case REQUIRED:
-            return MAJOR_TYPE_FLOAT8_REQUIRED;
-          case REPEATED:
-            return MAJOR_TYPE_FLOAT8_REPEATED;
-        }
-        break;
-      case BYTES:
-        switch (mode) {
-          case OPTIONAL:
-            return MAJOR_TYPE_VARBINARY_OPTIONAL;
-          case REQUIRED:
-            return MAJOR_TYPE_VARBINARY_REQUIRED;
-          case REPEATED:
-            return MAJOR_TYPE_VARBINARY_REPEATED;
-        }
-        break;
-      case STRING:
-        switch (mode) {
-          case OPTIONAL:
-            return MAJOR_TYPE_VARCHAR_OPTIONAL;
-          case REQUIRED:
-            return MAJOR_TYPE_VARCHAR_REQUIRED;
-          case REPEATED:
-            return MAJOR_TYPE_VARCHAR_REPEATED;
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
-    }
-
-    throw new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverter.java
new file mode 100644
index 0000000..1a63f7b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverter.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.joda.time.DateTimeConstants;
+import org.joda.time.Period;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+/**
+ * Converts and sets given value into the specific column writer.
+ */
+public interface ColumnConverter {
+
+  void convert(Object value);
+
+  /**
+   * Does nothing, is used when column is not projected to avoid unnecessary
+   * column values conversions and writes.
+   */
+  class DummyColumnConverter implements ColumnConverter {
+
+    public static final DummyColumnConverter INSTANCE = new DummyColumnConverter();
+
+    @Override
+    public void convert(Object value) {
+      // do nothing
+    }
+  }
+
+  /**
+   * Converts and writes scalar values using provided {@link #valueConverter}.
+   * {@link #valueConverter} has different implementation depending
+   * on the scalar value type.
+   */
+  class ScalarColumnConverter implements ColumnConverter {
+
+    private final Consumer<Object> valueConverter;
+
+    public ScalarColumnConverter(Consumer<Object> valueConverter) {
+      this.valueConverter = valueConverter;
+    }
+
+    public static ScalarColumnConverter init(ScalarWriter writer) {
+      ColumnMetadata columnMetadata = writer.schema();
+      switch (columnMetadata.type()) {
+        case VARCHAR:
+          return new ScalarColumnConverter(value -> {
+            byte[] binary;
+            int length;
+            if (value instanceof Utf8) {
+              Utf8 utf8 = (Utf8) value;
+              binary = utf8.getBytes();
+              length = utf8.getByteLength();
+            } else {
+              binary = value.toString().getBytes(Charsets.UTF_8);
+              length = binary.length;
+            }
+            writer.setBytes(binary, length);
+          });
+        case VARBINARY:
+          return new ScalarColumnConverter(value -> {
+            if (value instanceof ByteBuffer) {
+              ByteBuffer buf = (ByteBuffer) value;
+              writer.setBytes(buf.array(), buf.remaining());
+            } else {
+              byte[] bytes = ((GenericFixed) value).bytes();
+              writer.setBytes(bytes, bytes.length);
+            }
+          });
+        case VARDECIMAL:
+          return new ScalarColumnConverter(value -> {
+            BigInteger bigInteger;
+            if (value instanceof ByteBuffer) {
+              ByteBuffer decBuf = (ByteBuffer) value;
+              bigInteger = new BigInteger(decBuf.array());
+            } else {
+              GenericFixed genericFixed = (GenericFixed) value;
+              bigInteger = new BigInteger(genericFixed.bytes());
+            }
+            BigDecimal decimalValue = new BigDecimal(bigInteger, writer.schema().scale());
+            writer.setDecimal(decimalValue);
+          });
+        case TIMESTAMP:
+          return new ScalarColumnConverter(value -> {
+            String avroLogicalType = writer.schema().property(AvroSchemaUtil.AVRO_LOGICAL_TYPE_PROPERTY);
+            if (AvroSchemaUtil.TIMESTAMP_MILLIS_LOGICAL_TYPE.equals(avroLogicalType)) {
+              writer.setLong((long) value);
+            } else {
+              writer.setLong((long) value / 1000);
+            }
+          });
+        case DATE:
+          return new ScalarColumnConverter(value -> writer.setLong((int) value * (long) DateTimeConstants.MILLIS_PER_DAY));
+        case TIME:
+          return new ScalarColumnConverter(value -> {
+            if (value instanceof Long) {
+              writer.setInt((int) ((long) value / 1000));
+            } else {
+              writer.setInt((int) value);
+            }
+          });
+        case INTERVAL:
+          return new ScalarColumnConverter(value -> {
+            GenericFixed genericFixed = (GenericFixed) value;
+            IntBuffer intBuf = ByteBuffer.wrap(genericFixed.bytes())
+              .order(ByteOrder.LITTLE_ENDIAN)
+              .asIntBuffer();
+
+            Period period = Period.months(intBuf.get(0))
+              .withDays(intBuf.get(1)
+              ).withMillis(intBuf.get(2));
+
+            writer.setPeriod(period);
+          });
+        default:
+          return new ScalarColumnConverter(writer::setObject);
+      }
+    }
+
+    @Override
+    public void convert(Object value) {
+      if (value == null) {
+        return;
+      }
+
+      valueConverter.accept(value);
+    }
+  }
+
+  /**
+   * Converts and writes array values using {@link #valueConverter}
+   * into {@link #arrayWriter}.
+   */
+  class ArrayColumnConverter implements ColumnConverter {
+
+    private final ArrayWriter arrayWriter;
+    private final ColumnConverter valueConverter;
+
+    public ArrayColumnConverter(ArrayWriter arrayWriter, ColumnConverter valueConverter) {
+      this.arrayWriter = arrayWriter;
+      this.valueConverter = valueConverter;
+    }
+
+    @Override
+    public void convert(Object value) {
+      if (value == null || !arrayWriter.isProjected()) {
+        return;
+      }
+
+      GenericArray<?> array = (GenericArray<?>) value;
+      array.forEach(arrayValue -> {
+        valueConverter.convert(arrayValue);
+        arrayWriter.save();
+      });
+    }
+  }
+
+  /**
+   * Converts and writes all map children using provided {@link #converters}.
+   * If {@link #converters} are empty, generates their converters based on
+   * {@link GenericRecord} schema.
+   */
+  class MapColumnConverter implements ColumnConverter {
+
+    private final TupleWriter tupleWriter;
+    private final List<ColumnConverter> converters;
+
+    public MapColumnConverter(TupleWriter tupleWriter, List<ColumnConverter> converters) {
+      this.tupleWriter = tupleWriter;
+      this.converters = new ArrayList<>(converters);
+    }
+
+    @Override
+    public void convert(Object value) {
+      if (value == null) {
+        return;
+      }
+
+      GenericRecord genericRecord = (GenericRecord) value;
+
+      if (converters.isEmpty()) {
+        // fill in tuple schema for cases when it contains recursive named record types
+        TupleMetadata metadata = AvroSchemaUtil.convert(genericRecord.getSchema());
+        metadata.toMetadataList().forEach(tupleWriter::addColumn);
+
+        IntStream.range(0, metadata.size())
+          .mapToObj(i -> ColumnConvertersUtil.getConverter(metadata.metadata(i), tupleWriter.column(i)))
+          .forEach(converters::add);
+      }
+
+      IntStream.range(0, converters.size())
+        .forEach(i -> converters.get(i).convert(genericRecord.get(i)));
+    }
+  }
+
+  /**
+   * Converts and writes dict values using provided key / value converters.
+   */
+  class DictColumnConverter implements ColumnConverter {
+
+    private final DictWriter dictWriter;
+    private final ColumnConverter keyConverter;
+    private final ColumnConverter valueConverter;
+
+    public DictColumnConverter(DictWriter dictWriter, ColumnConverter keyConverter, ColumnConverter valueConverter) {
+      this.dictWriter = dictWriter;
+      this.keyConverter = keyConverter;
+      this.valueConverter = valueConverter;
+    }
+
+    @Override
+    public void convert(Object value) {
+      if (value == null) {
+        return;
+      }
+
+      @SuppressWarnings("unchecked") Map<Object, Object> map = (Map<Object, Object>) value;
+      map.forEach((key, val) -> {
+        keyConverter.convert(key);
+        valueConverter.convert(val);
+        dictWriter.save();
+      });
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConvertersUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConvertersUtil.java
new file mode 100644
index 0000000..99308fd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConvertersUtil.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.avro.ColumnConverter.ArrayColumnConverter;
+import org.apache.drill.exec.store.avro.ColumnConverter.DictColumnConverter;
+import org.apache.drill.exec.store.avro.ColumnConverter.DummyColumnConverter;
+import org.apache.drill.exec.store.avro.ColumnConverter.MapColumnConverter;
+import org.apache.drill.exec.store.avro.ColumnConverter.ScalarColumnConverter;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ColumnConvertersUtil {
+
+  /**
+   * Based on given converted Avro schema and current row writer generates list of
+   * column converters based on column type.
+   *
+   * @param schema converted Avro schema
+   * @param rowWriter current row writer
+   * @return list of column converters
+   */
+  public static List<ColumnConverter> initConverters(TupleMetadata schema, RowSetLoader rowWriter) {
+    return IntStream.range(0, schema.size())
+      .mapToObj(i -> getConverter(schema.metadata(i), rowWriter.column(i)))
+      .collect(Collectors.toList());
+  }
+
+  /**
+   * Based on column type, creates corresponding column converter
+   * which holds conversion logic and appropriate writer to set converted data into.
+   * For columns which are not projected, {@link DummyColumnConverter} is used.
+   *
+   * @param metadata column metadata
+   * @param writer column writer
+   * @return column converter
+   */
+  public static ColumnConverter getConverter(ColumnMetadata metadata, ObjectWriter writer) {
+    if (!writer.isProjected()) {
+      return DummyColumnConverter.INSTANCE;
+    }
+
+    if (metadata.isArray()) {
+      return getArrayConverter(metadata, writer.array());
+    }
+
+    if (metadata.isMap()) {
+      return getMapConverter(metadata.tupleSchema(), writer.tuple());
+    }
+
+    if (metadata.isDict()) {
+      return getDictConverter(metadata.tupleSchema(), writer.dict());
+    }
+
+    return getScalarConverter(writer.scalar());
+  }
+
+  private static ColumnConverter getArrayConverter(ColumnMetadata metadata, ArrayWriter arrayWriter) {
+    ObjectWriter valueWriter = arrayWriter.entry();
+    ColumnConverter valueConverter;
+    if (metadata.isMap()) {
+      valueConverter = getMapConverter(metadata.tupleSchema(), valueWriter.tuple());
+    } else if (metadata.isDict()) {
+      valueConverter = getDictConverter(metadata.tupleSchema(), valueWriter.dict());
+    } else if (metadata.isMultiList()) {
+      valueConverter = getConverter(metadata.childSchema(), valueWriter);
+    } else {
+      valueConverter = getScalarConverter(valueWriter.scalar());
+    }
+    return new ArrayColumnConverter(arrayWriter, valueConverter);
+  }
+
+  private static ColumnConverter getMapConverter(TupleMetadata metadata, TupleWriter tupleWriter) {
+    List<ColumnConverter> converters = IntStream.range(0, metadata.size())
+      .mapToObj(i -> getConverter(metadata.metadata(i), tupleWriter.column(i)))
+      .collect(Collectors.toList());
+    return new MapColumnConverter(tupleWriter, converters);
+  }
+
+  private static ColumnConverter getDictConverter(TupleMetadata metadata, DictWriter dictWriter) {
+    ColumnConverter keyConverter = getScalarConverter(dictWriter.keyWriter());
+    ColumnConverter valueConverter = getConverter(metadata.metadata(DictVector.FIELD_VALUE_NAME), dictWriter.valueWriter());
+    return new DictColumnConverter(dictWriter, keyConverter, valueConverter);
+  }
+
+  private static ColumnConverter getScalarConverter(ScalarWriter scalarWriter) {
+    return ScalarColumnConverter.init(scalarWriter);
+  }
+}
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 75e363f..8c0428d 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -50,7 +50,8 @@
           "type" : "pcapng"
         },
         "avro" : {
-          "type" : "avro"
+          "type" : "avro",
+          "extensions" : [ "avro" ]
         },
         "sequencefile" : {
           "type" : "sequencefile",
@@ -163,4 +164,4 @@
       "enabled" : true
     }
   }
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
index 718b6e9..ce03232 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.impersonation;
 
+import org.apache.drill.exec.store.avro.AvroDataGenerator;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.categories.SecurityTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.util.DrillFileUtils;
-import org.apache.drill.exec.store.avro.AvroTestUtil;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.drill.categories.SlowTest;
 import org.apache.hadoop.fs.FileSystem;
@@ -168,7 +168,8 @@
     fs.setOwner(dfsFile, user, group);
     fs.setPermission(dfsFile, new FsPermission((short) 0700));
 
-    localFile = new Path(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath());
+    AvroDataGenerator avroDataGenerator = new AvroDataGenerator(dirTestWatcher);
+    localFile = new Path(avroDataGenerator.generateSimplePrimitiveSchema_NoNullValues().getFilePath());
     dfsFile = new Path(getUserHome(user), "simple.avro");
     fs.copyFromLocalFile(localFile, dfsFile);
     fs.setOwner(dfsFile, user, group);
@@ -283,12 +284,9 @@
     createView(org1Users[0], org1Groups[0], "simple_avro_view",
       String.format("SELECT h_boolean, e_double FROM %s.`%s` t", MINI_DFS_STORAGE_PLUGIN_NAME,
         new Path(getUserHome(org1Users[0]), "simple.avro")));
-    try {
-      updateClient(org1Users[1]);
-      test("SELECT h_boolean FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_avro_view");
-    } catch (UserRemoteException e) {
-      assertNull("This test should pass.", e);
-    }
+
+    updateClient(org1Users[1]);
+    test("SELECT h_boolean FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_avro_view");
   }
 
   @Test // DRILL-7250
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java
index c82870d..b4700a6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java
@@ -25,6 +25,7 @@
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.categories.RowSetTests;
@@ -76,7 +77,7 @@
 
     // Empty list means nothing is projected
 
-    RequestedTuple projSet = RequestedTupleImpl.parse(new ArrayList<SchemaPath>());
+    RequestedTuple projSet = RequestedTupleImpl.parse(Collections.emptyList());
     assertEquals(TupleProjectionType.NONE, projSet.type());
     assertTrue(projSet instanceof ImpliedTupleRequest);
     List<RequestedColumn> cols = projSet.projections();
@@ -297,7 +298,7 @@
     RequestedColumn wildcard = cols.get(0);
     assertEquals(ProjectionType.WILDCARD, wildcard.type());
     assertEquals(SchemaPath.DYNAMIC_STAR, wildcard.name());
-    assertTrue(! wildcard.isSimple());
+    assertFalse(wildcard.isSimple());
     assertTrue(wildcard.isWildcard());
     assertNull(wildcard.mapProjection());
     assertNull(wildcard.indexes());
@@ -326,7 +327,7 @@
     assertTrue(a.isArray());
     assertFalse(a.isSimple());
     assertFalse(a.isTuple());
-    boolean indexes[] = a.indexes();
+    boolean[] indexes = a.indexes();
     assertNotNull(indexes);
     assertEquals(4, indexes.length);
     assertFalse(indexes[0]);
@@ -335,15 +336,29 @@
     assertTrue(indexes[3]);
   }
 
+  /**
+   * Duplicate array entries are allowed to handle the
+   * use case of a[1], a[1].z. Each element is reported once;
+   * the project operator will create copies as needed.
+   */
   @Test
   public void testArrayDups() {
-    try {
-      RequestedTupleImpl.parse(
-          RowSetTestUtils.projectList("a[1]", "a[3]", "a[1]"));
-      fail();
-    } catch (UserException e) {
-      // Expected
-    }
+    RequestedTuple projSet = RequestedTupleImpl.parse(
+      RowSetTestUtils.projectList("a[1]", "a[3]", "a[1]", "a[3].z"));
+
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(1, cols.size());
+
+    RequestedColumn a = cols.get(0);
+    assertEquals("a", a.name());
+    assertTrue(a.isArray());
+    boolean[] indexes = a.indexes();
+    assertNotNull(indexes);
+    assertEquals(4, indexes.length);
+    assertFalse(indexes[0]);
+    assertTrue(indexes[1]);
+    assertFalse(indexes[2]);
+    assertTrue(indexes[3]);
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
index af29f06..6da29f8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
@@ -790,13 +790,13 @@
   }
 
   @Test // DRILL-6944
-  public void testMapTypeFullyQualifiedInNewlyCreatedView() throws Exception {
+  public void testMapTypeTreatedAsAnyInNewlyCreatedView() throws Exception {
     try {
       test("CREATE VIEW dfs.tmp.`mapf_view` AS SELECT `mapf` FROM dfs.`avro/map_string_to_long.avro`");
       testPlanWithAttributesMatchingPatterns("SELECT * FROM dfs.tmp.`mapf_view`", new String[]{
-          "Screen : rowType = RecordType\\(\\(VARCHAR\\(65535\\), BIGINT\\) MAP mapf\\)",
-          "Project\\(mapf=\\[\\$0\\]\\) : rowType = RecordType\\(\\(VARCHAR\\(65535\\), BIGINT\\) MAP mapf\\)",
-          "Scan.*avro/map_string_to_long.avro.*rowType = RecordType\\(\\(VARCHAR\\(65535\\), BIGINT\\) MAP mapf\\)"
+          "Screen : rowType = RecordType\\(ANY mapf\\)",
+          "Project\\(mapf=\\[\\$0\\]\\) : rowType = RecordType\\(ANY mapf\\)",
+          "Scan.*avro/map_string_to_long.avro.*rowType = RecordType\\(ANY mapf\\)"
       }, null);
     } finally {
       test("DROP VIEW IF EXISTS dfs.tmp.`mapf_view`");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
index b4c8b14..4aca1a8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
@@ -19,7 +19,7 @@
 
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.avro.AvroTestUtil;
+import org.apache.drill.exec.store.avro.AvroDataGenerator;
 import org.junit.Test;
 
 import java.nio.file.Paths;
@@ -63,8 +63,8 @@
 
   @Test
   public void testAvro() throws Exception {
-    AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(5);
-    String file = testSetup.getFileName();
+    AvroDataGenerator dataGenerator = new AvroDataGenerator(dirTestWatcher);
+    String file = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(5).getFileName();
     testPhysicalPlanSubmission(
         String.format("select * from dfs.`%s`", file),
         String.format("select * from table(dfs.`%s`(type=>'avro'))", file)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroDataGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroDataGenerator.java
new file mode 100644
index 0000000..b737092
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroDataGenerator.java
@@ -0,0 +1,853 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.Text;
+import org.apache.drill.test.BaseDirTestWatcher;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Utilities for generating Avro test data.
+ */
+public class AvroDataGenerator {
+
+  public static final int RECORD_COUNT = 50;
+  public static int ARRAY_SIZE = 4;
+
+  private final BaseDirTestWatcher dirTestWatcher;
+
+  public AvroDataGenerator(BaseDirTestWatcher dirTestWatcher) {
+    this.dirTestWatcher = dirTestWatcher;
+  }
+
+  /**
+   * Class to write records to an Avro file while simultaneously
+   * constructing a corresponding list of records in the format taken in
+   * by the Drill test builder to describe expected results.
+   */
+  public static class AvroTestRecordWriter implements Closeable {
+
+    private final List<Map<String, Object>> expectedRecords;
+    private final Schema schema;
+    private final DataFileWriter<GenericData.Record> writer;
+    private final String filePath;
+    private final String fileName;
+
+    private GenericData.Record currentAvroRecord;
+    private Map<String, Object> currentExpectedRecord;
+
+    public AvroTestRecordWriter(Schema schema, File file) {
+      writer = new DataFileWriter<>(new GenericDatumWriter<>(schema));
+      try {
+        writer.create(schema, file);
+      } catch (IOException e) {
+        throw new RuntimeException("Error creating file in Avro test setup.", e);
+      }
+      this.schema = schema;
+      currentExpectedRecord = new TreeMap<>();
+      expectedRecords = new ArrayList<>();
+      filePath = file.getAbsolutePath();
+      fileName = file.getName();
+    }
+
+    public void startRecord() {
+      currentAvroRecord = new GenericData.Record(schema);
+      currentExpectedRecord = new TreeMap<>();
+    }
+
+    public void put(String key, Object value) {
+      currentAvroRecord.put(key, value);
+      // convert binary values into byte[], the format they will be given
+      // in the Drill result set in the test framework
+      currentExpectedRecord.put("`" + key + "`", convertAvroValToDrill(value, true));
+    }
+
+    // TODO - fix this the test wrapper to prevent the need for this hack
+    // to make the root behave differently than nested fields for String vs. Text
+    private Object convertAvroValToDrill(Object value, boolean root) {
+      if (value instanceof ByteBuffer) {
+        ByteBuffer bb = ((ByteBuffer) value);
+        byte[] drillVal = new byte[((ByteBuffer) value).remaining()];
+        bb.get(drillVal);
+        bb.position(0);
+        value = drillVal;
+      } else if (!root && value instanceof CharSequence) {
+        value = new Text(value.toString());
+      } else if (value instanceof GenericData.Array) {
+        GenericData.Array<?> array = ((GenericData.Array<?>) value);
+         JsonStringArrayList<Object> drillList = new JsonStringArrayList<>();
+        for (Object o : array) {
+          drillList.add(convertAvroValToDrill(o, false));
+        }
+        value = drillList;
+      } else if (value instanceof GenericData.EnumSymbol) {
+        value = value.toString();
+      } else if (value instanceof GenericData.Record) {
+        GenericData.Record rec = ((GenericData.Record) value);
+        JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>();
+        for (Schema.Field field : rec.getSchema().getFields()) {
+          Object val = rec.get(field.name());
+          newRecord.put(field.name(), convertAvroValToDrill(val, false));
+        }
+        value = newRecord;
+      }
+      return value;
+    }
+
+    public void endRecord() throws IOException {
+      writer.append(currentAvroRecord);
+      expectedRecords.add(currentExpectedRecord);
+    }
+
+    @Override
+    public void close() throws IOException {
+      writer.close();
+    }
+
+    public String getFilePath() {
+      return filePath;
+    }
+
+    public String getFileName() {
+      return fileName;
+    }
+
+    public List<Map<String, Object>> getExpectedRecords() {
+      return expectedRecords;
+    }
+  }
+
+
+  public AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues() throws Exception {
+    return generateSimplePrimitiveSchema_NoNullValues(RECORD_COUNT);
+  }
+
+  public AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords) throws Exception {
+    return generateSimplePrimitiveSchema_NoNullValues(numRecords, "");
+  }
+
+  /**
+   * Generates Avro table with specified rows number.
+   *
+   * @param numRecords rows number in the table
+   * @param tablePath  table path
+   * @return AvroTestRecordWriter instance
+   */
+  public AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords, String tablePath) throws Exception {
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_int").type().intType().noDefault()
+      .name("c_long").type().longType().noDefault()
+      .name("d_float").type().floatType().noDefault()
+      .name("e_double").type().doubleType().noDefault()
+      .name("f_bytes").type().bytesType().noDefault()
+      .name("g_null").type().nullType().noDefault()
+      .name("h_boolean").type().booleanType().noDefault()
+      .endRecord();
+
+    File file = File.createTempFile("avro-simple-primitive-no-nulls-test", ".avro",
+     dirTestWatcher.makeRootSubDir(Paths.get(tablePath)));
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      ByteBuffer bb = ByteBuffer.allocate(2);
+      bb.put(0, (byte) 'a');
+      for (int i = 0; i < numRecords; i++) {
+        bb.put(1, (byte) ('0' + (i % 10)));
+        bb.position(0);
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+        record.put("c_long", (long) i);
+        record.put("d_float", (float) i);
+        record.put("e_double", (double) i);
+        record.put("f_bytes", bb);
+        record.put("g_null", null);
+        record.put("h_boolean", (i % 2 == 0));
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateUnionSchema_WithNullValues() throws Exception {
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_int").type().intType().noDefault()
+      .name("c_long").type().longType().noDefault()
+      .name("d_float").type().floatType().noDefault()
+      .name("e_double").type().doubleType().noDefault()
+      .name("f_bytes").type().bytesType().noDefault()
+      .name("g_null").type().nullType().noDefault()
+      .name("h_boolean").type().booleanType().noDefault()
+      .name("i_union").type().optional().doubleType()
+      .endRecord();
+
+    File file = File.createTempFile("avro-simple-primitive-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      ByteBuffer bb = ByteBuffer.allocate(1);
+      bb.put(0, (byte) 1);
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+        record.put("c_long", (long) i);
+        record.put("d_float", (float) i);
+        record.put("e_double", (double) i);
+        record.put("f_bytes", bb);
+        record.put("g_null", null);
+        record.put("h_boolean", (i % 2 == 0));
+        record.put("i_union", (i % 2 == 0 ? (double) i : null));
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateUnionSchema_WithNonNullValues() throws Exception {
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("i_union").type().unionOf().doubleType().and().longType().endUnion().noDefault()
+      .endRecord();
+
+    File file = File.createTempFile("avro-complex-union-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("i_union", (i % 2 == 0 ? (double) i : (long) i));
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateSimpleEnumSchema_NoNullValues() throws Exception {
+    String[] symbols = { "E_SYM_A", "E_SYM_B", "E_SYM_C", "E_SYM_D" };
+
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_enum").type().enumeration("my_enum").symbols(symbols).noDefault()
+      .endRecord();
+
+    File file = File.createTempFile("avro-primitive-with-enum-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    Schema enumSchema = schema.getField("b_enum").schema();
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumSchema, symbols[(i + symbols.length) % symbols.length]);
+        record.put("b_enum", symbol);
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateSimpleArraySchema_NoNullValues() throws Exception {
+    File file = File.createTempFile("avro-simple-array-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_int").type().intType().noDefault()
+      .name("c_string_array").type().array().items().stringType().noDefault()
+      .name("d_int_array").type().array().items().intType().noDefault()
+      .name("e_float_array").type().array().items().floatType().noDefault()
+      .endRecord();
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+        {
+          GenericArray<String> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("c_string_array").schema());
+          for (int j = 0; j < ARRAY_SIZE; j++) {
+            array.add(j, "c_string_array_" + i + "_" + j);
+          }
+          record.put("c_string_array", array);
+        }
+        {
+          GenericArray<Integer> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("d_int_array").schema());
+          for (int j = 0; j < ARRAY_SIZE; j++) {
+            array.add(j, i * j);
+          }
+          record.put("d_int_array", array);
+        }
+        {
+          GenericArray<Float> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("e_float_array").schema());
+          for (int j = 0; j < ARRAY_SIZE; j++) {
+            array.add(j, (float) (i * j));
+          }
+          record.put("e_float_array", array);
+        }
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateSimpleNestedSchema_NoNullValues() throws Exception {
+    File file = File.createTempFile("avro-simple-nested-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_int").type().intType().noDefault()
+      .name("c_record").type().record("my_record_1")
+        .namespace("foo.blah.org")
+        .fields()
+        .name("nested_1_string").type().stringType().noDefault()
+        .name("nested_1_int").type().intType().noDefault()
+        .endRecord()
+        .noDefault()
+      .endRecord();
+
+    Schema nestedSchema = schema.getField("c_record").schema();
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+        nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+        nestedRecord.put("nested_1_int", i * i);
+
+        record.put("c_record", nestedRecord);
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateUnionNestedArraySchema_withNullValues() throws Exception {
+    File file = File.createTempFile("avro-nested-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_int").type().intType().noDefault()
+      .name("c_array").type().optional().array().items().record("my_record_1")
+        .namespace("foo.blah.org").fields()
+        .name("nested_1_string").type().optional().stringType()
+        .name("nested_1_int").type().optional().intType()
+        .endRecord()
+      .endRecord();
+
+    Schema nestedSchema = schema.getField("c_array").schema();
+    Schema arraySchema = nestedSchema.getTypes().get(1);
+    Schema itemSchema = arraySchema.getElementType();
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        if (i % 2 == 0) {
+          GenericArray<GenericRecord> array = new GenericData.Array<>(1, arraySchema);
+          GenericRecord nestedRecord = new GenericData.Record(itemSchema);
+          nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+          nestedRecord.put("nested_1_int", i * i);
+          array.add(nestedRecord);
+          record.put("c_array", array);
+        }
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateNestedArraySchema() throws IOException {
+    return generateNestedArraySchema(RECORD_COUNT, ARRAY_SIZE);
+  }
+
+  public AvroTestRecordWriter generateNestedArraySchema(int numRecords, int numArrayItems) throws IOException {
+    File file = File.createTempFile("avro-nested-array-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest").namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_int").type().intType().noDefault()
+      .name("b_array").type().array().items()
+        .record("my_record_1").namespace("foo.blah.org")
+        .fields()
+        .name("nested_1_int").type().optional().intType()
+        .endRecord()
+        .arrayDefault(Collections.emptyList())
+      .endRecord();
+
+    Schema arraySchema = schema.getField("b_array").schema();
+    Schema itemSchema = arraySchema.getElementType();
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < numRecords; i++) {
+        record.startRecord();
+        record.put("a_int", i);
+        GenericArray<GenericRecord> array = new GenericData.Array<>(ARRAY_SIZE, arraySchema);
+
+        for (int j = 0; j < numArrayItems; j++) {
+          GenericRecord nestedRecord = new GenericData.Record(itemSchema);
+          nestedRecord.put("nested_1_int", j);
+          array.add(nestedRecord);
+        }
+        record.put("b_array", array);
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateMapSchema_withNullValues() throws Exception {
+    File file = File.createTempFile("avro-map-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_int").type().intType().noDefault()
+      .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
+      .endRecord();
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        if (i % 2 == 0) {
+          Map<String, String> strMap = new HashMap<>();
+          strMap.put("key1", "nested_1_string_" +  i);
+          strMap.put("key2", "nested_1_string_" +  (i + 1 ));
+          record.put("c_map", strMap);
+        }
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateMapSchemaComplex_withNullValues() throws Exception {
+    File file = File.createTempFile("avro-map-complex-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_int").type().intType().noDefault()
+      .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
+      .name("d_map").type().optional().map().values(Schema.createArray(Schema.create(Type.DOUBLE)))
+      .endRecord();
+
+    Schema arrayMapSchema = schema.getField("d_map").schema();
+    Schema arrayItemSchema = arrayMapSchema.getTypes().get(1).getValueType();
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        if (i % 2 == 0) {
+          Map<String, String> c_map = new HashMap<>();
+          c_map.put("key1", "nested_1_string_" +  i);
+          c_map.put("key2", "nested_1_string_" +  (i + 1 ));
+          record.put("c_map", c_map);
+        } else {
+          Map<String, GenericArray<Double>> d_map = new HashMap<>();
+          GenericArray<Double> array = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
+          for (int j = 0; j < ARRAY_SIZE; j++) {
+            array.add((double)j);
+          }
+          d_map.put("key1", array);
+          d_map.put("key2", array);
+
+          record.put("d_map", d_map);
+        }
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateUnionNestedSchema_withNullValues() throws Exception {
+    File file = File.createTempFile("avro-nested-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_int").type().intType().noDefault()
+      .name("c_record").type().optional().record("my_record_1")
+        .namespace("foo.blah.org").fields()
+        .name("nested_1_string").type().optional().stringType()
+        .name("nested_1_int").type().optional().intType()
+        .endRecord()
+      .endRecord();
+
+    Schema nestedSchema = schema.getField("c_record").schema();
+    Schema optionalSchema = nestedSchema.getTypes().get(1);
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        if (i % 2 == 0) {
+          GenericRecord nestedRecord = new GenericData.Record(optionalSchema);
+          nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+          nestedRecord.put("nested_1_int", i * i);
+          record.put("c_record", nestedRecord);
+        }
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateDoubleNestedSchema_NoNullValues() throws Exception {
+    File file = File.createTempFile("avro-double-nested-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringType().noDefault()
+      .name("b_int").type().intType().noDefault()
+      .name("c_record").type().record("my_record_1")
+        .namespace("foo.blah.org")
+        .fields()
+        .name("nested_1_string").type().stringType().noDefault()
+        .name("nested_1_int").type().intType().noDefault()
+        .name("nested_1_record").type().record("my_double_nested_record_1")
+          .namespace("foo.blah.org.rot")
+          .fields()
+          .name("double_nested_1_string").type().stringType().noDefault()
+          .name("double_nested_1_int").type().intType().noDefault()
+          .endRecord()
+          .noDefault()
+        .endRecord()
+        .noDefault()
+      .endRecord();
+
+    Schema nestedSchema = schema.getField("c_record").schema();
+    Schema doubleNestedSchema = nestedSchema.getField("nested_1_record").schema();
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+        nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+        nestedRecord.put("nested_1_int", i * i);
+
+        GenericRecord doubleNestedRecord = new GenericData.Record(doubleNestedSchema);
+        doubleNestedRecord.put("double_nested_1_string", "double_nested_1_string_" + i + "_" + i);
+        doubleNestedRecord.put("double_nested_1_int", i * i * i);
+
+        nestedRecord.put("nested_1_record", doubleNestedRecord);
+        record.put("c_record", nestedRecord);
+
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public String generateLinkedList(int numRows) throws Exception {
+    File file = File.createTempFile("avro-linked-list-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("LongList")
+      .namespace("org.apache.drill.exec.store.avro")
+      .aliases("LinkedLongs")
+      .fields()
+      .name("value").type().optional().longType()
+      .name("next").type().optional().type("LongList")
+      .endRecord();
+
+    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+      writer.create(schema, file);
+      GenericRecord previousRecord = null;
+      for (int i = 0; i < numRows; i++) {
+        GenericRecord record = (GenericRecord) (previousRecord == null ? new GenericData.Record(schema) : previousRecord.get("next"));
+        record.put("value", (long) i);
+        if (previousRecord != null) {
+          writer.append(previousRecord);
+        }
+        GenericRecord nextRecord = new GenericData.Record(record.getSchema());
+        record.put("next", nextRecord);
+        previousRecord = record;
+      }
+      writer.append(previousRecord);
+    }
+    return file.getName();
+  }
+
+  public AvroTestRecordWriter generateStringAndUtf8Data() throws Exception {
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("a_string").type().stringBuilder().prop("avro.java.string", "String").endString().noDefault()
+      .name("b_utf8").type().stringType().noDefault()
+      .endRecord();
+
+    File file = File.createTempFile("avro-string-utf8-test", ".avro", dirTestWatcher.getRootDir());
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+        record.put("a_string", "a_" + i);
+        record.put("b_utf8", "b_" + i);
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public AvroTestRecordWriter generateMapSchema() throws Exception {
+    File file = File.createTempFile("avro-map-test", ".avro", dirTestWatcher.getRootDir());
+    Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+      .namespace("org.apache.drill.exec.store.avro")
+      .fields()
+      .name("map_field").type().optional().map().values(Schema.create(Type.LONG))
+      .name("map_array").type().optional().array().items(Schema.createMap(Schema.create(Type.INT)))
+      .name("map_array_value").type().optional().map().values(Schema.createArray(Schema.create(Type.DOUBLE)))
+      .endRecord();
+
+    Schema mapArraySchema = schema.getField("map_array").schema();
+    Schema arrayItemSchema = mapArraySchema.getTypes().get(1);
+
+    try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        record.startRecord();
+
+        // Create map with long values
+        Map<String, Long> map = new HashMap<>();
+        map.put("key1", (long) i);
+        map.put("key2", (long) i + 1);
+        record.put("map_field", map);
+
+        // Create list of map with int values
+        GenericArray<Map<String, Integer>> array = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
+        for (int j = 0; j < ARRAY_SIZE; j++) {
+          Map<String, Integer> mapInt = new HashMap<>();
+          mapInt.put("key1", (i + 1) * (j + 50));
+          mapInt.put("key2", (i + 1) * (j + 100));
+          array.add(mapInt);
+        }
+        record.put("map_array", array);
+
+        // create map with array value
+        Map<String, GenericArray<Double>> mapArrayValue = new HashMap<>();
+        GenericArray<Double> doubleArray = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
+        for (int j = 0; j < ARRAY_SIZE; j++) {
+          doubleArray.add((double) (i + 1) * j);
+        }
+        mapArrayValue.put("key1", doubleArray);
+        mapArrayValue.put("key2", doubleArray);
+        record.put("map_array_value", mapArrayValue);
+
+        record.endRecord();
+      }
+      return record;
+    }
+  }
+
+  public String generateDecimalData(int numRecords) throws Exception {
+    File file = File.createTempFile("avro-decimal-test", ".avro", dirTestWatcher.getRootDir());
+    Schema decBytes = LogicalTypes.decimal(10, 2)
+      .addToSchema(SchemaBuilder.builder().bytesType());
+
+    Schema decFixed = LogicalTypes.decimal(5, 2)
+      .addToSchema(SchemaBuilder.builder().fixed("dec_fixed").size(5));
+
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_dec_pos_bytes").type(decBytes).noDefault()
+      .name("col_dec_neg_bytes").type(decBytes).noDefault()
+      .name("col_dec_pos_fixed").type(decFixed).noDefault()
+      .name("col_dec_neg_fixed").type(decFixed).noDefault()
+      .endRecord();
+
+    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+      writer.create(schema, file);
+      for (int i = 0; i < numRecords; i++) {
+        GenericRecord record = new GenericData.Record(schema);
+
+        ByteBuffer posBytes = ByteBuffer.wrap(BigInteger.valueOf(100 + i).toByteArray());
+        record.put("col_dec_pos_bytes", posBytes);
+
+        ByteBuffer negBytes = ByteBuffer.wrap(BigInteger.valueOf(-200 + i).toByteArray());
+        record.put("col_dec_neg_bytes", negBytes);
+
+        byte[] posFixedBytes = new byte[5];
+        byte[] posValueBytes = BigInteger.valueOf(300 + i).toByteArray();
+        int posDiff = posFixedBytes.length - posValueBytes.length;
+        assert posDiff > -1;
+        System.arraycopy(posValueBytes, 0, posFixedBytes, posDiff, posValueBytes.length);
+        Arrays.fill(posFixedBytes, 0, posDiff, (byte) 0);
+
+        GenericData.Fixed posFixed = new GenericData.Fixed(decFixed, posFixedBytes);
+        record.put("col_dec_pos_fixed", posFixed);
+
+        byte[] negFixedBytes = new byte[5];
+        byte[] negValueBytes = BigInteger.valueOf(-400 + i).toByteArray();
+        int negDiff = negFixedBytes.length - negValueBytes.length;
+        assert negDiff > -1;
+        System.arraycopy(negValueBytes, 0, negFixedBytes, negDiff, negValueBytes.length);
+        Arrays.fill(negFixedBytes, 0, negDiff, (byte) -1);
+
+        GenericData.Fixed negFixed = new GenericData.Fixed(decFixed, negFixedBytes);
+        record.put("col_dec_neg_fixed", negFixed);
+
+        writer.append(record);
+      }
+    }
+    return file.getName();
+  }
+
+  public String generateDateTimeData(LocalDateTime dateTime) throws Exception {
+    File file = File.createTempFile("avro-date-time-test", ".avro", dirTestWatcher.getRootDir());
+
+    Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType());
+    Schema timestampMicros = LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+    Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+    Schema timeMillis = LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+    Schema timeMicros = LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
+
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_timestamp_millis").type(timestampMillis).noDefault()
+      .name("col_timestamp_micros").type(timestampMicros).noDefault()
+      .name("col_date").type(date).noDefault()
+      .name("col_time_millis").type(timeMillis).noDefault()
+      .name("col_time_micros").type(timeMicros).noDefault()
+      .endRecord();
+
+    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+      writer.create(schema, file);
+      GenericRecord record = new GenericData.Record(schema);
+      long timestampMillisValue = dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
+      record.put("col_timestamp_millis", timestampMillisValue);
+      record.put("col_timestamp_micros", timestampMillisValue * 1000);
+      record.put("col_date", dateTime.toLocalDate().toEpochDay());
+      long startOfDayMillis = dateTime.toLocalDate().atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
+      long timeMillisValue = timestampMillisValue - startOfDayMillis;
+      record.put("col_time_millis", timeMillisValue);
+      record.put("col_time_micros", timeMillisValue * 1000);
+      writer.append(record);
+    }
+    return file.getName();
+  }
+
+  public String generateDuration(int numRows) throws Exception {
+    File file = File.createTempFile("avro-duration-test", ".avro", dirTestWatcher.getRootDir());
+
+    Schema durationSchema = new LogicalType("duration")
+      .addToSchema(SchemaBuilder.builder().fixed("duration_fixed").size(12));
+
+    Schema schema = SchemaBuilder.record("record")
+      .fields()
+      .name("col_duration").type(durationSchema).noDefault()
+      .endRecord();
+
+    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+      writer.create(schema, file);
+      for (int i = 0; i < numRows; i++) {
+        GenericRecord record = new GenericData.Record(schema);
+
+        ByteBuffer bb = ByteBuffer.allocate(12);
+        bb.order(ByteOrder.LITTLE_ENDIAN);
+        bb.putInt(10 + i); // month
+        bb.putInt(100 + i); // days
+        bb.putInt(1000 + i); // milliseconds
+
+        GenericData.Fixed fixed = new GenericData.Fixed(durationSchema, bb.array());
+        record.put("col_duration", fixed);
+        writer.append(record);
+      }
+    }
+    return file.getName();
+  }
+
+  public String generateMultiDimensionalArray(int numRecords, int arraySize) throws Exception {
+    File file = File.createTempFile("avro-multi-dimensional-array-test", ".avro", dirTestWatcher.getRootDir());
+
+    String colTwoDimsName = "col_array_two_dims";
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name(colTwoDimsName).type()
+        .array().items()
+        .array().items()
+        .stringType()
+        .noDefault()
+      .endRecord();
+
+    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+      writer.create(schema, file);
+
+      for (int i = 0; i < numRecords; i++) {
+        GenericRecord record = new GenericData.Record(schema);
+        Schema twoDimsSchema = schema.getField(colTwoDimsName).schema();
+        GenericArray<GenericArray<String>> arrayTwoDims = new GenericData.Array<>(numRecords, twoDimsSchema);
+        for (int a = 0; a < arraySize; a++) {
+          GenericArray<String> nestedArray = new GenericData.Array<>(2, twoDimsSchema.getElementType());
+          nestedArray.add(String.format("val_%s_%s_0", i, a));
+          nestedArray.add(String.format("val_%s_%s_1", i, a));
+          arrayTwoDims.add(nestedArray);
+        }
+        record.put(colTwoDimsName, arrayTwoDims);
+        writer.append(record);
+      }
+    }
+
+    return file.getName();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
index 5a634fd..27fd926 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -17,243 +17,123 @@
  */
 package org.apache.drill.exec.store.avro;
 
-import static org.apache.drill.exec.store.avro.AvroTestUtil.ARRAY_SIZE;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.RECORD_COUNT;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateDoubleNestedSchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateLinkedList;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateMapSchema;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateMapSchemaComplex_withNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateMapSchema_withNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateNestedArraySchema;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateSimpleArraySchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateSimpleEnumSchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateSimpleNestedSchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateStringAndUtf8Data;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateUnionNestedArraySchema_withNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateUnionNestedSchema_withNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateUnionSchema_WithNonNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateUnionSchema_WithNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.write;
-import static org.apache.drill.test.TestBuilder.listOf;
-import static org.apache.drill.test.TestBuilder.mapOfObject;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.TestBuilder;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Period;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.avro.specific.TestRecordWithLogicalTypes;
-import org.apache.commons.io.FileUtils;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ExecTest;
-import org.apache.drill.exec.expr.fn.impl.DateUtility;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.util.JsonStringHashMap;
-import org.apache.drill.exec.work.ExecErrorConstants;
-import org.apache.drill.test.BaseTestQuery;
-import org.apache.drill.test.TestBuilder;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import static org.apache.drill.test.TestBuilder.listOf;
+import static org.apache.drill.test.TestBuilder.mapOf;
+import static org.apache.drill.test.TestBuilder.mapOfObject;
 
 /**
  * Unit tests for Avro record reader.
  */
-public class AvroFormatTest extends BaseTestQuery {
+public class AvroFormatTest extends ClusterTest {
 
   private static String mapTableName;
+  private static AvroDataGenerator dataGenerator;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
 
   @BeforeClass
-  public static void init() throws Exception {
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+    dataGenerator = new AvroDataGenerator(dirTestWatcher);
     // Create temporary table containing map and map array
-    mapTableName = generateMapSchema().getFileName();
-  }
-
-  // XXX
-  //      1. Need to test nested field names with same name as top-level names for conflict.
-  //      2. Avro supports recursive types? Can we test this?
-
-  @Test
-  public void testBatchCutoff() throws Exception {
-    final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimplePrimitiveSchema_NoNullValues(5000);
-    final String file = testSetup.getFileName();
-    final String sql = "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null from dfs.`%s`";
-    test(sql, file);
-    testBuilder()
-        .sqlQuery(sql, file)
-        .unOrdered()
-        .expectsNumBatches(2)
-        .baselineRecords(testSetup.getExpectedRecords())
-        .go();
-  }
-
-  /**
-   * Previously a bug in the Avro table metadata would cause wrong results
-   * for some queries on varchar types, as a length was not provided during metadata
-   * population. In some cases casts were being added with the default length
-   * of 1 and truncating values.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testFiltersOnVarchar() throws Exception {
-    final String file = generateSimplePrimitiveSchema_NoNullValues(5000).getFileName();
-    final String sql = "select a_string from dfs.`%s` where a_string = 'a_1'";
-    testBuilder()
-        .sqlQuery(sql, file)
-        .unOrdered()
-        .baselineColumns("a_string")
-        .baselineValues("a_1")
-        .go();
-
-    final String sql2 = "select a_string from dfs.`%s` where a_string IN ('a_1')";
-    testBuilder()
-        .sqlQuery(sql2, file)
-        .unOrdered()
-        .baselineColumns("a_string")
-        .baselineValues("a_1")
-        .go();
-  }
-
-  @Test
-  public void testFiltersOnVarBinary() throws Exception {
-    final String file = generateSimplePrimitiveSchema_NoNullValues(5000).getFileName();
-    final String sql = "select f_bytes from dfs.`%s` where f_bytes = BINARY_STRING('\\x61\\x31')";
-    TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, file)
-        .unOrdered()
-        .baselineColumns("f_bytes");
-
-    for (int i = 0; i < 500; i++) {
-      testBuilder.baselineValues(new byte[] {'a', '1'});
-    }
-    testBuilder.go();
-
-    final String sql2 = "select f_bytes from dfs.`%s` where f_bytes IN (BINARY_STRING('\\x61\\x31'))";
-    testBuilder = testBuilder()
-        .sqlQuery(sql2, file)
-        .unOrdered()
-        .baselineColumns("f_bytes");
-
-    for (int i = 0; i < 500; i++) {
-      testBuilder.baselineValues(new byte[] {'a', '1'});
-    }
-    testBuilder.go();
+    mapTableName = dataGenerator.generateMapSchema().getFileName();
   }
 
   @Test
   public void testSimplePrimitiveSchema_NoNullValues() throws Exception {
-    final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimplePrimitiveSchema_NoNullValues();
-    final String file = testSetup.getFileName();
-    final String sql = "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null from dfs.`%s`";
-    test(sql, file);
+    AvroDataGenerator.AvroTestRecordWriter testSetup = dataGenerator.generateSimplePrimitiveSchema_NoNullValues();
+    String file = testSetup.getFileName();
+    String sql = "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null from dfs.`%s`";
     testBuilder()
-        .sqlQuery(sql, file)
-        .unOrdered()
-        .baselineRecords(testSetup.getExpectedRecords())
-        .go();
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineRecords(testSetup.getExpectedRecords())
+      .go();
   }
 
   @Test
   public void testSimplePrimitiveSchema_StarQuery() throws Exception {
-    simpleAvroTestHelper(generateSimplePrimitiveSchema_NoNullValues(), "select * from dfs.`%s`");
-  }
-
-  private List<Map<String, Object>> project(
-      List<Map<String,Object>> incomingRecords,
-      List<String> projectCols) {
-    List<Map<String,Object>> output = Lists.newArrayList();
-    for (Map<String, Object> incomingRecord : incomingRecords) {
-      final JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>();
-      for (String s : incomingRecord.keySet()) {
-        if (projectCols.contains(s)) {
-          newRecord.put(s, incomingRecord.get(s));
-        }
-      }
-      output.add(newRecord);
-    }
-    return output;
+    simpleAvroTestHelper(dataGenerator.generateSimplePrimitiveSchema_NoNullValues());
   }
 
   @Test
   public void testSimplePrimitiveSchema_SelectColumnSubset() throws Exception {
-    final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimplePrimitiveSchema_NoNullValues();
-    final String file = testSetup.getFileName();
-    List<String> projectList = Lists.newArrayList("`h_boolean`", "`e_double`");
+    AvroDataGenerator.AvroTestRecordWriter testSetup = dataGenerator.generateSimplePrimitiveSchema_NoNullValues();
+    String file = testSetup.getFileName();
+    List<String> projectList = Arrays.asList("`h_boolean`", "`e_double`");
     testBuilder()
-        .sqlQuery("select h_boolean, e_double from dfs.`%s`", file)
-        .unOrdered()
-        .baselineRecords(project(testSetup.getExpectedRecords(), projectList))
-        .go();
-  }
-
-  @Test
-  public void testSimplePrimitiveSchema_NoColumnsExistInTheSchema() throws Exception {
-    final String file = generateSimplePrimitiveSchema_NoNullValues().getFileName();
-    try {
-      test("select h_dummy1, e_dummy2 from dfs.`%s`", file);
-      Assert.fail("Test should fail as h_dummy1 and e_dummy2 does not exist.");
-    } catch(UserException ue) {
-      Assert.assertTrue("Test should fail as h_dummy1 and e_dummy2 does not exist.",
-          ue.getMessage().contains("Column 'h_dummy1' not found in any table"));
-    }
-  }
-
-  @Test
-  public void testSimplePrimitiveSchema_OneExistAndOneDoesNotExistInTheSchema() throws Exception {
-    final String file = generateSimplePrimitiveSchema_NoNullValues().getFileName();
-    try {
-      test("select h_boolean, e_dummy2 from dfs.`%s`", file);
-      Assert.fail("Test should fail as e_dummy2 does not exist.");
-    } catch(UserException ue) {
-      Assert.assertTrue("Test should fail as e_dummy2 does not exist.", true);
-    }
+      .sqlQuery("select h_boolean, e_double from dfs.`%s`", file)
+      .unOrdered()
+      .baselineRecords(project(testSetup.getExpectedRecords(), projectList))
+      .go();
   }
 
   @Test
   public void testImplicitColumnsWithStar() throws Exception {
-    AvroTestUtil.AvroTestRecordWriter testWriter = generateSimplePrimitiveSchema_NoNullValues(1);
-    final String file = testWriter.getFileName();
+    AvroDataGenerator.AvroTestRecordWriter testWriter = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(1);
     // removes "." and ".." from the path
-    String tablePath = new File(testWriter.getFilePath()).getCanonicalPath();
+    String tablePathString = new File(testWriter.getFilePath()).getCanonicalPath();
+    Path tablePath = new Path(tablePathString);
 
     List<Map<String, Object>> expectedRecords = testWriter.getExpectedRecords();
-    expectedRecords.get(0).put("`filename`", file);
+    expectedRecords.get(0).put("`filename`", tablePath.getName());
     expectedRecords.get(0).put("`suffix`", "avro");
-    expectedRecords.get(0).put("`fqn`", tablePath);
-    expectedRecords.get(0).put("`filepath`", new File(tablePath).getParent());
+    expectedRecords.get(0).put("`fqn`", tablePath.toUri().getPath());
+    expectedRecords.get(0).put("`filepath`", tablePath.getParent().toUri().getPath());
     try {
       testBuilder()
-          .sqlQuery("select filename, *, suffix, fqn, filepath from dfs.`%s`", file)
-          .unOrdered()
-          .baselineRecords(expectedRecords)
-          .go();
+        .sqlQuery("select filename, *, suffix, fqn, filepath from dfs.`%s`", tablePath.getName())
+        .unOrdered()
+        .baselineRecords(expectedRecords)
+        .go();
     } finally {
-      FileUtils.deleteQuietly(new File(tablePath));
+      FileUtils.deleteQuietly(new File(tablePath.toUri().getPath()));
     }
   }
 
   @Test
   public void testImplicitColumnAlone() throws Exception {
-    AvroTestUtil.AvroTestRecordWriter testWriter = generateSimplePrimitiveSchema_NoNullValues(1);
-    final String file = testWriter.getFileName();
+    AvroDataGenerator.AvroTestRecordWriter testWriter = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(1);
+    String file = testWriter.getFileName();
     // removes "." and ".." from the path
     String tablePath = new File(testWriter.getFilePath()).getCanonicalPath();
     try {
       testBuilder()
-          .sqlQuery("select filename from dfs.`%s`", file)
-          .unOrdered()
-          .baselineColumns("filename")
-          .baselineValues(file)
-          .go();
+        .sqlQuery("select filename from dfs.`%s`", file)
+        .unOrdered()
+        .baselineColumns("filename")
+        .baselineValues(file)
+        .go();
     } finally {
       FileUtils.deleteQuietly(new File(tablePath));
     }
@@ -261,18 +141,17 @@
 
   @Test
   public void testImplicitColumnInWhereClause() throws Exception {
-    AvroTestUtil.AvroTestRecordWriter testWriter = generateSimplePrimitiveSchema_NoNullValues(1);
-    final String file = testWriter.getFileName();
+    AvroDataGenerator.AvroTestRecordWriter testWriter = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(1);
+    String file = testWriter.getFileName();
     // removes "." and ".." from the path
     String tablePath = new File(testWriter.getFilePath()).getCanonicalPath();
 
-    List<Map<String, Object>> expectedRecords = testWriter.getExpectedRecords();
     try {
       testBuilder()
-          .sqlQuery("select * from dfs.`%1$s` where filename = '%1$s'", file)
-          .unOrdered()
-          .baselineRecords(expectedRecords)
-          .go();
+        .sqlQuery("select * from dfs.`%1$s` where filename = '%1$s'", file)
+        .unOrdered()
+        .baselineRecords(testWriter.getExpectedRecords())
+        .go();
     } finally {
       FileUtils.deleteQuietly(new File(tablePath));
     }
@@ -280,302 +159,234 @@
 
   @Test
   public void testPartitionColumn() throws Exception {
-    setSessionOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL, "directory");
+    client.alterSession(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL, "directory");
     String file = "avroTable";
     String partitionColumn = "2018";
-    AvroTestUtil.AvroTestRecordWriter testWriter =
-        generateSimplePrimitiveSchema_NoNullValues(1, FileUtils.getFile(file, partitionColumn).getPath());
+    String tablePath = FileUtils.getFile(file, partitionColumn).getPath();
+    AvroDataGenerator.AvroTestRecordWriter testWriter = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(1, tablePath);
     try {
       testBuilder()
-          .sqlQuery("select directory0 from dfs.`%s`", file)
-          .unOrdered()
-          .baselineColumns("directory0")
-          .baselineValues(partitionColumn)
-          .go();
-    } finally {
-      FileUtils.deleteQuietly(new File(testWriter.getFilePath()));
-      resetSessionOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
-    }
-  }
-
-  @Test
-  public void testSelectAllWithPartitionColumn() throws Exception {
-    String file = "avroTable";
-    String partitionColumn = "2018";
-    AvroTestUtil.AvroTestRecordWriter testWriter =
-      generateSimplePrimitiveSchema_NoNullValues(1, FileUtils.getFile(file, partitionColumn).getPath());
-    List<Map<String, Object>> expectedRecords = testWriter.getExpectedRecords();
-    expectedRecords.get(0).put("`dir0`", partitionColumn);
-    try {
-      testBuilder()
-          .sqlQuery("select * from dfs.`%s`", file)
-          .unOrdered()
-          .baselineRecords(expectedRecords)
-          .go();
-    } finally {
-      FileUtils.deleteQuietly(new File(testWriter.getFilePath()));
-    }
-  }
-
-  @Test
-  public void testAvroTableWithLogicalTypesDecimal() throws Exception {
-    ExecTest.mockUtcDateTimeZone();
-    LocalDate date = DateUtility.parseLocalDate("2018-02-03");
-    LocalTime time = DateUtility.parseLocalTime("19:25:03.0");
-    LocalDateTime timestamp = DateUtility.parseLocalDateTime("2018-02-03 19:25:03.0");
-
-    // Avro uses joda package
-    org.joda.time.DateTime jodaDateTime = org.joda.time.DateTime.parse("2018-02-03T19:25:03");
-    BigDecimal bigDecimal = new BigDecimal("123.45");
-
-    TestRecordWithLogicalTypes record = new TestRecordWithLogicalTypes(
-        true,
-        34,
-        35L,
-        3.14F,
-        3019.34,
-        "abc",
-        jodaDateTime.toLocalDate(),
-        jodaDateTime.toLocalTime(),
-        jodaDateTime,
-        bigDecimal
-    );
-
-    File data = write(TestRecordWithLogicalTypes.getClassSchema(), record);
-
-    final String query = "select * from dfs.`%s`";
-
-    testBuilder()
-        .sqlQuery(query, data.getName())
+        .sqlQuery("select directory0 from dfs.`%s`", file)
         .unOrdered()
-        .baselineColumns("b", "i32", "i64", "f32", "f64", "s", "d", "t", "ts", "dec")
-        .baselineValues(true, 34, 35L, 3.14F, 3019.34, "abc", date, time, timestamp, bigDecimal)
+        .baselineColumns("directory0")
+        .baselineValues(partitionColumn)
         .go();
-  }
-
-  @Test
-  public void testAvroWithDisabledDecimalType() throws Exception {
-    TestRecordWithLogicalTypes record = new TestRecordWithLogicalTypes(
-        true,
-        34,
-        35L,
-        3.14F,
-        3019.34,
-        "abc",
-        org.joda.time.LocalDate.now(),
-        org.joda.time.LocalTime.now(),
-        org.joda.time.DateTime.now(),
-        new BigDecimal("123.45")
-    );
-
-    File data = write(TestRecordWithLogicalTypes.getClassSchema(), record);
-    final String query = String.format("select * from dfs.`%s`", data.getName());
-
-    try {
-      alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, false);
-      errorMsgTestHelper(query, ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG);
     } finally {
-      resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+      client.resetSession(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+      FileUtils.deleteQuietly(new File(testWriter.getFilePath()));
     }
   }
 
   @Test
   public void testSimpleArraySchema_NoNullValues() throws Exception {
-    final String file = generateSimpleArraySchema_NoNullValues().getFileName();
-    final String sql = "select a_string, c_string_array[0], e_float_array[2] from dfs.`%s`";
-    test(sql, file);
+    String file = dataGenerator.generateSimpleArraySchema_NoNullValues().getFileName();
+    String sql = "select a_string, c_string_array[0] as csa, e_float_array[2] as efa " +
+      "from dfs.`%s` where a_string in ('a_0', 'a_15')";
+
+    testBuilder()
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineColumns("a_string", "csa", "efa")
+      .baselineValues("a_0", "c_string_array_0_0", 0.0F)
+      .baselineValues("a_15", "c_string_array_15_0", 30.0F)
+      .go();
   }
 
   @Test
   public void testSimpleArraySchema_StarQuery() throws Exception {
-    simpleAvroTestHelper(generateSimpleArraySchema_NoNullValues(), "select * from dfs.`%s`");
+    simpleAvroTestHelper(dataGenerator.generateSimpleArraySchema_NoNullValues());
   }
 
   @Test
   public void testDoubleNestedSchema_NoNullValues_NotAllColumnsProjected() throws Exception {
-    final String file = generateDoubleNestedSchema_NoNullValues().getFileName();
-    final String sql = "select t.c_record.nested_1_int, t.c_record.nested_1_record.double_nested_1_int from dfs.`%s` t";
-    test(sql, file);
+    String file = dataGenerator.generateDoubleNestedSchema_NoNullValues().getFileName();
+    String sql = "select a_string, t.c_record.nested_1_int as ni, " +
+      "t.c_record.nested_1_record.double_nested_1_int as dni from dfs.`%s` t " +
+      "where a_string in ('a_3', 'a_11')";
+
+    testBuilder()
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineColumns("a_string", "ni", "dni")
+      .baselineValues("a_3", 9, 27)
+      .baselineValues("a_11", 121, 1331)
+      .go();
   }
 
   @Test
   public void testSimpleNestedSchema_NoNullValues() throws Exception {
-    final String file = generateSimpleNestedSchema_NoNullValues().getFileName();
-    final String sql = "select a_string, b_int, t.c_record.nested_1_string, t.c_record.nested_1_int from dfs.`%s` t";
-    test(sql, file);
+    String file = dataGenerator.generateSimpleNestedSchema_NoNullValues().getFileName();
+    String sql = "select a_string, b_int, t.c_record.nested_1_string as ns," +
+      " t.c_record.nested_1_int as ni from dfs.`%s` t where b_int in (6, 19)";
+
+    testBuilder()
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineColumns("a_string", "b_int", "ns", "ni")
+      .baselineValues("a_6", 6, "nested_1_string_6", 36)
+      .baselineValues("a_19", 19, "nested_1_string_19", 361)
+      .go();
   }
 
   @Test
   public void testSimpleNestedSchema_StarQuery() throws Exception {
-    final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimpleNestedSchema_NoNullValues();
-    final String file = testSetup.getFileName();
-    testBuilder()
-        .sqlQuery("select * from dfs.`%s`", file)
-        .unOrdered()
-        .baselineRecords(testSetup.getExpectedRecords())
-        .go();
+    simpleAvroTestHelper(dataGenerator.generateSimpleNestedSchema_NoNullValues());
   }
   @Test
   public void testDoubleNestedSchema_NoNullValues() throws Exception {
-    final String file = generateDoubleNestedSchema_NoNullValues().getFileName();
-    final String sql = "select a_string, b_int, t.c_record.nested_1_string, t.c_record.nested_1_int, " +
-            "t.c_record.nested_1_record.double_nested_1_string, " +
-            "t.c_record.nested_1_record.double_nested_1_int " +
-            "from dfs.`%s` t";
-    test(sql, file);
+    String file = dataGenerator.generateDoubleNestedSchema_NoNullValues().getFileName();
+    String sql = "select a_string, b_int, t.c_record.nested_1_string as ns, t.c_record.nested_1_int as ni, " +
+      "t.c_record.nested_1_record.double_nested_1_string as dns, t.c_record.nested_1_record.double_nested_1_int as dni " +
+      "from dfs.`%s` t where b_int in (2, 14)";
 
-    final String sql2 = "select t.c_record.nested_1_string from dfs.`%s` t limit 1";
-    TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql2, file)
-        .unOrdered()
-        .baselineColumns("EXPR$0");
-    for (int i = 0; i < 1; i++) {
-      testBuilder.baselineValues("nested_1_string_" + i);
-    }
-    testBuilder.go();
+    testBuilder()
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineColumns("a_string", "b_int", "ns", "ni", "dns", "dni")
+      .baselineValues("a_2", 2, "nested_1_string_2", 4, "double_nested_1_string_2_2", 8)
+      .baselineValues("a_14", 14, "nested_1_string_14", 196, "double_nested_1_string_14_14", 2744)
+      .go();
   }
 
   @Test
   public void testDoubleNestedSchema_StarQuery() throws Exception {
-    simpleAvroTestHelper(generateDoubleNestedSchema_NoNullValues(), "select * from dfs.`%s`");
-  }
-
-  private static void simpleAvroTestHelper(AvroTestUtil.AvroTestRecordWriter testSetup, final String sql) throws Exception {
-    testBuilder()
-        .sqlQuery(sql, testSetup.getFileName())
-        .unOrdered()
-        .baselineRecords(testSetup.getExpectedRecords())
-        .go();
+    simpleAvroTestHelper(dataGenerator.generateDoubleNestedSchema_NoNullValues());
   }
 
   @Test
   public void testSimpleEnumSchema_NoNullValues() throws Exception {
-    final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimpleEnumSchema_NoNullValues();
-    final String file = testSetup.getFileName();
-    final String sql = "select a_string, b_enum from dfs.`%s`";
-    List<String> projectList = Lists.newArrayList("`a_string`", "`b_enum`");
+    AvroDataGenerator.AvroTestRecordWriter testSetup = dataGenerator.generateSimpleEnumSchema_NoNullValues();
+    String file = testSetup.getFileName();
+    String sql = "select a_string, b_enum from dfs.`%s`";
+    List<String> projectList = Arrays.asList("`a_string`", "`b_enum`");
     testBuilder()
-        .sqlQuery(sql, file)
-        .unOrdered()
-        .baselineRecords(project(testSetup.getExpectedRecords(), projectList))
-        .go();
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineRecords(project(testSetup.getExpectedRecords(), projectList))
+      .go();
   }
 
   @Test
   public void testSimpleEnumSchema_StarQuery() throws Exception {
-    simpleAvroTestHelper(generateSimpleEnumSchema_NoNullValues(), "select * from dfs.`%s`");
+    simpleAvroTestHelper(dataGenerator.generateSimpleEnumSchema_NoNullValues());
   }
 
   @Test
   public void testSimpleUnionSchema_StarQuery() throws Exception {
-    simpleAvroTestHelper(generateUnionSchema_WithNullValues(), "select * from dfs.`%s`");
+    simpleAvroTestHelper(dataGenerator.generateUnionSchema_WithNullValues());
   }
 
   @Test
   public void testShouldFailSimpleUnionNonNullSchema_StarQuery() throws Exception {
-    final String file = generateUnionSchema_WithNonNullValues().getFileName();
-    try {
-      test("select * from dfs.`%s`", file);
-      Assert.fail("Test should fail as union is only supported for optional fields");
-    } catch(UserRemoteException e) {
-      String message = e.getMessage();
-      Assert.assertTrue(message.contains("Avro union type must be of the format : [\"null\", \"some-type\"]"));
-    }
+    String file = dataGenerator.generateUnionSchema_WithNonNullValues().getFileName();
+
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage("UNSUPPORTED_OPERATION ERROR");
+    run("select * from dfs.`%s`", file);
   }
 
   @Test
   public void testNestedUnionSchema_withNullValues() throws Exception {
-    final String file = generateUnionNestedSchema_withNullValues().getFileName();
-    final String sql = "select t.c_record.nested_1_string,t.c_record.nested_1_int from dfs.`%s` t";
-    test(sql, file);
+    String file = dataGenerator.generateUnionNestedSchema_withNullValues().getFileName();
+    String sql = "select a_string, t.c_record.nested_1_string as ns, " +
+      "t.c_record.nested_1_int as ni from dfs.`%s` t where a_string in ('a_0', 'a_1')";
+
+    testBuilder()
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineColumns("a_string", "ns", "ni")
+      .baselineValues("a_0", "nested_1_string_0", 0)
+      .baselineValues("a_1", null, null)
+      .go();
   }
 
-  // DRILL-4574"></a>
+  // DRILL-4574
   @Test
   public void testFlattenPrimitiveArray() throws Exception {
-    final String file = generateSimpleArraySchema_NoNullValues().getFileName();
-    final String sql = "select a_string, flatten(c_string_array) as array_item from dfs.`%s` t";
+    String file = dataGenerator.generateSimpleArraySchema_NoNullValues().getFileName();
+    String sql = "select a_string, flatten(c_string_array) as array_item from dfs.`%s` t";
 
     TestBuilder testBuilder = testBuilder()
       .sqlQuery(sql, file)
       .unOrdered()
       .baselineColumns("a_string", "array_item");
 
-    for (int i = 0; i < RECORD_COUNT; i++) {
-
-      for (int j = 0; j < ARRAY_SIZE; j++) {
+    for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
+      for (int j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
         testBuilder.baselineValues("a_" + i, "c_string_array_" + i + "_" + j);
       }
     }
-
     testBuilder.go();
   }
 
-  private TestBuilder nestedArrayQueryTestBuilder(String file) {
-    final String sql = "select rec_nr, array_item['nested_1_int'] as array_item_nested_int from "
-        + "(select a_int as rec_nr, flatten(t.b_array) as array_item from dfs.`%s` t) a";
-
-    TestBuilder testBuilder = testBuilder()
-      .sqlQuery(sql, file)
-      .unOrdered()
-      .baselineColumns("rec_nr", "array_item_nested_int");
-
-    return testBuilder;
-  }
-
   //DRILL-4574
   @Test
   public void testFlattenComplexArray() throws Exception {
-    final String file = generateNestedArraySchema().getFileName();
+    String file = dataGenerator.generateNestedArraySchema().getFileName();
 
     TestBuilder testBuilder = nestedArrayQueryTestBuilder(file);
-    for (int i = 0; i < RECORD_COUNT; i++) {
-      for (int j = 0; j < ARRAY_SIZE; j++) {
+    for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
+      for (int j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
         testBuilder.baselineValues(i, j);
       }
     }
     testBuilder.go();
-
   }
 
   //DRILL-4574
   @Test
   public void testFlattenEmptyComplexArrayMustYieldNoResults() throws Exception {
-    final String file = generateNestedArraySchema(RECORD_COUNT, 0).getFilePath();
+    String file = dataGenerator.generateNestedArraySchema(AvroDataGenerator.RECORD_COUNT, 0).getFilePath();
     TestBuilder testBuilder = nestedArrayQueryTestBuilder(file);
     testBuilder.expectsEmptyResultSet();
   }
 
   @Test
   public void testNestedUnionArraySchema_withNullValues() throws Exception {
-    final String file = generateUnionNestedArraySchema_withNullValues().getFileName();
-    final String sql = "select t.c_array[0].nested_1_string,t.c_array[0].nested_1_int from dfs.`%s` t";
-    test(sql, file);
+    String file = dataGenerator.generateUnionNestedArraySchema_withNullValues().getFileName();
+    String sql = "select a_string, t.c_array[0].nested_1_string as ns, " +
+      "t.c_array[0].nested_1_int as ni from dfs.`%s` t where a_string in ('a_2', 'a_3')";
+
+    testBuilder()
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineColumns("a_string", "ns", "ni")
+      .baselineValues("a_2", "nested_1_string_2", 4)
+      .baselineValues("a_3", null, null)
+      .go();
   }
 
   @Test
   public void testMapSchema_withNullValues() throws Exception {
-    final String file = generateMapSchema_withNullValues().getFileName();
-    final String sql = "select c_map['key1'],c_map['key2'] from dfs.`%s`";
-    test(sql, file);
+    String file = dataGenerator.generateMapSchema_withNullValues().getFileName();
+    String sql = "select a_string, c_map['key1'] as k1, c_map['key2'] as k2 " +
+      "from dfs.`%s` where a_string in ('a_4', 'a_5')";
+
+    testBuilder()
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineColumns("a_string", "k1", "k2")
+      .baselineValues("a_4", "nested_1_string_4", "nested_1_string_5")
+      .baselineValues("a_5", null, null)
+      .go();
   }
 
   @Test
   public void testMapSchemaComplex_withNullValues() throws Exception {
-    final String file = generateMapSchemaComplex_withNullValues().getFileName();
-    final String sql = "select d_map['key1'] nested_key1, d_map['key2'] nested_key2 from dfs.`%s`";
+    String file = dataGenerator.generateMapSchemaComplex_withNullValues().getFileName();
+    String sql = "select d_map['key1'] nested_key1, d_map['key2'] nested_key2 from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, file)
-        .unOrdered()
-        .baselineColumns("nested_key1", "nested_key2");
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineColumns("nested_key1", "nested_key2");
 
-    final List<Object> expectedList = Lists.newArrayList();
-    for (int i = 0; i < ARRAY_SIZE; i++) {
-      expectedList.add((double)i);
+    List<Object> expectedList = new ArrayList<>();
+    for (int i = 0; i < AvroDataGenerator.ARRAY_SIZE; i++) {
+      expectedList.add((double) i);
     }
-    final List<Object> emptyList = listOf();
-    for (int i = 0; i < RECORD_COUNT; i += 2) {
+    List<Object> emptyList = listOf();
+    for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i += 2) {
       testBuilder.baselineValues(expectedList, expectedList);
       testBuilder.baselineValues(emptyList, emptyList);
     }
@@ -584,26 +395,39 @@
 
   @Test
   public void testStringAndUtf8Data() throws Exception {
-    simpleAvroTestHelper(generateStringAndUtf8Data(), "select * from dfs.`%s`");
+    simpleAvroTestHelper(dataGenerator.generateStringAndUtf8Data());
   }
 
   @Test
   public void testLinkedList() throws Exception {
-    final String file = generateLinkedList();
-    final String sql = "select * from dfs.`%s`";
-    test(sql, file);
+    int numRows = 5;
+    String file = dataGenerator.generateLinkedList(numRows);
+
+    TestBuilder testBuilder = testBuilder()
+      .sqlQuery("select * from dfs.`%s` t", file)
+      .unOrdered()
+      .baselineColumns("value", "next");
+
+    for (long i = 0; i < numRows; i++) {
+      if (i == numRows - 1) { // last row
+        testBuilder.baselineValues(i, mapOf("next", new JsonStringHashMap<>()));
+        continue;
+      }
+      testBuilder.baselineValues(i, mapOf("value", i + 1, "next", new JsonStringHashMap<>()));
+    }
+    testBuilder.go();
   }
 
   @Test
   public void testCountStar() throws Exception {
-    final String file = generateStringAndUtf8Data().getFileName();
-    final String sql = "select count(*) as row_count from dfs.`%s`";
+    String file = dataGenerator.generateStringAndUtf8Data().getFileName();
+    String sql = "select count(*) as row_count from dfs.`%s`";
     testBuilder()
-        .sqlQuery(sql, file)
-        .ordered()
-        .baselineColumns("row_count")
-        .baselineValues((long) RECORD_COUNT)
-        .go();
+      .sqlQuery(sql, file)
+      .ordered()
+      .baselineColumns("row_count")
+      .baselineValues((long) AvroDataGenerator.RECORD_COUNT)
+      .go();
   }
 
   @Test
@@ -611,11 +435,11 @@
     String sql = "select map_field from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, mapTableName)
-        .unOrdered()
-        .baselineColumns("map_field");
+      .sqlQuery(sql, mapTableName)
+      .unOrdered()
+      .baselineColumns("map_field");
 
-    for (long i = 0; i < RECORD_COUNT; i++) {
+    for (long i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       testBuilder.baselineValues(mapOfObject("key1", i, "key2", i + 1));
     }
     testBuilder.go();
@@ -623,14 +447,14 @@
 
   @Test
   public void testMapSchemaGetByKey() throws Exception {
-     String sql = "select map_field['key1'] val1, map_field['key2'] val2 from dfs.`%s`";
+    String sql = "select map_field['key1'] val1, map_field['key2'] val2 from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, mapTableName)
-        .unOrdered()
-        .baselineColumns("val1", "val2");
+      .sqlQuery(sql, mapTableName)
+      .unOrdered()
+      .baselineColumns("val1", "val2");
 
-    for (long i = 0; i < RECORD_COUNT; i++) {
+    for (long i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       testBuilder.baselineValues(i, i + 1);
     }
     testBuilder.go();
@@ -641,12 +465,12 @@
     String sql = "select map_field['notExists'] as map_field from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, mapTableName)
-        .unOrdered()
-        .baselineColumns("map_field");
+      .sqlQuery(sql, mapTableName)
+      .unOrdered()
+      .baselineColumns("map_field");
 
     Object[] nullValue = new Object[] {null};
-    for (long i = 0; i < RECORD_COUNT; i++) {
+    for (long i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       testBuilder.baselineValues(nullValue);
     }
     testBuilder.go();
@@ -657,11 +481,11 @@
     String sql = "select t.map_field.key1 val1, t.map_field.key2 val2 from dfs.`%s` t";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, mapTableName)
-        .unOrdered()
-        .baselineColumns("val1", "val2");
+      .sqlQuery(sql, mapTableName)
+      .unOrdered()
+      .baselineColumns("val1", "val2");
 
-    for (long i = 0; i < RECORD_COUNT; i++) {
+    for (long i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       testBuilder.baselineValues(i, i + 1);
     }
     testBuilder.go();
@@ -672,14 +496,14 @@
     String sql = "select map_array from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, mapTableName)
-        .unOrdered()
-        .baselineColumns("map_array");
+      .sqlQuery(sql, mapTableName)
+      .unOrdered()
+      .baselineColumns("map_array");
 
 
-    for (int i = 0; i < RECORD_COUNT; i++) {
+    for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       List<Object> array = listOf();
-      for (int j = 0; j < ARRAY_SIZE; j++) {
+      for (int j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
         array.add(mapOfObject(
             "key1", (i + 1) * (j + 50),
             "key2", (i + 1) * (j + 100)
@@ -696,11 +520,11 @@
     String sql = "select map_array[%d] element from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, elementIndex, mapTableName)
-        .unOrdered()
-        .baselineColumns("element");
+      .sqlQuery(sql, elementIndex, mapTableName)
+      .unOrdered()
+      .baselineColumns("element");
 
-    for (int i = 0; i < RECORD_COUNT; i++) {
+    for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       testBuilder.baselineValues(mapOfObject(
           "key1", (i + 1) * (elementIndex + 50),
           "key2", (i + 1) * (elementIndex + 100)
@@ -715,11 +539,11 @@
     String sql = "select map_array[%d]['key2'] val from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, elementIndex, mapTableName)
-        .unOrdered()
-        .baselineColumns("val");
+      .sqlQuery(sql, elementIndex, mapTableName)
+      .unOrdered()
+      .baselineColumns("val");
 
-    for (int i = 0; i < RECORD_COUNT; i++) {
+    for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       testBuilder.baselineValues((i + 1) * (elementIndex + 100));
     }
     testBuilder.go();
@@ -730,13 +554,13 @@
     String sql = "select map_array_value from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, mapTableName)
-        .unOrdered()
-        .baselineColumns("map_array_value");
+      .sqlQuery(sql, mapTableName)
+      .unOrdered()
+      .baselineColumns("map_array_value");
 
-    for (int i = 0; i < RECORD_COUNT; i++) {
+    for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       List<Object> doubleArray = listOf();
-      for (double j = 0; j < ARRAY_SIZE; j++) {
+      for (double j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
         doubleArray.add((double) (i + 1) * j);
       }
       testBuilder.baselineValues(mapOfObject("key1", doubleArray, "key2", doubleArray));
@@ -750,13 +574,13 @@
     String sql = "select map_array_value['key1'] element from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, generateMapSchema().getFileName())
-        .unOrdered()
-        .baselineColumns("element");
+      .sqlQuery(sql, dataGenerator.generateMapSchema().getFileName())
+      .unOrdered()
+      .baselineColumns("element");
 
-    for (int i = 0; i < RECORD_COUNT; i++) {
+    for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       List<Object> doubleArray = listOf();
-      for (double j = 0; j < ARRAY_SIZE; j++) {
+      for (double j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
         doubleArray.add((double) (i + 1) * j);
       }
       testBuilder.baselineValues(doubleArray);
@@ -770,11 +594,11 @@
     String sql = "select map_array_value['key1'][3] element from dfs.`%s`";
 
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, generateMapSchema().getFileName())
-        .unOrdered()
-        .baselineColumns("element");
+      .sqlQuery(sql, mapTableName)
+      .unOrdered()
+      .baselineColumns("element");
 
-    for (int i = 0; i < RECORD_COUNT; i++) {
+    for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
       double val = (double) (i + 1) * 3;
       testBuilder.baselineValues(val);
     }
@@ -786,11 +610,11 @@
   public void testMapSchemaValueInFilter() throws Exception {
     String sql = "select map_field['key1'] val from dfs.`%s` where map_field['key1'] < %d";
 
-    long filterValue = RECORD_COUNT / 10;
+    long filterValue = AvroDataGenerator.RECORD_COUNT / 10;
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, mapTableName, filterValue)
-        .unOrdered()
-        .baselineColumns("val");
+      .sqlQuery(sql, mapTableName, filterValue)
+      .unOrdered()
+      .baselineColumns("val");
 
     for (long i = 0; i < filterValue; i++) {
       testBuilder.baselineValues(i);
@@ -803,16 +627,134 @@
     String sql = "select map_array[%d]['key2'] val from dfs.`%s` where map_array[%d]['key2'] > %d";
 
     int elementIndex = 1;
-    int startRecord = 5001;
-    int filterValue = 5002 * (elementIndex + 100);
+    int startRecord = 30;
+    int filterValue = startRecord * (elementIndex + 100);
     TestBuilder testBuilder = testBuilder()
-        .sqlQuery(sql, elementIndex, mapTableName, elementIndex, filterValue)
-        .unOrdered()
-        .baselineColumns("val");
+      .sqlQuery(sql, elementIndex, mapTableName, elementIndex, filterValue)
+      .unOrdered()
+      .baselineColumns("val");
 
-    for (int i = startRecord + 1; i < RECORD_COUNT; i++) {
+    for (int i = startRecord; i < AvroDataGenerator.RECORD_COUNT; i++) {
       testBuilder.baselineValues((i + 1) * (elementIndex + 100));
     }
     testBuilder.go();
   }
+
+  @Test
+  public void testDecimal() throws Exception {
+    int numRows = 5;
+    String fileName = dataGenerator.generateDecimalData(numRows);
+
+    TestBuilder testBuilder = testBuilder()
+      .sqlQuery("select * from dfs.`%s`", fileName)
+      .unOrdered()
+      .baselineColumns("col_dec_pos_bytes", "col_dec_neg_bytes", "col_dec_pos_fixed", "col_dec_neg_fixed");
+
+    for (int i = 0; i < numRows; i++) {
+      testBuilder.baselineValues(
+        new BigDecimal(BigInteger.valueOf(100 + i), 2),
+        new BigDecimal(BigInteger.valueOf(-200 + i), 2),
+        new BigDecimal(BigInteger.valueOf(300 + i), 2),
+        new BigDecimal(BigInteger.valueOf(-400 + i), 2));
+    }
+    testBuilder.go();
+  }
+
+  @Test
+  public void testDateTime() throws Exception {
+    LocalDateTime dateTime = LocalDateTime.now(ZoneId.of("UTC")).withNano(0);
+    LocalDate localDate = dateTime.toLocalDate();
+    LocalTime localTime = dateTime.toLocalTime();
+
+    String fileName = dataGenerator.generateDateTimeData(dateTime);
+
+    testBuilder()
+      .sqlQuery("select * from dfs.`%s`", fileName)
+      .unOrdered()
+      .baselineColumns("col_timestamp_millis", "col_timestamp_micros", "col_date", "col_time_millis", "col_time_micros")
+      .baselineValues(dateTime, dateTime, localDate, localTime, localTime)
+      .go();
+  }
+
+  @Test
+  public void testDuration() throws Exception {
+    int numRows = 5;
+    String fileName = dataGenerator.generateDuration(numRows);
+
+    TestBuilder testBuilder = testBuilder()
+      .sqlQuery("select * from dfs.`%s`", fileName)
+      .unOrdered()
+      .baselineColumns("col_duration");
+
+    for (int i = 0; i < numRows; i++) {
+      testBuilder.baselineValues(Period.months(10 + i).withDays(100 + i).withMillis(1000 + i));
+    }
+    testBuilder.go();
+  }
+
+  @Test
+  public void testMultiDimensionalArray() throws Exception {
+    int numRecords = 5;
+    int arraySize = 3;
+    String fileName = dataGenerator.generateMultiDimensionalArray(numRecords, arraySize);
+
+    TestBuilder testBuilder = testBuilder()
+      .sqlQuery("select * from dfs.`%s`", fileName)
+      .unOrdered()
+      .baselineColumns("col_array_two_dims");
+
+    for (int i = 0; i < numRecords; i++) {
+      JsonStringArrayList<Object> nestedArray = new JsonStringArrayList<>();
+      for (int a = 0; a < arraySize; a++) {
+        nestedArray.add(listOf(String.format("val_%s_%s_0", i, a), String.format("val_%s_%s_1", i, a)));
+      }
+      testBuilder.baselineValues(nestedArray);
+    }
+    testBuilder.go();
+  }
+
+  @Test
+  public void testWithProvidedSchema() throws Exception {
+    testBuilder()
+      .sqlQuery("select * from " +
+          "table(dfs.`%s`(schema=>'inline=(col_i int not null default `15`, a_string varchar)')) " +
+          "where a_string = 'a_0'",
+        dataGenerator.generateStringAndUtf8Data().getFileName())
+      .unOrdered()
+      .baselineColumns("col_i", "a_string", "b_utf8")
+      .baselineValues(15, "a_0", "b_0")
+      .go();
+  }
+
+  private void simpleAvroTestHelper(AvroDataGenerator.AvroTestRecordWriter testSetup) throws Exception {
+    testBuilder()
+      .sqlQuery("select * from dfs.`%s`", testSetup.getFileName())
+      .unOrdered()
+      .baselineRecords(testSetup.getExpectedRecords())
+      .go();
+  }
+
+  private List<Map<String, Object>> project(List<Map<String,Object>> incomingRecords, List<String> projectCols) {
+    List<Map<String,Object>> output = new ArrayList<>();
+    for (Map<String, Object> incomingRecord : incomingRecords) {
+      JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>();
+      for (String s : incomingRecord.keySet()) {
+        if (projectCols.contains(s)) {
+          newRecord.put(s, incomingRecord.get(s));
+        }
+      }
+      output.add(newRecord);
+    }
+    return output;
+  }
+
+  private TestBuilder nestedArrayQueryTestBuilder(String file) {
+    String sql = "select rec_nr, array_item['nested_1_int'] as array_item_nested_int from "
+      + "(select a_int as rec_nr, flatten(t.b_array) as array_item from dfs.`%s` t) a";
+
+    return testBuilder()
+      .sqlQuery(sql, file)
+      .unOrdered()
+      .baselineColumns("rec_nr", "array_item_nested_int");
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroSchemaUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroSchemaUtilTest.java
new file mode 100644
index 0000000..1e548fd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroSchemaUtilTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.BaseTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AvroSchemaUtilTest extends BaseTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testExtractSchemaFromNullableNotUnion() {
+    Schema schema = SchemaBuilder.builder().stringType();
+
+    thrown.expect(UserException.class);
+    thrown.expectMessage("VALIDATION ERROR");
+
+    AvroSchemaUtil.extractSchemaFromNullable(schema, "s");
+  }
+
+  @Test
+  public void testExtractSchemaFromNullableComplexUnion() {
+    Schema schema = SchemaBuilder.unionOf()
+      .doubleType().and()
+      .longType().and()
+      .nullType()
+      .endUnion();
+
+    thrown.expect(UserException.class);
+    thrown.expectMessage("UNSUPPORTED_OPERATION ERROR");
+
+    AvroSchemaUtil.extractSchemaFromNullable(schema, "u");
+  }
+
+  @Test
+  public void testExtractSchemaFromNullable() {
+    Schema schema = SchemaBuilder.builder().nullable().stringType();
+    Schema actual = AvroSchemaUtil.extractSchemaFromNullable(schema, "s");
+
+    assertEquals(SchemaBuilder.builder().stringType(), actual);
+  }
+
+  @Test
+  public void testConvertSimpleTypes() {
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .requiredString("col_string")
+      .requiredBytes("col_bytes")
+      .requiredBoolean("col_boolean")
+      .requiredInt("col_int")
+      .requiredLong("col_long")
+      .requiredFloat("col_float")
+      .requiredDouble("col_double")
+      .optionalString("col_opt_string")
+      .optionalBytes("col_opt_bytes")
+      .optionalBoolean("col_opt_boolean")
+      .optionalInt("col_opt_int")
+      .optionalLong("col_opt_long")
+      .optionalFloat("col_opt_float")
+      .optionalDouble("col_opt_double")
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .add("col_string", TypeProtos.MinorType.VARCHAR)
+      .add("col_bytes", TypeProtos.MinorType.VARBINARY)
+      .add("col_boolean", TypeProtos.MinorType.BIT)
+      .add("col_int", TypeProtos.MinorType.INT)
+      .add("col_long", TypeProtos.MinorType.BIGINT)
+      .add("col_float", TypeProtos.MinorType.FLOAT4)
+      .add("col_double", TypeProtos.MinorType.FLOAT8)
+      .addNullable("col_opt_string", TypeProtos.MinorType.VARCHAR)
+      .addNullable("col_opt_bytes", TypeProtos.MinorType.VARBINARY)
+      .addNullable("col_opt_boolean", TypeProtos.MinorType.BIT)
+      .addNullable("col_opt_int", TypeProtos.MinorType.INT)
+      .addNullable("col_opt_long", TypeProtos.MinorType.BIGINT)
+      .addNullable("col_opt_float", TypeProtos.MinorType.FLOAT4)
+      .addNullable("col_opt_double", TypeProtos.MinorType.FLOAT8)
+      .buildSchema();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertDecimal() {
+    Schema decBytes = LogicalTypes.decimal(10, 2)
+      .addToSchema(SchemaBuilder.builder().bytesType());
+
+    Schema decFixed = LogicalTypes.decimal(5, 2)
+      .addToSchema(SchemaBuilder.builder().fixed("dec_fixed").size(5));
+
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_dec_bytes").type(decBytes).noDefault()
+      .name("col_dec_fixed").type(decFixed).noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .add("col_dec_bytes", TypeProtos.MinorType.VARDECIMAL, 10, 2)
+      .add("col_dec_fixed", TypeProtos.MinorType.VARDECIMAL, 5, 2)
+      .buildSchema();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertDateTime() {
+    Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType());
+    Schema timestampMicros = LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+    Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+    Schema timeMillis = LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+    Schema timeMicros = LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
+    Schema duration = new LogicalType("duration")
+      .addToSchema(SchemaBuilder.builder().fixed("duration_fixed").size(12));
+
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_timestamp_millis").type(timestampMillis).noDefault()
+      .name("col_timestamp_micros").type(timestampMicros).noDefault()
+      .name("col_date").type(date).noDefault()
+      .name("col_time_millis").type(timeMillis).noDefault()
+      .name("col_time_micros").type(timeMicros).noDefault()
+      .name("col_duration").type(duration).noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .add("col_timestamp_millis", TypeProtos.MinorType.TIMESTAMP)
+      .add("col_timestamp_micros", TypeProtos.MinorType.TIMESTAMP)
+      .add("col_date", TypeProtos.MinorType.DATE)
+      .add("col_time_millis", TypeProtos.MinorType.TIME)
+      .add("col_time_micros", TypeProtos.MinorType.TIME)
+      .add("col_duration", TypeProtos.MinorType.INTERVAL)
+      .buildSchema();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertNullType() {
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_null").type().nullType().noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .addNullable("col_null", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertEnum() {
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_enum").type().enumeration("letters").symbols("A", "B", "C").noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .add("col_enum", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertFixed() {
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_fixed").type().fixed("md5").size(16).noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .add("col_fixed", TypeProtos.MinorType.VARBINARY)
+      .buildSchema();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertArray() {
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_array").type().array().items().booleanType().noDefault()
+      .name("col_opt_array").type().array().items().nullable().longType().noDefault()
+      .name("col_nested_array").type().array().items()
+        .array().items()
+        .stringType()
+        .noDefault()
+      .name("col_array_map").type().array().items()
+          .record("arr_rec")
+          .fields()
+          .optionalString("s")
+          .requiredLong("i")
+          .endRecord()
+        .noDefault()
+      .name("col_array_dict").type().array().items()
+        .map().values().intType().noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .addArray("col_array", TypeProtos.MinorType.BIT)
+      .addArray("col_opt_array", TypeProtos.MinorType.BIGINT)
+      .addArray("col_nested_array", TypeProtos.MinorType.VARCHAR, 2)
+      .addMapArray("col_array_map")
+        .addNullable("s", TypeProtos.MinorType.VARCHAR)
+        .add("i", TypeProtos.MinorType.BIGINT)
+        .resumeSchema()
+      .addDictArray("col_array_dict", TypeProtos.MinorType.VARCHAR)
+        .value(TypeProtos.MinorType.INT)
+        .resumeSchema()
+      .buildSchema();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertMap() {
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_map_simple").type().record("map_simple_rec")
+        .fields()
+          .optionalInt("i")
+          .requiredString("s")
+          .endRecord()
+          .noDefault()
+      .name("col_map_complex").type().record("map_complex_rec")
+        .fields()
+          .name("col_nested_map").type().record("map_nested_rec")
+            .fields()
+              .optionalBoolean("nest_b")
+              .requiredDouble("nest_d")
+              .endRecord()
+              .noDefault()
+          .name("col_nested_dict").type().map().values().stringType().noDefault()
+          .name("col_nested_array").type().array().items().booleanType().noDefault()
+          .endRecord()
+          .noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .addMap("col_map_simple")
+        .addNullable("i", TypeProtos.MinorType.INT)
+        .add("s", TypeProtos.MinorType.VARCHAR)
+        .resumeSchema()
+      .addMap("col_map_complex")
+        .addMap("col_nested_map")
+          .addNullable("nest_b", TypeProtos.MinorType.BIT)
+          .add("nest_d", TypeProtos.MinorType.FLOAT8)
+          .resumeMap()
+        .addDict("col_nested_dict", TypeProtos.MinorType.VARCHAR)
+          .value(TypeProtos.MinorType.VARCHAR)
+          .resumeMap()
+        .addArray("col_nested_array", TypeProtos.MinorType.BIT)
+        .resumeSchema()
+      .buildSchema();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertDict() {
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .name("col_dict").type().map().values().stringType().noDefault()
+      .name("col_opt_dict").type().map().values().nullable().intType().noDefault()
+      .name("col_dict_val_array").type().map().values().array().items().stringType().noDefault()
+      .name("col_dict_val_dict").type().map().values().map().values().intType().noDefault()
+      .name("col_dict_val_map").type().map().values()
+        .record("dict_val")
+        .fields()
+        .optionalInt("i")
+        .requiredString("s")
+        .endRecord().noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .addDict("col_dict", TypeProtos.MinorType.VARCHAR).value(TypeProtos.MinorType.VARCHAR).resumeSchema()
+       .addDict("col_opt_dict", TypeProtos.MinorType.VARCHAR).nullableValue(TypeProtos.MinorType.INT).resumeSchema()
+      .addDict("col_dict_val_array", TypeProtos.MinorType.VARCHAR)
+        .repeatedValue(TypeProtos.MinorType.VARCHAR).resumeSchema()
+      .addDict("col_dict_val_dict", TypeProtos.MinorType.VARCHAR)
+        .dictValue()
+          .key(TypeProtos.MinorType.VARCHAR)
+          .value(TypeProtos.MinorType.INT)
+          .resumeDict()
+        .resumeSchema()
+      .addDict("col_dict_val_map", TypeProtos.MinorType.VARCHAR)
+        .mapValue()
+          .addNullable("i", TypeProtos.MinorType.INT)
+          .add("s", TypeProtos.MinorType.VARCHAR)
+          .resumeDict()
+        .resumeSchema()
+      .buildSchema();
+
+   assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertComplexUnion() {
+    Schema schema = SchemaBuilder.record("rec")
+      .fields()
+      .optionalString("s")
+      .name("u").type().unionOf()
+      .stringType().and().longType().and().nullType().endUnion().noDefault()
+      .endRecord();
+
+    thrown.expect(UserException.class);
+    thrown.expectMessage("UNSUPPORTED_OPERATION ERROR");
+
+    AvroSchemaUtil.convert(schema);
+  }
+
+  @Test
+  public void testConvertWithNamedTypes() {
+    Schema schema = SchemaBuilder.record("MixedList")
+      .fields()
+      .name("rec_l").type().record("LongList")
+        .fields()
+        .requiredLong("l")
+        .name("list_l_1").type("LongList").noDefault()
+        .name("list_l_2").type("LongList").noDefault()
+        .endRecord()
+        .noDefault()
+      .name("rec_s").type().record("StringList")
+        .fields()
+        .requiredString("s")
+        .name("list_s_1").type("StringList").noDefault()
+        .name("list_s_2").type("StringList").noDefault()
+        .endRecord()
+        .noDefault()
+      .name("rec_m").type().record("rec_m")
+        .fields()
+        .name("list_l").type("LongList").noDefault()
+        .name("list_s").type("StringList").noDefault()
+        .name("a").type().array().items().type("MixedList").noDefault()
+        .endRecord()
+        .noDefault()
+      .name("mixed").type("MixedList").noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .addMap("rec_l")
+        .add("l", TypeProtos.MinorType.BIGINT)
+        .addMap("list_l_1")
+          .resumeMap()
+        .addMap("list_l_2")
+          .resumeMap()
+        .resumeSchema()
+      .addMap("rec_s")
+        .add("s", TypeProtos.MinorType.VARCHAR)
+        .addMap("list_s_1")
+          .resumeMap()
+        .addMap("list_s_2")
+          .resumeMap()
+        .resumeSchema()
+      .addMap("rec_m")
+        .addMap("list_l")
+          .add("l", TypeProtos.MinorType.BIGINT)
+          .addMap("list_l_1")
+            .resumeMap()
+          .addMap("list_l_2")
+            .resumeMap()
+          .resumeMap()
+        .addMap("list_s")
+          .add("s", TypeProtos.MinorType.VARCHAR)
+          .addMap("list_s_1")
+            .resumeMap()
+          .addMap("list_s_2")
+            .resumeMap()
+          .resumeMap()
+        .addMapArray("a")
+          .resumeMap()
+        .resumeSchema()
+      .addMap("mixed")
+        .resumeSchema()
+      .build();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+
+  @Test
+  public void testConvertWithNamespaces() {
+    Schema schema = SchemaBuilder.record("rec").namespace("n1")
+      .fields()
+      .requiredString("s")
+      .name("m").type().record("rec").namespace("n2")
+        .fields()
+        .requiredLong("l")
+        .endRecord()
+        .noDefault()
+      .endRecord();
+
+    TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+      .add("s", TypeProtos.MinorType.VARCHAR)
+      .addMap("m")
+        .add("l", TypeProtos.MinorType.BIGINT)
+        .resumeSchema()
+      .buildSchema();
+
+    assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
deleted file mode 100644
index 6f3d19d..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.drill.exec.util.JsonStringArrayList;
-import org.apache.drill.exec.util.JsonStringHashMap;
-import org.apache.drill.exec.util.Text;
-import org.apache.drill.test.BaseTestQuery;
-
-/**
- * Utilities for generating Avro test data.
- */
-public class AvroTestUtil {
-
-  public static final int RECORD_COUNT = 10_000;
-  public static int ARRAY_SIZE = 20;
-
-  /**
-   * Class to write records to an Avro file while simultaneously
-   * constructing a corresponding list of records in the format taken in
-   * by the Drill test builder to describe expected results.
-   */
-  public static class AvroTestRecordWriter implements Closeable {
-    private final List<Map<String, Object>> expectedRecords;
-    GenericData.Record currentAvroRecord;
-    TreeMap<String, Object> currentExpectedRecord;
-    private Schema schema;
-    private final DataFileWriter<GenericData.Record> writer;
-    private final String filePath;
-    private final String fileName;
-
-    private AvroTestRecordWriter(Schema schema, File file) {
-      writer = new DataFileWriter<GenericData.Record>(new GenericDatumWriter<GenericData.Record>(schema));
-      try {
-        writer.create(schema, file);
-      } catch (IOException e) {
-        throw new RuntimeException("Error creating file in Avro test setup.", e);
-      }
-      this.schema = schema;
-      currentExpectedRecord = new TreeMap<>();
-      expectedRecords = new ArrayList<>();
-      filePath = file.getAbsolutePath();
-      fileName = file.getName();
-    }
-
-    public void startRecord() {
-      currentAvroRecord = new GenericData.Record(schema);
-      currentExpectedRecord = new TreeMap<>();
-    }
-
-    public void put(String key, Object value) {
-      currentAvroRecord.put(key, value);
-      // convert binary values into byte[], the format they will be given
-      // in the Drill result set in the test framework
-      currentExpectedRecord.put("`" + key + "`", convertAvroValToDrill(value, true));
-    }
-
-    // TODO - fix this the test wrapper to prevent the need for this hack
-    // to make the root behave differently than nested fields for String vs. Text
-    private Object convertAvroValToDrill(Object value, boolean root) {
-      if (value instanceof ByteBuffer) {
-        ByteBuffer bb = ((ByteBuffer)value);
-        byte[] drillVal = new byte[((ByteBuffer)value).remaining()];
-        bb.get(drillVal);
-        bb.position(0);
-        value = drillVal;
-      } else if (!root && value instanceof CharSequence) {
-        value = new Text(value.toString());
-      } else if (value instanceof GenericData.Array) {
-        GenericData.Array array = ((GenericData.Array) value);
-        final JsonStringArrayList<Object> drillList = new JsonStringArrayList<>();
-        for (Object o : array) {
-          drillList.add(convertAvroValToDrill(o, false));
-        }
-        value = drillList;
-      } else if (value instanceof GenericData.EnumSymbol) {
-        value = value.toString();
-      } else if (value instanceof GenericData.Record) {
-        GenericData.Record rec = ((GenericData.Record) value);
-        final JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>();
-        for (Schema.Field field : rec.getSchema().getFields()) {
-          Object val = rec.get(field.name());
-          newRecord.put(field.name(), convertAvroValToDrill(val, false));
-        }
-        value = newRecord;
-      }
-      return value;
-    }
-
-    public void endRecord() throws IOException {
-      writer.append(currentAvroRecord);
-      expectedRecords.add(currentExpectedRecord);
-    }
-
-    @Override
-    public void close() throws IOException {
-      writer.close();
-    }
-
-    public String getFilePath() {
-      return filePath;
-    }
-
-    public String getFileName() {
-      return fileName;
-    }
-
-    public List<Map<String, Object>>getExpectedRecords() {
-      return expectedRecords;
-    }
-  }
-
-
-  public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues() throws Exception {
-    return generateSimplePrimitiveSchema_NoNullValues(RECORD_COUNT);
-  }
-
-  public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords) throws Exception {
-    return generateSimplePrimitiveSchema_NoNullValues(numRecords, "");
-  }
-
-  /**
-   * Generates Avro table with specified rows number in specified path.
-   *
-   * @param numRecords rows number in the table
-   * @param tablePath  table path
-   * @return AvroTestRecordWriter instance
-   */
-  public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords, String tablePath)
-      throws Exception {
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_long").type().longType().noDefault()
-            .name("d_float").type().floatType().noDefault()
-            .name("e_double").type().doubleType().noDefault()
-            .name("f_bytes").type().bytesType().noDefault()
-            .name("g_null").type().nullType().noDefault()
-            .name("h_boolean").type().booleanType().noDefault()
-            .endRecord();
-
-    final File file = File.createTempFile("avro-primitive-test", ".avro",
-        BaseTestQuery.dirTestWatcher.makeRootSubDir(Paths.get(tablePath)));
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-
-    try {
-      ByteBuffer bb = ByteBuffer.allocate(2);
-      bb.put(0, (byte) 'a');
-
-      for (int i = 0; i < numRecords; i++) {
-        bb.put(1, (byte) ('0' + (i % 10)));
-        bb.position(0);
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-        record.put("c_long", (long) i);
-        record.put("d_float", (float) i);
-        record.put("e_double", (double) i);
-        record.put("f_bytes", bb);
-        record.put("g_null", null);
-        record.put("h_boolean", (i % 2 == 0));
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateUnionSchema_WithNullValues() throws Exception {
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_long").type().longType().noDefault()
-            .name("d_float").type().floatType().noDefault()
-            .name("e_double").type().doubleType().noDefault()
-            .name("f_bytes").type().bytesType().noDefault()
-            .name("g_null").type().nullType().noDefault()
-            .name("h_boolean").type().booleanType().noDefault()
-            .name("i_union").type().optional().doubleType()
-            .endRecord();
-
-    final File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-
-    try {
-      ByteBuffer bb = ByteBuffer.allocate(1);
-      bb.put(0, (byte) 1);
-
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-        record.put("c_long", (long) i);
-        record.put("d_float", (float) i);
-        record.put("e_double", (double) i);
-        record.put("f_bytes", bb);
-        record.put("g_null", null);
-        record.put("h_boolean", (i % 2 == 0));
-        record.put("i_union", (i % 2 == 0 ? (double) i : null));
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateUnionSchema_WithNonNullValues() throws Exception {
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_long").type().longType().noDefault()
-            .name("d_float").type().floatType().noDefault()
-            .name("e_double").type().doubleType().noDefault()
-            .name("f_bytes").type().bytesType().noDefault()
-            .name("g_null").type().nullType().noDefault()
-            .name("h_boolean").type().booleanType().noDefault()
-            .name("i_union").type().unionOf().doubleType().and().longType().endUnion().noDefault()
-            .endRecord();
-
-    final File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-
-      ByteBuffer bb = ByteBuffer.allocate(1);
-      bb.put(0, (byte) 1);
-
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-        record.put("c_long", (long) i);
-        record.put("d_float", (float) i);
-        record.put("e_double", (double) i);
-        record.put("f_bytes", bb);
-        record.put("g_null", null);
-        record.put("h_boolean", (i % 2 == 0));
-        record.put("i_union", (i % 2 == 0 ? (double) i : (long) i));
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateSimpleEnumSchema_NoNullValues() throws Exception {
-    final String[] symbols = { "E_SYM_A", "E_SYM_B", "E_SYM_C", "E_SYM_D" };
-
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_enum").type().enumeration("my_enum").symbols(symbols).noDefault()
-            .endRecord();
-
-    final File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema enumSchema = schema.getField("b_enum").schema();
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        final GenericData.EnumSymbol symbol =
-                new GenericData.EnumSymbol(enumSchema, symbols[(i + symbols.length) % symbols.length]);
-        record.put("b_enum", symbol);
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateSimpleArraySchema_NoNullValues() throws Exception {
-    final File file = File.createTempFile("avro-array-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_string_array").type().array().items().stringType().noDefault()
-            .name("d_int_array").type().array().items().intType().noDefault()
-            .name("e_float_array").type().array().items().floatType().noDefault()
-            .endRecord();
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-        {
-          GenericArray<String> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("c_string_array").schema());
-          for (int j = 0; j < ARRAY_SIZE; j++) {
-            array.add(j, "c_string_array_" + i + "_" + j);
-          }
-          record.put("c_string_array", array);
-        }
-        {
-          GenericArray<Integer> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("d_int_array").schema());
-          for (int j = 0; j < ARRAY_SIZE; j++) {
-            array.add(j, i * j);
-          }
-          record.put("d_int_array", array);
-        }
-        {
-          GenericArray<Float> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("e_float_array").schema());
-          for (int j = 0; j < ARRAY_SIZE; j++) {
-            array.add(j, (float) (i * j));
-          }
-          record.put("e_float_array", array);
-        }
-        record.endRecord();
-      }
-
-    } finally {
-      record.close();
-    }
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateSimpleNestedSchema_NoNullValues() throws Exception {
-    final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_record").type().record("my_record_1")
-              .namespace("foo.blah.org")
-              .fields()
-              .name("nested_1_string").type().stringType().noDefault()
-              .name("nested_1_int").type().intType().noDefault()
-              .endRecord()
-            .noDefault()
-            .endRecord();
-
-    final Schema nestedSchema = schema.getField("c_record").schema();
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-
-        final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
-        nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
-        nestedRecord.put("nested_1_int", i * i);
-
-        record.put("c_record", nestedRecord);
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateUnionNestedArraySchema_withNullValues() throws Exception {
-    final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_array").type().optional().array().items().record("my_record_1")
-            .namespace("foo.blah.org").fields()
-            .name("nested_1_string").type().optional().stringType()
-            .name("nested_1_int").type().optional().intType()
-            .endRecord()
-            .endRecord();
-
-    final Schema nestedSchema = schema.getField("c_array").schema();
-    final Schema arraySchema = nestedSchema.getTypes().get(1);
-    final Schema itemSchema = arraySchema.getElementType();
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-
-        if (i % 2 == 0) {
-          GenericArray<GenericRecord> array = new GenericData.Array<>(1, arraySchema);
-          final GenericRecord nestedRecord = new GenericData.Record(itemSchema);
-          nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
-          nestedRecord.put("nested_1_int", i * i);
-          array.add(nestedRecord);
-          record.put("c_array", array);
-        }
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateNestedArraySchema() throws IOException {
-    return generateNestedArraySchema(RECORD_COUNT, ARRAY_SIZE);
-  }
-
-  public static AvroTestRecordWriter generateNestedArraySchema(int numRecords, int numArrayItems) throws IOException {
-    final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest").namespace("org.apache.drill.exec.store.avro")
-        .fields().name("a_int").type().intType().noDefault().name("b_array").type().array().items()
-        .record("my_record_1").namespace("foo.blah.org").fields().name("nested_1_int").type().optional().intType()
-        .endRecord().arrayDefault(Collections.emptyList()).endRecord();
-
-    final Schema arraySchema = schema.getField("b_array").schema();
-    final Schema itemSchema = arraySchema.getElementType();
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-      for (int i = 0; i < numRecords; i++) {
-        record.startRecord();
-        record.put("a_int", i);
-        GenericArray<GenericRecord> array = new GenericData.Array<>(ARRAY_SIZE, arraySchema);
-
-        for (int j = 0; j < numArrayItems; j++) {
-          final GenericRecord nestedRecord = new GenericData.Record(itemSchema);
-          nestedRecord.put("nested_1_int", j);
-          array.add(nestedRecord);
-        }
-        record.put("b_array", array);
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateMapSchema_withNullValues() throws Exception {
-    final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
-            .endRecord();
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-
-        if (i % 2 == 0) {
-          Map<String, String> strMap = new HashMap<>();
-          strMap.put("key1", "nested_1_string_" +  i);
-          strMap.put("key2", "nested_1_string_" +  (i + 1 ));
-          record.put("c_map", strMap);
-        }
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateMapSchemaComplex_withNullValues() throws Exception {
-    final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
-            .name("d_map").type().optional().map().values(Schema.createArray(Schema.create(Type.DOUBLE)))
-            .endRecord();
-
-    final Schema arrayMapSchema = schema.getField("d_map").schema();
-    final Schema arrayItemSchema = arrayMapSchema.getTypes().get(1).getValueType();
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-
-        if (i % 2 == 0) {
-          Map<String, String> c_map = new HashMap<>();
-          c_map.put("key1", "nested_1_string_" +  i);
-          c_map.put("key2", "nested_1_string_" +  (i + 1 ));
-          record.put("c_map", c_map);
-        } else {
-          Map<String, GenericArray<Double>> d_map = new HashMap<>();
-          GenericArray<Double> array = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
-          for (int j = 0; j < ARRAY_SIZE; j++) {
-            array.add((double)j);
-          }
-          d_map.put("key1", array);
-          d_map.put("key2", array);
-
-          record.put("d_map", d_map);
-        }
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateUnionNestedSchema_withNullValues() throws Exception {
-    final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_record").type().optional().record("my_record_1")
-            .namespace("foo.blah.org").fields()
-            .name("nested_1_string").type().optional().stringType()
-            .name("nested_1_int").type().optional().intType()
-            .endRecord()
-            .endRecord();
-
-    final Schema nestedSchema = schema.getField("c_record").schema();
-    final Schema optionalSchema = nestedSchema.getTypes().get(1);
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-
-        if (i % 2 == 0) {
-          final GenericRecord nestedRecord = new GenericData.Record(optionalSchema);
-          nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
-          nestedRecord.put("nested_1_int", i * i);
-          record.put("c_record", nestedRecord);
-        }
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static AvroTestRecordWriter generateDoubleNestedSchema_NoNullValues() throws Exception {
-    final File file = File.createTempFile("avro-double-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringType().noDefault()
-            .name("b_int").type().intType().noDefault()
-            .name("c_record").type().record("my_record_1")
-              .namespace("foo.blah.org")
-              .fields()
-              .name("nested_1_string").type().stringType().noDefault()
-              .name("nested_1_int").type().intType().noDefault()
-              .name("nested_1_record").type().record("my_double_nested_record_1")
-                .namespace("foo.blah.org.rot")
-                .fields()
-                .name("double_nested_1_string").type().stringType().noDefault()
-                .name("double_nested_1_int").type().intType().noDefault()
-                .endRecord()
-                .noDefault()
-              .endRecord()
-              .noDefault()
-            .endRecord();
-
-    final Schema nestedSchema = schema.getField("c_record").schema();
-    final Schema doubleNestedSchema = nestedSchema.getField("nested_1_record").schema();
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_int", i);
-
-        final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
-        nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
-        nestedRecord.put("nested_1_int", i * i);
-
-        final GenericRecord doubleNestedRecord = new GenericData.Record(doubleNestedSchema);
-        doubleNestedRecord.put("double_nested_1_string", "double_nested_1_string_" + i + "_" + i);
-        doubleNestedRecord.put("double_nested_1_int", i * i * i);
-
-        nestedRecord.put("nested_1_record", doubleNestedRecord);
-        record.put("c_record", nestedRecord);
-
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  public static String generateLinkedList() throws Exception {
-    final File file = File.createTempFile("avro-linkedlist", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("LongList")
-            .namespace("org.apache.drill.exec.store.avro")
-            .aliases("LinkedLongs")
-            .fields()
-            .name("value").type().optional().longType()
-            .name("next").type().optional().type("LongList")
-            .endRecord();
-
-    final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
-    writer.create(schema, file);
-    GenericRecord previousRecord = null;
-    try {
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        GenericRecord record = (GenericRecord) (previousRecord == null ? new GenericData.Record(schema) : previousRecord.get("next"));
-        record.put("value", (long) i);
-        if (previousRecord != null) {
-          writer.append(previousRecord);
-        }
-        GenericRecord nextRecord = new GenericData.Record(record.getSchema());
-        record.put("next", nextRecord);
-        previousRecord = record;
-      }
-      writer.append(previousRecord);
-    } finally {
-      writer.close();
-    }
-
-    return file.getName();
-  }
-
-  public static AvroTestRecordWriter generateStringAndUtf8Data() throws Exception {
-
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-            .namespace("org.apache.drill.exec.store.avro")
-            .fields()
-            .name("a_string").type().stringBuilder().prop("avro.java.string", "String").endString().noDefault()
-            .name("b_utf8").type().stringType().noDefault()
-            .endRecord();
-
-    final File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-
-      ByteBuffer bb = ByteBuffer.allocate(1);
-      bb.put(0, (byte) 1);
-
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-        record.put("a_string", "a_" + i);
-        record.put("b_utf8", "b_" + i);
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-
-  /**
-   * Creates Avro table with specified schema and specified data
-   * @param schema table schema
-   * @param data table data
-   * @param <D> record type
-   * @return file with newly created Avro table.
-   * @throws IOException if an error is appeared during creation or filling the file.
-   */
-  public static <D extends SpecificRecord> File write(Schema schema, D... data) throws IOException {
-    File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-
-    DatumWriter writer = SpecificData.get().createDatumWriter(schema);
-
-    try (DataFileWriter<D> fileWriter = new DataFileWriter<>(writer)) {
-      fileWriter.create(schema, file);
-      for (D datum : data) {
-        fileWriter.append(datum);
-      }
-    }
-
-    return file;
-  }
-
-  public static AvroTestRecordWriter generateMapSchema() throws Exception {
-    final File file = File.createTempFile("avro-map-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
-        .namespace("org.apache.drill.exec.store.avro")
-        .fields()
-        .name("map_field").type().optional().map().values(Schema.create(Type.LONG))
-        .name("map_array").type().optional().array().items(Schema.createMap(Schema.create(Type.INT)))
-        .name("map_array_value").type().optional().map().values(Schema.createArray(Schema.create(Type.DOUBLE)))
-        .endRecord();
-
-    final Schema mapArraySchema = schema.getField("map_array").schema();
-    final Schema arrayItemSchema = mapArraySchema.getTypes().get(1);
-
-    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-    try {
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        record.startRecord();
-
-        // Create map with long values
-        Map<String, Long> map = new HashMap<>();
-        map.put("key1", (long) i);
-        map.put("key2", (long) i + 1);
-        record.put("map_field", map);
-
-        // Create list of map with int values
-        GenericArray<Map<String, Integer>> array = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
-        for (int j = 0; j < ARRAY_SIZE; j++) {
-          Map<String, Integer> mapInt = new HashMap<>();
-          mapInt.put("key1", (i + 1) * (j + 50));
-          mapInt.put("key2", (i + 1) * (j + 100));
-          array.add(mapInt);
-        }
-        record.put("map_array", array);
-
-        // create map with array value
-        Map<String, GenericArray<Double>> mapArrayValue = new HashMap<>();
-        GenericArray<Double> doubleArray = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
-        for (int j = 0; j < ARRAY_SIZE; j++) {
-          doubleArray.add((double) (i + 1) * j);
-        }
-        mapArrayValue.put("key1", doubleArray);
-        mapArrayValue.put("key2", doubleArray);
-        record.put("map_array_value", mapArrayValue);
-
-        record.endRecord();
-      }
-    } finally {
-      record.close();
-    }
-
-    return record;
-  }
-}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
index 44a4847..b33ad22 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
@@ -87,7 +87,7 @@
    * @param value a value that matches the primary setter above, or null
    * to set the column to null
    *
-   * @See {@link ColumnWriter#setObject()} for the generic case
+   * @see ColumnWriter#setObject(Object) for the generic case
    */
 
   void setValue(Object value);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractConvertFromString.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractConvertFromString.java
index bc190ac..b3cd3f7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractConvertFromString.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractConvertFromString.java
@@ -33,7 +33,7 @@
 
   /**
    * Property to control how the conversion handles blanks. Blanks are
-   * zero-length text fields (after triming whitespace.)
+   * zero-length text fields (after trimming whitespace.)
    */
   public static final String BLANK_ACTION_PROP = "blank-as";
 
@@ -251,7 +251,7 @@
   }
 
   @Override
-  public void setBytes(byte bytes[], int length) {
+  public void setBytes(byte[] bytes, int length) {
     setString(new String(bytes, 0, length, Charsets.UTF_8));
   }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
index 60810fb..3f92ab1 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
@@ -36,7 +36,7 @@
  * perform the type conversion, such as overriding "setString" to convert
  * from a string representation of a value to the actual format.
  * <p>
- * The {@link #setObject()} method works here: the object is passed
+ * The {@link #setObject(Object)} method works here: the object is passed
  * to this class's set methods, allowing, say, setting a string object
  * for an int column in the case above.
  */
diff --git a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfig.java
index 3e32977..5674933 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfig.java
@@ -32,9 +32,7 @@
  * live under one storage system. The storage systems themselves are described
  * in {@see StoragePluginConfig}s.
  */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 public interface FormatPluginConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatPluginConfig.class);
-
 
 }
diff --git a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
index 315f7cf..90ffaa8 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
@@ -20,10 +20,12 @@
 import java.util.Set;
 
 import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+public abstract class FormatPluginConfigBase implements FormatPluginConfig {
 
-public abstract class FormatPluginConfigBase implements FormatPluginConfig{
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatPluginConfigBase.class);
+  private static final Logger logger = LoggerFactory.getLogger(FormatPluginConfigBase.class);
 
   /**
    * scan for implementations of see <b>FormatPlugin</b>.
diff --git a/pom.xml b/pom.xml
index adc50dc..a7d87ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
     <javassist.version>3.26.0-GA</javassist.version>
     <msgpack.version>0.6.6</msgpack.version>
     <reflections.version>0.9.10</reflections.version>
-    <avro.version>1.9.0</avro.version>
+    <avro.version>1.9.1</avro.version>
     <metrics.version>4.0.2</metrics.version>
     <jetty.version>9.3.25.v20180904</jetty.version>
     <jersey.version>2.25.1</jersey.version>
@@ -110,6 +110,7 @@
     <joda.version>2.10.5</joda.version>
     <javax.el.version>3.0.0</javax.el.version>
     <surefire.version>3.0.0-M4</surefire.version>
+    <commons.compress>1.19</commons.compress>
   </properties>
 
   <scm>
@@ -1834,6 +1835,11 @@
         <artifactId>javax.el</artifactId>
         <version>${javax.el.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-compress</artifactId>
+        <version>${commons.compress}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>