Feature to instruct rumen-folder utility to skip jobs worth of specific duration
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1097315 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 47b4980..e86c2ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@
NEW FEATURES
IMPROVEMENTS
+ MAPREDUCE-1461. Feature to instruct rumen-folder utility to skip jobs worth
+ of specific duration. (Rajesh Balamohan via amarrk)
MAPREDUCE-2172. Added test-patch.properties required by test-patch.sh (nigel)
diff --git a/src/docs/src/documentation/content/xdocs/rumen.xml b/src/docs/src/documentation/content/xdocs/rumen.xml
index 91de565..ba65c1c 100644
--- a/src/docs/src/documentation/content/xdocs/rumen.xml
+++ b/src/docs/src/documentation/content/xdocs/rumen.xml
@@ -311,6 +311,17 @@
</td>
</tr>
<tr>
+ <td><code>-starts-after</code></td>
+ <td>Specify the time (in milliseconds), relative to the start of
+ the trace, after which this utility should start considering
+ jobs from the input trace.
+ </td>
+ <td>If this value is specified as 10000, Folder would ignore the
+ first 10000 ms worth of jobs in the trace and
+ consider only the remaining jobs in the trace for folding.
+ </td>
+ </tr>
+ <tr>
<td><code>-temp-directory</code></td>
<td>Temporary directory for the Folder. By default the <strong>output
folder's parent directory</strong> is used as the scratch space.
diff --git a/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java b/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
index 9d31ec5..822dced 100644
--- a/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
+++ b/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
@@ -20,6 +20,12 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -71,6 +77,96 @@
TestRumenFolder.<LoggedJob> jsonFileMatchesGold(conf, lfs, foldedTracePath,
foldedGoldFile, LoggedJob.class, "trace");
}
+
+ @Test
+ public void testStartsAfterOption() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ @SuppressWarnings("deprecation")
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", ""))
+ .makeQualified(lfs);
+ @SuppressWarnings("deprecation")
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp"))
+ .makeQualified(lfs);
+
+ final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
+ final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
+ lfs.delete(tempDir, true);
+
+ final Path inputFile =
+ new Path(rootInputFile, "goldFoldedTrace.json.gz");
+
+ final Path foldedTracePath = new Path(tempDir,
+ "folded-skippedjob-trace.json");
+ String[] args =
+ { "-input-cycle", "300S", "-output-duration", "300S",
+ "-starts-after", "30S",
+ inputFile.toString(), foldedTracePath.toString() };
+
+ Folder folder = new Folder();
+ int result = ToolRunner.run(folder, args);
+ assertEquals("Non-zero exit", 0, result);
+
+ TestRumenFolder.<LoggedJob> checkValidityAfterSkippingJobs(conf, lfs, foldedTracePath,
+ inputFile, LoggedJob.class, "trace", 30000, 300000);
+ }
+
+ static private <T extends DeepCompare> void
+ checkValidityAfterSkippingJobs(Configuration conf,
+ FileSystem lfs, Path result, Path inputFile,
+ Class<? extends T> clazz, String fileDescription,
+ long startsAfter, long duration) throws IOException {
+
+ JsonObjectMapperParser<T> inputFileParser =
+ new JsonObjectMapperParser<T>(inputFile, clazz, conf);
+ InputStream resultStream = lfs.open(result);
+ JsonObjectMapperParser<T> resultParser =
+ new JsonObjectMapperParser<T>(resultStream, clazz);
+ List<Long> gpSubmitTimes = new LinkedList<Long>();
+ List<Long> rpSubmitTimes = new LinkedList<Long>();
+ try {
+ //Get submitTime of first job
+ LoggedJob firstJob = (LoggedJob)inputFileParser.getNext();
+ gpSubmitTimes.add(firstJob.getSubmitTime());
+ long absoluteStartsAfterTime = firstJob.getSubmitTime() + startsAfter;
+
+ //total duration
+ long endTime = firstJob.getSubmitTime() + duration;
+
+ //read original trace
+ LoggedJob oriJob = null;
+ while((oriJob = (LoggedJob)inputFileParser.getNext()) != null) {
+ gpSubmitTimes.add(oriJob.getSubmitTime());
+ }
+
+ //check that retained jobs have submit time >= starts-after
+ LoggedJob job = null;
+ while((job = (LoggedJob) resultParser.getNext()) != null) {
+ assertTrue("job's submit time in the output trace is less " +
+ "than the specified value of starts-after",
+ (job.getSubmitTime() >= absoluteStartsAfterTime));
+ rpSubmitTimes.add(job.getSubmitTime());
+ }
+
+ List<Long> skippedJobs = new LinkedList<Long>();
+ skippedJobs.addAll(gpSubmitTimes);
+ skippedJobs.removeAll(rpSubmitTimes);
+
+ //check if the skipped job submittime < starts-after
+ for(Long submitTime : skippedJobs) {
+ assertTrue("skipped job submit time " + submitTime +
+ " in the trace is greater " +
+ "than the specified value of starts-after "
+ + absoluteStartsAfterTime,
+ (submitTime < absoluteStartsAfterTime));
+ }
+ } finally {
+ IOUtils.cleanup(null, inputFileParser, resultParser);
+ }
+ }
static private <T extends DeepCompare> void jsonFileMatchesGold(
Configuration conf, FileSystem lfs, Path result, Path gold,
diff --git a/src/tools/org/apache/hadoop/tools/rumen/Folder.java b/src/tools/org/apache/hadoop/tools/rumen/Folder.java
index 39364fd..c150654 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/Folder.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/Folder.java
@@ -61,6 +61,7 @@
private boolean debug = false;
private boolean allowMissorting = false;
private int skewBufferLength = 0;
+ private long startsAfter = -1;
static final private Log LOG = LogFactory.getLog(Folder.class);
@@ -130,8 +131,9 @@
for (int i = 0; i < args.length; ++i) {
String thisArg = args[i];
-
- if (thisArg.equalsIgnoreCase("-output-duration")) {
+ if (thisArg.equalsIgnoreCase("-starts-after")) {
+ startsAfter = parseDuration(args[++i]);
+ } else if (thisArg.equalsIgnoreCase("-output-duration")) {
outputDuration = parseDuration(args[++i]);
} else if (thisArg.equalsIgnoreCase("-input-cycle")) {
inputCycle = parseDuration(args[++i]);
@@ -274,6 +276,31 @@
return EMPTY_JOB_TRACE;
}
+
+ // If a starts-after time is specified, skip jobs until we reach
+ // the specified starting time offset.
+ if (startsAfter > 0) {
+ LOG.info("starts-after time is specified. Initial job submit time : "
+ + job.getSubmitTime());
+
+ long approximateTime = job.getSubmitTime() + startsAfter;
+ job = reader.nextJob();
+ long skippedCount = 0;
+ while (job != null && job.getSubmitTime() < approximateTime) {
+ job = reader.nextJob();
+ skippedCount++;
+ }
+
+ LOG.debug("Considering jobs with submit time greater than "
+ + startsAfter + " ms. Skipped " + skippedCount + " jobs.");
+
+ if (job == null) {
+ LOG.error("No more jobs to process in the trace with 'starts-after'"+
+ " set to " + startsAfter + "ms.");
+ return EMPTY_JOB_TRACE;
+ }
+ LOG.info("The first job has a submit time of " + job.getSubmitTime());
+ }
firstJobSubmitTime = job.getSubmitTime();
long lastJobSubmitTime = firstJobSubmitTime;