| /** | 
 |  * Licensed to the Apache Software Foundation (ASF) under one | 
 |  * or more contributor license agreements.  See the NOTICE file | 
 |  * distributed with this work for additional information | 
 |  * regarding copyright ownership.  The ASF licenses this file | 
 |  * to you under the Apache License, Version 2.0 (the | 
 |  * "License"); you may not use this file except in compliance | 
 |  * with the License.  You may obtain a copy of the License at | 
 |  * | 
 |  *     http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, software | 
 |  * distributed under the License is distributed on an "AS IS" BASIS, | 
 |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 |  * See the License for the specific language governing permissions and | 
 |  * limitations under the License. | 
 |  */ | 
 |  | 
 | package org.apache.hadoop.mapred; | 
 |  | 
 | import java.io.DataInput; | 
 | import java.io.DataOutput; | 
 | import java.io.IOException; | 
 | import java.lang.management.GarbageCollectorMXBean; | 
 | import java.lang.management.ManagementFactory; | 
 | import java.text.NumberFormat; | 
 | import java.util.HashMap; | 
 | import java.util.Iterator; | 
 | import java.util.List; | 
 | import java.util.Map; | 
 | import java.util.NoSuchElementException; | 
 | import java.util.concurrent.atomic.AtomicBoolean; | 
 |  | 
 | import javax.crypto.SecretKey; | 
 |  | 
 | import org.apache.commons.logging.Log; | 
 | import org.apache.commons.logging.LogFactory; | 
 | import org.apache.hadoop.classification.InterfaceAudience; | 
 | import org.apache.hadoop.classification.InterfaceStability; | 
 | import org.apache.hadoop.conf.Configurable; | 
 | import org.apache.hadoop.conf.Configuration; | 
 | import org.apache.hadoop.fs.FileSystem; | 
 | import org.apache.hadoop.fs.LocalDirAllocator; | 
 | import org.apache.hadoop.fs.Path; | 
 | import org.apache.hadoop.fs.FileSystem.Statistics; | 
 | import org.apache.hadoop.io.DataInputBuffer; | 
 | import org.apache.hadoop.io.RawComparator; | 
 | import org.apache.hadoop.io.Text; | 
 | import org.apache.hadoop.io.Writable; | 
 | import org.apache.hadoop.io.WritableUtils; | 
 | import org.apache.hadoop.io.serializer.Deserializer; | 
 | import org.apache.hadoop.io.serializer.SerializationFactory; | 
 | import org.apache.hadoop.mapred.IFile.Writer; | 
 | import org.apache.hadoop.mapreduce.Counter; | 
 | import org.apache.hadoop.mapreduce.FileSystemCounter; | 
 | import org.apache.hadoop.mapreduce.OutputCommitter; | 
 | import org.apache.hadoop.mapreduce.TaskCounter; | 
 | import org.apache.hadoop.mapreduce.JobStatus; | 
 | import org.apache.hadoop.mapreduce.MRConfig; | 
 | import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; | 
 | import org.apache.hadoop.mapreduce.task.ReduceContextImpl; | 
 | import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin; | 
| import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.ProcResourceValues; |
 | import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; | 
 | import org.apache.hadoop.net.NetUtils; | 
 | import org.apache.hadoop.util.Progress; | 
 | import org.apache.hadoop.util.Progressable; | 
 | import org.apache.hadoop.util.ReflectionUtils; | 
 | import org.apache.hadoop.util.StringUtils; | 
 |  | 
 | /** | 
 |  * Base class for tasks. | 
 |  */ | 
 | @InterfaceAudience.LimitedPrivate({"MapReduce"}) | 
 | @InterfaceStability.Unstable | 
 | abstract public class Task implements Writable, Configurable { | 
 |   private static final Log LOG = | 
 |     LogFactory.getLog(Task.class); | 
 |  | 
|   public static final String MERGED_OUTPUT_PREFIX = ".merged"; |
 |    | 
 |   /** | 
 |    * Name of the FileSystem counters' group | 
 |    */ | 
 |   protected static final String FILESYSTEM_COUNTER_GROUP = "FileSystemCounters"; | 
 |  | 
 |   /////////////////////////////////////////////////////////// | 
 |   // Helper methods to construct task-output paths | 
 |   /////////////////////////////////////////////////////////// | 
 |    | 
 |   /** Construct output file names so that, when an output directory listing is | 
 |    * sorted lexicographically, positions correspond to output partitions.*/ | 
 |   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); | 
 |   static { | 
 |     NUMBER_FORMAT.setMinimumIntegerDigits(5); | 
 |     NUMBER_FORMAT.setGroupingUsed(false); | 
 |   } | 
 |  | 
 |   static synchronized String getOutputName(int partition) { | 
 |     return "part-" + NUMBER_FORMAT.format(partition); | 
 |   } | 
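|   // For illustration: NUMBER_FORMAT pads to five digits with grouping |
|   // disabled, so getOutputName(0) yields "part-00000" and getOutputName(42) |
|   // yields "part-00042"; a lexicographic listing of part files therefore |
|   // matches partition order. |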
 |  | 
 |   //////////////////////////////////////////// | 
 |   // Fields | 
 |   //////////////////////////////////////////// | 
 |  | 
 |   private String jobFile;                         // job configuration file | 
 |   private String user;                            // user running the job | 
 |   private TaskAttemptID taskId;                   // unique, includes job id | 
 |   private TaskAttemptID taskIdForUmbilical;       // same, or uber's if subtask | 
 |   private int partition;                          // id within job | 
 |   TaskStatus taskStatus;                          // current status of the task | 
 |   protected JobStatus.State jobRunStateForCleanup; | 
 |   protected boolean jobCleanup = false; | 
 |   protected boolean jobSetup = false; | 
 |   protected boolean taskCleanup = false; | 
 |    | 
 |   //skip ranges based on failed ranges from previous attempts | 
 |   private SortedRanges skipRanges = new SortedRanges(); | 
 |   private boolean skipping = false; | 
 |   private boolean writeSkipRecs = true; | 
 |    | 
 |   //currently processing record start index | 
 |   private volatile long currentRecStartIndex;  | 
 |   private Iterator<Long> currentRecIndexIterator =  | 
 |     skipRanges.skipRangeIterator(); | 
 |  | 
 |   private ResourceCalculatorPlugin resourceCalculator = null; | 
 |   private long initCpuCumulativeTime = 0; | 
 |  | 
 |   protected JobConf conf; | 
 |   protected MapOutputFile mapOutputFile; | 
 |   protected LocalDirAllocator lDirAlloc; | 
 |   private final static int MAX_RETRIES = 10; | 
 |   protected JobContext jobContext; | 
 |   protected TaskAttemptContext taskContext; | 
 |   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat; | 
 |   protected org.apache.hadoop.mapreduce.OutputCommitter committer; | 
 |   protected final Counters.Counter spilledRecordsCounter; | 
 |   protected final Counters.Counter failedShuffleCounter; | 
 |   protected final Counters.Counter mergedMapOutputsCounter; | 
 |   private int numSlotsRequired; | 
 |   protected TaskUmbilicalProtocol umbilical; | 
 |   protected SecretKey tokenSecret; | 
 |   protected GcTimeUpdater gcUpdater; | 
 |  | 
 |   //////////////////////////////////////////// | 
 |   // Constructors | 
 |   //////////////////////////////////////////// | 
 |  | 
 |   public Task() { | 
 |     taskId = new TaskAttemptID(); | 
 |     taskIdForUmbilical = taskId; | 
 |     spilledRecordsCounter =  | 
 |       counters.findCounter(TaskCounter.SPILLED_RECORDS); | 
 |     failedShuffleCounter =  | 
 |       counters.findCounter(TaskCounter.FAILED_SHUFFLE); | 
 |     mergedMapOutputsCounter =  | 
 |       counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS); | 
 |     gcUpdater = new GcTimeUpdater(); | 
 |   } | 
 |  | 
 |   public Task(String jobFile, TaskAttemptID taskId, int partition,  | 
 |               int numSlotsRequired) { | 
 |     this.jobFile = jobFile; | 
 |     this.taskId = taskId; | 
 |     this.taskIdForUmbilical = taskId; | 
 |       | 
 |     this.partition = partition; | 
 |     this.numSlotsRequired = numSlotsRequired; | 
 |     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS); | 
 |     failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE); | 
 |     mergedMapOutputsCounter =  | 
 |       counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS); | 
 |     gcUpdater = new GcTimeUpdater(); | 
 |   } | 
 |  | 
 |   //////////////////////////////////////////// | 
 |   // Accessors | 
 |   //////////////////////////////////////////// | 
 |   public void setJobFile(String jobFile) { this.jobFile = jobFile; } | 
 |   public String getJobFile() { return jobFile; } | 
 |  | 
 |   public TaskAttemptID getTaskID() { return taskId; } | 
 |   public void setTaskIdForUmbilical(TaskAttemptID taskIdForUmbilical) { | 
 |     this.taskIdForUmbilical = taskIdForUmbilical; | 
 |   } | 
 |  | 
 |   public int getNumSlotsRequired() { | 
 |     return numSlotsRequired; | 
 |   } | 
 |  | 
 |   Counters getCounters() { return counters; } | 
 |    | 
 |   /** | 
 |    * Get the job name for this task. | 
 |    * @return the job name | 
 |    */ | 
 |   public JobID getJobID() { | 
 |     return taskId.getJobID(); | 
 |   } | 
 |  | 
 |   /** | 
 |    * Set the job token secret  | 
 |    * @param tokenSecret the secret | 
 |    */ | 
 |   public void setJobTokenSecret(SecretKey tokenSecret) { | 
 |     this.tokenSecret = tokenSecret; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Get the job token secret | 
 |    * @return the token secret | 
 |    */ | 
 |   public SecretKey getJobTokenSecret() { | 
 |     return this.tokenSecret; | 
 |   } | 
 |  | 
 |    | 
 |   /** | 
 |    * Get the index of this task within the job. | 
 |    * @return the integer part of the task id | 
 |    */ | 
 |   public int getPartition() { | 
 |     return partition; | 
 |   } | 
|   /** |
|    * Return the current phase of the task. |
|    * Needs to be synchronized, since the communication thread sends the |
|    * phase at every progress interval. |
|    * @return the current phase of the task |
|    */ |
 |   public synchronized TaskStatus.Phase getPhase(){ | 
 |     return this.taskStatus.getPhase();  | 
 |   } | 
 |   /** | 
 |    * Set current phase of the task.  | 
 |    * @param phase task phase  | 
 |    */ | 
 |   protected synchronized void setPhase(TaskStatus.Phase phase){ | 
 |     this.taskStatus.setPhase(phase);  | 
 |   } | 
 |    | 
 |   /** | 
 |    * Get whether to write skip records. | 
 |    */ | 
 |   protected boolean toWriteSkipRecs() { | 
 |     return writeSkipRecs; | 
 |   } | 
 |        | 
 |   /** | 
 |    * Set whether to write skip records. | 
 |    */ | 
 |   protected void setWriteSkipRecs(boolean writeSkipRecs) { | 
 |     this.writeSkipRecs = writeSkipRecs; | 
 |   } | 
 |    | 
 |   /** | 
 |    * Report a fatal error to the parent (task) tracker. | 
 |    */ | 
 |   protected void reportFatalError(Throwable throwable, | 
 |                                   String logMsg) { | 
 |     LOG.fatal(logMsg); | 
 |     Throwable tCause = throwable.getCause(); | 
 |     String cause = tCause == null  | 
 |                    ? StringUtils.stringifyException(throwable) | 
 |                    : StringUtils.stringifyException(tCause); | 
 |     try { | 
 |       umbilical.fatalError(taskIdForUmbilical, cause); | 
 |     } catch (IOException ioe) { | 
 |       LOG.fatal("Failed to contact the tasktracker", ioe); | 
 |       System.exit(-1); | 
 |     } | 
 |   } | 
 |  | 
 |   /** | 
 |    * Get skipRanges. | 
 |    */ | 
 |   public SortedRanges getSkipRanges() { | 
 |     return skipRanges; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Set skipRanges. | 
 |    */ | 
 |   public void setSkipRanges(SortedRanges skipRanges) { | 
 |     this.skipRanges = skipRanges; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Is Task in skipping mode. | 
 |    */ | 
 |   public boolean isSkipping() { | 
 |     return skipping; | 
 |   } | 
 |  | 
|   /** |
|    * Sets whether to run the Task in skipping mode. |
|    * @param skipping true to run the task in skipping mode |
|    */ |
 |   public void setSkipping(boolean skipping) { | 
 |     this.skipping = skipping; | 
 |   } | 
 |  | 
|   /** |
|    * Return the current state of the task. |
|    * Needs to be synchronized, since the communication thread sends the |
|    * state at every progress interval. |
|    * @return the current run state of the task |
|    */ |
 |   synchronized TaskStatus.State getState(){ | 
 |     return this.taskStatus.getRunState();  | 
 |   } | 
|   /** |
|    * Set the current state of the task. |
|    * @param state the run state to set |
|    */ |
 |   synchronized void setState(TaskStatus.State state){ | 
 |     this.taskStatus.setRunState(state);  | 
 |   } | 
 |  | 
 |   void setTaskCleanupTask() { | 
 |     taskCleanup = true; | 
 |   } | 
|  |
 |   boolean isTaskCleanupTask() { | 
 |     return taskCleanup; | 
 |   } | 
 |  | 
 |   boolean isJobCleanupTask() { | 
 |     return jobCleanup; | 
 |   } | 
 |  | 
 |   boolean isJobAbortTask() { | 
|     // the task is an abort task if it's marked for cleanup and the final |
 |     // expected state is either failed or killed. | 
 |     return isJobCleanupTask()  | 
 |            && (jobRunStateForCleanup == JobStatus.State.KILLED  | 
 |                || jobRunStateForCleanup == JobStatus.State.FAILED); | 
 |   } | 
 |    | 
 |   boolean isJobSetupTask() { | 
 |     return jobSetup; | 
 |   } | 
 |  | 
 |   void setJobSetupTask() { | 
 |     jobSetup = true;  | 
 |   } | 
 |  | 
 |   void setJobCleanupTask() { | 
 |     jobCleanup = true;  | 
 |   } | 
 |  | 
 |   /** | 
 |    * Sets the task to do job abort in the cleanup. | 
 |    * @param status the final runstate of the job.  | 
 |    */ | 
 |   void setJobCleanupTaskState(JobStatus.State status) { | 
 |     jobRunStateForCleanup = status; | 
 |   } | 
 |    | 
 |   boolean isMapOrReduce() { | 
 |     return !jobSetup && !jobCleanup && !taskCleanup; | 
 |   } | 
 |  | 
 |   /** | 
|    * Get the name of the user running the job/task. TaskTracker needs the |
|    * task's user name even before its JobConf is localized, so we explicitly |
|    * serialize the user name. |
 |    *  | 
 |    * @return user | 
 |    */ | 
 |   String getUser() { | 
 |     return user; | 
 |   } | 
 |    | 
 |   void setUser(String user) { | 
 |     this.user = user; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Return the task's MapOutputFile instance. | 
 |    * @return the task's MapOutputFile instance | 
 |    */ | 
 |   MapOutputFile getMapOutputFile() { | 
 |     return mapOutputFile; | 
 |   } | 
 |  | 
 |   //////////////////////////////////////////// | 
 |   // Writable methods | 
 |   //////////////////////////////////////////// | 
 |  | 
 |   public void write(DataOutput out) throws IOException { | 
 |     Text.writeString(out, jobFile); | 
 |     taskId.write(out); | 
 |     taskIdForUmbilical.write(out); | 
 |     out.writeInt(partition); | 
 |     out.writeInt(numSlotsRequired); | 
 |     taskStatus.write(out); | 
 |     skipRanges.write(out); | 
 |     out.writeBoolean(skipping); | 
 |     out.writeBoolean(jobCleanup); | 
 |     if (jobCleanup) { | 
 |       WritableUtils.writeEnum(out, jobRunStateForCleanup); | 
 |     } | 
 |     out.writeBoolean(jobSetup); | 
 |     out.writeBoolean(writeSkipRecs); | 
 |     out.writeBoolean(taskCleanup); | 
 |     Text.writeString(out, user); | 
 |   } | 
 |    | 
 |   public void readFields(DataInput in) throws IOException { | 
 |     jobFile = Text.readString(in); | 
 |     taskId = TaskAttemptID.read(in); | 
 |     taskIdForUmbilical = TaskAttemptID.read(in); | 
 |     partition = in.readInt(); | 
 |     numSlotsRequired = in.readInt(); | 
 |     taskStatus.readFields(in); | 
 |     skipRanges.readFields(in); | 
 |     currentRecIndexIterator = skipRanges.skipRangeIterator(); | 
 |     currentRecStartIndex = currentRecIndexIterator.next(); | 
 |     skipping = in.readBoolean(); | 
 |     jobCleanup = in.readBoolean(); | 
 |     if (jobCleanup) { | 
 |       jobRunStateForCleanup =  | 
 |         WritableUtils.readEnum(in, JobStatus.State.class); | 
 |     } | 
 |     jobSetup = in.readBoolean(); | 
 |     writeSkipRecs = in.readBoolean(); | 
 |     taskCleanup = in.readBoolean(); | 
 |     if (taskCleanup) { | 
 |       setPhase(TaskStatus.Phase.CLEANUP); | 
 |     } | 
 |     user = Text.readString(in); | 
 |   } | 
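|   // Illustrative round-trip (a sketch, not production code; 'task' and |
|   // 'copy' are hypothetical instances of the same concrete Task subclass, |
|   // with all serialized fields populated).  readFields() must consume |
|   // fields in exactly the order write() emits them: |
|   // |
|   //   DataOutputBuffer out = new DataOutputBuffer(); |
|   //   task.write(out);                          // serialize |
|   //   DataInputBuffer in = new DataInputBuffer(); |
|   //   in.reset(out.getData(), out.getLength()); |
|   //   copy.readFields(in);                      // deserialize, same order |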
 |  | 
 |   @Override | 
 |   public String toString() { return taskId.toString(); } | 
 |  | 
 |   /** | 
 |    * Localize the given JobConf to be specific for this task. | 
 |    */ | 
 |   public void localizeConfiguration(JobConf conf) throws IOException { | 
 |     conf.set(JobContext.TASK_ID, taskId.getTaskID().toString());  | 
 |     conf.set(JobContext.TASK_ATTEMPT_ID, taskId.toString()); | 
 |     conf.setBoolean(JobContext.TASK_ISMAP, isMapTask()); | 
 |     conf.setInt(JobContext.TASK_PARTITION, partition); | 
 |     conf.set(JobContext.ID, taskId.getJobID().toString()); | 
 |   } | 
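|   // For a hypothetical attempt "attempt_200707121733_0003_m_000005_0" the |
|   // localized values would be (illustrative, assuming the standard |
|   // mapreduce.* key names): |
|   //   mapreduce.task.id         = task_200707121733_0003_m_000005 |
|   //   mapreduce.task.attempt.id = attempt_200707121733_0003_m_000005_0 |
|   //   mapreduce.task.ismap      = true |
|   //   mapreduce.task.partition  = 5 |
|   //   mapreduce.job.id          = job_200707121733_0003 |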
 |    | 
 |   /** Run this task as a part of the named job.  This method is executed in the | 
 |    * child process and is what invokes user-supplied map, reduce, etc. methods. | 
 |    * @param umbilical for progress reports | 
 |    */ | 
 |   public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical) | 
 |     throws IOException, ClassNotFoundException, InterruptedException; | 
 |  | 
 |  | 
|   /** Return an appropriate thread runner for this task. |
|    * @param tip TODO */ |
 |   public abstract TaskRunner createRunner(TaskTracker tracker,  | 
 |                                           TaskTracker.TaskInProgress tip,  | 
 |                                           TaskTracker.RunningJob rjob | 
 |                                           ) throws IOException; | 
 |  | 
 |   /** The number of milliseconds between progress reports. */ | 
 |   public static final int PROGRESS_INTERVAL = 3000; | 
 |  | 
 |   private transient Progress taskProgress = new Progress(); | 
 |  | 
 |   // Current counters | 
 |   private transient Counters counters = new Counters(); | 
 |  | 
 |   /* flag to track whether task is done */ | 
 |   private AtomicBoolean taskDone = new AtomicBoolean(false); | 
 |    | 
 |   public abstract boolean isMapTask(); | 
 |  | 
 |   /** | 
 |    * Is this really a combo-task masquerading as a plain ReduceTask? | 
 |    */ | 
 |   public abstract boolean isUberTask(); | 
 |  | 
 |   /** | 
 |    * This setter allows one to incorporate Tasks into multiple levels of | 
 |    * a Progress addPhase()-generated hierarchy (i.e., not always the root | 
 |    * node), which in turn allows Progress to handle all details of progress | 
 |    * aggregation for an UberTask or even a whole job. | 
 |    */ | 
 |   protected void setProgress(Progress progress) { | 
 |     taskProgress = progress; | 
 |   } | 
 |  | 
 |   public Progress getProgress() { return taskProgress; } | 
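|   // Minimal sketch of the hierarchy described above (names hypothetical): |
|   // a subclass such as UberTask can register each subtask's Progress node |
|   // as one phase of an enclosing tree, so updates roll up automatically: |
|   // |
|   //   Progress jobProgress = new Progress(); |
|   //   Progress subPhase = jobProgress.addPhase("map subtask"); |
|   //   subTask.setProgress(subPhase);     // subtask reports into the tree |
|   //   float overall = jobProgress.get(); // aggregated across all phases |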
 |  | 
 |   public void initialize(JobConf job, JobID id,  | 
 |                          Reporter reporter, | 
 |                          boolean useNewApi) throws IOException,  | 
 |                                                    ClassNotFoundException, | 
 |                                                    InterruptedException { | 
 |     jobContext = new JobContextImpl(job, id, reporter); | 
 |     // taskId (rather than taskIdForUmbilical) is required here to avoid | 
 |     // collisions in uber-subtasks' outputs; it ultimately determines both the | 
 |     // name of the pre-commit HDFS working directory (_temporary/_attempt_xxx) | 
 |     // and the (main) filename in that directory (part-[rm]-nnnnn).  UberTask | 
 |     // will handle any tempdir/commit-related details. | 
 |     taskContext = new TaskAttemptContextImpl(job, taskId, reporter); | 
 |     if (getState() == TaskStatus.State.UNASSIGNED) { | 
 |       setState(TaskStatus.State.RUNNING); | 
 |     } | 
 |     if (useNewApi) { | 
 |       if (LOG.isDebugEnabled()) { | 
 |         LOG.debug("using new api for output committer"); | 
 |       } | 
 |       outputFormat = | 
 |         ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job); | 
 |       committer = outputFormat.getOutputCommitter(taskContext); | 
 |     } else { | 
 |       committer = conf.getOutputCommitter(); | 
 |     } | 
 |     // this will typically be on HDFS, not the node's local filesystem: | 
 |     Path outputPath = FileOutputFormat.getOutputPath(conf); | 
 |     if (outputPath != null) { | 
 |       if ((committer instanceof FileOutputCommitter)) { | 
 |         FileOutputFormat.setWorkOutputPath(conf,  | 
 |           ((FileOutputCommitter)committer).getTempTaskOutputPath(taskContext)); | 
 |       } else { | 
 |         FileOutputFormat.setWorkOutputPath(conf, outputPath); | 
 |       } | 
 |     } | 
 |     committer.setupTask(taskContext); | 
 |     Class<? extends ResourceCalculatorPlugin> clazz = | 
 |         conf.getClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN, | 
 |             null, ResourceCalculatorPlugin.class); | 
 |     resourceCalculator = ResourceCalculatorPlugin | 
 |             .getResourceCalculatorPlugin(clazz, conf); | 
 |     LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculator); | 
 |     if (resourceCalculator != null) { | 
 |       initCpuCumulativeTime = | 
 |         resourceCalculator.getProcResourceValues().getCumulativeCpuTime(); | 
 |     } | 
 |   } | 
 |  | 
 |   @InterfaceAudience.Private | 
 |   @InterfaceStability.Unstable | 
 |   protected class TaskReporter  | 
 |       extends org.apache.hadoop.mapreduce.StatusReporter | 
 |       implements Runnable, Reporter { | 
 |     private TaskUmbilicalProtocol umbilical; | 
 |     private InputSplit split = null; | 
 |     private Progress taskProgress; | 
 |     private Thread pingThread = null; | 
 |     private boolean done = true; | 
 |     private Object lock = new Object(); | 
 |  | 
|     /** |
|      * Flag that indicates whether a progress update needs to be sent to |
|      * the parent: set when an update is pending, cleared once it is read. |
|      * Using AtomicBoolean since we need an atomic read-and-reset. |
|      */ |
 |     private AtomicBoolean progressFlag = new AtomicBoolean(false); | 
 |      | 
 |     TaskReporter(Progress taskProgress, | 
 |                  TaskUmbilicalProtocol umbilical) { | 
 |       this.umbilical = umbilical; | 
 |       this.taskProgress = taskProgress; | 
 |     } | 
 |  | 
 |     // getters and setters for flag | 
 |     void setProgressFlag() { | 
 |       progressFlag.set(true); | 
 |     } | 
 |     boolean resetProgressFlag() { | 
 |       return progressFlag.getAndSet(false); | 
 |     } | 
 |     public void setStatus(String status) { | 
 |       taskProgress.setStatus(status); | 
 |       // indicate that progress update needs to be sent | 
 |       setProgressFlag(); | 
 |     } | 
 |     public void setProgress(float progress) { | 
 |       // set current phase progress. | 
 |       // This method assumes that task has phases. | 
 |       taskProgress.phase().set(progress); | 
 |       // indicate that progress update needs to be sent | 
 |       setProgressFlag(); | 
 |     } | 
 |     // FIXME?  why isn't this deprecated in favor of public setProgressFlag()? | 
 |     public void progress() { | 
 |       // indicate that progress update needs to be sent | 
 |       setProgressFlag(); | 
 |     } | 
 |     public Counters.Counter getCounter(String group, String name) { | 
 |       Counters.Counter counter = null; | 
 |       if (counters != null) { | 
 |         counter = counters.findCounter(group, name); | 
 |       } | 
 |       return counter; | 
 |     } | 
 |     public Counters.Counter getCounter(Enum<?> name) { | 
 |       return counters == null ? null : counters.findCounter(name); | 
 |     } | 
|     public void incrCounter(Enum<?> key, long amount) { |
 |       if (counters != null) { | 
 |         counters.incrCounter(key, amount); | 
 |       } | 
 |       setProgressFlag(); | 
 |     } | 
 |     public void incrCounter(String group, String counter, long amount) { | 
 |       if (counters != null) { | 
 |         counters.incrCounter(group, counter, amount); | 
 |       } | 
 |       if(skipping && SkipBadRecords.COUNTER_GROUP.equals(group) && ( | 
 |           SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.equals(counter) || | 
 |           SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS.equals(counter))) { | 
 |         //if application reports the processed records, move the  | 
 |         //currentRecStartIndex to the next. | 
 |         //currentRecStartIndex is the start index which has not yet been  | 
 |         //finished and is still in task's stomach. | 
|         for (int i = 0; i < amount; i++) { |
 |           currentRecStartIndex = currentRecIndexIterator.next(); | 
 |         } | 
 |       } | 
 |       setProgressFlag(); | 
 |     } | 
 |     public void setInputSplit(InputSplit split) { | 
 |       this.split = split; | 
 |     } | 
 |     public InputSplit getInputSplit() throws UnsupportedOperationException { | 
 |       if (split == null) { | 
 |         throw new UnsupportedOperationException("Input available only on map"); | 
 |       } else { | 
 |         return split; | 
 |       } | 
 |     }   | 
 |     /**  | 
 |      * The communication thread handles communication with the parent | 
 |      * (TaskTracker). It sends progress updates if progress has been made or | 
 |      * if the task needs to let the parent know that it's alive. It also pings | 
 |      * the parent to see if it's alive.  | 
 |      */ | 
 |     public void run() { | 
 |       final int MAX_RETRIES = 3; | 
 |       int remainingRetries = MAX_RETRIES; | 
 |       // get current flag value and reset it as well | 
 |       boolean sendProgress = resetProgressFlag(); | 
 |       while (!taskDone.get()) { | 
 |         synchronized(lock) { | 
 |           done = false; | 
 |         } | 
 |         try { | 
 |           boolean taskFound = true; // whether TT knows about this task | 
 |           // sleep for a bit | 
 |           try { | 
 |             Thread.sleep(PROGRESS_INTERVAL); | 
 |           }  | 
 |           catch (InterruptedException e) { | 
 |             if (LOG.isDebugEnabled()) { | 
 |               LOG.debug(getTaskID() + " Progress/ping thread exiting " + | 
 |                         "since it got interrupted"); | 
 |             } | 
 |             break; | 
 |           } | 
 |  | 
 |           if (sendProgress) { | 
 |             // we need to send progress update | 
 |             updateCounters(); | 
 |             taskStatus.statusUpdate(taskProgress.get(), | 
 |                                     taskProgress.toString(),  | 
 |                                     counters); | 
 |             taskFound = umbilical.statusUpdate(taskId, taskStatus); | 
 |             taskStatus.clearStatus(); | 
 |           } | 
 |           else { | 
 |             // send ping  | 
 |             taskFound = umbilical.ping(taskId); | 
 |           } | 
 |  | 
 |           // if TaskTracker is not aware of our task ID (probably because it | 
 |           // died and came back up), kill ourselves | 
 |           if (!taskFound) { | 
 |             LOG.warn("Parent died.  Exiting "+taskId); | 
 |             resetDoneFlag(); | 
 |             System.exit(66); | 
 |           } | 
 |  | 
 |           sendProgress = resetProgressFlag();  | 
 |           remainingRetries = MAX_RETRIES; | 
 |         }  | 
 |         catch (Throwable t) { | 
 |           LOG.info("Communication exception: " + StringUtils.stringifyException(t)); | 
|           remainingRetries -= 1; |
 |           if (remainingRetries == 0) { | 
 |             ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); | 
 |             LOG.warn("Last retry, killing "+taskId); | 
 |             resetDoneFlag(); | 
 |             System.exit(65); | 
 |           } | 
 |         } | 
 |       } | 
 |       //Notify that we are done with the work | 
 |       resetDoneFlag(); | 
 |     } | 
 |  | 
 |     void resetDoneFlag() { | 
 |       synchronized(lock) { | 
 |         done = true; | 
 |         lock.notify(); | 
 |       } | 
 |     } | 
 |  | 
 |     public void startCommunicationThread() { | 
 |       if (pingThread == null) { | 
 |         pingThread = new Thread(this, "communication thread"); | 
 |         pingThread.setDaemon(true); | 
 |         pingThread.start(); | 
 |       } | 
 |     } | 
 |     public void stopCommunicationThread() throws InterruptedException { | 
 |       if (pingThread != null) { | 
 |         synchronized(lock) { | 
 |           while(!done) { | 
 |             lock.wait(); | 
 |           } | 
 |         } | 
 |         pingThread.interrupt(); | 
 |         pingThread.join(); | 
 |       } | 
 |     } | 
 |   } | 
 |  | 
|   /** |
|    * Reports the next executing record range to the TaskTracker. |
|    * |
|    * @param umbilical the umbilical protocol handle used for reporting |
|    * @param nextRecIndex the record index which would be fed next |
|    * @throws IOException |
|    */ |
 |   protected void reportNextRecordRange(final TaskUmbilicalProtocol umbilical,  | 
 |       long nextRecIndex) throws IOException{ | 
 |     //currentRecStartIndex is the start index which has not yet been finished  | 
 |     //and is still in task's stomach. | 
|     long len = nextRecIndex - currentRecStartIndex + 1; |
 |     SortedRanges.Range range =  | 
 |       new SortedRanges.Range(currentRecStartIndex, len); | 
 |     taskStatus.setNextRecordRange(range); | 
 |     if (LOG.isDebugEnabled()) { | 
 |       LOG.debug("sending reportNextRecordRange " + range); | 
 |     } | 
 |     umbilical.reportNextRecordRange(taskIdForUmbilical, range); | 
 |   } | 
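|   // Worked example: with currentRecStartIndex == 5 and nextRecIndex == 8, |
|   // len is 8 - 5 + 1 = 4, so the reported range covers records 5..8 |
|   // inclusive (the unfinished records still "in the task's stomach"). |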
 |  | 
 |   /** | 
 |    * Create a TaskReporter and start communication thread | 
 |    */ | 
 |   TaskReporter startReporter(final TaskUmbilicalProtocol umbilical) {   | 
 |     // start thread that will handle communication with parent | 
 |     TaskReporter reporter = new TaskReporter(getProgress(), umbilical); | 
 |     reporter.startCommunicationThread(); | 
 |     return reporter; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Update resource information counters | 
 |    */ | 
 |   void updateResourceCounters() { | 
 |     if (resourceCalculator == null) { | 
 |       return; | 
 |     } | 
 |     ProcResourceValues res = resourceCalculator.getProcResourceValues(); | 
 |     long cpuTime = res.getCumulativeCpuTime(); | 
 |     long pMem = res.getPhysicalMemorySize(); | 
 |     long vMem = res.getVirtualMemorySize(); | 
 |     // Remove the CPU time consumed previously by JVM reuse | 
 |     cpuTime -= initCpuCumulativeTime; | 
 |     counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime); | 
 |     counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem); | 
 |     counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem); | 
 |   } | 
 |  | 
 |   /** | 
 |    * An updater that tracks the amount of time this task has spent in GC. | 
 |    */ | 
 |   class GcTimeUpdater { | 
 |     private long lastGcMillis = 0; | 
 |     private List<GarbageCollectorMXBean> gcBeans = null; | 
 |  | 
 |     public GcTimeUpdater() { | 
 |       this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); | 
 |       getElapsedGc(); // Initialize 'lastGcMillis' with the current time spent. | 
 |     } | 
 |  | 
 |     /** | 
 |      * @return the number of milliseconds that the gc has used for CPU | 
 |      * since the last time this method was called. | 
 |      */ | 
 |     protected long getElapsedGc() { | 
 |       long thisGcMillis = 0; | 
 |       for (GarbageCollectorMXBean gcBean : gcBeans) { | 
 |         thisGcMillis += gcBean.getCollectionTime(); | 
 |       } | 
 |  | 
 |       long delta = thisGcMillis - lastGcMillis; | 
 |       this.lastGcMillis = thisGcMillis; | 
 |       return delta; | 
 |     } | 
 |  | 
 |     /** | 
 |      * Increment the gc-elapsed-time counter. | 
 |      */ | 
 |     public void incrementGcCounter() { | 
 |       if (null == counters) { | 
 |         return; // nothing to do. | 
 |       } | 
 |  | 
 |       Counter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS); | 
 |       if (null != gcCounter) { | 
 |         gcCounter.increment(getElapsedGc()); | 
 |       } | 
 |     } | 
 |   } | 
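|   // Standalone sketch of the same measurement technique (uses only the |
|   // standard JMX beans; not part of this class's API): |
|   // |
|   //   long total = 0; |
|   //   for (GarbageCollectorMXBean b : |
|   //        ManagementFactory.getGarbageCollectorMXBeans()) { |
|   //     total += b.getCollectionTime(); // cumulative GC millis per bean |
|   //   } |
|   //   // sampling 'total' twice and subtracting gives that interval's GC time |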
 |  | 
 |   /** | 
 |    * An updater that tracks the last number reported for a given file | 
 |    * system and only creates the counters when they are needed. | 
 |    */ | 
 |   class FileSystemStatisticUpdater { | 
 |     private FileSystem.Statistics stats; | 
 |     private Counters.Counter readBytesCounter, writeBytesCounter, | 
 |         readOpsCounter, largeReadOpsCounter, writeOpsCounter; | 
 |      | 
 |     FileSystemStatisticUpdater(FileSystem.Statistics stats) { | 
 |       this.stats = stats; | 
 |     } | 
 |  | 
 |     void updateCounters() { | 
 |       String scheme = stats.getScheme(); | 
 |       if (readBytesCounter == null) { | 
 |         readBytesCounter = counters.findCounter(scheme, | 
 |             FileSystemCounter.BYTES_READ); | 
 |       } | 
 |       readBytesCounter.setValue(stats.getBytesRead()); | 
 |       if (writeBytesCounter == null) { | 
 |         writeBytesCounter = counters.findCounter(scheme, | 
 |             FileSystemCounter.BYTES_WRITTEN); | 
 |       } | 
 |       writeBytesCounter.setValue(stats.getBytesWritten()); | 
 |       if (readOpsCounter == null) { | 
 |         readOpsCounter = counters.findCounter(scheme, | 
 |             FileSystemCounter.READ_OPS); | 
 |       } | 
 |       readOpsCounter.setValue(stats.getReadOps()); | 
 |       if (largeReadOpsCounter == null) { | 
 |         largeReadOpsCounter = counters.findCounter(scheme, | 
 |             FileSystemCounter.LARGE_READ_OPS); | 
 |       } | 
 |       largeReadOpsCounter.setValue(stats.getLargeReadOps()); | 
 |       if (writeOpsCounter == null) { | 
 |         writeOpsCounter = counters.findCounter(scheme, | 
 |             FileSystemCounter.WRITE_OPS); | 
 |       } | 
 |       writeOpsCounter.setValue(stats.getWriteOps()); | 
 |     } | 
 |   } | 
 |    | 
 |   /** | 
 |    * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater | 
 |    */ | 
 |   private Map<String, FileSystemStatisticUpdater> statisticUpdaters = | 
 |      new HashMap<String, FileSystemStatisticUpdater>(); | 
 |    | 
 |   private synchronized void updateCounters() { | 
|     for (Statistics stat : FileSystem.getAllStatistics()) { |
|       String uriScheme = stat.getScheme(); |
|       FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme); |
|       if (updater == null) { // new FileSystem has been found in the cache |
 |         updater = new FileSystemStatisticUpdater(stat); | 
 |         statisticUpdaters.put(uriScheme, updater); | 
 |       } | 
 |       updater.updateCounters();       | 
 |     } | 
 |  | 
 |     gcUpdater.incrementGcCounter(); | 
 |     updateResourceCounters(); | 
 |   } | 
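|   // Note: FileSystem.getAllStatistics() returns one Statistics object per |
|   // filesystem scheme (e.g. "hdfs", "file"), so a task that touches both |
|   // HDFS and local disk ends up with one updater, and one set of |
|   // FileSystemCounters, per scheme. |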
 |  | 
 |   public void done(TaskUmbilicalProtocol umbilical, | 
 |                    TaskReporter reporter | 
 |                    ) throws IOException, InterruptedException { | 
 |     if (isUberTask()) { | 
 |       LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId | 
 |                + "' is done and is in the process of committing."); | 
 |     } else { | 
 |       LOG.info("Task '" + taskId | 
 |                + "' is done and is in the process of committing."); | 
 |     } | 
 |     updateCounters(); | 
 |  | 
 |     boolean commitRequired = isCommitRequired(); | 
 |     if (commitRequired) { | 
 |       int retries = MAX_RETRIES; | 
 |       setState(TaskStatus.State.COMMIT_PENDING); | 
|       // tell the TaskTracker that the task is commit pending |
 |       while (true) { | 
 |         try { | 
 |           umbilical.commitPending(taskIdForUmbilical, taskStatus); | 
 |           break; | 
 |         } catch (InterruptedException ie) { | 
 |           // ignore | 
 |         } catch (IOException ie) { | 
 |           LOG.warn("Failure sending commit pending: " +  | 
 |                     StringUtils.stringifyException(ie)); | 
 |           if (--retries == 0) { | 
 |             System.exit(67); | 
 |           } | 
 |         } | 
 |       } | 
 |       // wait for commit approval (via JT commitAction for this task) and commit | 
 |       commitAfterApproval(umbilical, reporter); | 
 |     } | 
 |     taskDone.set(true); | 
 |     reporter.stopCommunicationThread(); | 
 |     // Make sure we send at least one set of counter increments. It's | 
 |     // ok to call updateCounters() in this thread after comm thread stopped. | 
 |     updateCounters(); | 
 |     sendLastUpdate(umbilical); | 
 |     //signal the tasktracker that we are done | 
 |     sendDone(umbilical); | 
 |   } | 
 |  | 
 |   /** | 
 |    * Checks if this task has anything to commit, depending on the | 
 |    * type of task, as well as on whether the {@link OutputCommitter} | 
 |    * has anything to commit. | 
 |    *  | 
 |    * @return true if the task has to commit | 
 |    * @throws IOException | 
 |    */ | 
 |   boolean isCommitRequired() throws IOException { | 
 |     boolean commitRequired = false; | 
 |     if (isMapOrReduce()) { | 
 |       commitRequired = committer.needsTaskCommit(taskContext); | 
 |     } | 
 |     return commitRequired; | 
 |   } | 
 |  | 
|   /** |
|    * Send a status update to the task tracker. |
|    * @param umbilical the umbilical protocol handle used for reporting |
|    * @throws IOException |
|    */ |
 |   public void statusUpdate(TaskUmbilicalProtocol umbilical)  | 
 |   throws IOException { | 
 |     int retries = MAX_RETRIES; | 
 |     while (true) { | 
 |       try { | 
 |         // FIXME (later):  alternatives to taskIdForUmbilical would be | 
 |         // (1) include taskId as part of umbilical object and protocol; | 
 |         // (2) include taskId as part of taskStatus | 
 |         // (3) extend TaskAttemptID (or create related Task inner class?) to | 
|         //     include taskAttemptId() and taskAttemptIdForUmbilical() method |
 |         //     that's overridden in uber context [Dick] | 
 |         if (!umbilical.statusUpdate(taskIdForUmbilical, taskStatus)) { | 
 |           LOG.warn("Parent died.  Exiting " + taskId); | 
 |           System.exit(66); | 
 |         } | 
 |         taskStatus.clearStatus(); | 
 |         return; | 
 |       } catch (InterruptedException ie) { | 
 |         Thread.currentThread().interrupt(); // interrupt ourself | 
 |       } catch (IOException ie) { | 
 |         LOG.warn("Failure sending status update: " +  | 
 |                   StringUtils.stringifyException(ie)); | 
 |         if (--retries == 0) { | 
 |           throw ie; | 
 |         } | 
 |       } | 
 |     } | 
 |   } | 
 |    | 
 |   /** | 
|    * Sends the last status update before calling umbilical.done(). |
 |    */ | 
 |   private void sendLastUpdate(TaskUmbilicalProtocol umbilical)  | 
 |   throws IOException { | 
 |     taskStatus.setOutputSize(calculateOutputSize()); | 
 |     // send a final status report | 
 |     taskStatus.statusUpdate(taskProgress.get(), | 
 |                             taskProgress.toString(),  | 
 |                             counters); | 
 |     statusUpdate(umbilical); | 
 |   } | 
 |  | 
 |   /** | 
 |    * Calculates the size of output for this task. | 
 |    *  | 
 |    * @return -1 if it can't be found. | 
 |    */ | 
|   private long calculateOutputSize() throws IOException { |
 |     if (!isMapOrReduce() || isUberTask()) { | 
 |       return -1; | 
 |     } | 
 |  | 
 |     if (isMapTask() && conf.getNumReduceTasks() > 0) { | 
 |       try { | 
|         Path mapOutput = mapOutputFile.getOutputFile(); |
 |         FileSystem localFS = FileSystem.getLocal(conf); | 
 |         return localFS.getFileStatus(mapOutput).getLen(); | 
 |       } catch (IOException e) { | 
 |         LOG.warn ("Could not find output size " , e); | 
 |       } | 
 |     } | 
 |     return -1; | 
 |   } | 
 |  | 
 |   private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException { | 
 |     int retries = MAX_RETRIES; | 
 |     while (true) { | 
 |       try { | 
 |         umbilical.done(taskIdForUmbilical); | 
 |         if (isUberTask()) { | 
 |           LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId | 
 |                    + "' done."); | 
 |         } else { | 
 |           LOG.info("Task '" + taskId + "' done."); | 
 |         } | 
 |         return; | 
 |       } catch (IOException ie) { | 
 |         LOG.warn("Failure signalling completion: " +  | 
 |                  StringUtils.stringifyException(ie)); | 
 |         if (--retries == 0) { | 
 |           throw ie; | 
 |         } | 
 |       } | 
 |     } | 
 |   } | 
 |  | 
 |   private void commitAfterApproval(TaskUmbilicalProtocol umbilical, | 
 |                                    TaskReporter reporter) throws IOException { | 
 |     int retries = MAX_RETRIES; | 
 |     while (true) { | 
 |       try { | 
 |         while (!umbilical.canCommit(taskIdForUmbilical)) { | 
|           try { |
|             // FIXME 1: shouldn't this count down retries, too, in case the |
|             // JT glitched and no longer knows about us? (else infinite loop) |
|             // FIXME 2: shouldn't the hardcoded 1-second sleep instead |
|             // correspond to the task's heartbeat interval? |
|             Thread.sleep(1000); |
|           } catch (InterruptedException ie) { |
|             // ignore |
|           } |
 |           reporter.setProgressFlag(); | 
 |         } | 
 |         break; | 
 |       } catch (IOException ie) { | 
 |         LOG.warn("Failure asking whether task can commit: " + | 
 |             StringUtils.stringifyException(ie)); | 
 |         if (--retries == 0) { | 
 |           // if it couldn't query successfully then delete the output | 
 |           discardOutput(taskContext); | 
 |           System.exit(68); | 
 |         } | 
 |       } | 
 |     } | 
 |  | 
|     // task can commit now |
 |     commit(umbilical, reporter); | 
 |   } | 
 |  | 
 |   // this is protected (rather than private) solely for UberTask | 
 |   protected void commit(TaskUmbilicalProtocol umbilical, | 
 |                         TaskReporter reporter) throws IOException { | 
 |     try { | 
 |       LOG.info("Task " + taskId + " is allowed to commit now"); | 
 |       committer.commitTask(taskContext); | 
 |       return; | 
 |     } catch (IOException ioe) { | 
 |       LOG.warn("Failure committing: " +  | 
 |         StringUtils.stringifyException(ioe)); | 
 |       // if it couldn't commit successfully then delete the output | 
 |       discardOutput(taskContext); | 
 |       throw ioe; | 
 |     } | 
 |   } | 
 |  | 
|   private void discardOutput(TaskAttemptContext taskContext) { |
 |     try { | 
 |       committer.abortTask(taskContext); | 
 |     } catch (IOException ioe)  { | 
 |       LOG.warn("Failure cleaning up: " +  | 
 |                StringUtils.stringifyException(ioe)); | 
 |     } | 
 |   } | 
 |  | 
 |   protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical, | 
 |                                 TaskReporter reporter)  | 
 |   throws IOException, InterruptedException { | 
 |     taskCleanup(umbilical); | 
 |     done(umbilical, reporter); | 
 |   } | 
 |  | 
 |   void taskCleanup(TaskUmbilicalProtocol umbilical)  | 
 |   throws IOException { | 
 |     // set phase for this task | 
 |     setPhase(TaskStatus.Phase.CLEANUP); | 
 |     getProgress().setStatus("cleanup"); | 
 |     statusUpdate(umbilical); | 
 |     LOG.info("Running cleanup for the task"); | 
 |     // do the cleanup | 
 |     committer.abortTask(taskContext); | 
 |   } | 
 |  | 
 |   protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical, | 
 |                                TaskReporter reporter | 
 |                               ) throws IOException, InterruptedException { | 
 |     // set phase for this task | 
 |     setPhase(TaskStatus.Phase.CLEANUP); | 
 |     getProgress().setStatus("cleanup"); | 
 |     statusUpdate(umbilical); | 
 |     // do the cleanup | 
 |     LOG.info("Cleaning up job"); | 
 |     if (jobRunStateForCleanup == JobStatus.State.FAILED  | 
 |         || jobRunStateForCleanup == JobStatus.State.KILLED) { | 
 |       LOG.info("Aborting job with runstate : " + jobRunStateForCleanup.name()); | 
 |       if (conf.getUseNewMapper()) { | 
 |         committer.abortJob(jobContext, jobRunStateForCleanup); | 
 |       } else { | 
 |         org.apache.hadoop.mapred.OutputCommitter oldCommitter =  | 
 |           (org.apache.hadoop.mapred.OutputCommitter)committer; | 
 |         oldCommitter.abortJob(jobContext, jobRunStateForCleanup); | 
 |       } | 
 |     } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){ | 
 |       // delete <outputdir>/_temporary and optionally create _SUCCESS file | 
 |       if (!isUberTask()) {  // defer since output files have not yet been saved | 
 |         commitJob(); | 
 |       } | 
 |     } else { | 
 |       throw new IOException("Invalid state of the job for cleanup. State found " | 
 |                             + jobRunStateForCleanup + " expecting " | 
 |                             + JobStatus.State.SUCCEEDED + ", "  | 
 |                             + JobStatus.State.FAILED + " or " | 
 |                             + JobStatus.State.KILLED); | 
 |     } | 
 |      | 
 |     // delete the staging area for the job (e.g., | 
 |     // "hdfs://localhost:9000/tmp/hadoop-<user>/mapred/staging/<user>/.staging/ | 
 |     // job_YYYYMMDDhhmm_nnnn"--NOT same as "_temporary" subdir of output dir) | 
 |     JobConf conf = new JobConf(jobContext.getConfiguration()); | 
 |     if (!supportIsolationRunner(conf)) { | 
 |       String jobStagingDir = conf.get("mapreduce.job.dir"); | 
 |       Path jobStagingDirPath = new Path(jobStagingDir); | 
 |       FileSystem fs = jobStagingDirPath.getFileSystem(conf); | 
 |       fs.delete(jobStagingDirPath, true); | 
 |     } | 
 |     // update counters, save any pending output files, shut down the progress- | 
 |     // reporter communication thread and the umbilical, and mark the task done | 
 |     if (!isUberTask()) {  // defer so UberTask can send TT final update(s) | 
 |       done(umbilical, reporter); | 
 |     } | 
 |   } | 
 |  | 
 |   protected void commitJob() throws IOException { | 
 |     LOG.info("Committing job"); | 
 |     committer.commitJob(jobContext); | 
 |   } | 
 |  | 
 |   protected boolean supportIsolationRunner(JobConf conf) { | 
 |     return (conf.getKeepTaskFilesPattern() != null || | 
 |             conf.getKeepFailedTaskFiles()); | 
 |   } | 
 |  | 
 |   protected void runJobSetupTask(TaskUmbilicalProtocol umbilical, | 
 |                              TaskReporter reporter | 
 |                              ) throws IOException, InterruptedException { | 
 |     // do the setup | 
 |     getProgress().setStatus("setup"); | 
 |     committer.setupJob(jobContext); | 
 |     if (!isUberTask()) { | 
 |       // UberTask calls done() directly; don't shut down umbilical prematurely | 
 |       done(umbilical, reporter); | 
 |     } | 
 |   } | 
 |  | 
 |   public void setConf(Configuration conf) { | 
 |     if (conf instanceof JobConf) { | 
 |       this.conf = (JobConf) conf; | 
 |     } else { | 
 |       this.conf = new JobConf(conf); | 
 |     } | 
 |     this.mapOutputFile = ReflectionUtils.newInstance( | 
 |         conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, | 
 |           MROutputFiles.class, MapOutputFile.class), conf); | 
 |     this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR); | 
|     // add the static resolutions (required for JUnit test cases that |
|     // simulate multiple nodes on a single physical node) |
 |     String hostToResolved[] = conf.getStrings(TTConfig.TT_STATIC_RESOLUTIONS); | 
 |     if (hostToResolved != null) { | 
 |       for (String str : hostToResolved) { | 
 |         String name = str.substring(0, str.indexOf('=')); | 
 |         String resolvedName = str.substring(str.indexOf('=') + 1); | 
 |         NetUtils.addStaticResolution(name, resolvedName); | 
 |       } | 
 |     } | 
 |   } | 
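|   // The static-resolutions property read in setConf() above is expected to |
|   // be a comma-separated list of "hostname=resolvedname" pairs, e.g. |
|   // (illustrative values): |
|   //   host1=127.0.0.1,host2=127.0.0.1 |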
 |  | 
 |   public Configuration getConf() { | 
 |     return this.conf; | 
 |   } | 
 |  | 
 |   /** | 
 |    * OutputCollector for the combiner. | 
 |    */ | 
 |   @InterfaceAudience.Private | 
 |   @InterfaceStability.Unstable | 
 |   public static class CombineOutputCollector<K extends Object, V extends Object>  | 
 |   implements OutputCollector<K, V> { | 
 |     private Writer<K, V> writer; | 
 |     private Counters.Counter outCounter; | 
 |     public CombineOutputCollector(Counters.Counter outCounter) { | 
 |       this.outCounter = outCounter; | 
 |     } | 
 |     public synchronized void setWriter(Writer<K, V> writer) { | 
 |       this.writer = writer; | 
 |     } | 
 |     public synchronized void collect(K key, V value) | 
 |         throws IOException { | 
 |       outCounter.increment(1); | 
 |       writer.append(key, value); | 
 |     } | 
 |   } | 
 |  | 
 |   /** Iterates values while keys match in sorted input. */ | 
 |   static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> { | 
 |     protected RawKeyValueIterator in; //input iterator | 
 |     private KEY key;               // current key | 
 |     private KEY nextKey; | 
 |     private VALUE value;             // current value | 
 |     private boolean hasNext;                      // more w/ this key | 
 |     private boolean more;                         // more in file | 
 |     private RawComparator<KEY> comparator; | 
 |     protected Progressable reporter; | 
 |     private Deserializer<KEY> keyDeserializer; | 
 |     private Deserializer<VALUE> valDeserializer; | 
 |     private DataInputBuffer keyIn = new DataInputBuffer(); | 
 |     private DataInputBuffer valueIn = new DataInputBuffer(); | 
 |      | 
 |     public ValuesIterator (RawKeyValueIterator in,  | 
 |                            RawComparator<KEY> comparator,  | 
 |                            Class<KEY> keyClass, | 
 |                            Class<VALUE> valClass, Configuration conf,  | 
 |                            Progressable reporter) | 
 |       throws IOException { | 
 |       this.in = in; | 
 |       this.comparator = comparator; | 
 |       this.reporter = reporter; | 
 |       SerializationFactory serializationFactory = new SerializationFactory(conf); | 
 |       this.keyDeserializer = serializationFactory.getDeserializer(keyClass); | 
 |       this.keyDeserializer.open(keyIn); | 
 |       this.valDeserializer = serializationFactory.getDeserializer(valClass); | 
 |       this.valDeserializer.open(this.valueIn); | 
 |       readNextKey(); | 
 |       key = nextKey; | 
 |       nextKey = null; // force new instance creation | 
 |       hasNext = more; | 
 |     } | 
 |  | 
 |     RawKeyValueIterator getRawIterator() { return in; } | 
 |      | 
 |     /// Iterator methods | 
 |  | 
 |     public boolean hasNext() { return hasNext; } | 
 |  | 
 |     private int ctr = 0; | 
 |     public VALUE next() { | 
 |       if (!hasNext) { | 
 |         throw new NoSuchElementException("iterate past last value"); | 
 |       } | 
 |       try { | 
 |         readNextValue(); | 
 |         readNextKey(); | 
 |       } catch (IOException ie) { | 
 |         throw new RuntimeException("problem advancing post rec#"+ctr, ie); | 
 |       } | 
 |       reporter.progress(); | 
 |       return value; | 
 |     } | 
 |  | 
|     public void remove() { throw new UnsupportedOperationException("not implemented"); } |
 |  | 
 |     /// Auxiliary methods | 
 |  | 
 |     /** Start processing next unique key. */ | 
 |     public void nextKey() throws IOException { | 
 |       // read until we find a new key | 
 |       while (hasNext) {  | 
 |         readNextKey(); | 
 |       } | 
 |       ++ctr; | 
 |        | 
 |       // move the next key to the current one | 
 |       KEY tmpKey = key; | 
 |       key = nextKey; | 
 |       nextKey = tmpKey; | 
 |       hasNext = more; | 
 |     } | 
 |  | 
 |     /** True iff more keys remain. */ | 
 |     public boolean more() {  | 
 |       return more;  | 
 |     } | 
 |  | 
 |     /** The current key. */ | 
 |     public KEY getKey() {  | 
 |       return key;  | 
 |     } | 
 |  | 
 |     /**  | 
 |      * read the next key  | 
 |      */ | 
 |     private void readNextKey() throws IOException { | 
 |       more = in.next(); | 
 |       if (more) { | 
 |         DataInputBuffer nextKeyBytes = in.getKey(); | 
 |         keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength()); | 
 |         nextKey = keyDeserializer.deserialize(nextKey); | 
 |         hasNext = key != null && (comparator.compare(key, nextKey) == 0); | 
 |       } else { | 
 |         hasNext = false; | 
 |       } | 
 |     } | 
 |  | 
 |     /** | 
 |      * Read the next value | 
 |      * @throws IOException | 
 |      */ | 
 |     private void readNextValue() throws IOException { | 
 |       DataInputBuffer nextValueBytes = in.getValue(); | 
 |       valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength()); | 
 |       value = valDeserializer.deserialize(value); | 
 |     } | 
 |   } | 
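|   // Typical consumption pattern for ValuesIterator (compare |
|   // OldCombinerRunner.combine() below; 'process' is hypothetical): |
|   // |
|   //   while (values.more()) { |
|   //     KEY k = values.getKey(); |
|   //     while (values.hasNext()) { |
|   //       process(k, values.next()); |
|   //     } |
|   //     values.nextKey();            // advance to the next unique key |
|   //   } |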
 |  | 
|   /** Iterator to return combined values. */ |
 |   @InterfaceAudience.Private | 
 |   @InterfaceStability.Unstable | 
 |   public static class CombineValuesIterator<KEY,VALUE> | 
 |       extends ValuesIterator<KEY,VALUE> { | 
 |  | 
 |     private final Counters.Counter combineInputCounter; | 
 |  | 
 |     public CombineValuesIterator(RawKeyValueIterator in, | 
 |         RawComparator<KEY> comparator, Class<KEY> keyClass, | 
 |         Class<VALUE> valClass, Configuration conf, Reporter reporter, | 
 |         Counters.Counter combineInputCounter) throws IOException { | 
 |       super(in, comparator, keyClass, valClass, conf, reporter); | 
 |       this.combineInputCounter = combineInputCounter; | 
 |     } | 
 |  | 
 |     public VALUE next() { | 
 |       combineInputCounter.increment(1); | 
 |       return super.next(); | 
 |     } | 
 |   } | 
 |  | 
 |   @SuppressWarnings("unchecked") | 
 |   protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>  | 
 |   org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context | 
 |   createReduceContext(org.apache.hadoop.mapreduce.Reducer | 
 |                         <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer, | 
 |                       Configuration job, | 
 |                       org.apache.hadoop.mapreduce.TaskAttemptID taskId,  | 
 |                       RawKeyValueIterator rIter, | 
 |                       org.apache.hadoop.mapreduce.Counter inputKeyCounter, | 
 |                       org.apache.hadoop.mapreduce.Counter inputValueCounter, | 
 |                       org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,  | 
 |                       org.apache.hadoop.mapreduce.OutputCommitter committer, | 
 |                       org.apache.hadoop.mapreduce.StatusReporter reporter, | 
 |                       RawComparator<INKEY> comparator, | 
 |                       Class<INKEY> keyClass, Class<INVALUE> valueClass | 
 |   ) throws IOException, InterruptedException { | 
 |     org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>  | 
 |     reduceContext =  | 
 |       new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId,  | 
 |                                                               rIter,  | 
 |                                                               inputKeyCounter,  | 
 |                                                               inputValueCounter,  | 
 |                                                               output,  | 
 |                                                               committer,  | 
 |                                                               reporter,  | 
 |                                                               comparator,  | 
 |                                                               keyClass,  | 
 |                                                               valueClass); | 
 |  | 
 |     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context  | 
 |         reducerContext =  | 
 |           new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext( | 
 |               reduceContext); | 
 |  | 
 |     return reducerContext; | 
 |   } | 
 |  | 
 |   @InterfaceAudience.Private | 
 |   @InterfaceStability.Unstable | 
 |   protected static abstract class CombinerRunner<K,V> { | 
 |     protected final Counters.Counter inputCounter; | 
 |     protected final JobConf job; | 
 |     protected final TaskReporter reporter; | 
 |  | 
 |     CombinerRunner(Counters.Counter inputCounter, | 
 |                    JobConf job, | 
 |                    TaskReporter reporter) { | 
 |       this.inputCounter = inputCounter; | 
 |       this.job = job; | 
 |       this.reporter = reporter; | 
 |     } | 
 |      | 
 |     /** | 
 |      * Run the combiner over a set of inputs. | 
 |      * @param iterator the key/value pairs to use as input | 
 |      * @param collector the output collector | 
 |      */ | 
 |     abstract void combine(RawKeyValueIterator iterator,  | 
 |                           OutputCollector<K,V> collector | 
 |                          ) throws IOException, InterruptedException,  | 
 |                                   ClassNotFoundException; | 
 |  | 
 |     @SuppressWarnings("unchecked") | 
 |     static <K,V>  | 
 |     CombinerRunner<K,V> create(JobConf job, | 
 |                                TaskAttemptID taskId, | 
 |                                Counters.Counter inputCounter, | 
 |                                TaskReporter reporter, | 
 |                                org.apache.hadoop.mapreduce.OutputCommitter committer | 
 |                               ) throws ClassNotFoundException { | 
 |       Class<? extends Reducer<K,V,K,V>> cls =  | 
 |         (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass(); | 
 |  | 
 |       if (cls != null) { | 
 |         return new OldCombinerRunner(cls, job, inputCounter, reporter); | 
 |       } | 
 |       // make a task context so we can get the classes | 
 |       org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = | 
 |         new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId); | 
 |       Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =  | 
 |         (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>) | 
 |            taskContext.getCombinerClass(); | 
 |       if (newcls != null) { | 
 |         return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,  | 
 |                                           inputCounter, reporter, committer); | 
 |       } | 
 |        | 
 |       return null; | 
 |     } | 
 |   } | 
 |    | 
 |   @InterfaceAudience.Private | 
 |   @InterfaceStability.Unstable | 
 |   protected static class OldCombinerRunner<K,V> extends CombinerRunner<K,V> { | 
 |     private final Class<? extends Reducer<K,V,K,V>> combinerClass; | 
 |     private final Class<K> keyClass; | 
 |     private final Class<V> valueClass; | 
 |     private final RawComparator<K> comparator; | 
 |  | 
 |     @SuppressWarnings("unchecked") | 
 |     protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls, | 
 |                                 JobConf conf, | 
 |                                 Counters.Counter inputCounter, | 
 |                                 TaskReporter reporter) { | 
 |       super(inputCounter, conf, reporter); | 
 |       combinerClass = cls; | 
 |       keyClass = (Class<K>) job.getMapOutputKeyClass(); | 
 |       valueClass = (Class<V>) job.getMapOutputValueClass(); | 
 |       comparator = (RawComparator<K>) job.getOutputKeyComparator(); | 
 |     } | 
 |  | 
 |     @SuppressWarnings("unchecked") | 
 |     protected void combine(RawKeyValueIterator kvIter, | 
 |                            OutputCollector<K,V> combineCollector | 
 |                            ) throws IOException { | 
 |       Reducer<K,V,K,V> combiner =  | 
 |         ReflectionUtils.newInstance(combinerClass, job); | 
 |       try { | 
 |         CombineValuesIterator<K,V> values =  | 
 |           new CombineValuesIterator<K,V>(kvIter, comparator, keyClass,  | 
 |                                          valueClass, job, Reporter.NULL, | 
 |                                          inputCounter); | 
 |         while (values.more()) { | 
 |           combiner.reduce(values.getKey(), values, combineCollector, | 
 |               Reporter.NULL); | 
 |           values.nextKey(); | 
 |         } | 
 |       } finally { | 
 |         combiner.close(); | 
 |       } | 
 |     } | 
 |   } | 
 |    | 
 |   @InterfaceAudience.Private | 
 |   @InterfaceStability.Unstable | 
 |   protected static class NewCombinerRunner<K, V> extends CombinerRunner<K,V> { | 
 |     private final Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>  | 
 |         reducerClass; | 
 |     private final org.apache.hadoop.mapreduce.TaskAttemptID taskId; | 
 |     private final RawComparator<K> comparator; | 
 |     private final Class<K> keyClass; | 
 |     private final Class<V> valueClass; | 
 |     private final org.apache.hadoop.mapreduce.OutputCommitter committer; | 
 |  | 
 |     @SuppressWarnings("unchecked") | 
 |     NewCombinerRunner(Class reducerClass, | 
 |                       JobConf job, | 
 |                       org.apache.hadoop.mapreduce.TaskAttemptID taskId, | 
 |                       org.apache.hadoop.mapreduce.TaskAttemptContext context, | 
 |                       Counters.Counter inputCounter, | 
 |                       TaskReporter reporter, | 
 |                       org.apache.hadoop.mapreduce.OutputCommitter committer) { | 
 |       super(inputCounter, job, reporter); | 
 |       this.reducerClass = reducerClass; | 
 |       this.taskId = taskId; | 
 |       keyClass = (Class<K>) context.getMapOutputKeyClass(); | 
 |       valueClass = (Class<V>) context.getMapOutputValueClass(); | 
 |       comparator = (RawComparator<K>) context.getSortComparator(); | 
 |       this.committer = committer; | 
 |     } | 
 |  | 
 |     private static class OutputConverter<K,V> | 
 |             extends org.apache.hadoop.mapreduce.RecordWriter<K,V> { | 
 |       OutputCollector<K,V> output; | 
 |       OutputConverter(OutputCollector<K,V> output) { | 
 |         this.output = output; | 
 |       } | 
 |  | 
 |       @Override | 
 |       public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context){ | 
 |       } | 
 |  | 
 |       @Override | 
 |       public void write(K key, V value | 
 |                         ) throws IOException, InterruptedException { | 
 |         output.collect(key,value); | 
 |       } | 
 |     } | 
 |  | 
 |     @SuppressWarnings("unchecked") | 
 |     @Override | 
 |     void combine(RawKeyValueIterator iterator,  | 
 |                  OutputCollector<K,V> collector | 
 |                  ) throws IOException, InterruptedException, | 
 |                           ClassNotFoundException { | 
 |       // make a reducer | 
 |       org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer = | 
 |         (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>) | 
 |           ReflectionUtils.newInstance(reducerClass, job); | 
 |       org.apache.hadoop.mapreduce.Reducer.Context  | 
 |            reducerContext = createReduceContext(reducer, job, taskId, | 
 |                                                 iterator, null, inputCounter,  | 
 |                                                 new OutputConverter(collector), | 
 |                                                 committer, | 
 |                                                 reporter, comparator, keyClass, | 
 |                                                 valueClass); | 
 |       reducer.run(reducerContext); | 
 |     }  | 
 |   } | 
 | } |