DRILL-4982: Separate Hive reader classes for different data formats to improve performance.

1, Separating Hive reader classes allows optimization to apply on different classes in optimized ways. This  separation effectively avoid the performance degradation of scan.

2, Do not apply Skip footer/header mechanism on most Hive formats. This skip mechanism introduces extra checks on each incoming records.

close apache/drill#638
diff --git a/contrib/storage-hive/core/src/main/codegen/config.fmpp b/contrib/storage-hive/core/src/main/codegen/config.fmpp
index cd36891..d8ca3fa 100644
--- a/contrib/storage-hive/core/src/main/codegen/config.fmpp
+++ b/contrib/storage-hive/core/src/main/codegen/config.fmpp
@@ -16,6 +16,7 @@
 
 data: {
     drillOI:tdd(../data/HiveTypes.tdd)
+    hiveFormat:tdd(../data/HiveFormats.tdd)
 }
 freemarkerLinks: {
     includes: includes/
diff --git a/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd b/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
new file mode 100644
index 0000000..5200e4a
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http:# www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  map: [
+    {
+      hiveFormat: "HiveAvro",
+      hiveReader: "Avro",
+      hasHeaderFooter: false,
+    },
+    {
+      hiveFormat: "HiveParquet",
+      hiveReader: "Parquet",
+      hasHeaderFooter: false,
+    },
+    {
+      hiveFormat: "HiveText",
+      hiveReader: "Text",
+      hasHeaderFooter: true,
+    },
+    {
+      hiveFormat: "HiveOrc",
+      hiveReader: "Orc",
+      hasHeaderFooter: false,
+    },
+    {
+       hiveFormat: "HiveRCFile",
+       hiveReader: "RCFile",
+       hasHeaderFooter: false,
+    },
+    {
+      hiveFormat: "HiveDefault",
+      hiveReader: "Default",
+      hasHeaderFooter: false,
+    }
+  ]
+}
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
new file mode 100644
index 0000000..0dc8c08
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This template is used to generate different Hive record reader classes for different data formats
+ * to avoid JIT profile pullusion. These readers are derived from HiveAbstractReader which implements
+ * codes for init and setup stage, but the repeated - and performance critical part - next() method is
+ * separately implemented in the classes generated from this template. The internal SkipRecordReeader
+ * class is also separated as well due to the same reason.
+ *
+ * As to the performance gain with this change, please refer to:
+ * https://issues.apache.org/jira/browse/DRILL-4982
+ *
+ */
+<@pp.dropOutputFile />
+<#list hiveFormat.map as entry>
+<@pp.changeOutputFile name="/org/apache/drill/exec/store/hive/Hive${entry.hiveReader}Reader.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.store.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+
+import org.apache.hadoop.mapred.RecordReader;
+<#if entry.hasHeaderFooter == true>
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+</#if>
+
+public class Hive${entry.hiveReader}Reader extends HiveAbstractReader {
+
+  Object key;
+<#if entry.hasHeaderFooter == true>
+  SkipRecordsInspector skipRecordsInspector;
+<#else>
+  Object value;
+</#if>
+
+  public Hive${entry.hiveReader}Reader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+                       FragmentContext context, final HiveConf hiveConf,
+                       UserGroupInformation proxyUgi) throws ExecutionSetupException {
+    super(table, partition, inputSplit, projectedColumns, context, hiveConf, proxyUgi);
+  }
+
+  public  void internalInit(Properties tableProperties, RecordReader<Object, Object> reader) {
+
+    key = reader.createKey();
+<#if entry.hasHeaderFooter == true>
+    skipRecordsInspector = new SkipRecordsInspector(tableProperties, reader);
+<#else>
+    value = reader.createValue();
+</#if>
+
+  }
+  private void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
+    for (int i = 0; i < selectedStructFieldRefs.size(); i++) {
+      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs.get(i));
+      if (hiveValue != null) {
+        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
+          vectors.get(i), outputRecordIndex);
+      }
+    }
+  }
+
+<#if entry.hasHeaderFooter == true>
+  @Override
+  public int next() {
+    for (ValueVector vv : vectors) {
+      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
+    }
+    if (empty) {
+      setValueCountAndPopulatePartitionVectors(0);
+      return 0;
+    }
+
+    try {
+      skipRecordsInspector.reset();
+      Object value;
+
+      int recordCount = 0;
+
+      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value = skipRecordsInspector.getNextValue())) {
+        if (skipRecordsInspector.doSkipHeader(recordCount++)) {
+          continue;
+        }
+        Object bufferedValue = skipRecordsInspector.bufferAdd(value);
+        if (bufferedValue != null) {
+          Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue);
+          if (partTblObjectInspectorConverter != null) {
+            deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
+          }
+          readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, skipRecordsInspector.getActualCount());
+          skipRecordsInspector.incrementActualCount();
+        }
+        skipRecordsInspector.incrementTempCount();
+      }
+
+      setValueCountAndPopulatePartitionVectors(skipRecordsInspector.getActualCount());
+      skipRecordsInspector.updateContinuance();
+      return skipRecordsInspector.getActualCount();
+    } catch (IOException | SerDeException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+/**
+ * SkipRecordsInspector encapsulates logic to skip header and footer from file.
+ * Logic is applicable only for predefined in constructor file formats.
+ */
+protected class SkipRecordsInspector {
+
+  private final Set<Object> fileFormats;
+  private int headerCount;
+  private int footerCount;
+  private Queue<Object> footerBuffer;
+  // indicates if we continue reading the same file
+  private boolean continuance;
+  private int holderIndex;
+  private List<Object> valueHolder;
+  private int actualCount;
+  // actualCount without headerCount, used to determine holderIndex
+  private int tempCount;
+
+  protected SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
+    this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
+    this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
+    this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
+    logger.debug("skipRecordInspector: fileFormat {}, headerCount {}, footerCount {}",
+        this.fileFormats, this.headerCount, this.footerCount);
+    this.footerBuffer = Lists.newLinkedList();
+    this.continuance = false;
+    this.holderIndex = -1;
+    this.valueHolder = initializeValueHolder(reader, footerCount);
+    this.actualCount = 0;
+    this.tempCount = 0;
+  }
+
+  protected boolean doSkipHeader(int recordCount) {
+    return !continuance && recordCount < headerCount;
+  }
+
+  protected void reset() {
+    tempCount = holderIndex + 1;
+    actualCount = 0;
+    if (!continuance) {
+      footerBuffer.clear();
+    }
+  }
+
+  protected Object bufferAdd(Object value) throws SerDeException {
+    footerBuffer.add(value);
+    if (footerBuffer.size() <= footerCount) {
+      return null;
+    }
+    return footerBuffer.poll();
+  }
+
+  protected Object getNextValue() {
+    holderIndex = tempCount % getHolderSize();
+    return valueHolder.get(holderIndex);
+  }
+
+  private int getHolderSize() {
+    return valueHolder.size();
+  }
+
+  protected void updateContinuance() {
+    this.continuance = actualCount != 0;
+  }
+
+  protected int incrementTempCount() {
+    return ++tempCount;
+  }
+
+  protected int getActualCount() {
+    return actualCount;
+  }
+
+  protected int incrementActualCount() {
+    return ++actualCount;
+  }
+
+  /**
+   * Retrieves positive numeric property from Properties object by name.
+   * Return default value if
+   * 1. file format is absent in predefined file formats list
+   * 2. property doesn't exist in table properties
+   * 3. property value is negative
+   * otherwise casts value to int.
+   *
+   * @param tableProperties property holder
+   * @param propertyName    name of the property
+   * @param defaultValue    default value
+   * @return property numeric value
+   * @throws NumberFormatException if property value is non-numeric
+   */
+  protected int retrievePositiveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
+    int propertyIntValue = defaultValue;
+    if (!fileFormats.contains(tableProperties.get(hive_metastoreConstants.FILE_INPUT_FORMAT))) {
+      return propertyIntValue;
+    }
+    Object propertyObject = tableProperties.get(propertyName);
+    if (propertyObject != null) {
+      try {
+        propertyIntValue = Integer.valueOf((String) propertyObject);
+      } catch (NumberFormatException e) {
+        throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric", propertyName, propertyObject.toString()));
+      }
+    }
+    return propertyIntValue < 0 ? defaultValue : propertyIntValue;
+  }
+
+  /**
+   * Creates buffer of objects to be used as values, so these values can be re-used.
+   * Objects number depends on number of lines to skip in the end of the file plus one object.
+   *
+   * @param reader          RecordReader to return value object
+   * @param skipFooterLines number of lines to skip at the end of the file
+   * @return list of objects to be used as values
+   */
+  private List<Object> initializeValueHolder(RecordReader reader, int skipFooterLines) {
+    List<Object> valueHolder = new ArrayList<>(skipFooterLines + 1);
+    for (int i = 0; i <= skipFooterLines; i++) {
+      valueHolder.add(reader.createValue());
+    }
+    return valueHolder;
+  }
+ }
+
+<#else>
+  @Override
+  public int next() {
+    for (ValueVector vv : vectors) {
+      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
+    }
+    if (empty) {
+      setValueCountAndPopulatePartitionVectors(0);
+      return 0;
+    }
+
+    try {
+      int recordCount = 0;
+      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
+        Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
+        if (partTblObjectInspectorConverter != null) {
+          deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
+        }
+        readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount);
+        recordCount++;
+      }
+
+      setValueCountAndPopulatePartitionVectors(recordCount);
+      return recordCount;
+    } catch (IOException | SerDeException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+</#if>
+
+}
+</#list>
\ No newline at end of file
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
similarity index 64%
rename from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
rename to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
index 8631b8d..107fc66 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
@@ -17,20 +17,13 @@
  */
 package org.apache.drill.exec.store.hive;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
