Reverted MAPREDUCE-1888

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@959221 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 27824d3..4ca11fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -129,9 +129,6 @@
     MAPREDUCE-1863. Fix NPE in Rumen when processing null CDF for failed task
     attempts. (Amar Kamat via cdouglas)
 
-    MAPREDUCE-1888. Fixes Streaming to override output key and value types,
-    only if mapper and reducer are commands. (Ravi Gummadi via amareshwari)
-
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
index 94ccad7..2348396 100644
--- a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
+++ b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
@@ -744,15 +744,25 @@
     jobConf_.setClass("stream.reduce.input.writer.class",
       idResolver.getInputWriterClass(), InputWriter.class);
     
+    idResolver.resolve(jobConf_.get("stream.map.output", IdentifierResolver.TEXT_ID));
+    jobConf_.setClass("stream.map.output.reader.class",
+      idResolver.getOutputReaderClass(), OutputReader.class);
+    jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
+    jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
+    
+    idResolver.resolve(jobConf_.get("stream.reduce.output", IdentifierResolver.TEXT_ID));
+    jobConf_.setClass("stream.reduce.output.reader.class",
+      idResolver.getOutputReaderClass(), OutputReader.class);
+    jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
+    jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
+    
     jobConf_.set("stream.addenvironment", addTaskEnvironment_);
 
-    boolean isMapperACommand = false;
     if (mapCmd_ != null) {
       c = StreamUtil.goodClassOrNull(jobConf_, mapCmd_, defaultPackage);
       if (c != null) {
         jobConf_.setMapperClass(c);
       } else {
-        isMapperACommand = true;
         jobConf_.setMapperClass(PipeMapper.class);
         jobConf_.setMapRunnerClass(PipeMapRunner.class);
         jobConf_.set("stream.map.streamprocessor", 
@@ -772,50 +782,24 @@
     }
 
     boolean reducerNone_ = false;
-    boolean isReducerACommand = false;
     if (redCmd_ != null) {
       reducerNone_ = redCmd_.equals(REDUCE_NONE);
-      if (!reducerNone_) {
-        if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
-          jobConf_.setReducerClass(ValueAggregatorReducer.class);
-          jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
-        } else {
+      if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
+        jobConf_.setReducerClass(ValueAggregatorReducer.class);
+        jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
+      } else {
 
-          c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
-          if (c != null) {
-            jobConf_.setReducerClass(c);
-          } else {
-            isReducerACommand = true;
-            jobConf_.setReducerClass(PipeReducer.class);
-            jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
-                redCmd_, "UTF-8"));
-          }
+        c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
+        if (c != null) {
+          jobConf_.setReducerClass(c);
+        } else {
+          jobConf_.setReducerClass(PipeReducer.class);
+          jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
+              redCmd_, "UTF-8"));
         }
       }
     }
 
-    idResolver.resolve(jobConf_.get("stream.map.output",
-        IdentifierResolver.TEXT_ID));
-    jobConf_.setClass("stream.map.output.reader.class",
-      idResolver.getOutputReaderClass(), OutputReader.class);
-    if (isMapperACommand) {
-      // if mapper is a command, then map output key/value classes come from the
-      // idResolver
-      jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
-      jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
-    }
-
-    idResolver.resolve(jobConf_.get("stream.reduce.output",
-        IdentifierResolver.TEXT_ID));
-    jobConf_.setClass("stream.reduce.output.reader.class",
-      idResolver.getOutputReaderClass(), OutputReader.class);
-    if (isReducerACommand) {
-      // if reducer is a command, then output key/value classes come from the
-      // idResolver
-      jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
-      jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
-    }
-
     if (inReaderSpec_ != null) {
       String[] args = inReaderSpec_.split(",");
       String readerClass = args[0];
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
index 11d53b2..5a38300 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
@@ -21,6 +21,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.zip.GZIPOutputStream;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +59,6 @@
     strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
     strNamenode = "fs.default.name=hdfs://" + namenode;
 
-    map = LS_PATH;
     FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
 
     // Set up side file
@@ -80,14 +80,22 @@
 
   @Override
   protected String[] genArgs() {
-    args.add("-file");
-    args.add(new java.io.File("sidefile").getAbsolutePath());
-    args.add("-numReduceTasks");
-    args.add("0");
-    args.add("-jobconf");
-    args.add(strNamenode);
-    args.add("-jobconf");
-    args.add(strJobTracker);
-    return super.genArgs();
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-file", new java.io.File("sidefile").getAbsolutePath(),
+      "-mapper", LS_PATH,
+      "-numReduceTasks", "0",
+      "-jobconf", strNamenode,
+      "-jobconf", strJobTracker,
+      "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
+    };
   }
