Revert "DRILL-4373: Drill and Hive have incompatible timestamp representations in parquet - added sys/sess option "store.parquet.int96_as_timestamp"; - added int96 to timestamp converter for both readers; - added unit tests;"

This reverts commit 7e7214b40784668d1599f265067f789aedb6cf86.
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 228308f..c43664c 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -68,7 +68,7 @@
   public static final ConvertHiveParquetScanToDrillParquetScan INSTANCE = new ConvertHiveParquetScanToDrillParquetScan();
 
   private static final DrillSqlOperator INT96_TO_TIMESTAMP =
-      new DrillSqlOperator("convert_fromTIMESTAMP_IMPALA_LOCALTIMEZONE", 1, true);
+      new DrillSqlOperator("convert_fromTIMESTAMP_IMPALA", 1, true);
 
   private static final DrillSqlOperator RTRIM = new DrillSqlOperator("RTRIM", 1, true);
 
@@ -296,7 +296,6 @@
     if (outputType.getSqlTypeName() == SqlTypeName.TIMESTAMP) {
       // TIMESTAMP is stored as INT96 by Hive in ParquetFormat. Use convert_fromTIMESTAMP_IMPALA UDF to convert
       // INT96 format data to TIMESTAMP
-      // TODO: Remove this conversion once "store.parquet.reader.int96_as_timestamp" will be true by default
       return rb.makeCall(INT96_TO_TIMESTAMP, inputRef);
     }
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index f99a934..8f8fdba 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -114,8 +114,6 @@
 
   @Override
   public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    // TODO: Remove implicit using of convert_fromTIMESTAMP_IMPALA function
-    // once "store.parquet.reader.int96_as_timestamp" will be true by default
     if(optimizerRulesContext.getPlannerSettings().getOptions()
         .getOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS).bool_val) {
       return ImmutableSet.<StoragePluginOptimizerRule>of(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 5f62781..fd8e5e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -148,8 +148,6 @@
   OptionValidator PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR = new PositiveLongValidator(PARQUET_VECTOR_FILL_CHECK_THRESHOLD, 100l, 10l);
   String PARQUET_NEW_RECORD_READER = "store.parquet.use_new_reader";
   OptionValidator PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR = new BooleanValidator(PARQUET_NEW_RECORD_READER, false);
-  String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp";
-  OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP, false);
 
   String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
   OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, true);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