-import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -50,19 +43,16 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -71,15 +61,17 @@
 import com.google.common.collect.Lists;
 import org.apache.hadoop.security.UserGroupInformation;
 
-public class HiveRecordReader extends AbstractRecordReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveRecordReader.class);
 
-  private final DrillBuf managedBuffer;
+public abstract class HiveAbstractReader extends AbstractRecordReader {
+  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveAbstractReader.class);
+
+  protected final DrillBuf managedBuffer;
 
   protected Table table;
   protected Partition partition;
   protected InputSplit inputSplit;
   protected List<String> selectedColumnNames;
+  protected List<StructField> selectedStructFieldRefs = Lists.newArrayList();
   protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
   protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList();
   protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList();
@@ -99,24 +91,24 @@
   protected StructObjectInspector finalOI;
 
   // Converter which converts data from partition schema to table schema.
-  private Converter partTblObjectInspectorConverter;
+  protected Converter partTblObjectInspectorConverter;
 
   protected Object key;
   protected RecordReader<Object, Object> reader;
   protected List<ValueVector> vectors = Lists.newArrayList();
   protected List<ValueVector> pVectors = Lists.newArrayList();
   protected boolean empty;
-  private HiveConf hiveConf;
-  private FragmentContext fragmentContext;
-  private String defaultPartitionValue;
-  private final UserGroupInformation proxyUgi;
-  private SkipRecordsInspector skipRecordsInspector;
+  protected HiveConf hiveConf;
+  protected FragmentContext fragmentContext;
+  protected String defaultPartitionValue;
+  protected final UserGroupInformation proxyUgi;
+
 
   protected static final int TARGET_RECORD_COUNT = 4000;
 
