MAPREDUCE-1517. Supports streaming job to run in the background. Contributed by Bochun Bai
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1002050 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ba2371..fc96883 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,9 @@
MAPREDUCE-1548. Hadoop archives preserve times and other properties from
original files. (Rodrigo Schmidt via dhruba)
+ MAPREDUCE-1517. Supports streaming job to run in the background. (Bochun Bai
+ via amareshwari)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
index 35a3674..6dcfeea 100644
--- a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
+++ b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
@@ -255,6 +255,7 @@
return;
}
verbose_ = cmdLine.hasOption("verbose");
+ background_ = cmdLine.hasOption("background");
debug_ = cmdLine.hasOption("debug")? debug_ + 1 : debug_;
String[] values = cmdLine.getOptionValues("input");
@@ -432,6 +433,7 @@
// boolean properties
+ Option background = createBoolOption("background", "Submit the job and don't wait till it completes.");
Option verbose = createBoolOption("verbose", "print verbose output");
Option info = createBoolOption("info", "print verbose output");
Option help = createBoolOption("help", "print this help message");
@@ -459,6 +461,7 @@
addOption(cacheFile).
addOption(cacheArchive).
addOption(io).
+ addOption(background).
addOption(verbose).
addOption(info).
addOption(debug).
@@ -510,6 +513,7 @@
+ " for input to and output");
System.out.println(" from mapper/reducer commands");
System.out.println(" -lazyOutput Optional. Lazily create Output.");
+ System.out.println(" -background Optional. Submit the job and don't wait till it completes.");
System.out.println(" -verbose Optional. Print verbose output.");
System.out.println(" -info Optional. Print detailed usage.");
System.out.println(" -help Optional. Print help message.");
@@ -997,7 +1001,9 @@
running_ = jc_.submitJob(jobConf_);
jobId_ = running_.getID();
jobInfo();
- if (!jc_.monitorAndPrintJob(jobConf_, running_)) {
+ if (background_) {
+ LOG.info("Job is running in background.");
+ } else if (!jc_.monitorAndPrintJob(jobConf_, running_)) {
LOG.error("Job not Successful!");
return 1;
}
@@ -1025,6 +1031,7 @@
}
protected String[] argv_;
+ protected boolean background_;
protected boolean verbose_;
protected boolean detailedUsage_;
protected boolean printUsage = false;
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java
new file mode 100644
index 0000000..546b7eb
--- /dev/null
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/**
+ * A simple Java app that consumes all input from stdin, waiting a few
+ * seconds before echoing each line to stdout.
+ */
+public class DelayEchoApp {
+
+ public DelayEchoApp() {
+ }
+
+ public void go(int seconds) throws IOException, InterruptedException {
+ BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+ String line;
+
+    // Read every input line, sleeping before echoing each one, so the
+    // task runs long enough for background submission to be observed.
+ while ((line = in.readLine()) != null) {
+ Thread.sleep(seconds * 1000L);
+ System.out.println(line);
+ }
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ int seconds = 5;
+ if (args.length >= 1) {
+ try {
+ seconds = Integer.valueOf(args[0]);
+ } catch (NumberFormatException e) {
+ // just use default 5.
+ }
+ }
+
+ DelayEchoApp app = new DelayEchoApp();
+ app.go(seconds);
+ }
+}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java
new file mode 100644
index 0000000..5b24a2a
--- /dev/null
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import org.junit.Test;
+import org.junit.Before;
+import static org.junit.Assert.*;
+
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests if hadoopStreaming background works fine. A DelayEchoApp
+ * with a 10 second delay is submitted.
+ */
+public class TestStreamingBackground {
+ protected File TEST_DIR = new File("TestStreamingBackground")
+ .getAbsoluteFile();
+ protected File INPUT_FILE = new File(TEST_DIR, "input.txt");
+ protected File OUTPUT_DIR = new File(TEST_DIR, "out");
+
+ protected String tenSecondsTask = StreamUtil.makeJavaCommand(
+ DelayEchoApp.class, new String[] { "10" });
+
+ public TestStreamingBackground() throws IOException {
+ UtilTest utilTest = new UtilTest(getClass().getName());
+ utilTest.checkUserDir();
+ utilTest.redirectIfAntJunit();
+ }
+
+ protected String[] args = new String[] {
+ "-background",
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", tenSecondsTask,
+ "-reducer", tenSecondsTask,
+ "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp"),
+ "-jobconf", "mapreduce.task.io.sort.mb=10"
+ };
+
+ @Before
+ public void setUp() throws IOException {
+ UtilTest.recursiveDelete(TEST_DIR);
+ assertTrue(TEST_DIR.mkdirs());
+
+ FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
+ out.write("hello\n".getBytes());
+ out.close();
+ }
+
+ public void runStreamJob() throws Exception {
+ boolean mayExit = false;
+ int returnStatus = 0;
+
+ StreamJob job = new StreamJob(args, mayExit);
+ returnStatus = job.go();
+
+ assertEquals("Streaming Job expected to succeed", 0, returnStatus);
+ job.running_.killJob();
+ job.running_.waitForCompletion();
+ }
+
+ @Test
+ public void testBackgroundSubmitOk() throws Exception {
+ runStreamJob();
+ }
+
+}
diff --git a/src/docs/src/documentation/content/xdocs/streaming.xml b/src/docs/src/documentation/content/xdocs/streaming.xml
index 7247413..4e6b042 100644
--- a/src/docs/src/documentation/content/xdocs/streaming.xml
+++ b/src/docs/src/documentation/content/xdocs/streaming.xml
@@ -97,6 +97,7 @@
<tr><td> -combiner streamingCommand or JavaClassName</td><td> Optional </td><td> Combiner executable for map output</td></tr>
<tr><td> -cmdenv name=value</td><td> Optional </td><td> Pass environment variable to streaming commands</td></tr>
<tr><td> -inputreader spec</td><td> Optional </td><td> Specifies a record reader class (instead of an input format class)</td></tr>
+<tr><td> -background</td><td> Optional </td><td> Submit the job and don't wait till it completes.</td></tr>
<tr><td> -verbose</td><td> Optional </td><td> Verbose output</td></tr>
<tr><td> -lazyOutput</td><td> Optional </td><td> Create output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write)</td></tr>
<tr><td> -numReduceTasks num</td><td> Optional </td><td> Specify the number of reducers</td></tr>