+
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestFileArgs().testCommandLine();
+  }
+
 }
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
index a9fc5fd..f81ea32 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
@@ -40,4 +40,21 @@
     out.write(input.getBytes("UTF-8"));
     out.close();
   }
+
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+    };
+    
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestGzipInput().testCommandLine();
+  }
+
 }
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
index a251b19..4e1b2db 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
@@ -82,15 +82,6 @@
     mr  = new MiniMRCluster(1, namenode, 3);
     strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
     strNamenode = "fs.default.name=" + namenode;
-
-    map = "xargs cat";
-    reduce = "cat";
-  }
-
-  @Override
-  protected void setInputOutput() {
-    inputFile = INPUT_FILE;
-    outDir = OUTPUT_DIR;
   }
   
   protected void createInput() throws IOException
@@ -123,20 +114,30 @@
     String cache1 = workDir + CACHE_ARCHIVE_1 + "#symlink1";
     String cache2 = workDir + CACHE_ARCHIVE_2 + "#symlink2";
 
-    args.add("-jobconf");
-    args.add("mapreduce.job.reduces=1");
-    args.add("-cacheArchive");
-    args.add(cache1);
-    args.add("-cacheArchive");
-    args.add(cache2);
-    args.add("-jobconf");
-    args.add(strNamenode);
-    args.add("-jobconf");
-    args.add(strJobTracker);
-    return super.genArgs();
+    return new String[] {
+      "-input", INPUT_FILE.toString(),
+      "-output", OUTPUT_DIR,
+      "-mapper", "xargs cat", 
+      "-reducer", "cat",
+      "-jobconf", "mapreduce.job.reduces=1",
+      "-cacheArchive", cache1,
+      "-cacheArchive", cache2,
+      "-jobconf", strNamenode,
+      "-jobconf", strJobTracker,
+      "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
+    };
   }
 
-  protected void checkOutput() throws IOException {
+  //@Test
+  public void testCommandLine() throws Exception {
+    createInput();
+    String args[] = genArgs();
+    LOG.info("Testing streaming command line:\n" +
+             StringUtils.join(" ", Arrays.asList(args)));
+    job = new StreamJob(genArgs(), true);
+    if(job.go() != 0) {
+      throw new Exception("Job Failed");
+    }
     StringBuffer output = new StringBuffer(256);
     Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
                                             new Path(OUTPUT_DIR)));
@@ -146,4 +147,9 @@
     }
     assertEquals(expectedOutput, output.toString());
   }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestMultipleArchiveFiles().testCommandLine();
+  }
 }
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
index fa067ae..7d29358 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
@@ -22,23 +22,26 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileUtil;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
 /**
  * This class tests StreamXmlRecordReader
  * The test creates an XML file, uses StreamXmlRecordReader and compares
  * the expected output against the generated output
  */
