catching up to trunk


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/branches/HDFS-641@834556 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index cee806e..0341670 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,9 @@
 
   IMPROVEMENTS
 
+    MAPREDUCE-707. Provide a jobconf property for explicitly assigning a job to 
+    a pool in the Fair Scheduler. (Alan Heirich via matei)
+
     MAPREDUCE-999. Improve Sqoop test speed and refactor tests.
     (Aaron Kimball via tomwhite)
 
@@ -66,6 +69,14 @@
     MAPREDUCE-1153. Fix tasktracker metrics when trackers are decommissioned.
     (sharad)
 
+    MAPREDUCE-1128. Fix MRUnit to prohibit iterating over values twice. (Aaron
+    Kimball via cdouglas)
+
+    MAPREDUCE-665. Move libhdfs to HDFS subproject. (Eli Collins via dhruba)
+
+    MAPREDUCE-1196. Fix FileOutputCommitter to use the deprecated cleanupJob
+    api correctly. (acmurthy)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES
@@ -850,3 +861,19 @@
 
     MAPREDUCE-1038. Weave Mumak aspects only if related files have changed.
     (Aaron Kimball via cdouglas)
+
+    MAPREDUCE-1037. Continue running contrib tests if Sqoop tests fail. (Aaron
+    Kimball via cdouglas)
+
+    MAPREDUCE-1163. Remove unused, hard-coded paths from libhdfs. (Allen
+    Wittenauer via cdouglas)
+
+    MAPREDUCE-962. Fix a NullPointerException while killing task process 
+    trees. (Ravi Gummadi via yhemanth)
+
+    MAPREDUCE-1177. Correct setup/cleanup inversion in
+    JobTracker::getTaskReports. (Vinod Kumar Vavilapalli via cdouglas)
+
+    MAPREDUCE-1178. Fix ClassCastException in MultipleInputs by adding 
+    a DelegatingRecordReader. (Amareshwari Sriramadasu and Jay Booth 
+    via sharad)
diff --git a/build.xml b/build.xml
index 97b62fb..2696faa 100644
--- a/build.xml
+++ b/build.xml
@@ -48,7 +48,6 @@
   <property name="c++.utils.src" value="${c++.src}/utils"/>
   <property name="c++.pipes.src" value="${c++.src}/pipes"/>
   <property name="c++.examples.pipes.src" value="${examples.dir}/pipes"/>
-  <property name="c++.libhdfs.src" value="${c++.src}/libhdfs"/>
   <property name="librecordio.src" value="${c++.src}/librecordio"/>
   <property name="tools.src" value="${basedir}/src/tools"/>
 
@@ -73,7 +72,6 @@
   <property name="build.c++" value="${build.dir}/c++-build/${build.platform}"/>
   <property name="build.c++.utils" value="${build.c++}/utils"/>
   <property name="build.c++.pipes" value="${build.c++}/pipes"/>
-  <property name="build.c++.libhdfs" value="${build.c++}/libhdfs"/>
   <property name="build.c++.examples.pipes" 
             value="${build.c++}/examples/pipes"/>
   <property name="build.docs" value="${build.dir}/docs"/>
@@ -118,9 +116,6 @@
   <property name="test.mapred.commit.tests.file" value="${test.src.dir}/commit-tests" />
   <property name="test.mapred.all.tests.file" value="${test.src.dir}/all-tests" />
 
-  <property name="test.libhdfs.conf.dir" value="${c++.libhdfs.src}/tests/conf"/>
-  <property name="test.libhdfs.dir" value="${test.build.dir}/libhdfs"/>
-
   <property name="librecordio.test.dir" value="${test.build.dir}/librecordio"/>
   <property name="web.src.dir" value="${basedir}/src/web"/>
   <property name="src.webapps" value="${basedir}/src/webapps"/>
@@ -389,7 +384,7 @@
 
   <target name="compile-core" depends="clover, compile-mapred-classes, compile-c++" description="Compile core only"/> 
 
-  <target name="compile-contrib" depends="compile-core,tools,compile-c++-libhdfs">
+  <target name="compile-contrib" depends="compile-core,tools">
      <subant target="compile">
         <property name="version" value="${version}"/>
         <property name="hadoop-core.version" value="${hadoop-core.version}"/>
@@ -648,7 +643,7 @@
     <fail if="testsfailed">Tests failed!</fail>
   </target>
 
-  <target name="test" depends="test-c++-libhdfs, jar-test, test-core" description="Run all unit tests">
+  <target name="test" depends="jar-test, test-core" description="Run all unit tests">
     <subant target="test-contrib">
       <fileset file="${basedir}/build.xml"/>
      </subant>
@@ -1148,26 +1143,6 @@
      </subant>  	
   </target>
 	
