MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. Contributed by Ranjit Mathew

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1021300 13f79535-47bb-0310-9956-ffa450edef68
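
With this change Gridmix hands the trace path to Rumen's ZombieJobProducer
instead of opening an InputStream itself, so a trace compressed with a
codec that Hadoop recognizes (e.g. gzip) is decompressed transparently.
The special path "-" still reads a trace, uncompressed, from standard
input. A sketch of an invocation with a gzipped trace (the jar name and
paths are illustrative):

  bin/hadoop jar hadoop-gridmix.jar org.apache.hadoop.mapred.gridmix.Gridmix \
      <iopath> file:///tmp/wordcount.json.gz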
diff --git a/CHANGES.txt b/CHANGES.txt
index eefaf17..8666fef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -322,6 +322,9 @@
     MAPREDUCE-2082. Fixes Pipes to create the jobtoken file in the right
     place. (Jitendra Pandey via ddas)
 
+    MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. (Ranjit
+    Mathew via amareshwari)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
index afb1d21..51f10f4 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.ZombieJobProducer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -133,12 +134,28 @@
     LOG.info("Done.");
   }
 
-  protected InputStream createInputStream(String in) throws IOException {
-    if ("-".equals(in)) {
-      return System.in;
+  /**
+   * Creates an appropriate {@code JobStoryProducer} object for the
+   * given trace.
+   *
+   * @param traceIn the path to the trace file. The special path
+   * "-" denotes the standard input stream.
+   *
+   * @param conf the configuration to be used.
+   *
+   * @return a {@code JobStoryProducer} for the given trace.
+   *
+   * @throws IOException if there was an error.
+   */
+  protected JobStoryProducer createJobStoryProducer(String traceIn,
+      Configuration conf) throws IOException {
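+    // A trace on standard input is read as-is (no decompression).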
+    if ("-".equals(traceIn)) {
+      return new ZombieJobProducer(System.in, null);
     }
-    final Path pin = new Path(in);
-    return pin.getFileSystem(getConf()).open(pin);
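+    // Handing Rumen the Path and Configuration (rather than an opened
+    // stream) lets it detect and decompress compressed traces.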
+    return new ZombieJobProducer(new Path(traceIn), null, conf);
   }
 
   /**
@@ -202,9 +219,9 @@
       throws IOException {
      return GridmixJobSubmissionPolicy.getPolicy(
        conf, GridmixJobSubmissionPolicy.STRESS).createJobFactory(
-       submitter, new ZombieJobProducer(
-         createInputStream(
-           traceIn), null), scratchDir, conf, startFlag, resolver);  }
+       submitter, createJobStoryProducer(traceIn, conf), scratchDir, conf,
+       startFlag, resolver);
+  }
 
   private static UserResolver userResolver;
 
diff --git a/src/contrib/gridmix/src/test/data/wordcount.json.gz b/src/contrib/gridmix/src/test/data/wordcount.json.gz
new file mode 100644
index 0000000..c5cc40e
--- /dev/null
+++ b/src/contrib/gridmix/src/test/data/wordcount.json.gz
Binary files differ
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
index 97fccb3..65abe4b 100644
--- a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -24,10 +26,13 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.Counters;
@@ -38,6 +43,7 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -328,6 +334,102 @@
     }
   }
 
+  /**
+   * Verifies that the given {@code JobStory} corresponds to the checked-in
+   * WordCount {@code JobStory}. The verification is done via JUnit
+   * assertions.
+   *
+   * @param js the candidate JobStory.
+   */
+  private void verifyWordCountJobStory(JobStory js) {
+    assertNotNull("Null JobStory", js);
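+    // Expected fields mirror the checked-in wordcount.json.gz trace, in
+    // the order name:user:queue:submissionTime:numMaps:numReduces.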
+    String expectedJobStory = "WordCount:johndoe:default:1285322645148:3:1";
+    String actualJobStory = js.getName() + ":" + js.getUser() + ":"
+      + js.getQueueName() + ":" + js.getSubmissionTime() + ":"
+      + js.getNumberMaps() + ":" + js.getNumberReduces();
+    assertEquals("Unexpected JobStory", expectedJobStory, actualJobStory);
+  }
+
+  /**
+   * Expands a file compressed using {@code gzip}.
+   *
+   * @param fs the {@code FileSystem} corresponding to the given
+   * file.
+   *
+   * @param in the path to the compressed file.
+   *
+   * @param out the path to the uncompressed output.
+   *
+   * @throws Exception if there was an error during the operation.
+   */
+  private void expandGzippedTrace(FileSystem fs, Path in, Path out)
+    throws Exception {
+    byte[] buff = new byte[4096];
+    GZIPInputStream gis = new GZIPInputStream(fs.open(in));
+    FSDataOutputStream fsdos = fs.create(out);
+    int numRead;
+    while ((numRead = gis.read(buff, 0, buff.length)) != -1) {
+      fsdos.write(buff, 0, numRead);
+    }
+    gis.close();
+    fsdos.close();
+  }
+
+  /**
+   * Tests the reading of traces in GridMix3. These traces are generated
+   * by Rumen and are in the JSON format. A trace may optionally be
+   * compressed; an uncompressed trace can also be passed to GridMix3 on
+   * its standard input stream. The testing is done via JUnit assertions.
+   *
+   * @throws Exception if there was an error.
+   */
+  @Test
+  public void testTraceReader() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    Path rootInputDir = new Path(System.getProperty("src.test.data"));
+    rootInputDir
+      = rootInputDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+    Path rootTempDir
+      = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir")), "testTraceReader");
+    rootTempDir
+      = rootTempDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+    Path inputFile = new Path(rootInputDir, "wordcount.json.gz");
+    Path tempFile = new Path(rootTempDir, "gridmix3-wc.json");
+
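+    // Save the real standard input so the finally block can restore it.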
+    InputStream origStdIn = System.in;
+    InputStream tmpIs = null;
+    try {
+      DebugGridmix dgm = new DebugGridmix();
+      JobStoryProducer jsp
+        = dgm.createJobStoryProducer(inputFile.toString(), conf);
+
+      System.out.println("Verifying JobStory from compressed trace...");
+      verifyWordCountJobStory(jsp.getNextJob());
+
+      expandGzippedTrace(lfs, inputFile, tempFile);
+      jsp = dgm.createJobStoryProducer(tempFile.toString(), conf);
+      System.out.println("Verifying JobStory from uncompressed trace...");
+      verifyWordCountJobStory(jsp.getNextJob());
+
+      tmpIs = lfs.open(tempFile);
+      System.setIn(tmpIs);
+      System.out.println("Verifying JobStory from trace in standard input...");
+      jsp = dgm.createJobStoryProducer("-", conf);
+      verifyWordCountJobStory(jsp.getNextJob());
+    } finally {
+      System.setIn(origStdIn);
+      if (tmpIs != null) {
+        tmpIs.close();
+      }
+      lfs.delete(rootTempDir, true);
+    }
+  }
+
   @Test
   public void testReplaySubmit() throws Exception {
     policy = GridmixJobSubmissionPolicy.REPLAY;