-  public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
-                          FragmentContext context, final HiveConf hiveConf,
-                          UserGroupInformation proxyUgi) throws ExecutionSetupException {
+  public HiveAbstractReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+                       FragmentContext context, final HiveConf hiveConf,
+                       UserGroupInformation proxyUgi) throws ExecutionSetupException {
     this.table = table;
     this.partition = partition;
     this.inputSplit = inputSplit;
@@ -128,6 +120,8 @@
     setColumns(projectedColumns);
   }
 
+  public abstract void internalInit(Properties tableProperties, RecordReader<Object, Object> reader);
+
   private void init() throws ExecutionSetupException {
     final JobConf job = new JobConf(hiveConf);
 
@@ -161,6 +155,12 @@
         job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
       }
 
+      if (logger.isTraceEnabled()) {
+        for (StructField field: finalOI.getAllStructFieldRefs()) {
+          logger.trace("field in finalOI: {}", field.getClass().getName());
+        }
+        logger.trace("partitionSerDe class is {} {}", partitionSerDe.getClass().getName());
+      }
       // Get list of partition column names
       final List<String> partitionNames = Lists.newArrayList();
       for (FieldSchema field : table.getPartitionKeys()) {
@@ -197,7 +197,10 @@
       ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames);
 
       for (String columnName : selectedColumnNames) {
-        ObjectInspector fieldOI = finalOI.getStructFieldRef(columnName).getFieldObjectInspector();
+        StructField fieldRef = finalOI.getStructFieldRef(columnName);
+        selectedStructFieldRefs.add(fieldRef);
+        ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
+
         TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());
 
         selectedColumnObjInspectors.add(fieldOI);