- <target name="test-c++-libhdfs" depends="compile-c++-libhdfs, compile-core" if="islibhdfs">
-    <delete dir="${test.libhdfs.dir}"/>
-    <mkdir dir="${test.libhdfs.dir}"/>
-    <mkdir dir="${test.libhdfs.dir}/logs"/>
-    <mkdir dir="${test.libhdfs.dir}/hdfs/name"/>
-
-    <exec dir="${build.c++.libhdfs}" executable="${make.cmd}" failonerror="true">
-        <env key="OS_NAME" value="${os.name}"/>
-        <env key="OS_ARCH" value="${os.arch}"/>
-        <env key="JVM_ARCH" value="${jvm.arch}"/>
-        <env key="LIBHDFS_BUILD_DIR" value="${build.c++.libhdfs}"/>
-        <env key="HADOOP_HOME" value="${basedir}"/>
-        <env key="HADOOP_CONF_DIR" value="${test.libhdfs.conf.dir}"/>
-        <env key="HADOOP_LOG_DIR" value="${test.libhdfs.dir}/logs"/>
-        <env key="LIBHDFS_SRC_DIR" value="${c++.libhdfs.src}"/>
-        <env key="LIBHDFS_INSTALL_DIR" value="${install.c++}/lib"/>  
-        <env key="LIB_DIR" value="${common.ivy.lib.dir}"/>
-		<arg value="test"/>
-    </exec>
-  </target>
 
 <!-- ================================================================== -->
 <!-- librecordio targets.                                               -->
@@ -1220,16 +1195,8 @@
           searchpath="yes" failonerror="yes">
        <arg value="-if"/>
     </exec>
-    <antcall target="create-c++-configure-libhdfs"/>
-  </target>
+   </target>
    
-  <target name="create-c++-configure-libhdfs" depends="check-c++-libhdfs" if="islibhdfs">
-    <exec executable="autoreconf" dir="${c++.libhdfs.src}" 
-          searchpath="yes" failonerror="yes">
-       <arg value="-if"/>
-    </exec>
-  </target>
-
   <target name="check-c++-makefiles" depends="init" if="compile.c++">
     <condition property="need.c++.utils.makefile">
        <not> <available file="${build.c++.utils}/Makefile"/> </not>
@@ -1242,33 +1209,6 @@
     </condition>
   </target>
 
-  <target name="check-c++-libhdfs">
-    <condition property="islibhdfs">
-      <and>
-        <isset property="compile.c++"/>
-        <isset property="libhdfs"/>
-      </and>
-    </condition>
-  </target>
-
-  <target name="check-c++-makefile-libhdfs" depends="init,check-c++-libhdfs" if="islibhdfs">
-    <condition property="need.c++.libhdfs.makefile">
-       <not> <available file="${build.c++.libhdfs}/Makefile"/> </not>
-    </condition>
-  </target>
-
-  <target name="create-c++-libhdfs-makefile" depends="check-c++-makefile-libhdfs" 
-                                           if="need.c++.libhdfs.makefile">
-    <mkdir dir="${build.c++.libhdfs}"/>
-    <chmod file="${c++.libhdfs.src}/configure" perm="ugo+x"/>
-    <exec executable="${c++.libhdfs.src}/configure" dir="${build.c++.libhdfs}"
-          failonerror="yes">
-      <env key="ac_cv_func_malloc_0_nonnull" value="yes"/>
-      <env key="JVM_ARCH" value="${jvm.arch}"/>
-      <arg value="--prefix=${install.c++}"/>
-    </exec>
-  </target>
-
   <target name="create-c++-utils-makefile" depends="check-c++-makefiles" 
                                            if="need.c++.utils.makefile">
     <mkdir dir="${build.c++.utils}"/>
@@ -1335,15 +1275,6 @@
   <target name="compile-c++-examples" 
           depends="compile-c++-examples-pipes"/>
 
-  <target name="compile-c++-libhdfs" depends="create-c++-libhdfs-makefile" if="islibhdfs">
-    <exec executable="${make.cmd}" dir="${build.c++.libhdfs}" searchpath="yes"
-          failonerror="yes">
-      <env key="ac_cv_func_malloc_0_nonnull" value="yes"/>
-      <env key="JVM_ARCH" value="${jvm.arch}"/>
-      <arg value="install"/>
-    </exec>
-  </target>
-
  <target name="clover" depends="clover.setup, clover.info" description="Instrument the Unit tests using Clover.  To use, specify -Dclover.home=&lt;base of clover installation&gt; -Drun.clover=true on the command line."/>
 
 <target name="clover.setup" if="clover.enabled">
diff --git a/src/c++/libhdfs/hdfsJniHelper.h b/src/c++/libhdfs/hdfsJniHelper.h
index 017e801..442eedf 100644
--- a/src/c++/libhdfs/hdfsJniHelper.h
+++ b/src/c++/libhdfs/hdfsJniHelper.h
@@ -30,8 +30,6 @@
 
 #define PATH_SEPARATOR ':'
 