-public class TestStreamXmlRecordReader extends TestStreaming {
+public class TestStreamXmlRecordReader extends TestStreaming
+{
+
+  private StreamJob job;
 
   public TestStreamXmlRecordReader() throws IOException {
     INPUT_FILE = new File("input.xml");
-    input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\n" +
-        "bunnies.are.pink\t\n</xmltag>\t\n";
-    map = "cat";
-    reduce = "NONE";
-    outputExpect = input;
+    input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\nbunnies.are.pink\t\n</xmltag>\t\n";
   }
-
-  @Override
+  
   protected void createInput() throws IOException
   {
     FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
@@ -50,10 +53,42 @@
     out.close();
   }
 
-  @Override
   protected String[] genArgs() {
-    args.add("-inputreader");
-    args.add("StreamXmlRecordReader,begin=<xmltag>,end=</xmltag>");
-    return super.genArgs();
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper","cat", 
+      "-reducer", "NONE", 
+      "-inputreader", "StreamXmlRecordReader,begin=<xmltag>,end=</xmltag>"
+    };
+  }
+
+  @Test
+  public void testCommandLine() throws Exception {
+    try {
+      try {
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (Exception e) {
+      }
+      createInput();
+      job = new StreamJob(genArgs(), false);
+      job.go();
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      outFile.delete();
+      assertEquals(input, output);
+    } finally {
+      try {
+        INPUT_FILE.delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreamXmlRecordReader().testCommandLine();
   }
 }
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
index 445330c..ca8de08 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
@@ -19,11 +19,11 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.util.ArrayList;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
 
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
@@ -40,8 +40,6 @@
   protected File TEST_DIR;
   protected File INPUT_FILE;
   protected File OUTPUT_DIR;
-  protected String inputFile;
-  protected String outDir;
   protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
   // map behaves like "/usr/bin/tr . \\n"; (split words into lines)
   protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
@@ -50,7 +48,6 @@
   protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
   protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n";
 
-  protected ArrayList<String> args = new ArrayList<String>();
   protected StreamJob job;
 
   public TestStreaming() throws IOException
@@ -75,25 +72,15 @@
     out.close();
   }
 
-  protected void setInputOutput() {
-    inputFile = INPUT_FILE.getAbsolutePath();
-    outDir = OUTPUT_DIR.getAbsolutePath();
-  }
-
   protected String[] genArgs() {
-    setInputOutput();
-    args.add("-input");args.add(inputFile);
-    args.add("-output");args.add(outDir);
-    args.add("-mapper");args.add(map);
-    args.add("-reducer");args.add(reduce);
-    args.add("-jobconf");
-    args.add("mapreduce.task.files.preserve.failedtasks=true");
-    args.add("-jobconf");
-    args.add("stream.tmpdir="+System.getProperty("test.build.data","/tmp"));
-
-    String str[] = new String [args.size()];
-    args.toArray(str);
-    return str;
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
   }
 
   protected Configuration getConf() {
@@ -118,12 +105,9 @@
     assertEquals(getExpectedOutput(), output);
   }
 
-  /**
-   * Runs a streaming job with the given arguments
-   * @return the streaming job return status
-   * @throws IOException
-   */
-  protected int runStreamJob() throws IOException {
+  @Test
+  public void testCommandLine() throws Exception
+  {
     UtilTest.recursiveDelete(TEST_DIR);
     assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
     createInput();
@@ -132,14 +116,14 @@
     // During tests, the default Configuration will use a local mapred
     // So don't specify -config or -cluster
     job = new StreamJob(genArgs(), mayExit);
-    return job.go();
-  }
-
-  @Test
-  public void testCommandLine() throws Exception
-  {
-    int ret = runStreamJob();
+    int ret = job.go();
     assertEquals(0, ret);
     checkOutput();
   }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreaming().testCommandLine();
+  }
+
 }
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
index c4d0f03..60d728a 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
@@ -27,21 +27,25 @@
 
 public class TestStreamingCombiner extends TestStreaming {
 
-  protected String combine = StreamUtil.makeJavaCommand(
-      UniqApp.class, new String[]{""});
+  protected String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{""});
   
   public TestStreamingCombiner() throws IOException {
     super();
   }
   
   protected String[] genArgs() {
-    args.add("-combiner");
-    args.add(combine);
-    return super.genArgs();
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      "-combiner", combine,
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
   }
 
   @Test
-  public void testCommandLine() throws Exception {
+  public void testCommandLine() throws Exception  {
     super.testCommandLine();
     // validate combiner counters
     String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
@@ -51,4 +55,10 @@
     assertTrue(counters.findCounter(
                counterGrp, "COMBINE_OUTPUT_RECORDS").getValue() != 0);
   }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreamingCombiner().testCommandLine();
+  }
+
 }
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
index 2d65f4d..ca864ba 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
@@ -21,6 +21,7 @@
 import org.junit.Test;
 import static org.junit.Assert.*;
 
