PIG-4764: Make Pig work with Hive 3.1 (szita)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1873947 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index abf7090..64e4c45 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-4764: Make Pig work with Hive 3.1 (szita)
+
PIG-5352: Please add OWASP Dependency Check to the build ivy.xml (knoguchi)
PIG-5385: Skip calling extra gc() before spilling large bag when unnecessary (knoguchi)
diff --git a/build.xml b/build.xml
index b86e747..56e36cd 100644
--- a/build.xml
+++ b/build.xml
@@ -154,7 +154,7 @@
<condition property="isWindows">
<os family="windows"/>
</condition>
-
+
<target name="setTezEnv">
<propertyreset name="test.timeout" value="900000" />
<propertyreset name="hadoopversion" value="2" />
@@ -241,6 +241,7 @@
</if>
<property name="hbaseversion" value="1" />
<property name="sparkversion" value="1" />
+ <property name="hiveversion" value="1" />
<condition property="src.exclude.dir" value="**/Spark2*.java" else="**/Spark1*.java">
<equals arg1="${sparkversion}" arg2="1"/>
@@ -248,6 +249,7 @@
<property name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
<property name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
+ <property name="src.hive.shims.dir" value="${basedir}/shims/src/hive${hiveversion}" />
<property name="asfrepo" value="https://repository.apache.org"/>
<property name="asfsnapshotrepo" value="${asfrepo}/content/repositories/snapshots"/>
@@ -353,6 +355,7 @@
<source path="${test.e2e.dir}/udfs/java"/>
<source path="${src.shims.dir}"/>
<source path="${src.shims.test.dir}"/>
+ <source path="${src.hive.shims.dir}"/>
<source path="tutorial/src"/>
<source path="${test.src.dir}" excluding="e2e/pig/udfs/java/|resources/|perf/"/>
<output path="${build.dir.eclipse-main-classes}" />
@@ -568,8 +571,8 @@
<echo>*** Building Main Sources ***</echo>
<echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo>
<echo>*** Else, you will only be warned about deprecations ***</echo>
- <echo>*** Hadoop version used: ${hadoopversion} ; HBase version used: ${hbaseversion} ; Spark version used: ${sparkversion} ***</echo>
- <compileSources sources="${src.dir};${src.gen.dir};${src.lib.dir}/bzip2;${src.shims.dir}"
+ <echo>*** Hadoop version used: ${hadoopversion} ; HBase version used: ${hbaseversion} ; Spark version used: ${sparkversion} ; Hive version used: ${hiveversion} ***</echo>
+ <compileSources sources="${src.dir};${src.gen.dir};${src.lib.dir}/bzip2;${src.shims.dir};${src.hive.shims.dir}"
excludes="${src.exclude.dir}" dist="${build.classes}" cp="classpath" warnings="${javac.args.warnings}" />
<copy todir="${build.classes}/META-INF">
<fileset dir="${src.dir}/META-INF" includes="**"/>
@@ -734,6 +737,7 @@
<fileset dir="${ivy.lib.dir}" includes="metrics-core-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="hbase-*.jar" excludes="hbase-*tests.jar,hbase-*hadoop2*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="hive-*.jar" excludes="hive-shims-0.*.jar, hive-contrib*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="minlog-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="protobuf-java-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="zookeeper-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="accumulo-*.jar" excludes="accumulo-minicluster*.jar"/>
@@ -1161,6 +1165,10 @@
<fileset dir="${basedir}/shims" />
</copy>
+ <copy todir="${tar.dist.dir}/hive-shims" includeEmptyDirs="true">
+ <fileset dir="${basedir}/hive-shims" />
+ </copy>
+
<copy todir="${tar.dist.dir}/lib-src" includeEmptyDirs="true">
<fileset dir="${src.lib.dir}" />
</copy>
@@ -1236,6 +1244,7 @@
<include name="lib-src/**"/>
<include name="license/**"/>
<include name="shims/**"/>
+ <include name="hive-shims/**"/>
<include name="src/**"/>
<include name="test/**"/>
<exclude name="test/**/*.jar"/>
@@ -1723,7 +1732,7 @@
<target name="ivy-resolve" depends="ivy-init" unless="ivy.resolved" description="Resolve Ivy dependencies">
<property name="ivy.resolved" value="true"/>
- <echo>*** Ivy resolve with Hadoop ${hadoopversion}, Spark ${sparkversion} and HBase ${hbaseversion} ***</echo>
+ <echo>*** Ivy resolve with Hadoop ${hadoopversion}, Spark ${sparkversion}, HBase ${hbaseversion}, Hive ${hiveversion} ***</echo>
<ivy:resolve log="${loglevel}" settingsRef="${ant.project.name}.ivy.settings" conf="compile"/>
<ivy:report toDir="build/ivy/report"/>
</target>
diff --git a/ivy.xml b/ivy.xml
index 37ba5d4..800d21e 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -31,7 +31,7 @@
<conf name="default" extends="master,runtime"/>
<conf name="runtime" extends="compile,test" description="runtime but not the artifact" />
<!--Private configurations. -->
- <conf name="compile" extends="hadoop${hadoopversion},hbase${hbaseversion}" visibility="private" description="compile artifacts"/>
+ <conf name="compile" extends="hadoop${hadoopversion},hbase${hbaseversion},hive${hiveversion}" visibility="private" description="compile artifacts"/>
<conf name="test" extends="compile" visibility="private"/>
<conf name="javadoc" visibility="private" extends="compile,test"/>
<conf name="releaseaudit" visibility="private"/>
@@ -43,6 +43,8 @@
<conf name="hbase2" visibility="private"/>
<conf name="spark1" visibility="private" />
<conf name="spark2" visibility="private" />
+ <conf name="hive1" visibility="private"/>
+ <conf name="hive3" visibility="private"/>
<conf name="owasp" visibility="private" description="Artifacts required for owasp target"/>
</configurations>
<publications>
@@ -525,23 +527,48 @@
<!-- for piggybank -->
<dependency org="org.hsqldb" name="hsqldb" rev="${hsqldb.version}"
conf="test->default" />
- <dependency org="org.apache.hive" name="hive-exec" rev="${hive.version}" conf="compile->master" changing="true">
+
+ <!-- Hive 1 -->
+ <dependency org="org.apache.hive" name="hive-exec" rev="${hive1.version}" conf="hive1->master" changing="true">
+ <artifact name="hive-exec" m:classifier="core" />
+ </dependency>
+ <dependency org="org.apache.hive" name="hive-serde" rev="${hive1.version}" changing="true"
+ conf="hive1->master" />
+ <dependency org="org.apache.hive" name="hive-common" rev="${hive1.version}" changing="true"
+ conf="hive1->master" />
+ <dependency org="org.apache.hive.shims" name="hive-shims-common" rev="${hive1.version}" changing="true"
+ conf="hive1->master" />
+ <dependency org="org.apache.hive" name="hive-contrib" rev="${hive1.version}" changing="true"
+ conf="test->master" />
+ <dependency org="org.apache.hive.shims" name="hive-shims-0.23" rev="${hive1.version}" changing="true"
+ conf="hive1->master" />
+
+ <!-- Hive 3 -->
+ <dependency org="org.apache.hive" name="hive-exec" rev="${hive.version}" conf="hive3->master" changing="true">
<artifact name="hive-exec" m:classifier="core" />
</dependency>
<dependency org="org.apache.hive" name="hive-serde" rev="${hive.version}" changing="true"
- conf="compile->master" />
+ conf="hive3->master" />
<dependency org="org.apache.hive" name="hive-common" rev="${hive.version}" changing="true"
- conf="compile->master" />
+ conf="hive3->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-common" rev="${hive.version}" changing="true"
- conf="compile->master" />
+ conf="hive3->master" />
<dependency org="org.apache.hive" name="hive-contrib" rev="${hive.version}" changing="true"
conf="test->master" />
+ <dependency org="org.apache.hive" name="hive-llap-common" rev="${hive.version}" changing="true"
+ conf="hive3->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-0.23" rev="${hive.version}" changing="true"
- conf="hadoop2->master" />
+ conf="hive3->master" />
+
+
+ <dependency org="org.apache.orc" name="orc-core" rev="${orc.version}" changing="true" conf="hive3->default" />
+ <dependency org="org.apache.hive" name="hive-storage-api" rev="${hive-storage-api.version}" changing="true" conf="hive3->master" />
<dependency org="org.iq80.snappy" name="snappy" rev="${snappy.version}"
conf="test->master" />
- <dependency org="com.esotericsoftware.kryo" name="kryo" rev="${kryo.version}"
- conf="compile->master" />
+ <dependency org="com.esotericsoftware" name="kryo-shaded" rev="${kryo.version}"
+ conf="hive3->default" />
+ <dependency org="com.esotericsoftware.kryo" name="kryo" rev="2.22"
+ conf="hive1->default" />
<dependency org="org.apache.commons" name="commons-lang3" rev="${commons-lang3.version}"
conf="compile->master" />
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 9d099e7..dd33107 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -41,7 +41,10 @@
hbase1.version=1.2.4
hbase2.version=2.0.0
hsqldb.version=2.4.0
-hive.version=1.2.1
+hive1.version=1.2.1
+hive.version=3.1.2
+hive-storage-api.version=2.7.0
+orc.version=1.5.6
httpcomponents.version=4.4
jackson.version=1.9.13
jackson-pig-3039-test.version=1.9.9
@@ -59,7 +62,7 @@
junit.version=4.11
jruby.version=1.7.26
jython.version=2.7.1
-kryo.version=2.22
+kryo.version=3.0.3
rhino.version=1.7R2
antlr.version=3.4
stringtemplate.version=4.0.4
diff --git a/shims/src/hive1/org/apache/pig/hive/HiveShims.java b/shims/src/hive1/org/apache/pig/hive/HiveShims.java
new file mode 100644
index 0000000..b606821
--- /dev/null
+++ b/shims/src/hive1/org/apache/pig/hive/HiveShims.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.hive;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+
+import org.joda.time.DateTime;
+
+public class HiveShims {
+ public static String normalizeOrcVersionName(String version) {
+ return Version.byName(version).getName();
+ }
+
+ public static void addLessThanOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType, Object value) {
+ builder.lessThan(columnName, value);
+ }
+
+ public static void addLessThanEqualsOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType, Object value) {
+ builder.lessThanEquals(columnName, value);
+ }
+
+ public static void addEqualsOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType, Object value) {
+ builder.equals(columnName, value);
+ }
+
+ public static void addBetweenOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType, Object low, Object high) {
+ builder.between(columnName, low, high);
+ }
+
+ public static void addIsNullOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType) {
+ builder.isNull(columnName);
+ }
+
+ public static Class[] getOrcDependentClasses(Class hadoopVersionShimsClass) {
+ return new Class[]{OrcFile.class, HiveConf.class, AbstractSerDe.class,
+ org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
+ Input.class};
+ }
+
+ public static Class[] getHiveUDFDependentClasses(Class hadoopVersionShimsClass) {
+ return new Class[]{GenericUDF.class,
+ PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+ hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class};
+ }
+
+ public static Object getSearchArgObjValue(Object value) {
+ if (value instanceof BigInteger) {
+ return new BigDecimal((BigInteger) value);
+ } else if (value instanceof DateTime) {
+ return new Timestamp(((DateTime) value).getMillis());
+ } else {
+ return value;
+ }
+ }
+
+ public static void setOrcConfigOnJob(Job job, Long stripeSize, Integer rowIndexStride, Integer bufferSize, Boolean blockPadding, CompressionKind compress, String versionName) {
+ if (stripeSize != null) {
+ job.getConfiguration().setLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname, stripeSize);
+ }
+ if (rowIndexStride != null) {
+ job.getConfiguration().setInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE.varname, rowIndexStride);
+ }
+ if (bufferSize != null) {
+ job.getConfiguration().setInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.varname, bufferSize);
+ }
+ if (blockPadding != null) {
+ job.getConfiguration().setBoolean(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING.varname, blockPadding);
+ }
+ if (compress != null) {
+ job.getConfiguration().set(HiveConf.ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, compress.toString());
+ }
+ if (versionName != null) {
+ job.getConfiguration().set(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname, versionName);
+ }
+ }
+
+ public static class PigJodaTimeStampObjectInspector extends
+ AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector {
+
+ public PigJodaTimeStampObjectInspector() {
+ super(TypeInfoFactory.timestampTypeInfo);
+ }
+
+ @Override
+ public TimestampWritable getPrimitiveWritableObject(Object o) {
+ return o == null ? null : new TimestampWritable(new Timestamp(((DateTime) o).getMillis()));
+ }
+
+ @Override
+ public Timestamp getPrimitiveJavaObject(Object o) {
+ return o == null ? null : new Timestamp(((DateTime) o).getMillis());
+ }
+ }
+
+ public static GenericUDAFParameterInfo newSimpleGenericUDAFParameterInfo(ObjectInspector[] arguments,
+ boolean distinct, boolean allColumns) {
+ return new SimpleGenericUDAFParameterInfo(arguments, distinct, allColumns);
+ }
+
+ public static class TimestampShim {
+
+ public static Timestamp cast(Object ts) {
+ return (Timestamp) ts;
+ }
+
+ public static long millisFromTimestamp(Object ts) {
+ return cast(ts).getTime();
+ }
+ }
+
+ public static class TimestampWritableShim {
+
+ public static boolean isAssignableFrom(Object object) {
+ return object instanceof TimestampWritable;
+ }
+
+ public static TimestampWritable cast(Object ts) {
+ return (TimestampWritable) ts;
+ }
+
+ public static long millisFromTimestampWritable(Object ts) {
+ return cast(ts).getTimestamp().getTime();
+ }
+ }
+}
\ No newline at end of file
diff --git a/shims/src/hive3/org/apache/pig/hive/HiveShims.java b/shims/src/hive3/org/apache/pig/hive/HiveShims.java
new file mode 100644
index 0000000..32456c5
--- /dev/null
+++ b/shims/src/hive3/org/apache/pig/hive/HiveShims.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.hive;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile.Version;
+
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+
+import org.joda.time.DateTime;
+
+
+public class HiveShims {
+ public static String normalizeOrcVersionName(String version) {
+ return Version.byName(version).getName();
+ }
+
+ public static void addLessThanOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType, Object value) {
+ builder.lessThan(columnName, columnType, value);
+ }
+
+ public static void addLessThanEqualsOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType, Object value) {
+ builder.lessThanEquals(columnName, columnType, value);
+ }
+
+ public static void addEqualsOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType, Object value) {
+ builder.equals(columnName, columnType, value);
+ }
+
+ public static void addBetweenOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType, Object low, Object high) {
+ builder.between(columnName, columnType, low, high);
+ }
+
+ public static void addIsNullOpToBuilder(SearchArgument.Builder builder,
+ String columnName, PredicateLeaf.Type columnType) {
+ builder.isNull(columnName, columnType);
+ }
+
+ public static Class[] getOrcDependentClasses(Class hadoopVersionShimsClass) {
+ return new Class[]{OrcFile.class, HiveConf.class, AbstractSerDe.class,
+ org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, DateWritable.class,
+ hadoopVersionShimsClass, Input.class, org.apache.orc.OrcFile.class,
+ com.esotericsoftware.minlog.Log.class};
+ }
+
+ public static Class[] getHiveUDFDependentClasses(Class hadoopVersionShimsClass) {
+ return new Class[]{GenericUDF.class,
+ PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+ hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class, HiveDecimalWritable.class};
+ }
+
+ public static Object getSearchArgObjValue(Object value) {
+ if (value instanceof Integer) {
+ return new Long((Integer) value);
+ } else if (value instanceof Float) {
+ return new Double((Float) value);
+ } else if (value instanceof BigInteger) {
+ return new HiveDecimalWritable(HiveDecimal.create((BigInteger) value));
+ } else if (value instanceof BigDecimal) {
+ return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) value));
+ } else if (value instanceof DateTime) {
+ return new java.sql.Date(((DateTime) value).getMillis());
+ } else {
+ return value;
+ }
+ }
+
+ public static void setOrcConfigOnJob(Job job, Long stripeSize, Integer rowIndexStride, Integer bufferSize,
+ Boolean blockPadding, CompressionKind compress, String versionName) {
+ if (stripeSize != null) {
+ job.getConfiguration().setLong(OrcConf.STRIPE_SIZE.getAttribute(), stripeSize);
+ }
+ if (rowIndexStride != null) {
+ job.getConfiguration().setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), rowIndexStride);
+ }
+ if (bufferSize != null) {
+ job.getConfiguration().setInt(OrcConf.BUFFER_SIZE.getAttribute(), bufferSize);
+ }
+ if (blockPadding != null) {
+ job.getConfiguration().setBoolean(OrcConf.BLOCK_PADDING.getAttribute(), blockPadding);
+ }
+ if (compress != null) {
+ job.getConfiguration().set(OrcConf.COMPRESS.getAttribute(), compress.toString());
+ }
+ if (versionName != null) {
+ job.getConfiguration().set(OrcConf.WRITE_FORMAT.getAttribute(), versionName);
+ }
+ }
+
+ public static class PigJodaTimeStampObjectInspector extends
+ AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector {
+
+ public PigJodaTimeStampObjectInspector() {
+ super(TypeInfoFactory.timestampTypeInfo);
+ }
+
+ private static Timestamp getHiveTimeStampFromDateTime(Object o) {
+ if (o == null) {
+ return null;
+ }
+ Timestamp ts = new Timestamp();
+ ts.setTimeInMillis(((DateTime) o).getMillis());
+ return ts;
+ }
+
+ @Override
+ public TimestampWritableV2 getPrimitiveWritableObject(Object o) {
+ return o == null ? null : new TimestampWritableV2(getHiveTimeStampFromDateTime(o));
+ }
+
+ @Override
+ public Timestamp getPrimitiveJavaObject(Object o) {
+ return o == null ? null : new Timestamp(getHiveTimeStampFromDateTime(o));
+ }
+ }
+
+ public static GenericUDAFParameterInfo newSimpleGenericUDAFParameterInfo(ObjectInspector[] arguments,
+ boolean distinct, boolean allColumns) {
+ return new SimpleGenericUDAFParameterInfo(arguments, false, distinct, allColumns);
+ }
+
+ public static class TimestampShim {
+
+ public static Timestamp cast(Object ts) {
+ return (Timestamp) ts;
+ }
+
+ public static long millisFromTimestamp(Object ts) {
+ return cast(ts).toEpochMilli();
+ }
+ }
+
+ public static class TimestampWritableShim {
+
+ public static boolean isAssignableFrom(Object object) {
+ return object instanceof TimestampWritableV2;
+ }
+
+ public static TimestampWritableV2 cast(Object ts) {
+ return (TimestampWritableV2) ts;
+ }
+
+ public static long millisFromTimestampWritable(Object ts) {
+ return cast(ts).getTimestamp().toEpochMilli();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/org/apache/pig/Expression.java b/src/org/apache/pig/Expression.java
index a843c16..2b6ddca 100644
--- a/src/org/apache/pig/Expression.java
+++ b/src/org/apache/pig/Expression.java
@@ -79,6 +79,8 @@
protected OpType opType;
+ protected byte dataType;
+
/**
* @return the opType
*/
@@ -86,6 +88,10 @@
return opType;
}
+ public byte getDataType() {
+ return dataType;
+ }
+
//TODO: Apply a optimizer to Expression from PredicatePushdownOptimizer and
// convert OR clauses to BETWEEN OR IN
public static class BetweenExpression extends Expression {
@@ -221,9 +227,10 @@
/**
* @param name
*/
- public Column(String name) {
+ public Column(String name, byte dataType) {
this.opType = OpType.TERM_COL;
this.name = name;
+ this.dataType = dataType;
}
@Override
diff --git a/src/org/apache/pig/builtin/HiveUDAF.java b/src/org/apache/pig/builtin/HiveUDAF.java
index b86159d..789600c 100644
--- a/src/org/apache/pig/builtin/HiveUDAF.java
+++ b/src/org/apache/pig/builtin/HiveUDAF.java
@@ -42,6 +42,7 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.hive.HiveShims;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.hive.HiveUtils;
@@ -105,8 +106,7 @@
if (udaf instanceof GenericUDAFResolver2) {
GenericUDAFParameterInfo paramInfo =
- new SimpleGenericUDAFParameterInfo(
- arguments, false, false);
+ HiveShims.newSimpleGenericUDAFParameterInfo(arguments, false, false);
evaluator = ((GenericUDAFResolver2)udaf).getEvaluator(paramInfo);
} else {
TypeInfo[] params = ((StructTypeInfo)inputTypeInfo)
diff --git a/src/org/apache/pig/builtin/HiveUDFBase.java b/src/org/apache/pig/builtin/HiveUDFBase.java
index ebfbbe1..8e8aa32 100644
--- a/src/org/apache/pig/builtin/HiveUDFBase.java
+++ b/src/org/apache/pig/builtin/HiveUDFBase.java
@@ -49,6 +49,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.hive.HiveShims;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
@@ -181,9 +182,7 @@
@Override
public List<String> getShipFiles() {
- List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
- PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
- Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class});
+ List<String> files = FuncUtils.getShipFiles(HiveShims.getHiveUDFDependentClasses(Hadoop23Shims.class));
return files;
}
diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java
index 5f89706..9e4de7f 100644
--- a/src/org/apache/pig/builtin/OrcStorage.java
+++ b/src/org/apache/pig/builtin/OrcStorage.java
@@ -17,10 +17,10 @@
*/
package org.apache.pig.builtin;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -33,13 +33,13 @@
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
@@ -47,18 +47,16 @@
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.shims.Hadoop23Shims;
-import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -68,6 +66,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.Expression;
import org.apache.pig.Expression.BetweenExpression;
+import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.Expression.Column;
import org.apache.pig.Expression.Const;
import org.apache.pig.Expression.InExpression;
@@ -80,26 +79,28 @@
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.hive.HiveShims;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.hive.HiveUtils;
-import org.joda.time.DateTime;
-import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
+import org.joda.time.DateTime;
+
/**
* A load function and store function for ORC file.
* An optional constructor argument is provided that allows one to customize
@@ -133,7 +134,7 @@
private Integer bufferSize;
private Boolean blockPadding;
private CompressionKind compress;
- private Version version;
+ private String versionName;
private static final Options validOptions;
private final CommandLineParser parser = new GnuParser();
@@ -182,7 +183,7 @@
compress = CompressionKind.valueOf(configuredOptions.getOptionValue('c'));
}
if (configuredOptions.hasOption('v')) {
- version = Version.byName(configuredOptions.getOptionValue('v'));
+ versionName = HiveShims.normalizeOrcVersionName(configuredOptions.getOptionValue('v'));
}
} catch (ParseException e) {
log.error("Exception in OrcStorage", e);
@@ -207,30 +208,8 @@
@Override
public void setStoreLocation(String location, Job job) throws IOException {
if (!UDFContext.getUDFContext().isFrontend()) {
- if (stripeSize!=null) {
- job.getConfiguration().setLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
- stripeSize);
- }
- if (rowIndexStride!=null) {
- job.getConfiguration().setInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE.varname,
- rowIndexStride);
- }
- if (bufferSize!=null) {
- job.getConfiguration().setInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.varname,
- bufferSize);
- }
- if (blockPadding!=null) {
- job.getConfiguration().setBoolean(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING.varname,
- blockPadding);
- }
- if (compress!=null) {
- job.getConfiguration().set(HiveConf.ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname,
- compress.toString());
- }
- if (version!=null) {
- job.getConfiguration().set(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname,
- version.getName());
- }
+ HiveShims.setOrcConfigOnJob(job, stripeSize, rowIndexStride, bufferSize, blockPadding, compress,
+ versionName);
}
FileOutputFormat.setOutputPath(job, new Path(location));
if (typeInfo==null) {
@@ -396,9 +375,7 @@
@Override
public List<String> getShipFiles() {
- Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
- org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class,
- Input.class};
+ Class[] classList = HiveShims.getOrcDependentClasses(Hadoop23Shims.class);
return FuncUtils.getShipFiles(classList);
}
@@ -582,7 +559,11 @@
log.info("Pushdown predicate SearchArgument is:\n" + sArg);
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
try {
- p.setProperty(signature + SearchArgsSuffix, sArg.toKryo());
+ Kryo kryo = new Kryo();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Output output = new Output(baos);
+ kryo.writeObject(output, sArg);
+ p.setProperty(signature + SearchArgsSuffix, new String(Base64.encodeBase64(output.toBytes())));
} catch (Exception e) {
throw new IOException("Cannot serialize SearchArgument: " + sArg);
}
@@ -625,35 +606,43 @@
builder.end();
break;
case OP_EQ:
- builder.equals(getColumnName(lhs), getExpressionValue(rhs));
+ HiveShims.addEqualsOpToBuilder(builder, getColumnName(lhs),
+ getColumnType(lhs), getExpressionValue(rhs));
break;
case OP_NE:
builder.startNot();
- builder.equals(getColumnName(lhs), getExpressionValue(rhs));
+ HiveShims.addEqualsOpToBuilder(builder, getColumnName(lhs),
+ getColumnType(lhs), getExpressionValue(rhs));
builder.end();
break;
case OP_LT:
- builder.lessThan(getColumnName(lhs), getExpressionValue(rhs));
+ HiveShims.addLessThanOpToBuilder(builder, getColumnName(lhs),
+ getColumnType(lhs), getExpressionValue(rhs));
break;
case OP_LE:
- builder.lessThanEquals(getColumnName(lhs), getExpressionValue(rhs));
+ HiveShims.addLessThanEqualsOpToBuilder(builder, getColumnName(lhs),
+ getColumnType(lhs), getExpressionValue(rhs));
break;
case OP_GT:
builder.startNot();
- builder.lessThanEquals(getColumnName(lhs), getExpressionValue(rhs));
+ HiveShims.addLessThanEqualsOpToBuilder(builder, getColumnName(lhs),
+ getColumnType(lhs), getExpressionValue(rhs));
builder.end();
break;
case OP_GE:
builder.startNot();
- builder.lessThan(getColumnName(lhs), getExpressionValue(rhs));
+ HiveShims.addLessThanOpToBuilder(builder, getColumnName(lhs),
+ getColumnType(lhs), getExpressionValue(rhs));
builder.end();
break;
case OP_BETWEEN:
BetweenExpression between = (BetweenExpression) rhs;
- builder.between(getColumnName(lhs), getSearchArgObjValue(between.getLower()), getSearchArgObjValue(between.getUpper()));
+ HiveShims.addBetweenOpToBuilder(builder, getColumnName(lhs),
+ getColumnType(lhs), HiveShims.getSearchArgObjValue(between.getLower()),
+ HiveShims.getSearchArgObjValue(between.getUpper()));
case OP_IN:
InExpression in = (InExpression) rhs;
- builder.in(getColumnName(lhs), getSearchArgObjValues(in.getValues()).toArray());
+ builder.in(getColumnName(lhs), getColumnType(lhs), getSearchArgObjValues(in.getValues()).toArray());
default:
throw new RuntimeException("Unsupported binary expression type: " + expr.getOpType() + " in " + expr);
}
@@ -661,7 +650,8 @@
Expression unaryExpr = ((UnaryExpression) expr).getExpression();
switch (expr.getOpType()) {
case OP_NULL:
- builder.isNull(getColumnName(unaryExpr));
+ HiveShims.addIsNullOpToBuilder(builder, getColumnName(unaryExpr),
+ getColumnType(unaryExpr));
break;
case OP_NOT:
builder.startNot();
@@ -686,12 +676,21 @@
}
}
+ private PredicateLeaf.Type getColumnType(Expression expr) {
+ try {
+ return HiveUtils.getDataTypeForSearchArgs(expr.getDataType());
+ } catch (ClassCastException e) {
+ throw new RuntimeException("Expected a Column but found " + expr.getClass().getName() +
+ " in expression " + expr, e);
+ }
+ }
+
private Object getExpressionValue(Expression expr) {
switch(expr.getOpType()) {
case TERM_COL:
return ((Column) expr).getName();
case TERM_CONST:
- return getSearchArgObjValue(((Const) expr).getValue());
+ return HiveShims.getSearchArgObjValue(((Const) expr).getValue());
default:
throw new RuntimeException("Unsupported expression type: " + expr.getOpType() + " in " + expr);
}
@@ -703,21 +702,8 @@
}
List<Object> newValues = new ArrayList<Object>(values.size());
for (Object value : values) {
- newValues.add(getSearchArgObjValue(value));
+ newValues.add(HiveShims.getSearchArgObjValue(value));
}
return values;
}
-
- private Object getSearchArgObjValue(Object value) {
- if (value instanceof BigInteger) {
- return new BigDecimal((BigInteger)value);
- } else if (value instanceof BigDecimal) {
- return value;
- } else if (value instanceof DateTime) {
- return new Timestamp(((DateTime)value).getMillis());
- } else {
- return value;
- }
- }
-
}
diff --git a/src/org/apache/pig/impl/util/hive/HiveUtils.java b/src/org/apache/pig/impl/util/hive/HiveUtils.java
index 3691cd0..7ff75e4 100644
--- a/src/org/apache/pig/impl/util/hive/HiveUtils.java
+++ b/src/org/apache/pig/impl/util/hive/HiveUtils.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
@@ -69,6 +70,7 @@
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.hive.HiveShims;
import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.joda.time.DateTime;
@@ -179,8 +181,7 @@
result = new DataByteArray(b, 0, b.length);
break;
case TIMESTAMP:
- java.sql.Timestamp origTimeStamp = (java.sql.Timestamp)poi.getPrimitiveJavaObject(obj);
- result = new DateTime(origTimeStamp.getTime());
+ result = new DateTime(HiveShims.TimestampShim.millisFromTimestamp(poi.getPrimitiveJavaObject(obj)));
break;
case DATE:
java.sql.Date origDate = (java.sql.Date)poi.getPrimitiveJavaObject(obj);
@@ -674,24 +675,6 @@
}
- static class PigJodaTimeStampObjectInspector extends
- AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector {
-
- protected PigJodaTimeStampObjectInspector() {
- super(TypeInfoFactory.timestampTypeInfo);
- }
-
- @Override
- public TimestampWritable getPrimitiveWritableObject(Object o) {
- return o == null ? null : new TimestampWritable(new Timestamp(((DateTime)o).getMillis()));
- }
-
- @Override
- public Timestamp getPrimitiveJavaObject(Object o) {
- return o == null ? null : new Timestamp(((DateTime)o).getMillis());
- }
- }
-
static class PigDecimalObjectInspector extends
AbstractPrimitiveJavaObjectInspector implements HiveDecimalObjectInspector {
@@ -735,7 +718,7 @@
case STRING:
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
case TIMESTAMP:
- return new PigJodaTimeStampObjectInspector();
+ return new HiveShims.PigJodaTimeStampObjectInspector();
case DECIMAL:
return new PigDecimalObjectInspector();
case BINARY:
@@ -781,4 +764,28 @@
throw new IllegalArgumentException("Not implemented " + obj.getClass().getName());
}
}
+
+ public static PredicateLeaf.Type getDataTypeForSearchArgs(byte dataType) {
+ switch (dataType) {
+ case DataType.INTEGER:
+ return PredicateLeaf.Type.LONG;
+ case DataType.LONG:
+ return PredicateLeaf.Type.LONG;
+ case DataType.DOUBLE:
+ return PredicateLeaf.Type.FLOAT;
+ case DataType.FLOAT:
+ return PredicateLeaf.Type.FLOAT;
+ case DataType.CHARARRAY:
+ return PredicateLeaf.Type.STRING;
+ case DataType.DATETIME:
+ return PredicateLeaf.Type.DATE;
+ case DataType.BIGINTEGER:
+ case DataType.BIGDECIMAL:
+ return PredicateLeaf.Type.DECIMAL;
+ case DataType.BOOLEAN:
+ return PredicateLeaf.Type.BOOLEAN;
+ default:
+ throw new RuntimeException("Unsupported data type:" + DataType.findTypeName(dataType));
+ }
+ }
}
diff --git a/src/org/apache/pig/newplan/FilterExtractor.java b/src/org/apache/pig/newplan/FilterExtractor.java
index fd748bd..8788cdb 100644
--- a/src/org/apache/pig/newplan/FilterExtractor.java
+++ b/src/org/apache/pig/newplan/FilterExtractor.java
@@ -356,7 +356,7 @@
} else if (op instanceof ProjectExpression) {
ProjectExpression projExpr = (ProjectExpression)op;
String fieldName = projExpr.getFieldSchema().alias;
- return new Expression.Column(fieldName);
+ return new Expression.Column(fieldName, projExpr.getType());
} else if(op instanceof BinaryExpression) {
BinaryExpression binOp = (BinaryExpression)op;
if(binOp instanceof AddExpression) {
diff --git a/test/org/apache/pig/builtin/TestOrcStorage.java b/test/org/apache/pig/builtin/TestOrcStorage.java
index ddff101..6e8b4dc 100644
--- a/test/org/apache/pig/builtin/TestOrcStorage.java
+++ b/test/org/apache/pig/builtin/TestOrcStorage.java
@@ -17,28 +17,19 @@
*/
package org.apache.pig.builtin;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.apache.pig.builtin.mock.Storage.resetData;
-import static org.apache.pig.builtin.mock.Storage.tuple;
-
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.TimeZone;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -48,7 +39,6 @@
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BooleanWritable;
@@ -65,8 +55,10 @@
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.hive.HiveShims;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.test.Util;
+
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
@@ -74,6 +66,12 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
public class TestOrcStorage {
final protected static Log LOG = LogFactory.getLog(TestOrcStorage.class);
@@ -89,6 +87,11 @@
private static PigServer pigServer = null;
private static FileSystem fs;
+ static {
+ System.setProperty("user.timezone", "UTC");
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+ }
+
@BeforeClass
public static void oneTimeSetup(){
if(Util.WINDOWS){
@@ -282,7 +285,7 @@
Tuple t = iter.next();
assertTrue(t.toString().startsWith("(false,1,1024,65536,9223372036854775807,1.0,-15.0," +
",hi,({(1,bye),(2,sigh)}),{(3,good),(4,bad)},[],"));
- assertTrue(t.get(12).toString().matches("2000-03-12T15:00:00.000.*"));
+ assertTrue(t.get(12).toString().matches("2000-03-12T15:00:00\\.000Z.*"));
assertTrue(t.toString().endsWith(",12345678.6547456)"));
}
@@ -406,9 +409,9 @@
} else if (expected instanceof BooleanWritable) {
assertEquals(Boolean.class, actual.getClass());
assertEquals(((BooleanWritable) expected).get(), actual);
- } else if (expected instanceof TimestampWritable) {
+ } else if (HiveShims.TimestampWritableShim.isAssignableFrom(expected)) {
assertEquals(DateTime.class, actual.getClass());
- assertEquals(((TimestampWritable) expected).getTimestamp().getTime(),
+ assertEquals(HiveShims.TimestampWritableShim.millisFromTimestampWritable(expected),
((DateTime) actual).getMillis());
} else if (expected instanceof BytesWritable) {
assertEquals(DataByteArray.class, actual.getClass());
diff --git a/test/org/apache/pig/builtin/TestOrcStoragePushdown.java b/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
index 461d671..de62c88 100644
--- a/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
+++ b/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
@@ -17,9 +17,6 @@
*/
package org.apache.pig.builtin;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
@@ -61,6 +58,9 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.*;
+
+
public class TestOrcStoragePushdown {
private static List<OpType> supportedOpTypes;
@@ -221,8 +221,8 @@
String q = query + "b = filter a by srcid == 10;" + "store b into 'out';";
Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
SearchArgument sarg = orcStorage.getSearchArgument(expr);
- assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
- "expr = leaf-0", sarg.toString());
+ assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+ "expr = leaf-0");
}
@Test
@@ -230,11 +230,11 @@
String q = query + "b = filter a by (srcid > 10 or dstid <= 5) and name == 'foo' and mrkt is null;" + "store b into 'out';";
Expression expr = getExpressionForTest(q, Arrays.asList("srcid", "dstid", "name", "mrkt"));
SearchArgument sarg = orcStorage.getSearchArgument(expr);
- assertEquals("leaf-0 = (LESS_THAN_EQUALS srcid 10)\n" +
- "leaf-1 = (LESS_THAN_EQUALS dstid 5)\n" +
- "leaf-2 = (EQUALS name foo)\n" +
- "leaf-3 = (IS_NULL mrkt)\n" +
- "expr = (and (or (not leaf-0) leaf-1) leaf-2 leaf-3)", sarg.toString());
+ assertEqualsSarg(sarg, "leaf-0 = (LESS_THAN_EQUALS srcid 10)",
+ "leaf-1 = (LESS_THAN_EQUALS dstid 5)",
+ "leaf-2 = (EQUALS name foo)",
+ "leaf-3 = (IS_NULL mrkt)",
+ "expr = (and (or (not leaf-0) leaf-1) leaf-2 leaf-3)");
}
@Test
@@ -242,9 +242,9 @@
String q = query + "b = filter a by srcid != 10 and mrkt is not null;" + "store b into 'out';";
Expression expr = getExpressionForTest(q, Arrays.asList("srcid", "dstid", "name", "mrkt"));
SearchArgument sarg = orcStorage.getSearchArgument(expr);
- assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
- "leaf-1 = (IS_NULL mrkt)\n" +
- "expr = (and (not leaf-0) (not leaf-1))", sarg.toString());
+ assertEqualsSarg(sarg,"leaf-0 = (EQUALS srcid 10)",
+ "leaf-1 = (IS_NULL mrkt)",
+ "expr = (and (not leaf-0) (not leaf-1))");
}
@Test
@@ -253,9 +253,9 @@
String q = query + "b = filter a by srcid > 10 or srcid < 20;" + "store b into 'out';";
Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
SearchArgument sarg = orcStorage.getSearchArgument(expr);
- assertEquals("leaf-0 = (LESS_THAN_EQUALS srcid 10)\n" +
- "leaf-1 = (LESS_THAN srcid 20)\n" +
- "expr = (or (not leaf-0) leaf-1)", sarg.toString());
+ assertEqualsSarg(sarg, "leaf-0 = (LESS_THAN_EQUALS srcid 10)",
+ "leaf-1 = (LESS_THAN srcid 20)",
+ "expr = (or (not leaf-0) leaf-1)");
}
@Test
@@ -264,9 +264,9 @@
String q = query + "b = filter a by srcid == 10 or srcid == 11;" + "store b into 'out';";
Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
SearchArgument sarg = orcStorage.getSearchArgument(expr);
- assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
- "leaf-1 = (EQUALS srcid 11)\n" +
- "expr = (or leaf-0 leaf-1)", sarg.toString());
+ assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+ "leaf-1 = (EQUALS srcid 11)",
+ "expr = (or leaf-0 leaf-1)");
}
@Test
@@ -282,14 +282,14 @@
q = query + "b = filter a by name matches 'foo*' and srcid == 10;" + "store b into 'out';";
expr = getExpressionForTest(q, Arrays.asList("srcid", "name"));
sarg = orcStorage.getSearchArgument(expr);
- assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
- "expr = leaf-0", sarg.toString());
+ assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+ "expr = leaf-0");
q = query + "b = filter a by srcid == 10 and name matches 'foo*';" + "store b into 'out';";
expr = getExpressionForTest(q, Arrays.asList("srcid", "name"));
sarg = orcStorage.getSearchArgument(expr);
- assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
- "expr = leaf-0", sarg.toString());
+ assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+ "expr = leaf-0");
// OR - Nothing should be pushed
q = query + "b = filter a by name matches 'foo*' or srcid == 10;" + "store b into 'out';";
@@ -307,8 +307,8 @@
"store b into 'out';";
Expression expr = getExpressionForTest(q, Arrays.asList("srcid"));
SearchArgument sarg = orcStorage.getSearchArgument(expr);
- assertEquals("leaf-0 = (EQUALS srcid 10)\n" +
- "expr = leaf-0", sarg.toString());
+ assertEqualsSarg(sarg, "leaf-0 = (EQUALS srcid 10)",
+ "expr = leaf-0");
}
@Test
@@ -419,4 +419,17 @@
public void testPredicatePushdownVarchar() throws Exception {
testPredicatePushdown(basedir + "charvarchar.orc", "$1 == 'alice allen '", 19, 18000);
}
+
+ private static void assertEqualsSarg(SearchArgument actual, String... expected) {
+ String hive1Expected = String.join("\n", expected);
+ String hive3Expected = String.join(", ", expected);
+
+ if (hive1Expected.equals(actual.toString())) {
+ return;
+ }
+ if (hive3Expected.equals(actual.toString())) {
+ return;
+ }
+ fail(actual.toString() + "\n does not match expected SARG:\n" + hive3Expected);
+ }
}
diff --git a/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java b/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
index 793e127..45f3417 100644
--- a/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
+++ b/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
@@ -19,11 +19,12 @@
import java.util.List;
+import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.Utils;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@@ -45,10 +46,20 @@
"store a into 'ooo';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
- String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
- "hive-shims-0.23", "hive-shims-common", "kryo"};
+ String hiveVersion = HiveVersionInfo.getVersion().substring(0, 1);
+ if (hiveVersion.equals("3")) {
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "orc-core",
+ "hive-storage-api", "kryo", "minlog"
+ };
- checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
+ checkPlan(pp, expectedJars, 9, pigServer.getPigContext());
+ } else {
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "kryo"};
+
+ checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
+ }
}
@Test
@@ -57,10 +68,21 @@
"store a into 'ooo' using OrcStorage;";
PhysicalPlan pp = Util.buildPp(pigServer, query);
- String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
- "hive-shims-0.23", "hive-shims-common", "kryo"};
+ String hiveVersion = HiveVersionInfo.getVersion().substring(0, 1);
+ if (hiveVersion.equals("3")) {
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "orc-core",
+ "hive-storage-api", "kryo", "minlog"
+ };
- checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
+ checkPlan(pp, expectedJars, 9, pigServer.getPigContext());
+ } else {
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "kryo"};
+
+
+ checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
+ }
}
@Test