PIG-4923: Drop Hadoop 1.x support in Pig 0.17 (szita via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1777738 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/.gitignore b/.gitignore
index 4ff5f7e..4d35082 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,4 @@
conf/log4j.properties
lib/jdiff/pig_*SNAPSHOT.xml
test/resources/*.jar
+!ivy/ant-contrib-1.0b3.jar
diff --git a/BUILDING.md b/BUILDING.md
index ccee21c..a3ee000 100644
--- a/BUILDING.md
+++ b/BUILDING.md
@@ -13,18 +13,14 @@
## Building Pig
-To compile with Hadoop 1.x
-
- ant clean jar piggybank
-
To compile with Hadoop 2.x
- ant clean jar piggybank -Dhadoopversion=23
+ ant clean jar piggybank
Building and running the tests needed before submitting a patch.
For more details https://cwiki.apache.org/confluence/display/PIG/HowToContribute
- ANT_OPTS='-Djavac.args="-Xlint -Xmaxwarns 1000" -Dhadoopversion=23'
+ ANT_OPTS='-Djavac.args="-Xlint -Xmaxwarns 1000"'
ant ${ANT_OPTS} clean piggybank jar compile-test test-commit
cd contrib/piggybank/java && ant ${ANT_OPTS} test
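Note: with Hadoop 1.x support gone, -Dhadoopversion=23 is no longer needed. Per the build.xml changes later in this patch, a legacy invocation such as

    ant clean jar piggybank -Dhadoopversion=23

still builds against Hadoop 2, after echoing a deprecation warning.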
diff --git a/CHANGES.txt b/CHANGES.txt
index f75cf34..4a4d433 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -22,6 +22,8 @@
INCOMPATIBLE CHANGES
+PIG-4923: Drop Hadoop 1.x support in Pig 0.17 (szita via rohini)
+
PIG-5067: Revisit union on numeric type and chararray to bytearray (knoguchi)
IMPROVEMENTS
diff --git a/bin/pig b/bin/pig
index 81f1426..e1212fa 100755
--- a/bin/pig
+++ b/bin/pig
@@ -301,7 +301,8 @@
if [ -z "$HADOOP_CORE_JAR" ]; then
HADOOP_VERSION=2
else
- HADOOP_VERSION=1
+ echo "Pig requires Hadoop 2 to be present in HADOOP_HOME (currently: $HADOOP_HOME). Please install Hadoop 2.x"
+ exit 1
fi
# if using HBase, likely want to include HBase jars and config
@@ -377,11 +378,7 @@
if [ -n "$PIG_JAR" ]; then
CLASSPATH=${CLASSPATH}:$PIG_JAR
else
- if [ "$HADOOP_VERSION" == "1" ]; then
- echo "Cannot locate pig-core-h${HADOOP_VERSION}.jar. do 'ant jar', and try again"
- else
- echo "Cannot locate pig-core-h${HADOOP_VERSION}.jar. do 'ant -Dhadoopversion=23 jar', and try again"
- fi
+ echo "Cannot locate pig-core-h${HADOOP_VERSION}.jar. do 'ant jar', and try again"
exit 1
fi
@@ -402,8 +399,8 @@
exec "$HADOOP_BIN" jar "$PIG_JAR" "${remaining[@]}"
fi
else
- # use hadoop-core.jar to run local mode
- PIG_JAR=`echo $PIG_HOME/pig*-core-h1.jar`
+ # use bundled hadoop to run local mode
+ PIG_JAR=`echo $PIG_HOME/pig*-core-h2.jar`
if [ -n "$PIG_JAR" ]; then
CLASSPATH="${CLASSPATH}:$PIG_JAR"
@@ -412,12 +409,12 @@
exit 1
fi
- for f in $PIG_HOME/lib/h1/*.jar; do
+ for f in $PIG_HOME/lib/h2/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
- # Add bundled hadoop-core.jar
- for f in $PIG_HOME/lib/hadoop1-runtime/*.jar; do
+ # Add bundled hadoop jars
+ for f in $PIG_HOME/lib/hadoop2-runtime/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
diff --git a/bin/pig.py b/bin/pig.py
index 5ff1d37..b6c3965 100644
--- a/bin/pig.py
+++ b/bin/pig.py
@@ -338,7 +338,7 @@
if len(hadoopCoreJars) == 0:
hadoopVersion = 2
else:
- hadoopVersion = 1
+ sys.exit("Cannot locate Hadoop 2 binaries, please install Hadoop 2.x and try again.")
if hadoopBin != "":
if debug == True:
@@ -361,10 +361,7 @@
if len(pigJars) == 1:
pigJar = pigJars[0]
else:
- if hadoopVersion == 1:
- sys.exit("Cannot locate pig-core-h1.jar do 'ant jar', and try again")
- else:
- sys.exit("Cannot locate pig-core-h2.jar do 'ant -Dhadoopversion=23 jar', and try again")
+ sys.exit("Cannot locate pig-core-h2.jar do 'ant jar', and try again")
pigLibJars = glob.glob(os.path.join(os.environ['PIG_HOME']+"/lib", "h" + str(hadoopVersion), "*.jar"))
for jar in pigLibJars:
@@ -393,13 +390,13 @@
else:
# fall back to use fat pig.jar
if debug == True:
- print "Cannot find local hadoop installation, using bundled hadoop 1"
-
- if os.path.exists(os.path.join(os.environ['PIG_HOME'], "pig-core-h1.jar")):
- pigJar = os.path.join(os.environ['PIG_HOME'], "pig-core-h1.jar")
+ print "Cannot find local hadoop installation, using bundled hadoop 2"
+
+ if os.path.exists(os.path.join(os.environ['PIG_HOME'], "pig-core-h2.jar")):
+ pigJar = os.path.join(os.environ['PIG_HOME'], "pig-core-h2.jar")
else:
- pigJars = glob.glob(os.path.join(os.environ['PIG_HOME'], "pig-*-core-h1.jar"))
+ pigJars = glob.glob(os.path.join(os.environ['PIG_HOME'], "pig-*-core-h2.jar"))
if len(pigJars) == 1:
pigJar = pigJars[0]
@@ -407,15 +404,15 @@
elif len(pigJars) > 1:
print "Ambiguity with pig jars found the following jars"
print pigJars
- sys.exit("Please remove irrelavant jars from %s" % os.path.join(os.environ['PIG_HOME'], "pig-core-h1.jar"))
+ sys.exit("Please remove irrelavant jars from %s" % os.path.join(os.environ['PIG_HOME'], "pig-core-h2.jar"))
else:
- sys.exit("Cannot locate pig-core-h1.jar. do 'ant jar' and try again")
+ sys.exit("Cannot locate pig-core-h2.jar. do 'ant jar' and try again")
- pigLibJars = glob.glob(os.path.join(os.environ['PIG_HOME']+"/lib", "h1", "*.jar"))
+ pigLibJars = glob.glob(os.path.join(os.environ['PIG_HOME']+"/lib", "h2", "*.jar"))
for jar in pigLibJars:
classpath += os.pathsep + jar
- pigLibJars = glob.glob(os.path.join(os.environ['PIG_HOME']+"/lib", "hadoop1-runtime", "*.jar"))
+ pigLibJars = glob.glob(os.path.join(os.environ['PIG_HOME']+"/lib", "hadoop2-runtime", "*.jar"))
for jar in pigLibJars:
classpath += os.pathsep + jar
@@ -423,7 +420,7 @@
pigClass = "org.apache.pig.Main"
if debug == True:
print "dry runXXX:"
- print "%s %s %s -classpath %s %s %s" % (java, javaHeapMax, pigOpts, classpath, pigClass, ' '.join(restArgs))
+ print "%s %s %s -classpath %s %s %s" % (java, javaHeapMax, pigOpts, classpath, pigClass, ' '.join(restArgs))
else:
cmdLine = java + ' ' + javaHeapMax + ' ' + pigOpts
cmdLine += ' ' + '-classpath ' + classpath + ' ' + pigClass + ' ' + ' '.join(restArgs)
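For illustration only, a condensed sketch of the local-mode fallback as bin/pig.py behaves after this patch (the function name and packaging are mine, not part of the patch; assumes PIG_HOME is set):

    import glob
    import os
    import sys

    def bundled_hadoop2_classpath(classpath):
        # With no local Hadoop installation, the launcher now falls back to the
        # bundled Hadoop 2 artifacts instead of the old hadoop-core (Hadoop 1) jar.
        pig_home = os.environ['PIG_HOME']
        pig_jar = os.path.join(pig_home, "pig-core-h2.jar")
        if not os.path.exists(pig_jar):
            pig_jars = glob.glob(os.path.join(pig_home, "pig-*-core-h2.jar"))
            if len(pig_jars) != 1:
                sys.exit("Cannot locate pig-core-h2.jar. Run 'ant jar' and try again")
            pig_jar = pig_jars[0]
        classpath += os.pathsep + pig_jar
        # lib/h2 holds the Hadoop-2-specific Pig dependencies; lib/hadoop2-runtime
        # holds the bundled Hadoop jars (see copyHadoop2LocalRuntimeDependencies
        # in build.xml below).
        for subdir in ("h2", "hadoop2-runtime"):
            for jar in glob.glob(os.path.join(pig_home, "lib", subdir, "*.jar")):
                classpath += os.pathsep + jar
        return classpath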
diff --git a/build.xml b/build.xml
index 49f887b..df5bcdd 100644
--- a/build.xml
+++ b/build.xml
@@ -20,6 +20,13 @@
xmlns:ivy="antlib:org.apache.ivy.ant">
<!-- Load all the default properties, and any the user wants -->
<!-- to contribute (without having to type -D or edit this file -->
+
+ <taskdef resource="net/sf/antcontrib/antcontrib.properties">
+ <classpath>
+ <pathelement location="${basedir}/ivy/ant-contrib-1.0b3.jar"/>
+ </classpath>
+ </taskdef>
+
<property file="${user.home}/build.properties" />
<property file="${basedir}/build.properties" />
@@ -35,7 +42,7 @@
<property name="pig.version.suffix" value="-SNAPSHOT" />
<property name="version" value="${pig.version}${pig.version.suffix}" />
<property name="final.name" value="${name}-${version}" />
- <property name="year" value="2007-2012" />
+ <property name="year" value="2007-2016" />
<!-- source properties -->
<property name="lib.dir" value="${basedir}/lib" />
@@ -69,7 +76,6 @@
<!-- artifact jar file names -->
<property name="artifact.pig.jar" value="${final.name}.jar"/>
- <property name="artifact.pig-h1.jar" value="${final.name}-h1.jar"/>
<property name="artifact.pig-h2.jar" value="${final.name}-h2.jar"/>
<property name="artifact.pig-sources.jar" value="${final.name}-sources.jar"/>
<property name="artifact.pig-javadoc.jar" value="${final.name}-javadoc.jar"/>
@@ -77,15 +83,12 @@
<!-- jar names. TODO we might want to use the svn reversion name in the name in case it is a dev version -->
<property name="output.jarfile.withouthadoop" value="${build.dir}/${final.name}-withouthadoop.jar" />
- <property name="output.jarfile.withouthadoop-h1" value="${legacy.dir}/${final.name}-withouthadoop-h1.jar" />
<property name="output.jarfile.withouthadoop-h2" value="${legacy.dir}/${final.name}-withouthadoop-h2.jar" />
<property name="output.jarfile.core" value="${build.dir}/${artifact.pig.jar}" />
- <property name="output.jarfile.core-h1" value="${build.dir}/${artifact.pig-h1.jar}" />
<property name="output.jarfile.core-h2" value="${build.dir}/${artifact.pig-h2.jar}" />
<property name="output.jarfile.sources" value="${build.dir}/${artifact.pig-sources.jar}" />
<property name="output.jarfile.javadoc" value="${build.dir}/${artifact.pig-javadoc.jar}" />
<!-- Maintain old pig.jar in top level directory. -->
- <property name="output.jarfile.backcompat-core-h1" value="${basedir}/${final.name}-core-h1.jar" />
<property name="output.jarfile.backcompat-core-h2" value="${basedir}/${final.name}-core-h2.jar" />
<!-- test properties -->
@@ -104,8 +107,6 @@
<property name="test.smoke.file" value="${test.src.dir}/smoke-tests"/>
<property name="test.all.file" value="${test.src.dir}/all-tests"/>
<property name="test.exclude.file" value="${test.src.dir}/excluded-tests"/>
- <property name="test.exclude.file.20" value="${test.src.dir}/excluded-tests-20"/>
- <property name="test.exclude.file.23" value="${test.src.dir}/excluded-tests-23"/>
<property name="test.exclude.file.mr" value="${test.src.dir}/excluded-tests-mr"/>
<property name="test.exclude.file.tez" value="${test.src.dir}/excluded-tests-tez"/>
<property name="pigunit.jarfile" value="pigunit.jar" />
@@ -151,9 +152,8 @@
<target name="setTezEnv">
<propertyreset name="test.timeout" value="900000" />
- <propertyreset name="hadoopversion" value="23" />
- <propertyreset name="isHadoop23" value="true" />
- <propertyreset name="hbase.hadoop.version" value="hadoop2" />
+ <propertyreset name="hadoopversion" value="2" />
+ <propertyreset name="isHadoop2" value="true" />
<propertyreset name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
<propertyreset name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
<propertyreset name="src.exclude.dir" value="" />
@@ -201,40 +201,42 @@
<property name="loglevel" value="quiet" />
<loadproperties srcfile="${ivy.dir}/libraries.properties"/>
- <property name="hadoopversion" value="20" />
- <condition property="isHadoop23">
+ <!--
+ Hadoop master version
+ (The legacy value 23 is translated to 2 for backward compatibility with old build scripts)
+ -->
+ <if>
<equals arg1="${hadoopversion}" arg2="23"/>
- </condition>
+ <then>
+ <echo>Property setting hadoopversion=23 is deprecated. Overwriting to hadoopversion=2</echo>
+ <var name="hadoopversion" unset="true"/>
+ <property name="hadoopversion" value="2" />
+ </then>
+ </if>
+ <property name="hadoopversion" value="2" />
- <condition property="hbase.hadoop.version" value="hadoop1" else="hadoop2">
- <not>
- <equals arg1="${hadoopversion}" arg2="23"/>
- </not>
+ <condition property="isHadoop2">
+ <equals arg1="${hadoopversion}" arg2="2"/>
</condition>
<!--
HBase master version
- Denotes how the HBase dependencies are layout. Value "94" denotes older
- format where all HBase code is present in one single jar, which is the
- way HBase is available up to version 0.94. Value "95" denotes new format
- where HBase is cut into multiple dependencies per each major subsystem,
- e.g. "client", "server", ... . Only values "94" and "95" are supported
- at the moment.
+ (The legacy value 95 is translated to 1 for backward compatibility with old build scripts)
-->
- <property name="hbaseversion" value="95" />
-
- <!-- exclude tez code if not hadoop20 -->
- <condition property="src.exclude.dir" value="**/tez/**" else="">
- <not>
- <equals arg1="${hadoopversion}" arg2="23"/>
- </not>
- </condition>
+ <if>
+ <equals arg1="${hbaseversion}" arg2="95"/>
+ <then>
+ <echo>Property setting hbaseversion=95 is deprecated. Overwriting to hbaseversion=1</echo>
+ <var name="hbaseversion" unset="true"/>
+ <property name="hbaseversion" value="1" />
+ </then>
+ </if>
+ <property name="hbaseversion" value="1" />
<property name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
<property name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
- <property name="hadoop.jar" value="hadoop-core-${hadoop-core.version}.jar" />
<property name="asfrepo" value="https://repository.apache.org"/>
<property name="asfsnapshotrepo" value="${asfrepo}/content/repositories/snapshots"/>
<property name="mvnrepo" value="http://repo2.maven.org/maven2"/>
@@ -379,11 +381,6 @@
<include name="joda-time-${joda-time.version}.jar"/>
<include name="automaton-${automaton.version}.jar"/>
<include name="jansi-${jansi.version}.jar"/>
- <include name="jackson-mapper-asl-${jackson.version}.jar" unless="isHadoop23"/>
- <include name="jackson-core-asl-${jackson.version}.jar" unless="isHadoop23"/>
- <include name="guava-${guava.version}.jar" unless="isHadoop23"/>
- <include name="snappy-java-${snappy.version}.jar" unless="isHadoop23"/>
- <include name="asm-${asm.version}.jar" unless="isHadoop23"/>
</patternset>
</fileset>
@@ -545,6 +542,7 @@
<echo>*** Building Main Sources ***</echo>
<echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo>
<echo>*** Else, you will only be warned about deprecations ***</echo>
+ <echo>*** Hadoop version used: ${hadoopversion}; HBase version used: ${hbaseversion} ***</echo>
<compileSources sources="${src.dir};${src.gen.dir};${src.lib.dir}/bzip2;${src.shims.dir}"
excludes="${src.exclude.dir}" dist="${build.classes}" cp="classpath" warnings="${javac.args.warnings}" />
<copy todir="${build.classes}/META-INF">
@@ -674,31 +672,14 @@
</target>
<!-- ================================================================== -->
- <!-- Facede to build pig.jar for both Hadoop 1 and Hadoop 2 -->
- <!-- ================================================================== -->
- <target name="jar-h12" description="Create pig for both Hadoop 1 and Hadoop 2">
- <propertyreset name="hadoopversion" value="20" />
- <propertyreset name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
- <antcall target="clean" inheritRefs="true" inheritall="true"/>
- <antcall target="jar" inheritRefs="true" inheritall="true"/>
- <antcall target="copyHadoop1LocalRuntimeDependencies"/>
- <delete dir="${build.dir}" />
- <propertyreset name="hadoopversion" value="23" />
- <propertyreset name="hbase.hadoop.version" value="hadoop2" />
- <propertyreset name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
- <propertyreset name="src.exclude.dir" value="" />
- <antcall target="jar" inheritRefs="true" inheritall="true"/>
- </target>
-
- <!-- ================================================================== -->
<!-- Make pig.jar -->
<!-- ================================================================== -->
<target name="jar" depends="compile,ivy-buildJar" description="Create pig core jar">
<buildJar svnString="${svn.revision}" outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/>
<buildJar svnString="${svn.revision}" outputFile="${output.jarfile.withouthadoop}" includedJars="runtime.dependencies-withouthadoop.jar"/>
<antcall target="copyCommonDependencies"/>
- <antcall target="copyh1Dependencies"/>
<antcall target="copyh2Dependencies"/>
+ <antcall target="copyHadoop2LocalRuntimeDependencies" />
</target>
<target name="copyCommonDependencies">
@@ -735,19 +716,7 @@
</copy>
</target>
- <target name="copyh1Dependencies" unless="isHadoop23">
- <mkdir dir="${lib.dir}/h1" />
- <copy todir="${lib.dir}/h1">
- <fileset dir="${ivy.lib.dir}" includes="avro-mapred-*.jar"/>
- <fileset dir="${ivy.lib.dir}" includes="hive-shims-0.*.jar"/>
- <fileset dir="${ivy.lib.dir}" includes="hbase-*hadoop1.jar"/>
- </copy>
- <copy file="${output.jarfile.core}" tofile="${output.jarfile.backcompat-core-h1}"/>
- <mkdir dir="${legacy.dir}" />
- <move file="${output.jarfile.withouthadoop}" tofile="${output.jarfile.withouthadoop-h1}"/>
- </target>
-
- <target name="copyh2Dependencies" if="isHadoop23">
+ <target name="copyh2Dependencies" if="isHadoop2">
<mkdir dir="${lib.dir}/h2" />
<copy todir="${lib.dir}/h2">
<fileset dir="${ivy.lib.dir}" includes="avro-mapred-*.jar"/>
@@ -761,18 +730,21 @@
<move file="${output.jarfile.withouthadoop}" tofile="${output.jarfile.withouthadoop-h2}"/>
</target>
- <target name="copyHadoop1LocalRuntimeDependencies">
- <mkdir dir="${lib.dir}/hadoop1-runtime" />
- <copy todir="${lib.dir}/hadoop1-runtime">
- <fileset dir="${ivy.lib.dir}" includes="hadoop-core-*.jar"/>
+ <target name="copyHadoop2LocalRuntimeDependencies">
+ <mkdir dir="${lib.dir}/hadoop2-runtime" />
+ <copy todir="${lib.dir}/hadoop2-runtime">
+ <fileset dir="${ivy.lib.dir}" includes="hadoop-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="commons-cli-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="commons-configuration-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="commons-collections-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="commons-lang-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="commons-codec-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="commons-io-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="commons-logging-*.jar"/>
- <fileset dir="${ivy.lib.dir}" includes="commons-httpclient-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="httpclient-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="httpcore-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="log4j-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="slf4j-*.jar"/>
</copy>
</target>
@@ -932,8 +904,6 @@
<patternset>
<includesfile name="@{test.file}"/>
<excludesfile name="${test.exclude.file}" if="test.exclude.file"/>
- <excludesfile name="${test.exclude.file.20}" unless="isHadoop23"/>
- <excludesfile name="${test.exclude.file.23}" if="isHadoop23"/>
<excludesfile name="${test.exclude.file.for.exectype}"/>
</patternset>
<exclude name="**/${exclude.testcase}.java" if="exclude.testcase" />
@@ -962,10 +932,10 @@
<target name="test-core-mrtez" description="run core tests on both mr and tez mode"
depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download">
- <fail message="hadoopversion must be set to 23 when invoking test-core-mrtez">
+ <fail message="hadoopversion must be set to 2 when invoking test-core-mrtez">
<condition>
<not>
- <equals arg1="${hadoopversion}" arg2="23" />
+ <equals arg1="${hadoopversion}" arg2="2" />
</not>
</condition>
</fail>
@@ -1049,10 +1019,7 @@
<!-- ================================================================== -->
<!-- Distribution -->
<!-- ================================================================== -->
- <target name="package-h12" depends="jar-h12, docs, api-report, piggybank" description="Create a Pig tar release">
- <package-base/>
- </target>
-
+
<target name="package" depends="jar, docs, api-report, piggybank" description="Create a Pig tar release">
<package-base/>
</target>
@@ -1072,7 +1039,6 @@
<fileset dir="${lib.dir}"/>
</copy>
- <copy file="${output.jarfile.backcompat-core-h1}" tofile="${tar.dist.dir}/${final.name}-core-h1.jar" failonerror="false"/>
<copy file="${output.jarfile.backcompat-core-h2}" tofile="${tar.dist.dir}/${final.name}-core-h2.jar" failonerror="false"/>
<copy todir="${tar.dist.dir}/lib" file="contrib/piggybank/java/piggybank.jar"/>
@@ -1150,10 +1116,6 @@
<tar-base/>
</target>
- <target name="tar-h12" depends="package-h12" description="Source distribution">
- <tar-base/>
- </target>
-
<macrodef name="tar-base">
<sequential>
<tar compression="gzip" longfile="gnu" destfile="${build.dir}/${artifact.pig.tar}">
@@ -1239,15 +1201,13 @@
uri="urn:maven-artifact-ant"
classpathref="mvn-ant-task.classpath"/>
</target>
- <target name="mvn-install" depends="mvn-taskdef,jar-h12, set-version, source-jar,
- javadoc-jar, pigunit-jar, smoketests-jar, piggybank"
+ <target name="mvn-install" depends="mvn-taskdef, mvn-build, set-version"
description="To install pig to local filesystem's m2 cache">
<artifact:pom file="${pig.pom}" id="pig"/>
- <artifact:install file="${output.jarfile.core-h1}">
+ <artifact:install file="${output.jarfile.core-h2}">
<pom refid="pig"/>
<attach file="${output.jarfile.sources}" classifier="sources" />
<attach file="${output.jarfile.javadoc}" classifier="javadoc" />
- <attach file="${output.jarfile.core-h2}" classifier="h2" />
</artifact:install>
<artifact:pom file="${pigunit.pom}" id="pigunit"/>
<artifact:install file="${pigunit.jarfile}">
@@ -1263,10 +1223,9 @@
</artifact:install>
</target>
- <target name="mvn-build" depends="jar-h12, source-jar,
+ <target name="mvn-build" depends="jar, source-jar,
javadoc-jar, smoketests-jar, pigunit-jar, piggybank"
description="To build the pig jar artifacts to be deployed to apache maven repository">
- <move file="${output.jarfile.backcompat-core-h1}" tofile="${output.jarfile.core}"/>
<move file="${output.jarfile.backcompat-core-h2}" tofile="${output.jarfile.core-h2}"/>
</target>
@@ -1657,7 +1616,9 @@
<target name="ivy-resolve" depends="ivy-init" unless="ivy.resolved" description="Resolve Ivy dependencies">
<property name="ivy.resolved" value="true"/>
+ <echo>*** Ivy resolve with Hadoop ${hadoopversion} and HBase ${hbaseversion} ***</echo>
<ivy:resolve log="${loglevel}" settingsRef="${ant.project.name}.ivy.settings" conf="compile"/>
+ <ivy:report toDir="build/ivy/report"/>
</target>
<target name="ivy-compile" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for compile configuration">
diff --git a/contrib/piggybank/java/build.xml b/contrib/piggybank/java/build.xml
index a647e71..1958c50 100755
--- a/contrib/piggybank/java/build.xml
+++ b/contrib/piggybank/java/build.xml
@@ -16,6 +16,13 @@
-->
<project basedir="." default="jar" name="pigudf">
+
+ <taskdef resource="net/sf/antcontrib/antcontrib.properties">
+ <classpath>
+ <pathelement location="../../../ivy/ant-contrib-1.0b3.jar"/>
+ </classpath>
+ </taskdef>
+
<property file="../../../build.properties" />
<!-- javac properties -->
<property name="javac.debug" value="on" />
@@ -38,16 +45,30 @@
<property name="src.dir" value="src/main/java/org/apache/pig/piggybank" />
<property name="hsqldb.jar" value="../../../build/ivy/lib/Pig/hsqldb-1.8.0.10.jar"/>
- <!-- JobHistoryLoader currently does not support 0.23 -->
- <condition property="build.classes.excludes" value="**/HadoopJobHistoryLoader.java" else="">
+ <!--
+ Hadoop master version
+ (The legacy value 23 is translated to 2 for backward compatibility with old build scripts)
+ -->
+ <if>
<equals arg1="${hadoopversion}" arg2="23"/>
+ <then>
+ <echo>Property setting hadoopversion=23 is deprecated. Overwriting to hadoopversion=2</echo>
+ <var name="hadoopversion" unset="true"/>
+ <property name="hadoopversion" value="2" />
+ </then>
+ </if>
+ <property name="hadoopversion" value="2" />
+
+ <!-- JobHistoryLoader currently does not support Hadoop 2 -->
+ <condition property="build.classes.excludes" value="**/HadoopJobHistoryLoader.java" else="">
+ <equals arg1="${hadoopversion}" arg2="2"/>
</condition>
<condition property="test.classes.excludes" value="**/TestHadoopJobHistoryLoader.java" else="">
- <equals arg1="${hadoopversion}" arg2="23"/>
+ <equals arg1="${hadoopversion}" arg2="2"/>
</condition>
- <condition property="hadoopsuffix" value="2" else="1">
- <equals arg1="${hadoopversion}" arg2="23"/>
+ <condition property="hadoopsuffix" value="2" else="">
+ <equals arg1="${hadoopversion}" arg2="2"/>
</condition>
<!-- jar properties -->
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java
index 47c6105..c98493f 100644
--- a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java
@@ -60,7 +60,6 @@
import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DataByteArray;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
/**
* <code>IndexedStorage</code> is a form of <code>PigStorage</code> that supports a
diff --git a/ivy.xml b/ivy.xml
index bae1a81..23d3865 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -38,10 +38,8 @@
<conf name="jdiff" visibility="private"/>
<conf name="checkstyle" visibility="private"/>
<conf name="buildJar" extends="compile,test" visibility="private"/>
- <conf name="hadoop20" visibility="private"/>
- <conf name="hadoop23" visibility="private"/>
- <conf name="hbase94" visibility="private"/>
- <conf name="hbase95" visibility="private"/>
+ <conf name="hadoop2" visibility="private"/>
+ <conf name="hbase1" visibility="private"/>
</configurations>
<publications>
<artifact name="pig" conf="master"/>
@@ -60,17 +58,17 @@
<dependency org="commons-beanutils" name="commons-beanutils-core" rev="${commons-beanutils.version}"
conf="checkstyle->master"/>
<dependency org="xmlenc" name="xmlenc" rev="${xmlenc.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="com.sun.jersey" name="jersey-bundle" rev="${jersey.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="com.sun.jersey" name="jersey-server" rev="${jersey.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="com.sun.jersey.contribs" name="jersey-guice" rev="${jersey.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="commons-codec" name="commons-codec" rev="${commons-codec.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="commons-httpclient" name="commons-httpclient" rev="${commons-httpclient.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="commons-el" name="commons-el" rev="${commons-el.version}"
conf="compile->master"/>
<dependency org="commons-io" name="commons-io" rev="${commons-io.version}"
@@ -88,92 +86,86 @@
<dependency org="nl.basjes.parse" name="parser-core" rev="${basjes-httpdlog-pigloader.version}"
conf="compile->master"/>
<dependency org="commons-configuration" name="commons-configuration" rev="${commons-configuration.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="commons-collections" name="commons-collections" rev="${commons-collections.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="javax.servlet" name="servlet-api" rev="${servlet-api.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="javax.ws.rs" name="jsr311-api" rev="${jsr311-api.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="com.google.protobuf" name="protobuf-java" rev="${protobuf-java.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="javax.inject" name="javax.inject" rev="${javax-inject.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="javax.xml.bind" name="jaxb-api" rev="${jaxb-api.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="com.sun.xml.bind" name="jaxb-impl" rev="${jaxb-impl.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="com.google.inject" name="guice" rev="${guice.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="com.google.inject.extensions" name="guice-servlet" rev="${guice-servlet.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="aopalliance" name="aopalliance" rev="${aopalliance.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.mortbay.jetty" name="jsp-2.1" rev="${jasper.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.mortbay.jetty" name="jsp-api-2.1" rev="${jasper.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="log4j" name="log4j" rev="${log4j.version}"
conf="compile->master"/>
- <dependency org="com.sun.jersey" name="jersey-core" rev="${jersey-core.version}"
- conf="hadoop20->default"/>
- <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop-core.version}"
- conf="hadoop20->default"/>
- <dependency org="org.apache.hadoop" name="hadoop-test" rev="${hadoop-test.version}"
- conf="hadoop20->default"/>
- <dependency org="org.apache.hadoop" name="hadoop-annotations"
- rev="${hadoop-common.version}" conf="hadoop23->master"/>
+ <dependency org="org.apache.hadoop" name="hadoop-annotations"
+ rev="${hadoop-common.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-auth"
- rev="${hadoop-common.version}" conf="hadoop23->master"/>
+ rev="${hadoop-common.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-common"
- rev="${hadoop-common.version}" conf="hadoop23->master">
+ rev="${hadoop-common.version}" conf="hadoop2->master">
<artifact name="hadoop-common" ext="jar" />
<artifact name="hadoop-common" type="tests" ext="jar" m:classifier="tests" />
</dependency>
<dependency org="org.apache.hadoop" name="hadoop-hdfs"
- rev="${hadoop-hdfs.version}" conf="hadoop23->master">
+ rev="${hadoop-hdfs.version}" conf="hadoop2->master">
<artifact name="hadoop-hdfs" ext="jar" />
<artifact name="hadoop-hdfs" type="tests" ext="jar" m:classifier="tests" />
</dependency>
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core" rev="${hadoop-mapreduce.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-jobclient" rev="${hadoop-mapreduce.version}"
- conf="hadoop23->master">
+ conf="hadoop2->master">
<artifact name="hadoop-mapreduce-client-jobclient" ext="jar" />
<artifact name="hadoop-mapreduce-client-jobclient" type="tests" ext="jar" m:classifier="tests"/>
<exclude org="commons-daemon" module="commons-daemon"/><!--bad POM-->
<exclude org="org.apache.commons" module="commons-daemon"/><!--bad POM-->
</dependency>
<dependency org="org.apache.hadoop" name="hadoop-yarn-server-tests" rev="${hadoop-mapreduce.version}"
- conf="hadoop23->master">
+ conf="hadoop2->master">
<artifact name="hadoop-yarn-server-tests" type="jar" m:classifier="tests"/>
</dependency>
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-app" rev="${hadoop-mapreduce.version}"
- conf="hadoop23->master" />
+ conf="hadoop2->master" />
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-shuffle" rev="${hadoop-mapreduce.version}"
- conf="hadoop23->master" />
+ conf="hadoop2->master" />
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-common"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-yarn-api"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-yarn-common"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-yarn-server"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-yarn-server-web-proxy"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-yarn-server-common"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-yarn-server-nodemanager"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-yarn-server-resourcemanager"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-yarn-client"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-yarn-server-applicationhistoryservice"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-hs"
- rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
+ rev="${hadoop-mapreduce.version}" conf="hadoop2->master"/>
<dependency org="org.mortbay.jetty" name="jetty" rev="${jetty.version}"
conf="compile->master">
<artifact name="jetty" ext="jar" />
@@ -192,13 +184,7 @@
<exclude org="org.codehaus.jackson" module="jackson-mapper-asl"/>
</dependency>
<dependency org="org.apache.avro" name="avro-mapred" rev="${avro.version}"
- conf="hadoop20->default;checkstyle->master">
- <exclude org="org.codehaus.jackson" module="jackson-core-asl"/>
- <exclude org="org.codehaus.jackson" module="jackson-mapper-asl"/>
- <exclude org="io.netty" module="netty"/>
- </dependency>
- <dependency org="org.apache.avro" name="avro-mapred" rev="${avro.version}"
- conf="hadoop23->default;checkstyle->master">
+ conf="hadoop2->default;checkstyle->master">
<artifact name="avro-mapred" type="jar" m:classifier="hadoop2"/>
<exclude org="org.codehaus.jackson" module="jackson-core-asl"/>
<exclude org="org.codehaus.jackson" module="jackson-mapper-asl"/>
@@ -260,37 +246,14 @@
<dependency org="org.antlr" name="ST4" rev="${stringtemplate.version}" conf="compile->default"/>
<dependency org="org.apache.zookeeper" name="zookeeper" rev="${zookeeper.version}" conf="compile->master"/>
<dependency org="io.netty" name="netty" rev="${netty.version}" conf="test->master"/>
+ <dependency org="io.netty" name="netty-all" rev="${netty-all.version}" conf="test->master" />
<dependency org="dk.brics.automaton" name="automaton" rev="1.11-8" conf="compile->default"/>
<dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" conf="compile->master"/>
<dependency org="asm" name="asm" rev="${asm.version}" conf="compile->default"/>
- <!-- HBase dependency in format for releases up to 0.94 (including) -->
- <dependency org="org.apache.hbase" name="hbase" rev="${hbase94.version}" conf="hbase94->master">
- <artifact name="hbase" type="jar"/>
- <artifact name="hbase" type="test-jar" ext="jar" m:classifier="tests"/>
- <exclude org="org.apache.thrift" module="thrift"/>
- <exclude org="org.apache.hadoop" module="hadoop-core"/>
- <exclude org="org.apache.ant" module="ant" />
- <exclude org="org.slf4j" module="slf4j"/>
- <exclude org="org.slf4j" module="slf4j-api"/>
- <exclude org="org.slf4j" module="slf4j-log4j12" />
- <exclude org="org.slf4j" module="log4j12"/>
- <exclude org="org.slf4j" module="log4j-over-slf4j"/>
- <exclude org="stax" module="stax-api" />
- <exclude org="javax.xml.bind" module="jaxb-api" />
- <exclude org="javax.ws.rs" module="jsr311-api" />
- <exclude org="tomcat" module="jasper-runtime"/>
- <exclude org="tomcat" module="jasper-compiler"/>
- <exclude org="com.google.protobuf" module="protobuf-java"/>
- <exclude org="com.sun.jersey" module="jersey-core"/>
- <exclude org="com.sun.jersey" module="jersey-server"/>
- <exclude org="com.sun.jersey" module="jersey-json"/>
- <exclude org="asm" module="asm"/>
- </dependency>
-
<!-- HBase dependency in format for releases higher or equal to 0.95 -->
- <dependency org="org.apache.hbase" name="hbase-client" rev="${hbase95.version}" conf="hbase95->master">
+ <dependency org="org.apache.hbase" name="hbase-client" rev="${hbase1.version}" conf="hbase1->master">
<artifact name="hbase-client" type="jar"/>
<artifact name="hbase-client" type="test-jar" ext="jar" m:classifier="tests"/>
<exclude org="org.slf4j" module="slf4j-api"/>
@@ -306,7 +269,7 @@
<exclude org="asm" module="asm"/>
</dependency>
- <dependency org="org.apache.hbase" name="hbase-common" rev="${hbase95.version}" conf="hbase95->master">
+ <dependency org="org.apache.hbase" name="hbase-common" rev="${hbase1.version}" conf="hbase1->master">
<artifact name="hbase-common" type="jar"/>
<artifact name="hbase-common" type="test-jar" ext="jar" m:classifier="tests"/>
<exclude org="org.apache.hadoop" module="hadoop-core"/>
@@ -321,7 +284,7 @@
<exclude org="asm" module="asm"/>
</dependency>
- <dependency org="org.apache.hbase" name="hbase-server" rev="${hbase95.version}" conf="hbase95->master">
+ <dependency org="org.apache.hbase" name="hbase-server" rev="${hbase1.version}" conf="hbase1->master">
<artifact name="hbase-server" type="jar"/>
<artifact name="hbase-server" type="test-jar" ext="jar" m:classifier="tests"/>
<exclude org="org.apache.hadoop" module="hadoop-core"/>
@@ -338,20 +301,20 @@
<exclude org="asm" module="asm"/>
</dependency>
- <dependency org="org.apache.hbase" name="hbase-protocol" rev="${hbase95.version}" conf="hbase95->master">
+ <dependency org="org.apache.hbase" name="hbase-protocol" rev="${hbase1.version}" conf="hbase1->master">
<artifact name="hbase-protocol" type="jar"/>
<artifact name="hbase-protocol" type="test-jar" ext="jar" m:classifier="tests"/>
<exclude org="com.google.protobuf" module="protobuf-java"/>
</dependency>
- <dependency org="org.apache.hbase" name="hbase-hadoop-compat" rev="${hbase95.version}" conf="hbase95->master">
+ <dependency org="org.apache.hbase" name="hbase-hadoop-compat" rev="${hbase1.version}" conf="hbase1->master">
<artifact name="hbase-hadoop-compat" type="jar"/>
<artifact name="hbase-hadoop-compat" type="test-jar" ext="jar" m:classifier="tests"/>
</dependency>
- <dependency org="org.apache.hbase" name="hbase-${hbase.hadoop.version}-compat" rev="${hbase95.version}" conf="hbase95->master">
- <artifact name="hbase-${hbase.hadoop.version}-compat" type="jar"/>
- <artifact name="hbase-${hbase.hadoop.version}-compat" type="test-jar" ext="jar" m:classifier="tests"/>
+ <dependency org="org.apache.hbase" name="hbase-hadoop2-compat" rev="${hbase1.version}" conf="hbase1->master">
+ <artifact name="hbase-hadoop2-compat" type="jar"/>
+ <artifact name="hbase-hadoop2-compat" type="test-jar" ext="jar" m:classifier="tests"/>
<exclude org="org.apache.hadoop" module="hadoop-core"/>
<exclude org="org.slf4j" module="slf4j-api"/>
<exclude org="stax" module="stax-api" />
@@ -364,14 +327,14 @@
<exclude org="asm" module="asm"/>
</dependency>
- <dependency org="org.htrace" name="htrace-core" rev="3.0.4" conf="hadoop23->master"/>
- <dependency org="org.apache.htrace" name="htrace-core" rev="${htrace.version}" conf="hadoop23->master"/>
+ <dependency org="org.htrace" name="htrace-core" rev="3.0.4" conf="hadoop2->master"/>
+ <dependency org="org.apache.htrace" name="htrace-core" rev="${htrace.version}" conf="hadoop2->master"/>
<dependency org="org.fusesource.leveldbjni" name="leveldbjni-all" rev="${leveldbjni.version}"
- conf="hadoop23->master"/>
- <dependency org="org.cloudera.htrace" name="htrace-core" rev="2.00" conf="hbase95->master">
+ conf="hadoop2->master"/>
+ <dependency org="org.cloudera.htrace" name="htrace-core" rev="2.00" conf="hbase1->master">
<artifact name="htrace-core" type="jar"/>
</dependency>
- <dependency org="com.lmax" name="disruptor" rev="3.3.0" conf="hbase95->master"/>
+ <dependency org="com.lmax" name="disruptor" rev="3.3.0" conf="hbase1->master"/>
<!-- for TestHBaseStorage -->
<dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="${high-scale-lib.version}"
@@ -430,9 +393,7 @@
<dependency org="org.apache.hive" name="hive-contrib" rev="${hive.version}" changing="true"
conf="test->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-0.23" rev="${hive.version}" changing="true"
- conf="hadoop23->master" />
- <dependency org="org.apache.hive.shims" name="hive-shims-0.20S" rev="${hive.version}" changing="true"
- conf="hadoop20->master" />
+ conf="hadoop2->master" />
<dependency org="org.iq80.snappy" name="snappy" rev="${snappy.version}"
conf="test->master" />
<dependency org="com.esotericsoftware.kryo" name="kryo" rev="${kryo.version}"
@@ -450,31 +411,31 @@
<!-- for Tez integration -->
<dependency org="org.apache.tez" name="tez" rev="${tez.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.tez" name="tez-common" rev="${tez.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.tez" name="tez-api" rev="${tez.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.tez" name="tez-dag" rev="${tez.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.tez" name="tez-runtime-internals" rev="${tez.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.tez" name="tez-runtime-library" rev="${tez.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.tez" name="tez-mapreduce" rev="${tez.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.tez" name="tez-yarn-timeline-history-with-acls" rev="${tez.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.commons" name="commons-collections4" rev="${commons-collections4.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.codehaus.jettison" name="jettison" rev="${jettison.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.commons" name="commons-math3" rev="${commons-math3.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.curator" name="curator-framework" rev="${curator.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
<dependency org="org.apache.curator" name="curator-client" rev="${curator.version}"
- conf="hadoop23->master"/>
+ conf="hadoop2->master"/>
</dependencies>
</ivy-module>
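For reference, the Ivy configuration renames in this file reduce to the following mapping (a summary sketch, not patch content):

    # Old Ivy conf name -> replacement (None means the conf is dropped outright).
    IVY_CONF_RENAMES = {
        "hadoop20": None,       # Hadoop 1.x dependency conf removed
        "hadoop23": "hadoop2",  # every hadoop23->master conf becomes hadoop2->master
        "hbase94": None,        # single-jar HBase 0.94 layout removed
        "hbase95": "hbase1",    # multi-module HBase layout, renamed to hbase1
    }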
diff --git a/ivy/ant-contrib-1.0b3.jar b/ivy/ant-contrib-1.0b3.jar
new file mode 100644
index 0000000..0625376
--- /dev/null
+++ b/ivy/ant-contrib-1.0b3.jar
Binary files differ
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 3a819a5..b73d6d7 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -39,14 +39,10 @@
jasper.version=6.1.14
groovy.version=2.4.5
guava.version=11.0
-jersey-core.version=1.8
-hadoop-core.version=1.0.4
-hadoop-test.version=1.0.4
-hadoop-common.version=2.6.0
-hadoop-hdfs.version=2.6.0
-hadoop-mapreduce.version=2.6.0
-hbase94.version=0.94.1
-hbase95.version=0.98.12-${hbase.hadoop.version}
+hadoop-common.version=2.7.3
+hadoop-hdfs.version=2.7.3
+hadoop-mapreduce.version=2.7.3
+hbase1.version=0.98.12-hadoop2
hsqldb.version=1.8.0.10
hive.version=1.2.1
httpcomponents.version=4.1
@@ -73,6 +69,7 @@
stringtemplate.version=4.0.4
log4j.version=1.2.16
netty.version=3.6.6.Final
+netty-all.version=4.0.23.Final
rats-lib.version=0.5.1
slf4j-api.version=1.6.1
slf4j-log4j12.version=1.6.1
diff --git a/shims/src/hadoop2/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java b/shims/src/hadoop2/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
new file mode 100644
index 0000000..9a23cce
--- /dev/null
+++ b/shims/src/hadoop2/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.ContextFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+public class HadoopShims {
+
+ private static Log LOG = LogFactory.getLog(HadoopShims.class);
+
+ static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
+ JobContext newContext = ContextFactory.cloneContext(original,
+ new JobConf(original.getConfiguration()));
+ return newContext;
+ }
+
+ static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+ TaskAttemptID taskId) {
+ if (conf instanceof JobConf) {
+ return new TaskAttemptContextImpl(new JobConf(conf), taskId);
+ } else {
+ return new TaskAttemptContextImpl(conf, taskId);
+ }
+ }
+
+ static public JobContext createJobContext(Configuration conf,
+ JobID jobId) {
+ if (conf instanceof JobConf) {
+ return new JobContextImpl(new JobConf(conf), jobId);
+ } else {
+ return new JobContextImpl(conf, jobId);
+ }
+ }
+
+ static public boolean isMap(TaskAttemptID taskAttemptID) {
+ return taskAttemptID.getTaskType() == TaskType.MAP;
+ }
+
+ static public TaskAttemptID getNewTaskAttemptID() {
+ TaskAttemptID taskAttemptID = new TaskAttemptID("", 1, TaskType.MAP,
+ 1, 1);
+ return taskAttemptID;
+ }
+
+ static public TaskAttemptID createTaskAttemptID(String jtIdentifier, int jobId, boolean isMap,
+ int taskId, int id) {
+ if (isMap) {
+ return new TaskAttemptID(jtIdentifier, jobId, TaskType.MAP, taskId, id);
+ } else {
+ return new TaskAttemptID(jtIdentifier, jobId, TaskType.REDUCE, taskId, id);
+ }
+ }
+
+ /**
+ * Returns whether the given path has a FileSystem implementation.
+ *
+ * @param path path
+ * @param conf configuration
+ * @return true if the given path's scheme has a FileSystem implementation,
+ * false otherwise
+ */
+ public static boolean hasFileSystemImpl(Path path, Configuration conf) {
+ String scheme = path.toUri().getScheme();
+ if (scheme != null) {
+ // Hadoop 0.23
+ if (conf.get("fs.file.impl") != null) {
+ String fsImpl = conf.get("fs." + scheme + ".impl");
+ if (fsImpl == null) {
+ return false;
+ }
+ } else {
+ try {
+ return FileSystem.getFileSystemClass(scheme, conf) != null;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+}
diff --git a/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java b/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java
deleted file mode 100644
index 07c5f49..0000000
--- a/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop;
-
-import org.apache.pig.impl.PigContext;
-
-public class PigATSClient {
- public static class ATSEvent {
- public ATSEvent(String pigAuditId, String callerId) {
- this.pigScriptId = pigAuditId;
- this.callerId = callerId;
- }
- String callerId;
- String pigScriptId;
- }
- private static PigATSClient instance;
-
- public static synchronized PigATSClient getInstance() {
- if (instance==null) {
- instance = new PigATSClient();
- }
- return instance;
- }
-
- private PigATSClient() {
- }
-
- public static String getPigAuditId(PigContext context) {
- return "";
- }
-
- synchronized public void logEvent(final ATSEvent event) {
- }
-}
diff --git a/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java b/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
deleted file mode 100644
index 8bdfa2c..0000000
--- a/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.util.Pair;
-
-abstract public class PigMapBase extends PigGenericMapBase {
- /**
- *
- * Get mapper's illustrator context
- *
- * @param conf Configuration
- * @param input Input bag to serve as data source
- * @param output Map output buffer
- * @param split the split
- * @return Illustrator's context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public Context getIllustratorContext(Configuration conf, DataBag input,
- List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
- throws IOException, InterruptedException {
- return new IllustratorContext(conf, input, output, split);
- }
-
- @Override
- public boolean inIllustrator(Context context) {
- return (context instanceof PigMapBase.IllustratorContext);
- }
-
- public class IllustratorContext extends Context {
- private DataBag input;
- List<Pair<PigNullableWritable, Writable>> output;
- private Iterator<Tuple> it = null;
- private Tuple value = null;
- private boolean init = false;
-
- public IllustratorContext(Configuration conf, DataBag input,
- List<Pair<PigNullableWritable, Writable>> output,
- InputSplit split) throws IOException, InterruptedException {
- super(conf, new TaskAttemptID(), null, null, null, new IllustrateDummyReporter(), split);
- if (output == null)
- throw new IOException("Null output can not be used");
- this.input = input; this.output = output;
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (input == null) {
- if (!init) {
- init = true;
- return true;
- }
- return false;
- }
- if (it == null)
- it = input.iterator();
- if (!it.hasNext())
- return false;
- value = it.next();
- return true;
- }
-
- @Override
- public Text getCurrentKey() {
- return null;
- }
-
- @Override
- public Tuple getCurrentValue() {
- return value;
- }
-
- @Override
- public void write(PigNullableWritable key, Writable value)
- throws IOException, InterruptedException {
- output.add(new Pair<PigNullableWritable, Writable>(key, value));
- }
-
- @Override
- public void progress() {
-
- }
- }
-}
diff --git a/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java b/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
deleted file mode 100644
index 50d3b1b..0000000
--- a/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.impl.io.NullableTuple;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.pen.FakeRawKeyValueIterator;
-
-public class PigMapReduce extends PigGenericMapReduce {
- public static class Reduce extends PigGenericMapReduce.Reduce {
- /**
- * Get reducer's illustrator context
- *
- * @param input Input buffer as output by maps
- * @param pkg package
- * @return reducer's illustrator context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public Context getIllustratorContext(Job job,
- List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
- return new IllustratorContext(job, input, pkg);
- }
-
- @SuppressWarnings("unchecked")
- public class IllustratorContext extends Context {
- private PigNullableWritable currentKey = null, nextKey = null;
- private NullableTuple nextValue = null;
- private List<NullableTuple> currentValues = null;
- private Iterator<Pair<PigNullableWritable, Writable>> it;
- private final ByteArrayOutputStream bos;
- private final DataOutputStream dos;
- private final RawComparator sortComparator, groupingComparator;
- POPackage pack = null;
-
- public IllustratorContext(Job job,
- List<Pair<PigNullableWritable, Writable>> input,
- POPackage pkg
- ) throws IOException, InterruptedException {
- super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
- null, null, null, null, new IllustrateDummyReporter(), null, PigNullableWritable.class, NullableTuple.class);
- bos = new ByteArrayOutputStream();
- dos = new DataOutputStream(bos);
- org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
- sortComparator = nwJob.getSortComparator();
- groupingComparator = nwJob.getGroupingComparator();
-
- Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
- @Override
- public int compare(Pair<PigNullableWritable, Writable> o1,
- Pair<PigNullableWritable, Writable> o2) {
- try {
- o1.first.write(dos);
- int l1 = bos.size();
- o2.first.write(dos);
- int l2 = bos.size();
- byte[] bytes = bos.toByteArray();
- bos.reset();
- return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
- } catch (IOException e) {
- throw new RuntimeException("Serialization exception in sort:"+e.getMessage());
- }
- }
- }
- );
- currentValues = new ArrayList<NullableTuple>();
- it = input.iterator();
- if (it.hasNext()) {
- Pair<PigNullableWritable, Writable> entry = it.next();
- nextKey = entry.first;
- nextValue = (NullableTuple) entry.second;
- }
- pack = pkg;
- }
-
- @Override
- public PigNullableWritable getCurrentKey() {
- return currentKey;
- }
-
- @Override
- public boolean nextKey() {
- if (nextKey == null)
- return false;
- currentKey = nextKey;
- currentValues.clear();
- currentValues.add(nextValue);
- nextKey = null;
- for(; it.hasNext(); ) {
- Pair<PigNullableWritable, Writable> entry = it.next();
- /* Why can't raw comparison be used?
- byte[] bytes;
- int l1, l2;
- try {
- currentKey.write(dos);
- l1 = bos.size();
- entry.first.write(dos);
- l2 = bos.size();
- bytes = bos.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException("nextKey exception : "+e.getMessage());
- }
- bos.reset();
- if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0)
- */
- if (groupingComparator.compare(currentKey, entry.first) == 0)
- {
- currentValues.add((NullableTuple)entry.second);
- } else {
- nextKey = entry.first;
- nextValue = (NullableTuple) entry.second;
- break;
- }
- }
- return true;
- }
-
- @Override
- public Iterable<NullableTuple> getValues() {
- return currentValues;
- }
-
- @Override
- public void write(PigNullableWritable k, Writable t) {
- }
-
- @Override
- public void progress() {
- }
- }
-
- @Override
- public boolean inIllustrator(
- org.apache.hadoop.mapreduce.Reducer.Context context) {
- return (context instanceof PigMapReduce.Reduce.IllustratorContext);
- }
-
- @Override
- public POPackage getPack(
- org.apache.hadoop.mapreduce.Reducer.Context context) {
- return ((PigMapReduce.Reduce.IllustratorContext) context).pack;
- }
- }
-}
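
The IllustratorContext removed above feeds buffered map output back through the job's comparators for ILLUSTRATE. Its sort step serializes both keys into one byte buffer and hands the two ranges to the job's RawComparator. A minimal, self-contained Java sketch of that serialize-then-raw-compare pattern (the class and names here are illustrative, not the removed Pig code):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Comparator;

    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Writable;

    // Compares two Writable keys by serializing both into one buffer and
    // delegating to a RawComparator over the resulting byte ranges.
    final class SerializingComparator<K extends Writable> implements Comparator<K> {
        private final RawComparator<K> raw;
        private final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        private final DataOutputStream dos = new DataOutputStream(bos);

        SerializingComparator(RawComparator<K> raw) {
            this.raw = raw;
        }

        @Override
        public int compare(K k1, K k2) {
            try {
                bos.reset();
                k1.write(dos);
                int l1 = bos.size();            // bytes [0, l1) hold k1
                k2.write(dos);
                int l2 = bos.size();            // bytes [l1, l2) hold k2
                byte[] bytes = bos.toByteArray();
                return raw.compare(bytes, 0, l1, bytes, l1, l2 - l1);
            } catch (IOException e) {
                throw new RuntimeException("Serialization failed in compare", e);
            }
        }
    }
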
diff --git a/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java b/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
deleted file mode 100644
index 515ce66..0000000
--- a/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.shims;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.pig.PigConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop20.PigJobControl;
-
-/**
- * We need to make Pig work with both hadoop 20 and hadoop 23 (PIG-2125). However,
- * there are API differences between hadoop 20 and 23. Here we use a shims layer to
- * hide these API differences. A dynamic shims layer is not possible due to some
- * static dependencies, so we adopt a static shims approach: for each hadoop version
- * we need to recompile.
- *
- * This class wraps all the static methods. PigMapReduce, PigMapBase and MiniCluster wrap the
- * hadoop version-dependent implementations of PigGenericMapReduce, PigGenericMapBase and MiniGenericCluster.
- **/
-public class HadoopShims {
-
- private static Log LOG = LogFactory.getLog(HadoopShims.class);
-
- static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
- JobContext newContext = new JobContext(original.getConfiguration(), original.getJobID());
- return newContext;
- }
-
- static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
- TaskAttemptID taskId) {
- TaskAttemptContext newContext = new TaskAttemptContext(conf,
- taskId);
- return newContext;
- }
-
- static public JobContext createJobContext(Configuration conf,
- JobID jobId) {
- JobContext newJobContext = new JobContext(
- conf, jobId);
- return newJobContext;
- }
-
- static public boolean isMap(TaskAttemptID taskAttemptID) {
- return taskAttemptID.isMap();
- }
-
- static public TaskAttemptID getNewTaskAttemptID() {
- return new TaskAttemptID();
- }
-
- static public TaskAttemptID createTaskAttemptID(String jtIdentifier, int jobId, boolean isMap,
- int taskId, int id) {
- return new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, id);
- }
-
- static public void storeSchemaForLocal(Job job, POStore st) throws IOException {
- JobContext jc = HadoopShims.createJobContext(job.getJobConf(),
- new org.apache.hadoop.mapreduce.JobID());
- JobContext updatedJc = PigOutputCommitter.setUpContext(jc, st);
- PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration());
- }
-
- static public String getFsCounterGroupName() {
- return "FileSystemCounters";
- }
-
- static public void commitOrCleanup(OutputCommitter oc, JobContext jc) throws IOException {
- oc.cleanupJob(jc);
- }
-
- public static JobControl newJobControl(String groupName, int timeToSleep) {
- return new PigJobControl(groupName, timeToSleep);
- }
-
- public static long getDefaultBlockSize(FileSystem fs, Path path) {
- return fs.getDefaultBlockSize();
- }
-
- public static Counters getCounters(Job job) throws IOException {
- JobClient jobClient = job.getJobClient();
- return jobClient.getJob(job.getAssignedJobID()).getCounters();
- }
-
- public static boolean isJobFailed(TaskReport report) {
- float successfulProgress = 1.0f;
- // if the progress reported is not 1.0f then the map or reduce
- // job failed
- // this comparison is in place for the backward compatibility
- // for Hadoop 0.20
- return report.getProgress() != successfulProgress;
- }
-
- public static void unsetConf(Configuration conf, String key) {
- // Not supported in Hadoop 0.20/1.x
- }
-
- /**
- * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
- * @param conf
- * @param taskAttemptID
- */
- public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
- conf.set(MRConfiguration.TASK_ID, taskAttemptID.toString());
- }
-
- /**
- * Returns whether the given path has a FileSystem implementation.
- *
- * @param path path
- * @param conf configuration
- * @return true if the given path's scheme has a FileSystem implementation,
- * false otherwise
- */
- public static boolean hasFileSystemImpl(Path path, Configuration conf) {
- String scheme = path.toUri().getScheme();
- if (scheme != null) {
- String fsImpl = conf.get("fs." + scheme + ".impl");
- if (fsImpl == null) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Returns the progress of a Job j which is part of a submitted JobControl
- * object. The progress is for this Job only, so it has to be scaled down by the
- * number of jobs that are present in the JobControl.
- *
- * @param j The Job for which progress is required
- * @return Returns the percentage progress of this Job
- * @throws IOException
- */
- public static double progressOfRunningJob(Job j)
- throws IOException {
- RunningJob rj = j.getJobClient().getJob(j.getAssignedJobID());
- if (rj == null && j.getState() == Job.SUCCESS)
- return 1;
- else if (rj == null)
- return 0;
- else {
- return (rj.mapProgress() + rj.reduceProgress()) / 2;
- }
- }
-
- public static void killJob(Job job) throws IOException {
- RunningJob runningJob = job.getJobClient().getJob(job.getAssignedJobID());
- if (runningJob != null)
- runningJob.killJob();
- }
-
- public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
- if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
- LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
- return null;
- }
- JobClient jobClient = job.getJobClient();
- TaskReport[] reports = null;
- if (type == TaskType.MAP) {
- reports = jobClient.getMapTaskReports(job.getAssignedJobID());
- } else {
- reports = jobClient.getReduceTaskReports(job.getAssignedJobID());
- }
- return reports == null ? null : Arrays.asList(reports).iterator();
- }
-
- public static boolean isHadoopYARN() {
- return false;
- }
-}
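
The javadoc of the class deleted above describes the static-shims approach this commit retires: one source tree per Hadoop version, each tree providing a class with the same fully qualified name, with the build choosing exactly one tree. A schematic Java illustration (hypothetical package and method; two alternative source files, only one of which is ever on the compile path):

    // File shims/src/hadoopA/example/shims/Shims.java -- compiled only
    // when building against Hadoop version A.
    package example.shims;

    public final class Shims {
        private Shims() {}
        public static boolean isYarn() { return false; }
    }

    // File shims/src/hadoopB/example/shims/Shims.java -- the alternative
    // source: same fully qualified name, compiled only against version B.
    package example.shims;

    public final class Shims {
        private Shims() {}
        public static boolean isYarn() { return true; }
    }

Callers simply write Shims.isYarn(); the build, not the runtime, decides which implementation they link against, which is why switching Hadoop versions always meant recompiling.
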
diff --git a/shims/src/hadoop20/org/apache/pig/backend/hadoop20/PigJobControl.java b/shims/src/hadoop20/org/apache/pig/backend/hadoop20/PigJobControl.java
deleted file mode 100644
index 07c9841..0000000
--- a/shims/src/hadoop20/org/apache/pig/backend/hadoop20/PigJobControl.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop20;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
-import org.apache.pig.impl.PigContext;
-
-/**
- * Extends the hadoop JobControl to remove the hardcoded sleep(5000).
- * As most of JobControl's internals are private, we have to use reflection.
- *
- * See {@link https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/jobcontrol/JobControl.java}
- *
- */
-public class PigJobControl extends JobControl {
- private static final Log log = LogFactory.getLog(PigJobControl.class);
-
- private static Field runnerState;
-
- private static Method checkRunningJobs;
- private static Method checkWaitingJobs;
- private static Method startReadyJobs;
-
- private static boolean initSuccesful;
-
- static {
- try {
-
- runnerState = JobControl.class.getDeclaredField("runnerState");
- runnerState.setAccessible(true);
-
- checkRunningJobs = JobControl.class.getDeclaredMethod("checkRunningJobs");
- checkRunningJobs.setAccessible(true);
- checkWaitingJobs = JobControl.class.getDeclaredMethod("checkWaitingJobs");
- checkWaitingJobs.setAccessible(true);
- startReadyJobs = JobControl.class.getDeclaredMethod("startReadyJobs");
- startReadyJobs.setAccessible(true);
- initSuccesful = true;
- } catch (Exception e) {
- log.warn("falling back to default JobControl (not using hadoop 0.20 ?)", e);
- initSuccesful = false;
- }
- }
-
- // The thread can be in one of the following states
- private static final int RUNNING = 0;
- private static final int SUSPENDED = 1;
- private static final int STOPPED = 2;
- private static final int STOPPING = 3;
- private static final int READY = 4;
-
- private int timeToSleep;
-
- /**
- * Construct a job control for a group of jobs.
- * @param groupName a name identifying this group
- * @param pigContext
- * @param conf
- */
- public PigJobControl(String groupName, int timeToSleep) {
- super(groupName);
- this.timeToSleep = timeToSleep;
- }
-
- public int getTimeToSleep() {
- return timeToSleep;
- }
-
- public void setTimeToSleep(int timeToSleep) {
- this.timeToSleep = timeToSleep;
- }
-
- private void setRunnerState(int state) {
- try {
- runnerState.set(this, state);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
-
- private int getRunnerState() {
- try {
- return (Integer)runnerState.get(this);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * The main loop for the thread.
- * The loop does the following:
- * Check the states of the running jobs
- * Update the states of waiting jobs
- * Submit the jobs in ready state
- */
- public void run() {
- if (!initSuccesful) {
- super.run();
- return;
- }
- setRunnerState(PigJobControl.RUNNING);
- while (true) {
- while (getRunnerState() == PigJobControl.SUSPENDED) {
- try {
- Thread.sleep(timeToSleep);
- }
- catch (Exception e) {
-
- }
- }
- mainLoopAction();
- if (getRunnerState() != PigJobControl.RUNNING &&
- getRunnerState() != PigJobControl.SUSPENDED) {
- break;
- }
- try {
- Thread.sleep(timeToSleep);
- }
- catch (Exception e) {
-
- }
- if (getRunnerState() != PigJobControl.RUNNING &&
- getRunnerState() != PigJobControl.SUSPENDED) {
- break;
- }
- }
- setRunnerState(PigJobControl.STOPPED);
- }
-
- private void mainLoopAction() {
- try {
- checkRunningJobs.invoke(this);
- checkWaitingJobs.invoke(this);
- startReadyJobs.invoke(this);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
-}
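
The PigJobControl deleted above exists only to shorten JobControl's hardcoded five-second poll, and it reaches the private runnerState field and private check/start methods via reflection. A minimal runnable sketch of that reflective-access pattern against a made-up target class (not Hadoop's JobControl):

    import java.lang.reflect.Field;
    import java.lang.reflect.Method;

    final class PrivateAccessDemo {
        // Stand-in for a class whose internals are private, like JobControl.
        static final class Target {
            private int state;
            private void step() { System.out.println("state=" + state); }
        }

        public static void main(String[] args) throws Exception {
            Target t = new Target();

            Field state = Target.class.getDeclaredField("state");
            state.setAccessible(true);      // bypass the private modifier
            state.set(t, 42);

            Method step = Target.class.getDeclaredMethod("step");
            step.setAccessible(true);
            step.invoke(t);                 // prints "state=42"
        }
    }
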
diff --git a/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java b/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java
deleted file mode 100644
index b3a1772..0000000
--- a/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.util.Iterator;
-
-public class DowngradeHelper {
- // This is required since hadoop 2 TaskReport allows
- // only package level access to this api
- public static Iterator<TaskReport> downgradeTaskReports(
- org.apache.hadoop.mapreduce.TaskReport[] reports) {
- return reports == null ? null : new TaskReportIterator(reports);
- }
-
- private static class TaskReportIterator implements Iterator<TaskReport> {
-
- private org.apache.hadoop.mapreduce.TaskReport[] reports;
- private int curIndex = 0;
-
- public TaskReportIterator(org.apache.hadoop.mapreduce.TaskReport[] reports) {
- this.reports = reports;
- }
-
- @Override
- public boolean hasNext() {
- return curIndex < this.reports.length ;
- }
-
- @Override
- public TaskReport next() {
- return TaskReport.downgrade(reports[curIndex++]);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- }
-}
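
DowngradeHelper above exists because Hadoop 2 exposes TaskReport.downgrade only at package level, so Pig wrapped the new-API reports in a lazily converting iterator. Generically, that adapter looks like this (Converter is a made-up stand-in for the downgrade call):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Walk an array, converting each element on demand rather than
    // copying the whole collection up front.
    final class ConvertingIterator<A, B> implements Iterator<B> {

        interface Converter<X, Y> {
            Y convert(X x);
        }

        private final A[] source;
        private final Converter<A, B> converter;
        private int i = 0;

        ConvertingIterator(A[] source, Converter<A, B> converter) {
            this.source = source;
            this.converter = converter;
        }

        @Override
        public boolean hasNext() {
            return i < source.length;
        }

        @Override
        public B next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return converter.convert(source[i++]);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
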
diff --git a/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java b/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
deleted file mode 100644
index 8fbf33f..0000000
--- a/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.shims;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.DowngradeHelper;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TIPStatus;
-import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.ContextFactory;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.pig.PigConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop23.PigJobControl;
-
-public class HadoopShims {
-
- private static Log LOG = LogFactory.getLog(HadoopShims.class);
- private static Method getFileSystemClass;
-
- static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
- JobContext newContext = ContextFactory.cloneContext(original,
- new JobConf(original.getConfiguration()));
- return newContext;
- }
-
- static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
- TaskAttemptID taskId) {
- if (conf instanceof JobConf) {
- return new TaskAttemptContextImpl(new JobConf(conf), taskId);
- } else {
- return new TaskAttemptContextImpl(conf, taskId);
- }
- }
-
- static public JobContext createJobContext(Configuration conf,
- JobID jobId) {
- if (conf instanceof JobConf) {
- return new JobContextImpl(new JobConf(conf), jobId);
- } else {
- return new JobContextImpl(conf, jobId);
- }
- }
-
- static public boolean isMap(TaskAttemptID taskAttemptID) {
- TaskType type = taskAttemptID.getTaskType();
- if (type==TaskType.MAP)
- return true;
-
- return false;
- }
-
- static public TaskAttemptID getNewTaskAttemptID() {
- TaskAttemptID taskAttemptID = new TaskAttemptID("", 1, TaskType.MAP,
- 1, 1);
- return taskAttemptID;
- }
-
- static public TaskAttemptID createTaskAttemptID(String jtIdentifier, int jobId, boolean isMap,
- int taskId, int id) {
- if (isMap) {
- return new TaskAttemptID(jtIdentifier, jobId, TaskType.MAP, taskId, id);
- } else {
- return new TaskAttemptID(jtIdentifier, jobId, TaskType.REDUCE, taskId, id);
- }
- }
-
- static public void storeSchemaForLocal(Job job, POStore st) {
- // Doing nothing for hadoop 23
- }
-
- static public String getFsCounterGroupName() {
- return "org.apache.hadoop.mapreduce.FileSystemCounter";
- }
-
- static public void commitOrCleanup(OutputCommitter oc, JobContext jc) throws IOException {
- oc.commitJob(jc);
- }
-
- public static JobControl newJobControl(String groupName, int timeToSleep) {
- return new PigJobControl(groupName, timeToSleep);
- }
-
- public static long getDefaultBlockSize(FileSystem fs, Path path) {
- return fs.getDefaultBlockSize(path);
- }
-
- public static Counters getCounters(Job job) throws IOException {
- try {
- Cluster cluster = new Cluster(job.getJobConf());
- org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
- if (mrJob == null) { // In local mode, mrJob will be null
- mrJob = job.getJob();
- }
- return new Counters(mrJob.getCounters());
- } catch (Exception ir) {
- throw new IOException(ir);
- }
- }
-
- public static boolean isJobFailed(TaskReport report) {
- return report.getCurrentStatus()==TIPStatus.FAILED;
- }
-
- public static void unsetConf(Configuration conf, String key) {
- conf.unset(key);
- }
-
- /**
- * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
- * @param conf
- * @param taskAttemptID
- */
- public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
- conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId());
- }
-
- /**
- * Returns whether the given path has a FileSystem implementation.
- *
- * @param path path
- * @param conf configuration
- * @return true if the given path's scheme has a FileSystem implementation,
- * false otherwise
- */
- public static boolean hasFileSystemImpl(Path path, Configuration conf) {
- String scheme = path.toUri().getScheme();
- if (scheme != null) {
- // Hadoop 0.23
- if (conf.get("fs.file.impl") != null) {
- String fsImpl = conf.get("fs." + scheme + ".impl");
- if (fsImpl == null) {
- return false;
- }
- } else {
- // Hadoop 2.x HADOOP-7549
- if (getFileSystemClass == null) {
- try {
- getFileSystemClass = FileSystem.class.getDeclaredMethod(
- "getFileSystemClass", String.class, Configuration.class);
- } catch (NoSuchMethodException e) {
- LOG.warn("Error while trying to determine if path " + path +
- " has a filesystem implementation");
- // Assume has implementation to be safe
- return true;
- }
- }
- try {
- Object fs = getFileSystemClass.invoke(null, scheme, conf);
- return fs == null ? false : true;
- } catch (Exception e) {
- return false;
- }
- }
- }
- return true;
- }
-
- /**
- * Returns the progress of a Job j which is part of a submitted JobControl
- * object. The progress is for this Job only, so it has to be scaled down by the
- * number of jobs that are present in the JobControl.
- *
- * @param j The Job for which progress is required
- * @return Returns the percentage progress of this Job
- * @throws IOException
- */
- public static double progressOfRunningJob(Job j)
- throws IOException {
- org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
- try {
- return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
- } catch (Exception ir) {
- return 0;
- }
- }
-
- public static void killJob(Job job) throws IOException {
- org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
- try {
- if (mrJob != null) {
- mrJob.killJob();
- }
- } catch (Exception ir) {
- throw new IOException(ir);
- }
- }
-
- public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
- if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
- LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
- return null;
- }
- Cluster cluster = new Cluster(job.getJobConf());
- try {
- org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
- if (mrJob == null) { // In local mode, mrJob will be null
- mrJob = job.getJob();
- }
- org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
- return DowngradeHelper.downgradeTaskReports(reports);
- } catch (InterruptedException ir) {
- throw new IOException(ir);
- }
- }
-
- public static boolean isHadoopYARN() {
- return true;
- }
-}
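
Of the two shim variants, the hadoop23 behavior above is the one Pig keeps: jobs are looked up through the mapreduce Cluster API, with a fall-back to the in-process Job handle in local mode. Condensed into a standalone sketch (method and parameter names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobID;

    final class CounterFetch {
        // Look the job up via the cluster; in local mode the cluster does
        // not know the id, so fall back to a locally held Job handle.
        static Counters fetch(Configuration conf, JobID id, Job localJob) throws IOException {
            try {
                Cluster cluster = new Cluster(conf);
                Job job = cluster.getJob(id);
                if (job == null) {
                    job = localJob;
                }
                return job.getCounters();
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
    }
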
diff --git a/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java b/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
deleted file mode 100644
index 2ceeaad..0000000
--- a/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.pig.ExecType;
-import org.apache.pig.backend.hadoop.executionengine.Launcher;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-
-public class MiniCluster extends MiniGenericCluster {
- private static final File CONF_DIR = new File("build/classes");
- private static final File CONF_FILE = new File(CONF_DIR, "hadoop-site.xml");
-
- private MiniMRCluster m_mr = null;
-
- /**
- * @deprecated use {@link org.apache.pig.test.MiniGenericCluster#buildCluster()} instead.
- */
- @Deprecated
- public static MiniCluster buildCluster() {
- System.setProperty("test.exec.type", "mr");
- return (MiniCluster)MiniGenericCluster.buildCluster();
- }
-
- @Override
- public ExecType getExecType() {
- return ExecType.MAPREDUCE;
- }
-
- @Override
- protected void setupMiniDfsAndMrClusters() {
- try {
- System.setProperty("hadoop.log.dir", "build/test/logs");
- final int dataNodes = 4; // There will be 4 data nodes
- final int taskTrackers = 4; // There will be 4 task tracker nodes
-
- // Create the dir that holds hadoop-site.xml file
- // Delete if hadoop-site.xml exists already
- CONF_DIR.mkdirs();
- if(CONF_FILE.exists()) {
- CONF_FILE.delete();
- }
-
- // Builds and starts the mini dfs and mapreduce clusters
- Configuration config = new Configuration();
- m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
- m_fileSys = m_dfs.getFileSystem();
- m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
-
- // Write the necessary config info to hadoop-site.xml
- m_conf = m_mr.createJobConf();
- m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2);
- m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2);
- m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2);
- m_conf.set("dfs.datanode.address", "0.0.0.0:0");
- m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
- m_conf.set("pig.jobcontrol.sleep", "100");
- m_conf.writeXml(new FileOutputStream(CONF_FILE));
-
- // Set the system properties needed by Pig
- System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER));
- System.setProperty("namenode", m_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
- System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- protected void shutdownMiniMrClusters() {
- // Delete hadoop-site.xml on shutDown
- if(CONF_FILE.exists()) {
- CONF_FILE.delete();
- }
- if (m_mr != null) { m_mr.shutdown(); }
- m_mr = null;
- }
-
- static public Launcher getLauncher() {
- return new MapReduceLauncher();
- }
-}
diff --git a/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java b/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java
deleted file mode 100644
index 505c472..0000000
--- a/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.test;
-
-import org.apache.pig.ExecType;
-import org.apache.pig.test.MiniGenericCluster;
-
-/**
- * Dummy class for compile-time compatibility with Hadoop 1.x and 0.20.x
- */
-public class TezMiniCluster extends MiniGenericCluster {
- TezMiniCluster() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ExecType getExecType() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void setupMiniDfsAndMrClusters() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void shutdownMiniMrClusters() {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/src/docs/src/documentation/content/xdocs/start.xml b/src/docs/src/documentation/content/xdocs/start.xml
index 8de0933..c9a1491 100644
--- a/src/docs/src/documentation/content/xdocs/start.xml
+++ b/src/docs/src/documentation/content/xdocs/start.xml
@@ -34,7 +34,7 @@
<p><strong>Mandatory</strong></p>
<p>Unix and Windows users need the following:</p>
<ul>
- <li> <strong>Hadoop 0.23.X, 1.X or 2.X</strong> - <a href="http://hadoop.apache.org/common/releases.html">http://hadoop.apache.org/common/releases.html</a> (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to point to the directory where you have installed Hadoop. If you do not set HADOOP_HOME, by default Pig will run with the embedded version, currently Hadoop 1.0.4.)</li>
+ <li> <strong>Hadoop 2.X</strong> - <a href="http://hadoop.apache.org/common/releases.html">http://hadoop.apache.org/common/releases.html</a> (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to point to the directory where you have installed Hadoop. If you do not set HADOOP_HOME, by default Pig will run with the embedded version, currently Hadoop 2.7.3.)</li>
<li> <strong>Java 1.7</strong> - <a href="http://java.sun.com/javase/downloads/index.jsp">http://java.sun.com/javase/downloads/index.jsp</a> (set JAVA_HOME to the root of your Java installation)</li>
</ul>
<p></p>
@@ -82,7 +82,6 @@
<li> Build the code from the top directory: <code>ant</code> <br></br>
If the build is successful, you should see the pig.jar file created in that directory. </li>
<li> Validate the pig.jar by running a unit test: <code>ant test</code></li>
- <li> If you are using Hadoop 0.23.X or 2.X, please add -Dhadoopversion=23 in your ant command line in the previous steps</li>
</ol>
</section>
</section>
diff --git a/src/docs/src/documentation/content/xdocs/tabs.xml b/src/docs/src/documentation/content/xdocs/tabs.xml
index f2f34eb..24d8f59 100644
--- a/src/docs/src/documentation/content/xdocs/tabs.xml
+++ b/src/docs/src/documentation/content/xdocs/tabs.xml
@@ -32,6 +32,6 @@
-->
<tab label="Project" href="http://hadoop.apache.org/pig/" type="visible" />
<tab label="Wiki" href="http://wiki.apache.org/pig/" type="visible" />
- <tab label="Pig 0.16.0 Documentation" dir="" type="visible" />
+ <tab label="Pig 0.17.0 Documentation" dir="" type="visible" />
</tabs>
diff --git a/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java b/src/org/apache/pig/backend/hadoop/PigATSClient.java
similarity index 100%
rename from shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
rename to src/org/apache/pig/backend/hadoop/PigATSClient.java
diff --git a/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java b/src/org/apache/pig/backend/hadoop/PigJobControl.java
similarity index 98%
rename from shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java
rename to src/org/apache/pig/backend/hadoop/PigJobControl.java
index 6439611..f2886f1 100644
--- a/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java
+++ b/src/org/apache/pig/backend/hadoop/PigJobControl.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.backend.hadoop23;
+package org.apache.pig.backend.hadoop;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
diff --git a/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java b/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
index ee04950..a96d095 100644
--- a/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
+++ b/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
@@ -17,8 +17,6 @@
package org.apache.pig.backend.hadoop.accumulo;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collection;
@@ -303,24 +301,8 @@
*/
protected void simpleUnset(Configuration conf,
Map<String, String> entriesToUnset) {
- try {
- Method unset = conf.getClass().getMethod("unset", String.class);
-
- for (String key : entriesToUnset.keySet()) {
- unset.invoke(conf, key);
- }
- } catch (NoSuchMethodException e) {
- log.error("Could not invoke Configuration.unset method", e);
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- log.error("Could not invoke Configuration.unset method", e);
- throw new RuntimeException(e);
- } catch (IllegalArgumentException e) {
- log.error("Could not invoke Configuration.unset method", e);
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- log.error("Could not invoke Configuration.unset method", e);
- throw new RuntimeException(e);
+ for (String key : entriesToUnset.keySet()) {
+ conf.unset(key);
}
}
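
With Hadoop 1.x gone, Configuration.unset is always present, so the reflective probe deleted above collapses to a plain loop. A standalone equivalent of the patched simpleUnset (hypothetical class name):

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;

    final class ConfCleaner {
        // Drop every listed entry; Hadoop 2's Configuration.unset makes
        // the old getMethod/invoke dance unnecessary.
        static void unsetAll(Configuration conf, Map<String, String> entriesToUnset) {
            for (String key : entriesToUnset.keySet()) {
                conf.unset(key);
            }
        }
    }
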
diff --git a/src/org/apache/pig/backend/hadoop/accumulo/Utils.java b/src/org/apache/pig/backend/hadoop/accumulo/Utils.java
index ccbcaaf..4bc771e 100644
--- a/src/org/apache/pig/backend/hadoop/accumulo/Utils.java
+++ b/src/org/apache/pig/backend/hadoop/accumulo/Utils.java
@@ -22,8 +22,6 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLDecoder;
import java.text.MessageFormat;
@@ -42,6 +40,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
@@ -112,7 +111,7 @@
// attempt to locate an existing jar for the class.
String jar = findContainingJar(my_class, packagedClasses);
if (null == jar || jar.isEmpty()) {
- jar = getJar(my_class);
+ jar = JarFinder.getJar(my_class);
updateMap(jar, packagedClasses);
}
@@ -200,41 +199,6 @@
}
/**
- * Invoke 'getJar' on a JarFinder implementation. Useful for some job
- * configuration contexts (HBASE-8140) and also for testing on MRv2. First
- * check if we have HADOOP-9426. Lacking that, fall back to the backport.
- *
- * @param my_class
- * the class to find.
- * @return a jar file that contains the class, or null.
- */
- private static String getJar(Class<?> my_class) {
- String ret = null;
- String hadoopJarFinder = "org.apache.hadoop.util.JarFinder";
- Class<?> jarFinder = null;
- try {
- log.debug("Looking for " + hadoopJarFinder + ".");
- jarFinder = Class.forName(hadoopJarFinder);
- log.debug(hadoopJarFinder + " found.");
- Method getJar = jarFinder.getMethod("getJar", Class.class);
- ret = (String) getJar.invoke(null, my_class);
- } catch (ClassNotFoundException e) {
- log.debug("Using backported JarFinder.");
- ret = jarFinderGetJar(my_class);
- } catch (InvocationTargetException e) {
- // function was properly called, but threw its own exception.
- // Unwrap it
- // and pass it on.
- throw new RuntimeException(e.getCause());
- } catch (Exception e) {
- // toss all other exceptions, related to reflection failure
- throw new RuntimeException("getJar invocation failed.", e);
- }
-
- return ret;
- }
-
- /**
* Returns the full path to the Jar containing the class. It always returns a
* JAR.
*
diff --git a/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java b/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
index 26fc5d5..efead86 100644
--- a/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
+++ b/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
@@ -29,7 +29,6 @@
import org.apache.pig.PigConstants;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
public class ConfigurationUtil {
@@ -89,7 +88,7 @@
// so build/classes/hadoop-site.xml contains such entry. This prevents some tests from
// succeeding (they expect those files in hdfs), so we need to unset it in hadoop 23.
// This should go away once MiniMRCluster fix the distributed cache issue.
- HadoopShims.unsetConf(localConf, MRConfiguration.JOB_CACHE_FILES);
+ localConf.unset(MRConfiguration.JOB_CACHE_FILES);
}
localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
Properties props = ConfigurationUtil.toProperties(localConf);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java b/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
index cc85991..0715363 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
@@ -32,7 +32,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.TIPStatus;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.FuncSpec;
@@ -40,7 +41,6 @@
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.PlanException;
@@ -177,7 +177,7 @@
String exceptionCreateFailMsg = null;
boolean jobFailed = false;
if (msgs.length > 0) {
- if (HadoopShims.isJobFailed(report)) {
+ if (report.getCurrentStatus() == TIPStatus.FAILED) {
jobFailed = true;
}
Set<String> errorMessageSet = new HashSet<String>();
@@ -259,11 +259,30 @@
List<Job> runnJobs = jc.getRunningJobs();
for (Job j : runnJobs) {
- prog += HadoopShims.progressOfRunningJob(j);
+ prog += progressOfRunningJob(j);
}
return prog;
}
+ /**
+ * Returns the progress of a Job j which is part of a submitted JobControl
+ * object. The progress is for this Job only, so it has to be scaled down by the
+ * number of jobs that are present in the JobControl.
+ *
+ * @param j The Job for which progress is required
+ * @return the fractional progress (0.0 to 1.0) of this Job
+ * @throws IOException
+ */
+ private static double progressOfRunningJob(Job j)
+ throws IOException {
+ org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
+ try {
+ return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
+ } catch (Exception ir) {
+ return 0;
+ }
+ }
+
public long getTotalHadoopTimeSpent() {
return totalHadoopTimeSpent;
}
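
The helper inlined above reports a single job's progress as the mean of its map and reduce progress, each in [0.0, 1.0], and the surrounding loop sums these across running jobs. Scaled by the plan's total job count, that yields an overall fraction, roughly as in this sketch (doneJobs and totalJobs are assumed inputs, not names from the patch):

    import java.util.List;

    import org.apache.hadoop.mapred.jobcontrol.Job;

    final class ProgressDemo {
        // Combine per-job progress into one overall fraction for the plan.
        static double overallProgress(List<Job> running, int doneJobs, int totalJobs) {
            double prog = doneJobs;             // each finished job counts as 1.0
            for (Job j : running) {
                org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
                try {
                    prog += (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
                } catch (Exception e) {
                    // unreadable progress counts as 0, like the patched code
                }
            }
            return prog / totalJobs;
        }
    }
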
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
index 019fc87..f249442 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
@@ -25,6 +25,7 @@
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -122,7 +123,8 @@
poStore.setUp();
TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
- HadoopShims.setTaskAttemptId(conf, taskAttemptID);
+ // Fetch mode needs to explicitly set the task id, which is otherwise done by Hadoop
+ conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId());
if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java b/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
index 2a57435..a5a7c38 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
@@ -95,7 +95,7 @@
}
if (outputCommitter.needsTaskCommit(context))
outputCommitter.commitTask(context);
- HadoopShims.commitOrCleanup(outputCommitter, context);
+ outputCommitter.commitJob(context);
}
@Override
@@ -109,7 +109,7 @@
}
writer = null;
}
- HadoopShims.commitOrCleanup(outputCommitter, context);
+ outputCommitter.commitJob(context);
}
}
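
Both hunks above replace the shim's commitOrCleanup with a direct commitJob, which is always the correct finalization on Hadoop 2. The task-then-job sequence the fetch store now performs looks like this sketch (class and method names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    final class CommitDemo {
        static void finishOutput(OutputCommitter committer,
                                 TaskAttemptContext taskCtx,
                                 JobContext jobCtx) throws IOException {
            if (committer.needsTaskCommit(taskCtx)) {
                committer.commitTask(taskCtx);  // promote task-attempt output
            }
            committer.commitJob(jobCtx);        // finalize the job's output
        }
    }
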
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
index 4966f57..ccfda73 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -61,6 +60,7 @@
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
@@ -71,6 +71,7 @@
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.PigJobControl;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
@@ -89,7 +90,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataType;
@@ -122,6 +122,7 @@
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
@@ -311,7 +312,7 @@
" should be a time in ms. default=" + defaultPigJobControlSleep, e);
}
- JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
+ JobControl jobCtrl = new PigJobControl(grpName, timeToSleep);
try {
List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
@@ -384,7 +385,7 @@
ArrayList<Pair<String,Long>> counterPairs;
try {
- counters = HadoopShims.getCounters(job);
+ counters = MRJobStats.getCounters(job);
String groupName = getGroupName(counters.getGroupNames());
// In case that the counter group was not find, we need to find
@@ -1672,14 +1673,6 @@
if (distCachePath != null) {
log.info("Jar file " + url + " already in DistributedCache as "
+ distCachePath + ". Not copying to hdfs and adding again");
- // Path already in dist cache
- if (!HadoopShims.isHadoopYARN()) {
- // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth.
- // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files
- // But path may only be in 'mapred.cache.files' and not be in
- // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there
- DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
- }
}
else {
// REGISTER always copies locally the jar file. see PigServer.registerJar()
@@ -1965,20 +1958,9 @@
public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
// the OutputFormat we report to Hadoop is always PigOutputFormat which
- // can be wrapped with LazyOutputFormat provided if it is supported by
- // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ // can be wrapped with LazyOutputFormat if PigConfiguration.PIG_OUTPUT_LAZY is set
if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
- try {
- Class<?> clazz = PigContext
- .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
- Method method = clazz.getMethod("setOutputFormatClass",
- org.apache.hadoop.mapreduce.Job.class, Class.class);
- method.invoke(null, job, PigOutputFormat.class);
- } catch (Exception e) {
- job.setOutputFormatClass(PigOutputFormat.class);
- log.warn(PigConfiguration.PIG_OUTPUT_LAZY
- + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
- }
+ LazyOutputFormat.setOutputFormatClass(job, PigOutputFormat.class);
} else {
job.setOutputFormatClass(PigOutputFormat.class);
}
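
LazyOutputFormat defers creating an output file until the first record is written, avoiding empty part files. With Hadoop 2 guaranteed, the class is referenced directly instead of being resolved by name at runtime. Usage in isolation (TextOutputFormat stands in for PigOutputFormat here):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    final class LazyOutputDemo {
        static Job newJob(Configuration conf, boolean lazy) throws IOException {
            Job job = Job.getInstance(conf);
            if (lazy) {
                // The real output format is created lazily, on first write.
                LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
            } else {
                job.setOutputFormatClass(TextOutputFormat.class);
            }
            return job;
        }
    }
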
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
index ba3ad52..cacba40 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
@@ -1278,7 +1278,7 @@
List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job));
List<List<InputSplit>> results = MapRedUtil
.getCombinePigSplits(splits,
- HadoopShims.getDefaultBlockSize(fs, path),
+ fs.getDefaultBlockSize(path),
conf);
numFiles += results.size();
} else {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
index 632806f..03d238b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
@@ -42,7 +42,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.pig.PigConfiguration;
@@ -81,9 +82,12 @@
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+import org.python.google.common.collect.Lists;
+
/**
* Main class that launches pig for Map Reduce
@@ -109,7 +113,14 @@
if (jc != null && jc.getRunningJobs().size() > 0) {
log.info("Received kill signal");
for (Job job : jc.getRunningJobs()) {
- HadoopShims.killJob(job);
+ org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
+ try {
+ if (mrJob != null) {
+ mrJob.killJob();
+ }
+ } catch (Exception ir) {
+ throw new IOException(ir);
+ }
log.info("Job " + job.getAssignedJobID() + " killed");
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(Calendar.getInstance().getTime());
@@ -332,11 +343,6 @@
log.info("detailed locations: " + aliasLocation);
}
- if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) {
- log.info("More information at: http://" + jobTrackerLoc
- + "/jobdetails.jsp?jobid=" + job.getAssignedJobID());
- }
-
// update statistics for this job so jobId is set
MRPigStatsUtil.addJobStats(job);
MRScriptState.get().emitJobStartedNotification(
@@ -486,10 +492,6 @@
for (Job job : succJobs) {
List<POStore> sts = jcc.getStores(job);
for (POStore st : sts) {
- if (Utils.isLocal(pc, job.getJobConf())) {
- HadoopShims.storeSchemaForLocal(job, st);
- }
-
if (!st.isTmpStore()) {
// create an "_SUCCESS" file in output location if
// output location is a filesystem dir
@@ -755,7 +757,7 @@
@SuppressWarnings("deprecation")
void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) {
try {
- Counters counters = HadoopShims.getCounters(job);
+ Counters counters = MRJobStats.getCounters(job);
if (counters==null)
{
long nullCounterCount =
@@ -809,13 +811,13 @@
throw new ExecException(backendException);
}
try {
- Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
+ Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP);
if (mapRep != null) {
getErrorMessages(mapRep, "map", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(mapRep);
mapRep = null;
}
- Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+ Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE);
if (redRep != null) {
getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(redRep);
@@ -833,5 +835,6 @@
throw new ExecException(e);
}
}
+
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
index e6a3d65..6fe8ff3 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
@@ -267,7 +267,7 @@
jobcontext.getJobID()));
List<InputSplit> oneInputPigSplits = getPigSplits(
oneInputSplits, i, inpTargets.get(i),
- HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
+ fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
combinable, confClone);
splits.addAll(oneInputPigSplits);
} catch (ExecException ee) {
diff --git a/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
similarity index 100%
rename from shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
rename to src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
diff --git a/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
similarity index 100%
rename from shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
rename to src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
index 83f3fbb..72998da 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
@@ -18,7 +18,6 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
@@ -156,12 +155,7 @@
for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
if (mapCommitter.first!=null) {
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported");
- allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
- && (Boolean)m.invoke(mapCommitter.first);
- } catch (NoSuchMethodException e) {
- allOutputCommitterSupportRecovery = false;
+ allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -173,12 +167,7 @@
reduceOutputCommitters) {
if (reduceCommitter.first!=null) {
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported");
- allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
- && (Boolean)m.invoke(reduceCommitter.first);
- } catch (NoSuchMethodException e) {
- allOutputCommitterSupportRecovery = false;
+ allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -197,10 +186,6 @@
mapCommitter.second);
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
- m.invoke(mapCommitter.first, updatedContext);
- } catch (NoSuchMethodException e) {
- // We are using Hadoop 1.x, ignore
+ mapCommitter.first.recoverTask(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -212,11 +198,7 @@
TaskAttemptContext updatedContext = setUpContext(context,
reduceCommitter.second);
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
- m.invoke(reduceCommitter.first, updatedContext);
- } catch (NoSuchMethodException e) {
- // We are using Hadoop 1.x, ignore
+ reduceCommitter.first.recoverTask(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -256,10 +238,7 @@
mapCommitter.second);
// PIG-2642 promote files before calling storeCleanup/storeSchema
try {
- // Use reflection, 20.2 does not have such method
- Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
- m.setAccessible(true);
- m.invoke(mapCommitter.first, updatedContext);
+ mapCommitter.first.commitJob(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -273,10 +252,7 @@
reduceCommitter.second);
// PIG-2642 promote files before calling storeCleanup/storeSchema
try {
- // Use reflection, 20.2 does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
- m.setAccessible(true);
- m.invoke(reduceCommitter.first, updatedContext);
+ reduceCommitter.first.commitJob(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -293,10 +269,7 @@
JobContext updatedContext = setUpContext(context,
mapCommitter.second);
try {
- // Use reflection, 20.2 does not have such method
- Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
- m.setAccessible(true);
- m.invoke(mapCommitter.first, updatedContext, state);
+ mapCommitter.first.abortJob(updatedContext, state);
} catch (Exception e) {
throw new IOException(e);
}
@@ -309,10 +282,7 @@
JobContext updatedContext = setUpContext(context,
reduceCommitter.second);
try {
- // Use reflection, 20.2 does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
- m.setAccessible(true);
- m.invoke(reduceCommitter.first, updatedContext, state);
+ reduceCommitter.first.abortJob(updatedContext, state);
} catch (Exception e) {
throw new IOException(e);
}
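Note on the pattern above: with Hadoop 2.x as the only supported line, OutputCommitter#isRecoverySupported, #recoverTask, #commitJob and #abortJob are all public API, so each reflective lookup collapses into a direct call. A minimal sketch of the idiom, assuming an illustrative committer list and context arguments (not Pig's actual fields):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.mapreduce.OutputCommitter;

    public class CommitterCallsSketch {
        // Recovery is possible only if every wrapped committer supports it.
        static boolean allSupportRecovery(List<OutputCommitter> committers) {
            boolean all = true;
            for (OutputCommitter c : committers) {
                all = all && c.isRecoverySupported(); // direct call, no Method lookup
            }
            return all;
        }

        // commitJob/abortJob exist on every Hadoop 2.x release, so no
        // getMethod/setAccessible(true) dance is needed any more.
        static void finish(OutputCommitter c, JobContext jc, boolean ok) throws IOException {
            if (ok) {
                c.commitJob(jc);
            } else {
                c.abortJob(jc, JobStatus.State.FAILED);
            }
        }
    }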
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index fb401b8..a1a9104 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -19,7 +19,6 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -43,6 +42,7 @@
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -88,7 +88,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -285,7 +284,7 @@
try {
fs = FileSystem.get(globalConf);
- intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc));
+ intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc));
} catch (Exception e) {
log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
intermediateTaskInputSize = 134217728L;
@@ -1428,22 +1427,12 @@
private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
// the OutputFormat we report to Hadoop is always PigOutputFormat which
- // can be wrapped with LazyOutputFormat provided if it is supported by
- // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ // can be wrapped with LazyOutputFormat if PigConfiguration.PIG_OUTPUT_LAZY is set
if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
- try {
- Class<?> clazz = PigContext
- .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
- Method method = clazz.getMethod("setOutputFormatClass",
- org.apache.hadoop.mapreduce.Job.class, Class.class);
- method.invoke(null, job, PigOutputFormatTez.class);
- } catch (Exception e) {
- job.setOutputFormatClass(PigOutputFormatTez.class);
- log.warn(PigConfiguration.PIG_OUTPUT_LAZY
- + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
- }
+ LazyOutputFormat.setOutputFormatClass(job, PigOutputFormatTez.class);
} else {
job.setOutputFormatClass(PigOutputFormatTez.class);
}
}
}
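LazyOutputFormat ships with every Hadoop 2.x release, which is what lets the Class.forName/Method.invoke fallback above become one static call. A hedged sketch of the same wiring, with TextOutputFormat standing in for Pig's own output format class:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class LazyWrapSketch {
        static void configureOutput(Job job, boolean lazy) {
            if (lazy) {
                // Part files are created only when the first record is written,
                // so empty partitions leave no empty outputs behind.
                LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
            } else {
                job.setOutputFormatClass(TextOutputFormat.class);
            }
        }
    }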
diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
index 13ab055..1826131 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
@@ -18,7 +18,6 @@
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -65,6 +64,7 @@
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
@@ -86,7 +86,6 @@
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
import org.apache.pig.builtin.FuncUtils;
import org.apache.pig.builtin.Utf8StorageConverter;
@@ -787,46 +786,35 @@
public List<String> getShipFiles() {
// Depend on HBase to do the right thing when available, as of HBASE-9165
try {
- Method addHBaseDependencyJars =
- TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
- if (addHBaseDependencyJars != null) {
- Configuration conf = new Configuration();
- addHBaseDependencyJars.invoke(null, conf);
- if (conf.get("tmpjars") != null) {
- String[] tmpjars = conf.getStrings("tmpjars");
- List<String> shipFiles = new ArrayList<String>(tmpjars.length);
- for (String tmpjar : tmpjars) {
- shipFiles.add(new URL(tmpjar).getPath());
- }
- return shipFiles;
+ Configuration conf = new Configuration();
+ TableMapReduceUtil.addHBaseDependencyJars(conf);
+ if (conf.get("tmpjars") != null) {
+ String[] tmpjars = conf.getStrings("tmpjars");
+ List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+ for (String tmpjar : tmpjars) {
+ shipFiles.add(new URL(tmpjar).getPath());
}
+ return shipFiles;
}
- } catch (NoSuchMethodException e) {
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
- + " Falling back to previous logic.", e);
- } catch (IllegalAccessException e) {
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
- + " not permitted. Falling back to previous logic.", e);
- } catch (InvocationTargetException e) {
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
- + " failed. Falling back to previous logic.", e);
- } catch (MalformedURLException e) {
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
- + " had malformed url. Falling back to previous logic.", e);
+ } catch (IOException e) {
+ if (e instanceof MalformedURLException) {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+ + " had malformed url. Falling back to previous logic.", e);
+ } else {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+ + " failed. Falling back to previous logic.", e);
+ }
}
List<Class> classList = new ArrayList<Class>();
classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
- if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava
- classList.add(com.google.common.collect.Lists.class); // guava
- }
classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
// Additional jars that are specific to v0.95.0+
addClassToList("org.cloudera.htrace.Trace", classList); // htrace
addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
- addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
+ addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat
addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
return FuncUtils.getShipFiles(classList);
}
@@ -877,27 +865,13 @@
}
if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
- // Will not be entering this block for 0.20.2 as it has no security.
try {
- // getCurrentUser method is not public in 0.20.2
- Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
- UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null);
- // hasKerberosCredentials method not available in 0.20.2
- Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials");
- boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null);
- if (hasKerberosCredentials) {
- // Class and method are available only from 0.92 security release
- Class tokenUtilClass = Class
- .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
- Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] {
- Configuration.class, UserGroupInformation.class, Job.class });
- m3.invoke(null, new Object[] { hbaseConf, currentUser, job });
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ if (currentUser.hasKerberosCredentials()) {
+ TokenUtil.obtainTokenForJob(hbaseConf, currentUser, job);
} else {
LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
}
- } catch (ClassNotFoundException cnfe) {
- throw new RuntimeException("Failure loading TokenUtil class, "
- + "is secure RPC available?", cnfe);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
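The security block now calls the HBase token API directly; TokenUtil.obtainTokenForJob and the UserGroupInformation accessors are guaranteed present on the HBase/Hadoop versions this commit assumes. A minimal sketch of the flow (the method name obtainTokenIfSecure is illustrative, not Pig's):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.security.token.TokenUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.security.UserGroupInformation;

    public class HBaseTokenSketch {
        static void obtainTokenIfSecure(Configuration hbaseConf, Job job) throws Exception {
            UserGroupInformation current = UserGroupInformation.getCurrentUser();
            if (current.hasKerberosCredentials()) {
                // Fetches an HBase delegation token and stores it in the job credentials.
                TokenUtil.obtainTokenForJob(hbaseConf, current, job);
            }
            // Without a Kerberos TGT there is nothing to fetch, matching the
            // "Not fetching hbase delegation token" log branch above.
        }
    }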
diff --git a/src/org/apache/pig/builtin/HiveUDFBase.java b/src/org/apache/pig/builtin/HiveUDFBase.java
index 1c87fdb..ebfbbe1 100644
--- a/src/org/apache/pig/builtin/HiveUDFBase.java
+++ b/src/org/apache/pig/builtin/HiveUDFBase.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.Counters;
@@ -180,20 +181,9 @@
@Override
public List<String> getShipFiles() {
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- Class hadoopVersionShimsClass;
- try {
- hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
- hadoopVersion + "Shims");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
- }
List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
- PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
- hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class});
+ PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+ Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class});
return files;
}
diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java
index 5e05dd2..e449c71 100644
--- a/src/org/apache/pig/builtin/OrcStorage.java
+++ b/src/org/apache/pig/builtin/OrcStorage.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -389,20 +390,8 @@
@Override
public List<String> getShipFiles() {
- List<String> cacheFiles = new ArrayList<String>();
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- Class hadoopVersionShimsClass;
- try {
- hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
- hadoopVersion + "Shims");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
- }
Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
- org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
+ org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class,
Input.class};
return FuncUtils.getShipFiles(classList);
}
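Both Hive bridges (HiveUDFBase and OrcStorage) now name Hadoop23Shims statically instead of assembling the class name from a version string. getShipFiles works by mapping each marker class to the jar that contains it; a rough illustration of that resolution via the class's code source (Pig's JarManager.findContainingJar is the real implementation, this only shows the idea):

    import java.net.URL;
    import java.security.CodeSource;

    public class JarLocatorSketch {
        // Best-effort location of the jar (or class directory) that loaded clazz;
        // CodeSource can be null for bootstrap classes, hence the guards.
        static String containingJar(Class<?> clazz) {
            CodeSource src = clazz.getProtectionDomain().getCodeSource();
            URL location = (src == null) ? null : src.getLocation();
            return (location == null) ? null : location.getPath();
        }

        public static void main(String[] args) {
            // A statically named class fails at compile time if missing, whereas the
            // old Class.forName("...Hadoop" + version + "Shims") only failed at runtime.
            System.out.println(containingJar(org.apache.hadoop.hive.shims.Hadoop23Shims.class));
        }
    }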
diff --git a/src/org/apache/pig/builtin/PigStorage.java b/src/org/apache/pig/builtin/PigStorage.java
index 730946f..8d31d98 100644
--- a/src/org/apache/pig/builtin/PigStorage.java
+++ b/src/org/apache/pig/builtin/PigStorage.java
@@ -68,7 +68,6 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
@@ -412,7 +411,7 @@
@Override
public InputFormat getInputFormat() {
if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
- && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
+ && !bzipinput_usehadoops) {
mLog.info("Using Bzip2TextInputFormat");
return new Bzip2TextInputFormat();
} else {
diff --git a/src/org/apache/pig/builtin/TextLoader.java b/src/org/apache/pig/builtin/TextLoader.java
index 8083b63..3f303cb 100644
--- a/src/org/apache/pig/builtin/TextLoader.java
+++ b/src/org/apache/pig/builtin/TextLoader.java
@@ -37,7 +37,6 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -259,8 +258,7 @@
@Override
public InputFormat getInputFormat() {
if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
- && !HadoopShims.isHadoopYARN()
- && !bzipinput_usehadoops ) {
+ && !bzipinput_usehadoops) {
mLog.info("Using Bzip2TextInputFormat");
return new Bzip2TextInputFormat();
} else {
diff --git a/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java b/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
index f5c816a..f552dec 100644
--- a/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
+++ b/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
@@ -90,7 +90,9 @@
// number of tuples to be skipped
Tuple t = loader.getNext();
if(t == null) {
- return createNumRowTuple(null);
+ // since skipInterval is -1, no previous sample,
+ // and next sample is null -> the data set is empty
+ return null;
}
long availRedMem = (long) ( totalMemory * heapPerc);
// availRedMem = 155084396;
diff --git a/src/org/apache/pig/impl/io/PigFile.java b/src/org/apache/pig/impl/io/PigFile.java
index d46e10b..adad228 100644
--- a/src/org/apache/pig/impl/io/PigFile.java
+++ b/src/org/apache/pig/impl/io/PigFile.java
@@ -102,7 +102,7 @@
if(oc.needsTaskCommit(tac)) {
oc.commitTask(tac);
}
- HadoopShims.commitOrCleanup(oc, jc);
+ oc.commitJob(jc);
}
@Override
diff --git a/src/org/apache/pig/impl/util/JarManager.java b/src/org/apache/pig/impl/util/JarManager.java
index d629234..077c703 100644
--- a/src/org/apache/pig/impl/util/JarManager.java
+++ b/src/org/apache/pig/impl/util/JarManager.java
@@ -47,7 +47,6 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.tools.bzip2r.BZip2Constants;
import org.joda.time.DateTime;
@@ -66,7 +65,6 @@
BZIP2R(BZip2Constants.class),
AUTOMATON(Automaton.class),
ANTLR(CommonTokenStream.class),
- GUAVA(Multimaps.class),
JODATIME(DateTime.class);
private final Class pkgClass;
@@ -208,9 +206,6 @@
public static List<String> getDefaultJars() {
List<String> defaultJars = new ArrayList<String>();
for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
- if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) {
- continue; //Skip
- }
String jar = findContainingJar(pkgToSend.getPkgClass());
if (!defaultJars.contains(jar)) {
defaultJars.add(jar);
diff --git a/src/org/apache/pig/impl/util/Utils.java b/src/org/apache/pig/impl/util/Utils.java
index 0f037df..1628f68 100644
--- a/src/org/apache/pig/impl/util/Utils.java
+++ b/src/org/apache/pig/impl/util/Utils.java
@@ -94,20 +94,6 @@
return System.getProperty("java.vendor").contains("IBM");
}
- public static boolean isHadoop23() {
- String version = org.apache.hadoop.util.VersionInfo.getVersion();
- if (version.matches("\\b0\\.23\\..+\\b"))
- return true;
- return false;
- }
-
- public static boolean isHadoop2() {
- String version = org.apache.hadoop.util.VersionInfo.getVersion();
- if (version.matches("\\b2\\.\\d+\\..+"))
- return true;
- return false;
- }
-
public static boolean is64bitJVM() {
String arch = System.getProperties().getProperty("sun.arch.data.model",
System.getProperty("com.ibm.vm.bitmode"));
diff --git a/src/org/apache/pig/tools/pigstats/PigStatsUtil.java b/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
index 542cc2e..e97625f 100644
--- a/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
+++ b/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
@@ -24,7 +24,7 @@
import java.util.regex.Pattern;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
/**
@@ -71,7 +71,7 @@
*/
@Deprecated
public static final String FS_COUNTER_GROUP
- = HadoopShims.getFsCounterGroupName();
+ = MRPigStatsUtil.FS_COUNTER_GROUP;
/**
* Returns an empty PigStats object Use of this method is not advised as it
diff --git a/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java b/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
index c8b9128..e5900c2 100644
--- a/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
+++ b/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
@@ -32,15 +32,16 @@
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigCounters;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.io.FileSpec;
@@ -53,6 +54,8 @@
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import com.google.common.collect.Lists;
+
/**
* This class encapsulates the runtime statistics of a MapReduce job.
@@ -281,7 +284,7 @@
void addCounters(Job job) {
try {
- counters = HadoopShims.getCounters(job);
+ counters = getCounters(job);
} catch (IOException e) {
LOG.warn("Unable to get job counters", e);
}
@@ -349,13 +352,13 @@
void addMapReduceStatistics(Job job) {
Iterator<TaskReport> maps = null;
try {
- maps = HadoopShims.getTaskReports(job, TaskType.MAP);
+ maps = getTaskReports(job, TaskType.MAP);
} catch (IOException e) {
LOG.warn("Failed to get map task report", e);
}
Iterator<TaskReport> reduces = null;
try {
- reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+ reduces = getTaskReports(job, TaskType.REDUCE);
} catch (IOException e) {
LOG.warn("Failed to get reduce task report", e);
}
@@ -515,4 +518,35 @@
inputs.add(is);
}
+ public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
+ if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
+ LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
+ return null;
+ }
+ Cluster cluster = new Cluster(job.getJobConf());
+ try {
+ org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+ if (mrJob == null) { // In local mode, mrJob will be null
+ mrJob = job.getJob();
+ }
+ org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
+ return Lists.newArrayList(reports).iterator();
+ } catch (InterruptedException ir) {
+ throw new IOException(ir);
+ }
+ }
+
+ public static Counters getCounters(Job job) throws IOException {
+ try {
+ Cluster cluster = new Cluster(job.getJobConf());
+ org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+ if (mrJob == null) { // In local mode, mrJob will be null
+ mrJob = job.getJob();
+ }
+ return new Counters(mrJob.getCounters());
+ } catch (Exception ir) {
+ throw new IOException(ir);
+ }
+ }
+
}
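The two helpers added above replace HadoopShims.getTaskReports/getCounters with the Hadoop 2 Cluster client. A condensed sketch of the same retrieval path (the configuration and job id arguments are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskReport;
    import org.apache.hadoop.mapreduce.TaskType;

    public class TaskReportSketch {
        static TaskReport[] reports(Configuration conf, JobID id, TaskType type)
                throws IOException {
            try {
                Cluster cluster = new Cluster(conf);
                Job job = cluster.getJob(id); // null when the job ran in local mode
                return (job == null) ? new TaskReport[0] : job.getTaskReports(type);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }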
diff --git a/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java b/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
index 4b59275..6c22b4c 100644
--- a/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
+++ b/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
@@ -33,7 +33,6 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.classification.InterfaceAudience.Private;
import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.JobStats;
@@ -51,7 +50,7 @@
public static final String TASK_COUNTER_GROUP
= "org.apache.hadoop.mapred.Task$Counter";
public static final String FS_COUNTER_GROUP
- = HadoopShims.getFsCounterGroupName();
+ = "org.apache.hadoop.mapreduce.FileSystemCounter";
private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
diff --git a/test/e2e/pig/build.xml b/test/e2e/pig/build.xml
index 2c197ad..a0dc32d 100644
--- a/test/e2e/pig/build.xml
+++ b/test/e2e/pig/build.xml
@@ -27,9 +27,8 @@
<property name="hive.lib.dir"
value="${pig.base.dir}/build/ivy/lib/Pig"/>
- <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S">
- <equals arg1="${hadoopversion}" arg2="23" />
- </condition>
+ <property name="hadoopversion" value="2" />
+ <property name="hive.hadoop.shims.version" value="0.23" />
<property name="mvnrepo" value="http://repo2.maven.org/maven2"/>
diff --git a/test/excluded-tests-20 b/test/excluded-tests-20
deleted file mode 100644
index f453aa8..0000000
--- a/test/excluded-tests-20
+++ /dev/null
@@ -1,10 +0,0 @@
-**/TestSecondarySortTez.java
-**/TestTezCompiler.java
-**/TestTezJobControlCompiler.java
-**/TestTezLauncher.java
-**/TestTezAutoParallelism.java
-**/TestJobSubmissionTez.java
-**/TestGroupConstParallelTez.java
-**/TestLoaderStorerShipCacheFilesTez.java
-**/TestPigStatsTez.java
-**/TestPOPartialAggPlanTez.java
diff --git a/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java b/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
index f944476..803aefc 100644
--- a/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
+++ b/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
@@ -360,7 +360,7 @@
// result, the number of StoreFunc instances is greater by 1 in
// Hadoop-2.0.x.
assertTrue("storer instanciation count increasing: " + Storer.count,
- Storer.count <= (org.apache.pig.impl.util.Utils.isHadoop2() ? 5 : 4));
+ Storer.count <= 5);
}
}
diff --git a/test/org/apache/pig/parser/TestQueryParserUtils.java b/test/org/apache/pig/parser/TestQueryParserUtils.java
index a6fb391..5cb7d20 100644
--- a/test/org/apache/pig/parser/TestQueryParserUtils.java
+++ b/test/org/apache/pig/parser/TestQueryParserUtils.java
@@ -82,41 +82,38 @@
QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc);
assertEquals(null, props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- if(org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
- // webhdfs
- props.remove(MRConfiguration.JOB_HDFS_SERVERS);
- QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
- assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
- assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ // webhdfs
+ props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+ QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
+ assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
+ assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- // har with webhfs
- QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
- assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
- assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- props.remove(MRConfiguration.JOB_HDFS_SERVERS);
- QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
- assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ // har with webhdfs
+ QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
+ assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
+ assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+ QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
+ assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- //viewfs
- props.remove(MRConfiguration.JOB_HDFS_SERVERS);
- QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
- assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
- assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
- assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ //viewfs
+ props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+ QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
+ assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
+ assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
+ assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- //har with viewfs
- props.remove(MRConfiguration.JOB_HDFS_SERVERS);
- QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
- assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
- assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ //har with viewfs
+ props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+ QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
+ assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
+ assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
- }
}
diff --git a/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java b/test/org/apache/pig/test/MiniCluster.java
similarity index 100%
rename from shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
rename to test/org/apache/pig/test/MiniCluster.java
diff --git a/test/org/apache/pig/test/TestBZip.java b/test/org/apache/pig/test/TestBZip.java
index 5a51d20..e19bd59 100644
--- a/test/org/apache/pig/test/TestBZip.java
+++ b/test/org/apache/pig/test/TestBZip.java
@@ -43,7 +43,6 @@
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -67,16 +66,10 @@
@Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
public static Iterable<Object[]> data() {
- if ( HadoopShims.isHadoopYARN() ) {
- return Arrays.asList(new Object[][] {
- { false },
- { true }
- });
- } else {
- return Arrays.asList(new Object[][] {
- { false }
- });
- }
+ return Arrays.asList(new Object[][] {
+ { false },
+ { true }
+ });
}
public TestBZip (Boolean useBzipFromHadoop) {
diff --git a/test/org/apache/pig/test/TestJobControlCompiler.java b/test/org/apache/pig/test/TestJobControlCompiler.java
index 37d6542..22aa638 100644
--- a/test/org/apache/pig/test/TestJobControlCompiler.java
+++ b/test/org/apache/pig/test/TestJobControlCompiler.java
@@ -63,7 +63,6 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -131,7 +130,7 @@
// verifying the jar gets on distributed cache
Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
// guava jar is not shipped with Hadoop 2.x
- Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
+ Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length);
Path distributedCachePath = fileClassPaths[0];
Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
// hadoop bug requires path to not contain hdfs://hotname in front
@@ -235,22 +234,12 @@
// 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
System.out.println("cache.files= " + Arrays.toString(cacheURIs));
System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
- if (HadoopShims.isHadoopYARN()) {
- // Default jars - 5 (pig, antlr, joda-time, automaton)
- // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
- Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
- Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
- Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
- Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
- } else {
- // Default jars - 5. Has guava in addition
- // There will be same entries duplicated for udf.jar and udf2.jar
- Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12,
- Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
- Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12,
- Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
- }
-
+ // Default jars - 4 (pig, antlr, joda-time, automaton)
+ // Other jars - 5 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar)
+ Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+ Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+ Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
+ Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
// Count occurrences of the resources
Map<String, Integer> occurrences = new HashMap<String, Integer>();
@@ -259,22 +248,12 @@
val = (val == null) ? 1 : ++val;
occurrences.put(cacheURI.toString(), val);
}
- if (HadoopShims.isHadoopYARN()) {
- Assert.assertEquals(9, occurrences.size());
- } else {
- Assert.assertEquals(10, occurrences.size()); //guava jar in addition
- }
+ Assert.assertEquals(9, occurrences.size());
for (String file : occurrences.keySet()) {
- if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) {
- // Same path added twice which is ok. It should not be a shipped to hdfs temp path.
- // We assert path is same by checking count
- Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file));
- } else {
- // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
- // and second time through pig register jar when there is symlink
- Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
- }
+ // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
+ // and second time through pig register jar when there is symlink
+ Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
}
}
diff --git a/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java b/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
index 5e273ab..793e127 100644
--- a/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
+++ b/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
@@ -45,12 +45,8 @@
"store a into 'ooo';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
- "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "kryo"};
checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
}
@@ -61,12 +57,8 @@
"store a into 'ooo' using OrcStorage;";
PhysicalPlan pp = Util.buildPp(pigServer, query);
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
- "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "kryo"};
checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
}
diff --git a/test/org/apache/pig/test/TestMultiQueryCompiler.java b/test/org/apache/pig/test/TestMultiQueryCompiler.java
index 295f2c0..d846064 100644
--- a/test/org/apache/pig/test/TestMultiQueryCompiler.java
+++ b/test/org/apache/pig/test/TestMultiQueryCompiler.java
@@ -1558,14 +1558,7 @@
MROperPlan mrp = null;
try {
- java.lang.reflect.Method compile = launcher.getClass()
- .getDeclaredMethod("compile",
- new Class[] { PhysicalPlan.class, PigContext.class });
-
- compile.setAccessible(true);
-
- mrp = (MROperPlan) compile.invoke(launcher, new Object[] { pp, myPig.getPigContext() });
-
+ mrp = launcher.compile(pp, myPig.getPigContext());
Assert.assertNotNull(mrp);
} catch (Exception e) {
diff --git a/test/org/apache/pig/test/TestPigRunner.java b/test/org/apache/pig/test/TestPigRunner.java
index 13ed468..25380e4 100644
--- a/test/org/apache/pig/test/TestPigRunner.java
+++ b/test/org/apache/pig/test/TestPigRunner.java
@@ -62,6 +62,7 @@
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -763,10 +764,9 @@
}
@Test
+ @Ignore
+ // Skipped since Hadoop 23, see PIG-2449
public void classLoaderTest() throws Exception {
- // Skip in hadoop 23 test, see PIG-2449
- if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2())
- return;
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("register test/org/apache/pig/test/data/pigtestloader.jar");
w.println("A = load '" + INPUT_FILE + "' using org.apache.pig.test.PigTestLoader();");
diff --git a/test/org/apache/pig/test/TestPigStatsMR.java b/test/org/apache/pig/test/TestPigStatsMR.java
index e429612..7956770 100644
--- a/test/org/apache/pig/test/TestPigStatsMR.java
+++ b/test/org/apache/pig/test/TestPigStatsMR.java
@@ -103,11 +103,7 @@
private static MROperPlan getMRPlan(PhysicalPlan pp, PigContext ctx) throws Exception {
MapReduceLauncher launcher = new MapReduceLauncher();
- java.lang.reflect.Method compile = launcher.getClass()
- .getDeclaredMethod("compile",
- new Class[] { PhysicalPlan.class, PigContext.class });
- compile.setAccessible(true);
- return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx });
+ return launcher.compile(pp, ctx);
}
private static String getAlias(MapReduceOper mro) throws Exception {
diff --git a/test/org/apache/pig/test/TestSkewedJoin.java b/test/org/apache/pig/test/TestSkewedJoin.java
index dba2241..947a31b 100644
--- a/test/org/apache/pig/test/TestSkewedJoin.java
+++ b/test/org/apache/pig/test/TestSkewedJoin.java
@@ -65,6 +65,7 @@
private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
+ private static final String INPUT_FILE8 = "SkewedJoinInput8.txt";
private static final String TEST_DIR = Util.getTestDirectory(TestSkewedJoin.class);
private static final String INPUT_DIR = TEST_DIR + Path.SEPARATOR + "input";
private static final String OUTPUT_DIR = TEST_DIR + Path.SEPARATOR + "output";
@@ -173,6 +174,11 @@
}
w7.close();
+ // Empty input file, used by testSkewedJoinMapLeftEmpty
+ PrintWriter w8 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE8));
+ w8.close();
+
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);
@@ -180,6 +186,7 @@
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE5, INPUT_FILE5);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE6, INPUT_FILE6);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE7, INPUT_FILE7);
+ Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE8, INPUT_FILE8);
}
private static void deleteFiles() throws IOException {
@@ -187,6 +194,21 @@
}
@Test
+ public void testSkewedJoinMapLeftEmpty() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE8 + "' as (idM:[]);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+ pigServer.registerQuery("C = join A by idM#'id', B by id using 'skewed' PARALLEL 2;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ int count = 0;
+ while(iter.hasNext()) {
+ count++;
+ iter.next();
+ }
+ assertEquals(0, count);
+ }
+
+ @Test
public void testSkewedJoinWithGroup() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
diff --git a/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java b/test/org/apache/pig/test/TezMiniCluster.java
similarity index 100%
rename from shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
rename to test/org/apache/pig/test/TezMiniCluster.java
diff --git a/test/org/apache/pig/test/Util.java b/test/org/apache/pig/test/Util.java
index 67af8e2..14c61f0 100644
--- a/test/org/apache/pig/test/Util.java
+++ b/test/org/apache/pig/test/Util.java
@@ -75,7 +75,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.BagFactory;
@@ -648,13 +648,10 @@
}
}
- static private String getMkDirCommandForHadoop2_0(String fileName) {
- if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
- Path parentDir = new Path(fileName).getParent();
- String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
- return mkdirCommand;
- }
- return "";
+ static private String getFSMkDirCommand(String fileName) {
+ Path parentDir = new Path(fileName).getParent();
+ String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
+ return mkdirCommand;
}
/**
@@ -676,7 +673,7 @@
fileNameOnCluster = fileNameOnCluster.replace('\\','/');
}
PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
- String script = getMkDirCommandForHadoop2_0(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster;
+ String script = getFSMkDirCommand(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster;
GruntParser parser = new GruntParser(new StringReader(script), ps);
parser.setInteractive(false);
try {
@@ -907,14 +904,7 @@
MapRedUtil.checkLeafIsStore(pp, pc);
MapReduceLauncher launcher = new MapReduceLauncher();
-
- java.lang.reflect.Method compile = launcher.getClass()
- .getDeclaredMethod("compile",
- new Class[] { PhysicalPlan.class, PigContext.class });
-
- compile.setAccessible(true);
-
- return (MROperPlan) compile.invoke(launcher, new Object[] { pp, pc });
+ return launcher.compile(pp, pc);
}
public static MROperPlan buildMRPlan(String query, PigContext pc) throws Exception {
@@ -1357,16 +1347,7 @@
// For tez testing, we want to avoid TezResourceManager/LocalResource reuse
// (when switching between local and mapreduce/tez)
- if( HadoopShims.isHadoopYARN() ) {
- try {
- java.lang.reflect.Method tez_dropInstance = Class.forName(
- "org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager").getDeclaredMethod(
- "dropInstance", (Class<?>[]) null );
- tez_dropInstance.invoke(null);
- } catch (Exception e){
- throw new RuntimeException(e);
- }
- }
+ TezResourceManager.dropInstance();
// TODO: once we have Tez local mode, we can get rid of this. For now,
// if we run this test suite in Tez mode and there are some tests
diff --git a/test/perf/pigmix/bin/generate_data.sh b/test/perf/pigmix/bin/generate_data.sh
index cc216d6..a78ea96 100644
--- a/test/perf/pigmix/bin/generate_data.sh
+++ b/test/perf/pigmix/bin/generate_data.sh
@@ -25,20 +25,11 @@
source $PIGMIX_HOME/conf/config.sh
-if [ $HADOOP_VERSION == "23" ]; then
- echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
- $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
-else
- echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot"
- $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot
-fi
+echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
+$HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
shopt -s extglob
-if [ $HADOOP_VERSION == "23" ]; then
- pigjar=`echo $PIG_HOME/pig*-h2.jar`
-else
- pigjar=`echo $PIG_HOME/pig*-h1.jar`
-fi
+pigjar=`echo $PIG_HOME/pig*-h2.jar`
pigmixjar=$PIGMIX_HOME/pigmix.jar
diff --git a/test/perf/pigmix/build.xml b/test/perf/pigmix/build.xml
index 8fe2d15..703b96e 100644
--- a/test/perf/pigmix/build.xml
+++ b/test/perf/pigmix/build.xml
@@ -34,6 +34,8 @@
</fileset>
</path>
+ <property name="hadoopversion" value="2" />
+
<property name="java.dir" value="${basedir}/src/java"/>
<property name="pigmix.build.dir" value="${basedir}/build"/>
<property name="pigmix.jar" value="${basedir}/pigmix.jar"/>