| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.lang.management.GarbageCollectorMXBean; |
| import java.lang.management.ManagementFactory; |
| import java.text.NumberFormat; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import javax.crypto.SecretKey; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.FileSystem.Statistics; |
| import org.apache.hadoop.io.DataInputBuffer; |
| import org.apache.hadoop.io.RawComparator; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableUtils; |
| import org.apache.hadoop.io.serializer.Deserializer; |
| import org.apache.hadoop.io.serializer.SerializationFactory; |
| import org.apache.hadoop.mapred.IFile.Writer; |
| import org.apache.hadoop.mapreduce.Counter; |
| import org.apache.hadoop.mapreduce.FileSystemCounter; |
| import org.apache.hadoop.mapreduce.OutputCommitter; |
| import org.apache.hadoop.mapreduce.TaskCounter; |
| import org.apache.hadoop.mapreduce.JobStatus; |
| import org.apache.hadoop.mapreduce.MRConfig; |
| import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; |
| import org.apache.hadoop.mapreduce.task.ReduceContextImpl; |
| import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin; |
| import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.*; |
| import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.util.Progress; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /** |
| * Base class for tasks. |
| */ |
| @InterfaceAudience.LimitedPrivate({"MapReduce"}) |
| @InterfaceStability.Unstable |
| abstract public class Task implements Writable, Configurable { |
| private static final Log LOG = |
| LogFactory.getLog(Task.class); |
| |
| public static String MERGED_OUTPUT_PREFIX = ".merged"; |
| |
| /** |
| * Name of the FileSystem counters' group |
| */ |
| protected static final String FILESYSTEM_COUNTER_GROUP = "FileSystemCounters"; |
| |
| /////////////////////////////////////////////////////////// |
| // Helper methods to construct task-output paths |
| /////////////////////////////////////////////////////////// |
| |
| /** Construct output file names so that, when an output directory listing is |
| * sorted lexicographically, positions correspond to output partitions.*/ |
| private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); |
| static { |
| NUMBER_FORMAT.setMinimumIntegerDigits(5); |
| NUMBER_FORMAT.setGroupingUsed(false); |
| } |
| |
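  // For example, getOutputName(7) yields "part-00007": five zero-padded
  // digits with grouping disabled, so lexicographic order matches
  // partition order.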
| static synchronized String getOutputName(int partition) { |
| return "part-" + NUMBER_FORMAT.format(partition); |
| } |
| |
| //////////////////////////////////////////// |
| // Fields |
| //////////////////////////////////////////// |
| |
| private String jobFile; // job configuration file |
| private String user; // user running the job |
| private TaskAttemptID taskId; // unique, includes job id |
| private TaskAttemptID taskIdForUmbilical; // same, or uber's if subtask |
| private int partition; // id within job |
| TaskStatus taskStatus; // current status of the task |
| protected JobStatus.State jobRunStateForCleanup; |
| protected boolean jobCleanup = false; |
| protected boolean jobSetup = false; |
| protected boolean taskCleanup = false; |
| |
| //skip ranges based on failed ranges from previous attempts |
| private SortedRanges skipRanges = new SortedRanges(); |
| private boolean skipping = false; |
| private boolean writeSkipRecs = true; |
| |
| //currently processing record start index |
| private volatile long currentRecStartIndex; |
| private Iterator<Long> currentRecIndexIterator = |
| skipRanges.skipRangeIterator(); |
| |
| private ResourceCalculatorPlugin resourceCalculator = null; |
| private long initCpuCumulativeTime = 0; |
| |
| protected JobConf conf; |
| protected MapOutputFile mapOutputFile; |
| protected LocalDirAllocator lDirAlloc; |
| private final static int MAX_RETRIES = 10; |
| protected JobContext jobContext; |
| protected TaskAttemptContext taskContext; |
| protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat; |
| protected org.apache.hadoop.mapreduce.OutputCommitter committer; |
| protected final Counters.Counter spilledRecordsCounter; |
| protected final Counters.Counter failedShuffleCounter; |
| protected final Counters.Counter mergedMapOutputsCounter; |
| private int numSlotsRequired; |
| protected TaskUmbilicalProtocol umbilical; |
| protected SecretKey tokenSecret; |
| protected GcTimeUpdater gcUpdater; |
| |
| //////////////////////////////////////////// |
| // Constructors |
| //////////////////////////////////////////// |
| |
| public Task() { |
| taskId = new TaskAttemptID(); |
| taskIdForUmbilical = taskId; |
| spilledRecordsCounter = |
| counters.findCounter(TaskCounter.SPILLED_RECORDS); |
| failedShuffleCounter = |
| counters.findCounter(TaskCounter.FAILED_SHUFFLE); |
| mergedMapOutputsCounter = |
| counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS); |
| gcUpdater = new GcTimeUpdater(); |
| } |
| |
| public Task(String jobFile, TaskAttemptID taskId, int partition, |
| int numSlotsRequired) { |
| this.jobFile = jobFile; |
| this.taskId = taskId; |
| this.taskIdForUmbilical = taskId; |
| |
| this.partition = partition; |
| this.numSlotsRequired = numSlotsRequired; |
| spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS); |
| failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE); |
| mergedMapOutputsCounter = |
| counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS); |
| gcUpdater = new GcTimeUpdater(); |
| } |
| |
| //////////////////////////////////////////// |
| // Accessors |
| //////////////////////////////////////////// |
| public void setJobFile(String jobFile) { this.jobFile = jobFile; } |
| public String getJobFile() { return jobFile; } |
| |
| public TaskAttemptID getTaskID() { return taskId; } |
| public void setTaskIdForUmbilical(TaskAttemptID taskIdForUmbilical) { |
| this.taskIdForUmbilical = taskIdForUmbilical; |
| } |
| |
| public int getNumSlotsRequired() { |
| return numSlotsRequired; |
| } |
| |
| Counters getCounters() { return counters; } |
| |
| /** |
| * Get the job name for this task. |
| * @return the job name |
| */ |
| public JobID getJobID() { |
| return taskId.getJobID(); |
| } |
| |
| /** |
| * Set the job token secret |
| * @param tokenSecret the secret |
| */ |
| public void setJobTokenSecret(SecretKey tokenSecret) { |
| this.tokenSecret = tokenSecret; |
| } |
| |
| /** |
| * Get the job token secret |
| * @return the token secret |
| */ |
| public SecretKey getJobTokenSecret() { |
| return this.tokenSecret; |
| } |
| |
| |
| /** |
| * Get the index of this task within the job. |
| * @return the integer part of the task id |
| */ |
| public int getPartition() { |
| return partition; |
| } |
| /** |
| * Return current phase of the task. |
| * needs to be synchronized as communication thread sends the phase every second |
| * @return the curent phase of the task |
| */ |
| public synchronized TaskStatus.Phase getPhase(){ |
| return this.taskStatus.getPhase(); |
| } |
| /** |
| * Set current phase of the task. |
| * @param phase task phase |
| */ |
| protected synchronized void setPhase(TaskStatus.Phase phase){ |
| this.taskStatus.setPhase(phase); |
| } |
| |
| /** |
| * Get whether to write skip records. |
| */ |
| protected boolean toWriteSkipRecs() { |
| return writeSkipRecs; |
| } |
| |
| /** |
| * Set whether to write skip records. |
| */ |
| protected void setWriteSkipRecs(boolean writeSkipRecs) { |
| this.writeSkipRecs = writeSkipRecs; |
| } |
| |
| /** |
| * Report a fatal error to the parent (task) tracker. |
| */ |
| protected void reportFatalError(Throwable throwable, |
| String logMsg) { |
| LOG.fatal(logMsg); |
| Throwable tCause = throwable.getCause(); |
| String cause = tCause == null |
| ? StringUtils.stringifyException(throwable) |
| : StringUtils.stringifyException(tCause); |
| try { |
| umbilical.fatalError(taskIdForUmbilical, cause); |
| } catch (IOException ioe) { |
| LOG.fatal("Failed to contact the tasktracker", ioe); |
| System.exit(-1); |
| } |
| } |
| |
| /** |
| * Get skipRanges. |
| */ |
| public SortedRanges getSkipRanges() { |
| return skipRanges; |
| } |
| |
| /** |
| * Set skipRanges. |
| */ |
| public void setSkipRanges(SortedRanges skipRanges) { |
| this.skipRanges = skipRanges; |
| } |
| |
| /** |
| * Is Task in skipping mode. |
| */ |
| public boolean isSkipping() { |
| return skipping; |
| } |
| |
| /** |
| * Sets whether to run Task in skipping mode. |
| * @param skipping |
| */ |
| public void setSkipping(boolean skipping) { |
| this.skipping = skipping; |
| } |
| |
| /** |
| * Return current state of the task. |
| * needs to be synchronized as communication thread |
| * sends the state every second |
| * @return |
| */ |
| synchronized TaskStatus.State getState(){ |
| return this.taskStatus.getRunState(); |
| } |
| /** |
| * Set current state of the task. |
| * @param state |
| */ |
| synchronized void setState(TaskStatus.State state){ |
| this.taskStatus.setRunState(state); |
| } |
| |
| void setTaskCleanupTask() { |
| taskCleanup = true; |
| } |
| |
| boolean isTaskCleanupTask() { |
| return taskCleanup; |
| } |
| |
| boolean isJobCleanupTask() { |
| return jobCleanup; |
| } |
| |
| boolean isJobAbortTask() { |
    // the task is an abort task if it's marked for cleanup and the final
    // expected state is either failed or killed.
| return isJobCleanupTask() |
| && (jobRunStateForCleanup == JobStatus.State.KILLED |
| || jobRunStateForCleanup == JobStatus.State.FAILED); |
| } |
| |
| boolean isJobSetupTask() { |
| return jobSetup; |
| } |
| |
| void setJobSetupTask() { |
| jobSetup = true; |
| } |
| |
| void setJobCleanupTask() { |
| jobCleanup = true; |
| } |
| |
| /** |
| * Sets the task to do job abort in the cleanup. |
| * @param status the final runstate of the job. |
| */ |
| void setJobCleanupTaskState(JobStatus.State status) { |
| jobRunStateForCleanup = status; |
| } |
| |
| boolean isMapOrReduce() { |
| return !jobSetup && !jobCleanup && !taskCleanup; |
| } |
| |
| /** |
   * Get the name of the user running the job/task. The TaskTracker needs the
   * task's user name even before its JobConf is localized, so we explicitly
   * serialize the user name.
| * |
| * @return user |
| */ |
| String getUser() { |
| return user; |
| } |
| |
| void setUser(String user) { |
| this.user = user; |
| } |
| |
| /** |
| * Return the task's MapOutputFile instance. |
| * @return the task's MapOutputFile instance |
| */ |
| MapOutputFile getMapOutputFile() { |
| return mapOutputFile; |
| } |
| |
| //////////////////////////////////////////// |
| // Writable methods |
| //////////////////////////////////////////// |
| |
| public void write(DataOutput out) throws IOException { |
| Text.writeString(out, jobFile); |
| taskId.write(out); |
| taskIdForUmbilical.write(out); |
| out.writeInt(partition); |
| out.writeInt(numSlotsRequired); |
| taskStatus.write(out); |
| skipRanges.write(out); |
| out.writeBoolean(skipping); |
| out.writeBoolean(jobCleanup); |
| if (jobCleanup) { |
| WritableUtils.writeEnum(out, jobRunStateForCleanup); |
| } |
| out.writeBoolean(jobSetup); |
| out.writeBoolean(writeSkipRecs); |
| out.writeBoolean(taskCleanup); |
| Text.writeString(out, user); |
| } |
| |
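  // Note: the field order here must exactly mirror write(DataOutput) above.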
| public void readFields(DataInput in) throws IOException { |
| jobFile = Text.readString(in); |
| taskId = TaskAttemptID.read(in); |
| taskIdForUmbilical = TaskAttemptID.read(in); |
| partition = in.readInt(); |
| numSlotsRequired = in.readInt(); |
| taskStatus.readFields(in); |
| skipRanges.readFields(in); |
| currentRecIndexIterator = skipRanges.skipRangeIterator(); |
| currentRecStartIndex = currentRecIndexIterator.next(); |
| skipping = in.readBoolean(); |
| jobCleanup = in.readBoolean(); |
| if (jobCleanup) { |
| jobRunStateForCleanup = |
| WritableUtils.readEnum(in, JobStatus.State.class); |
| } |
| jobSetup = in.readBoolean(); |
| writeSkipRecs = in.readBoolean(); |
| taskCleanup = in.readBoolean(); |
| if (taskCleanup) { |
| setPhase(TaskStatus.Phase.CLEANUP); |
| } |
| user = Text.readString(in); |
| } |
| |
| @Override |
| public String toString() { return taskId.toString(); } |
| |
| /** |
| * Localize the given JobConf to be specific for this task. |
| */ |
| public void localizeConfiguration(JobConf conf) throws IOException { |
| conf.set(JobContext.TASK_ID, taskId.getTaskID().toString()); |
| conf.set(JobContext.TASK_ATTEMPT_ID, taskId.toString()); |
| conf.setBoolean(JobContext.TASK_ISMAP, isMapTask()); |
| conf.setInt(JobContext.TASK_PARTITION, partition); |
| conf.set(JobContext.ID, taskId.getJobID().toString()); |
| } |
| |
| /** Run this task as a part of the named job. This method is executed in the |
| * child process and is what invokes user-supplied map, reduce, etc. methods. |
| * @param umbilical for progress reports |
| */ |
| public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical) |
| throws IOException, ClassNotFoundException, InterruptedException; |
| |
| |
  /** Return an appropriate thread runner for this task.
   * @param tip TODO*/
| public abstract TaskRunner createRunner(TaskTracker tracker, |
| TaskTracker.TaskInProgress tip, |
| TaskTracker.RunningJob rjob |
| ) throws IOException; |
| |
| /** The number of milliseconds between progress reports. */ |
| public static final int PROGRESS_INTERVAL = 3000; |
| |
| private transient Progress taskProgress = new Progress(); |
| |
| // Current counters |
| private transient Counters counters = new Counters(); |
| |
| /* flag to track whether task is done */ |
| private AtomicBoolean taskDone = new AtomicBoolean(false); |
| |
| public abstract boolean isMapTask(); |
| |
| /** |
| * Is this really a combo-task masquerading as a plain ReduceTask? |
| */ |
| public abstract boolean isUberTask(); |
| |
| /** |
| * This setter allows one to incorporate Tasks into multiple levels of |
| * a Progress addPhase()-generated hierarchy (i.e., not always the root |
| * node), which in turn allows Progress to handle all details of progress |
| * aggregation for an UberTask or even a whole job. |
| */ |
| protected void setProgress(Progress progress) { |
| taskProgress = progress; |
| } |
| |
| public Progress getProgress() { return taskProgress; } |
| |
| public void initialize(JobConf job, JobID id, |
| Reporter reporter, |
| boolean useNewApi) throws IOException, |
| ClassNotFoundException, |
| InterruptedException { |
| jobContext = new JobContextImpl(job, id, reporter); |
| // taskId (rather than taskIdForUmbilical) is required here to avoid |
| // collisions in uber-subtasks' outputs; it ultimately determines both the |
| // name of the pre-commit HDFS working directory (_temporary/_attempt_xxx) |
| // and the (main) filename in that directory (part-[rm]-nnnnn). UberTask |
| // will handle any tempdir/commit-related details. |
| taskContext = new TaskAttemptContextImpl(job, taskId, reporter); |
| if (getState() == TaskStatus.State.UNASSIGNED) { |
| setState(TaskStatus.State.RUNNING); |
| } |
| if (useNewApi) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("using new api for output committer"); |
| } |
| outputFormat = |
| ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job); |
| committer = outputFormat.getOutputCommitter(taskContext); |
| } else { |
| committer = conf.getOutputCommitter(); |
| } |
| // this will typically be on HDFS, not the node's local filesystem: |
| Path outputPath = FileOutputFormat.getOutputPath(conf); |
| if (outputPath != null) { |
| if ((committer instanceof FileOutputCommitter)) { |
| FileOutputFormat.setWorkOutputPath(conf, |
| ((FileOutputCommitter)committer).getTempTaskOutputPath(taskContext)); |
| } else { |
| FileOutputFormat.setWorkOutputPath(conf, outputPath); |
| } |
| } |
| committer.setupTask(taskContext); |
| Class<? extends ResourceCalculatorPlugin> clazz = |
| conf.getClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN, |
| null, ResourceCalculatorPlugin.class); |
| resourceCalculator = ResourceCalculatorPlugin |
| .getResourceCalculatorPlugin(clazz, conf); |
| LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculator); |
| if (resourceCalculator != null) { |
| initCpuCumulativeTime = |
| resourceCalculator.getProcResourceValues().getCumulativeCpuTime(); |
| } |
| } |
| |
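  /**
   * Reporter handed to user code and run as the task's communication
   * thread: it forwards status, progress, and counter updates to the
   * parent TaskTracker over the umbilical, sending either a status update
   * or a ping every {@link Task#PROGRESS_INTERVAL} milliseconds.
   */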
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| protected class TaskReporter |
| extends org.apache.hadoop.mapreduce.StatusReporter |
| implements Runnable, Reporter { |
| private TaskUmbilicalProtocol umbilical; |
| private InputSplit split = null; |
| private Progress taskProgress; |
| private Thread pingThread = null; |
| private boolean done = true; |
| private Object lock = new Object(); |
| |
| /** |
     * Flag that indicates whether a progress update needs to be sent to the
     * parent: true means an update is pending, false means it has been sent.
     * Using AtomicBoolean since we need an atomic get-and-reset operation.
| */ |
| private AtomicBoolean progressFlag = new AtomicBoolean(false); |
| |
| TaskReporter(Progress taskProgress, |
| TaskUmbilicalProtocol umbilical) { |
| this.umbilical = umbilical; |
| this.taskProgress = taskProgress; |
| } |
| |
| // getters and setters for flag |
| void setProgressFlag() { |
| progressFlag.set(true); |
| } |
| boolean resetProgressFlag() { |
| return progressFlag.getAndSet(false); |
| } |
| public void setStatus(String status) { |
| taskProgress.setStatus(status); |
| // indicate that progress update needs to be sent |
| setProgressFlag(); |
| } |
| public void setProgress(float progress) { |
| // set current phase progress. |
| // This method assumes that task has phases. |
| taskProgress.phase().set(progress); |
| // indicate that progress update needs to be sent |
| setProgressFlag(); |
| } |
| // FIXME? why isn't this deprecated in favor of public setProgressFlag()? |
| public void progress() { |
| // indicate that progress update needs to be sent |
| setProgressFlag(); |
| } |
| public Counters.Counter getCounter(String group, String name) { |
| Counters.Counter counter = null; |
| if (counters != null) { |
| counter = counters.findCounter(group, name); |
| } |
| return counter; |
| } |
| public Counters.Counter getCounter(Enum<?> name) { |
| return counters == null ? null : counters.findCounter(name); |
| } |
    public void incrCounter(Enum<?> key, long amount) {
| if (counters != null) { |
| counters.incrCounter(key, amount); |
| } |
| setProgressFlag(); |
| } |
| public void incrCounter(String group, String counter, long amount) { |
| if (counters != null) { |
| counters.incrCounter(group, counter, amount); |
| } |
| if(skipping && SkipBadRecords.COUNTER_GROUP.equals(group) && ( |
| SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.equals(counter) || |
| SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS.equals(counter))) { |
| //if application reports the processed records, move the |
| //currentRecStartIndex to the next. |
| //currentRecStartIndex is the start index which has not yet been |
| //finished and is still in task's stomach. |
| for(int i=0;i<amount;i++) { |
| currentRecStartIndex = currentRecIndexIterator.next(); |
| } |
| } |
| setProgressFlag(); |
| } |
| public void setInputSplit(InputSplit split) { |
| this.split = split; |
| } |
| public InputSplit getInputSplit() throws UnsupportedOperationException { |
| if (split == null) { |
| throw new UnsupportedOperationException("Input available only on map"); |
| } else { |
| return split; |
| } |
| } |
| /** |
| * The communication thread handles communication with the parent |
| * (TaskTracker). It sends progress updates if progress has been made or |
| * if the task needs to let the parent know that it's alive. It also pings |
| * the parent to see if it's alive. |
| */ |
| public void run() { |
| final int MAX_RETRIES = 3; |
| int remainingRetries = MAX_RETRIES; |
| // get current flag value and reset it as well |
| boolean sendProgress = resetProgressFlag(); |
| while (!taskDone.get()) { |
| synchronized(lock) { |
| done = false; |
| } |
| try { |
| boolean taskFound = true; // whether TT knows about this task |
| // sleep for a bit |
| try { |
| Thread.sleep(PROGRESS_INTERVAL); |
| } |
| catch (InterruptedException e) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(getTaskID() + " Progress/ping thread exiting " + |
| "since it got interrupted"); |
| } |
| break; |
| } |
| |
| if (sendProgress) { |
| // we need to send progress update |
| updateCounters(); |
| taskStatus.statusUpdate(taskProgress.get(), |
| taskProgress.toString(), |
| counters); |
| taskFound = umbilical.statusUpdate(taskId, taskStatus); |
| taskStatus.clearStatus(); |
| } |
| else { |
| // send ping |
| taskFound = umbilical.ping(taskId); |
| } |
| |
| // if TaskTracker is not aware of our task ID (probably because it |
| // died and came back up), kill ourselves |
| if (!taskFound) { |
| LOG.warn("Parent died. Exiting "+taskId); |
| resetDoneFlag(); |
| System.exit(66); |
| } |
| |
| sendProgress = resetProgressFlag(); |
| remainingRetries = MAX_RETRIES; |
| } |
| catch (Throwable t) { |
| LOG.info("Communication exception: " + StringUtils.stringifyException(t)); |
| remainingRetries -=1; |
| if (remainingRetries == 0) { |
| ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); |
| LOG.warn("Last retry, killing "+taskId); |
| resetDoneFlag(); |
| System.exit(65); |
| } |
| } |
| } |
| //Notify that we are done with the work |
| resetDoneFlag(); |
| } |
| |
| void resetDoneFlag() { |
| synchronized(lock) { |
| done = true; |
| lock.notify(); |
| } |
| } |
| |
| public void startCommunicationThread() { |
| if (pingThread == null) { |
| pingThread = new Thread(this, "communication thread"); |
| pingThread.setDaemon(true); |
| pingThread.start(); |
| } |
| } |
| public void stopCommunicationThread() throws InterruptedException { |
| if (pingThread != null) { |
| synchronized(lock) { |
| while(!done) { |
| lock.wait(); |
| } |
| } |
| pingThread.interrupt(); |
| pingThread.join(); |
| } |
| } |
| } |
| |
| /** |
| * Reports the next executing record range to TaskTracker. |
| * |
| * @param umbilical |
| * @param nextRecIndex the record index which would be fed next. |
| * @throws IOException |
| */ |
| protected void reportNextRecordRange(final TaskUmbilicalProtocol umbilical, |
| long nextRecIndex) throws IOException{ |
| //currentRecStartIndex is the start index which has not yet been finished |
| //and is still in task's stomach. |
| long len = nextRecIndex - currentRecStartIndex +1; |
| SortedRanges.Range range = |
| new SortedRanges.Range(currentRecStartIndex, len); |
| taskStatus.setNextRecordRange(range); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("sending reportNextRecordRange " + range); |
| } |
| umbilical.reportNextRecordRange(taskIdForUmbilical, range); |
| } |
| |
| /** |
| * Create a TaskReporter and start communication thread |
| */ |
| TaskReporter startReporter(final TaskUmbilicalProtocol umbilical) { |
| // start thread that will handle communication with parent |
| TaskReporter reporter = new TaskReporter(getProgress(), umbilical); |
| reporter.startCommunicationThread(); |
| return reporter; |
| } |
| |
| /** |
| * Update resource information counters |
| */ |
| void updateResourceCounters() { |
| if (resourceCalculator == null) { |
| return; |
| } |
| ProcResourceValues res = resourceCalculator.getProcResourceValues(); |
| long cpuTime = res.getCumulativeCpuTime(); |
| long pMem = res.getPhysicalMemorySize(); |
| long vMem = res.getVirtualMemorySize(); |
| // Remove the CPU time consumed previously by JVM reuse |
| cpuTime -= initCpuCumulativeTime; |
| counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime); |
| counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem); |
| counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem); |
| } |
| |
| /** |
| * An updater that tracks the amount of time this task has spent in GC. |
| */ |
| class GcTimeUpdater { |
| private long lastGcMillis = 0; |
| private List<GarbageCollectorMXBean> gcBeans = null; |
| |
| public GcTimeUpdater() { |
| this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); |
| getElapsedGc(); // Initialize 'lastGcMillis' with the current time spent. |
| } |
| |
| /** |
| * @return the number of milliseconds that the gc has used for CPU |
| * since the last time this method was called. |
| */ |
| protected long getElapsedGc() { |
| long thisGcMillis = 0; |
| for (GarbageCollectorMXBean gcBean : gcBeans) { |
| thisGcMillis += gcBean.getCollectionTime(); |
| } |
| |
| long delta = thisGcMillis - lastGcMillis; |
| this.lastGcMillis = thisGcMillis; |
| return delta; |
| } |
| |
| /** |
| * Increment the gc-elapsed-time counter. |
| */ |
| public void incrementGcCounter() { |
| if (null == counters) { |
| return; // nothing to do. |
| } |
| |
| Counter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS); |
| if (null != gcCounter) { |
| gcCounter.increment(getElapsedGc()); |
| } |
| } |
| } |
| |
| /** |
| * An updater that tracks the last number reported for a given file |
| * system and only creates the counters when they are needed. |
| */ |
| class FileSystemStatisticUpdater { |
| private FileSystem.Statistics stats; |
| private Counters.Counter readBytesCounter, writeBytesCounter, |
| readOpsCounter, largeReadOpsCounter, writeOpsCounter; |
| |
| FileSystemStatisticUpdater(FileSystem.Statistics stats) { |
| this.stats = stats; |
| } |
| |
| void updateCounters() { |
| String scheme = stats.getScheme(); |
| if (readBytesCounter == null) { |
| readBytesCounter = counters.findCounter(scheme, |
| FileSystemCounter.BYTES_READ); |
| } |
| readBytesCounter.setValue(stats.getBytesRead()); |
| if (writeBytesCounter == null) { |
| writeBytesCounter = counters.findCounter(scheme, |
| FileSystemCounter.BYTES_WRITTEN); |
| } |
| writeBytesCounter.setValue(stats.getBytesWritten()); |
| if (readOpsCounter == null) { |
| readOpsCounter = counters.findCounter(scheme, |
| FileSystemCounter.READ_OPS); |
| } |
| readOpsCounter.setValue(stats.getReadOps()); |
| if (largeReadOpsCounter == null) { |
| largeReadOpsCounter = counters.findCounter(scheme, |
| FileSystemCounter.LARGE_READ_OPS); |
| } |
| largeReadOpsCounter.setValue(stats.getLargeReadOps()); |
| if (writeOpsCounter == null) { |
| writeOpsCounter = counters.findCounter(scheme, |
| FileSystemCounter.WRITE_OPS); |
| } |
| writeOpsCounter.setValue(stats.getWriteOps()); |
| } |
| } |
| |
| /** |
| * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater |
| */ |
| private Map<String, FileSystemStatisticUpdater> statisticUpdaters = |
| new HashMap<String, FileSystemStatisticUpdater>(); |
| |
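  /**
   * Refreshes filesystem, GC, and resource counters from their live
   * sources; called from the reporter thread before each status update and
   * again when the task finishes.
   */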
| private synchronized void updateCounters() { |
    for (Statistics stat : FileSystem.getAllStatistics()) {
      String uriScheme = stat.getScheme();
      FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
      if (updater == null) { // a new FileSystem has appeared in the cache
| updater = new FileSystemStatisticUpdater(stat); |
| statisticUpdaters.put(uriScheme, updater); |
| } |
| updater.updateCounters(); |
| } |
| |
| gcUpdater.incrementGcCounter(); |
| updateResourceCounters(); |
| } |
| |
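  /**
   * Marks this task's work as complete: updates counters, runs the commit
   * protocol if the OutputCommitter requires a task commit (report
   * COMMIT_PENDING, wait for approval, then commit), stops the reporter's
   * communication thread, and sends the final status update and done()
   * call over the umbilical.
   */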
| public void done(TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter |
| ) throws IOException, InterruptedException { |
| if (isUberTask()) { |
| LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId |
| + "' is done and is in the process of committing."); |
| } else { |
| LOG.info("Task '" + taskId |
| + "' is done and is in the process of committing."); |
| } |
| updateCounters(); |
| |
| boolean commitRequired = isCommitRequired(); |
| if (commitRequired) { |
| int retries = MAX_RETRIES; |
| setState(TaskStatus.State.COMMIT_PENDING); |
      // tell the task tracker that the task is commit-pending
| while (true) { |
| try { |
| umbilical.commitPending(taskIdForUmbilical, taskStatus); |
| break; |
| } catch (InterruptedException ie) { |
| // ignore |
| } catch (IOException ie) { |
| LOG.warn("Failure sending commit pending: " + |
| StringUtils.stringifyException(ie)); |
| if (--retries == 0) { |
| System.exit(67); |
| } |
| } |
| } |
| // wait for commit approval (via JT commitAction for this task) and commit |
| commitAfterApproval(umbilical, reporter); |
| } |
| taskDone.set(true); |
| reporter.stopCommunicationThread(); |
| // Make sure we send at least one set of counter increments. It's |
| // ok to call updateCounters() in this thread after comm thread stopped. |
| updateCounters(); |
| sendLastUpdate(umbilical); |
| //signal the tasktracker that we are done |
| sendDone(umbilical); |
| } |
| |
| /** |
| * Checks if this task has anything to commit, depending on the |
| * type of task, as well as on whether the {@link OutputCommitter} |
| * has anything to commit. |
| * |
| * @return true if the task has to commit |
| * @throws IOException |
| */ |
| boolean isCommitRequired() throws IOException { |
| boolean commitRequired = false; |
| if (isMapOrReduce()) { |
| commitRequired = committer.needsTaskCommit(taskContext); |
| } |
| return commitRequired; |
| } |
| |
| /** |
| * Send a status update to the task tracker |
| * @param umbilical |
| * @throws IOException |
| */ |
| public void statusUpdate(TaskUmbilicalProtocol umbilical) |
| throws IOException { |
| int retries = MAX_RETRIES; |
| while (true) { |
| try { |
| // FIXME (later): alternatives to taskIdForUmbilical would be |
| // (1) include taskId as part of umbilical object and protocol; |
| // (2) include taskId as part of taskStatus |
| // (3) extend TaskAttemptID (or create related Task inner class?) to |
| // include taskAttemptId() and taskAtteptIdForUmbilical() method |
| // that's overridden in uber context [Dick] |
| if (!umbilical.statusUpdate(taskIdForUmbilical, taskStatus)) { |
| LOG.warn("Parent died. Exiting " + taskId); |
| System.exit(66); |
| } |
| taskStatus.clearStatus(); |
| return; |
| } catch (InterruptedException ie) { |
        Thread.currentThread().interrupt(); // interrupt ourselves
| } catch (IOException ie) { |
| LOG.warn("Failure sending status update: " + |
| StringUtils.stringifyException(ie)); |
| if (--retries == 0) { |
| throw ie; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Sends last status update before sending umbilical.done(); |
| */ |
| private void sendLastUpdate(TaskUmbilicalProtocol umbilical) |
| throws IOException { |
| taskStatus.setOutputSize(calculateOutputSize()); |
| // send a final status report |
| taskStatus.statusUpdate(taskProgress.get(), |
| taskProgress.toString(), |
| counters); |
| statusUpdate(umbilical); |
| } |
| |
| /** |
| * Calculates the size of output for this task. |
| * |
| * @return -1 if it can't be found. |
| */ |
| private long calculateOutputSize() throws IOException { |
| if (!isMapOrReduce() || isUberTask()) { |
| return -1; |
| } |
| |
| if (isMapTask() && conf.getNumReduceTasks() > 0) { |
| try { |
| Path mapOutput = mapOutputFile.getOutputFile(); |
| FileSystem localFS = FileSystem.getLocal(conf); |
| return localFS.getFileStatus(mapOutput).getLen(); |
| } catch (IOException e) { |
| LOG.warn ("Could not find output size " , e); |
| } |
| } |
| return -1; |
| } |
| |
| private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException { |
| int retries = MAX_RETRIES; |
| while (true) { |
| try { |
| umbilical.done(taskIdForUmbilical); |
| if (isUberTask()) { |
| LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId |
| + "' done."); |
| } else { |
| LOG.info("Task '" + taskId + "' done."); |
| } |
| return; |
| } catch (IOException ie) { |
| LOG.warn("Failure signalling completion: " + |
| StringUtils.stringifyException(ie)); |
| if (--retries == 0) { |
| throw ie; |
| } |
| } |
| } |
| } |
| |
| private void commitAfterApproval(TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter) throws IOException { |
| int retries = MAX_RETRIES; |
| while (true) { |
| try { |
| while (!umbilical.canCommit(taskIdForUmbilical)) { |
| try { |
            // FIXME 1: shouldn't this count down retries, too, in case the
            // JT glitched and no longer knows about us? (else infinite loop)
            // FIXME 2: shouldn't the hardcoded 1-second sleep instead
            // correspond to the task's heartbeat interval?
            Thread.sleep(1000);
| } catch(InterruptedException ie) { |
| //ignore |
| } |
| reporter.setProgressFlag(); |
| } |
| break; |
| } catch (IOException ie) { |
| LOG.warn("Failure asking whether task can commit: " + |
| StringUtils.stringifyException(ie)); |
| if (--retries == 0) { |
| // if it couldn't query successfully then delete the output |
| discardOutput(taskContext); |
| System.exit(68); |
| } |
| } |
| } |
| |
    // task can commit now
| commit(umbilical, reporter); |
| } |
| |
| // this is protected (rather than private) solely for UberTask |
| protected void commit(TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter) throws IOException { |
| try { |
| LOG.info("Task " + taskId + " is allowed to commit now"); |
| committer.commitTask(taskContext); |
| return; |
| } catch (IOException ioe) { |
| LOG.warn("Failure committing: " + |
| StringUtils.stringifyException(ioe)); |
| // if it couldn't commit successfully then delete the output |
| discardOutput(taskContext); |
| throw ioe; |
| } |
| } |
| |
| private |
| void discardOutput(TaskAttemptContext taskContext) { |
| try { |
| committer.abortTask(taskContext); |
| } catch (IOException ioe) { |
| LOG.warn("Failure cleaning up: " + |
| StringUtils.stringifyException(ioe)); |
| } |
| } |
| |
| protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter) |
| throws IOException, InterruptedException { |
| taskCleanup(umbilical); |
| done(umbilical, reporter); |
| } |
| |
| void taskCleanup(TaskUmbilicalProtocol umbilical) |
| throws IOException { |
| // set phase for this task |
| setPhase(TaskStatus.Phase.CLEANUP); |
| getProgress().setStatus("cleanup"); |
| statusUpdate(umbilical); |
| LOG.info("Running cleanup for the task"); |
| // do the cleanup |
| committer.abortTask(taskContext); |
| } |
| |
| protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter |
| ) throws IOException, InterruptedException { |
| // set phase for this task |
| setPhase(TaskStatus.Phase.CLEANUP); |
| getProgress().setStatus("cleanup"); |
| statusUpdate(umbilical); |
| // do the cleanup |
| LOG.info("Cleaning up job"); |
| if (jobRunStateForCleanup == JobStatus.State.FAILED |
| || jobRunStateForCleanup == JobStatus.State.KILLED) { |
| LOG.info("Aborting job with runstate : " + jobRunStateForCleanup.name()); |
| if (conf.getUseNewMapper()) { |
| committer.abortJob(jobContext, jobRunStateForCleanup); |
| } else { |
| org.apache.hadoop.mapred.OutputCommitter oldCommitter = |
| (org.apache.hadoop.mapred.OutputCommitter)committer; |
| oldCommitter.abortJob(jobContext, jobRunStateForCleanup); |
| } |
| } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){ |
| // delete <outputdir>/_temporary and optionally create _SUCCESS file |
| if (!isUberTask()) { // defer since output files have not yet been saved |
| commitJob(); |
| } |
| } else { |
| throw new IOException("Invalid state of the job for cleanup. State found " |
| + jobRunStateForCleanup + " expecting " |
| + JobStatus.State.SUCCEEDED + ", " |
| + JobStatus.State.FAILED + " or " |
| + JobStatus.State.KILLED); |
| } |
| |
| // delete the staging area for the job (e.g., |
| // "hdfs://localhost:9000/tmp/hadoop-<user>/mapred/staging/<user>/.staging/ |
| // job_YYYYMMDDhhmm_nnnn"--NOT same as "_temporary" subdir of output dir) |
| JobConf conf = new JobConf(jobContext.getConfiguration()); |
| if (!supportIsolationRunner(conf)) { |
| String jobStagingDir = conf.get("mapreduce.job.dir"); |
| Path jobStagingDirPath = new Path(jobStagingDir); |
| FileSystem fs = jobStagingDirPath.getFileSystem(conf); |
| fs.delete(jobStagingDirPath, true); |
| } |
| // update counters, save any pending output files, shut down the progress- |
| // reporter communication thread and the umbilical, and mark the task done |
| if (!isUberTask()) { // defer so UberTask can send TT final update(s) |
| done(umbilical, reporter); |
| } |
| } |
| |
| protected void commitJob() throws IOException { |
| LOG.info("Committing job"); |
| committer.commitJob(jobContext); |
| } |
| |
| protected boolean supportIsolationRunner(JobConf conf) { |
| return (conf.getKeepTaskFilesPattern() != null || |
| conf.getKeepFailedTaskFiles()); |
| } |
| |
| protected void runJobSetupTask(TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter |
| ) throws IOException, InterruptedException { |
| // do the setup |
| getProgress().setStatus("setup"); |
| committer.setupJob(jobContext); |
| if (!isUberTask()) { |
| // UberTask calls done() directly; don't shut down umbilical prematurely |
| done(umbilical, reporter); |
| } |
| } |
| |
| public void setConf(Configuration conf) { |
| if (conf instanceof JobConf) { |
| this.conf = (JobConf) conf; |
| } else { |
| this.conf = new JobConf(conf); |
| } |
| this.mapOutputFile = ReflectionUtils.newInstance( |
| conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, |
| MROutputFiles.class, MapOutputFile.class), conf); |
| this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR); |
    // add the static resolutions (required for JUnit testcases that simulate
    // multiple nodes on a single physical node); each entry has the form
    // "host=resolvedName".
| String hostToResolved[] = conf.getStrings(TTConfig.TT_STATIC_RESOLUTIONS); |
| if (hostToResolved != null) { |
| for (String str : hostToResolved) { |
| String name = str.substring(0, str.indexOf('=')); |
| String resolvedName = str.substring(str.indexOf('=') + 1); |
| NetUtils.addStaticResolution(name, resolvedName); |
| } |
| } |
| } |
| |
| public Configuration getConf() { |
| return this.conf; |
| } |
| |
| /** |
| * OutputCollector for the combiner. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public static class CombineOutputCollector<K extends Object, V extends Object> |
| implements OutputCollector<K, V> { |
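    // Typical use (a sketch): construct with an output-records counter,
    // point at an IFile.Writer via setWriter(), and each collect(key, value)
    // then bumps the counter and appends the pair to that writer.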
| private Writer<K, V> writer; |
| private Counters.Counter outCounter; |
| public CombineOutputCollector(Counters.Counter outCounter) { |
| this.outCounter = outCounter; |
| } |
| public synchronized void setWriter(Writer<K, V> writer) { |
| this.writer = writer; |
| } |
| public synchronized void collect(K key, V value) |
| throws IOException { |
| outCounter.increment(1); |
| writer.append(key, value); |
| } |
| } |
| |
| /** Iterates values while keys match in sorted input. */ |
| static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> { |
| protected RawKeyValueIterator in; //input iterator |
| private KEY key; // current key |
| private KEY nextKey; |
| private VALUE value; // current value |
| private boolean hasNext; // more w/ this key |
| private boolean more; // more in file |
| private RawComparator<KEY> comparator; |
| protected Progressable reporter; |
| private Deserializer<KEY> keyDeserializer; |
| private Deserializer<VALUE> valDeserializer; |
| private DataInputBuffer keyIn = new DataInputBuffer(); |
| private DataInputBuffer valueIn = new DataInputBuffer(); |
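
    // Consumption pattern (see OldCombinerRunner.combine() below): loop
    // while more() is true, read getKey(), drain that key's values via
    // hasNext()/next(), then advance with nextKey().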
| |
| public ValuesIterator (RawKeyValueIterator in, |
| RawComparator<KEY> comparator, |
| Class<KEY> keyClass, |
| Class<VALUE> valClass, Configuration conf, |
| Progressable reporter) |
| throws IOException { |
| this.in = in; |
| this.comparator = comparator; |
| this.reporter = reporter; |
| SerializationFactory serializationFactory = new SerializationFactory(conf); |
| this.keyDeserializer = serializationFactory.getDeserializer(keyClass); |
| this.keyDeserializer.open(keyIn); |
| this.valDeserializer = serializationFactory.getDeserializer(valClass); |
| this.valDeserializer.open(this.valueIn); |
| readNextKey(); |
| key = nextKey; |
| nextKey = null; // force new instance creation |
| hasNext = more; |
| } |
| |
| RawKeyValueIterator getRawIterator() { return in; } |
| |
| /// Iterator methods |
| |
| public boolean hasNext() { return hasNext; } |
| |
| private int ctr = 0; |
| public VALUE next() { |
| if (!hasNext) { |
| throw new NoSuchElementException("iterate past last value"); |
| } |
| try { |
| readNextValue(); |
| readNextKey(); |
| } catch (IOException ie) { |
| throw new RuntimeException("problem advancing post rec#"+ctr, ie); |
| } |
| reporter.progress(); |
| return value; |
| } |
| |
| public void remove() { throw new RuntimeException("not implemented"); } |
| |
| /// Auxiliary methods |
| |
| /** Start processing next unique key. */ |
| public void nextKey() throws IOException { |
| // read until we find a new key |
| while (hasNext) { |
| readNextKey(); |
| } |
| ++ctr; |
| |
| // move the next key to the current one |
| KEY tmpKey = key; |
| key = nextKey; |
| nextKey = tmpKey; |
| hasNext = more; |
| } |
| |
| /** True iff more keys remain. */ |
| public boolean more() { |
| return more; |
| } |
| |
| /** The current key. */ |
| public KEY getKey() { |
| return key; |
| } |
| |
| /** |
| * read the next key |
| */ |
| private void readNextKey() throws IOException { |
| more = in.next(); |
| if (more) { |
| DataInputBuffer nextKeyBytes = in.getKey(); |
| keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength()); |
| nextKey = keyDeserializer.deserialize(nextKey); |
| hasNext = key != null && (comparator.compare(key, nextKey) == 0); |
| } else { |
| hasNext = false; |
| } |
| } |
| |
| /** |
| * Read the next value |
| * @throws IOException |
| */ |
| private void readNextValue() throws IOException { |
| DataInputBuffer nextValueBytes = in.getValue(); |
| valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength()); |
| value = valDeserializer.deserialize(value); |
| } |
| } |
| |
  /** Iterator over the values fed to the combiner; counts combiner input records. */
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public static class CombineValuesIterator<KEY,VALUE> |
| extends ValuesIterator<KEY,VALUE> { |
| |
| private final Counters.Counter combineInputCounter; |
| |
| public CombineValuesIterator(RawKeyValueIterator in, |
| RawComparator<KEY> comparator, Class<KEY> keyClass, |
| Class<VALUE> valClass, Configuration conf, Reporter reporter, |
| Counters.Counter combineInputCounter) throws IOException { |
| super(in, comparator, keyClass, valClass, conf, reporter); |
| this.combineInputCounter = combineInputCounter; |
| } |
| |
    @Override
    public VALUE next() {
| combineInputCounter.increment(1); |
| return super.next(); |
| } |
| } |
| |
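  /**
   * Builds a new-API Reducer.Context backed by a ReduceContextImpl and
   * wrapped by WrappedReducer, so callers such as NewCombinerRunner below
   * can drive an org.apache.hadoop.mapreduce.Reducer directly.
   */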
| @SuppressWarnings("unchecked") |
| protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> |
| org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context |
| createReduceContext(org.apache.hadoop.mapreduce.Reducer |
| <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer, |
| Configuration job, |
| org.apache.hadoop.mapreduce.TaskAttemptID taskId, |
| RawKeyValueIterator rIter, |
| org.apache.hadoop.mapreduce.Counter inputKeyCounter, |
| org.apache.hadoop.mapreduce.Counter inputValueCounter, |
| org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, |
| org.apache.hadoop.mapreduce.OutputCommitter committer, |
| org.apache.hadoop.mapreduce.StatusReporter reporter, |
| RawComparator<INKEY> comparator, |
| Class<INKEY> keyClass, Class<INVALUE> valueClass |
| ) throws IOException, InterruptedException { |
| org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> |
| reduceContext = |
| new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, |
| rIter, |
| inputKeyCounter, |
| inputValueCounter, |
| output, |
| committer, |
| reporter, |
| comparator, |
| keyClass, |
| valueClass); |
| |
| org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context |
| reducerContext = |
| new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext( |
| reduceContext); |
| |
| return reducerContext; |
| } |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| protected static abstract class CombinerRunner<K,V> { |
| protected final Counters.Counter inputCounter; |
| protected final JobConf job; |
| protected final TaskReporter reporter; |
| |
| CombinerRunner(Counters.Counter inputCounter, |
| JobConf job, |
| TaskReporter reporter) { |
| this.inputCounter = inputCounter; |
| this.job = job; |
| this.reporter = reporter; |
| } |
| |
| /** |
| * Run the combiner over a set of inputs. |
| * @param iterator the key/value pairs to use as input |
| * @param collector the output collector |
| */ |
| abstract void combine(RawKeyValueIterator iterator, |
| OutputCollector<K,V> collector |
| ) throws IOException, InterruptedException, |
| ClassNotFoundException; |
| |
| @SuppressWarnings("unchecked") |
| static <K,V> |
| CombinerRunner<K,V> create(JobConf job, |
| TaskAttemptID taskId, |
| Counters.Counter inputCounter, |
| TaskReporter reporter, |
| org.apache.hadoop.mapreduce.OutputCommitter committer |
| ) throws ClassNotFoundException { |
| Class<? extends Reducer<K,V,K,V>> cls = |
| (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass(); |
| |
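      // An old-API (mapred) combiner, if configured, takes precedence;
      // otherwise fall back to a new-API (mapreduce) combiner, or return
      // null when the job has no combiner at all.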
| if (cls != null) { |
| return new OldCombinerRunner(cls, job, inputCounter, reporter); |
| } |
| // make a task context so we can get the classes |
| org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = |
| new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId); |
| Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls = |
| (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>) |
| taskContext.getCombinerClass(); |
| if (newcls != null) { |
| return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext, |
| inputCounter, reporter, committer); |
| } |
| |
| return null; |
| } |
| } |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| protected static class OldCombinerRunner<K,V> extends CombinerRunner<K,V> { |
| private final Class<? extends Reducer<K,V,K,V>> combinerClass; |
| private final Class<K> keyClass; |
| private final Class<V> valueClass; |
| private final RawComparator<K> comparator; |
| |
| @SuppressWarnings("unchecked") |
| protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls, |
| JobConf conf, |
| Counters.Counter inputCounter, |
| TaskReporter reporter) { |
| super(inputCounter, conf, reporter); |
| combinerClass = cls; |
| keyClass = (Class<K>) job.getMapOutputKeyClass(); |
| valueClass = (Class<V>) job.getMapOutputValueClass(); |
| comparator = (RawComparator<K>) job.getOutputKeyComparator(); |
| } |
| |
| @SuppressWarnings("unchecked") |
    @Override
    protected void combine(RawKeyValueIterator kvIter,
| OutputCollector<K,V> combineCollector |
| ) throws IOException { |
| Reducer<K,V,K,V> combiner = |
| ReflectionUtils.newInstance(combinerClass, job); |
| try { |
| CombineValuesIterator<K,V> values = |
| new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, |
| valueClass, job, Reporter.NULL, |
| inputCounter); |
| while (values.more()) { |
| combiner.reduce(values.getKey(), values, combineCollector, |
| Reporter.NULL); |
| values.nextKey(); |
| } |
| } finally { |
| combiner.close(); |
| } |
| } |
| } |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| protected static class NewCombinerRunner<K, V> extends CombinerRunner<K,V> { |
| private final Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> |
| reducerClass; |
| private final org.apache.hadoop.mapreduce.TaskAttemptID taskId; |
| private final RawComparator<K> comparator; |
| private final Class<K> keyClass; |
| private final Class<V> valueClass; |
| private final org.apache.hadoop.mapreduce.OutputCommitter committer; |
| |
| @SuppressWarnings("unchecked") |
| NewCombinerRunner(Class reducerClass, |
| JobConf job, |
| org.apache.hadoop.mapreduce.TaskAttemptID taskId, |
| org.apache.hadoop.mapreduce.TaskAttemptContext context, |
| Counters.Counter inputCounter, |
| TaskReporter reporter, |
| org.apache.hadoop.mapreduce.OutputCommitter committer) { |
| super(inputCounter, job, reporter); |
| this.reducerClass = reducerClass; |
| this.taskId = taskId; |
| keyClass = (Class<K>) context.getMapOutputKeyClass(); |
| valueClass = (Class<V>) context.getMapOutputValueClass(); |
| comparator = (RawComparator<K>) context.getSortComparator(); |
| this.committer = committer; |
| } |
| |
| private static class OutputConverter<K,V> |
| extends org.apache.hadoop.mapreduce.RecordWriter<K,V> { |
| OutputCollector<K,V> output; |
| OutputConverter(OutputCollector<K,V> output) { |
| this.output = output; |
| } |
| |
| @Override |
| public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context){ |
| } |
| |
| @Override |
| public void write(K key, V value |
| ) throws IOException, InterruptedException { |
| output.collect(key,value); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| void combine(RawKeyValueIterator iterator, |
| OutputCollector<K,V> collector |
| ) throws IOException, InterruptedException, |
| ClassNotFoundException { |
| // make a reducer |
| org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer = |
| (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>) |
| ReflectionUtils.newInstance(reducerClass, job); |
| org.apache.hadoop.mapreduce.Reducer.Context |
| reducerContext = createReduceContext(reducer, job, taskId, |
| iterator, null, inputCounter, |
| new OutputConverter(collector), |
| committer, |
| reporter, comparator, keyClass, |
| valueClass); |
| reducer.run(reducerContext); |
| } |
| } |
| } |