@@ -205,6 +208,14 @@
         selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo, fragmentContext));
       }
 
+      for(int i=0; i<selectedColumnNames.size(); ++i){
+        logger.trace("inspector:typeName={}, className={}, TypeInfo: {}, converter:{}",
+            selectedColumnObjInspectors.get(i).getTypeName(),
+            selectedColumnObjInspectors.get(i).getClass().getName(),
+            selectedColumnTypes.get(i).toString(),
+            selectedColumnFieldConverters.get(i).getClass().getName());
+      }
+
       for (int i = 0; i < table.getPartitionKeys().size(); i++) {
         FieldSchema field = table.getPartitionKeys().get(i);
         if (selectedPartitionNames.contains(field.getName())) {
@@ -218,17 +229,18 @@
         }
       }
     } catch (Exception e) {
-      throw new ExecutionSetupException("Failure while initializing HiveRecordReader: " + e.getMessage(), e);
+      throw new ExecutionSetupException("Failure while initializing Hive Reader " + this.getClass().getName(), e);
     }
 
     if (!empty) {
       try {
         reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
+        logger.trace("hive reader created: {} for inputSplit {}", reader.getClass().getName(), inputSplit.toString());
       } catch (Exception e) {
         throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
       }
-      key = reader.createKey();
-      skipRecordsInspector = new SkipRecordsInspector(tableProperties, reader);
+
+      internalInit(tableProperties, reader);
     }
   }
 
@@ -305,56 +317,11 @@
    * For each new file queue is cleared to drop footer lines from previous file.
    */
   @Override
-  public int next() {
-    for (ValueVector vv : vectors) {
-      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
-    }
-    if (empty) {
-      setValueCountAndPopulatePartitionVectors(0);
-      return 0;
-    }
+  public abstract int next();
 
-    try {
-      skipRecordsInspector.reset();
-      int recordCount = 0;
-      Object value;
-      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value = skipRecordsInspector.getNextValue())) {
-        if (skipRecordsInspector.doSkipHeader(recordCount++)) {
-          continue;
-        }
-        Object bufferedValue = skipRecordsInspector.bufferAdd(value);
-        if (bufferedValue != null) {
-          Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue);
-          if (partTblObjectInspectorConverter != null) {
-            deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
-          }
-          readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, skipRecordsInspector.getActualCount());
-          skipRecordsInspector.incrementActualCount();
-        }
-        skipRecordsInspector.incrementTempCount();
-      }
 