index 38e0514..a57eede 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
@@ -28,29 +28,6 @@
 public class ConvertFromImpalaTimestamp {
 
 
-  @FunctionTemplate(name = "convert_fromTIMESTAMP_IMPALA_LOCALTIMEZONE", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
-  public static class ImpalaTimestampConvertFromWithLocalTimezone implements DrillSimpleFunc {
-
-    @Param VarBinaryHolder in;
-    @Output TimeStampHolder out;
-
-
-    @Override
-    public void setup() { }
-
-    @Override
-    public void eval() {
-      org.apache.drill.exec.util.ByteBufUtil.checkBufferLength(in.buffer, in.start, in.end, 12);
-
-      in.buffer.readerIndex(in.start);
-      long nanosOfDay = in.buffer.readLong();
-      int julianDay = in.buffer.readInt();
-      long dateTime = (julianDay - org.apache.drill.exec.store.parquet.ParquetReaderUtility.JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) *
-          org.joda.time.DateTimeConstants.MILLIS_PER_DAY + (nanosOfDay / org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.NANOS_PER_MILLISECOND);
-      out.value = new org.joda.time.DateTime(dateTime, org.joda.time.chrono.JulianChronology.getInstance()).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis();
-    }
-  }
-
   @FunctionTemplate(name = "convert_fromTIMESTAMP_IMPALA", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
   public static class ImpalaTimestampConvertFrom implements DrillSimpleFunc {
 
@@ -68,8 +45,16 @@
       in.buffer.readerIndex(in.start);
       long nanosOfDay = in.buffer.readLong();
       int julianDay = in.buffer.readInt();
-      out.value = (julianDay - org.apache.drill.exec.store.parquet.ParquetReaderUtility.JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) *
-          org.joda.time.DateTimeConstants.MILLIS_PER_DAY + (nanosOfDay / org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.NANOS_PER_MILLISECOND);
+      /* We use the same implementation as org.joda.time.DateTimeUtils.fromJulianDay but avoid rounding errors
+         Note we need to subtract half of a day because julian days are recorded as starting at noon.
+         From Joda :
+              public static final long fromJulianDay(double julianDay) {
+                484            double epochDay = julianDay - 2440587.5d;
+                485            return (long) (epochDay * 86400000d);
+                486        }
+      */
+      long dateTime = (julianDay - 2440588)*86400000L + (nanosOfDay / 1000000);
+      out.value = new org.joda.time.DateTime((long) dateTime, org.joda.time.chrono.JulianChronology.getInstance()).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis();
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 2c322c7..317ec36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -107,7 +107,6 @@
       ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
       ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR,
       ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR,
-      ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR,
       ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,
       ExecConstants.ENABLE_UNION_TYPE,
       ExecConstants.TEXT_ESTIMATED_ROW_SIZE,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 767c98d..12b2088 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -39,8 +39,6 @@
 import org.apache.parquet.schema.OriginalType;
 import org.joda.time.Chronology;
 import org.joda.time.DateTimeConstants;
-import org.apache.parquet.example.data.simple.NanoTime;
-import org.apache.parquet.io.api.Binary;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -78,21 +76,21 @@
    * in the data pages themselves to see if they are likely corrupt.
    */
   public enum DateCorruptionStatus {
-    META_SHOWS_CORRUPTION {
+    META_SHOWS_CORRUPTION{
       @Override
-      public String toString() {
+      public String toString(){
         return "It is determined from metadata that the date values are definitely CORRUPT";
       }
     },
     META_SHOWS_NO_CORRUPTION {
       @Override
-      public String toString() {
+      public String toString(){
         return "It is determined from metadata that the date values are definitely CORRECT";
       }
     },
     META_UNCLEAR_TEST_VALUES {
       @Override
-      public String toString() {
+      public String toString(){
         return "Not enough info in metadata, parquet reader will test individual date values";
       }
     }
@@ -154,7 +152,7 @@
             OriginalType originalType = columnMetadata.getOriginalType();
             if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue() &&
                 (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
-              int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
+              int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue());
               columnMetadata.setMax(newMinMax);
               columnMetadata.setMin(newMinMax);
             }
@@ -292,31 +290,4 @@
     }
     return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
   }
-
-  /**
-   * Utilities for converting from parquet INT96 binary (impala, hive timestamp)
-   * to date time value. This utilizes the Joda library.
-   */
-  public static class NanoTimeUtils {
-
-    public static final long NANOS_PER_MILLISECOND = 1000000;
-
-  /**
-   * @param binaryTimeStampValue
-   *          hive, impala timestamp values with nanoseconds precision
-   *          are stored in parquet Binary as INT96 (12 constant bytes)
-   *
-   * @return  Unix Timestamp - the number of milliseconds since January 1, 1970, 00:00:00 GMT
-   *          represented by @param binaryTimeStampValue .
-   */
-    public static long getDateTimeValueFromBinary(Binary binaryTimeStampValue) {
-      // This method represents binaryTimeStampValue as ByteBuffer, where timestamp is stored as sum of
-      // julian day number (32-bit) and nanos of day (64-bit)
-      NanoTime nt = NanoTime.fromBinary(binaryTimeStampValue);
-      int julianDay = nt.getJulianDay();
-      long nanosOfDay = nt.getTimeOfDayNanos();
-      return (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
-          + nanosOfDay / NANOS_PER_MILLISECOND;
-    }
-  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 662d5c9..ea65615 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.BitVector;
@@ -242,12 +241,7 @@
 
     if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
       if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT96) {
-         // TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation.
-        if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
-          return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
-        } else {
-          return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
-        }
+        return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
       }else{
         return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
       }
@@ -278,12 +272,7 @@
               throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT64");
           }
         case INT96:
-          // TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation.
-          if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
-            return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
-          } else {
-            return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
-          }
+           return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
         case FLOAT:
           return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
         case DOUBLE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index e20504f..a36bf2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -46,7 +46,6 @@
 import org.joda.time.DateTimeConstants;
 
 import io.netty.buffer.DrillBuf;
-import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
 
 public class NullableFixedByteAlignedReaders {
 
@@ -108,33 +107,6 @@
     }
   }
 
-  /**
-   * Class for reading parquet fixed binary type INT96, which is used for storing hive,
-   * impala timestamp values with nanoseconds precision. So it reads such values as a drill timestamp.
-   */
-  static class NullableFixedBinaryAsTimeStampReader extends NullableFixedByteAlignedReader<NullableTimeStampVector> {
-    NullableFixedBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                              ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    @Override
-    protected void readField(long recordsToReadInThisPass) {
-      this.bytebuf = pageReader.pageData;
-      if (usingDictionary) {
-        for (int i = 0; i < recordsToReadInThisPass; i++){
-          Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes();
-          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue));
-        }
-      } else {
-        for (int i = 0; i < recordsToReadInThisPass; i++) {
-          Binary binaryTimeStampValue = pageReader.valueReader.readBytes();
-          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue));
-        }
-      }
-    }
-  }
-
   static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
 
     NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index be27f3e..1d0a7ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -21,7 +21,6 @@
 import org.apache.drill.common.types.TypeProtos.MinorType;
 
 import org.apache.drill.common.util.CoreDecimalUtility;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.parquet.format.ConvertedType;
