Feature to instruct rumen-folder utility to skip jobs worth of specific duration.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1097315 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 47b4980..e86c2ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@
   NEW FEATURES
 
   IMPROVEMENTS
+    MAPREDUCE-1461. Feature to instruct rumen-folder utility to skip jobs worth 
+    of specific duration. (Rajesh Balamohan via amarrk)
 
     MAPREDUCE-2172. Added test-patch.properties required by test-patch.sh (nigel)
 
diff --git a/src/docs/src/documentation/content/xdocs/rumen.xml b/src/docs/src/documentation/content/xdocs/rumen.xml
index 91de565..ba65c1c 100644
--- a/src/docs/src/documentation/content/xdocs/rumen.xml
+++ b/src/docs/src/documentation/content/xdocs/rumen.xml
@@ -311,6 +311,17 @@
           </td>
         </tr>
         <tr>
+        <td><code>-starts-after</code></td>
+            <td>Specify the time (in milliseconds) relative to the start of   
+              the trace, after which this utility should consider the 
+              jobs from the input trace.
+            </td>
+            <td>If this value is specified as 10000, Folder would ignore the
+              first 10000 ms worth of jobs in the trace and 
+              start considering the rest of the jobs in the trace for folding.
+            </td>
+          </tr>
+          <tr>
           <td><code>-temp-directory</code></td>
           <td>Temporary directory for the Folder. By default the <strong>output
               folder's parent directory</strong> is used as the scratch space.
diff --git a/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java b/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
index 9d31ec5..822dced 100644
--- a/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
+++ b/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
@@ -20,6 +20,12 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -71,6 +77,96 @@
     TestRumenFolder.<LoggedJob> jsonFileMatchesGold(conf, lfs, foldedTracePath,
         foldedGoldFile, LoggedJob.class, "trace");
   }
