/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.net.Node;


/*************************************************************
 * TaskInProgress maintains all the info needed for a
 * Task in the lifetime of its owning Job.  A given Task
 * might be speculatively executed or reexecuted, so we
 * need a level of indirection above the running-id itself.
 * <br>
 * A given TaskInProgress contains multiple taskids,
 * 0 or more of which might be executing at any one time.
 * (That's what allows speculative execution.)  A taskid
 * is now *never* recycled.  A TIP allocates enough taskids
 * to account for all the speculation and failures it will
 * ever have to handle.  Once those are up, the TIP is dead.
 * **************************************************************
 */
class TaskInProgress {
  static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently  
  int maxTaskAttempts = 4;    
  static final long SPECULATIVE_LAG = 60 * 1000;
  private static final int NUM_ATTEMPTS_PER_RESTART = 1000;

  public static final Log LOG = LogFactory.getLog(TaskInProgress.class);

  // Defines the TIP
  private String jobFile = null;
  private TaskSplitMetaInfo splitInfo;
  private int numMaps;
  private int partition;
  private JobTracker jobtracker;
  private JobHistory jobHistory;
  private TaskID id;
  private JobInProgress job;
  private final int numSlotsRequired;

  // Status of the TIP
  private int successEventNumber = -1;
  private int numTaskFailures = 0;
  private int numKilledTasks = 0;
  private double progress = 0;
  private double oldProgressRate;
  private String state = "";
  private long lastDispatchTime = 0;   // most recent time task attempt given to TT
  private long execStartTime = 0;  // when we started first task-attempt
  private long execFinishTime = 0;
  private int completes = 0;
  private boolean failed = false;
  private boolean killed = false;
  private long maxSkipRecords = 0;
  private FailedRanges failedRanges = new FailedRanges();
  private volatile boolean skipping = false;
  private boolean jobCleanup = false; 
  private boolean jobSetup = false;
   
  // The 'next' usable taskid of this tip
  int nextTaskId = 0;
    
  // The taskid that took this TIP to SUCCESS
  private TaskAttemptID successfulTaskId;

  // The first taskid of this tip
  private TaskAttemptID firstTaskId;
  
  // Map from task Id -> TaskTracker Id, contains tasks that are
  // currently runnings
  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
  // All attempt Ids of this TIP
  private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
  private JobConf conf;
  private Map<TaskAttemptID,List<String>> taskDiagnosticData =
    new TreeMap<TaskAttemptID,List<String>>();
  /**
   * Map from taskId -> TaskStatus
   */
  TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
    new TreeMap<TaskAttemptID,TaskStatus>();

  // Map from taskId -> TaskTracker Id, 
  // contains cleanup attempts and where they ran, if any
  private TreeMap<TaskAttemptID, String> cleanupTasks =
    new TreeMap<TaskAttemptID, String>();

  private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
  private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
  
  //list of tasks to kill, <taskid> -> <shouldFail> 
  private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
  
  //task to commit, <taskattemptid>  
  private TaskAttemptID taskToCommit;
  
  private volatile Counters counters = new Counters();
  
  private HashMap<TaskAttemptID, Long> dispatchTimeMap = 
    new HashMap<TaskAttemptID, Long>();
  
  private String user;
  

  /**
   * Constructor for MapTask
   */
  public TaskInProgress(JobID jobid, String jobFile, 
                        TaskSplitMetaInfo split, 
                        JobTracker jobtracker, JobConf conf, 
                        JobInProgress job, int partition,
                        int numSlotsRequired) {
    this.jobFile = jobFile;
    this.splitInfo = split;
    this.jobtracker = jobtracker;
    this.job = job;
    this.conf = conf;
    this.partition = partition;
    this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
    this.numSlotsRequired = numSlotsRequired;
    setMaxTaskAttempts();
    init(jobid);
    if (jobtracker != null) {
      this.jobHistory = jobtracker.getJobHistory();
    }
    this.user = job.getUser();
  }
        
  /**
   * Constructor for ReduceTask
   */
  public TaskInProgress(JobID jobid, String jobFile, 
                        int numMaps, 
                        int partition, JobTracker jobtracker, JobConf conf,
                        JobInProgress job, int numSlotsRequired) {
    this.jobFile = jobFile;
    this.numMaps = numMaps;
    this.partition = partition;
    this.jobtracker = jobtracker;
    this.job = job;
    this.conf = conf;
    this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
    this.numSlotsRequired = numSlotsRequired;
    setMaxTaskAttempts();
    init(jobid);
    if (jobtracker != null) {
      this.jobHistory = jobtracker.getJobHistory();
    }
    this.user = job.getUser();
  }
  
  /**
   * Set the max number of attempts before we declare a TIP as "failed"
   */
  private void setMaxTaskAttempts() {
    if (isMapTask()) {
      this.maxTaskAttempts = conf.getMaxMapAttempts();
    } else {
      this.maxTaskAttempts = conf.getMaxReduceAttempts();
    }
  }
    
