MAPREDUCE-1523. Making Mumak work with Capacity-Scheduler (Anirban Das via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@984533 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 89a924d..45371d1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -112,6 +112,9 @@
     TestTrackerDistributedCacheManagerWithLinuxTaskController. (Devaraj Das
     via amareshwari)
 
+    MAPREDUCE-1523. Making Mumak work with Capacity-Scheduler (Anirban Das
+    via mahadev)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java b/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
index 23389ce..e2178fe 100644
--- a/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
+++ b/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
@@ -634,4 +634,12 @@
   Set<JobID> getInitializedJobList() {
     return initializedJobs.keySet();
   }
+
+  public HashMap<String, JobInitializationThread> getThreadsToQueueMap() {
+    return threadsToQueueMap;
+  }
+  
+  public long getSleepInterval(){
+    return sleepInterval;
+  }
 }
diff --git a/src/contrib/mumak/bin/mumak.sh b/src/contrib/mumak/bin/mumak.sh
index e657d4a..ed1ecf9 100644
--- a/src/contrib/mumak/bin/mumak.sh
+++ b/src/contrib/mumak/bin/mumak.sh
@@ -172,7 +172,7 @@
   echo "Usage: $script [--config dir] trace.json topology.json"
 }
 
-if [ $# != 2 ]; then
+if [ $# <= 2 ]; then
   print_usage
   exit
 fi
diff --git a/src/contrib/mumak/build.xml b/src/contrib/mumak/build.xml
index 5fecedd..2e0e344 100644
--- a/src/contrib/mumak/build.xml
+++ b/src/contrib/mumak/build.xml
@@ -32,6 +32,11 @@
   <import file="../build-contrib.xml"/>
   <property name="mumak.stamp.file" value="${build.dir}/mumak.uptodate.stamp"/>
 
+  <path id="mumak-classpath">
+    <pathelement location="${hadoop.root}/build/contrib/capacity-scheduler/classes"/>
+    <path refid="contrib-classpath"/>
+  </path>
+
   <target name="compile-java-sources" depends="init, ivy-retrieve-common" unless="skip.contrib">
     <echo message="contrib: ${name}"/>
     <javac
@@ -41,7 +46,7 @@
     destdir="${build.classes}"
     debug="${javac.debug}"
     deprecation="${javac.deprecation}">
-    <classpath refid="contrib-classpath"/>
+    <classpath refid="mumak-classpath"/>
     </javac>
   </target>
 
@@ -59,7 +64,7 @@
     <echo message="Start weaving aspects in place"/>
     <iajc
       encoding="${build.encoding}" 
-      srcdir="${hadoop.root}/src/java/;${hadoop.root}/build/src/;${src.dir}"
+      srcdir="${hadoop.root}/src/java/;${hadoop.root}/src/contrib/capacity-scheduler/src/java/;${hadoop.root}/build/src/;${src.dir}"
       includes="org/apache/hadoop/**/*.java, org/apache/hadoop/**/*.aj"
       destDir="${build.classes}"
       debug="${javac.debug}"
@@ -67,7 +72,7 @@
       source="${javac.version}"
       fork="yes"
       deprecation="${javac.deprecation}">
-      <classpath refid="contrib-classpath"/>
+      <classpath refid="mumak-classpath"/>
     </iajc>
     <touch file="${mumak.stamp.file}" mkdirs="true" />
     <echo message="Weaving of aspects is finished"/>
diff --git a/src/contrib/mumak/conf/mumak.xml b/src/contrib/mumak/conf/mumak.xml
index 55b07e9..ac96840 100644
--- a/src/contrib/mumak/conf/mumak.xml
+++ b/src/contrib/mumak/conf/mumak.xml
@@ -34,4 +34,11 @@
   node-local</description>
 </property>
 
+<property>
+  <name>mumak.job-submission.policy</name>
+  <value>REPLAY</value>
+  <description>Job submission policy
+  </description>
+</property>
+
 </configuration>
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobInitializationPollerAspects.aj b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobInitializationPollerAspects.aj
new file mode 100644
index 0000000..57cb76f
--- /dev/null
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobInitializationPollerAspects.aj
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * This aspect is used for NOT starting the job initialization threads of the capacity scheduler.
+ * We schedule the body of these threads manually from the SimulatorJobTracker according to 
+ * simulation time.
+ */
+ 
+public aspect JobInitializationPollerAspects {
+
+  pointcut overrideInitializationThreadStarts () : 
+    (target (JobInitializationPoller) || 
+     target (JobInitializationPoller.JobInitializationThread)) && 
+    call (public void start());
+  
+  void around() : overrideInitializationThreadStarts () {
+    // no-op
+  }
+  
+  
+}
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorCSJobInitializationThread.java b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorCSJobInitializationThread.java
new file mode 100644
index 0000000..c758486
--- /dev/null
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorCSJobInitializationThread.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobInitializationPoller.JobInitializationThread;
+
+public class SimulatorCSJobInitializationThread implements SimulatorEventListener {
+
+  long lastCalled;
+  CapacityTaskScheduler taskScheduler;
+  JobInitializationPoller jobPoller;
+  private final String queue;
+  final long sleepInterval;
+  /** The log object to send our messages to; only used for debugging. */
+  private static final Log LOG = LogFactory.getLog(SimulatorCSJobInitializationThread.class);
+
+  public SimulatorCSJobInitializationThread(TaskScheduler taskScheduler, 
+      String queue) {
+    this.taskScheduler = (CapacityTaskScheduler) taskScheduler;
+    jobPoller = this.taskScheduler.getInitializationPoller(); 
+    sleepInterval = jobPoller.getSleepInterval();
+    this.queue = queue;
+  }
+
+  @Override
+  public List<SimulatorEvent> accept(SimulatorEvent event) throws IOException {
+
+    SimulatorThreadWakeUpEvent e;
+    if(event instanceof SimulatorThreadWakeUpEvent) {
+      e = (SimulatorThreadWakeUpEvent) event;
+    }
+    else {
+      throw new IOException("Received an unexpected type of event in " + 
+      "SimThrdCapSchedJobInit");
+    }
+    jobPoller.cleanUpInitializedJobsList();
+    jobPoller.selectJobsToInitialize();
+    JobInitializationThread thread = 
+      jobPoller.getThreadsToQueueMap().get(this.queue);
+    thread.initializeJobs();   	
+    lastCalled = e.getTimeStamp();
+    List<SimulatorEvent> returnEvents = 
+      Collections.<SimulatorEvent>singletonList(
+          new SimulatorThreadWakeUpEvent(this,
+              lastCalled + sleepInterval));
+    return returnEvents;
+  }
+
+  @Override
+  public List<SimulatorEvent> init(long when) throws IOException {
+
+    return Collections.<SimulatorEvent>singletonList(
+        new SimulatorThreadWakeUpEvent(this,
+            when + sleepInterval));
+  }
+
+}
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
index db286c9..7a295ae 100644
--- a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
@@ -21,10 +21,15 @@
 import java.io.PrintStream;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
 import java.util.Iterator;
 import java.util.Random;
 import java.util.regex.Pattern;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -68,6 +73,13 @@
   boolean shutdown = false;
   long terminateTime = Long.MAX_VALUE;
   long currentTime;
+  /** The HashSet for storing all the simulated threads useful for 
+   * job initialization for capacity scheduler.
+   */
+  HashSet<SimulatorCSJobInitializationThread> threadSet;
+  /** The log object to send our messages to; only used for debugging. */
+  private static final Log LOG = LogFactory.getLog(SimulatorEngine.class);
+  
   /** 
    * Master random seed read from the configuration file, if present.
    * It is (only) used for creating sub seeds for all the random number 
@@ -137,13 +149,13 @@
   }
    
   /**
-   * Initiate components in the simulation.
-   * @throws InterruptedException
-   * @throws IOException if trace or topology files cannot be open
+   * Creates the configuration for mumak simulation. This is kept modular mostly for 
+   * testing purposes. so that the standard configuration can be modified before passing
+   * it to the init() function.
+   * @return JobConf: the configuration for the SimulatorJobTracker 
    */
-  @SuppressWarnings("deprecation")
-  void init() throws InterruptedException, IOException {
     
+  JobConf createMumakConf() {
     JobConf jobConf = new JobConf(getConf());
     jobConf.setClass("topology.node.switch.mapping.impl",
         StaticMapping.class, DNSToSwitchMapping.class);
@@ -157,7 +169,30 @@
     jobConf.setUser("mumak");
     jobConf.set("mapred.system.dir", 
         jobConf.get("hadoop.log.dir", "/tmp/hadoop-"+jobConf.getUser()) + "/mapred/system");
-    jobConf.set("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class.getName());
+    
+    return jobConf;
+  }
+
+  /**
+   * Initialize components in the simulation.
+   * @throws InterruptedException
+   * @throws IOException if trace or topology files cannot be opened.
+   */
+  void init() throws InterruptedException, IOException {
+    
+    JobConf jobConf = createMumakConf();
+    init(jobConf);
+  }
+    
+  /**
+   * Initiate components in the simulation. The JobConf is
+   * create separately and passed to the init().
+   * @param JobConf: The configuration for the jobtracker.
+   * @throws InterruptedException
+   * @throws IOException if trace or topology files cannot be opened.
+   */
+  @SuppressWarnings("deprecation")
+  void init(JobConf jobConf) throws InterruptedException, IOException {
     
     FileSystem lfs = FileSystem.getLocal(getConf());
     Path logPath =
@@ -212,11 +247,40 @@
     jc = new SimulatorJobClient(jt, jobStoryProducer, submissionPolicy);
     queue.addAll(jc.init(firstJobStartTime));
 
+    //if the taskScheduler is CapacityTaskScheduler start off the JobInitialization
+    //threads too
+    if (jobConf.get("mapred.jobtracker.taskScheduler").equals
+       (CapacityTaskScheduler.class.getName())) {
+      LOG.info("CapacityScheduler used: starting simulatorThreads");
+      startSimulatorThreadsCapSched(now);
+    }
     terminateTime = getTimeProperty(jobConf, "mumak.terminate.time",
                                     Long.MAX_VALUE);
   }
   
   /**
+   * In this function, we collect the set of leaf queues from JobTracker, and 
+   * for each of them creates a simulated thread that performs the same
+   * check as JobInitializationPoller.JobInitializationThread in Capacity Scheduler.  
+   * @param now
+   * @throws IOException
+   */
+  private void startSimulatorThreadsCapSched(long now) throws IOException {
+    
+    Set<String> queueNames = jt.getQueueManager().getLeafQueueNames();
+    TaskScheduler taskScheduler = jt.getTaskScheduler();
+    threadSet = new HashSet<SimulatorCSJobInitializationThread>();
+    // We create a different thread for each queue and hold a 
+    //reference to  each of them 
+    for (String jobQueue: queueNames) {
+      SimulatorCSJobInitializationThread capThread = 
+        new SimulatorCSJobInitializationThread(taskScheduler,jobQueue);   
+      threadSet.add(capThread);
+      queue.addAll(capThread.init(now));
+    }
+  }
+  
+  /**
    * The main loop of the simulation. First call init() to get objects ready,
    * then go into the main loop, where {@link SimulatorEvent}s are handled removed from
    * the {@link SimulatorEventQueue}, and new {@link SimulatorEvent}s are created and inserted
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
index 1b02930..f640056 100644
--- a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
@@ -197,6 +197,24 @@
     SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, jobSubmitDir, this,
                                                             this.conf, 
                                                             jobStory);
+    // Check whether the queue information provided is valid
+    try {
+      checkQueueValidity(job);
+    } catch(IOException ioe) {
+      LOG.warn("Queue given for job " + job.getJobID() + " is not valid:"
+        + ioe);
+      throw ioe;
+    }
+    
+    // Check the job if it cannot run in the cluster because of invalid memory
+    // requirements.
+    try {
+      checkMemoryRequirements(job);
+    } catch (IOException ioe) {
+      LOG.warn("Exception in checking Memory requirements of jobId: " + jobId
+               + ioe);
+      //throw ioe;
+    }
     return addJob(jobId, job);
   }
   
@@ -457,7 +475,10 @@
           }
           SimulatorLaunchTaskAction newlaunchTask = 
         	  new SimulatorLaunchTaskAction(task, taskAttemptInfo);
-          
+          if (loggingEnabled) {
+            LOG.debug("Job " + jobID + " launched taskattempt " +
+                             taskAttemptID + " at " + getClock().getTime());
+          }
           actions.add(newlaunchTask);
         }
       }
diff --git a/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorThreadWakeUpEvent.java b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorThreadWakeUpEvent.java
new file mode 100644
index 0000000..44fa499
--- /dev/null
+++ b/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorThreadWakeUpEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+public class SimulatorThreadWakeUpEvent extends SimulatorEvent {
+
+  public SimulatorThreadWakeUpEvent(SimulatorEventListener listener, 
+      long timestamp) {
+    super(listener, timestamp);
+  }
+
+}
diff --git a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java
index 60bc2a3..c9a9183 100644
--- a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java
+++ b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java
@@ -40,8 +40,7 @@
 
   public static final Log LOG = LogFactory.getLog(MockSimulatorEngine.class);
   
-  public MockSimulatorEngine(int nJobs,
-      @SuppressWarnings("unused") int nTrackers) {
+  public MockSimulatorEngine(int nJobs, int nTrackers) {
     super();
     fixedJobs = nJobs;
     jobs = new HashMap<JobID, JobStory>();
@@ -52,7 +51,11 @@
   @Override
   public void run() throws IOException, InterruptedException {
     startTime = System.currentTimeMillis();
-    init();
+    JobConf jobConf = createMumakConf();
+    // Adding the default queue since the example trace is from queue-less hadoop
+    jobConf.set("mapred.queue.names",JobConf.DEFAULT_QUEUE_NAME);
+    
+    init(jobConf);
     validateInitialization();
     SimulatorEvent nextEvent;
     while ((nextEvent = queue.get()) != null && nextEvent.getTimeStamp() < terminateTime
diff --git a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
index d6cfb51..88b377a 100644
--- a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
+++ b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
@@ -56,8 +56,10 @@
     
     MockSimulatorEngine mockMumak = new MockSimulatorEngine(numJobs, nTrackers);
 
+    Configuration mumakConf = new Configuration();
+    mumakConf.set("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class.getName());
     String[] args = { traceFile.toString(), topologyFile.toString() };
-    int res = ToolRunner.run(conf, mockMumak, args);
+    int res = ToolRunner.run(mumakConf, mockMumak, args);
     Assert.assertEquals(res, 0);
   }
   
diff --git a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
index 8db3829..2f48607 100644
--- a/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
+++ b/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
@@ -57,6 +57,7 @@
     jtConf.set("mapred.system.dir", jtConf.get("hadoop.tmp.dir", "/tmp/hadoop-"
         + jtConf.getUser())
         + "/mapred/system");
+    jtConf.set("mapred.queue.names",JobConf.DEFAULT_QUEUE_NAME);
     System.out.println("Created JobConf");
     return jtConf;
   }
diff --git a/src/java/org/apache/hadoop/mapred/JobTracker.java b/src/java/org/apache/hadoop/mapred/JobTracker.java
index e5d3bc3..915985d 100644
--- a/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -3049,14 +3049,11 @@
         new JobInProgress(this, this.conf, restartCount, jobInfo, ts);
 
     synchronized (this) {
-      String queue = job.getProfile().getQueueName();
-      if (!(queueManager.getLeafQueueNames().contains(queue))) {
-        throw new IOException("Queue \"" + queue + "\" does not exist");
-      }
-
-      // check if queue is RUNNING
-      if (!queueManager.isRunning(queue)) {
-        throw new IOException("Queue \"" + queue + "\" is not running");
+      try {
+        checkQueueValidity(job);
+      } catch(IOException ioe) {
+        LOG.error("Queue given for job " + job.getJobID() + " is not valid: " + ioe);
+        throw ioe;
       }
       try {
         checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null);
@@ -3128,6 +3125,24 @@
   }
 
   /**
+   * For a JobInProgress that is being submitted, check whether 
+   * queue that the job has been submitted to exists and is RUNNING.
+   * @param job The JobInProgress object being submitted.
+   * @throws IOException
+   */
+  public void checkQueueValidity(JobInProgress job) throws IOException {
+    String queue = job.getProfile().getQueueName();
+    if (!(queueManager.getLeafQueueNames().contains(queue))) {
+        throw new IOException("Queue \"" + queue + "\" does not exist");
+    }
+
+    // check if queue is RUNNING
+    if (!queueManager.isRunning(queue)) {
+        throw new IOException("Queue \"" + queue + "\" is not running");
+    }
+  }
+
+  /**
    * Check the ACLs for a user doing the passed queue-operation and the passed
    * job operation.
    * <ul>
@@ -4480,7 +4495,7 @@
    * @param job
    * @throws IOException 
    */
-  private void checkMemoryRequirements(JobInProgress job)
+  void checkMemoryRequirements(JobInProgress job)
       throws IOException {
     if (!perTaskMemoryConfigurationSetOnJT()) {
       if (LOG.isDebugEnabled()) {
diff --git a/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java b/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
index 2c6e546..25e5bc0 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
@@ -745,6 +745,10 @@
       // any locality, so this group should count as "distance=2".
       // However, setup/cleanup tasks are also counted in the 4th group.
       // These tasks do not make sense.
+      if(cdfList==null) {
+    	  runtime = -1;
+    	  return runtime;
+      }
       try {
         runtime = makeUpRuntime(cdfList.get(locality));
       } catch (NoValueToMakeUpRuntime e) {
@@ -767,6 +771,9 @@
    */
   private long makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs) {
     int total = 0;
+    if(mapAttemptCDFs == null) {
+    	return -1;
+    }
     for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
       total += cdf.getNumberValues();
     }
@@ -862,6 +869,11 @@
   }
 
   private State makeUpState(int taskAttemptNumber, double[] numAttempts) {
+	  
+  // if numAttempts == null we are returning FAILED.
+  if(numAttempts == null) {
+    return State.FAILED;
+  }
     if (taskAttemptNumber >= numAttempts.length - 1) {
       // always succeed
       return State.SUCCEEDED;