@@ -96,11 +95,7 @@
       // TODO - Both of these are not supported by the parquet library yet (7/3/13),
       // but they are declared here for when they are implemented
       case INT96:
-        if (options.getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
-          return TypeProtos.MinorType.TIMESTAMP;
-        } else {
-          return TypeProtos.MinorType.VARBINARY;
-        }
+        return TypeProtos.MinorType.VARBINARY;
       case FIXED_LEN_BYTE_ARRAY:
         if (convertedType == null) {
           checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 2f2db05..48a0bfd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -28,7 +28,6 @@
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.DateHolder;
@@ -82,8 +81,6 @@
 
 import com.google.common.collect.Lists;
 
-import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
-
 public class DrillParquetGroupConverter extends GroupConverter {
 
   private List<Converter> converters;
@@ -229,15 +226,9 @@
         }
       }
       case INT96: {
-        // TODO: replace null with TIMESTAMP_NANOS once parquet support such type annotation.
         if (type.getOriginalType() == null) {
-          if (options.getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
-            TimeStampWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).timeStamp() : mapWriter.timeStamp(name);
-            return new DrillFixedBinaryToTimeStampConverter(writer);
-          } else {
-            VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name);
-            return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetRecordReader.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8, mutator.getManagedBuffer());
-          }
+          VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name);
+          return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetRecordReader.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8, mutator.getManagedBuffer());
         }
 
       }
@@ -631,23 +622,4 @@
       writer.write(holder);
     }
   }
-
-  /**
-   * Parquet currently supports a fixed binary type INT96 for storing hive, impala timestamp
-   * with nanoseconds precision.
-   */
-  public static class DrillFixedBinaryToTimeStampConverter extends PrimitiveConverter {
-    private TimeStampWriter writer;
-    private TimeStampHolder holder = new TimeStampHolder();
-
-    public DrillFixedBinaryToTimeStampConverter(TimeStampWriter writer) {
-      this.writer = writer;
-    }
-
-    @Override
-    public void addBinary(Binary value) {
-      holder.value = getDateTimeValueFromBinary(value);
-      writer.write(holder);
-    }
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index a19b30e..8acf936 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -190,23 +190,12 @@
     return this;
   }
 
-  public TestBuilder optionSettingQueriesForBaseline(String queries, Object... args) {
-    this.baselineOptionSettingQueries = String.format(queries, args);
-    return this;
-  }
-
   // list of queries to run before the test query, can be used to set several options
   // list takes the form of a semi-colon separated list
   public TestBuilder optionSettingQueriesForTestQuery(String queries) {
     this.testOptionSettingQueries = queries;
     return this;
   }