+  
+  @Test
+  public void testStartsAfterOption() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    @SuppressWarnings("deprecation")
+    final Path rootInputDir =
+      new Path(System.getProperty("test.tools.input.dir", ""))
+               .makeQualified(lfs);
+    @SuppressWarnings("deprecation")
+    final Path rootTempDir =
+      new Path(System.getProperty("test.build.data", "/tmp"))
+               .makeQualified(lfs);
+
+    final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
+    final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
+    lfs.delete(tempDir, true);
+
+    final Path inputFile =
+      new Path(rootInputFile, "goldFoldedTrace.json.gz");
+    
+    final Path foldedTracePath = new Path(tempDir, 
+                                          "folded-skippedjob-trace.json");
+    String[] args =
+       { "-input-cycle", "300S", "-output-duration", "300S",
+         "-starts-after", "30S", 
+         inputFile.toString(), foldedTracePath.toString() };
+
+    Folder folder = new Folder();
+    int result = ToolRunner.run(folder, args);
+    assertEquals("Non-zero exit", 0, result);
+
+    TestRumenFolder.<LoggedJob> checkValidityAfterSkippingJobs(conf, lfs, foldedTracePath,
+          inputFile, LoggedJob.class, "trace", 30000, 300000);
+  }
+  
+  static private <T extends DeepCompare> void 
+            checkValidityAfterSkippingJobs(Configuration conf, 
+            FileSystem lfs, Path result, Path inputFile,
+            Class<? extends T> clazz, String fileDescription, 
+            long startsAfter, long duration) throws IOException {
+    
+    JsonObjectMapperParser<T> inputFileParser =
+        new JsonObjectMapperParser<T>(inputFile, clazz, conf);
+    InputStream resultStream = lfs.open(result);
+    JsonObjectMapperParser<T> resultParser =
+        new JsonObjectMapperParser<T>(resultStream, clazz);
+    List<Long> gpSubmitTimes = new LinkedList<Long>(); 
+    List<Long> rpSubmitTimes = new LinkedList<Long>();
+    try {
+      // Capture the first job's submit time; 'starts-after' is relative to it.
+      LoggedJob firstJob = (LoggedJob)inputFileParser.getNext();
+      gpSubmitTimes.add(firstJob.getSubmitTime());
+      long absoluteStartsAfterTime = firstJob.getSubmitTime() + startsAfter;
+      
+      // Absolute end of the trace window. NOTE(review): computed but unused below.
+      long endTime = firstJob.getSubmitTime() + duration;
+      
+      // Collect the submit times of every remaining job in the original trace.
+      LoggedJob oriJob = null;
+      while((oriJob = (LoggedJob)inputFileParser.getNext()) != null) {
+      	gpSubmitTimes.add(oriJob.getSubmitTime());
+      }
+      
+      // Every job retained in the folded output must start at or after the absolute starts-after instant.
+      LoggedJob job = null;
+      while((job = (LoggedJob) resultParser.getNext()) != null) {
+        assertTrue("job's submit time in the output trace is less " +
+                   "than the specified value of starts-after", 
+                   (job.getSubmitTime() >= absoluteStartsAfterTime));
+                   rpSubmitTimes.add(job.getSubmitTime());
+      }
+      
+      List<Long> skippedJobs = new LinkedList<Long>();
+      skippedJobs.addAll(gpSubmitTimes);
+      skippedJobs.removeAll(rpSubmitTimes);
+      
+      // Conversely, every job the folder dropped must predate the starts-after instant.
+      for(Long submitTime : skippedJobs) {
+        assertTrue("skipped job submit time " + submitTime + 
+                   " in the trace is greater " +
+                   "than the specified value of starts-after " 
+                   + absoluteStartsAfterTime, 
+                   (submitTime < absoluteStartsAfterTime));
+      }
+    } finally {
+      IOUtils.cleanup(null, inputFileParser, resultParser);
+    }
+  }
 
   static private <T extends DeepCompare> void jsonFileMatchesGold(
       Configuration conf, FileSystem lfs, Path result, Path gold,
diff --git a/src/tools/org/apache/hadoop/tools/rumen/Folder.java b/src/tools/org/apache/hadoop/tools/rumen/Folder.java
index 39364fd..c150654 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/Folder.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/Folder.java
@@ -61,6 +61,7 @@
   private boolean debug = false;
   private boolean allowMissorting = false;
   private int skewBufferLength = 0;
+  private long startsAfter = -1;
 
   static final private Log LOG = LogFactory.getLog(Folder.class);
 
@@ -130,8 +131,9 @@
 
     for (int i = 0; i < args.length; ++i) {
       String thisArg = args[i];
-
-      if (thisArg.equalsIgnoreCase("-output-duration")) {
+      if (thisArg.equalsIgnoreCase("-starts-after")) {
+        startsAfter = parseDuration(args[++i]);
+      } else if (thisArg.equalsIgnoreCase("-output-duration")) {
         outputDuration = parseDuration(args[++i]);
       } else if (thisArg.equalsIgnoreCase("-input-cycle")) {
         inputCycle = parseDuration(args[++i]);
@@ -274,6 +276,31 @@
 
         return EMPTY_JOB_TRACE;
       }
+      
+      // If starts-after time is specified, skip the number of jobs till we reach
+      // the starting time limit.
+      if (startsAfter > 0) {
+        LOG.info("starts-after time is specified. Initial job submit time : " 
+                 + job.getSubmitTime());
+
+        long approximateTime = job.getSubmitTime() + startsAfter;
+        job = reader.nextJob();
+        long skippedCount = 0;
+        while (job != null && job.getSubmitTime() < approximateTime) {
+          job = reader.nextJob();
+          skippedCount++;
+        }
+
+        LOG.debug("Considering jobs with submit time greater than " 
+                  + startsAfter + " ms. Skipped " + skippedCount + " jobs.");
+
+        if (job == null) {
+          LOG.error("No more jobs to process in the trace with 'starts-after'"+
+                    " set to " + startsAfter + "ms.");
+          return EMPTY_JOB_TRACE;
+        }
+        LOG.info("The first job has a submit time of " + job.getSubmitTime());
+      }
 
       firstJobSubmitTime = job.getSubmitTime();
       long lastJobSubmitTime = firstJobSubmitTime;