-      setValueCountAndPopulatePartitionVectors(skipRecordsInspector.getActualCount());
-      skipRecordsInspector.updateContinuance();
-      return skipRecordsInspector.getActualCount();
-    } catch (IOException | SerDeException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
 
-  private void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
-    for (int i = 0; i < selectedColumnNames.size(); i++) {
-      final String columnName = selectedColumnNames.get(i);
-      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, finalOI.getStructFieldRef(columnName));
-
-      if (hiveValue != null) {
-        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
-            vectors.get(i), outputRecordIndex);
-      }
-    }
-  }
-
-  private void setValueCountAndPopulatePartitionVectors(int recordCount) {
+  protected void setValueCountAndPopulatePartitionVectors(int recordCount) {
     for (ValueVector v : vectors) {
       v.getMutator().setValueCount(recordCount);
     }
@@ -391,125 +358,4 @@
     }
   }
 
-  /**
-   * SkipRecordsInspector encapsulates logic to skip header and footer from file.
-   * Logic is applicable only for predefined in constructor file formats.
-   */
-  private class SkipRecordsInspector {
-
-    private final Set<Object> fileFormats;
-    private int headerCount;
-    private int footerCount;
-    private Queue<Object> footerBuffer;
-    // indicates if we continue reading the same file
-    private boolean continuance;
-    private int holderIndex;
-    private List<Object> valueHolder;
-    private int actualCount;
-    // actualCount without headerCount, used to determine holderIndex
-    private int tempCount;
-
-    private SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
-      this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
-      this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
-      this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
-      this.footerBuffer = Lists.newLinkedList();
-      this.continuance = false;
-      this.holderIndex = -1;
-      this.valueHolder = initializeValueHolder(reader, footerCount);
-      this.actualCount = 0;
-      this.tempCount = 0;
-    }
-
-    private boolean doSkipHeader(int recordCount) {
-      return !continuance && recordCount < headerCount;
-    }
-
-    private void reset() {
-      tempCount = holderIndex + 1;
-      actualCount = 0;
-      if (!continuance) {
-        footerBuffer.clear();
-      }
-    }
-
-    private Object bufferAdd(Object value) throws SerDeException {
-      footerBuffer.add(value);
-      if (footerBuffer.size() <= footerCount) {
-        return null;
-      }
-      return footerBuffer.poll();
-    }
-
-    private Object getNextValue() {
-      holderIndex = tempCount % getHolderSize();
-      return valueHolder.get(holderIndex);
-    }
-
-    private int getHolderSize() {
-      return valueHolder.size();
-    }
-
-    private void updateContinuance() {
-      this.continuance = actualCount != 0;
-    }
-
-    private int incrementTempCount() {
-      return ++tempCount;
-    }
-
-    private int getActualCount() {
-      return actualCount;
-    }
-
-    private int incrementActualCount() {
-      return ++actualCount;
-    }
-
-    /**
-     * Retrieves positive numeric property from Properties object by name.
-     * Return default value if
-     * 1. file format is absent in predefined file formats list
-     * 2. property doesn't exist in table properties
-     * 3. property value is negative
-     * otherwise casts value to int.
-     *
-     * @param tableProperties property holder
-     * @param propertyName    name of the property
-     * @param defaultValue    default value
-     * @return property numeric value
-     * @throws NumberFormatException if property value is non-numeric
-     */
-    private int retrievePositiveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
-      int propertyIntValue = defaultValue;
-      if (!fileFormats.contains(tableProperties.get(hive_metastoreConstants.FILE_INPUT_FORMAT))) {
-        return propertyIntValue;
-      }
-      Object propertyObject = tableProperties.get(propertyName);
-      if (propertyObject != null) {
-        try {
-          propertyIntValue = Integer.valueOf((String) propertyObject);
-        } catch (NumberFormatException e) {
-          throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric", propertyName, propertyObject.toString()));
-        }
-      }
-      return propertyIntValue < 0 ? defaultValue : propertyIntValue;
-    }
-
-    /**
-     * Creates buffer of objects to be used as values, so these values can be re-used.
-     * Objects number depends on number of lines to skip in the end of the file plus one object.
-     *
-     * @param reader          RecordReader to return value object
-     * @param skipFooterLines number of lines to skip at the end of the file
-     * @return list of objects to be used as values
-     */
-    private List<Object> initializeValueHolder(RecordReader reader, int skipFooterLines) {
-      List<Object> valueHolder = new ArrayList<>(skipFooterLines + 1);
-      for (int i = 0; i <= skipFooterLines; i++) {
-        valueHolder.add(reader.createValue());
-      }
-      return valueHolder;
-    }
-  }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 4be2ced..81be529 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -169,7 +169,7 @@
     // If there are no readers created (which is possible when the table is empty or no row groups are matched),
     // create an empty RecordReader to output the schema
     if (readers.size() == 0) {
-      readers.add(new HiveRecordReader(table, null, null, columns, context, conf,
+      readers.add(new HiveDefaultReader(table, null, null, columns, context, conf,
         ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
     }
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index eee7343..7aece71 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -17,7 +17,10 @@
  */
 package org.apache.drill.exec.store.hive;
 
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -29,14 +32,33 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 
 @SuppressWarnings("unused")
 public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
 
+  /**
+   * Use different classes for different Hive native formats:
+   * ORC, AVRO, RCFFile, Text and Parquet.
+   * If input format is none of them falls to default reader.
+   */
+  static Map<String, Class> readerMap = new HashMap<>();
+  static {
+    readerMap.put(OrcInputFormat.class.getCanonicalName(), HiveOrcReader.class);
+    readerMap.put(AvroContainerInputFormat.class.getCanonicalName(), HiveAvroReader.class);
+    readerMap.put(RCFileInputFormat.class.getCanonicalName(), HiveRCFileReader.class);
+    readerMap.put(MapredParquetInputFormat.class.getCanonicalName(), HiveParquetReader.class);
+    readerMap.put(TextInputFormat.class.getCanonicalName(), HiveTextReader.class);
+  }
+
   @Override
   public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
@@ -51,29 +73,27 @@
 
     final HiveConf hiveConf = config.getHiveConf();
 
-    // Native hive text record reader doesn't handle all types currently. For now use HiveRecordReader which uses
-    // Hive InputFormat and SerDe classes to read the data.
-    //if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) &&
-    //        table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) &&
-    //        config.getColumns() != null) {
-    //  for (InputSplit split : splits) {
-    //    readers.add(new HiveTextRecordReader(table,
-    //        (hasPartitions ? partitions.get(i++) : null),
-    //        split, config.getColumns(), context));
-    //  }
-    //} else {
+    final String formatName = table.getSd().getInputFormat();
+    Class<? extends HiveAbstractReader> readerClass = HiveDefaultReader.class;
+    if (readerMap.containsKey(formatName)) {
+      readerClass = readerMap.get(formatName);
+    }
+    Constructor<? extends HiveAbstractReader> readerConstructor = null;
+    try {
+      readerConstructor = readerClass.getConstructor(Table.class, Partition.class,
+          InputSplit.class, List.class, FragmentContext.class, HiveConf.class,
+          UserGroupInformation.class);
       for (InputSplit split : splits) {
-        readers.add(new HiveRecordReader(table,
+        readers.add(readerConstructor.newInstance(table,
             (hasPartitions ? partitions.get(i++) : null), split, config.getColumns(), context, hiveConf, proxyUgi));
       }
-    //}
-
-    // If there are no readers created (which is possible when the table is empty), create an empty RecordReader to
-    // output the schema
-    if (readers.size() == 0) {
-      readers.add(new HiveRecordReader(table, null, null, config.getColumns(), context, hiveConf, proxyUgi));
+      if (readers.size() == 0) {
+        readers.add(readerConstructor.newInstance(
+            table, null, null, config.getColumns(), context, hiveConf, proxyUgi));
+      }
+    } catch(Exception e) {
+      logger.error("No constructor for {}, thrown {}", readerClass.getName(), e);
     }
-
     return new ScanBatch(config, context, readers.iterator());
   }
 }