+import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileUtil;
@@ -37,11 +38,33 @@
   }
 
   @Test
-  public void testCommandLine() throws Exception {
+  public void testCommandLine() throws IOException
+  {
     try {
-      super.testCommandLine();
+      try {
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (Exception e) {
+      }
 
-      validateCounters();
+      createInput();
+      boolean mayExit = false;
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      StreamJob job = new StreamJob(genArgs(), mayExit);      
+      job.go();
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      outFile.delete();
+      assertEquals(outputExpect, output);
+      
+      Counters counters = job.running_.getCounters();
+      assertNotNull("Counters", counters);
+      Group group = counters.getGroup("UserCounters");
+      assertNotNull("Group", group);
+      Counter counter = group.getCounterForName("InputLines");
+      assertNotNull("Counter", counter);
+      assertEquals(3, counter.getCounter());
     } finally {
       try {
         INPUT_FILE.delete();
@@ -52,13 +75,4 @@
     }
   }
   
-  private void validateCounters() throws IOException {
-    Counters counters = job.running_.getCounters();
-    assertNotNull("Counters", counters);
-    Group group = counters.getGroup("UserCounters");
-    assertNotNull("Group", group);
-    Counter counter = group.getCounterForName("InputLines");
-    assertNotNull("Counter", counter);
-    assertEquals(3, counter.getCounter());
-  }
 }
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
index 1f89309..ff521a6 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.streaming;
 
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 
 /**
  * This class tests if hadoopStreaming returns Exception 
@@ -34,31 +37,55 @@
 public class TestStreamingFailure extends TestStreaming
 {
 
-  protected File INVALID_INPUT_FILE;
+  protected File INVALID_INPUT_FILE;// = new File("invalid_input.txt");
+  private StreamJob job;
 
   public TestStreamingFailure() throws IOException
   {
     INVALID_INPUT_FILE = new File("invalid_input.txt");
   }
 
-  @Override
-  protected void setInputOutput() {
-    inputFile = INVALID_INPUT_FILE.getAbsolutePath();
-    outDir = OUTPUT_DIR.getAbsolutePath();
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INVALID_INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
   }
 
-  @Override
   @Test
-  public void testCommandLine() throws IOException {
-    try {    
-      int returnStatus = runStreamJob();
-      assertEquals("Streaming Job Failure code expected", 5, returnStatus);
-    } finally {
+  public void testCommandLine()
+  {
+    try {
       try {
         FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (Exception e) {
+      }
+
+      boolean mayExit = false;
+      int returnStatus = 0;
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      job = new StreamJob(genArgs(), mayExit);      
+      returnStatus = job.go();
+      assertEquals("Streaming Job Failure code expected", 5, returnStatus);
+    } catch(Exception e) {
+      // Expecting an exception
+    } finally {
+      try {
+      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (IOException e) {
         e.printStackTrace();
       }
     }
   }
+
+  public static void main(String[]args) throws Exception
+  {
+      new TestStreamingFailure().testCommandLine();
+  }
 }
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingJavaTasks.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingJavaTasks.java
deleted file mode 100644
index 92f2aff..0000000
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingJavaTasks.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.junit.Test;
-
-import java.io.IOException;
-
-/**
- * Tests stream job with java tasks(not commands; So task is the child process
- * of TaskTracker) in MapReduce local mode.
- * Validates if user-set config properties
- * {@link MRJobConfig#MAP_OUTPUT_KEY_CLASS} and
- * {@link MRJobConfig#OUTPUT_KEY_CLASS} are honored by streaming jobs for the
- * case of java mapper/reducer(mapper and reducer are not commands).
- */
-public class TestStreamingJavaTasks extends TestStreaming {
-
-  public TestStreamingJavaTasks() throws IOException {
-    super();
-    input = "one line dummy input\n";
-    map = "org.apache.hadoop.mapred.lib.IdentityMapper";
-  }
-
-  @Override
-  protected String[] genArgs() {
-    args.clear();
-    // set the testcase-specific config properties first and the remaining
-    // arguments are set in TestStreaming.genArgs().
-    args.add("-jobconf");
-    args.add(MRJobConfig.MAP_OUTPUT_KEY_CLASS +
-        "=org.apache.hadoop.io.LongWritable");
-    args.add("-jobconf");
-    args.add(MRJobConfig.OUTPUT_KEY_CLASS +
-        "=org.apache.hadoop.io.LongWritable");
-
-    // Using SequenceFileOutputFormat here because with TextOutputFormat, the
-    // mapred.output.key.class set in JobConf (which we want to test here) is
-    // not read/used at all.
-    args.add("-outputformat");
-    args.add("org.apache.hadoop.mapred.SequenceFileOutputFormat");
-
-    return super.genArgs();
-  }
-
-  @Override
-  protected void checkOutput() throws IOException {
-    // No need to validate output for the test cases in this class
-  }
-
-  // Check with IdentityMapper, IdentityReducer
-  @Override
-  @Test
-  public void testCommandLine() throws Exception {
-    reduce = "org.apache.hadoop.mapred.lib.IdentityReducer";
-    super.testCommandLine();
-  }
-
-  // Check the case of Reducer = "NONE"
-  @Test
-  public void testStreamingJavaTasksWithReduceNone() throws Exception {
-    reduce = "NONE";
-    super.testCommandLine();
-  }
-}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
index 30142ba..b043cc1 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
@@ -23,9 +23,8 @@
 import org.apache.hadoop.streaming.Environment;
 
 /** A minimal Java implementation of /usr/bin/tr.
- *  Used to test the usage of external applications without adding
- *  platform-specific dependencies.
- *  Use TrApp as mapper only. For reducer, use TrAppReduce.
+    Used to test the usage of external applications without adding
+    platform-specific dependencies.
  */
 public class TrApp
 {
@@ -44,8 +43,8 @@
     // property names have been escaped in PipeMapRed.safeEnvVarName()
     expectDefined("mapreduce_cluster_local_dir");
     expect("mapred_output_format_class", "org.apache.hadoop.mapred.TextOutputFormat");
-    expect("mapreduce_map_output_key_class", "org.apache.hadoop.io.Text");
-    expect("mapreduce_map_output_value_class", "org.apache.hadoop.io.Text");
+    expect("mapreduce_job_output_key_class", "org.apache.hadoop.io.Text");
+    expect("mapreduce_job_output_value_class", "org.apache.hadoop.io.Text");
 
     expect("mapreduce_task_ismap", "true");
     expectDefined("mapreduce_task_attempt_id");
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
index c8812ab..563cc44 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
@@ -23,9 +23,8 @@
 import org.apache.hadoop.streaming.Environment;
 
 /** A minimal Java implementation of /usr/bin/tr.
- *  Used to test the usage of external applications without adding
- *  platform-specific dependencies.
- *  Use TrApp as mapper only. For reducer, use TrAppReduce.
+    Used to test the usage of external applications without adding
+    platform-specific dependencies.
  */
 public class TrAppReduce
 {
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java b/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
index eeaf48b..ba45341 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
@@ -67,6 +67,9 @@
     conf.setJobName("fof");
     conf.setInputFormat(TextInputFormat.class);
 
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
     conf.setMapOutputKeyClass(LongWritable.class);
     conf.setMapOutputValueClass(Text.class);