catching up to trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/branches/HDFS-641@834556 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index cee806e..0341670 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,9 @@
IMPROVEMENTS
+ MAPREDUCE-707. Provide a jobconf property for explicitly assigning a job to
+ a pool in the Fair Scheduler. (Alan Heirich via matei)
+
MAPREDUCE-999. Improve Sqoop test speed and refactor tests.
(Aaron Kimball via tomwhite)
@@ -66,6 +69,14 @@
MAPREDUCE-1153. Fix tasktracker metrics when trackers are decommissioned.
(sharad)
+ MAPREDUCE-1128. Fix MRUnit to prohibit iterating over values twice. (Aaron
+ Kimball via cdouglas)
+
+ MAPREDUCE-665. Move libhdfs to HDFS subproject. (Eli Collins via dhruba)
+
+ MAPREDUCE-1196. Fix FileOutputCommitter to use the deprecated cleanupJob
+ api correctly. (acmurthy)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
@@ -850,3 +861,19 @@
MAPREDUCE-1038. Weave Mumak aspects only if related files have changed.
(Aaron Kimball via cdouglas)
+
+ MAPREDUCE-1037. Continue running contrib tests if Sqoop tests fail. (Aaron
+ Kimball via cdouglas)
+
+ MAPREDUCE-1163. Remove unused, hard-coded paths from libhdfs. (Allen
+ Wittenauer via cdouglas)
+
+ MAPREDUCE-962. Fix a NullPointerException while killing task process
+ trees. (Ravi Gummadi via yhemanth)
+
+ MAPREDUCE-1177. Correct setup/cleanup inversion in
+ JobTracker::getTaskReports. (Vinod Kumar Vavilapalli via cdouglas)
+
+ MAPREDUCE-1178. Fix ClassCastException in MultipleInputs by adding
+ a DelegatingRecordReader. (Amareshwari Sriramadasu and Jay Booth
+ via sharad)
diff --git a/build.xml b/build.xml
index 97b62fb..2696faa 100644
--- a/build.xml
+++ b/build.xml
@@ -48,7 +48,6 @@
<property name="c++.utils.src" value="${c++.src}/utils"/>
<property name="c++.pipes.src" value="${c++.src}/pipes"/>
<property name="c++.examples.pipes.src" value="${examples.dir}/pipes"/>
- <property name="c++.libhdfs.src" value="${c++.src}/libhdfs"/>
<property name="librecordio.src" value="${c++.src}/librecordio"/>
<property name="tools.src" value="${basedir}/src/tools"/>
@@ -73,7 +72,6 @@
<property name="build.c++" value="${build.dir}/c++-build/${build.platform}"/>
<property name="build.c++.utils" value="${build.c++}/utils"/>
<property name="build.c++.pipes" value="${build.c++}/pipes"/>
- <property name="build.c++.libhdfs" value="${build.c++}/libhdfs"/>
<property name="build.c++.examples.pipes"
value="${build.c++}/examples/pipes"/>
<property name="build.docs" value="${build.dir}/docs"/>
@@ -118,9 +116,6 @@
<property name="test.mapred.commit.tests.file" value="${test.src.dir}/commit-tests" />
<property name="test.mapred.all.tests.file" value="${test.src.dir}/all-tests" />
- <property name="test.libhdfs.conf.dir" value="${c++.libhdfs.src}/tests/conf"/>
- <property name="test.libhdfs.dir" value="${test.build.dir}/libhdfs"/>
-
<property name="librecordio.test.dir" value="${test.build.dir}/librecordio"/>
<property name="web.src.dir" value="${basedir}/src/web"/>
<property name="src.webapps" value="${basedir}/src/webapps"/>
@@ -389,7 +384,7 @@
<target name="compile-core" depends="clover, compile-mapred-classes, compile-c++" description="Compile core only"/>
- <target name="compile-contrib" depends="compile-core,tools,compile-c++-libhdfs">
+ <target name="compile-contrib" depends="compile-core,tools">
<subant target="compile">
<property name="version" value="${version}"/>
<property name="hadoop-core.version" value="${hadoop-core.version}"/>
@@ -648,7 +643,7 @@
<fail if="testsfailed">Tests failed!</fail>
</target>
- <target name="test" depends="test-c++-libhdfs, jar-test, test-core" description="Run all unit tests">
+ <target name="test" depends="jar-test, test-core" description="Run all unit tests">
<subant target="test-contrib">
<fileset file="${basedir}/build.xml"/>
</subant>
@@ -1148,26 +1143,6 @@
</subant>
</target>
- <target name="test-c++-libhdfs" depends="compile-c++-libhdfs, compile-core" if="islibhdfs">
- <delete dir="${test.libhdfs.dir}"/>
- <mkdir dir="${test.libhdfs.dir}"/>
- <mkdir dir="${test.libhdfs.dir}/logs"/>
- <mkdir dir="${test.libhdfs.dir}/hdfs/name"/>
-
- <exec dir="${build.c++.libhdfs}" executable="${make.cmd}" failonerror="true">
- <env key="OS_NAME" value="${os.name}"/>
- <env key="OS_ARCH" value="${os.arch}"/>
- <env key="JVM_ARCH" value="${jvm.arch}"/>
- <env key="LIBHDFS_BUILD_DIR" value="${build.c++.libhdfs}"/>
- <env key="HADOOP_HOME" value="${basedir}"/>
- <env key="HADOOP_CONF_DIR" value="${test.libhdfs.conf.dir}"/>
- <env key="HADOOP_LOG_DIR" value="${test.libhdfs.dir}/logs"/>
- <env key="LIBHDFS_SRC_DIR" value="${c++.libhdfs.src}"/>
- <env key="LIBHDFS_INSTALL_DIR" value="${install.c++}/lib"/>
- <env key="LIB_DIR" value="${common.ivy.lib.dir}"/>
- <arg value="test"/>
- </exec>
- </target>
<!-- ================================================================== -->
<!-- librecordio targets. -->
@@ -1220,16 +1195,8 @@
searchpath="yes" failonerror="yes">
<arg value="-if"/>
</exec>
- <antcall target="create-c++-configure-libhdfs"/>
- </target>
+ </target>
- <target name="create-c++-configure-libhdfs" depends="check-c++-libhdfs" if="islibhdfs">
- <exec executable="autoreconf" dir="${c++.libhdfs.src}"
- searchpath="yes" failonerror="yes">
- <arg value="-if"/>
- </exec>
- </target>
-
<target name="check-c++-makefiles" depends="init" if="compile.c++">
<condition property="need.c++.utils.makefile">
<not> <available file="${build.c++.utils}/Makefile"/> </not>
@@ -1242,33 +1209,6 @@
</condition>
</target>
- <target name="check-c++-libhdfs">
- <condition property="islibhdfs">
- <and>
- <isset property="compile.c++"/>
- <isset property="libhdfs"/>
- </and>
- </condition>
- </target>
-
- <target name="check-c++-makefile-libhdfs" depends="init,check-c++-libhdfs" if="islibhdfs">
- <condition property="need.c++.libhdfs.makefile">
- <not> <available file="${build.c++.libhdfs}/Makefile"/> </not>
- </condition>
- </target>
-
- <target name="create-c++-libhdfs-makefile" depends="check-c++-makefile-libhdfs"
- if="need.c++.libhdfs.makefile">
- <mkdir dir="${build.c++.libhdfs}"/>
- <chmod file="${c++.libhdfs.src}/configure" perm="ugo+x"/>
- <exec executable="${c++.libhdfs.src}/configure" dir="${build.c++.libhdfs}"
- failonerror="yes">
- <env key="ac_cv_func_malloc_0_nonnull" value="yes"/>
- <env key="JVM_ARCH" value="${jvm.arch}"/>
- <arg value="--prefix=${install.c++}"/>
- </exec>
- </target>
-
<target name="create-c++-utils-makefile" depends="check-c++-makefiles"
if="need.c++.utils.makefile">
<mkdir dir="${build.c++.utils}"/>
@@ -1335,15 +1275,6 @@
<target name="compile-c++-examples"
depends="compile-c++-examples-pipes"/>
- <target name="compile-c++-libhdfs" depends="create-c++-libhdfs-makefile" if="islibhdfs">
- <exec executable="${make.cmd}" dir="${build.c++.libhdfs}" searchpath="yes"
- failonerror="yes">
- <env key="ac_cv_func_malloc_0_nonnull" value="yes"/>
- <env key="JVM_ARCH" value="${jvm.arch}"/>
- <arg value="install"/>
- </exec>
- </target>
-
<target name="clover" depends="clover.setup, clover.info" description="Instrument the Unit tests using Clover. To use, specify -Dclover.home=<base of clover installation> -Drun.clover=true on the command line."/>
<target name="clover.setup" if="clover.enabled">
diff --git a/src/c++/libhdfs/hdfsJniHelper.h b/src/c++/libhdfs/hdfsJniHelper.h
index 017e801..442eedf 100644
--- a/src/c++/libhdfs/hdfsJniHelper.h
+++ b/src/c++/libhdfs/hdfsJniHelper.h
@@ -30,8 +30,6 @@
#define PATH_SEPARATOR ':'
-#define USER_CLASSPATH "/home/y/libexec/hadoop/conf:/home/y/libexec/hadoop/lib/hadoop-0.1.0.jar"
-
/** Denote the method we want to invoke as STATIC or INSTANCE */
typedef enum {
diff --git a/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java b/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
index 06561c5..f3fde46 100644
--- a/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
+++ b/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
@@ -61,6 +61,8 @@
*/
public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
+ public static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
+
private final FairScheduler scheduler;
// Map and reduce minimum allocations for each pool
@@ -391,7 +393,7 @@
*/
public synchronized void setPool(JobInProgress job, String pool) {
removeJob(job);
- job.getJobConf().set(poolNameProperty, pool);
+ job.getJobConf().set(EXPLICIT_POOL_PROPERTY, pool);
addJob(job);
}
@@ -403,13 +405,16 @@
}
/**
- * Get the pool name for a JobInProgress from its configuration. This uses
- * the "project" property in the jobconf by default, or the property set with
- * "mapred.fairscheduler.poolnameproperty".
+   * Get the pool name for a JobInProgress from its configuration. This uses
+   * the value of mapred.fairscheduler.pool if it is specified; otherwise it
+   * uses the value of the property named by
+   * mapred.fairscheduler.poolnameproperty. If neither is specified, it falls
+   * back to the "user.name" property in the jobconf.
*/
public String getPoolName(JobInProgress job) {
Configuration conf = job.getJobConf();
- return conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME).trim();
+ return conf.get(EXPLICIT_POOL_PROPERTY,
+ conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME)).trim();
}
/**
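The getPoolName() change above establishes a three-step lookup: the explicit mapred.fairscheduler.pool value wins, then the property named by mapred.fairscheduler.poolnameproperty, and finally the default pool. A minimal standalone sketch of that order, assuming the scheduler's usual "user.name" default for poolnameproperty (the class and method names below are illustrative, not scheduler code):

```java
import org.apache.hadoop.conf.Configuration;

public class PoolNameResolution {
  static final String EXPLICIT_POOL = "mapred.fairscheduler.pool";
  static final String POOL_NAME_PROPERTY = "mapred.fairscheduler.poolnameproperty";

  // 1. the explicit pool wins; 2. otherwise the property named by
  // poolnameproperty; 3. otherwise the default pool.
  static String resolvePoolName(Configuration conf, String defaultPool) {
    String poolNameProperty = conf.get(POOL_NAME_PROPERTY, "user.name");
    return conf.get(EXPLICIT_POOL,
        conf.get(poolNameProperty, defaultPool)).trim();
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(EXPLICIT_POOL, "poolA");
    System.out.println(resolvePoolName(conf, "default")); // prints poolA
  }
}
```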
diff --git a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
index 052e46c..ef446b6 100644
--- a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
+++ b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
@@ -52,6 +52,7 @@
"test-pools").getAbsolutePath();
private static final String POOL_PROPERTY = "pool";
+ private static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
private static int jobCounter;
@@ -2471,6 +2472,87 @@
checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
}
+
+ /**
+ * This test uses the mapred.fairscheduler.pool property to assign jobs to pools.
+ */
+ public void testPoolAssignment() throws Exception {
+ // Set up pools file
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<pool name=\"default\">");
+ out.println("<schedulingMode>fair</schedulingMode>");
+ out.println("</pool>");
+ out.println("<pool name=\"poolA\">");
+ out.println("<schedulingMode>fair</schedulingMode>");
+ out.println("</pool>");
+ out.println("</allocations>");
+ out.close();
+ scheduler.getPoolManager().reloadAllocs();
+ Pool defaultPool = scheduler.getPoolManager().getPool("default");
+ Pool poolA = scheduler.getPoolManager().getPool("poolA");
+
+ // Submit a job to the default pool. All specifications take default values.
+ JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 3);
+
+ assertEquals(1, defaultPool.getMapSchedulable().getDemand());
+ assertEquals(3, defaultPool.getReduceSchedulable().getDemand());
+ assertEquals(0, poolA.getMapSchedulable().getDemand());
+ assertEquals(0, poolA.getReduceSchedulable().getDemand());
+
+ // Submit a job to the default pool and move it to poolA using setPool.
+ JobInProgress job2 = submitJob(JobStatus.RUNNING, 5, 7);
+
+ assertEquals(6, defaultPool.getMapSchedulable().getDemand());
+ assertEquals(10, defaultPool.getReduceSchedulable().getDemand());
+ assertEquals(0, poolA.getMapSchedulable().getDemand());
+ assertEquals(0, poolA.getReduceSchedulable().getDemand());
+
+ scheduler.getPoolManager().setPool(job2, "poolA");
+ assertEquals("poolA", scheduler.getPoolManager().getPoolName(job2));
+
+ defaultPool.getMapSchedulable().updateDemand();
+ defaultPool.getReduceSchedulable().updateDemand();
+ poolA.getMapSchedulable().updateDemand();
+ poolA.getReduceSchedulable().updateDemand();
+
+ assertEquals(1, defaultPool.getMapSchedulable().getDemand());
+ assertEquals(3, defaultPool.getReduceSchedulable().getDemand());
+ assertEquals(5, poolA.getMapSchedulable().getDemand());
+ assertEquals(7, poolA.getReduceSchedulable().getDemand());
+
+ // Submit a job to poolA by specifying mapred.fairscheduler.pool
+ JobConf jobConf = new JobConf(conf);
+ jobConf.setNumMapTasks(11);
+ jobConf.setNumReduceTasks(13);
+ jobConf.set(POOL_PROPERTY, "nonsense"); // test that this is overridden
+ jobConf.set(EXPLICIT_POOL_PROPERTY, "poolA");
+ JobInProgress job3 = new FakeJobInProgress(jobConf, taskTrackerManager,
+ null, UtilsForTests.getJobTracker());
+ job3.getStatus().setRunState(JobStatus.RUNNING);
+ taskTrackerManager.submitJob(job3);
+
+ assertEquals(1, defaultPool.getMapSchedulable().getDemand());
+ assertEquals(3, defaultPool.getReduceSchedulable().getDemand());
+ assertEquals(16, poolA.getMapSchedulable().getDemand());
+ assertEquals(20, poolA.getReduceSchedulable().getDemand());
+
+ // Submit a job to poolA by specifying pool and not mapred.fairscheduler.pool
+ JobConf jobConf2 = new JobConf(conf);
+ jobConf2.setNumMapTasks(17);
+ jobConf2.setNumReduceTasks(19);
+ jobConf2.set(POOL_PROPERTY, "poolA");
+ JobInProgress job4 = new FakeJobInProgress(jobConf2, taskTrackerManager,
+ null, UtilsForTests.getJobTracker());
+ job4.getStatus().setRunState(JobStatus.RUNNING);
+ taskTrackerManager.submitJob(job4);
+
+ assertEquals(1, defaultPool.getMapSchedulable().getDemand());
+ assertEquals(3, defaultPool.getReduceSchedulable().getDemand());
+ assertEquals(33, poolA.getMapSchedulable().getDemand());
+ assertEquals(39, poolA.getReduceSchedulable().getDemand());
+ }
private void advanceTime(long time) {
clock.advance(time);
diff --git a/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java b/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
index 606670f..247806a 100644
--- a/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
+++ b/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
@@ -63,20 +63,41 @@
private class InspectableIterable implements Iterable<VALUEIN> {
private Iterable<VALUEIN> base;
private VALUEIN lastVal;
+ private boolean used; // if true, don't re-iterate.
public InspectableIterable(final Iterable<VALUEIN> baseCollection) {
this.base = baseCollection;
}
public Iterator<VALUEIN> iterator() {
- return new InspectableIterator(this.base.iterator());
+ if (used) {
+ return new NullIterator();
+ } else {
+ used = true;
+ return new InspectableIterator(this.base.iterator());
+ }
}
public VALUEIN getLastVal() {
return lastVal;
}
- private class InspectableIterator
+ private class NullIterator
+ extends ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.ValueIterator
+ implements Iterator<VALUEIN> {
+ public VALUEIN next() {
+ return null;
+ }
+
+ public boolean hasNext() {
+ return false;
+ }
+
+ public void remove() {
+ }
+ }
+
+ private class InspectableIterator
extends ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.ValueIterator
implements Iterator<VALUEIN> {
private Iterator<VALUEIN> iter;
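Because MockReduceContext now returns a NullIterator on any second call to iterator(), MRUnit matches the framework's single-pass semantics: a reducer that genuinely needs two passes has to buffer the values itself. A hedged sketch of that pattern (TwoPassReducer is illustrative, not part of this patch); copying the values is necessary in any case because the framework reuses Writable instances:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoPassReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context ctx)
      throws IOException, InterruptedException {
    List<Long> cached = new ArrayList<Long>();
    long max = Long.MIN_VALUE;
    for (LongWritable v : values) {   // the only pass the framework allows
      cached.add(v.get());            // copy: the Writable object is reused
      max = Math.max(max, v.get());
    }
    long countAtMax = 0;
    for (long v : cached) {           // second pass over the local copy
      if (v == max) {
        countAtMax++;
      }
    }
    ctx.write(key, new LongWritable(countAtMax));
  }
}
```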
diff --git a/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java b/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java
index 381f136..3174e41 100644
--- a/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java
+++ b/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java
@@ -22,13 +22,17 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
@@ -222,5 +226,45 @@
// expected.
}
}
+
+ /**
+   * Reducer that counts its values in two passes; per MapReduce semantics,
+   * the second pass should yield no values.
+ */
+ private static class DoubleIterReducer<K, V>
+ extends MapReduceBase implements Reducer<K, V, K, LongWritable> {
+ public void reduce(K key, Iterator<V> values,
+ OutputCollector<K, LongWritable> out, Reporter r) throws IOException {
+ long count = 0;
+
+ while (values.hasNext()) {
+ count++;
+ values.next();
+ }
+
+ // This time around, iteration should yield no values.
+ while (values.hasNext()) {
+ count++;
+ values.next();
+ }
+ out.collect(key, new LongWritable(count));
+ }
+ }
+
+ @Test
+ public void testDoubleIteration() {
+ reducer = new DoubleIterReducer<Text, LongWritable>();
+ driver = new ReduceDriver<Text, LongWritable, Text, LongWritable>(
+ reducer);
+
+ driver
+ .withInputKey(new Text("foo"))
+ .withInputValue(new LongWritable(1))
+ .withInputValue(new LongWritable(1))
+ .withInputValue(new LongWritable(1))
+ .withInputValue(new LongWritable(1))
+ .withOutput(new Text("foo"), new LongWritable(4))
+ .runTest();
+ }
}
diff --git a/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java b/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
index dc51e83..e694264 100644
--- a/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
+++ b/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
@@ -223,5 +223,43 @@
// expected.
}
}
+
+ /**
+   * Reducer that counts its values in two passes; per MapReduce semantics,
+   * the second pass should yield no values.
+ */
+ private static class DoubleIterReducer<K, V>
+ extends Reducer<K, V, K, LongWritable> {
+ public void reduce(K key, Iterable<V> values, Context c)
+ throws IOException, InterruptedException {
+ long count = 0;
+
+ for (V val : values) {
+ count++;
+ }
+
+ // This time around, iteration should yield no values.
+ for (V val : values) {
+ count++;
+ }
+ c.write(key, new LongWritable(count));
+ }
+ }
+
+ @Test
+ public void testDoubleIteration() {
+ reducer = new DoubleIterReducer<Text, LongWritable>();
+ driver = new ReduceDriver<Text, LongWritable, Text, LongWritable>(
+ reducer);
+
+ driver
+ .withInputKey(new Text("foo"))
+ .withInputValue(new LongWritable(1))
+ .withInputValue(new LongWritable(1))
+ .withInputValue(new LongWritable(1))
+ .withInputValue(new LongWritable(1))
+ .withOutput(new Text("foo"), new LongWritable(4))
+ .runTest();
+ }
}
diff --git a/src/contrib/sqoop/build.xml b/src/contrib/sqoop/build.xml
index bdf45a0..b5717dd 100644
--- a/src/contrib/sqoop/build.xml
+++ b/src/contrib/sqoop/build.xml
@@ -149,7 +149,7 @@
<fileset dir="${src.test}" includes="**/${testcase}.java"/>
</batchtest>
</junit>
- <fail if="tests.failed">Tests failed!</fail>
+ <antcall target="checkfailure"/>
</target>
<target name="doc">
diff --git a/src/docs/src/documentation/content/xdocs/fair_scheduler.xml b/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
index 7124f41..b5abc4d 100644
--- a/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
+++ b/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
@@ -163,6 +163,15 @@
</tr>
<tr>
<td>
+ mapred.fairscheduler.pool
+ </td>
+ <td>
Specify the pool that a job belongs to.
If this property is specified, mapred.fairscheduler.poolnameproperty is ignored.
+ </td>
+ </tr>
+ <tr>
+ <td>
mapred.fairscheduler.poolnameproperty
</td>
<td>
@@ -171,17 +180,8 @@
(i.e. one pool for each user).
Another useful value is <em>group.name</em> to create a
pool per Unix group.
- Finally, a common setting is to use a non-standard property
- such as <em>pool.name</em> as the pool name property, and make it
- default to <em>mapreduce.job.mapreduce.job.user.name</em> through the following setting:<br/>
- <code><property></code><br/>
- <code> <name>pool.name</name></code><br/>
- <code> <value>${mapreduce.job.mapreduce.job.user.name}</value></code><br/>
- <code></property></code><br/>
- This allows you to specify the pool name explicitly for some jobs
- through the jobconf (e.g. passing <em>-Dpool.name=<name></em>
- to <em>bin/hadoop jar</em>, while having the default be the user's
- pool.
+          mapred.fairscheduler.poolnameproperty is used only for jobs in which
+ mapred.fairscheduler.pool is not explicitly set.
</td>
</tr>
<tr>
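For reference, the new property is an ordinary jobconf setting, so a client can pin a job to a pool directly. A minimal illustrative sketch (the pool name "poolA" is a placeholder):

```java
import org.apache.hadoop.mapred.JobConf;

public class PoolAssignmentExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Pin this job to "poolA"; mapred.fairscheduler.poolnameproperty is
    // then ignored for this job.
    conf.set("mapred.fairscheduler.pool", "poolA");
  }
}
```

Drivers that go through ToolRunner get the same effect from the command line with -Dmapred.fairscheduler.pool=poolA.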
diff --git a/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java b/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
index 078046e..667b97d 100644
--- a/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
+++ b/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
@@ -64,7 +64,7 @@
public void commitJob(JobContext context) throws IOException {
// delete the _temporary folder in the output folder
- cleanup(context);
+ cleanupJob(context);
// check if the output-dir marking is required
if (shouldMarkOutputDir(context.getJobConf())) {
// create a _success file in the output folder
@@ -85,9 +85,10 @@
fileSys.create(filePath).close();
}
}
-
- // Deletes the _temporary folder in the job's output dir.
- private void cleanup(JobContext context) throws IOException {
+
+ @Override
+ @Deprecated
+ public void cleanupJob(JobContext context) throws IOException {
JobConf conf = context.getJobConf();
// do the clean up of temporary directory
Path outputPath = FileOutputFormat.getOutputPath(conf);
@@ -107,7 +108,7 @@
public void abortJob(JobContext context, int runState)
throws IOException {
// simply delete the _temporary dir from the o/p folder of the job
- cleanup(context);
+ cleanupJob(context);
}
public void setupTask(TaskAttemptContext context) throws IOException {
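Routing commitJob() and abortJob() back through the now-public, deprecated cleanupJob() keeps existing subclasses working when they override only the old method. A hedged sketch of such a legacy committer, modeled on the test committers later in this patch (the class name and marker file are illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;

public class LegacyCleanupCommitter extends FileOutputCommitter {
  @Override
  @Deprecated
  public void cleanupJob(JobContext context) throws IOException {
    super.cleanupJob(context);          // delete _temporary as usual
    JobConf conf = context.getJobConf();
    Path out = FileOutputFormat.getOutputPath(conf);
    FileSystem fs = out.getFileSystem(conf);
    // Custom marker showing that the deprecated hook still ran.
    fs.create(new Path(out, "_legacy_cleanup")).close();
  }
}
```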
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index 4536089..8915690 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -3414,9 +3414,9 @@
return getMapTaskReports(JobID.downgrade(jobid));
case REDUCE :
return getReduceTaskReports(JobID.downgrade(jobid));
- case JOB_SETUP:
+ case JOB_CLEANUP:
return getCleanupTaskReports(JobID.downgrade(jobid));
- case JOB_CLEANUP :
+ case JOB_SETUP :
return getSetupTaskReports(JobID.downgrade(jobid));
}
return new TaskReport[0];
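The two case labels were inverted, so JOB_SETUP requests returned cleanup reports and vice versa. The corrected mapping, restated as a self-contained sketch (the strings stand in for the actual report calls):

```java
import org.apache.hadoop.mapreduce.TaskType;

public class TaskReportDispatch {
  static String reportsFor(TaskType type) {
    switch (type) {
      case MAP:         return "map task reports";
      case REDUCE:      return "reduce task reports";
      case JOB_SETUP:   return "setup task reports";   // was cleanup before the fix
      case JOB_CLEANUP: return "cleanup task reports"; // was setup before the fix
      default:          return "no reports";
    }
  }
}
```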
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java b/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
index 15348ff..0d9b8f9 100644
--- a/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
+++ b/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
@@ -119,17 +119,9 @@
return splits;
}
- @SuppressWarnings("unchecked")
+ @Override
public RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
-
- // Find the InputFormat and then the RecordReader from the
- // TaggedInputSplit.
- TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
- InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
- .newInstance(taggedInputSplit.getInputFormatClass(),
- context.getConfiguration());
- return inputFormat.createRecordReader(taggedInputSplit.getInputSplit(),
- context);
+ return new DelegatingRecordReader<K, V>(split, context);
}
}
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java b/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java
new file mode 100644
index 0000000..f0d060e
--- /dev/null
+++ b/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A delegating RecordReader that forwards all calls to the record reader
+ * created for the InputFormat named in the {@link TaggedInputSplit}.
+ */
+public class DelegatingRecordReader<K, V> extends RecordReader<K, V> {
+ RecordReader<K, V> originalRR;
+
+ /**
+ * Constructs the DelegatingRecordReader.
+ *
+   * @param split TaggedInputSplit object
+ * @param context TaskAttemptContext object
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @SuppressWarnings("unchecked")
+ public DelegatingRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ // Find the InputFormat and then the RecordReader from the
+ // TaggedInputSplit.
+ TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+ InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+ .newInstance(taggedInputSplit.getInputFormatClass(), context
+ .getConfiguration());
+ originalRR = inputFormat.createRecordReader(taggedInputSplit
+ .getInputSplit(), context);
+ }
+
+ @Override
+ public void close() throws IOException {
+ originalRR.close();
+ }
+
+ @Override
+ public K getCurrentKey() throws IOException, InterruptedException {
+ return originalRR.getCurrentKey();
+ }
+
+ @Override
+ public V getCurrentValue() throws IOException, InterruptedException {
+ return originalRR.getCurrentValue();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return originalRR.getProgress();
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ originalRR.initialize(((TaggedInputSplit) split).getInputSplit(), context);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return originalRR.nextKeyValue();
+ }
+
+}
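DelegatingRecordReader is never constructed by user code; DelegatingInputFormat creates one per TaggedInputSplit when a job is configured through MultipleInputs. A hedged usage sketch (the paths are placeholders, and the identity Mapper.class stands in for real mapper classes):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MultipleInputsUsage {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    // Each path gets its own InputFormat and Mapper. At runtime every
    // split is a TaggedInputSplit, and DelegatingRecordReader forwards
    // record reading to the per-path format's reader.
    MultipleInputs.addInputPath(job, new Path("/data/text"),
        TextInputFormat.class, Mapper.class);
    MultipleInputs.addInputPath(job, new Path("/data/kv"),
        KeyValueTextInputFormat.class, Mapper.class);
  }
}
```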
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 5409bba..0e61f4d 100644
--- a/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -107,14 +107,15 @@
*/
public void commitJob(JobContext context) throws IOException {
// delete the _temporary folder and create a _done file in the o/p folder
- cleanup(context);
+ cleanupJob(context);
if (shouldMarkOutputDir(context.getConfiguration())) {
markOutputDirSuccessful(context);
}
}
-
- // Delete the _temporary folder in the output dir.
- private void cleanup(JobContext context) throws IOException {
+
+ @Override
+ @Deprecated
+ public void cleanupJob(JobContext context) throws IOException {
if (outputPath != null) {
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
@@ -134,7 +135,7 @@
public void abortJob(JobContext context, JobStatus.State state)
throws IOException {
// delete the _temporary folder
- cleanup(context);
+ cleanupJob(context);
}
/**
diff --git a/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java b/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
index 16cb069..c3ced79 100644
--- a/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
+++ b/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
@@ -228,12 +228,19 @@
/** Verify that the given process id is same as its process group id.
* @param pidStr Process id of the to-be-verified-process
+ * @param procfsDir Procfs root dir
*/
- private static boolean assertPidPgrpidForMatch(String pidStr) {
+ static boolean checkPidPgrpidForMatch(String pidStr, String procfsDir) {
Integer pId = Integer.parseInt(pidStr);
// Get information for this process
ProcessInfo pInfo = new ProcessInfo(pId);
- pInfo = constructProcessInfo(pInfo);
+ pInfo = constructProcessInfo(pInfo, procfsDir);
+ if (pInfo == null) {
+      // The process group leader may have finished execution, but we still
+      // need to kill the subprocesses in the process group.
+ return true;
+ }
+
//make sure that pId and its pgrpId match
if (!pInfo.getPgrpId().equals(pId)) {
LOG.warn("Unexpected: Process with PID " + pId +
@@ -258,7 +265,7 @@
boolean inBackground)
throws IOException {
// Make sure that the pid given is a process group leader
- if (!assertPidPgrpidForMatch(pgrpId)) {
+ if (!checkPidPgrpidForMatch(pgrpId, PROCFS)) {
throw new IOException("Process with PID " + pgrpId +
" is not a process group leader.");
}
@@ -391,15 +398,6 @@
}
/**
- *
- * Construct the ProcessInfo using the process' PID and procfs and return the
- * same. Returns null on failing to read from procfs,
- */
- private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
- return constructProcessInfo(pinfo, PROCFS);
- }
-
- /**
* Construct the ProcessInfo using the process' PID and procfs rooted at the
* specified directory and return the same. It is provided mainly to assist
* testing purposes.
@@ -422,6 +420,8 @@
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// The process vanished in the interim!
+ LOG.warn("The process " + pinfo.getPid()
+ + " may have finished in the interim.");
return ret;
}
@@ -436,6 +436,11 @@
.parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
.parseLong(m.group(7)));
}
+ else {
+ LOG.warn("Unexpected: procfs stat file is not in the expected format"
+ + " for process with pid " + pinfo.getPid());
+ ret = null;
+ }
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
ret = null;
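The behavioral change in checkPidPgrpidForMatch() is that a vanished group leader (constructProcessInfo() returning null) now counts as a match, so the destroy path can still kill the remaining processes in the group. The decision reduces to something like this illustrative sketch (not the real method body):

```java
public class PgrpCheckSketch {
  // pgrpIdOrNull == null models constructProcessInfo() returning null,
  // i.e. /proc/<pid>/stat has already disappeared.
  static boolean leaderStillMatchesGroup(Integer pid, Integer pgrpIdOrNull) {
    if (pgrpIdOrNull == null) {
      return true; // leader exited; still clean up the rest of the group
    }
    return pgrpIdOrNull.equals(pid);
  }
}
```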
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
index 7f93601..61d4a5a 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
@@ -41,6 +41,8 @@
private static String TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp") + "/"
+ "test-job-cleanup").toString();
+ private static final String CUSTOM_CLEANUP_FILE_NAME =
+ "_custom_cleanup";
private static final String ABORT_KILLED_FILE_NAME =
"_custom_abort_killed";
private static final String ABORT_FAILED_FILE_NAME =
@@ -86,6 +88,21 @@
}
/**
+ * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * creating a _custom_cleanup file in the output folder
+ */
+ static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ System.err.println("---- HERE ----");
+ JobConf conf = context.getJobConf();
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ FileSystem fs = outputPath.getFileSystem(conf);
+ fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+ }
+ }
+
+ /**
* Committer with abort making a _failed/_killed in the output folder
*/
static class CommitterWithCustomAbort extends FileOutputCommitter {
@@ -263,4 +280,26 @@
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
ABORT_FAILED_FILE_NAME});
}
+
+ /**
+   * Test that successful, failed, and killed jobs with a custom committer
+   * run the deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * code, for API compatibility.
+ */
+ public void testCustomCleanup() throws IOException {
+ // check with a successful job
+ testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME,
+ CommitterWithCustomDeprecatedCleanup.class,
+ new String[] {});
+
+ // check with a failed job
+ testFailedJob(CUSTOM_CLEANUP_FILE_NAME,
+ CommitterWithCustomDeprecatedCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+
+ // check with a killed job
+ testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME,
+ CommitterWithCustomDeprecatedCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+ }
}
\ No newline at end of file
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
index 6a218f3..903ff7e 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
@@ -17,20 +17,128 @@
*/
package org.apache.hadoop.mapreduce.lib.input;
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.Map;
-import junit.framework.TestCase;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.junit.Before;
+import org.junit.Test;
/**
* @see TestDelegatingInputFormat
*/
-public class TestMultipleInputs extends TestCase {
+public class TestMultipleInputs extends HadoopTestCase {
+
+ public TestMultipleInputs() throws IOException {
+ super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+ }
+
+ private static final Path ROOT_DIR = new Path("testing/mo");
+ private static final Path IN1_DIR = new Path(ROOT_DIR, "input1");
+ private static final Path IN2_DIR = new Path(ROOT_DIR, "input2");
+ private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
+
+ private Path getDir(Path dir) {
+ // Hack for local FS that does not have the concept of a 'mounting point'
+ if (isLocalFS()) {
+ String localPathRoot = System.getProperty("test.build.data", "/tmp")
+ .replace(' ', '+');
+ dir = new Path(localPathRoot, dir);
+ }
+ return dir;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ Path rootDir = getDir(ROOT_DIR);
+ Path in1Dir = getDir(IN1_DIR);
+ Path in2Dir = getDir(IN2_DIR);
+
+ Configuration conf = createJobConf();
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(rootDir, true);
+ if (!fs.mkdirs(in1Dir)) {
+ throw new IOException("Mkdirs failed to create " + in1Dir.toString());
+ }
+ if (!fs.mkdirs(in2Dir)) {
+ throw new IOException("Mkdirs failed to create " + in2Dir.toString());
+ }
+ }
+
+ @Test
+ public void testDoMultipleInputs() throws IOException {
+ Path in1Dir = getDir(IN1_DIR);
+ Path in2Dir = getDir(IN2_DIR);
+
+ Path outDir = getDir(OUT_DIR);
+
+ Configuration conf = createJobConf();
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(outDir, true);
+
+ DataOutputStream file1 = fs.create(new Path(in1Dir, "part-0"));
+ file1.writeBytes("a\nb\nc\nd\ne");
+ file1.close();
+
+ // write tab delimited to second file because we're doing
+ // KeyValueInputFormat
+ DataOutputStream file2 = fs.create(new Path(in2Dir, "part-0"));
+ file2.writeBytes("a\tblah\nb\tblah\nc\tblah\nd\tblah\ne\tblah");
+ file2.close();
+
+ Job job = new Job(conf);
+ job.setJobName("mi");
+
+ MultipleInputs.addInputPath(job, in1Dir, TextInputFormat.class,
+ MapClass.class);
+ MultipleInputs.addInputPath(job, in2Dir, KeyValueTextInputFormat.class,
+ KeyValueMapClass.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setReducerClass(ReducerClass.class);
+ FileOutputFormat.setOutputPath(job, outDir);
+
+ boolean success = false;
+ try {
+ success = job.waitForCompletion(true);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (ClassNotFoundException instante) {
+ throw new RuntimeException(instante);
+ }
+ if (!success)
+ throw new RuntimeException("Job failed!");
+
+    // read the job output back with readLine() for easy verification
+ BufferedReader output = new BufferedReader(new InputStreamReader(fs
+ .open(new Path(outDir, "part-r-00000"))));
+ // reducer should have counted one key from each file
+ assertTrue(output.readLine().equals("a 2"));
+ assertTrue(output.readLine().equals("b 2"));
+ assertTrue(output.readLine().equals("c 2"));
+ assertTrue(output.readLine().equals("d 2"));
+ assertTrue(output.readLine().equals("e 2"));
+ }
+
@SuppressWarnings("unchecked")
public void testAddInputPathWithFormat() throws IOException {
final Job conf = new Job();
@@ -50,7 +158,7 @@
MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
MapClass.class);
MultipleInputs.addInputPath(conf, new Path("/bar"),
- KeyValueTextInputFormat.class, MapClass2.class);
+ KeyValueTextInputFormat.class, KeyValueMapClass.class);
final Map<Path, InputFormat> inputs = MultipleInputs
.getInputFormatMap(conf);
final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
@@ -60,12 +168,42 @@
assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
.getClass());
assertEquals(MapClass.class, maps.get(new Path("/foo")));
- assertEquals(MapClass2.class, maps.get(new Path("/bar")));
+ assertEquals(KeyValueMapClass.class, maps.get(new Path("/bar")));
}
- static class MapClass extends Mapper<String, String, String, String> {
+ static final Text blah = new Text("blah");
+
+ // these 3 classes do a reduce side join with 2 different mappers
+ static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
+ // receives "a", "b", "c" as values
+ @Override
+ public void map(LongWritable key, Text value, Context ctx)
+ throws IOException, InterruptedException {
+ ctx.write(value, blah);
+ }
}
- static class MapClass2 extends MapClass {
+ static class KeyValueMapClass extends Mapper<Text, Text, Text, Text> {
+ // receives "a", "b", "c" as keys
+ @Override
+ public void map(Text key, Text value, Context ctx) throws IOException,
+ InterruptedException {
+ ctx.write(key, blah);
+ }
}
+
+ static class ReducerClass extends Reducer<Text, Text, NullWritable, Text> {
+ // should receive 2 rows for each key
+ int count = 0;
+
+ @Override
+ public void reduce(Text key, Iterable<Text> values, Context ctx)
+ throws IOException, InterruptedException {
+ count = 0;
+ for (Text value : values)
+ count++;
+ ctx.write(NullWritable.get(), new Text(key.toString() + " " + count));
+ }
+ }
+
}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java b/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
new file mode 100644
index 0000000..9beaa77
--- /dev/null
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A JUnit test to test Map-Reduce job committer.
+ */
+public class TestJobOutputCommitter extends HadoopTestCase {
+
+ public TestJobOutputCommitter() throws IOException {
+ super(CLUSTER_MR, LOCAL_FS, 1, 1);
+ }
+
+ private static String TEST_ROOT_DIR = new File(System.getProperty(
+ "test.build.data", "/tmp")
+ + "/" + "test-job-cleanup").toString();
+ private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup";
+ private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed";
+ private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed";
+ private static Path inDir = new Path(TEST_ROOT_DIR, "test-input");
+ private static int outDirs = 0;
+ private FileSystem fs;
+ private Configuration conf = null;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ conf = createJobConf();
+ fs = getFileSystem();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ fs.delete(new Path(TEST_ROOT_DIR), true);
+ super.tearDown();
+ }
+
+ /**
+ * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * creating a _custom_cleanup file in the output folder
+ */
+ static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
+ public CommitterWithCustomDeprecatedCleanup(Path outputPath,
+ TaskAttemptContext context) throws IOException {
+ super(outputPath, context);
+ }
+
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ System.err.println("---- HERE ----");
+ Path outputPath = FileOutputFormat.getOutputPath(context);
+ FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+ fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+ }
+ }
+
+ /**
+ * Committer with abort making a _failed/_killed in the output folder
+ */
+ static class CommitterWithCustomAbort extends FileOutputCommitter {
+ public CommitterWithCustomAbort(Path outputPath, TaskAttemptContext context)
+ throws IOException {
+ super(outputPath, context);
+ }
+
+ @Override
+ public void abortJob(JobContext context, JobStatus.State state)
+ throws IOException {
+ Path outputPath = FileOutputFormat.getOutputPath(context);
+ FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+ String fileName =
+ (state.equals(JobStatus.State.FAILED)) ? ABORT_FAILED_FILE_NAME
+ : ABORT_KILLED_FILE_NAME;
+ fs.create(new Path(outputPath, fileName)).close();
+ }
+ }
+
+ private Path getNewOutputDir() {
+ return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
+ }
+
+ static class MyOutputFormatWithCustomAbort<K, V>
+ extends TextOutputFormat<K, V> {
+ private OutputCommitter committer = null;
+
+ public synchronized OutputCommitter getOutputCommitter(
+ TaskAttemptContext context) throws IOException {
+ if (committer == null) {
+ Path output = getOutputPath(context);
+ committer = new CommitterWithCustomAbort(output, context);
+ }
+ return committer;
+ }
+ }
+
+ static class MyOutputFormatWithCustomCleanup<K, V>
+ extends TextOutputFormat<K, V> {
+ private OutputCommitter committer = null;
+
+ public synchronized OutputCommitter getOutputCommitter(
+ TaskAttemptContext context) throws IOException {
+ if (committer == null) {
+ Path output = getOutputPath(context);
+ committer = new CommitterWithCustomDeprecatedCleanup(output, context);
+ }
+ return committer;
+ }
+ }
+
+ // run a job with 1 map and let it run to completion
+ private void testSuccessfulJob(String filename,
+ Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+ Path outDir = getNewOutputDir();
+ Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0);
+ job.setOutputFormatClass(output);
+
+ assertTrue("Job failed!", job.waitForCompletion(true));
+
+ Path testFile = new Path(outDir, filename);
+ assertTrue("Done file missing for job " + job.getID(), fs.exists(testFile));
+
+ // check if the files from the missing set exists
+ for (String ex : exclude) {
+ Path file = new Path(outDir, ex);
+ assertFalse("File " + file + " should not be present for successful job "
+ + job.getID(), fs.exists(file));
+ }
+ }
+
+ // run a job for which all the attempts simply fail.
+ private void testFailedJob(String fileName,
+ Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+ Path outDir = getNewOutputDir();
+ Job job = MapReduceTestUtil.createFailJob(conf, outDir, inDir);
+ job.setOutputFormatClass(output);
+
+ assertFalse("Job did not fail!", job.waitForCompletion(true));
+
+ if (fileName != null) {
+ Path testFile = new Path(outDir, fileName);
+ assertTrue("File " + testFile + " missing for failed job " + job.getID(),
+ fs.exists(testFile));
+ }
+
+ // check if the files from the missing set exists
+ for (String ex : exclude) {
+ Path file = new Path(outDir, ex);
+ assertFalse("File " + file + " should not be present for failed job "
+ + job.getID(), fs.exists(file));
+ }
+ }
+
+ // run a job which gets stuck in mapper and kill it.
+ private void testKilledJob(String fileName,
+ Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+ Path outDir = getNewOutputDir();
+ Job job = MapReduceTestUtil.createKillJob(conf, outDir, inDir);
+ job.setOutputFormatClass(output);
+
+ job.submit();
+
+ // wait for the setup to be completed
+ while (job.setupProgress() != 1.0f) {
+ UtilsForTests.waitFor(100);
+ }
+
+ job.killJob(); // kill the job
+
+ assertFalse("Job did not get kill", job.waitForCompletion(true));
+
+ if (fileName != null) {
+ Path testFile = new Path(outDir, fileName);
+ assertTrue("File " + testFile + " missing for job " + job.getID(), fs
+ .exists(testFile));
+ }
+
+ // check if the files from the missing set exists
+ for (String ex : exclude) {
+ Path file = new Path(outDir, ex);
+ assertFalse("File " + file + " should not be present for killed job "
+ + job.getID(), fs.exists(file));
+ }
+ }
+
+ /**
+ * Test default cleanup/abort behavior
+ *
+ * @throws Exception
+ */
+ public void testDefaultCleanupAndAbort() throws Exception {
+ // check with a successful job
+ testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ TextOutputFormat.class, new String[] {});
+
+ // check with a failed job
+ testFailedJob(null, TextOutputFormat.class,
+ new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+
+ // check default abort job kill
+ testKilledJob(null, TextOutputFormat.class,
+ new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+ }
+
+ /**
+ * Test if a failed job with custom committer runs the abort code.
+ *
+ * @throws Exception
+ */
+ public void testCustomAbort() throws Exception {
+ // check with a successful job
+ testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ MyOutputFormatWithCustomAbort.class,
+ new String[] {ABORT_FAILED_FILE_NAME,
+ ABORT_KILLED_FILE_NAME});
+
+ // check with a failed job
+ testFailedJob(ABORT_FAILED_FILE_NAME,
+ MyOutputFormatWithCustomAbort.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ ABORT_KILLED_FILE_NAME});
+
+ // check with a killed job
+ testKilledJob(ABORT_KILLED_FILE_NAME,
+ MyOutputFormatWithCustomAbort.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ ABORT_FAILED_FILE_NAME});
+ }
+
+ /**
+   * Test that successful, failed, and killed jobs with a custom committer
+   * run the deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * code, for API compatibility.
+ * @throws Exception
+ */
+ public void testCustomCleanup() throws Exception {
+ // check with a successful job
+ testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME,
+ MyOutputFormatWithCustomCleanup.class,
+ new String[] {});
+
+ // check with a failed job
+ testFailedJob(CUSTOM_CLEANUP_FILE_NAME,
+ MyOutputFormatWithCustomCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+
+ // check with a killed job
+ testKilledJob(CUSTOM_CLEANUP_FILE_NAME,
+ MyOutputFormatWithCustomCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+ }
+}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java b/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java
index b7c3a1e..cc78506 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java
@@ -423,6 +423,34 @@
}
/**
+   * Verifies that ProcfsBasedProcessTree.checkPidPgrpidForMatch() handles
+   * constructProcessInfo() returning null, by not writing a stat file for
+   * the mock process.
+ * @throws IOException if there was a problem setting up the
+ * fake procfs directories or files.
+ */
+ public void testDestroyProcessTree() throws IOException {
+ // test process
+ String pid = "100";
+ // create the fake procfs root directory.
+ File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+ try {
+ setupProcfsRootDir(procfsRootDir);
+
+ // crank up the process tree class.
+ ProcfsBasedProcessTree processTree = new ProcfsBasedProcessTree(
+ pid, true, 100L, procfsRootDir.getAbsolutePath());
+
+ // Let us not create stat file for pid 100.
+ assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(
+ pid, procfsRootDir.getAbsolutePath()));
+ } finally {
+ FileUtil.fullyDelete(procfsRootDir);
+ }
+ }
+
+ /**
* Test the correctness of process-tree dump.
*
* @throws IOException