PIG-4764: Make Pig work with Hive 3.1 (szita)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1873947 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index abf7090..64e4c45 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-4764: Make Pig work with Hive 3.1 (szita)
+
 PIG-5352: Please add OWASP Dependency Check to the build ivy.xml (knoguchi)
 
 PIG-5385: Skip calling extra gc() before spilling large bag when unnecessary (knoguchi)
diff --git a/build.xml b/build.xml
index b86e747..56e36cd 100644
--- a/build.xml
+++ b/build.xml
@@ -154,7 +154,7 @@
     <condition property="isWindows">
       <os family="windows"/>
     </condition>
-	
+
     <target name="setTezEnv">
         <propertyreset name="test.timeout" value="900000" />
         <propertyreset name="hadoopversion" value="2" />
@@ -241,6 +241,7 @@
     </if>
     <property name="hbaseversion" value="1" />
     <property name="sparkversion" value="1" />
+    <property name="hiveversion" value="1" />
 
     <condition property="src.exclude.dir" value="**/Spark2*.java" else="**/Spark1*.java">
         <equals arg1="${sparkversion}" arg2="1"/>
@@ -248,6 +249,7 @@
 
     <property name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
     <property name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
+    <property name="src.hive.shims.dir" value="${basedir}/shims/src/hive${hiveversion}" />
 
     <property name="asfrepo" value="https://repository.apache.org"/>
     <property name="asfsnapshotrepo" value="${asfrepo}/content/repositories/snapshots"/>
@@ -353,6 +355,7 @@
                 <source path="${test.e2e.dir}/udfs/java"/>
                 <source path="${src.shims.dir}"/>
                 <source path="${src.shims.test.dir}"/>
+                <source path="${src.hive.shims.dir}"/>
                 <source path="tutorial/src"/>
                 <source path="${test.src.dir}" excluding="e2e/pig/udfs/java/|resources/|perf/"/>
                 <output path="${build.dir.eclipse-main-classes}" />
@@ -568,8 +571,8 @@
         <echo>*** Building Main Sources ***</echo>
         <echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo>
         <echo>*** Else, you will only be warned about deprecations ***</echo>
-        <echo>*** Hadoop version used: ${hadoopversion} ; HBase version used: ${hbaseversion} ; Spark version used: ${sparkversion} ***</echo>
-        <compileSources sources="${src.dir};${src.gen.dir};${src.lib.dir}/bzip2;${src.shims.dir}"
+        <echo>*** Hadoop version used: ${hadoopversion} ; HBase version used: ${hbaseversion} ; Spark version used: ${sparkversion} ; Hive version used: ${hiveversion} ***</echo>
+        <compileSources sources="${src.dir};${src.gen.dir};${src.lib.dir}/bzip2;${src.shims.dir};${src.hive.shims.dir}"
             excludes="${src.exclude.dir}" dist="${build.classes}" cp="classpath" warnings="${javac.args.warnings}" />
         <copy todir="${build.classes}/META-INF">
             <fileset dir="${src.dir}/META-INF" includes="**"/>
@@ -734,6 +737,7 @@
             <fileset dir="${ivy.lib.dir}" includes="metrics-core-*.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="hbase-*.jar" excludes="hbase-*tests.jar,hbase-*hadoop2*.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="hive-*.jar" excludes="hive-shims-0.*.jar, hive-contrib*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="minlog-*.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="protobuf-java-*.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="zookeeper-*.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="accumulo-*.jar" excludes="accumulo-minicluster*.jar"/>
@@ -1161,6 +1165,10 @@
             <fileset dir="${basedir}/shims" />
         </copy>
 
+        <copy todir="${tar.dist.dir}/hive-shims" includeEmptyDirs="true">
+            <fileset dir="${basedir}/hive-shims" />
+        </copy>
+
         <copy todir="${tar.dist.dir}/lib-src" includeEmptyDirs="true">
             <fileset dir="${src.lib.dir}" />
         </copy>
@@ -1236,6 +1244,7 @@
                 <include name="lib-src/**"/>
                 <include name="license/**"/>
                 <include name="shims/**"/>
+                <include name="hive-shims/**"/>
                 <include name="src/**"/>
                 <include name="test/**"/>
                 <exclude name="test/**/*.jar"/>
@@ -1723,7 +1732,7 @@
 
      <target name="ivy-resolve" depends="ivy-init" unless="ivy.resolved" description="Resolve Ivy dependencies">
        <property name="ivy.resolved" value="true"/>
-       <echo>*** Ivy resolve with Hadoop ${hadoopversion}, Spark ${sparkversion} and HBase ${hbaseversion} ***</echo>
+       <echo>*** Ivy resolve with Hadoop ${hadoopversion}, Spark ${sparkversion}, HBase ${hbaseversion}, Hive ${hiveversion} ***</echo>
        <ivy:resolve log="${loglevel}" settingsRef="${ant.project.name}.ivy.settings" conf="compile"/>
        <ivy:report toDir="build/ivy/report"/>
      </target>
diff --git a/ivy.xml b/ivy.xml
index 37ba5d4..800d21e 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -31,7 +31,7 @@
     <conf name="default" extends="master,runtime"/>
     <conf name="runtime" extends="compile,test" description="runtime but not the artifact" />
     <!--Private configurations. -->
-    <conf name="compile" extends="hadoop${hadoopversion},hbase${hbaseversion}" visibility="private" description="compile artifacts"/>
+    <conf name="compile" extends="hadoop${hadoopversion},hbase${hbaseversion},hive${hiveversion}" visibility="private" description="compile artifacts"/>
     <conf name="test" extends="compile" visibility="private"/>
     <conf name="javadoc" visibility="private" extends="compile,test"/>
     <conf name="releaseaudit" visibility="private"/>
@@ -43,6 +43,8 @@
     <conf name="hbase2" visibility="private"/>
     <conf name="spark1" visibility="private" />
     <conf name="spark2" visibility="private" />
+    <conf name="hive1" visibility="private"/>
+    <conf name="hive3" visibility="private"/>
     <conf name="owasp" visibility="private" description="Artifacts required for owasp target"/>
   </configurations>
   <publications>
@@ -525,23 +527,48 @@
     <!-- for piggybank -->
     <dependency org="org.hsqldb" name="hsqldb" rev="${hsqldb.version}"
       conf="test->default" />
-    <dependency org="org.apache.hive" name="hive-exec" rev="${hive.version}" conf="compile->master" changing="true">
+
+    <!-- Hive 1 -->
+    <dependency org="org.apache.hive" name="hive-exec" rev="${hive1.version}" conf="hive1->master" changing="true">
+      <artifact name="hive-exec" m:classifier="core" />
+    </dependency>
+    <dependency org="org.apache.hive" name="hive-serde" rev="${hive1.version}" changing="true"
+      conf="hive1->master" />
+    <dependency org="org.apache.hive" name="hive-common" rev="${hive1.version}" changing="true"
+      conf="hive1->master" />
+    <dependency org="org.apache.hive.shims" name="hive-shims-common" rev="${hive1.version}" changing="true"
+      conf="hive1->master" />
+    <dependency org="org.apache.hive" name="hive-contrib" rev="${hive1.version}" changing="true"
+                conf="test->master" />
+    <dependency org="org.apache.hive.shims" name="hive-shims-0.23" rev="${hive1.version}" changing="true"
+      conf="hive1->master" />
+
+    <!-- Hive 3 -->
+    <dependency org="org.apache.hive" name="hive-exec" rev="${hive.version}" conf="hive3->master" changing="true">
       <artifact name="hive-exec" m:classifier="core" />
     </dependency>
     <dependency org="org.apache.hive" name="hive-serde" rev="${hive.version}" changing="true"
-      conf="compile->master" />
+                conf="hive3->master" />
     <dependency org="org.apache.hive" name="hive-common" rev="${hive.version}" changing="true"
-      conf="compile->master" />
+                conf="hive3->master" />
     <dependency org="org.apache.hive.shims" name="hive-shims-common" rev="${hive.version}" changing="true"
-      conf="compile->master" />
+                conf="hive3->master" />
     <dependency org="org.apache.hive" name="hive-contrib" rev="${hive.version}" changing="true"
                 conf="test->master" />
+    <dependency org="org.apache.hive" name="hive-llap-common" rev="${hive.version}" changing="true"
+                conf="hive3->master" />
     <dependency org="org.apache.hive.shims" name="hive-shims-0.23" rev="${hive.version}" changing="true"
-      conf="hadoop2->master" />
+                conf="hive3->master" />
+
+    <dependency org="org.apache.orc" name="orc-core" rev="${orc.version}" changing="true" conf="hive3->default" />
+    <dependency org="org.apache.hive" name="hive-storage-api" rev="${hive-storage-api.version}" changing="true" conf="hive3->master" />
     <dependency org="org.iq80.snappy" name="snappy" rev="${snappy.version}"
       conf="test->master" />
-    <dependency org="com.esotericsoftware.kryo" name="kryo" rev="${kryo.version}"
-      conf="compile->master" />
+    <dependency org="com.esotericsoftware" name="kryo-shaded" rev="${kryo.version}"
+      conf="hive3->default" />
+    <dependency org="com.esotericsoftware.kryo" name="kryo" rev="2.22"
+      conf="hive1->default" />
     <dependency org="org.apache.commons" name="commons-lang3" rev="${commons-lang3.version}"
       conf="compile->master" />
 
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 9d099e7..dd33107 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -41,7 +41,10 @@
 hbase1.version=1.2.4
 hbase2.version=2.0.0
 hsqldb.version=2.4.0
-hive.version=1.2.1
+hive1.version=1.2.1
+hive.version=3.1.2
+hive-storage-api.version=2.7.0
+orc.version=1.5.6
 httpcomponents.version=4.4
 jackson.version=1.9.13
 jackson-pig-3039-test.version=1.9.9
@@ -59,7 +62,7 @@
 junit.version=4.11
 jruby.version=1.7.26
 jython.version=2.7.1
-kryo.version=2.22
+kryo.version=3.0.3
 rhino.version=1.7R2
 antlr.version=3.4
 stringtemplate.version=4.0.4
diff --git a/shims/src/hive1/org/apache/pig/hive/HiveShims.java b/shims/src/hive1/org/apache/pig/hive/HiveShims.java
new file mode 100644
index 0000000..b606821
--- /dev/null
+++ b/shims/src/hive1/org/apache/pig/hive/HiveShims.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.hive;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+
+import org.joda.time.DateTime;
+
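+/**
+ * Shim layer that hides Hive API differences from the rest of Pig (ORC storage, the Hive UDF
+ * wrappers and HiveUtils). This is the variant compiled in when Pig is built against the
+ * Hive 1 line (hiveversion=1, the default in build.xml).
+ */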
+public class HiveShims {
+    public static String normalizeOrcVersionName(String version) {
+        return Version.byName(version).getName();
+    }
+
+    public static void addLessThanOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType, Object value) {
+        builder.lessThan(columnName, value);
+    }
+
+    public static void addLessThanEqualsOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType, Object value) {
+        builder.lessThanEquals(columnName, value);
+    }
+
+    public static void addEqualsOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType, Object value) {
+        builder.equals(columnName, value);
+    }
+
+    public static void addBetweenOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType, Object low, Object high) {
+        builder.between(columnName, low, high);
+    }
+
+    public static void addIsNullOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType) {
+        builder.isNull(columnName);
+    }
+
+    public static Class[] getOrcDependentClasses(Class hadoopVersionShimsClass) {
+        return new Class[]{OrcFile.class, HiveConf.class, AbstractSerDe.class,
+                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
+                Input.class};
+    }
+
+    public static Class[] getHiveUDFDependentClasses(Class hadoopVersionShimsClass) {
+        return new Class[]{GenericUDF.class,
+                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+                hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class};
+    }
+
+    public static Object getSearchArgObjValue(Object value) {
+        if (value instanceof BigInteger) {
+            return new BigDecimal((BigInteger) value);
+        } else if (value instanceof DateTime) {
+            return new Timestamp(((DateTime) value).getMillis());
+        } else {
+            return value;
+        }
+    }
+
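+    // On the Hive 1 line the ORC writer settings are configured through HiveConf.ConfVars.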
+    public static void setOrcConfigOnJob(Job job, Long stripeSize, Integer rowIndexStride, Integer bufferSize, Boolean blockPadding, CompressionKind compress, String versionName) {
+        if (stripeSize != null) {
+            job.getConfiguration().setLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname, stripeSize);
+        }
+        if (rowIndexStride != null) {
+            job.getConfiguration().setInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE.varname, rowIndexStride);
+        }
+        if (bufferSize != null) {
+            job.getConfiguration().setInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.varname, bufferSize);
+        }
+        if (blockPadding != null) {
+            job.getConfiguration().setBoolean(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING.varname, blockPadding);
+        }
+        if (compress != null) {
+            job.getConfiguration().set(HiveConf.ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, compress.toString());
+        }
+        if (versionName != null) {
+            job.getConfiguration().set(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname, versionName);
+        }
+    }
+
+    public static class PigJodaTimeStampObjectInspector extends
+            AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector {
+
+        public PigJodaTimeStampObjectInspector() {
+            super(TypeInfoFactory.timestampTypeInfo);
+        }
+
+        @Override
+        public TimestampWritable getPrimitiveWritableObject(Object o) {
+            return o == null ? null : new TimestampWritable(new Timestamp(((DateTime) o).getMillis()));
+        }
+
+        @Override
+        public Timestamp getPrimitiveJavaObject(Object o) {
+            return o == null ? null : new Timestamp(((DateTime) o).getMillis());
+        }
+    }
+
+    public static GenericUDAFParameterInfo newSimpleGenericUDAFParameterInfo(ObjectInspector[] arguments,
+            boolean distinct, boolean allColumns) {
+        return new SimpleGenericUDAFParameterInfo(arguments, distinct, allColumns);
+    }
+
+    public static class TimestampShim {
+
+        public static Timestamp cast(Object ts) {
+            return (Timestamp) ts;
+        }
+
+        public static long millisFromTimestamp(Object ts) {
+            return cast(ts).getTime();
+        }
+    }
+
+    public static class TimestampWritableShim {
+
+        public static boolean isAssignableFrom(Object object) {
+            return object instanceof TimestampWritable;
+        }
+
+        public static TimestampWritable cast(Object ts) {
+            return (TimestampWritable) ts;
+        }
+
+        public static long millisFromTimestampWritable(Object ts) {
+            return cast(ts).getTimestamp().getTime();
+        }
+    }
+}
\ No newline at end of file
diff --git a/shims/src/hive3/org/apache/pig/hive/HiveShims.java b/shims/src/hive3/org/apache/pig/hive/HiveShims.java
new file mode 100644
index 0000000..32456c5
--- /dev/null
+++ b/shims/src/hive3/org/apache/pig/hive/HiveShims.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.hive;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile.Version;
+
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+
+import org.joda.time.DateTime;
+
+
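+/**
+ * Shim layer that hides Hive API differences from the rest of Pig (ORC storage, the Hive UDF
+ * wrappers and HiveUtils). This is the variant compiled in when Pig is built with hiveversion=3:
+ * it targets the Hive 3 APIs (typed SearchArgument builder methods, TimestampWritableV2 and the
+ * standalone orc-core OrcConf keys).
+ */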
+public class HiveShims {
+    public static String normalizeOrcVersionName(String version) {
+        return Version.byName(version).getName();
+    }
+
+    public static void addLessThanOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType, Object value) {
+        builder.lessThan(columnName, columnType, value);
+    }
+
+    public static void addLessThanEqualsOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType, Object value) {
+        builder.lessThanEquals(columnName, columnType, value);
+    }
+
+    public static void addEqualsOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType, Object value) {
+        builder.equals(columnName, columnType, value);
+    }
+
+    public static void addBetweenOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType, Object low, Object high) {
+        builder.between(columnName, columnType, low, high);
+    }
+
+    public static void addIsNullOpToBuilder(SearchArgument.Builder builder,
+            String columnName, PredicateLeaf.Type columnType) {
+        builder.isNull(columnName, columnType);
+    }
+
+    public static Class[] getOrcDependentClasses(Class hadoopVersionShimsClass) {
+        return new Class[]{OrcFile.class, HiveConf.class, AbstractSerDe.class,
+                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, DateWritable.class,
+                hadoopVersionShimsClass, Input.class, org.apache.orc.OrcFile.class,
+                com.esotericsoftware.minlog.Log.class};
+    }
+
+    public static Class[] getHiveUDFDependentClasses(Class hadoopVersionShimsClass) {
+        return new Class[]{GenericUDF.class,
+                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+                hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class, HiveDecimalWritable.class};
+    }
+
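+    // Converts Pig constant values to the representations used for Hive 3 predicate pushdown:
+    // integers widen to Long, floats to Double, BigInteger/BigDecimal become HiveDecimalWritable
+    // and Joda DateTime becomes java.sql.Date.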
+    public static Object getSearchArgObjValue(Object value) {
+        if (value instanceof Integer) {
+            return new Long((Integer) value);
+        } else if (value instanceof Float) {
+            return new Double((Float) value);
+        } else if (value instanceof BigInteger) {
+            return new HiveDecimalWritable(HiveDecimal.create((BigInteger) value));
+        } else if (value instanceof BigDecimal) {
+            return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) value));
+        } else if (value instanceof DateTime) {
+            return new java.sql.Date(((DateTime) value).getMillis());
+        } else {
+            return value;
+        }
+    }
+
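+    // On the Hive 3 line the ORC writer settings are set through the standalone orc-core
+    // OrcConf keys instead of HiveConf.ConfVars.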
+    public static void setOrcConfigOnJob(Job job, Long stripeSize, Integer rowIndexStride, Integer bufferSize,
+            Boolean blockPadding, CompressionKind compress, String versionName) {
+        if (stripeSize != null) {
+            job.getConfiguration().setLong(OrcConf.STRIPE_SIZE.getAttribute(), stripeSize);
+        }
+        if (rowIndexStride != null) {
+            job.getConfiguration().setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), rowIndexStride);
+        }
+        if (bufferSize != null) {
+            job.getConfiguration().setInt(OrcConf.BUFFER_SIZE.getAttribute(), bufferSize);
+        }
+        if (blockPadding != null) {
+            job.getConfiguration().setBoolean(OrcConf.BLOCK_PADDING.getAttribute(), blockPadding);
+        }
+        if (compress != null) {
+            job.getConfiguration().set(OrcConf.COMPRESS.getAttribute(), compress.toString());
+        }
+        if (versionName != null) {
+            job.getConfiguration().set(OrcConf.WRITE_FORMAT.getAttribute(), versionName);
+        }
+    }
+
+    public static class PigJodaTimeStampObjectInspector extends
+            AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector {
+
+        public PigJodaTimeStampObjectInspector() {
+            super(TypeInfoFactory.timestampTypeInfo);
+        }
+
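+        // Hive 3 uses its own Timestamp type (org.apache.hadoop.hive.common.type.Timestamp);
+        // build one from the millis of the Joda DateTime that Pig uses internally.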
+        private static Timestamp getHiveTimeStampFromDateTime(Object o) {
+            if (o == null) {
+                return null;
+            }
+            Timestamp ts = new Timestamp();
+            ts.setTimeInMillis(((DateTime) o).getMillis());
+            return ts;
+        }
+
+        @Override
+        public TimestampWritableV2 getPrimitiveWritableObject(Object o) {
+            return o == null ? null : new TimestampWritableV2(getHiveTimeStampFromDateTime(o));
+        }
+
+        @Override
+        public Timestamp getPrimitiveJavaObject(Object o) {
+            return o == null ? null : new Timestamp(getHiveTimeStampFromDateTime(o));
+        }
+    }
+
+    public static GenericUDAFParameterInfo newSimpleGenericUDAFParameterInfo(ObjectInspector[] arguments,
+            boolean distinct, boolean allColumns) {
+        return new SimpleGenericUDAFParameterInfo(arguments, false, distinct, allColumns);
+    }
+
+    public static class TimestampShim {
+
+        public static Timestamp cast(Object ts) {
+            return (Timestamp) ts;
+        }
+
+        public static long millisFromTimestamp(Object ts) {
+            return cast(ts).toEpochMilli();
+        }
+    }
+
+    public static class TimestampWritableShim {
+
+        public static boolean isAssignableFrom(Object object) {
+            return object instanceof TimestampWritableV2;
+        }
+
+        public static TimestampWritableV2 cast(Object ts) {
+            return (TimestampWritableV2) ts;
+        }
+
+        public static long millisFromTimestampWritable(Object ts) {
+            return cast(ts).getTimestamp().toEpochMilli();
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/src/org/apache/pig/Expression.java b/src/org/apache/pig/Expression.java
index a843c16..2b6ddca 100644
--- a/src/org/apache/pig/Expression.java
+++ b/src/org/apache/pig/Expression.java
@@ -79,6 +79,8 @@
 
     protected OpType opType;
 
+    protected byte dataType;
+
     /**
      * @return the opType
      */
@@ -86,6 +88,10 @@
         return opType;
     }
 
+    public byte getDataType() {
+        return dataType;
+    }
+
     //TODO: Apply a optimizer to Expression from PredicatePushdownOptimizer and
     // convert OR clauses to BETWEEN OR IN
     public static class BetweenExpression extends Expression {
@@ -221,9 +227,10 @@
         /**
          * @param name
          */
-        public Column(String name) {
+        public Column(String name, byte dataType) {
             this.opType = OpType.TERM_COL;
             this.name = name;
+            this.dataType = dataType;
         }
 
         @Override
diff --git a/src/org/apache/pig/builtin/HiveUDAF.java b/src/org/apache/pig/builtin/HiveUDAF.java
index b86159d..789600c 100644
--- a/src/org/apache/pig/builtin/HiveUDAF.java
+++ b/src/org/apache/pig/builtin/HiveUDAF.java
@@ -42,6 +42,7 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.hive.HiveShims;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.util.hive.HiveUtils;
@@ -105,8 +106,7 @@
                 
                 if (udaf instanceof GenericUDAFResolver2) {
                     GenericUDAFParameterInfo paramInfo =
-                            new SimpleGenericUDAFParameterInfo(
-                                    arguments, false, false);
+                            HiveShims.newSimpleGenericUDAFParameterInfo(arguments, false, false);
                     evaluator = ((GenericUDAFResolver2)udaf).getEvaluator(paramInfo);
                 } else {
                     TypeInfo[] params = ((StructTypeInfo)inputTypeInfo)
diff --git a/src/org/apache/pig/builtin/HiveUDFBase.java b/src/org/apache/pig/builtin/HiveUDFBase.java
index ebfbbe1..8e8aa32 100644
--- a/src/org/apache/pig/builtin/HiveUDFBase.java
+++ b/src/org/apache/pig/builtin/HiveUDFBase.java
@@ -49,6 +49,7 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.hive.HiveShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
@@ -181,9 +182,7 @@
 
     @Override
     public List<String> getShipFiles() {
-        List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
-                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
-                Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class});
+        List<String> files = FuncUtils.getShipFiles(HiveShims.getHiveUDFDependentClasses(Hadoop23Shims.class));
         return files;
     }
 
diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java
index 5f89706..9e4de7f 100644
--- a/src/org/apache/pig/builtin/OrcStorage.java
+++ b/src/org/apache/pig/builtin/OrcStorage.java
@@ -17,10 +17,10 @@
  */
 package org.apache.pig.builtin;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -33,13 +33,13 @@
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
@@ -47,18 +47,16 @@
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.Hadoop23Shims;
-import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -68,6 +66,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pig.Expression;
 import org.apache.pig.Expression.BetweenExpression;
+import org.apache.pig.Expression.BinaryExpression;
 import org.apache.pig.Expression.Column;
 import org.apache.pig.Expression.Const;
 import org.apache.pig.Expression.InExpression;
@@ -80,26 +79,28 @@
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.Expression.BinaryExpression;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.StoreResources;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.hive.HiveShims;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.impl.util.hive.HiveUtils;
-import org.joda.time.DateTime;
 
-import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
 import com.google.common.annotations.VisibleForTesting;
 
+import org.joda.time.DateTime;
+
 /**
  * A load function and store function for ORC file.
  * An optional constructor argument is provided that allows one to customize
@@ -133,7 +134,7 @@
     private Integer bufferSize;
     private Boolean blockPadding;
     private CompressionKind compress;
-    private Version version;
+    private String versionName;
 
     private static final Options validOptions;
     private final CommandLineParser parser = new GnuParser();
@@ -182,7 +183,7 @@
                 compress = CompressionKind.valueOf(configuredOptions.getOptionValue('c'));
             }
             if (configuredOptions.hasOption('v')) {
-                version = Version.byName(configuredOptions.getOptionValue('v'));
+                versionName = HiveShims.normalizeOrcVersionName(configuredOptions.getOptionValue('v'));
             }
         } catch (ParseException e) {
             log.error("Exception in OrcStorage", e);
@@ -207,30 +208,8 @@
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
         if (!UDFContext.getUDFContext().isFrontend()) {
-            if (stripeSize!=null) {
-                job.getConfiguration().setLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
-                        stripeSize);
-            }
-            if (rowIndexStride!=null) {
-                job.getConfiguration().setInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE.varname,
-                        rowIndexStride);
-            }
-            if (bufferSize!=null) {
-                job.getConfiguration().setInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.varname,
-                        bufferSize);
-            }
-            if (blockPadding!=null) {
-                job.getConfiguration().setBoolean(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING.varname,
-                        blockPadding);
-            }
-            if (compress!=null) {
-                job.getConfiguration().set(HiveConf.ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname,
-                        compress.toString());
-            }
-            if (version!=null) {
-                job.getConfiguration().set(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname,
-                        version.getName());
-            }
+            HiveShims.setOrcConfigOnJob(job, stripeSize, rowIndexStride, bufferSize, blockPadding, compress,
+                    versionName);
         }
         FileOutputFormat.setOutputPath(job, new Path(location));
         if (typeInfo==null) {
@@ -396,9 +375,7 @@
 
     @Override
     public List<String> getShipFiles() {
-        Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
-                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class,
-                Input.class};
+        Class[] classList = HiveShims.getOrcDependentClasses(Hadoop23Shims.class);
         return FuncUtils.getShipFiles(classList);
     }
 
@@ -582,7 +559,11 @@
             log.info("Pushdown predicate SearchArgument is:\n" + sArg);
             Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
             try {
-                p.setProperty(signature + SearchArgsSuffix, sArg.toKryo());
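+                // Serialize the SearchArgument manually with Kryo and Base64-encode it instead
+                // of calling sArg.toKryo(), which newer Hive versions no longer expose.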
+                Kryo kryo = new Kryo();
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                Output output = new Output(baos);
+                kryo.writeObject(output, sArg);
+                p.setProperty(signature + SearchArgsSuffix, new String(Base64.encodeBase64(output.toBytes())));
             } catch (Exception e) {
                 throw new IOException("Cannot serialize SearchArgument: " + sArg);
             }
@@ -625,35 +606,43 @@
                 builder.end();
                 break;
             case OP_EQ:
-                builder.equals(getColumnName(lhs), getExpressionValue(rhs));
+                HiveShims.addEqualsOpToBuilder(builder, getColumnName(lhs),
+                        getColumnType(lhs), getExpressionValue(rhs));
                 break;
             case OP_NE:
                 builder.startNot();
-                builder.equals(getColumnName(lhs), getExpressionValue(rhs));
+                HiveShims.addEqualsOpToBuilder(builder, getColumnName(lhs),
+                        getColumnType(lhs), getExpressionValue(rhs));
                 builder.end();
                 break;
             case OP_LT:
-                builder.lessThan(getColumnName(lhs), getExpressionValue(rhs));
+                HiveShims.addLessThanOpToBuilder(builder, getColumnName(lhs),
+                        getColumnType(lhs), getExpressionValue(rhs));
                 break;
             case OP_LE:
-                builder.lessThanEquals(getColumnName(lhs), getExpressionValue(rhs));
+                HiveShims.addLessThanEqualsOpToBuilder(builder, getColumnName(lhs),
+                        getColumnType(lhs), getExpressionValue(rhs));
                 break;
             case OP_GT:
                 builder.startNot();
-                builder.lessThanEquals(getColumnName(lhs), getExpressionValue(rhs));
+                HiveShims.addLessThanEqualsOpToBuilder(builder, getColumnName(lhs),
+                        getColumnType(lhs), getExpressionValue(rhs));
                 builder.end();
                 break;
             case OP_GE:
                 builder.startNot();
-                builder.lessThan(getColumnName(lhs), getExpressionValue(rhs));
+                HiveShims.addLessThanOpToBuilder(builder, getColumnName(lhs),
+                        getColumnType(lhs), getExpressionValue(rhs));
                 builder.end();
                 break;
             case OP_BETWEEN:
                 BetweenExpression between = (BetweenExpression) rhs;
-                builder.between(getColumnName(lhs), getSearchArgObjValue(between.getLower()),  getSearchArgObjValue(between.getUpper()));
+                HiveShims.addBetweenOpToBuilder(builder, getColumnName(lhs),
+                        getColumnType(lhs), HiveShims.getSearchArgObjValue(between.getLower()),
+                        HiveShims.getSearchArgObjValue(between.getUpper()));
             case OP_IN:
                 InExpression in = (InExpression) rhs;
-                builder.in(getColumnName(lhs), getSearchArgObjValues(in.getValues()).toArray());
+                builder.in(getColumnName(lhs), getColumnType(lhs), getSearchArgObjValues(in.getValues()).toArray());
             default:
                 throw new RuntimeException("Unsupported binary expression type: " + expr.getOpType() + " in " + expr);
             }
@@ -661,7 +650,8 @@
             Expression unaryExpr = ((UnaryExpression) expr).getExpression();
             switch (expr.getOpType()) {
             case OP_NULL:
-                builder.isNull(getColumnName(unaryExpr));
+                HiveShims.addIsNullOpToBuilder(builder, getColumnName(unaryExpr),
+                        getColumnType(unaryExpr));
                 break;
             case OP_NOT:
                 builder.startNot();
@@ -686,12 +676,21 @@
         }
     }
 
+    private PredicateLeaf.Type getColumnType(Expression expr) {
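+        // Map the Pig DataType recorded on the expression to the PredicateLeaf.Type that the
+        // SearchArgument builder expects.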
+        try {
+            return HiveUtils.getDataTypeForSearchArgs(expr.getDataType());
+        } catch (ClassCastException e) {
+            throw new RuntimeException("Expected a Column but found " + expr.getClass().getName() +
+                    " in expression " + expr, e);
+        }
+    }
+
     private Object getExpressionValue(Expression expr) {
         switch(expr.getOpType()) {
         case TERM_COL:
             return ((Column) expr).getName();
         case TERM_CONST:
-            return getSearchArgObjValue(((Const) expr).getValue());
+            return HiveShims.getSearchArgObjValue(((Const) expr).getValue());
         default:
             throw new RuntimeException("Unsupported expression type: " + expr.getOpType() + " in " + expr);
         }
@@ -703,21 +702,8 @@
         }
         List<Object> newValues = new ArrayList<Object>(values.size());
         for (Object value : values) {
-            newValues.add(getSearchArgObjValue(value));
+            newValues.add(HiveShims.getSearchArgObjValue(value));
         }
         return values;
     }
-
-    private Object getSearchArgObjValue(Object value) {
-        if (value instanceof BigInteger) {
-            return new BigDecimal((BigInteger)value);
-        } else if (value instanceof BigDecimal) {
-            return value;
-        } else if (value instanceof DateTime) {
-            return new Timestamp(((DateTime)value).getMillis());
-        } else {
-            return value;
-        }
-    }
-
 }
diff --git a/src/org/apache/pig/impl/util/hive/HiveUtils.java b/src/org/apache/pig/impl/util/hive/HiveUtils.java
index 3691cd0..7ff75e4 100644
--- a/src/org/apache/pig/impl/util/hive/HiveUtils.java
+++ b/src/org/apache/pig/impl/util/hive/HiveUtils.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
@@ -69,6 +70,7 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.hive.HiveShims;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 import org.joda.time.DateTime;
 
@@ -179,8 +181,7 @@
             result = new DataByteArray(b, 0, b.length);
             break;
         case TIMESTAMP:
-            java.sql.Timestamp origTimeStamp = (java.sql.Timestamp)poi.getPrimitiveJavaObject(obj);
-            result = new DateTime(origTimeStamp.getTime());
+            result = new DateTime(HiveShims.TimestampShim.millisFromTimestamp(poi.getPrimitiveJavaObject(obj)));
             break;
         case DATE:
             java.sql.Date origDate = (java.sql.Date)poi.getPrimitiveJavaObject(obj);
@@ -674,24 +675,6 @@
 
     }
 
-    static class PigJodaTimeStampObjectInspector extends
-            AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector {
-
-        protected PigJodaTimeStampObjectInspector() {
-            super(TypeInfoFactory.timestampTypeInfo);
-        }
-
-        @Override
-        public TimestampWritable getPrimitiveWritableObject(Object o) {
-            return o == null ? null : new TimestampWritable(new Timestamp(((DateTime)o).getMillis()));
-        }
-
-        @Override
-        public Timestamp getPrimitiveJavaObject(Object o) {
-            return o == null ? null : new Timestamp(((DateTime)o).getMillis());
-        }
-    }
-
     static class PigDecimalObjectInspector extends
             AbstractPrimitiveJavaObjectInspector implements HiveDecimalObjectInspector {
 
@@ -735,7 +718,7 @@
             case STRING:
               return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
             case TIMESTAMP:
-              return new PigJodaTimeStampObjectInspector();
+              return new HiveShims.PigJodaTimeStampObjectInspector();
             case DECIMAL:
               return new PigDecimalObjectInspector();
             case BINARY:
@@ -781,4 +764,28 @@
             throw new IllegalArgumentException("Not implemented " + obj.getClass().getName());
         }
     }
+
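+    /**
+     * Maps a Pig {@link DataType} code to the {@link PredicateLeaf.Type} used when building
+     * an ORC SearchArgument for predicate pushdown.
+     */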
+    public static PredicateLeaf.Type getDataTypeForSearchArgs(byte dataType) {
+        switch (dataType) {
+            case DataType.INTEGER:
+                return PredicateLeaf.Type.LONG;
+            case DataType.LONG:
+                return PredicateLeaf.Type.LONG;
+            case DataType.DOUBLE:
+                return PredicateLeaf.Type.FLOAT;
+            case DataType.FLOAT:
+                return PredicateLeaf.Type.FLOAT;
+            case DataType.CHARARRAY:
+                return PredicateLeaf.Type.STRING;
+            case DataType.DATETIME:
+                return PredicateLeaf.Type.DATE;
+            case DataType.BIGINTEGER:
+            case DataType.BIGDECIMAL:
+                return PredicateLeaf.Type.DECIMAL;
+            case DataType.BOOLEAN:
+                return PredicateLeaf.Type.BOOLEAN;
+            default:
+                throw new RuntimeException("Unsupported data type: " + DataType.findTypeName(dataType));
+        }
+    }
 }
diff --git a/src/org/apache/pig/newplan/FilterExtractor.java b/src/org/apache/pig/newplan/FilterExtractor.java
index fd748bd..8788cdb 100644
--- a/src/org/apache/pig/newplan/FilterExtractor.java
+++ b/src/org/apache/pig/newplan/FilterExtractor.java
@@ -356,7 +356,7 @@
         } else if (op instanceof ProjectExpression) {
             ProjectExpression projExpr = (ProjectExpression)op;
             String fieldName = projExpr.getFieldSchema().alias;
-            return new Expression.Column(fieldName);
+            return new Expression.Column(fieldName, projExpr.getType());
         } else if(op instanceof BinaryExpression) {
             BinaryExpression binOp = (BinaryExpression)op;
             if(binOp instanceof AddExpression) {
diff --git a/test/org/apache/pig/builtin/TestOrcStorage.java b/test/org/apache/pig/builtin/TestOrcStorage.java
index ddff101..6e8b4dc 100644
--- a/test/org/apache/pig/builtin/TestOrcStorage.java
+++ b/test/org/apache/pig/builtin/TestOrcStorage.java
@@ -17,28 +17,19 @@
  */
 package org.apache.pig.builtin;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.apache.pig.builtin.mock.Storage.resetData;
-import static org.apache.pig.builtin.mock.Storage.tuple;
-
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.TimeZone;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -48,7 +39,6 @@
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BooleanWritable;
@@ -65,8 +55,10 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.hive.HiveShims;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.test.Util;
+
 import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Assert;
@@ -74,6 +66,12 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class TestOrcStorage {
     final protected static Log LOG = LogFactory.getLog(TestOrcStorage.class);
 
@@ -89,6 +87,11 @@
     private static PigServer pigServer = null;
     private static FileSystem fs;
 
+    static {
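+        // Force UTC so the timestamp string assertions below are deterministic across environments.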
+        System.setProperty("user.timezone", "UTC");
+        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    }
+
     @BeforeClass
     public static void oneTimeSetup(){
         if(Util.WINDOWS){
@@ -282,7 +285,7 @@
         Tuple t = iter.next();
         assertTrue(t.toString().startsWith("(false,1,1024,65536,9223372036854775807,1.0,-15.0," +
                 ",hi,({(1,bye),(2,sigh)}),{(3,good),(4,bad)},[],"));
-        assertTrue(t.get(12).toString().matches("2000-03-12T15:00:00.000.*"));
+        assertTrue(t.get(12).toString().matches("2000-03-12T15:00:00\\.000Z.*"));
         assertTrue(t.toString().endsWith(",12345678.6547456)"));
     }
 
@@ -406,9 +409,9 @@
         } else if (expected instanceof BooleanWritable) {
             assertEquals(Boolean.class, actual.getClass());
             assertEquals(((BooleanWritable) expected).get(), actual);
-        } else if (expected instanceof TimestampWritable) {
+        } else if (HiveShims.TimestampWritableShim.isAssignableFrom(expected)) {
             assertEquals(DateTime.class, actual.getClass());
-            assertEquals(((TimestampWritable) expected).getTimestamp().getTime(),
+            assertEquals(HiveShims.TimestampWritableShim.millisFromTimestampWritable(expected),
                     ((DateTime) actual).getMillis());
         } else if (expected instanceof BytesWritable) {
             assertEquals(DataByteArray.class, actual.getClass());
diff --git a/test/org/apache/pig/builtin/TestOrcStoragePushdown.java b/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
index 461d671..de62c88 100644
--- a/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
+++ b/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
@@ -17,9 +17,6 @@
  */
 package org.apache.pig.builtin;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -61,6 +58,9 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestOrcStoragePushdown {
 
     private static List<OpType> supportedOpTypes;
@@ -221,8 +221,8 @@
         String q = query + "b = filter a by srcid == 10;" + "store b into 'out';";
         Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
         SearchArgument sarg = orcStorage.getSearchArgument(expr);
-        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
-                "expr = leaf-0", sarg.toString());
+        assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+                "expr = leaf-0");
     }
 
     @Test
@@ -230,11 +230,11 @@
         String q = query + "b = filter a by (srcid > 10 or dstid <= 5) and name == 'foo' and mrkt is null;" + "store b into 'out';";
         Expression expr = getExpressionForTest(q, Arrays.asList("srcid", "dstid", "name", "mrkt"));
         SearchArgument sarg = orcStorage.getSearchArgument(expr);
-        assertEquals("leaf-0 = (LESS_THAN_EQUALS srcid 10)\n" +
-                "leaf-1 = (LESS_THAN_EQUALS dstid 5)\n" +
-                "leaf-2 = (EQUALS name foo)\n" +
-                "leaf-3 = (IS_NULL mrkt)\n" +
-                "expr = (and (or (not leaf-0) leaf-1) leaf-2 leaf-3)", sarg.toString());
+        assertEqualsSarg(sarg, "leaf-0 = (LESS_THAN_EQUALS srcid 10)",
+                "leaf-1 = (LESS_THAN_EQUALS dstid 5)",
+                "leaf-2 = (EQUALS name foo)",
+                "leaf-3 = (IS_NULL mrkt)",
+                "expr = (and (or (not leaf-0) leaf-1) leaf-2 leaf-3)");
     }
 
     @Test
@@ -242,9 +242,9 @@
         String q = query + "b = filter a by srcid != 10 and mrkt is not null;" + "store b into 'out';";
         Expression expr = getExpressionForTest(q, Arrays.asList("srcid", "dstid", "name", "mrkt"));
         SearchArgument sarg = orcStorage.getSearchArgument(expr);
-        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
-                "leaf-1 = (IS_NULL mrkt)\n" +
-                "expr = (and (not leaf-0) (not leaf-1))", sarg.toString());
+        assertEqualsSarg(sarg,"leaf-0 = (EQUALS srcid 10)",
+                "leaf-1 = (IS_NULL mrkt)",
+                "expr = (and (not leaf-0) (not leaf-1))");
     }
 
     @Test
@@ -253,9 +253,9 @@
         String q = query + "b = filter a by srcid > 10 or srcid < 20;" + "store b into 'out';";
         Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
         SearchArgument sarg = orcStorage.getSearchArgument(expr);
-        assertEquals("leaf-0 = (LESS_THAN_EQUALS srcid 10)\n" +
-                "leaf-1 = (LESS_THAN srcid 20)\n" +
-                "expr = (or (not leaf-0) leaf-1)", sarg.toString());
+        assertEqualsSarg(sarg, "leaf-0 = (LESS_THAN_EQUALS srcid 10)",
+                "leaf-1 = (LESS_THAN srcid 20)",
+                "expr = (or (not leaf-0) leaf-1)");
     }
 
     @Test
@@ -264,9 +264,9 @@
         String q = query + "b = filter a by srcid == 10 or srcid == 11;" + "store b into 'out';";
         Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
         SearchArgument sarg = orcStorage.getSearchArgument(expr);
-        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
-                "leaf-1 = (EQUALS srcid 11)\n" +
-                "expr = (or leaf-0 leaf-1)", sarg.toString());
+        assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+                "leaf-1 = (EQUALS srcid 11)",
+                "expr = (or leaf-0 leaf-1)");
     }
 
     @Test
@@ -282,14 +282,14 @@
         q = query + "b = filter a by name matches 'foo*' and srcid == 10;" + "store b into 'out';";
         expr = getExpressionForTest(q, Arrays.asList("srcid", "name"));
         sarg = orcStorage.getSearchArgument(expr);
-        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
-                "expr = leaf-0", sarg.toString());
+        assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+                "expr = leaf-0");
 
         q = query + "b = filter a by srcid == 10 and name matches 'foo*';" + "store b into 'out';";
         expr = getExpressionForTest(q, Arrays.asList("srcid", "name"));
         sarg = orcStorage.getSearchArgument(expr);
-        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
-                "expr = leaf-0", sarg.toString());
+        assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+                "expr = leaf-0");
 
         // OR - Nothing should be pushed
         q = query + "b = filter a by name matches 'foo*' or srcid == 10;" + "store b into 'out';";
@@ -307,8 +307,8 @@
                 "store b into 'out';";
         Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
         SearchArgument sarg = orcStorage.getSearchArgument(expr);
-        assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
-                "expr = leaf-0", sarg.toString());
+        assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+                "expr = leaf-0");
     }
 
     @Test
@@ -419,4 +419,17 @@
     public void testPredicatePushdownVarchar() throws Exception {
         testPredicatePushdown(basedir + "charvarchar.orc", "$1 == 'alice allen         '", 19, 18000);
     }
+
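+    // Compares against both renderings of SearchArgument.toString(): Hive 1 separates the
+    // leaves with newlines while Hive 3 separates them with ", ".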
+    private static void assertEqualsSarg(SearchArgument actual, String... expected) {
+        String hive1Expected = String.join("\n", expected);
+        String hive3Expected = String.join(", ", expected);
+
+        if (hive1Expected.equals(actual.toString())) {
+            return;
+        }
+        if (hive3Expected.equals(actual.toString())) {
+            return;
+        }
+        fail(actual.toString() + "\n does not match expected SARG:\n" + hive3Expected);
+    }
 }
diff --git a/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java b/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
index 793e127..45f3417 100644
--- a/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
+++ b/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
@@ -19,11 +19,12 @@
 
 import java.util.List;
 
+import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.Utils;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -45,10 +46,20 @@
                 "store a into 'ooo';";
         PhysicalPlan pp = Util.buildPp(pigServer, query);
 
-        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
-                "hive-shims-0.23", "hive-shims-common", "kryo"};
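+        // The jars Pig needs to ship differ between the Hive 1 and Hive 3 builds.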
+        String hiveVersion = HiveVersionInfo.getVersion().substring(0, 1);
+        if (hiveVersion.equals("3")) {
+            String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+                    "hive-shims-0.23", "hive-shims-common", "orc-core",
+                    "hive-storage-api", "kryo", "minlog"
+            };
 
-        checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
+            checkPlan(pp, expectedJars, 9, pigServer.getPigContext());
+        } else {
+            String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+                    "hive-shims-0.23", "hive-shims-common", "kryo"};
+
+            checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
+        }
     }
 
     @Test
@@ -57,10 +68,21 @@
                 "store a into 'ooo' using OrcStorage;";
         PhysicalPlan pp = Util.buildPp(pigServer, query);
 
-        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
-                "hive-shims-0.23", "hive-shims-common", "kryo"};
+        String hiveVersion = HiveVersionInfo.getVersion().substring(0, 1);
+        if (hiveVersion.equals("3")) {
+            String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+                    "hive-shims-0.23", "hive-shims-common", "orc-core",
+                    "hive-storage-api", "kryo", "minlog"
+            };
 
-        checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
+            checkPlan(pp, expectedJars, 9, pigServer.getPigContext());
+        } else {
+            String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+                    "hive-shims-0.23", "hive-shims-common", "kryo"};
+
+            checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
+        }
     }
 
     @Test