MAPREDUCE-1517. Supports streaming job to run in the background. Contributed by Bochun Bai

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1002050 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ba2371..fc96883 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,9 @@
     MAPREDUCE-1548. Hadoop archives preserve times and other properties from 
     original files. (Rodrigo Schmidt via dhruba)
 
+    MAPREDUCE-1517. Supports streaming job to run in the background. (Bochun Bai
+    via amareshwari)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
index 35a3674..6dcfeea 100644
--- a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
+++ b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
@@ -255,6 +255,7 @@
         return;
       }
       verbose_ =  cmdLine.hasOption("verbose");
+      background_ =  cmdLine.hasOption("background");
       debug_ = cmdLine.hasOption("debug")? debug_ + 1 : debug_;
       
       String[] values = cmdLine.getOptionValues("input");
@@ -432,6 +433,7 @@
     
     // boolean properties
     
+    Option background = createBoolOption("background", "Submit the job and don't wait till it completes."); 
     Option verbose = createBoolOption("verbose", "print verbose output"); 
     Option info = createBoolOption("info", "print verbose output"); 
     Option help = createBoolOption("help", "print this help message"); 
@@ -459,6 +461,7 @@
       addOption(cacheFile).
       addOption(cacheArchive).
       addOption(io).
+      addOption(background).
       addOption(verbose).
       addOption(info).
       addOption(debug).
@@ -510,6 +513,7 @@
         + " for input to and output");
     System.out.println("                  from mapper/reducer commands");
     System.out.println("  -lazyOutput     Optional. Lazily create Output.");
+    System.out.println("  -background     Optional. Submit the job and don't wait till it completes.");
     System.out.println("  -verbose        Optional. Print verbose output.");
     System.out.println("  -info           Optional. Print detailed usage.");
     System.out.println("  -help           Optional. Print help message.");
@@ -997,7 +1001,9 @@
       running_ = jc_.submitJob(jobConf_);
       jobId_ = running_.getID();
       jobInfo();
-      if (!jc_.monitorAndPrintJob(jobConf_, running_)) {
+      if (background_) {
+        LOG.info("Job is running in background.");
+      } else if (!jc_.monitorAndPrintJob(jobConf_, running_)) {
         LOG.error("Job not Successful!");
         return 1;
       }
@@ -1025,6 +1031,7 @@
   }
 
   protected String[] argv_;
+  protected boolean background_;
   protected boolean verbose_;
   protected boolean detailedUsage_;
   protected boolean printUsage = false;
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java
new file mode 100644
index 0000000..546b7eb
--- /dev/null
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/DelayEchoApp.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/**
+ * A simple Java app that will consume all input from stdin, wait a few seconds
+ * and echoing it to stdout.
+ */
+public class DelayEchoApp {
+
+  public DelayEchoApp() {
+  }
+
+  public void go(int seconds) throws IOException, InterruptedException {
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    String line;
+
+    // Consume all input (to make sure streaming will still count this
+    // task as failed even if all input was consumed).
+    while ((line = in.readLine()) != null) {
+      Thread.sleep(seconds * 1000L);
+      System.out.println(line);
+    }
+  }
+
+  public static void main(String[] args) throws IOException, InterruptedException {
+    int seconds = 5;
+    if (args.length >= 1) {
+      try {
+        seconds = Integer.valueOf(args[0]);
+      } catch (NumberFormatException e) {
+        // just use default 5.
+      }
+    }
+
+    DelayEchoApp app = new DelayEchoApp();
+    app.go(seconds);
+  }
+}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java
new file mode 100644
index 0000000..5b24a2a
--- /dev/null
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBackground.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import org.junit.Test;
+import org.junit.Before;
+import static org.junit.Assert.*;
+
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests if hadoopStreaming background works fine. A DelayEchoApp
+ * with 10 seconds delay is submited. 
+ */
+public class TestStreamingBackground {
+  protected File TEST_DIR = new File("TestStreamingBackground")
+      .getAbsoluteFile();
+  protected File INPUT_FILE = new File(TEST_DIR, "input.txt");
+  protected File OUTPUT_DIR = new File(TEST_DIR, "out");
+
+  protected String tenSecondsTask = StreamUtil.makeJavaCommand(
+      DelayEchoApp.class, new String[] { "10" });
+
+  public TestStreamingBackground() throws IOException {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected String[] args = new String[] { 
+      "-background",
+      "-input", INPUT_FILE.getAbsolutePath(), 
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", tenSecondsTask, 
+      "-reducer", tenSecondsTask, 
+      "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp"),
+      "-jobconf", "mapreduce.task.io.sort.mb=10" 
+  };
+
+  @Before
+  public void setUp() throws IOException {
+    UtilTest.recursiveDelete(TEST_DIR);
+    assertTrue(TEST_DIR.mkdirs());
+
+    FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
+    out.write("hello\n".getBytes());
+    out.close();
+  }
+
+  public void runStreamJob() throws Exception {
+    boolean mayExit = false;
+    int returnStatus = 0;
+
+    StreamJob job = new StreamJob(args, mayExit);
+    returnStatus = job.go();
+
+    assertEquals("Streaming Job expected to succeed", 0, returnStatus);
+    job.running_.killJob();
+    job.running_.waitForCompletion();
+  }
+
+  @Test
+  public void testBackgroundSubmitOk() throws Exception {
+    runStreamJob();
+  }
+
+}
diff --git a/src/docs/src/documentation/content/xdocs/streaming.xml b/src/docs/src/documentation/content/xdocs/streaming.xml
index 7247413..4e6b042 100644
--- a/src/docs/src/documentation/content/xdocs/streaming.xml
+++ b/src/docs/src/documentation/content/xdocs/streaming.xml
@@ -97,6 +97,7 @@
 <tr><td> -combiner streamingCommand or JavaClassName</td><td> Optional </td><td> Combiner executable for map output</td></tr>
 <tr><td> -cmdenv name=value</td><td> Optional </td><td> Pass environment variable to streaming commands</td></tr>
 <tr><td> -inputreader spec</td><td> Optional </td><td> Specifies a record reader class (instead of an input format class)</td></tr>
+<tr><td> -background</td><td> Optional </td><td> Submit the job and don't wait till it completes.</td></tr>
 <tr><td> -verbose</td><td> Optional </td><td> Verbose output</td></tr>
 <tr><td> -lazyOutput</td><td> Optional </td><td> Create output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write)</td></tr>
 <tr><td> -numReduceTasks num</td><td> Optional </td><td> Specify the number of reducers</td></tr>