-
-  public TestBuilder optionSettingQueriesForTestQuery(String query, Object... args) throws Exception {
-    this.testOptionSettingQueries = String.format(query, args);
-    return this;
-  }
-
   public TestBuilder approximateEquality() {
     this.approximateEquality = true;
     return this;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index ae0e699..0761f08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -34,9 +34,7 @@
 
 import com.google.common.base.Joiner;
 import org.apache.drill.BaseTestQuery;
-import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.DrillVersionInfo;
-import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -746,76 +744,30 @@
   }
 
   /*
-    Impala encodes timestamp values as int96 fields. Test the reading of an int96 field with two converters:
-    the first one converts parquet INT96 into drill VARBINARY and the second one (works while
-    store.parquet.reader.int96_as_timestamp option is enabled) converts parquet INT96 into drill TIMESTAMP.
+  Test the reading of an int96 field. Impala encodes timestamps as int96 fields
    */
   @Test
   public void testImpalaParquetInt96() throws Exception {
     compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
-    try {
-      test("alter session set %s = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
-      compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
-    } finally {
-      test("alter session reset %s", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
-  }
   }
 
   /*
-  Test the reading of a binary field as drill varbinary where data is in dicationary _and_ non-dictionary encoded pages
+  Test the reading of a binary field where data is in dicationary _and_ non-dictionary encoded pages
    */
   @Test
-  public void testImpalaParquetBinaryAsVarBinary_DictChange() throws Exception {
+  public void testImpalaParquetVarBinary_DictChange() throws Exception {
     compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_dict_change.parquet`");
   }
 
   /*
-  Test the reading of a binary field as drill timestamp where data is in dicationary _and_ non-dictionary encoded pages
-   */
-  @Test
-  public void testImpalaParquetBinaryAsTimeStamp_DictChange() throws Exception {
-    final String WORKING_PATH = TestTools.getWorkingPath();
-    final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
-    try {
-      testBuilder()
-          .sqlQuery("select int96_ts from dfs_test.`%s/parquet/int96_dict_change` order by int96_ts", TEST_RES_PATH)
-          .optionSettingQueriesForTestQuery(
-              "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
-          .ordered()
-          .csvBaselineFile("testframework/testParquetReader/testInt96DictChange/q1.tsv")
-          .baselineTypes(TypeProtos.MinorType.TIMESTAMP)
-          .baselineColumns("int96_ts")
-          .build().run();
-    } finally {
-      test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
-    }
-  }
-
-  /*
      Test the conversion from int96 to impala timestamp
    */
   @Test
-  public void testTimestampImpalaConvertFrom() throws Exception {
+  public void testImpalaParquetTimestampAsInt96() throws Exception {
     compareParquetReadersColumnar("convert_from(field_impala_ts, 'TIMESTAMP_IMPALA')", "cp.`parquet/int96_impala_1.parquet`");
   }
 
   /*
-     Test reading parquet Int96 as TimeStamp and comparing obtained values with the
-     old results (reading the same values as VarBinary and convert_fromTIMESTAMP_IMPALA function using)
-   */
-  @Test
-  public void testImpalaParquetTimestampInt96AsTimeStamp() throws Exception {
-    try {
-      test("alter session set %s = false", ExecConstants.PARQUET_NEW_RECORD_READER);
-      compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
-      test("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER);
-      compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
-    } finally {
-      test("alter session reset `%s`", ExecConstants.PARQUET_NEW_RECORD_READER);
-    }
-  }
-
-  /*
     Test a file with partitions and an int96 column. (Data generated using Hive)
    */
   @Test
@@ -836,6 +788,7 @@
   Test the conversion from int96 to impala timestamp with hive data including nulls. Validate against expected values
   */
   @Test
+  @Ignore("relies on particular time zone")
   public void testHiveParquetTimestampAsInt96_basic() throws Exception {
     final String q = "SELECT cast(convert_from(timestamp_field, 'TIMESTAMP_IMPALA') as varchar(19))  as timestamp_field "
             + "from cp.`parquet/part1/hive_all_types.parquet` ";
@@ -844,7 +797,7 @@
             .unOrdered()
             .sqlQuery(q)
             .baselineColumns("timestamp_field")
-            .baselineValues("2013-07-06 00:01:00")
+            .baselineValues("2013-07-05 17:01:00")
             .baselineValues((Object)null)
             .go();
   }
@@ -912,23 +865,6 @@
         "cp.`parquet/last_page_one_null.parquet`");
   }
 
-  private void compareParquetInt96Converters(String selection, String table) throws Exception {
-    try {
-      testBuilder()
-          .ordered()
-          .sqlQuery("select `%s` from %s", selection, table)
-          .optionSettingQueriesForTestQuery(
-              "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
-          .sqlBaselineQuery("select convert_from(`%1$s`, 'TIMESTAMP_IMPALA') as `%1$s` from %2$s", selection, table)
-          .optionSettingQueriesForBaseline(
-              "alter session set `%s` = false", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
-          .build()
-          .run();
-    } finally {
-      test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
-    }
-  }
-
   @Ignore ("Used to test decompression in AsyncPageReader. Takes too long.")
   @Test
   public void testTPCHReadWriteRunRepeated() throws Exception {
diff --git a/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0 b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0
deleted file mode 100644
index 8517428..0000000
--- a/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0
+++ /dev/null
Binary files differ
diff --git a/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1 b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1
deleted file mode 100644
index 0183b50..0000000
--- a/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1
+++ /dev/null
Binary files differ
diff --git a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv b/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
deleted file mode 100644
index 91b9b01..0000000
--- a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
+++ /dev/null
@@ -1,12 +0,0 @@
-1970-01-01 00:00:01.000
-1971-01-01 00:00:01.000
-1972-01-01 00:00:01.000
-1973-01-01 00:00:01.000
-1974-01-01 00:00:01.000
-2010-01-01 00:00:01.000
-2011-01-01 00:00:01.000
-2012-01-01 00:00:01.000
-2013-01-01 00:00:01.000
-2014-01-01 00:00:01.000
-2015-01-01 00:00:01.000
-2016-01-01 00:00:01.000