-#define USER_CLASSPATH "/home/y/libexec/hadoop/conf:/home/y/libexec/hadoop/lib/hadoop-0.1.0.jar"
-
 
 /** Denote the method we want to invoke as STATIC or INSTANCE */
 typedef enum {
diff --git a/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java b/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
index 06561c5..f3fde46 100644
--- a/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
+++ b/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
@@ -61,6 +61,8 @@
    */
   public static final long ALLOC_RELOAD_WAIT = 5 * 1000; 
 
+  public static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
+
   private final FairScheduler scheduler;
   
   // Map and reduce minimum allocations for each pool
@@ -391,7 +393,7 @@
    */
   public synchronized void setPool(JobInProgress job, String pool) {
     removeJob(job);
-    job.getJobConf().set(poolNameProperty, pool);
+    job.getJobConf().set(EXPLICIT_POOL_PROPERTY, pool);
     addJob(job);
   }
 
@@ -403,13 +405,16 @@
   }
   
   /**
-   * Get the pool name for a JobInProgress from its configuration. This uses
-   * the "project" property in the jobconf by default, or the property set with
-   * "mapred.fairscheduler.poolnameproperty".
+   * Get the pool name for a JobInProgress from its configuration.  This uses
+   * the value of mapred.fairscheduler.pool if specified, otherwise the value 
+   * of the property named in mapred.fairscheduler.poolnameproperty if that is
+   * specified.  Otherwise if neither is specified it uses the "user.name" property 
+   * in the jobconf by default.
    */
   public String getPoolName(JobInProgress job) {
     Configuration conf = job.getJobConf();
-    return conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME).trim();
+    return conf.get(EXPLICIT_POOL_PROPERTY,
+      conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME)).trim();
   }
 
   /**
diff --git a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
index 052e46c..ef446b6 100644
--- a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
+++ b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
@@ -52,6 +52,7 @@
       "test-pools").getAbsolutePath();
   
   private static final String POOL_PROPERTY = "pool";
+  private static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
   
   private static int jobCounter;
   
@@ -2471,6 +2472,87 @@
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
   }
+
+  /**
+   * This test uses the mapred.fairscheduler.pool property to assign jobs to pools.
+   */
+  public void testPoolAssignment() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"default\">");
+    out.println("<schedulingMode>fair</schedulingMode>");
+    out.println("</pool>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<schedulingMode>fair</schedulingMode>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    Pool defaultPool = scheduler.getPoolManager().getPool("default");
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
+ 
+    // Submit a job to the default pool.  All specifications take default values.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 3);
+
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(0,    poolA.getMapSchedulable().getDemand());
+    assertEquals(0,    poolA.getReduceSchedulable().getDemand());
+
+    // Submit a job to the default pool and move it to poolA using setPool.
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 5, 7);
+
+    assertEquals(6,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(10,   defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(0,    poolA.getMapSchedulable().getDemand());
+    assertEquals(0,    poolA.getReduceSchedulable().getDemand());
+
+    scheduler.getPoolManager().setPool(job2, "poolA");
+    assertEquals("poolA", scheduler.getPoolManager().getPoolName(job2));
+
+    defaultPool.getMapSchedulable().updateDemand();
+    defaultPool.getReduceSchedulable().updateDemand();
+    poolA.getMapSchedulable().updateDemand();
+    poolA.getReduceSchedulable().updateDemand();
+
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(5,    poolA.getMapSchedulable().getDemand());
+    assertEquals(7,    poolA.getReduceSchedulable().getDemand());
+
+    // Submit a job to poolA by specifying mapred.fairscheduler.pool
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setNumMapTasks(11);
+    jobConf.setNumReduceTasks(13);
+    jobConf.set(POOL_PROPERTY, "nonsense"); // test that this is overridden
+    jobConf.set(EXPLICIT_POOL_PROPERTY, "poolA");
+    JobInProgress job3 = new FakeJobInProgress(jobConf, taskTrackerManager,
+        null, UtilsForTests.getJobTracker());
+    job3.getStatus().setRunState(JobStatus.RUNNING);
+    taskTrackerManager.submitJob(job3);
+
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(16,   poolA.getMapSchedulable().getDemand());
+    assertEquals(20,   poolA.getReduceSchedulable().getDemand());
+
+    // Submit a job to poolA by specifying pool and not mapred.fairscheduler.pool
+    JobConf jobConf2 = new JobConf(conf);
+    jobConf2.setNumMapTasks(17);
+    jobConf2.setNumReduceTasks(19);
+    jobConf2.set(POOL_PROPERTY, "poolA");
+    JobInProgress job4 = new FakeJobInProgress(jobConf2, taskTrackerManager,
+        null, UtilsForTests.getJobTracker());
+    job4.getStatus().setRunState(JobStatus.RUNNING);
+    taskTrackerManager.submitJob(job4);
+
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(33,   poolA.getMapSchedulable().getDemand());
+    assertEquals(39,   poolA.getReduceSchedulable().getDemand());
+  }
   
   private void advanceTime(long time) {
     clock.advance(time);
diff --git a/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java b/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
index 606670f..247806a 100644
--- a/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
+++ b/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
@@ -63,20 +63,41 @@
   private class InspectableIterable implements Iterable<VALUEIN> {
     private Iterable<VALUEIN> base;
     private VALUEIN lastVal;
+    private boolean used; // if true, don't re-iterate.
 
     public InspectableIterable(final Iterable<VALUEIN> baseCollection) {
       this.base = baseCollection;
     }
 
     public Iterator<VALUEIN> iterator() {
-      return new InspectableIterator(this.base.iterator());
+      if (used) {
+        return new NullIterator();
+      } else {
+        used = true;
+        return new InspectableIterator(this.base.iterator());
+      }
     }
 
     public VALUEIN getLastVal() {
       return lastVal;
     }
 
-    private class InspectableIterator 
+    private class NullIterator
+        extends ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.ValueIterator
+        implements Iterator<VALUEIN> {
+      public VALUEIN next() {
+        return null;
+      }
+
+      public boolean hasNext() {
+        return false;
+      }
+
+      public void remove() {
+      }
+    }
+
+    private class InspectableIterator
         extends ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.ValueIterator
         implements Iterator<VALUEIN> {
       private Iterator<VALUEIN> iter;
diff --git a/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java b/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java
index 381f136..3174e41 100644
--- a/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java
+++ b/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java
@@ -22,13 +22,17 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.junit.Before;
@@ -222,5 +226,45 @@
       // expected.
     }
   }
+
+  /**
+   * Reducer that counts its values twice; the second iteration
+   * according to mapreduce semantics should be empty.
+   */
+  private static class DoubleIterReducer<K, V>
+      extends MapReduceBase implements Reducer<K, V, K, LongWritable> {
+    public void reduce(K key, Iterator<V> values,
+        OutputCollector<K, LongWritable> out, Reporter r) throws IOException {
+      long count = 0;
+
+      while (values.hasNext()) {
+        count++;
+        values.next();
+      }
+
+      // This time around, iteration should yield no values.
+      while (values.hasNext()) {
+        count++;
+        values.next();
+      }
+      out.collect(key, new LongWritable(count));
+    }
+  }
+
+  @Test
+  public void testDoubleIteration() {
+    reducer = new DoubleIterReducer<Text, LongWritable>();
+    driver = new ReduceDriver<Text, LongWritable, Text, LongWritable>(
+        reducer);
+
+    driver
+        .withInputKey(new Text("foo"))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withOutput(new Text("foo"), new LongWritable(4))
+        .runTest();
+  }
 }
 
diff --git a/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java b/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
index dc51e83..e694264 100644
--- a/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
+++ b/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
@@ -223,5 +223,43 @@
       // expected.
     }
   }