  /**
   * Return the index of the tip within the job, so 
   * "task_200707121733_1313_0002_m_012345" would return 12345;
   * @return int the tip index
   */
  public int idWithinJob() {
    return partition;
  }    

  public boolean isJobCleanupTask() {
   return jobCleanup;
  }
  
  public void setJobCleanupTask() {
    jobCleanup = true;
  }

  public boolean isJobSetupTask() {
    return jobSetup;
  }
	  
  public void setJobSetupTask() {
    jobSetup = true;
  }

  public boolean isOnlyCommitPending() {
    for (TaskStatus t : taskStatuses.values()) {
      if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
        return true;
      }
    }
    return false;
  }
 
  public boolean isCommitPending(TaskAttemptID taskId) {
    TaskStatus t = taskStatuses.get(taskId);
    if (t == null) {
      return false;
    }
    return t.getRunState() ==  TaskStatus.State.COMMIT_PENDING;
  }
  
  /**
   * Initialization common to Map and Reduce
   */
  void init(JobID jobId) {
    this.id = new TaskID(jobId, isMapTask() ? TaskType.MAP : TaskType.REDUCE,
        partition);
    this.skipping = startSkipping();
  }

  ////////////////////////////////////
  // Accessors, info, profiles, etc.
  ////////////////////////////////////
  
  /**
   * Return the dispatch time
   */
  public long getDispatchTime(TaskAttemptID taskid){
    Long l = dispatchTimeMap.get(taskid);
    if (l != null) {
      return l.longValue();
    }
    return 0;
  }

  public long getLastDispatchTime(){
    return this.lastDispatchTime;
  }
  
  /**
   * Set the dispatch time
   */
  public void setDispatchTime(TaskAttemptID taskid, long disTime){
    dispatchTimeMap.put(taskid, disTime);
    this.lastDispatchTime = disTime;
  }
  
  /**
   * Return the exec start time
   */
  public long getExecStartTime() {
    return execStartTime;
  }
  
  /**
   * Set the exec start time
   */
  public void setExecStartTime(long startTime) {
    execStartTime = startTime;
  }
  
  /**
   * Return the exec finish time
   */
  public long getExecFinishTime() {
    return execFinishTime;
  }

  /**
   * Set the exec finish time
   */
  public void setExecFinishTime(long finishTime) {
    execFinishTime = finishTime;
    TaskUpdatedEvent tue = new TaskUpdatedEvent(id, execFinishTime);
    jobHistory.logEvent(tue, id.getJobID());
  }
  
  /**
   * Return the parent job
   */
  public JobInProgress getJob() {
    return job;
  }
  /**
   * Return an ID for this task, not its component taskid-threads
   */
  public TaskID getTIPId() {
    return this.id;
  }
  /**
   * Whether this is a map task
   */
  public boolean isMapTask() {
    return splitInfo != null;
  }
    
  /**
   * Returns the {@link TaskType} of the {@link TaskAttemptID} passed. 
   * The type of an attempt is determined by the nature of the task and not its 
   * id. 
   * For example,
   * - Attempt 'attempt_123_01_m_01_0' might be a job-setup task even though it 
   *   has a _m_ in its id. Hence the task type of this attempt is JOB_SETUP 
   *   instead of MAP.
   * - Similarly reduce attempt 'attempt_123_01_r_01_0' might have failed and is
   *   now supposed to do the task-level cleanup. In such a case this attempt 
   *   will be of type TASK_CLEANUP instead of REDUCE.
   */
  TaskType getAttemptType (TaskAttemptID id) {
    if (isCleanupAttempt(id)) {
      return TaskType.TASK_CLEANUP;
    } else if (isJobSetupTask()) {
      return TaskType.JOB_SETUP;
    } else if (isJobCleanupTask()) {
      return TaskType.JOB_CLEANUP;
    } else if (isMapTask()) {
      return TaskType.MAP;
    } else {
      return TaskType.REDUCE;
    }
  }
  
  /**
   * Is the Task associated with taskid is the first attempt of the tip? 
   * @param taskId
   * @return Returns true if the Task is the first attempt of the tip
   */  
  public boolean isFirstAttempt(TaskAttemptID taskId) {
    return firstTaskId == null ? false : firstTaskId.equals(taskId); 
  }
  
  /**
   * Is this tip currently running any tasks?
   * @return true if any tasks are running
   */
  public boolean isRunning() {
    return !activeTasks.isEmpty();
  }

  /**
   * Is this TaskAttemptid running
   * @param taskId
   * @return true if taskId attempt is running.
   */
  boolean isAttemptRunning(TaskAttemptID taskId) {
    return activeTasks.containsKey(taskId);
  }
    
  TaskAttemptID getSuccessfulTaskid() {
    return successfulTaskId;
  }
  
  private void setSuccessfulTaskid(TaskAttemptID successfulTaskId) {
    this.successfulTaskId = successfulTaskId; 
  }
  
  private void resetSuccessfulTaskid() {
    this.successfulTaskId = null; 
  }

  String getUser() {
    return user;
  }
  
  void setUser(String user) {
    this.user = user;
  }
  /**
   * Is this tip complete?
   * 
   * @return <code>true</code> if the tip is complete, else <code>false</code>
   */
  public synchronized boolean isComplete() {
    return (completes > 0);
  }

  /**
   * Is the given taskid the one that took this tip to completion?
   * 
   * @param taskid taskid of attempt to check for completion
   * @return <code>true</code> if taskid is complete, else <code>false</code>
   */
  public boolean isComplete(TaskAttemptID taskid) {
    return ((completes > 0) 
            && taskid.equals(getSuccessfulTaskid()));
  }

  /**
   * Is the tip a failure?
   * 
   * @return <code>true</code> if tip has failed, else <code>false</code>
   */
  public boolean isFailed() {
    return failed;
  }

  /**
   * Number of times the TaskInProgress has failed.
   */
  public int numTaskFailures() {
    return numTaskFailures;
  }

  /**
   * Number of times the TaskInProgress has been killed by the framework.
   */
  public int numKilledTasks() {
    return numKilledTasks;
  }

  /**
   * Get the overall progress (from 0 to 1.0) for this TIP
   */
  public double getProgress() {
    return progress;
  }
    
  /**
   * Get the task's counters
   */
  public Counters getCounters() {
    return counters;
  }

  /**
   * Returns whether a component task-thread should be 
   * closed because the containing JobInProgress has completed
   * or the task is killed by the user
   */
  public boolean shouldClose(TaskAttemptID taskid) {
    /**
     * If the task hasn't been closed yet, and it belongs to a completed
     * TaskInProgress close it.
     * 
     * However, for completed map tasks we do not close the task which
     * actually was the one responsible for _completing_ the TaskInProgress. 
     */
    boolean close = false;
    TaskStatus ts = taskStatuses.get(taskid);
    if ((ts != null) &&
        (!tasksReportedClosed.contains(taskid)) &&
        ((this.failed) ||
        ((job.getStatus().getRunState() != JobStatus.RUNNING &&
         (job.getStatus().getRunState() != JobStatus.PREP))))) {
      tasksReportedClosed.add(taskid);
      close = true;
    } else if (isComplete() && 
               !(isMapTask() && !jobSetup && 
                   !jobCleanup && isComplete(taskid)) &&
               !tasksReportedClosed.contains(taskid)) {
      tasksReportedClosed.add(taskid);
      close = true; 
      if (isComplete() && !isComplete(taskid)) {
        addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" +
            " already SUCCEEDED");
      }      
    } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
               !tasksReportedClosed.contains(taskid)) {
      tasksReportedClosed.add(taskid);
      addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" +
           " went to COMMIT_PENDING state earlier");
      close = true; 
    } else {
      close = tasksToKill.keySet().contains(taskid);
    }   
    return close;
  }

  /**
   * Commit this task attempt for the tip. 
   * @param taskid
   */
  public void doCommit(TaskAttemptID taskid) {
    taskToCommit = taskid;
  }

  /**
   * Returns whether the task attempt should be committed or not 
   */
  public boolean shouldCommit(TaskAttemptID taskid) {
    return !isComplete() && isCommitPending(taskid) && 
           taskToCommit.equals(taskid);
  }

  /**
   * Creates a "status report" for this task.  Includes the
   * task ID and overall status, plus reports for all the
   * component task-threads that have ever been started.
   */
  synchronized TaskReport generateSingleReport() {
    ArrayList<String> diagnostics = new ArrayList<String>();
    for (List<String> l : taskDiagnosticData.values()) {
      diagnostics.addAll(l);
    }
    TIPStatus currentStatus = null;
    if (isRunning() && !isComplete()) {
      currentStatus = TIPStatus.RUNNING;
    } else if (isComplete()) {
      currentStatus = TIPStatus.COMPLETE;
    } else if (wasKilled()) {
      currentStatus = TIPStatus.KILLED;
    } else if (isFailed()) {
      currentStatus = TIPStatus.FAILED;
    } else if (!(isComplete() || isRunning() || wasKilled())) {
      currentStatus = TIPStatus.PENDING;
    }
    
    TaskReport report = new TaskReport
      (getTIPId(), (float)progress, state,
       diagnostics.toArray(new String[diagnostics.size()]),
       currentStatus, execStartTime, execFinishTime, counters);
    if (currentStatus == TIPStatus.RUNNING) {
      report.setRunningTaskAttempts(activeTasks.keySet());
    } else if (currentStatus == TIPStatus.COMPLETE) {
      report.setSuccessfulAttempt(getSuccessfulTaskid());
    }
    return report;
  }

  /**
   * Get the diagnostic messages for a given task within this tip.
   * 
   * @param taskId the id of the required task
   * @return the list of diagnostics for that task
   */
  synchronized List<String> getDiagnosticInfo(TaskAttemptID taskId) {
    return taskDiagnosticData.get(taskId);
  }
    
  ////////////////////////////////////////////////
  // Update methods, usually invoked by the owning
  // job.
  ////////////////////////////////////////////////
  
  /**
   * Save diagnostic information for a given task.
   * 
   * @param taskId id of the task 
   * @param diagInfo diagnostic information for the task
   */
  public void addDiagnosticInfo(TaskAttemptID taskId, String diagInfo) {
    List<String> diagHistory = taskDiagnosticData.get(taskId);
    if (diagHistory == null) {
      diagHistory = new ArrayList<String>();
      taskDiagnosticData.put(taskId, diagHistory);
    }
    diagHistory.add(diagInfo);
  }
  
  /**
   * A status message from a client has arrived.
   * It updates the status of a single component-thread-task,
   * which might result in an overall TaskInProgress status update.
   * @return has the task changed its state noticeably?
   */
  synchronized boolean updateStatus(TaskStatus status) {
    TaskAttemptID taskid = status.getTaskID();
    String tracker = status.getTaskTracker();
    String diagInfo = status.getDiagnosticInfo();
    TaskStatus oldStatus = taskStatuses.get(taskid);
    boolean changed = true;
    if (diagInfo != null && diagInfo.length() > 0) {
      LOG.info("Error from " + taskid + " on " +  tracker + ": "+ diagInfo);
      addDiagnosticInfo(taskid, diagInfo);
    }
    
    if(skipping) {
      failedRanges.updateState(status);
    }
    
    if (oldStatus != null) {
      TaskStatus.State oldState = oldStatus.getRunState();
      TaskStatus.State newState = status.getRunState();
          
      // We should never recieve a duplicate success/failure/killed
      // status update for the same taskid! This is a safety check, 
      // and is addressed better at the TaskTracker to ensure this.
      // @see {@link TaskTracker.transmitHeartbeat()}
      if ((newState != TaskStatus.State.RUNNING && 
           newState != TaskStatus.State.COMMIT_PENDING && 
           newState != TaskStatus.State.FAILED_UNCLEAN && 
           newState != TaskStatus.State.KILLED_UNCLEAN && 
           newState != TaskStatus.State.UNASSIGNED) && 
          (oldState == newState)) {
        LOG.warn("Recieved duplicate status update of '" + newState + 
                 "' for '" + taskid + "' of TIP '" + getTIPId() + "'" +
                 "oldTT=" + oldStatus.getTaskTracker() + 
                 " while newTT=" + status.getTaskTracker());
        return false;
      }

      // The task is not allowed to move from completed back to running.
      // We have seen out of order status messagesmoving tasks from complete
      // to running. This is a spot fix, but it should be addressed more
      // globally.
      if ((newState == TaskStatus.State.RUNNING || 
          newState == TaskStatus.State.UNASSIGNED) &&
          (oldState == TaskStatus.State.FAILED || 
           oldState == TaskStatus.State.KILLED || 
           oldState == TaskStatus.State.FAILED_UNCLEAN || 
           oldState == TaskStatus.State.KILLED_UNCLEAN || 
           oldState == TaskStatus.State.SUCCEEDED ||
           oldState == TaskStatus.State.COMMIT_PENDING)) {
        return false;
      }
      
      //Do not accept any status once the task is marked FAILED/KILLED
      //This is to handle the case of the JobTracker timing out a task
      //due to launch delay, but the TT comes back with any state or 
      //TT got expired
      if (oldState == TaskStatus.State.FAILED ||
          oldState == TaskStatus.State.KILLED) {
        tasksToKill.put(taskid, true);
        return false;	  
      }
          
      changed = oldState != newState;
    }
    // if task is a cleanup attempt, do not replace the complete status,
    // update only specific fields.
    // For example, startTime should not be updated, 
    // but finishTime has to be updated.
    if (!isCleanupAttempt(taskid)) {
      taskStatuses.put(taskid, status);
      //we don't want to include setup tasks in the task execution stats
      if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) || 
          (!isMapTask() && job.hasSpeculativeReduces()))) {
        long now = JobTracker.getClock().getTime();
        double oldProgRate = getOldProgressRate();
        double currProgRate = getCurrentProgressRate(now);
        job.updateStatistics(oldProgRate, currProgRate, isMapTask());
        //we need to store the current progress rate, so that we can
        //update statistics accurately the next time we invoke
        //updateStatistics
        setProgressRate(currProgRate);
      }
    } else {
      taskStatuses.get(taskid).statusUpdate(status.getRunState(),
        status.getProgress(), status.getStateString(), status.getPhase(),
        status.getFinishTime());
    }

    // Recompute progress
    recomputeProgress();
    return changed;
  }

  /**
   * Indicate that one of the taskids in this TaskInProgress
   * has failed.
   */
  public void incompleteSubTask(TaskAttemptID taskid, 
                                JobStatus jobStatus) {
    //
    // Note the failure and its location
    //
    TaskStatus status = taskStatuses.get(taskid);
    String trackerName;
    String trackerHostName = null;
    TaskStatus.State taskState = TaskStatus.State.FAILED;
    if (status != null) {
      trackerName = status.getTaskTracker();
      trackerHostName = 
        JobInProgress.convertTrackerNameToHostName(trackerName);
      // Check if the user manually KILLED/FAILED this task-attempt...
      Boolean shouldFail = tasksToKill.remove(taskid);
      if (shouldFail != null) {
        if (status.getRunState() == TaskStatus.State.FAILED ||
            status.getRunState() == TaskStatus.State.KILLED) {
          taskState = (shouldFail) ? TaskStatus.State.FAILED :
                                     TaskStatus.State.KILLED;
        } else {
          taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN :
                                     TaskStatus.State.KILLED_UNCLEAN;
          
        }
        status.setRunState(taskState);
        addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
      }
 
      taskState = status.getRunState();
      if (taskState != TaskStatus.State.FAILED && 
          taskState != TaskStatus.State.KILLED &&
          taskState != TaskStatus.State.FAILED_UNCLEAN &&
          taskState != TaskStatus.State.KILLED_UNCLEAN) {
        LOG.info("Task '" + taskid + "' running on '" + trackerName + 
                "' in state: '" + taskState + "' being failed!");
        status.setRunState(TaskStatus.State.FAILED);
        taskState = TaskStatus.State.FAILED;
      }

      // tasktracker went down and failed time was not reported. 
      if (0 == status.getFinishTime()){
        status.setFinishTime(JobTracker.getClock().getTime());
      }
    }

    this.activeTasks.remove(taskid);
    
    // Since we do not fail completed reduces (whose outputs go to hdfs), we 
    // should note this failure only for completed maps, only if this taskid;
    // completed this map. however if the job is done, there is no need to 
    // manipulate completed maps
    if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) && 
        jobStatus.getRunState() != JobStatus.SUCCEEDED) {
      this.completes--;
      
      // Reset the successfulTaskId since we don't have a SUCCESSFUL task now
      resetSuccessfulTaskid();
    }

    // Note that there can be failures of tasks that are hosted on a machine 
    // that has not yet registered with restarted jobtracker
    // recalculate the counts only if its a genuine failure
    if (tasks.contains(taskid)) {
      if (taskState == TaskStatus.State.FAILED) {
        numTaskFailures++;
        if (isMapTask()) {
          jobtracker.getInstrumentation().failedMap(taskid);
        } else {
          jobtracker.getInstrumentation().failedReduce(taskid);
        }
        
        machinesWhereFailed.add(trackerHostName);
        if(maxSkipRecords>0) {
          //skipping feature enabled
          if (LOG.isDebugEnabled()) {
            LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
          }
          failedRanges.add(status.getNextRecordRange());
          skipping = startSkipping();
        }

      } else if (taskState == TaskStatus.State.KILLED) {
        numKilledTasks++;
        if (isMapTask()) {
            jobtracker.getInstrumentation().killedMap(taskid);
          } else {
            jobtracker.getInstrumentation().killedReduce(taskid);
          }
      }
    }

    if (numTaskFailures >= maxTaskAttempts) {
      LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
      kill();
    }
  }
  
  /**
   * Get whether to start skipping mode. 
   */
  private boolean startSkipping() {
    if(maxSkipRecords>0 && 
        numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
      return true;
    }
    return false;
  }

  /**
   * Finalize the <b>completed</b> task; note that this might not be the first 
   * task-attempt of the {@link TaskInProgress} and hence might be declared 
   * {@link TaskStatus.State.SUCCEEDED} or {@link TaskStatus.State.KILLED}
   * 
   * @param taskId id of the completed task-attempt
   * @param finalTaskState final {@link TaskStatus.State} of the task-attempt
   */
  private void completedTask(TaskAttemptID taskId, TaskStatus.State finalTaskState) {
    TaskStatus status = taskStatuses.get(taskId);
    status.setRunState(finalTaskState);
    activeTasks.remove(taskId);
  }
  
  /**
   * Indicate that one of the taskids in this already-completed
   * TaskInProgress has successfully completed; hence we mark this
   * taskid as {@link TaskStatus.State.KILLED}. 
   */
  void alreadyCompletedTask(TaskAttemptID taskid) {
    // 'KILL' the task 
    completedTask(taskid, TaskStatus.State.KILLED);
    
    // Note the reason for the task being 'KILLED'
    addDiagnosticInfo(taskid, "Already completed TIP");
    
    LOG.info("Already complete TIP " + getTIPId() + 
             " has completed task " + taskid);
  }

  /**
   * Indicate that one of the taskids in this TaskInProgress
   * has successfully completed!
   */
  public void completed(TaskAttemptID taskid) {
    //
    // Record that this taskid is complete
    //
    completedTask(taskid, TaskStatus.State.SUCCEEDED);
        
    // Note the successful taskid
    setSuccessfulTaskid(taskid);
    
    //
    // Now that the TIP is complete, the other speculative 
    // subtasks will be closed when the owning tasktracker 
    // reports in and calls shouldClose() on this object.
    //

    this.completes++;
    this.execFinishTime = JobTracker.getClock().getTime();
    recomputeProgress();
    
  }

  /**
   * Get the split locations 
   */
  public String[] getSplitLocations() {
    if (isMapTask() && !jobSetup && !jobCleanup) {
      return splitInfo.getLocations();
    }
    return new String[0];
  }
  
  /**
   * Get the Status of the tasks managed by this TIP
   */
  public TaskStatus[] getTaskStatuses() {
    return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]);
  }

  /**
   * Get all the {@link TaskAttemptID}s in this {@link TaskInProgress}
   */
  TaskAttemptID[] getAllTaskAttemptIDs() {
    return tasks.toArray(new TaskAttemptID[tasks.size()]);
  }
  
  /**
   * Get the status of the specified task
   * @param taskid
   * @return
   */
  public TaskStatus getTaskStatus(TaskAttemptID taskid) {
    return taskStatuses.get(taskid);
  }
  /**
   * The TIP's been ordered kill()ed.
   */
  public void kill() {
    if (isComplete() || failed) {
      return;
    }
    this.failed = true;
    killed = true;
    this.execFinishTime = JobTracker.getClock().getTime();
    recomputeProgress();
  }

  /**
   * Was the task killed?
   * @return true if the task killed
   */
  public boolean wasKilled() {
    return killed;
  }
  
  /**
   * Kill the given task
   */
  boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
    TaskStatus st = taskStatuses.get(taskId);
    if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
        || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
        st.inTaskCleanupPhase() ||
        st.getRunState() == TaskStatus.State.UNASSIGNED)
        && tasksToKill.put(taskId, shouldFail) == null ) {
      String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 
                      + " task '" + taskId + "' by user";
      addDiagnosticInfo(taskId, logStr);
      LOG.info(logStr);
      return true;
    }
    return false;
  }

  /**
   * This method is called whenever there's a status change
   * for one of the TIP's sub-tasks.  It recomputes the overall 
   * progress for the TIP.  We examine all sub-tasks and find 
   * the one that's most advanced (and non-failed).
   */
  void recomputeProgress() {
    if (isComplete()) {
      this.progress = 1;
      // update the counters and the state
      TaskStatus completedStatus = taskStatuses.get(getSuccessfulTaskid());
      this.counters = completedStatus.getCounters();
      this.state = completedStatus.getStateString();
    } else if (failed) {
      this.progress = 0;
      // reset the counters and the state
      this.state = "";
      this.counters = new Counters();
    } else {
      double bestProgress = 0;
      String bestState = "";
      Counters bestCounters = new Counters();
      for (Iterator<TaskAttemptID> it = taskStatuses.keySet().iterator(); it.hasNext();) {
        TaskAttemptID taskid = it.next();
        TaskStatus status = taskStatuses.get(taskid);
        if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
          bestProgress = 1;
          bestState = status.getStateString();
          bestCounters = status.getCounters();
          break;
        } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) {
          //for COMMIT_PENDING, we take the last state that we recorded
          //when the task was RUNNING
          bestProgress = this.progress;
          bestState = this.state;
          bestCounters = this.counters;
        } else if (status.getRunState() == TaskStatus.State.RUNNING) {
          if (status.getProgress() >= bestProgress) {
            bestProgress = status.getProgress();
            bestState = status.getStateString();
            if (status.getIncludeCounters()) {
              bestCounters = status.getCounters();
            } else {
              bestCounters = this.counters;
            }
          }
        }
      }
      this.progress = bestProgress;
      this.state = bestState;
      this.counters = bestCounters;
    }
  }

  /////////////////////////////////////////////////
  // "Action" methods that actually require the TIP
  // to do something.
  /////////////////////////////////////////////////

  /**
   * Return whether this TIP still needs to run
   */
  boolean isRunnable() {
    return !failed && (completes == 0);
  }
    
  /**
   * Can this task be speculated? This requires that it isn't done or almost
   * done and that it isn't already being speculatively executed.
   * 
   * Added for use by queue scheduling algorithms.
   * @param currentTime 
   */
  boolean canBeSpeculated(long currentTime) {
    DataStatistics taskStats = job.getRunningTaskStatistics(isMapTask());
    if (LOG.isDebugEnabled()) {
      LOG.debug("activeTasks.size(): " + activeTasks.size() + " "
          + activeTasks.firstKey() + " task's progressrate: " + 
          getCurrentProgressRate(currentTime) + 
          " taskStats : " + taskStats);
    }
    return (!skipping && isRunnable() && isRunning() &&
        activeTasks.size() <= MAX_TASK_EXECS &&
        currentTime - lastDispatchTime >= SPECULATIVE_LAG &&
        completes == 0 && !isOnlyCommitPending() &&
        (taskStats.mean() - getCurrentProgressRate(currentTime) >
              taskStats.std() * job.getSlowTaskThreshold()));
  }
  
  /**
   * Is the task currently speculating?
   */
  boolean isSpeculating() {
   return (activeTasks.size() > MAX_TASK_EXECS);
  }
  
  /**
   * Return a Task that can be sent to a TaskTracker for execution.
   */
  public Task getTaskToRun(String taskTracker) throws IOException {   
    // Create the 'taskid'; do not count the 'killed' tasks against the job!
    TaskAttemptID taskid = null;
    if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
      // Make sure that the attempts are unqiue across restarts
      int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
      taskid = new TaskAttemptID( id, attemptId);
      ++nextTaskId;
    } else {
      LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
              " (plus " + numKilledTasks + " killed)"  + 
              " attempts for the tip '" + getTIPId() + "'");
      return null;
    }

    //keep track of the last time we started an attempt at this TIP
    //used to calculate the progress rate of this TIP
    setDispatchTime(taskid, JobTracker.getClock().getTime());
 
    //set this the first time we run a taskAttempt in this TIP
    //each Task attempt has its own TaskStatus, which tracks that
    //attempts execStartTime, thus this startTime is TIP wide.
    if (0 == execStartTime){
      setExecStartTime(lastDispatchTime);
    }
    return addRunningTask(taskid, taskTracker);
  }
  
  public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
    return addRunningTask(taskid, taskTracker, false);
  }
  
  /**
   * Adds a previously running task to this tip. This is used in case of 
   * jobtracker restarts.
   */
  public Task addRunningTask(TaskAttemptID taskid, 
                             String taskTracker,
                             boolean taskCleanup) {
    // 1 slot is enough for taskCleanup task
    int numSlotsNeeded = taskCleanup ? 1 : numSlotsRequired;
    // create the task
    Task t = null;
    if (isMapTask()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
                  + failedRanges.getIndicesCount());
      }
      t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
          numSlotsNeeded);
    } else {
      t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
    }
    if (jobCleanup) {
      t.setJobCleanupTask();
    }
    if (jobSetup) {
      t.setJobSetupTask();
    }
    if (taskCleanup) {
      t.setTaskCleanupTask();
      t.setState(taskStatuses.get(taskid).getRunState());
      cleanupTasks.put(taskid, taskTracker);
    }
    t.setConf(conf);
    t.setUser(getUser());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
    }
    t.setSkipRanges(failedRanges.getSkipRanges());
    t.setSkipping(skipping);
    if(failedRanges.isTestAttempt()) {
      t.setWriteSkipRecs(false);
    }

    activeTasks.put(taskid, taskTracker);
    tasks.add(taskid);

    // Ask JobTracker to note that the task exists
    jobtracker.createTaskEntry(taskid, taskTracker, this);

    // check and set the first attempt
    if (firstTaskId == null) {
      firstTaskId = taskid;
    }
    return t;
  }

  boolean isRunningTask(TaskAttemptID taskid) {
    TaskStatus status = taskStatuses.get(taskid);
    return status != null && status.getRunState() == TaskStatus.State.RUNNING;
  }
  
  boolean isCleanupAttempt(TaskAttemptID taskid) {
    return cleanupTasks.containsKey(taskid);
  }
  
  String machineWhereCleanupRan(TaskAttemptID taskid) {
    return cleanupTasks.get(taskid);
  }
  
  String machineWhereTaskRan(TaskAttemptID taskid) {
    return taskStatuses.get(taskid).getTaskTracker();
  }
    
  boolean wasKilled(TaskAttemptID taskid) {
    return tasksToKill.containsKey(taskid);
  }
  
  /**
   * Has this task already failed on this machine?
   * @param trackerHost The task tracker hostname
   * @return Has it failed?
   */
  public boolean hasFailedOnMachine(String trackerHost) {
    return machinesWhereFailed.contains(trackerHost);
  }
    
  /**
   * Was this task ever scheduled to run on this machine?
   * @param trackerHost The task tracker hostname 
   * @param trackerName The tracker name
   * @return Was task scheduled on the tracker?
   */
  public boolean hasRunOnMachine(String trackerHost, String trackerName) {
    return this.activeTasks.values().contains(trackerName) || 
      hasFailedOnMachine(trackerHost);
  }
  /**
   * Get the number of machines where this task has failed.
   * @return the size of the failed machine set
   */
  public int getNumberOfFailedMachines() {
    return machinesWhereFailed.size();
  }
    
  /**
   * Get the id of this map or reduce task.
   * @return The index of this tip in the maps/reduces lists.
   */
  public int getIdWithinJob() {
    return partition;
  }
    
  /**
   * Set the event number that was raised for this tip
   */
  public void setSuccessEventNumber(int eventNumber) {
    successEventNumber = eventNumber;
  }
       
  /**
   * Get the event number that was raised for this tip
   */
  public int getSuccessEventNumber() {
    return successEventNumber;
  }
  
  /** 
   * Gets the Node list of input split locations sorted in rack order.
   */ 
  public String getSplitNodes() {
    if (!isMapTask() || jobSetup || jobCleanup) {
      return "";
    }
    String[] splits = splitInfo.getLocations();
    Node[] nodes = new Node[splits.length];
    for (int i = 0; i < splits.length; i++) {
      nodes[i] = jobtracker.getNode(splits[i]);
    }
    // sort nodes on rack location
    Arrays.sort(nodes, new Comparator<Node>() {
      public int compare(Node a, Node b) {
        String left = a.getNetworkLocation();
        String right = b.getNetworkLocation();
        return left.compareTo(right);
      }
    }); 
    return nodeToString(nodes);
  }

  private static String nodeToString(Node[] nodes) {
    if (nodes == null || nodes.length == 0) {
      return "";
    }
    StringBuffer ret = new StringBuffer(nodes[0].toString());
    for(int i = 1; i < nodes.length;i++) {
      ret.append(",");
      ret.append(nodes[i].toString());
    }
    return ret.toString();
  }

  public long getMapInputSize() {
    if(isMapTask() && !jobSetup && !jobCleanup) {
      return splitInfo.getInputDataLength();
    } else {
      return 0;
    }
  }
  
  /**
   * Compare most recent task attempts dispatch time to current system time so
   * that task progress rate will slow down as time proceeds even if no progress
   * is reported for the task. This allows speculative tasks to be launched for
   * tasks on slow/dead TT's before we realize the TT is dead/slow. Skew isn't
   * an issue since both times are from the JobTrackers perspective.
   * @return the progress rate from the active task that is doing best
   */
  public double getCurrentProgressRate(long currentTime) {
    double bestProgressRate = 0;
    for (TaskStatus ts : taskStatuses.values()){
      if (ts.getRunState() == TaskStatus.State.RUNNING  || 
          ts.getRunState() == TaskStatus.State.SUCCEEDED ||
          ts.getRunState() == TaskStatus.State.COMMIT_PENDING) {
        double progressRate = ts.getProgress()/Math.max(1,
            currentTime - getDispatchTime(ts.getTaskID()));
        if (progressRate > bestProgressRate){
          bestProgressRate = progressRate;
        }
      }
    }
    return bestProgressRate;
  }
  
  private void setProgressRate(double rate) {
    oldProgressRate = rate;
  }
  private double getOldProgressRate() {
    return oldProgressRate;
  }
  /**
   * This class keeps the records to be skipped during further executions 
   * based on failed records from all the previous attempts.
   * It also narrow down the skip records if it is more than the 
   * acceptable value by dividing the failed range into half. In this case one 
   * half is executed in the next attempt (test attempt). 
   * In the test attempt, only the test range gets executed, others get skipped. 
   * Based on the success/failure of the test attempt, the range is divided 
   * further.
   */
  private class FailedRanges {
    private SortedRanges skipRanges = new SortedRanges();
    private Divide divide;
    
    synchronized SortedRanges getSkipRanges() {
      if(divide!=null) {
        return divide.skipRange;
      }
      return skipRanges;
    }
    
    synchronized boolean isTestAttempt() {
      return divide!=null;
    }
    
    synchronized long getIndicesCount() {
      if(isTestAttempt()) {
        return divide.skipRange.getIndicesCount();
      }
      return skipRanges.getIndicesCount();
    }
    
    synchronized void updateState(TaskStatus status){
      if (isTestAttempt() && 
          (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
        divide.testPassed = true;
        //since it was the test attempt we need to set it to failed
        //as it worked only on the test range
        status.setRunState(TaskStatus.State.FAILED);
        
      }
    }
    
    synchronized void add(Range failedRange) {
      LOG.warn("FailedRange:"+ failedRange);
      if(divide!=null) {
        LOG.warn("FailedRange:"+ failedRange +"  test:"+divide.test +
            "  pass:"+divide.testPassed);
        if(divide.testPassed) {
          //test range passed
          //other range would be bad. test it
          failedRange = divide.other;
        }
        else {
          //test range failed
          //other range would be good.
          failedRange = divide.test;
        }
        //reset
        divide = null;
      }
      
      if(maxSkipRecords==0 || failedRange.getLength()<=maxSkipRecords) {
        skipRanges.add(failedRange);
      } else {
        //start dividing the range to narrow down the skipped
        //records until maxSkipRecords are met OR all attempts
        //get exhausted
        divide = new Divide(failedRange);
      }
    }
    
    class Divide {
      private final SortedRanges skipRange;
      private final Range test;
      private final Range other;
      private boolean testPassed;
      Divide(Range range){
        long half = range.getLength()/2;
        test = new Range(range.getStartIndex(), half);
        other = new Range(test.getEndIndex(), range.getLength()-half);
        //construct the skip range from the skipRanges
        skipRange = new SortedRanges();
        for(Range r : skipRanges.getRanges()) {
          skipRange.add(r);
        }
        skipRange.add(new Range(0,test.getStartIndex()));
        skipRange.add(new Range(test.getEndIndex(), 
            (Long.MAX_VALUE-test.getEndIndex())));
      }
    }
    
  }

  TreeMap<TaskAttemptID, String> getActiveTasks() {
    return activeTasks;
  }

  int getNumSlotsRequired() {
    return numSlotsRequired;
  }
}
