Reverted MAPREDUCE-1888.
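
Backs out the change that made Streaming set the map and reduce output
key/value classes from the IdentifierResolver only when the mapper or reducer
is an external command. With the revert, StreamJob again resolves
"stream.map.output" and "stream.reduce.output" up front and applies the
resolved classes unconditionally, and the streaming tests return to building
their argument lists inline in genArgs() rather than layering them through
TestStreaming. The restored map-side configuration, excerpted from the
StreamJob.java hunk below (the enclosing method, not shown in the hunk, is
assumed to be StreamJob's job-configuration setup):

    // Resolve the declared map output type (defaulting to plain text) and
    // let the IdentifierResolver supply the reader and key/value classes.
    idResolver.resolve(jobConf_.get("stream.map.output", IdentifierResolver.TEXT_ID));
    jobConf_.setClass("stream.map.output.reader.class",
        idResolver.getOutputReaderClass(), OutputReader.class);
    jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
    jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());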
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@959221 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 27824d3..4ca11fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -129,9 +129,6 @@
MAPREDUCE-1863. Fix NPE in Rumen when processing null CDF for failed task
attempts. (Amar Kamat via cdouglas)
- MAPREDUCE-1888. Fixes Streaming to override output key and value types,
- only if mapper and reducer are commands. (Ravi Gummadi via amareshwari)
-
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
index 94ccad7..2348396 100644
--- a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
+++ b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
@@ -744,15 +744,25 @@
jobConf_.setClass("stream.reduce.input.writer.class",
idResolver.getInputWriterClass(), InputWriter.class);
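+    // Set the map output reader and key/value classes from the resolver,
+    // whether or not the mapper is an external command.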
+ idResolver.resolve(jobConf_.get("stream.map.output", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.map.output.reader.class",
+ idResolver.getOutputReaderClass(), OutputReader.class);
+ jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
+ jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
+
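+    // The job (reduce) output reader and key/value classes are derived the
+    // same way, from the declared reduce output type.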
+ idResolver.resolve(jobConf_.get("stream.reduce.output", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.reduce.output.reader.class",
+ idResolver.getOutputReaderClass(), OutputReader.class);
+ jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
+ jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
+
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
- boolean isMapperACommand = false;
if (mapCmd_ != null) {
c = StreamUtil.goodClassOrNull(jobConf_, mapCmd_, defaultPackage);
if (c != null) {
jobConf_.setMapperClass(c);
} else {
- isMapperACommand = true;
jobConf_.setMapperClass(PipeMapper.class);
jobConf_.setMapRunnerClass(PipeMapRunner.class);
jobConf_.set("stream.map.streamprocessor",
@@ -772,50 +782,24 @@
}
boolean reducerNone_ = false;
- boolean isReducerACommand = false;
if (redCmd_ != null) {
reducerNone_ = redCmd_.equals(REDUCE_NONE);
- if (!reducerNone_) {
- if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
- jobConf_.setReducerClass(ValueAggregatorReducer.class);
- jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
- } else {
+      if (redCmd_.equalsIgnoreCase("aggregate")) {
+ jobConf_.setReducerClass(ValueAggregatorReducer.class);
+ jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
+ } else {
- c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
- if (c != null) {
- jobConf_.setReducerClass(c);
- } else {
- isReducerACommand = true;
- jobConf_.setReducerClass(PipeReducer.class);
- jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
- redCmd_, "UTF-8"));
- }
+ c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
+ if (c != null) {
+ jobConf_.setReducerClass(c);
+ } else {
+ jobConf_.setReducerClass(PipeReducer.class);
+ jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
+ redCmd_, "UTF-8"));
}
}
}
- idResolver.resolve(jobConf_.get("stream.map.output",
- IdentifierResolver.TEXT_ID));
- jobConf_.setClass("stream.map.output.reader.class",
- idResolver.getOutputReaderClass(), OutputReader.class);
- if (isMapperACommand) {
- // if mapper is a command, then map output key/value classes come from the
- // idResolver
- jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
- jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
- }
-
- idResolver.resolve(jobConf_.get("stream.reduce.output",
- IdentifierResolver.TEXT_ID));
- jobConf_.setClass("stream.reduce.output.reader.class",
- idResolver.getOutputReaderClass(), OutputReader.class);
- if (isReducerACommand) {
- // if reducer is a command, then output key/value classes come from the
- // idResolver
- jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
- jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
- }
-
if (inReaderSpec_ != null) {
String[] args = inReaderSpec_.split(",");
String readerClass = args[0];
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
index 11d53b2..5a38300 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
@@ -21,6 +21,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
+import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +59,6 @@
strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
strNamenode = "fs.default.name=hdfs://" + namenode;
- map = LS_PATH;
FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
// Set up side file
@@ -80,14 +80,22 @@
@Override
protected String[] genArgs() {
- args.add("-file");
- args.add(new java.io.File("sidefile").getAbsolutePath());
- args.add("-numReduceTasks");
- args.add("0");
- args.add("-jobconf");
- args.add(strNamenode);
- args.add("-jobconf");
- args.add(strJobTracker);
- return super.genArgs();
+ return new String[] {
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-file", new java.io.File("sidefile").getAbsolutePath(),
+ "-mapper", LS_PATH,
+ "-numReduceTasks", "0",
+ "-jobconf", strNamenode,
+ "-jobconf", strJobTracker,
+ "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
+ };
}
+
+  public static void main(String[] args) throws Exception {
+    new TestFileArgs().testCommandLine();
+  }
}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
index a9fc5fd..f81ea32 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
@@ -40,4 +40,21 @@
out.write(input.getBytes("UTF-8"));
out.close();
}
+
+  @Override
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+    };
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestGzipInput().testCommandLine();
+  }
}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
index a251b19..4e1b2db 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
@@ -82,15 +82,6 @@
mr = new MiniMRCluster(1, namenode, 3);
strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
strNamenode = "fs.default.name=" + namenode;
-
- map = "xargs cat";
- reduce = "cat";
- }
-
- @Override
- protected void setInputOutput() {
- inputFile = INPUT_FILE;
- outDir = OUTPUT_DIR;
}
protected void createInput() throws IOException
@@ -123,20 +114,30 @@
String cache1 = workDir + CACHE_ARCHIVE_1 + "#symlink1";
String cache2 = workDir + CACHE_ARCHIVE_2 + "#symlink2";
- args.add("-jobconf");
- args.add("mapreduce.job.reduces=1");
- args.add("-cacheArchive");
- args.add(cache1);
- args.add("-cacheArchive");
- args.add(cache2);
- args.add("-jobconf");
- args.add(strNamenode);
- args.add("-jobconf");
- args.add(strJobTracker);
- return super.genArgs();
+ return new String[] {
+ "-input", INPUT_FILE.toString(),
+ "-output", OUTPUT_DIR,
+ "-mapper", "xargs cat",
+ "-reducer", "cat",
+ "-jobconf", "mapreduce.job.reduces=1",
+ "-cacheArchive", cache1,
+ "-cacheArchive", cache2,
+ "-jobconf", strNamenode,
+ "-jobconf", strJobTracker,
+ "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
+ };
}
- protected void checkOutput() throws IOException {
+ //@Test
+ public void testCommandLine() throws Exception {
+ createInput();
+    String[] args = genArgs();
+    LOG.info("Testing streaming command line:\n" +
+             StringUtils.join(" ", Arrays.asList(args)));
+    job = new StreamJob(args, true);
+    if (job.go() != 0) {
+      throw new Exception("Job Failed");
+    }
StringBuffer output = new StringBuffer(256);
Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
new Path(OUTPUT_DIR)));
@@ -146,4 +147,9 @@
}
assertEquals(expectedOutput, output.toString());
}
+
+  public static void main(String[] args) throws Exception {
+    new TestMultipleArchiveFiles().testCommandLine();
+  }
}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
index fa067ae..7d29358 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
@@ -22,23 +22,26 @@
import java.io.FileOutputStream;
import java.io.IOException;
+import org.apache.hadoop.fs.FileUtil;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
/**
* This class tests StreamXmlRecordReader
* The test creates an XML file, uses StreamXmlRecordReader and compares
* the expected output against the generated output
*/
public class TestStreamXmlRecordReader extends TestStreaming {
+
+  private StreamJob job;
public TestStreamXmlRecordReader() throws IOException {
INPUT_FILE = new File("input.xml");
- input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\n" +
- "bunnies.are.pink\t\n</xmltag>\t\n";
- map = "cat";
- reduce = "NONE";
- outputExpect = input;
+ input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\nbunnies.are.pink\t\n</xmltag>\t\n";
}

  @Override
protected void createInput() throws IOException
{
FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
@@ -50,10 +53,42 @@
out.close();
}
  @Override
protected String[] genArgs() {
- args.add("-inputreader");
- args.add("StreamXmlRecordReader,begin=<xmltag>,end=</xmltag>");
- return super.genArgs();
+ return new String[] {
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper","cat",
+ "-reducer", "NONE",
+ "-inputreader", "StreamXmlRecordReader,begin=<xmltag>,end=</xmltag>"
+ };
+ }
+
+ @Test
+ public void testCommandLine() throws Exception {
+ try {
+ try {
+ FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (Exception e) {
+        // ignore: the output directory may not exist yet
+      }
+ createInput();
+ job = new StreamJob(genArgs(), false);
+ job.go();
+ File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+ String output = StreamUtil.slurp(outFile);
+ outFile.delete();
+ assertEquals(input, output);
+ } finally {
+ try {
+ INPUT_FILE.delete();
+ FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+  public static void main(String[] args) throws Exception {
+    new TestStreamXmlRecordReader().testCommandLine();
+  }
}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
index 445330c..ca8de08 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
@@ -19,11 +19,11 @@
package org.apache.hadoop.streaming;
import java.io.*;
-import java.util.ArrayList;
import org.junit.Test;
import static org.junit.Assert.*;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
@@ -40,8 +40,6 @@
protected File TEST_DIR;
protected File INPUT_FILE;
protected File OUTPUT_DIR;
- protected String inputFile;
- protected String outDir;
protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
// map behaves like "/usr/bin/tr . \\n"; (split words into lines)
protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
@@ -50,7 +48,6 @@
protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n";
- protected ArrayList<String> args = new ArrayList<String>();
protected StreamJob job;
public TestStreaming() throws IOException
@@ -75,25 +72,15 @@
out.close();
}
- protected void setInputOutput() {
- inputFile = INPUT_FILE.getAbsolutePath();
- outDir = OUTPUT_DIR.getAbsolutePath();
- }
-
protected String[] genArgs() {
- setInputOutput();
- args.add("-input");args.add(inputFile);
- args.add("-output");args.add(outDir);
- args.add("-mapper");args.add(map);
- args.add("-reducer");args.add(reduce);
- args.add("-jobconf");
- args.add("mapreduce.task.files.preserve.failedtasks=true");
- args.add("-jobconf");
- args.add("stream.tmpdir="+System.getProperty("test.build.data","/tmp"));
-
- String str[] = new String [args.size()];
- args.toArray(str);
- return str;
+ return new String[] {
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", map,
+ "-reducer", reduce,
+ "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+ };
}
protected Configuration getConf() {
@@ -118,12 +105,9 @@
assertEquals(getExpectedOutput(), output);
}
- /**
- * Runs a streaming job with the given arguments
- * @return the streaming job return status
- * @throws IOException
- */
- protected int runStreamJob() throws IOException {
+ @Test
+  public void testCommandLine() throws Exception {
UtilTest.recursiveDelete(TEST_DIR);
assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
createInput();
@@ -132,14 +116,14 @@
// During tests, the default Configuration will use a local mapred
// So don't specify -config or -cluster
job = new StreamJob(genArgs(), mayExit);
- return job.go();
- }
-
- @Test
- public void testCommandLine() throws Exception
- {
- int ret = runStreamJob();
+ int ret = job.go();
assertEquals(0, ret);
checkOutput();
}
+
+  public static void main(String[] args) throws Exception {
+    new TestStreaming().testCommandLine();
+  }
}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
index c4d0f03..60d728a 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
@@ -27,21 +27,25 @@
public class TestStreamingCombiner extends TestStreaming {
  protected String combine = StreamUtil.makeJavaCommand(
      UniqApp.class, new String[]{""});
public TestStreamingCombiner() throws IOException {
super();
}
protected String[] genArgs() {
- args.add("-combiner");
- args.add(combine);
- return super.genArgs();
+ return new String[] {
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", map,
+ "-reducer", reduce,
+ "-combiner", combine,
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+ };
}
@Test
  public void testCommandLine() throws Exception {
super.testCommandLine();
// validate combiner counters
String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
@@ -51,4 +55,10 @@
assertTrue(counters.findCounter(
counterGrp, "COMBINE_OUTPUT_RECORDS").getValue() != 0);
}
+
+  public static void main(String[] args) throws Exception {
+    new TestStreamingCombiner().testCommandLine();
+  }
}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
index 2d65f4d..ca864ba 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
@@ -21,6 +21,7 @@
import org.junit.Test;
import static org.junit.Assert.*;
+import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileUtil;
@@ -37,11 +38,33 @@
}
@Test
- public void testCommandLine() throws Exception {
+  public void testCommandLine() throws IOException {
try {
- super.testCommandLine();
+ try {
+ FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (Exception e) {
+        // ignore: the output directory may not exist yet
+      }
- validateCounters();
+ createInput();
+ boolean mayExit = false;
+
+ // During tests, the default Configuration will use a local mapred
+ // So don't specify -config or -cluster
+ StreamJob job = new StreamJob(genArgs(), mayExit);
+ job.go();
+ File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+ String output = StreamUtil.slurp(outFile);
+ outFile.delete();
+ assertEquals(outputExpect, output);
+
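+      // Validate the user counter: the "InputLines" counter in "UserCounters"
+      // should have been incremented once for each of the three input lines.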
+ Counters counters = job.running_.getCounters();
+ assertNotNull("Counters", counters);
+ Group group = counters.getGroup("UserCounters");
+ assertNotNull("Group", group);
+ Counter counter = group.getCounterForName("InputLines");
+ assertNotNull("Counter", counter);
+ assertEquals(3, counter.getCounter());
} finally {
try {
INPUT_FILE.delete();
@@ -52,13 +75,4 @@
}
}
- private void validateCounters() throws IOException {
- Counters counters = job.running_.getCounters();
- assertNotNull("Counters", counters);
- Group group = counters.getGroup("UserCounters");
- assertNotNull("Group", group);
- Counter counter = group.getCounterForName("InputLines");
- assertNotNull("Counter", counter);
- assertEquals(3, counter.getCounter());
- }
}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
index 1f89309..ff521a6 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
@@ -19,11 +19,14 @@
package org.apache.hadoop.streaming;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.FileUtil;
/**
* This class tests if hadoopStreaming returns Exception
@@ -34,31 +37,55 @@
public class TestStreamingFailure extends TestStreaming
{
- protected File INVALID_INPUT_FILE;
+  protected File INVALID_INPUT_FILE;
+ private StreamJob job;
public TestStreamingFailure() throws IOException
{
INVALID_INPUT_FILE = new File("invalid_input.txt");
}
- @Override
- protected void setInputOutput() {
- inputFile = INVALID_INPUT_FILE.getAbsolutePath();
- outDir = OUTPUT_DIR.getAbsolutePath();
+  @Override
+  protected String[] genArgs() {
+ return new String[] {
+ "-input", INVALID_INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", map,
+ "-reducer", reduce,
+ "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+ };
}
  @Override
@Test
- public void testCommandLine() throws IOException {
- try {
- int returnStatus = runStreamJob();
- assertEquals("Streaming Job Failure code expected", 5, returnStatus);
- } finally {
+  public void testCommandLine() {
+ try {
try {
FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (Exception e) {
+        // ignore: the output directory may not exist yet
+      }
+
+ boolean mayExit = false;
+ int returnStatus = 0;
+
+ // During tests, the default Configuration will use a local mapred
+ // So don't specify -config or -cluster
+ job = new StreamJob(genArgs(), mayExit);
+ returnStatus = job.go();
+ assertEquals("Streaming Job Failure code expected", 5, returnStatus);
+    } catch (Exception e) {
+      // Expected: the job may fail by throwing rather than returning a status.
+ } finally {
+ try {
+ FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
} catch (IOException e) {
e.printStackTrace();
}
}
}
+
+  public static void main(String[] args) throws Exception {
+    new TestStreamingFailure().testCommandLine();
+  }
}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingJavaTasks.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingJavaTasks.java
deleted file mode 100644
index 92f2aff..0000000
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingJavaTasks.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.junit.Test;
-
-import java.io.IOException;
-
-/**
- * Tests stream job with java tasks(not commands; So task is the child process
- * of TaskTracker) in MapReduce local mode.
- * Validates if user-set config properties
- * {@link MRJobConfig#MAP_OUTPUT_KEY_CLASS} and
- * {@link MRJobConfig#OUTPUT_KEY_CLASS} are honored by streaming jobs for the
- * case of java mapper/reducer(mapper and reducer are not commands).
- */
-public class TestStreamingJavaTasks extends TestStreaming {
-
- public TestStreamingJavaTasks() throws IOException {
- super();
- input = "one line dummy input\n";
- map = "org.apache.hadoop.mapred.lib.IdentityMapper";
- }
-
- @Override
- protected String[] genArgs() {
- args.clear();
- // set the testcase-specific config properties first and the remaining
- // arguments are set in TestStreaming.genArgs().
- args.add("-jobconf");
- args.add(MRJobConfig.MAP_OUTPUT_KEY_CLASS +
- "=org.apache.hadoop.io.LongWritable");
- args.add("-jobconf");
- args.add(MRJobConfig.OUTPUT_KEY_CLASS +
- "=org.apache.hadoop.io.LongWritable");
-
- // Using SequenceFileOutputFormat here because with TextOutputFormat, the
- // mapred.output.key.class set in JobConf (which we want to test here) is
- // not read/used at all.
- args.add("-outputformat");
- args.add("org.apache.hadoop.mapred.SequenceFileOutputFormat");
-
- return super.genArgs();
- }
-
- @Override
- protected void checkOutput() throws IOException {
- // No need to validate output for the test cases in this class
- }
-
- // Check with IdentityMapper, IdentityReducer
- @Override
- @Test
- public void testCommandLine() throws Exception {
- reduce = "org.apache.hadoop.mapred.lib.IdentityReducer";
- super.testCommandLine();
- }
-
- // Check the case of Reducer = "NONE"
- @Test
- public void testStreamingJavaTasksWithReduceNone() throws Exception {
- reduce = "NONE";
- super.testCommandLine();
- }
-}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
index 30142ba..b043cc1 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
@@ -23,9 +23,8 @@
import org.apache.hadoop.streaming.Environment;
/** A minimal Java implementation of /usr/bin/tr.
 * Used to test the usage of external applications without adding
 * platform-specific dependencies.
- * Use TrApp as mapper only. For reducer, use TrAppReduce.
*/
public class TrApp
{
@@ -44,8 +43,8 @@
// property names have been escaped in PipeMapRed.safeEnvVarName()
expectDefined("mapreduce_cluster_local_dir");
expect("mapred_output_format_class", "org.apache.hadoop.mapred.TextOutputFormat");
- expect("mapreduce_map_output_key_class", "org.apache.hadoop.io.Text");
- expect("mapreduce_map_output_value_class", "org.apache.hadoop.io.Text");
+ expect("mapreduce_job_output_key_class", "org.apache.hadoop.io.Text");
+ expect("mapreduce_job_output_value_class", "org.apache.hadoop.io.Text");
expect("mapreduce_task_ismap", "true");
expectDefined("mapreduce_task_attempt_id");
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
index c8812ab..563cc44 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
@@ -23,9 +23,8 @@
import org.apache.hadoop.streaming.Environment;
/** A minimal Java implementation of /usr/bin/tr.
 * Used to test the usage of external applications without adding
 * platform-specific dependencies.
- * Use TrApp as mapper only. For reducer, use TrAppReduce.
*/
public class TrAppReduce
{
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java b/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
index eeaf48b..ba45341 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
@@ -67,6 +67,9 @@
conf.setJobName("fof");
conf.setInputFormat(TextInputFormat.class);
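+    // Declare the job's final output key/value types explicitly.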
+ conf.setOutputKeyClass(LongWritable.class);
+ conf.setOutputValueClass(Text.class);
+
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);