+
+  /**
+   * Reducer that counts its values twice; the second iteration
+   * according to mapreduce semantics should be empty.
+   */
+  private static class DoubleIterReducer<K, V>
+      extends Reducer<K, V, K, LongWritable> {
+    public void reduce(K key, Iterable<V> values, Context c)
+        throws IOException, InterruptedException {
+      long count = 0;
+
+      for (V val : values) {
+        count++;
+      }
+
+      // This time around, iteration should yield no values.
+      for (V val : values) {
+        count++;
+      }
+      c.write(key, new LongWritable(count));
+    }
+  }
+
+  @Test
+  public void testDoubleIteration() {
+    reducer = new DoubleIterReducer<Text, LongWritable>();
+    driver = new ReduceDriver<Text, LongWritable, Text, LongWritable>(
+        reducer);
+
+    driver
+        .withInputKey(new Text("foo"))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withOutput(new Text("foo"), new LongWritable(4))
+        .runTest();
+  }
 }
 
diff --git a/src/contrib/sqoop/build.xml b/src/contrib/sqoop/build.xml
index bdf45a0..b5717dd 100644
--- a/src/contrib/sqoop/build.xml
+++ b/src/contrib/sqoop/build.xml
@@ -149,7 +149,7 @@
         <fileset dir="${src.test}" includes="**/${testcase}.java"/>
       </batchtest>
     </junit>
-    <fail if="tests.failed">Tests failed!</fail>
+    <antcall target="checkfailure"/>
   </target>
 
   <target name="doc">
diff --git a/src/docs/src/documentation/content/xdocs/fair_scheduler.xml b/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
index 7124f41..b5abc4d 100644
--- a/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
+++ b/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
@@ -163,6 +163,15 @@
           </tr>
           <tr>
           <td>
+            mapred.fairscheduler.pool
+          </td>
+          <td>
+            Specify the pool that a job belongs in.  
+            If this is specified then mapred.fairscheduler.poolnameproperty is ignored.
+          </td>
+          </tr>
+          <tr>
+          <td>
             mapred.fairscheduler.poolnameproperty
           </td>
           <td>
@@ -171,17 +180,8 @@
             (i.e. one pool for each user). 
             Another useful value is <em>group.name</em> to create a
             pool per Unix group.
