MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. Contributed by Ranjit Mathew
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1021300 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index eefaf17..8666fef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -322,6 +322,9 @@
MAPREDUCE-2082. Fixes Pipes to create the jobtoken file in the right
place. (Jitendra Pandey via ddas)
+ MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. (Ranjit
+ Mathew via amareshwari)
+
Release 0.21.1 - Unreleased
NEW FEATURES
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
index afb1d21..51f10f4 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.ZombieJobProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -133,12 +134,23 @@
LOG.info("Done.");
}
- protected InputStream createInputStream(String in) throws IOException {
- if ("-".equals(in)) {
- return System.in;
+ /**
+ * Create an appropriate {@code JobStoryProducer} object for the
+ * given trace.
+ *
+ * @param traceIn the path to the trace file. The special path
+ * "-" denotes the standard input stream.
+ *
+ * @param conf the configuration to be used.
+ *
+ * @throws IOException if there was an error.
+ */
+ protected JobStoryProducer createJobStoryProducer(String traceIn,
+ Configuration conf) throws IOException {
+ if ("-".equals(traceIn)) {
+ return new ZombieJobProducer(System.in, null);
}
- final Path pin = new Path(in);
- return pin.getFileSystem(getConf()).open(pin);
+ return new ZombieJobProducer(new Path(traceIn), null, conf);
}
/**
@@ -202,9 +214,9 @@
throws IOException {
return GridmixJobSubmissionPolicy.getPolicy(
conf, GridmixJobSubmissionPolicy.STRESS).createJobFactory(
- submitter, new ZombieJobProducer(
- createInputStream(
- traceIn), null), scratchDir, conf, startFlag, resolver); }
+ submitter, createJobStoryProducer(traceIn, conf), scratchDir, conf,
+ startFlag, resolver);
+ }
private static UserResolver userResolver;
diff --git a/src/contrib/gridmix/src/test/data/wordcount.json.gz b/src/contrib/gridmix/src/test/data/wordcount.json.gz
new file mode 100644
index 0000000..c5cc40e
--- /dev/null
+++ b/src/contrib/gridmix/src/test/data/wordcount.json.gz
Binary files differ
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
index 97fccb3..65abe4b 100644
--- a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred.gridmix;
+import java.io.FileInputStream;
+import java.io.InputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -24,10 +26,13 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.Counters;
@@ -38,6 +43,7 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.util.ToolRunner;
@@ -328,6 +334,99 @@
}
}
+ /**
+ * Verifies that the given {@code JobStory} corresponds to the checked-in
+ * WordCount {@code JobStory}. The verification is effected via JUnit
+ * assertions.
+ *
+ * @param js the candidate JobStory.
+ */
+ private void verifyWordCountJobStory(JobStory js) {
+ assertNotNull("Null JobStory", js);
+ String expectedJobStory = "WordCount:johndoe:default:1285322645148:3:1";
+ String actualJobStory = js.getName() + ":" + js.getUser() + ":"
+ + js.getQueueName() + ":" + js.getSubmissionTime() + ":"
+ + js.getNumberMaps() + ":" + js.getNumberReduces();
+ assertEquals("Unexpected JobStory", expectedJobStory, actualJobStory);
+ }
+
+ /**
+ * Expands a file compressed using {@code gzip}.
+ *
+ * @param fs the {@code FileSystem} corresponding to the given
+ * file.
+ *
+ * @param in the path to the compressed file.
+ *
+ * @param out the path to the uncompressed output.
+ *
+ * @throws Exception if there was an error during the operation.
+ */
+ private void expandGzippedTrace(FileSystem fs, Path in, Path out)
+ throws Exception {
+ byte[] buff = new byte[4096];
+ GZIPInputStream gis = new GZIPInputStream(fs.open(in));
+ FSDataOutputStream fsdos = fs.create(out);
+ int numRead;
+ while ((numRead = gis.read(buff, 0, buff.length)) != -1) {
+ fsdos.write(buff, 0, numRead);
+ }
+ gis.close();
+ fsdos.close();
+ }
+
+ /**
+ * Tests the reading of traces in GridMix3. These traces are generated
+ * by Rumen and are in the JSON format. The traces can optionally be
+ * compressed and uncompressed traces can also be passed to GridMix3 via
+ * its standard input stream. The testing is effected via JUnit assertions.
+ *
+ * @throws Exception if there was an error.
+ */
+ @Test
+ public void testTraceReader() throws Exception {
+ Configuration conf = new Configuration();
+ FileSystem lfs = FileSystem.getLocal(conf);
+ Path rootInputDir = new Path(System.getProperty("src.test.data"));
+ rootInputDir
+ = rootInputDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+ Path rootTempDir
+ = new Path(System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")), "testTraceReader");
+ rootTempDir
+ = rootTempDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+ Path inputFile = new Path(rootInputDir, "wordcount.json.gz");
+ Path tempFile = new Path(rootTempDir, "gridmix3-wc.json");
+
+ InputStream origStdIn = System.in;
+ InputStream tmpIs = null;
+ try {
+ DebugGridmix dgm = new DebugGridmix();
+ JobStoryProducer jsp
+ = dgm.createJobStoryProducer(inputFile.toString(), conf);
+
+ System.out.println("Verifying JobStory from compressed trace...");
+ verifyWordCountJobStory(jsp.getNextJob());
+
+ expandGzippedTrace(lfs, inputFile, tempFile);
+ jsp = dgm.createJobStoryProducer(tempFile.toString(), conf);
+ System.out.println("Verifying JobStory from uncompressed trace...");
+ verifyWordCountJobStory(jsp.getNextJob());
+
+ tmpIs = lfs.open(tempFile);
+ System.setIn(tmpIs);
+ System.out.println("Verifying JobStory from trace in standard input...");
+ jsp = dgm.createJobStoryProducer("-", conf);
+ verifyWordCountJobStory(jsp.getNextJob());
+ } finally {
+ System.setIn(origStdIn);
+ if (tmpIs != null) {
+ tmpIs.close();
+ }
+ lfs.delete(rootTempDir, true);
+ }
+ }
+
@Test
public void testReplaySubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.REPLAY;