-            Finally, a common setting is to use a non-standard property
-            such as <em>pool.name</em> as the pool name property, and make it
-            default to <em>mapreduce.job.mapreduce.job.user.name</em> through the following setting:<br/>
-            <code>&lt;property&gt;</code><br/> 
-            <code>&nbsp;&nbsp;&lt;name&gt;pool.name&lt;/name&gt;</code><br/>
-            <code>&nbsp;&nbsp;&lt;value&gt;${mapreduce.job.mapreduce.job.user.name}&lt;/value&gt;</code><br/>
-            <code>&lt;/property&gt;</code><br/>
-            This allows you to specify the pool name explicitly for some jobs
-            through the jobconf (e.g. passing <em>-Dpool.name=&lt;name&gt;</em>
-            to <em>bin/hadoop jar</em>, while having the default be the user's
-            pool.
+            mapred.fairscheduler. poolnameproperty is used only for jobs in which 
+            mapred.fairscheduler.pool is not explicitly set.
           </td>
           </tr>
           <tr>
diff --git a/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java b/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
index 078046e..667b97d 100644
--- a/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
+++ b/src/java/org/apache/hadoop/mapred/FileOutputCommitter.java
@@ -64,7 +64,7 @@
   
   public void commitJob(JobContext context) throws IOException {
     // delete the _temporary folder in the output folder
-    cleanup(context);
+    cleanupJob(context);
     // check if the output-dir marking is required
     if (shouldMarkOutputDir(context.getJobConf())) {
       // create a _success file in the output folder
@@ -85,9 +85,10 @@
       fileSys.create(filePath).close();
     }
   }
-  
-  // Deletes the _temporary folder in the job's output dir.
-  private void cleanup(JobContext context) throws IOException {
+
+  @Override
+  @Deprecated
+  public void cleanupJob(JobContext context) throws IOException {
     JobConf conf = context.getJobConf();
     // do the clean up of temporary directory
     Path outputPath = FileOutputFormat.getOutputPath(conf);
@@ -107,7 +108,7 @@
   public void abortJob(JobContext context, int runState) 
   throws IOException {
     // simply delete the _temporary dir from the o/p folder of the job
-    cleanup(context);
+    cleanupJob(context);
   }
   
   public void setupTask(TaskAttemptContext context) throws IOException {
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index 4536089..8915690 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -3414,9 +3414,9 @@
         return getMapTaskReports(JobID.downgrade(jobid));
       case REDUCE :
         return getReduceTaskReports(JobID.downgrade(jobid));
-      case JOB_SETUP:
+      case JOB_CLEANUP:
         return getCleanupTaskReports(JobID.downgrade(jobid));
-      case JOB_CLEANUP :
+      case JOB_SETUP :
         return getSetupTaskReports(JobID.downgrade(jobid));
     }
     return new TaskReport[0];
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java b/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
index 15348ff..0d9b8f9 100644
--- a/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
+++ b/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
@@ -119,17 +119,9 @@
     return splits;
   }
 
-  @SuppressWarnings("unchecked")
+  @Override
   public RecordReader<K, V> createRecordReader(InputSplit split,
       TaskAttemptContext context) throws IOException, InterruptedException {
-
-    // Find the InputFormat and then the RecordReader from the
-    // TaggedInputSplit.
-    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
-    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
-      .newInstance(taggedInputSplit.getInputFormatClass(),
-         context.getConfiguration());
-    return inputFormat.createRecordReader(taggedInputSplit.getInputSplit(),
-      context);
+    return new DelegatingRecordReader<K, V>(split, context);
   }
 }
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java b/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java
new file mode 100644
index 0000000..f0d060e
--- /dev/null
+++ b/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This is a delegating RecordReader, which delegates the functionality to the
+ * underlying record reader in {@link TaggedInputSplit}  
+ */
+public class DelegatingRecordReader<K, V> extends RecordReader<K, V> {
+  RecordReader<K, V> originalRR;
+
+  /**
+   * Constructs the DelegatingRecordReader.
+   * 
+   * @param split TaggegInputSplit object
+   * @param context TaskAttemptContext object
+   *  
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @SuppressWarnings("unchecked")
+  public DelegatingRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    // Find the InputFormat and then the RecordReader from the
+    // TaggedInputSplit.
+    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+        .newInstance(taggedInputSplit.getInputFormatClass(), context
+            .getConfiguration());
+    originalRR = inputFormat.createRecordReader(taggedInputSplit
+        .getInputSplit(), context);
+  }
+
+  @Override
+  public void close() throws IOException {
+    originalRR.close();
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return originalRR.getCurrentKey();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return originalRR.getCurrentValue();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return originalRR.getProgress();
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    originalRR.initialize(((TaggedInputSplit) split).getInputSplit(), context);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return originalRR.nextKeyValue();
+  }
+
+}
diff --git a/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 5409bba..0e61f4d 100644
--- a/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -107,14 +107,15 @@
    */
   public void commitJob(JobContext context) throws IOException {
     // delete the _temporary folder and create a _done file in the o/p folder
-    cleanup(context);
+    cleanupJob(context);
     if (shouldMarkOutputDir(context.getConfiguration())) {
       markOutputDirSuccessful(context);
     }
   }
-  
-  // Delete the _temporary folder in the output dir.
-  private void cleanup(JobContext context) throws IOException {
+
+  @Override
+  @Deprecated
+  public void cleanupJob(JobContext context) throws IOException {
     if (outputPath != null) {
       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
@@ -134,7 +135,7 @@
   public void abortJob(JobContext context, JobStatus.State state) 
   throws IOException {
     // delete the _temporary folder
-    cleanup(context);
+    cleanupJob(context);
   }
   
   /**
diff --git a/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java b/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
index 16cb069..c3ced79 100644
--- a/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
+++ b/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
@@ -228,12 +228,19 @@
 
   /** Verify that the given process id is same as its process group id.
    * @param pidStr Process id of the to-be-verified-process
+   * @param procfsDir  Procfs root dir
    */
-  private static boolean assertPidPgrpidForMatch(String pidStr) {
+  static boolean checkPidPgrpidForMatch(String pidStr, String procfsDir) {
     Integer pId = Integer.parseInt(pidStr);
     // Get information for this process
     ProcessInfo pInfo = new ProcessInfo(pId);
-    pInfo = constructProcessInfo(pInfo);
+    pInfo = constructProcessInfo(pInfo, procfsDir);
+    if (pInfo == null) {
+      // process group leader may have finished execution, but we still need to
+      // kill the subProcesses in the process group.
+      return true;
+    }
+
     //make sure that pId and its pgrpId match
     if (!pInfo.getPgrpId().equals(pId)) {
       LOG.warn("Unexpected: Process with PID " + pId +
@@ -258,7 +265,7 @@
                        boolean inBackground)
          throws IOException {
     // Make sure that the pid given is a process group leader
-    if (!assertPidPgrpidForMatch(pgrpId)) {
+    if (!checkPidPgrpidForMatch(pgrpId, PROCFS)) {
       throw new IOException("Process with PID " + pgrpId  +
                           " is not a process group leader.");
     }
@@ -391,15 +398,6 @@
   }
 
   /**
-   * 
-   * Construct the ProcessInfo using the process' PID and procfs and return the
-   * same. Returns null on failing to read from procfs,
-   */
-  private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
-    return constructProcessInfo(pinfo, PROCFS);
-  }
-
-  /**
    * Construct the ProcessInfo using the process' PID and procfs rooted at the
    * specified directory and return the same. It is provided mainly to assist
    * testing purposes.
@@ -422,6 +420,8 @@
       in = new BufferedReader(fReader);
     } catch (FileNotFoundException f) {
       // The process vanished in the interim!
+      LOG.warn("The process " + pinfo.getPid()
+          + " may have finished in the interim.");
       return ret;
     }
 
@@ -436,6 +436,11 @@
             .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
             .parseLong(m.group(7)));
       }
+      else {
+        LOG.warn("Unexpected: procfs stat file is not in the expected format"
+            + " for process with pid " + pinfo.getPid());
+        ret = null;
+      }
     } catch (IOException io) {
       LOG.warn("Error reading the stream " + io);
       ret = null;
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java b/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
index 7f93601..61d4a5a 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
@@ -41,6 +41,8 @@
   private static String TEST_ROOT_DIR =
       new File(System.getProperty("test.build.data", "/tmp") + "/" 
                + "test-job-cleanup").toString();
+  private static final String CUSTOM_CLEANUP_FILE_NAME = 
+    "_custom_cleanup";
   private static final String ABORT_KILLED_FILE_NAME = 
     "_custom_abort_killed";
   private static final String ABORT_FAILED_FILE_NAME = 
@@ -86,6 +88,21 @@
   }
   
   /** 
+   * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * making a _failed/_killed in the output folder
+   */
+  static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      System.err.println("---- HERE ----");
+      JobConf conf = context.getJobConf();
+      Path outputPath = FileOutputFormat.getOutputPath(conf);
+      FileSystem fs = outputPath.getFileSystem(conf);
+      fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+    }
+  }
+  
+  /** 
    * Committer with abort making a _failed/_killed in the output folder
    */
   static class CommitterWithCustomAbort extends FileOutputCommitter {
@@ -263,4 +280,26 @@
                   new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
                                 ABORT_FAILED_FILE_NAME});
   }
+
+  /**
+   * Test if a failed job with custom committer runs the deprecated
+   * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api 
+   * compatibility testing.
+   */
+  public void testCustomCleanup() throws IOException {
+    // check with a successful job
+    testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, 
+                      CommitterWithCustomDeprecatedCleanup.class,
+                      new String[] {});
+    
+    // check with a failed job
+    testFailedJob(CUSTOM_CLEANUP_FILE_NAME, 
+                  CommitterWithCustomDeprecatedCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+    
+    // check with a killed job
+    testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME, 
+                  CommitterWithCustomDeprecatedCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+  }
 }
\ No newline at end of file
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
index 6a218f3..903ff7e 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
@@ -17,20 +17,128 @@
  */
 package org.apache.hadoop.mapreduce.lib.input;
 
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Map;
 
-import junit.framework.TestCase;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * @see TestDelegatingInputFormat
  */
-public class TestMultipleInputs extends TestCase {
+public class TestMultipleInputs extends HadoopTestCase {
+
+  public TestMultipleInputs() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  private static final Path ROOT_DIR = new Path("testing/mo");
+  private static final Path IN1_DIR = new Path(ROOT_DIR, "input1");
+  private static final Path IN2_DIR = new Path(ROOT_DIR, "input2");
+  private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
+
+  private Path getDir(Path dir) {
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+          .replace(' ', '+');
+      dir = new Path(localPathRoot, dir);
+    }
+    return dir;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Path rootDir = getDir(ROOT_DIR);
+    Path in1Dir = getDir(IN1_DIR);
+    Path in2Dir = getDir(IN2_DIR);
+
+    Configuration conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(rootDir, true);
+    if (!fs.mkdirs(in1Dir)) {
+      throw new IOException("Mkdirs failed to create " + in1Dir.toString());
+    }
+    if (!fs.mkdirs(in2Dir)) {
+      throw new IOException("Mkdirs failed to create " + in2Dir.toString());
+    }
+  }
+
+  @Test
+  public void testDoMultipleInputs() throws IOException {
+    Path in1Dir = getDir(IN1_DIR);
+    Path in2Dir = getDir(IN2_DIR);
+
+    Path outDir = getDir(OUT_DIR);
+
+    Configuration conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(outDir, true);
+
+    DataOutputStream file1 = fs.create(new Path(in1Dir, "part-0"));
+    file1.writeBytes("a\nb\nc\nd\ne");
+    file1.close();
+
+    // write tab delimited to second file because we're doing
+    // KeyValueInputFormat
+    DataOutputStream file2 = fs.create(new Path(in2Dir, "part-0"));
+    file2.writeBytes("a\tblah\nb\tblah\nc\tblah\nd\tblah\ne\tblah");
+    file2.close();
+
+    Job job = new Job(conf);
+    job.setJobName("mi");
+
+    MultipleInputs.addInputPath(job, in1Dir, TextInputFormat.class,
+        MapClass.class);
+    MultipleInputs.addInputPath(job, in2Dir, KeyValueTextInputFormat.class,
+        KeyValueMapClass.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(Text.class);
+    job.setReducerClass(ReducerClass.class);
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    boolean success = false;
+    try {
+      success = job.waitForCompletion(true);
+    } catch (InterruptedException ie) {
+      throw new RuntimeException(ie);
+    } catch (ClassNotFoundException instante) {
+      throw new RuntimeException(instante);
+    }
+    if (!success)
+      throw new RuntimeException("Job failed!");
+
+    // copy bytes a bunch of times for the ease of readLine() - whatever
+    BufferedReader output = new BufferedReader(new InputStreamReader(fs
+        .open(new Path(outDir, "part-r-00000"))));
+    // reducer should have counted one key from each file
+    assertTrue(output.readLine().equals("a 2"));
+    assertTrue(output.readLine().equals("b 2"));
+    assertTrue(output.readLine().equals("c 2"));
+    assertTrue(output.readLine().equals("d 2"));
+    assertTrue(output.readLine().equals("e 2"));
+  }
+
   @SuppressWarnings("unchecked")
   public void testAddInputPathWithFormat() throws IOException {
     final Job conf = new Job();
@@ -50,7 +158,7 @@
     MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
        MapClass.class);
     MultipleInputs.addInputPath(conf, new Path("/bar"),
-       KeyValueTextInputFormat.class, MapClass2.class);
+        KeyValueTextInputFormat.class, KeyValueMapClass.class);
     final Map<Path, InputFormat> inputs = MultipleInputs
        .getInputFormatMap(conf);
     final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
@@ -60,12 +168,42 @@
     assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
        .getClass());
     assertEquals(MapClass.class, maps.get(new Path("/foo")));
-    assertEquals(MapClass2.class, maps.get(new Path("/bar")));
+    assertEquals(KeyValueMapClass.class, maps.get(new Path("/bar")));
   }
 
-  static class MapClass extends Mapper<String, String, String, String> {
+  static final Text blah = new Text("blah");
+
+  // these 3 classes do a reduce side join with 2 different mappers
+  static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
+    // receives "a", "b", "c" as values
+    @Override
+    public void map(LongWritable key, Text value, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(value, blah);
+    }
   }
 
-  static class MapClass2 extends MapClass {
+  static class KeyValueMapClass extends Mapper<Text, Text, Text, Text> {
+    // receives "a", "b", "c" as keys
+    @Override
+    public void map(Text key, Text value, Context ctx) throws IOException,
+        InterruptedException {
+      ctx.write(key, blah);
+    }
   }
+
+  static class ReducerClass extends Reducer<Text, Text, NullWritable, Text> {
+    // should receive 2 rows for each key
+    int count = 0;
+
+    @Override
+    public void reduce(Text key, Iterable<Text> values, Context ctx)
+        throws IOException, InterruptedException {
+      count = 0;
+      for (Text value : values)
+        count++;
+      ctx.write(NullWritable.get(), new Text(key.toString() + " " + count));
+    }
+  }
+
 }
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java b/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
new file mode 100644
index 0000000..9beaa77
--- /dev/null
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A JUnit test to test Map-Reduce job committer.
+ */
+public class TestJobOutputCommitter extends HadoopTestCase {
+
+  public TestJobOutputCommitter() throws IOException {
+    super(CLUSTER_MR, LOCAL_FS, 1, 1);
+  }
+
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")
+      + "/" + "test-job-cleanup").toString();
+  private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup";
+  private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed";
+  private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed";
+  private static Path inDir = new Path(TEST_ROOT_DIR, "test-input");
+  private static int outDirs = 0;
+  private FileSystem fs;
+  private Configuration conf = null;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf = createJobConf();
+    fs = getFileSystem();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    fs.delete(new Path(TEST_ROOT_DIR), true);
+    super.tearDown();
+  }
+
+  /** 
+   * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
+   * making a _failed/_killed in the output folder
+   */
+  static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
+    public CommitterWithCustomDeprecatedCleanup(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      System.err.println("---- HERE ----");
+      Path outputPath = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+      fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+    }
+  }
+  
+  /**
+   * Committer with abort making a _failed/_killed in the output folder
+   */
+  static class CommitterWithCustomAbort extends FileOutputCommitter {
+    public CommitterWithCustomAbort(Path outputPath, TaskAttemptContext context)
+        throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    public void abortJob(JobContext context, JobStatus.State state)
+        throws IOException {
+      Path outputPath = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+      String fileName = 
+        (state.equals(JobStatus.State.FAILED)) ? ABORT_FAILED_FILE_NAME
+          : ABORT_KILLED_FILE_NAME;
+      fs.create(new Path(outputPath, fileName)).close();
+    }
+  }
+
+  private Path getNewOutputDir() {
+    return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
+  }
+
+  static class MyOutputFormatWithCustomAbort<K, V> 
+  extends TextOutputFormat<K, V> {
+    private OutputCommitter committer = null;
+
+    public synchronized OutputCommitter getOutputCommitter(
+        TaskAttemptContext context) throws IOException {
+      if (committer == null) {
+        Path output = getOutputPath(context);
+        committer = new CommitterWithCustomAbort(output, context);
+      }
+      return committer;
+    }
+  }
+
+  static class MyOutputFormatWithCustomCleanup<K, V> 
+  extends TextOutputFormat<K, V> {
+    private OutputCommitter committer = null;
+
+    public synchronized OutputCommitter getOutputCommitter(
+        TaskAttemptContext context) throws IOException {
+      if (committer == null) {
+        Path output = getOutputPath(context);
+        committer = new CommitterWithCustomDeprecatedCleanup(output, context);
+      }
+      return committer;
+    }
+  }
+
+  // run a job with 1 map and let it run to completion
+  private void testSuccessfulJob(String filename,
+      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+    Path outDir = getNewOutputDir();
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0);
+    job.setOutputFormatClass(output);
+
+    assertTrue("Job failed!", job.waitForCompletion(true));
+
+    Path testFile = new Path(outDir, filename);
+    assertTrue("Done file missing for job " + job.getID(), fs.exists(testFile));
+
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for successful job "
+          + job.getID(), fs.exists(file));
+    }
+  }
+
+  // run a job for which all the attempts simply fail.
+  private void testFailedJob(String fileName,
+      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+    Path outDir = getNewOutputDir();
+    Job job = MapReduceTestUtil.createFailJob(conf, outDir, inDir);
+    job.setOutputFormatClass(output);
+
+    assertFalse("Job did not fail!", job.waitForCompletion(true));
+
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for failed job " + job.getID(),
+          fs.exists(testFile));
+    }
+
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for failed job "
+          + job.getID(), fs.exists(file));
+    }
+  }
+
+  // run a job which gets stuck in mapper and kill it.
+  private void testKilledJob(String fileName,
+      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
+    Path outDir = getNewOutputDir();
+    Job job = MapReduceTestUtil.createKillJob(conf, outDir, inDir);
+    job.setOutputFormatClass(output);
+
+    job.submit();
+
+    // wait for the setup to be completed
+    while (job.setupProgress() != 1.0f) {
+      UtilsForTests.waitFor(100);
+    }
+
+    job.killJob(); // kill the job
+
+    assertFalse("Job did not get kill", job.waitForCompletion(true));
+
+    if (fileName != null) {
+      Path testFile = new Path(outDir, fileName);
+      assertTrue("File " + testFile + " missing for job " + job.getID(), fs
+          .exists(testFile));
+    }
+
+    // check if the files from the missing set exists
+    for (String ex : exclude) {
+      Path file = new Path(outDir, ex);
+      assertFalse("File " + file + " should not be present for killed job "
+          + job.getID(), fs.exists(file));
+    }
+  }
+
+  /**
+   * Test default cleanup/abort behavior
+   * 
+   * @throws Exception
+   */
+  public void testDefaultCleanupAndAbort() throws Exception {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                      TextOutputFormat.class, new String[] {});
+
+    // check with a failed job
+    testFailedJob(null, TextOutputFormat.class,
+                  new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+
+    // check default abort job kill
+    testKilledJob(null, TextOutputFormat.class,
+                  new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+  }
+
+  /**
+   * Test if a failed job with custom committer runs the abort code.
+   * 
+   * @throws Exception
+   */
+  public void testCustomAbort() throws Exception {
+    // check with a successful job
+    testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+                      MyOutputFormatWithCustomAbort.class, 
+                      new String[] {ABORT_FAILED_FILE_NAME,
+                                    ABORT_KILLED_FILE_NAME});
+
+    // check with a failed job
+    testFailedJob(ABORT_FAILED_FILE_NAME, 
+                  MyOutputFormatWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, 
+                                ABORT_KILLED_FILE_NAME});
+
+    // check with a killed job
+    testKilledJob(ABORT_KILLED_FILE_NAME, 
+                  MyOutputFormatWithCustomAbort.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, 
+                                ABORT_FAILED_FILE_NAME});
+  }
+
+  /**
+   * Test if a failed job with custom committer runs the deprecated
+   * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api 
+   * compatibility testing.
+   * @throws Exception 
+   */
+  public void testCustomCleanup() throws Exception {
+    // check with a successful job
+    testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, 
+                      MyOutputFormatWithCustomCleanup.class, 
+                      new String[] {});
+
+    // check with a failed job
+    testFailedJob(CUSTOM_CLEANUP_FILE_NAME, 
+                  MyOutputFormatWithCustomCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+
+    // check with a killed job
+    testKilledJob(CUSTOM_CLEANUP_FILE_NAME, 
+                  MyOutputFormatWithCustomCleanup.class, 
+                  new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+  }
+}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java b/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java
index b7c3a1e..cc78506 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java
@@ -423,6 +423,34 @@
   }
 
   /**
+   * Verifies ProcfsBasedProcessTree.checkPidPgrpidForMatch() in case of
+   * 'constructProcessInfo() returning null' by not writing stat file for the
+   * mock process
+   * @throws IOException if there was a problem setting up the
+   *                      fake procfs directories or files.
+   */
+  public void testDestroyProcessTree() throws IOException {
+    // test process
+    String pid = "100";
+    // create the fake procfs root directory. 
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+    try {
+      setupProcfsRootDir(procfsRootDir);
+      
+      // crank up the process tree class.
+      ProcfsBasedProcessTree processTree = new ProcfsBasedProcessTree(
+                        pid, true, 100L, procfsRootDir.getAbsolutePath());
+
+      // Let us not create stat file for pid 100.
+      assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(
+                            pid, procfsRootDir.getAbsolutePath()));
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
+  
+  /**
    * Test the correctness of process-tree dump.
    * 
    * @throws IOException