/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLConnection;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.IFile.*;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.metrics2.MetricsBuilder;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
/** A Reduce task. */
class ReduceTask extends Task {
static { // register a ctor
WritableFactories.setFactory
(ReduceTask.class,
new WritableFactory() {
public Writable newInstance() { return new ReduceTask(); }
});
}
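// The factory registered above lets WritableFactories.newInstance()
// construct an empty ReduceTask during deserialization; readFields()
// below then restores numMaps and the inherited Task state.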
private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
private int numMaps;
private ReduceCopier reduceCopier;
private CompressionCodec codec;
{
getProgress().setStatus("reduce");
setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
}
private Progress copyPhase;
private Progress sortPhase;
private Progress reducePhase;
private Counters.Counter reduceShuffleBytes =
getCounters().findCounter(Counter.REDUCE_SHUFFLE_BYTES);
private Counters.Counter reduceInputKeyCounter =
getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
private Counters.Counter reduceInputValueCounter =
getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
private Counters.Counter reduceOutputCounter =
getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
private Counters.Counter reduceCombineOutputCounter =
getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
// A custom comparator for map output files. The ordering is determined
// by file size and path. For files of the same size but different
// paths, the first argument is considered smaller than the second.
// Files with the same size and the same path are considered equal.
private Comparator<FileStatus> mapOutputFileComparator =
new Comparator<FileStatus>() {
public int compare(FileStatus a, FileStatus b) {
if (a.getLen() < b.getLen())
return -1;
else if (a.getLen() == b.getLen())
if (a.getPath().toString().equals(b.getPath().toString()))
return 0;
else
return -1;
else
return 1;
}
};
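// Note: this comparator is deliberately asymmetric (two distinct paths
// of equal size each compare as smaller than the other); it is used only
// to impose a size-based ordering on the TreeSet below.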
// A sorted set for keeping a set of map output files on disk
private final SortedSet<FileStatus> mapOutputFilesOnDisk =
new TreeSet<FileStatus>(mapOutputFileComparator);
public ReduceTask() {
super();
}
public ReduceTask(String jobFile, TaskAttemptID taskId,
int partition, int numMaps, int numSlotsRequired) {
super(jobFile, taskId, partition, numSlotsRequired);
this.numMaps = numMaps;
}
private CompressionCodec initCodec() {
// check if map-outputs are to be compressed
if (conf.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
conf.getMapOutputCompressorClass(DefaultCodec.class);
return ReflectionUtils.newInstance(codecClass, conf);
}
return null;
}
@Override
public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
TaskTracker.RunningJob rjob
) throws IOException {
return new ReduceTaskRunner(tip, tracker, this.conf, rjob);
}
@Override
public boolean isMapTask() {
return false;
}
public int getNumMaps() { return numMaps; }
/**
* Localize the given JobConf to be specific for this task.
*/
@Override
public void localizeConfiguration(JobConf conf) throws IOException {
super.localizeConfiguration(conf);
conf.setNumMapTasks(numMaps);
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(numMaps); // write the number of maps
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
numMaps = in.readInt();
}
// Get the input files for the reducer.
private Path[] getMapFiles(FileSystem fs, boolean isLocal)
throws IOException {
List<Path> fileList = new ArrayList<Path>();
if (isLocal) {
// for local jobs
for(int i = 0; i < numMaps; ++i) {
fileList.add(mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
for (FileStatus filestatus : mapOutputFilesOnDisk) {
fileList.add(filestatus.getPath());
}
}
return fileList.toArray(new Path[0]);
}
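// ReduceValuesIterator counts every value handed to the reducer via
// next(), while moveToNext() advances without counting; the skipping
// subclass below uses moveToNext() so that discarded records do not
// inflate REDUCE_INPUT_RECORDS.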
private class ReduceValuesIterator<KEY,VALUE>
extends ValuesIterator<KEY,VALUE> {
public ReduceValuesIterator (RawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass,
Configuration conf, Progressable reporter)
throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
}
@Override
public VALUE next() {
reduceInputValueCounter.increment(1);
return moveToNext();
}
protected VALUE moveToNext() {
return super.next();
}
public void informReduceProgress() {
reducePhase.set(super.in.getProgress().get()); // update progress
reporter.progress();
}
}
private class SkippingReduceValuesIterator<KEY,VALUE>
extends ReduceValuesIterator<KEY,VALUE> {
private SkipRangeIterator skipIt;
private TaskUmbilicalProtocol umbilical;
private Counters.Counter skipGroupCounter;
private Counters.Counter skipRecCounter;
private long grpIndex = -1;
private Class<KEY> keyClass;
private Class<VALUE> valClass;
private SequenceFile.Writer skipWriter;
private boolean toWriteSkipRecs;
private boolean hasNext;
private TaskReporter reporter;
public SkippingReduceValuesIterator(RawKeyValueIterator in,
RawComparator<KEY> comparator, Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
TaskUmbilicalProtocol umbilical) throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
this.umbilical = umbilical;
this.skipGroupCounter =
reporter.getCounter(Counter.REDUCE_SKIPPED_GROUPS);
this.skipRecCounter =
reporter.getCounter(Counter.REDUCE_SKIPPED_RECORDS);
this.toWriteSkipRecs = toWriteSkipRecs() &&
SkipBadRecords.getSkipOutputPath(conf)!=null;
this.keyClass = keyClass;
this.valClass = valClass;
this.reporter = reporter;
skipIt = getSkipRanges().skipRangeIterator();
mayBeSkip();
}
void nextKey() throws IOException {
super.nextKey();
mayBeSkip();
}
boolean more() {
return super.more() && hasNext;
}
private void mayBeSkip() throws IOException {
hasNext = skipIt.hasNext();
if(!hasNext) {
LOG.warn("Further groups got skipped.");
return;
}
grpIndex++;
long nextGrpIndex = skipIt.next();
long skip = 0;
long skipRec = 0;
while(grpIndex<nextGrpIndex && super.more()) {
while (hasNext()) {
VALUE value = moveToNext();
if(toWriteSkipRecs) {
writeSkippedRec(getKey(), value);
}
skipRec++;
}
super.nextKey();
grpIndex++;
skip++;
}
//close the skip writer once all the ranges are skipped
if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
skipWriter.close();
}
skipGroupCounter.increment(skip);
skipRecCounter.increment(skipRec);
reportNextRecordRange(umbilical, grpIndex);
}
@SuppressWarnings("unchecked")
private void writeSkippedRec(KEY key, VALUE value) throws IOException{
if(skipWriter==null) {
Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
Path skipFile = new Path(skipDir, getTaskID().toString());
skipWriter = SequenceFile.createWriter(
skipFile.getFileSystem(conf), conf, skipFile,
keyClass, valClass,
CompressionType.BLOCK, reporter);
}
skipWriter.append(key, value);
}
}
@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
this.umbilical = umbilical;
job.setBoolean("mapred.skip.on", isSkipping());
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
reporter.startCommunicationThread();
boolean useNewApi = job.getUseNewReducer();
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
// Initialize the codec
codec = initCodec();
boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
if (!isLocal) {
reduceCopier = new ReduceCopier(umbilical, job, reporter);
if (!reduceCopier.fetchOutputs()) {
if(reduceCopier.mergeThrowable instanceof FSError) {
throw (FSError)reduceCopier.mergeThrowable;
}
throw new IOException("Task: " + getTaskID() +
" - The reduce copier failed", reduceCopier.mergeThrowable);
}
}
copyPhase.complete(); // copy is already complete
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
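// In local mode the map outputs are merged directly off the local
// filesystem; otherwise the ReduceCopier has already fetched them
// during the shuffle and hands back a (possibly merged) iterator.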
RawKeyValueIterator rIter = isLocal
? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null)
: reduceCopier.createKVIterator(job, rfs, reporter);
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
done(umbilical, reporter);
}
private class OldTrackingRecordWriter<K, V> implements RecordWriter<K, V> {
private final RecordWriter<K, V> real;
private final org.apache.hadoop.mapred.Counters.Counter outputRecordCounter;
private final org.apache.hadoop.mapred.Counters.Counter fileOutputByteCounter;
private final Statistics fsStats;
public OldTrackingRecordWriter(
org.apache.hadoop.mapred.Counters.Counter outputRecordCounter,
JobConf job, TaskReporter reporter, String finalName)
throws IOException {
this.outputRecordCounter = outputRecordCounter;
this.fileOutputByteCounter = reporter
.getCounter(FileOutputFormat.Counter.BYTES_WRITTEN);
Statistics matchedStats = null;
if (job.getOutputFormat() instanceof FileOutputFormat) {
matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
}
fsStats = matchedStats;
FileSystem fs = FileSystem.get(job);
long bytesOutPrev = getOutputBytes(fsStats);
this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName,
reporter);
long bytesOutCurr = getOutputBytes(fsStats);
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
@Override
public void write(K key, V value) throws IOException {
long bytesOutPrev = getOutputBytes(fsStats);
real.write(key, value);
long bytesOutCurr = getOutputBytes(fsStats);
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
outputRecordCounter.increment(1);
}
@Override
public void close(Reporter reporter) throws IOException {
long bytesOutPrev = getOutputBytes(fsStats);
real.close(reporter);
long bytesOutCurr = getOutputBytes(fsStats);
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
private long getOutputBytes(Statistics stats) {
return stats == null ? 0 : stats.getBytesWritten();
}
}
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runOldReducer(JobConf job,
TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass) throws IOException {
Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
ReflectionUtils.newInstance(job.getReducerClass(), job);
// make output collector
String finalName = getOutputName(getPartition());
final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
reduceOutputCounter, job, reporter, finalName);
OutputCollector<OUTKEY,OUTVALUE> collector =
new OutputCollector<OUTKEY,OUTVALUE>() {
public void collect(OUTKEY key, OUTVALUE value)
throws IOException {
out.write(key, value);
// indicate that progress update needs to be sent
reporter.progress();
}
};
// apply reduce function
try {
//increment processed counter only if skipping feature is enabled
boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
SkipBadRecords.getAutoIncrReducerProcCount(job);
ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ?
new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter,
comparator, keyClass, valueClass,
job, reporter, umbilical) :
new ReduceValuesIterator<INKEY,INVALUE>(rIter,
job.getOutputValueGroupingComparator(), keyClass, valueClass,
job, reporter);
values.informReduceProgress();
while (values.more()) {
reduceInputKeyCounter.increment(1);
reducer.reduce(values.getKey(), values, collector, reporter);
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
}
values.nextKey();
values.informReduceProgress();
}
//Clean up: repeated in catch block below
reducer.close();
out.close(reporter);
//End of clean up.
} catch (IOException ioe) {
try {
reducer.close();
} catch (IOException ignored) {}
try {
out.close(reporter);
} catch (IOException ignored) {}
throw ioe;
}
}
private class NewTrackingRecordWriter<K,V>
extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
private final org.apache.hadoop.mapreduce.RecordWriter<K,V> real;
private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
private final Statistics fsStats;
NewTrackingRecordWriter(org.apache.hadoop.mapreduce.Counter recordCounter,
JobConf job, TaskReporter reporter,
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
throws InterruptedException, IOException {
this.outputRecordCounter = recordCounter;
this.fileOutputByteCounter = reporter
.getCounter(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN);
Statistics matchedStats = null;
// TaskAttemptContext taskContext = new TaskAttemptContext(job,
// getTaskID());
if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
.getOutputPath(taskContext), taskContext.getConfiguration());
}
fsStats = matchedStats;
long bytesOutPrev = getOutputBytes(fsStats);
this.real = (org.apache.hadoop.mapreduce.RecordWriter<K, V>) outputFormat
.getRecordWriter(taskContext);
long bytesOutCurr = getOutputBytes(fsStats);
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
long bytesOutPrev = getOutputBytes(fsStats);
real.close(context);
long bytesOutCurr = getOutputBytes(fsStats);
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
long bytesOutPrev = getOutputBytes(fsStats);
real.write(key,value);
long bytesOutCurr = getOutputBytes(fsStats);
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
outputRecordCounter.increment(1);
}
private long getOutputBytes(Statistics stats) {
return stats == null ? 0 : stats.getBytesWritten();
}
}
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
// wrap value iterator to report progress.
final RawKeyValueIterator rawIter = rIter;
rIter = new RawKeyValueIterator() {
public void close() throws IOException {
rawIter.close();
}
public DataInputBuffer getKey() throws IOException {
return rawIter.getKey();
}
public Progress getProgress() {
return rawIter.getProgress();
}
public DataInputBuffer getValue() throws IOException {
return rawIter.getValue();
}
public boolean next() throws IOException {
boolean ret = rawIter.next();
reducePhase.set(rawIter.getProgress().get());
reporter.progress();
return ret;
}
};
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
// make a reducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(reduceOutputCounter,
job, reporter, taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputKeyCounter,
reduceInputValueCounter,
trackedRW, committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
trackedRW.close(reducerContext);
}
private static enum CopyOutputErrorType {
NO_ERROR,
READ_ERROR,
OTHER_ERROR
};
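// READ_ERROR is tracked separately so that fetches failing with read
// errors can be reported immediately (see reportReadErrorImmediately
// and the mapreduce.reduce.shuffle.notify.readerror setting below).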
class ReduceCopier<K, V> implements MRConstants {
/** Reference to the umbilical object */
private TaskUmbilicalProtocol umbilical;
private final TaskReporter reporter;
/** Number of ms before timing out a copy */
private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
/** Max events to fetch in one go from the tasktracker */
private static final int MAX_EVENTS_TO_FETCH = 10000;
/**
* our reduce task instance
*/
private ReduceTask reduceTask;
/**
* the list of map outputs currently being copied
*/
private List<MapOutputLocation> scheduledCopies;
/**
* the results of dispatched copy attempts
*/
private List<CopyResult> copyResults;
/**
* the number of outputs to copy in parallel
*/
private int numCopiers;
/**
* a number that is set to the max #fetches we'd schedule and then
* pause the scheduling
*/
private int maxInFlight;
/**
* the amount of time spent on fetching one map output before considering
* it as failed and notifying the jobtracker about it.
*/
private int maxBackoff;
/**
* busy hosts from which copies are being backed off
* Map of host -> next contact time
*/
private Map<String, Long> penaltyBox;
/**
* the set of unique hosts from which we are copying
*/
private Set<String> uniqueHosts;
/**
* A reference to the RamManager for writing the map outputs to.
*/
private ShuffleRamManager ramManager;
/**
* A reference to the local file system for writing the map outputs to.
*/
private FileSystem localFileSys;
private FileSystem rfs;
/**
* Number of files to merge at a time
*/
private int ioSortFactor;
/**
* A reference to the throwable object (if merge throws an exception)
*/
private volatile Throwable mergeThrowable;
/**
* A flag to indicate when to exit localFS merge
*/
private volatile boolean exitLocalFSMerge = false;
/**
* A flag to indicate when to exit getMapEvents thread
*/
private volatile boolean exitGetMapEvents = false;
/**
* When we accumulate maxInMemOutputs files in RAM, we merge/spill
*/
private final int maxInMemOutputs;
/**
* Usage threshold for in-memory output accumulation.
*/
private final float maxInMemCopyPer;
/**
* Maximum memory usage of map outputs to merge from memory into
* the reduce, in bytes.
*/
private final long maxInMemReduce;
/**
* The threads for fetching the files.
*/
private List<MapOutputCopier> copiers = null;
/**
* The object for metrics reporting.
*/
private ShuffleClientInstrumentation shuffleClientMetrics;
/**
* the minimum interval between tasktracker polls
*/
private static final long MIN_POLL_INTERVAL = 1000;
/**
* a list of map output locations for fetch retries
*/
private List<MapOutputLocation> retryFetches =
new ArrayList<MapOutputLocation>();
/**
* The set of required map outputs
*/
private Set <TaskID> copiedMapOutputs =
Collections.synchronizedSet(new TreeSet<TaskID>());
/**
* The set of obsolete map taskids.
*/
private Set <TaskAttemptID> obsoleteMapIds =
Collections.synchronizedSet(new TreeSet<TaskAttemptID>());
private Random random = null;
/**
* the max of all the map completion times
*/
private int maxMapRuntime;
/**
* Maximum number of fetch-retries per-map.
*/
private volatile int maxFetchRetriesPerMap;
/**
* Combiner runner, if a combiner is needed
*/
private CombinerRunner combinerRunner;
/**
* Resettable collector used for combine.
*/
private CombineOutputCollector combineCollector = null;
/**
* Maximum percent of failed fetch attempts before killing the reduce task.
*/
private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
/**
* Minimum percent of progress required to keep the reduce alive.
*/
private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
/**
* Maximum percent of shuffle execution time that the shuffle may remain stalled before the reduce task is killed.
*/
private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
/**
* Minimum number of map fetch retries.
*/
private static final int MIN_FETCH_RETRIES_PER_MAP = 2;
/**
* The minimum percentage of maps yet to be copied,
* which indicates end of shuffle
*/
private static final float MIN_PENDING_MAPS_PERCENT = 0.25f;
/**
* Maximum no. of unique maps from which we failed to fetch map-outputs
* even after {@link #maxFetchRetriesPerMap} retries; after this the
* reduce task is failed.
*/
private int maxFailedUniqueFetches = 5;
/**
* The maps from which we fail to fetch map-outputs
* even after {@link #maxFetchRetriesPerMap} retries.
*/
Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>();
/**
* A map of taskId -> no. of failed fetches
*/
Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap =
new HashMap<TaskAttemptID, Integer>();
/**
* Initial backoff interval (milliseconds)
*/
private static final int BACKOFF_INIT = 4000;
/**
* The interval for logging in the shuffle
*/
private static final int MIN_LOG_TIME = 60000;
/**
* List of in-memory map-outputs.
*/
private final List<MapOutput> mapOutputsFilesInMemory =
Collections.synchronizedList(new LinkedList<MapOutput>());
/**
* The map for (Hosts, List of MapIds from this Host) maintaining
* map output locations
*/
private final Map<String, List<MapOutputLocation>> mapLocations =
new ConcurrentHashMap<String, List<MapOutputLocation>>();
class ShuffleClientInstrumentation implements MetricsSource {
final MetricsRegistry registry = new MetricsRegistry("shuffleInput");
final MetricMutableCounterLong inputBytes =
registry.newCounter("shuffle_input_bytes", "", 0L);
final MetricMutableCounterInt failedFetches =
registry.newCounter("shuffle_failed_fetches", "", 0);
final MetricMutableCounterInt successFetches =
registry.newCounter("shuffle_success_fetches", "", 0);
private volatile int threadsBusy = 0;
@SuppressWarnings("deprecation")
ShuffleClientInstrumentation(JobConf conf) {
registry.tag("user", "User name", conf.getUser())
.tag("jobName", "Job name", conf.getJobName())
.tag("jobId", "Job ID", ReduceTask.this.getJobID().toString())
.tag("taskId", "Task ID", getTaskID().toString())
.tag("sessionId", "Session ID", conf.getSessionId());
}
//@Override
void inputBytes(long numBytes) {
inputBytes.incr(numBytes);
}
//@Override
void failedFetch() {
failedFetches.incr();
}
//@Override
void successFetch() {
successFetches.incr();
}
//@Override
synchronized void threadBusy() {
++threadsBusy;
}
//@Override
synchronized void threadFree() {
--threadsBusy;
}
@Override
public void getMetrics(MetricsBuilder builder, boolean all) {
MetricsRecordBuilder rb = builder.addRecord(registry.name());
rb.addGauge("shuffle_fetchers_busy_percent", "", numCopiers == 0 ? 0
: 100. * threadsBusy / numCopiers);
registry.snapshot(rb, all);
}
}
private ShuffleClientInstrumentation createShuffleClientInstrumentation() {
return DefaultMetricsSystem.INSTANCE.register("ShuffleClientMetrics",
"Shuffle input metrics", new ShuffleClientInstrumentation(conf));
}
/** Represents the result of an attempt to copy a map output */
private class CopyResult {
// the map output location against which a copy attempt was made
private final MapOutputLocation loc;
// the size of the file copied, -1 if the transfer failed
private final long size;
//a flag signifying whether a copy result is obsolete
private static final int OBSOLETE = -2;
private CopyOutputErrorType error = CopyOutputErrorType.NO_ERROR;
CopyResult(MapOutputLocation loc, long size) {
this.loc = loc;
this.size = size;
}
CopyResult(MapOutputLocation loc, long size, CopyOutputErrorType error) {
this.loc = loc;
this.size = size;
this.error = error;
}
public boolean getSuccess() { return size >= 0; }
public boolean isObsolete() {
return size == OBSOLETE;
}
public long getSize() { return size; }
public String getHost() { return loc.getHost(); }
public MapOutputLocation getLocation() { return loc; }
public CopyOutputErrorType getError() { return error; }
}
private int nextMapOutputCopierId = 0;
private boolean reportReadErrorImmediately;
/**
* Abstraction to track a map-output.
*/
private class MapOutputLocation {
TaskAttemptID taskAttemptId;
TaskID taskId;
String ttHost;
URL taskOutput;
public MapOutputLocation(TaskAttemptID taskAttemptId,
String ttHost, URL taskOutput) {
this.taskAttemptId = taskAttemptId;
this.taskId = this.taskAttemptId.getTaskID();
this.ttHost = ttHost;
this.taskOutput = taskOutput;
}
public TaskAttemptID getTaskAttemptId() {
return taskAttemptId;
}
public TaskID getTaskId() {
return taskId;
}
public String getHost() {
return ttHost;
}
public URL getOutputLocation() {
return taskOutput;
}
}
/** Describes the output of a map; could either be on disk or in-memory. */
private class MapOutput {
final TaskID mapId;
final TaskAttemptID mapAttemptId;
final Path file;
final Configuration conf;
byte[] data;
final boolean inMemory;
long compressedSize;
public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId,
Configuration conf, Path file, long size) {
this.mapId = mapId;
this.mapAttemptId = mapAttemptId;
this.conf = conf;
this.file = file;
this.compressedSize = size;
this.data = null;
this.inMemory = false;
}
public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data, int compressedLength) {
this.mapId = mapId;
this.mapAttemptId = mapAttemptId;
this.file = null;
this.conf = null;
this.data = data;
this.compressedSize = compressedLength;
this.inMemory = true;
}
public void discard() throws IOException {
if (inMemory) {
data = null;
} else {
FileSystem fs = file.getFileSystem(conf);
fs.delete(file, true);
}
}
}
class ShuffleRamManager implements RamManager {
/* Maximum percentage of the in-memory limit that a single shuffle can
* consume */
private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
/* Maximum percentage of shuffle-threads which can be stalled
* simultaneously after which a merge is triggered. */
private static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;
private final long maxSize;
private final long maxSingleShuffleLimit;
private long size = 0;
private Object dataAvailable = new Object();
private long fullSize = 0;
private int numPendingRequests = 0;
private int numRequiredMapOutputs = 0;
private int numClosed = 0;
private boolean closed = false;
public ShuffleRamManager(Configuration conf) throws IOException {
final float maxInMemCopyUse =
conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
throw new IOException("mapred.job.shuffle.input.buffer.percent" +
maxInMemCopyUse);
}
// Allow unit tests to fix Runtime memory
maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
(int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
* maxInMemCopyUse);
maxSingleShuffleLimit = (long)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
}
public synchronized boolean reserve(int requestedSize, InputStream in)
throws InterruptedException {
// Wait till the request can be fulfilled...
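// While waiting, the caller's input stream is closed (below) so a
// stalled copier does not pin an open connection to the tasktracker,
// and numPendingRequests is bumped so waitForDataToMerge() can start
// a merge when too many copiers are blocked here.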
while ((size + requestedSize) > maxSize) {
// Close the input...
if (in != null) {
try {
in.close();
} catch (IOException ie) {
LOG.info("Failed to close connection with: " + ie);
} finally {
in = null;
}
}
// Track pending requests
synchronized (dataAvailable) {
++numPendingRequests;
dataAvailable.notify();
}
// Wait for memory to free up
wait();
// Track pending requests
synchronized (dataAvailable) {
--numPendingRequests;
}
}
size += requestedSize;
return (in != null);
}
public synchronized void unreserve(int requestedSize) {
size -= requestedSize;
synchronized (dataAvailable) {
fullSize -= requestedSize;
--numClosed;
}
// Notify the threads blocked on RamManager.reserve
notifyAll();
}
public boolean waitForDataToMerge() throws InterruptedException {
boolean done = false;
synchronized (dataAvailable) {
// Start in-memory merge if manager has been closed or...
while (!closed
&&
// In-memory threshold exceeded and at least two segments
// have been fetched
(getPercentUsed() < maxInMemCopyPer || numClosed < 2)
&&
// More than "mapred.inmem.merge.threshold" map outputs
// have been fetched into memory
(maxInMemOutputs <= 0 || numClosed < maxInMemOutputs)
&&
// More than MAX... threads are blocked on the RamManager
// or the blocked threads are the last map outputs to be
// fetched. If numRequiredMapOutputs is zero, either
// setNumCopiedMapOutputs has not been called (no map outputs
// have been fetched, so there is nothing to merge) or the
// last map outputs are being transferred without
// contention, so a merge would be premature.
(numPendingRequests <
numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION &&
(0 == numRequiredMapOutputs ||
numPendingRequests < numRequiredMapOutputs))) {
dataAvailable.wait();
}
done = closed;
}
return done;
}
public void closeInMemoryFile(int requestedSize) {
synchronized (dataAvailable) {
fullSize += requestedSize;
++numClosed;
dataAvailable.notify();
}
}
public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
synchronized (dataAvailable) {
this.numRequiredMapOutputs = numRequiredMapOutputs;
dataAvailable.notify();
}
}
public void close() {
synchronized (dataAvailable) {
closed = true;
LOG.info("Closed ram manager");
dataAvailable.notify();
}
}
private float getPercentUsed() {
return (float)fullSize/maxSize;
}
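// A map output can only be shuffled into memory if it fits in a byte[]
// (hence the Integer.MAX_VALUE check) and no single segment may consume
// more than MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION of the buffer.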
boolean canFitInMemory(long requestedSize) {
return (requestedSize < Integer.MAX_VALUE &&
requestedSize < maxSingleShuffleLimit);
}
}
/** Copies map outputs as they become available */
private class MapOutputCopier extends Thread {
// basic/unit connection timeout (in milliseconds)
private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
// default read timeout (in milliseconds)
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
private final int shuffleConnectionTimeout;
private final int shuffleReadTimeout;
private MapOutputLocation currentLocation = null;
private int id = nextMapOutputCopierId++;
private Reporter reporter;
private boolean readError = false;
// Decompression of map-outputs
private CompressionCodec codec = null;
private Decompressor decompressor = null;
private final SecretKey jobTokenSecret;
public MapOutputCopier(JobConf job, Reporter reporter, SecretKey jobTokenSecret) {
setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
LOG.debug(getName() + " created");
this.reporter = reporter;
this.jobTokenSecret = jobTokenSecret;
shuffleConnectionTimeout =
job.getInt("mapreduce.reduce.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
shuffleReadTimeout =
job.getInt("mapreduce.reduce.shuffle.read.timeout", DEFAULT_READ_TIMEOUT);
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
decompressor = CodecPool.getDecompressor(codec);
}
}
/**
* Fail the current file that we are fetching
* @return were we currently fetching?
*/
public synchronized boolean fail() {
if (currentLocation != null) {
finish(-1, CopyOutputErrorType.OTHER_ERROR);
return true;
} else {
return false;
}
}
/**
* Get the current map output location.
*/
public synchronized MapOutputLocation getLocation() {
return currentLocation;
}
private synchronized void start(MapOutputLocation loc) {
currentLocation = loc;
}
private synchronized void finish(long size, CopyOutputErrorType error) {
if (currentLocation != null) {
LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
synchronized (copyResults) {
copyResults.add(new CopyResult(currentLocation, size, error));
copyResults.notify();
}
currentLocation = null;
}
}
/** Loop forever and fetch map outputs as they become available.
* The thread exits when it is interrupted by {@link ReduceTaskRunner}
*/
@Override
public void run() {
while (true) {
try {
MapOutputLocation loc = null;
long size = -1;
synchronized (scheduledCopies) {
while (scheduledCopies.isEmpty()) {
scheduledCopies.wait();
}
loc = scheduledCopies.remove(0);
}
CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
readError = false;
try {
shuffleClientMetrics.threadBusy();
start(loc);
size = copyOutput(loc);
shuffleClientMetrics.successFetch();
error = CopyOutputErrorType.NO_ERROR;
} catch (IOException e) {
LOG.warn(reduceTask.getTaskID() + " copy failed: " +
loc.getTaskAttemptId() + " from " + loc.getHost());
LOG.warn(StringUtils.stringifyException(e));
shuffleClientMetrics.failedFetch();
if (readError) {
error = CopyOutputErrorType.READ_ERROR;
}
// Reset
size = -1;
} finally {
shuffleClientMetrics.threadFree();
finish(size, error);
}
} catch (InterruptedException e) {
break; // ALL DONE
} catch (FSError e) {
LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " +
StringUtils.stringifyException(e));
try {
umbilical.fsError(reduceTask.getTaskID(), e.getMessage());
} catch (IOException io) {
LOG.error("Could not notify TT of FSError: " +
StringUtils.stringifyException(io));
}
} catch (Throwable th) {
String msg = getTaskID() + " : Map output copy failure : "
+ StringUtils.stringifyException(th);
reportFatalError(getTaskID(), th, msg);
}
}
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}
/** Copies a map output from a remote host, via HTTP.
* @param loc the map output location to be copied
* @return the number of bytes copied, or CopyResult.OBSOLETE if the
* output no longer needs to be copied
* @throws IOException if there is an error copying the file
* @throws InterruptedException if the copier should give up
*/
private long copyOutput(MapOutputLocation loc
) throws IOException, InterruptedException {
// check if we still need to copy the output from this location
if (copiedMapOutputs.contains(loc.getTaskId()) ||
obsoleteMapIds.contains(loc.getTaskAttemptId())) {
return CopyResult.OBSOLETE;
}
// a temp filename. If this file gets created in ramfs, we're fine,
// else, we will check the localFS to find a suitable final location
// for this path
TaskAttemptID reduceId = reduceTask.getTaskID();
Path filename =
new Path(String.format(
MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
TaskTracker.OUTPUT, loc.getTaskId().getId()));
// Copy the map output to a temp file whose name is unique to this attempt
Path tmpMapOutput = new Path(filename+"-"+id);
// Copy the map output
MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
reduceId.getTaskID().getId());
if (mapOutput == null) {
throw new IOException("Failed to fetch map-output for " +
loc.getTaskAttemptId() + " from " +
loc.getHost());
}
// The size of the map-output
long bytes = mapOutput.compressedSize;
// lock the ReduceTask while we do the rename
synchronized (ReduceTask.this) {
if (copiedMapOutputs.contains(loc.getTaskId())) {
mapOutput.discard();
return CopyResult.OBSOLETE;
}
// Special case: discard empty map-outputs
if (bytes == 0) {
try {
mapOutput.discard();
} catch (IOException ioe) {
LOG.info("Couldn't discard output of " + loc.getTaskId());
}
// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());
return bytes;
}
// Process map-output
if (mapOutput.inMemory) {
// Save it in the synchronized list of map-outputs
mapOutputsFilesInMemory.add(mapOutput);
} else {
// Rename the temporary file to the final file;
// ensure it is on the same partition
tmpMapOutput = mapOutput.file;
filename = new Path(tmpMapOutput.getParent(), filename.getName());
if (!localFileSys.rename(tmpMapOutput, filename)) {
localFileSys.delete(tmpMapOutput, true);
bytes = -1;
throw new IOException("Failed to rename map output " +
tmpMapOutput + " to " + filename);
}
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
}
}
// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());
}
return bytes;
}
/**
* Save the map taskid whose output we just copied.
* This function assumes that it has been synchronized on ReduceTask.this.
*
* @param taskId map taskid
*/
private void noteCopiedMapOutput(TaskID taskId) {
copiedMapOutputs.add(taskId);
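// Despite its name, setNumCopiedMapOutputs() is passed the number of
// map outputs still remaining to be copied.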
ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size());
}
/**
* Get the map output into a local file (either in the inmemory fs or on the
* local fs) from the remote server.
* We use the file system so that we generate checksum files on the data.
* @param mapOutputLoc map-output to be fetched
* @param filename the filename to write the data into
* @param connectionTimeout number of milliseconds for connection timeout
* @param readTimeout number of milliseconds for read timeout
* @return the path of the file that got created
* @throws IOException when something goes wrong
*/
private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
Path filename, int reduce)
throws IOException, InterruptedException {
// Connect
URL url = mapOutputLoc.getOutputLocation();
URLConnection connection = url.openConnection();
InputStream input = setupSecureConnection(mapOutputLoc, connection);
// Validate header from map output
TaskAttemptID mapId = null;
try {
mapId =
TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
} catch (IllegalArgumentException ia) {
LOG.warn("Invalid map id ", ia);
return null;
}
TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
if (!mapId.equals(expectedMapId)) {
LOG.warn("data from wrong map:" + mapId +
" arrived to reduce task " + reduce +
", where as expected map output should be from " + expectedMapId);
return null;
}
long decompressedLength =
Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
long compressedLength =
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
if (compressedLength < 0 || decompressedLength < 0) {
LOG.warn(getName() + " invalid lengths in map output header: id: " +
mapId + " compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);
return null;
}
int forReduce =
Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
if (forReduce != reduce) {
LOG.warn("data for the wrong reduce: " + forReduce +
" with compressed len: " + compressedLength +
", decompressed len: " + decompressedLength +
" arrived to reduce task " + reduce);
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);
}
//We will put a file in memory if it meets certain criteria:
//1. The size of the (decompressed) file should be less than 25% of
// the total inmem fs
//2. There is space available in the inmem fs
// Check if this map-output can be saved in-memory
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
// Shuffle
MapOutput mapOutput = null;
if (shuffleInMemory) {
if (LOG.isDebugEnabled()) {
LOG.debug("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into RAM from " + mapOutputLoc.getTaskAttemptId());
}
mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
(int)decompressedLength,
(int)compressedLength);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into Local-FS from " + mapOutputLoc.getTaskAttemptId());
}
mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
compressedLength);
}
return mapOutput;
}
private InputStream setupSecureConnection(MapOutputLocation mapOutputLoc,
URLConnection connection) throws IOException {
// generate hash of the url
String msgToEncode =
SecureShuffleUtils.buildMsgFrom(connection.getURL());
String encHash = SecureShuffleUtils.hashFromString(msgToEncode,
jobTokenSecret);
// put url hash into http header
connection.setRequestProperty(
SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
InputStream input = getInputStream(connection, shuffleConnectionTimeout,
shuffleReadTimeout);
// get the replyHash which is HMac of the encHash we sent to the server
String replyHash = connection.getHeaderField(
SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
if(replyHash==null) {
throw new IOException("security validation of TT Map output failed");
}
if (LOG.isDebugEnabled())
LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="
+replyHash);
// verify that replyHash is HMac of encHash
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
if (LOG.isDebugEnabled())
LOG.debug("for url="+msgToEncode+" sent hash and receievd reply");
return input;
}
/**
* The connection establishment is attempted multiple times and abandoned
* only after the final failure. Instead of connecting with a single
* timeout of X, we try connecting multiple times, each with a smaller
* timeout x (x < X).
*/
private InputStream getInputStream(URLConnection connection,
int connectionTimeout,
int readTimeout)
throws IOException {
int unit = 0;
if (connectionTimeout < 0) {
throw new IOException("Invalid timeout "
+ "[timeout = " + connectionTimeout + " ms]");
} else if (connectionTimeout > 0) {
unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout)
? connectionTimeout
: UNIT_CONNECT_TIMEOUT;
}
// set the read timeout to the total timeout
connection.setReadTimeout(readTimeout);
// set the connect timeout to the unit-connect-timeout
connection.setConnectTimeout(unit);
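// Example: with the default 3 min connect timeout and a 30 sec unit,
// up to 6 connect attempts of 30 sec each are made before giving up.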
while (true) {
try {
connection.connect();
break;
} catch (IOException ioe) {
// update the total remaining connect-timeout
connectionTimeout -= unit;
// throw an exception if we have waited for timeout amount of time
// note that the updated value of the timeout is used here
if (connectionTimeout == 0) {
throw ioe;
}
// reset the connect timeout for the last try
if (connectionTimeout < unit) {
unit = connectionTimeout;
// reset the connect time out for the final connect
connection.setConnectTimeout(unit);
}
}
}
try {
return connection.getInputStream();
} catch (IOException ioe) {
readError = true;
throw ioe;
}
}
private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
URLConnection connection,
InputStream input,
int mapOutputLength,
int compressedLength)
throws IOException, InterruptedException {
// Reserve ram for the map-output
boolean createdNow = ramManager.reserve(mapOutputLength, input);
// Reconnect if we need to
if (!createdNow) {
// Reconnect
try {
connection = mapOutputLoc.getOutputLocation().openConnection();
input = setupSecureConnection(mapOutputLoc, connection);
} catch (IOException ioe) {
LOG.info("Failed reopen connection to fetch map-output from " +
mapOutputLoc.getHost());
// Inform the ram-manager
ramManager.closeInMemoryFile(mapOutputLength);
ramManager.unreserve(mapOutputLength);
throw ioe;
}
}
IFileInputStream checksumIn =
new IFileInputStream(input,compressedLength);
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
decompressor.reset();
input = codec.createInputStream(input, decompressor);
}
// Copy map-output into an in-memory buffer
byte[] shuffleData = new byte[mapOutputLength];
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(),
mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);
int bytesRead = 0;
try {
int n = input.read(shuffleData, 0, shuffleData.length);
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
// indicate we're making progress
reporter.progress();
n = input.read(shuffleData, bytesRead,
(shuffleData.length-bytesRead));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());
}
input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);
// Inform the ram-manager
ramManager.closeInMemoryFile(mapOutputLength);
ramManager.unreserve(mapOutputLength);
// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
readError = true;
throw ioe;
}
// Close the in-memory file
ramManager.closeInMemoryFile(mapOutputLength);
// Sanity check
if (bytesRead != mapOutputLength) {
// Inform the ram-manager
ramManager.unreserve(mapOutputLength);
// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;
throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}
// TODO: Remove this after a 'fix' for HADOOP-3647
if (LOG.isDebugEnabled()) {
if (mapOutputLength > 0) {
DataInputBuffer dib = new DataInputBuffer();
dib.reset(shuffleData, 0, shuffleData.length);
LOG.debug("Rec #1 from " + mapOutputLoc.getTaskAttemptId() +
" -> (" + WritableUtils.readVInt(dib) + ", " +
WritableUtils.readVInt(dib) + ") from " +
mapOutputLoc.getHost());
}
}
return mapOutput;
}
private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
InputStream input,
Path filename,
long mapOutputLength)
throws IOException {
// Find a suitable location for the output on the local filesystem
Path localFilename =
lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
mapOutputLength, conf);
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
conf, localFileSys.makeQualified(localFilename),
mapOutputLength);
// Copy data to local-disk
OutputStream output = null;
long bytesRead = 0;
try {
output = rfs.create(localFilename);
byte[] buf = new byte[64 * 1024];
int n = -1;
try {
n = input.read(buf, 0, buf.length);
} catch (IOException ioe) {
readError = true;
throw ioe;
}
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
output.write(buf, 0, n);
// indicate we're making progress
reporter.progress();
try {
n = input.read(buf, 0, buf.length);
} catch (IOException ioe) {
readError = true;
throw ioe;
}
}
LOG.info("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());
output.close();
input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);
// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;
// Close the streams
IOUtils.cleanup(LOG, input, output);
// Re-throw
throw ioe;
}
// Sanity check
if (bytesRead != mapOutputLength) {
try {
mapOutput.discard();
} catch (Exception ioe) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ioe);
} catch (Throwable t) {
String msg = getTaskID() + " : Failed in shuffle to disk :"
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
}
mapOutput = null;
throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}
return mapOutput;
}
} // MapOutputCopier
private void configureClasspath(JobConf conf)
throws IOException {
// get the task and the current classloader which will become the parent
Task task = ReduceTask.this;
ClassLoader parent = conf.getClassLoader();
// get the work directory which holds the elements we are dynamically
// adding to the classpath
File workDir = new File(task.getJobFile()).getParentFile();
ArrayList<URL> urllist = new ArrayList<URL>();
// add the jars and directories to the classpath
String jar = conf.getJar();
if (jar != null) {
File jobCacheDir = new File(new Path(jar).getParent().toString());
File[] libs = new File(jobCacheDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
urllist.add(libs[i].toURL());
}
}
urllist.add(new File(jobCacheDir, "classes").toURL());
urllist.add(jobCacheDir.toURL());
}
urllist.add(workDir.toURL());
// create a new classloader with the old classloader as its parent
// then set that classloader as the one used by the current jobconf
URL[] urls = urllist.toArray(new URL[urllist.size()]);
URLClassLoader loader = new URLClassLoader(urls, parent);
conf.setClassLoader(loader);
}
public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
TaskReporter reporter
)throws ClassNotFoundException, IOException {
configureClasspath(conf);
this.reporter = reporter;
this.shuffleClientMetrics = createShuffleClientInstrumentation();
this.umbilical = umbilical;
this.reduceTask = ReduceTask.this;
this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
this.copyResults = new ArrayList<CopyResult>(100);
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
this.maxInFlight = 4 * numCopiers;
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
Counters.Counter combineInputCounter =
reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
combineCollector =
new CombineOutputCollector(reduceCombineOutputCounter);
}
this.ioSortFactor = conf.getInt("io.sort.factor", 10);
// the exponential backoff formula
// backoff (t) = init * base^(t-1)
// so for max retries we get
// backoff(1) + .... + backoff(max_fetch_retries) ~ max
// solving which we get
// max_fetch_retries ~ log((max * (base - 1) / init) + 1) / log(base)
// for the default value of max = 300 (5 min) we get max_fetch_retries = 6
// the backoffs are 4, 8, 16, 32, 64, 128, whose sum is 252 sec = 4.2 min
// (assuming base 2)
this.maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
getClosestPowerOf2((this.maxBackoff * 1000 / BACKOFF_INIT) + 1));
this.maxFailedUniqueFetches = Math.min(numMaps,
this.maxFailedUniqueFetches);
this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
this.maxInMemCopyPer =
conf.getFloat("mapred.job.shuffle.merge.percent", 0.66f);
final float maxRedPer =
conf.getFloat("mapred.job.reduce.input.buffer.percent", 0f);
if (maxRedPer > 1.0 || maxRedPer < 0.0) {
throw new IOException("mapred.job.reduce.input.buffer.percent" +
maxRedPer);
}
this.maxInMemReduce = (int)Math.min(
Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
// Setup the RamManager
ramManager = new ShuffleRamManager(conf);
localFileSys = FileSystem.getLocal(conf);
rfs = ((LocalFileSystem)localFileSys).getRaw();
// hosts -> next contact time
this.penaltyBox = new LinkedHashMap<String, Long>();
// hostnames
this.uniqueHosts = new HashSet<String>();
// Seed the random number generator with a reasonably globally unique seed
long randomSeed = System.nanoTime() +
(long)Math.pow(this.reduceTask.getPartition(),
(this.reduceTask.getPartition()%10)
);
this.random = new Random(randomSeed);
this.maxMapRuntime = 0;
this.reportReadErrorImmediately =
conf.getBoolean("mapreduce.reduce.shuffle.notify.readerror", true);
}
private boolean busyEnough(int numInFlight) {
return numInFlight > maxInFlight;
}
public boolean fetchOutputs() throws IOException {
int totalFailures = 0;
int numInFlight = 0, numCopied = 0;
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
final Progress copyPhase =
reduceTask.getProgress().phase();
LocalFSMerger localFSMergerThread = null;
InMemFSMergeThread inMemFSMergeThread = null;
GetMapEventsThread getMapEventsThread = null;
for (int i = 0; i < numMaps; i++) {
copyPhase.addPhase(); // add sub-phase per file
}
copiers = new ArrayList<MapOutputCopier>(numCopiers);
// start all the copying threads
for (int i=0; i < numCopiers; i++) {
MapOutputCopier copier = new MapOutputCopier(conf, reporter,
reduceTask.getJobTokenSecret());
copiers.add(copier);
copier.start();
}
//start the on-disk-merge thread
localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
//start the in memory merger thread
inMemFSMergeThread = new InMemFSMergeThread();
localFSMergerThread.start();
inMemFSMergeThread.start();
// start the map events thread
getMapEventsThread = new GetMapEventsThread();
getMapEventsThread.start();
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
long currentTime = startTime;
long lastProgressTime = startTime;
long lastOutputTime = 0;
// loop until we get all required outputs
while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
currentTime = System.currentTimeMillis();
boolean logNow = false;
if (currentTime - lastOutputTime > MIN_LOG_TIME) {
lastOutputTime = currentTime;
logNow = true;
}
if (logNow) {
LOG.info(reduceTask.getTaskID() + " Need another "
+ (numMaps - copiedMapOutputs.size()) + " map output(s), of which "
+ numInFlight + " are already in progress");
}
// Put the hash entries for the failed fetches.
Iterator<MapOutputLocation> locItr = retryFetches.iterator();
while (locItr.hasNext()) {
MapOutputLocation loc = locItr.next();
List<MapOutputLocation> locList =
mapLocations.get(loc.getHost());
// Check if the list exists. The map-output-location mapping is
// cleared when the jobtracker restarts and is then rebuilt from
// scratch, so we continue in the hope of finding the missing
// locations in the rebuilt map.
if (locList != null) {
// Add to the beginning of the list so that this map is
// tried again before the others and we can hasten the
// re-execution of this map should there be a problem
locList.add(0, loc);
}
}
if (retryFetches.size() > 0) {
LOG.info(reduceTask.getTaskID() + ": " +
"Got " + retryFetches.size() +
" map-outputs from previous failures");
}
// clear the "failed" fetches hashmap
retryFetches.clear();
// now walk through the cache and schedule what we can
int numScheduled = 0;
int numDups = 0;
synchronized (scheduledCopies) {
// Randomize the map output locations to prevent
// all reduce-tasks swamping the same tasktracker
List<String> hostList = new ArrayList<String>();
hostList.addAll(mapLocations.keySet());
Collections.shuffle(hostList, this.random);
Iterator<String> hostsItr = hostList.iterator();
while (hostsItr.hasNext()) {
String host = hostsItr.next();
List<MapOutputLocation> knownOutputsByLoc =
mapLocations.get(host);
// Check if the list exists. The map-output-location mapping is
// cleared when the jobtracker restarts and is then rebuilt from
// scratch, so we continue in the hope of finding some locations
// in the rebuilt map and adding them for fetching.
if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
continue;
}
//Identify duplicate hosts here
if (uniqueHosts.contains(host)) {
numDups += knownOutputsByLoc.size();
continue;
}
Long penaltyEnd = penaltyBox.get(host);
boolean penalized = false;
if (penaltyEnd != null) {
if (currentTime < penaltyEnd.longValue()) {
penalized = true;
} else {
penaltyBox.remove(host);
}
}
if (penalized)
continue;
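// Schedule at most one fetch per host per pass; the uniqueHosts set
// (checked above) keeps a single copy in flight per host at a time.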
synchronized (knownOutputsByLoc) {
locItr = knownOutputsByLoc.iterator();
while (locItr.hasNext()) {
MapOutputLocation loc = locItr.next();
// Do not schedule fetches from OBSOLETE maps
if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
locItr.remove();
continue;
}
uniqueHosts.add(host);
scheduledCopies.add(loc);
locItr.remove(); // remove from knownOutputs
numInFlight++; numScheduled++;
break; //we have a map from this host
}
}
}
scheduledCopies.notifyAll();
}
if (numScheduled > 0 || logNow) {
LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
" outputs (" + penaltyBox.size() +
" slow hosts and" + numDups + " dup hosts)");
}
if (penaltyBox.size() > 0 && logNow) {
LOG.info("Penalized(slow) Hosts: ");
for (String host : penaltyBox.keySet()) {
LOG.info(host + " Will be considered after: " +
((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
}
}
// if we have no copies in flight and we can't schedule anything
// new, just wait for a bit
try {
if (numInFlight == 0 && numScheduled == 0) {
// we should indicate progress as we don't want TT to think
// we're stuck and kill us
reporter.progress();
Thread.sleep(5000);
}
} catch (InterruptedException e) { } // IGNORE
while (numInFlight > 0 && mergeThrowable == null) {
LOG.debug(reduceTask.getTaskID() + " numInFlight = " +
numInFlight);
//the call to getCopyResult will either
//1) return immediately with a null or a valid CopyResult object,
// or
//2) if the numInFlight is above maxInFlight, return with a
// CopyResult object after getting a notification from a
// fetcher thread,
//So, when getCopyResult returns null, we can be sure that
//we aren't busy enough and we should go and get more map-completion
//events from the tasktracker
CopyResult cr = getCopyResult(numInFlight);
if (cr == null) {
break;
}
if (cr.getSuccess()) { // a successful copy
numCopied++;
lastProgressTime = System.currentTimeMillis();
reduceShuffleBytes.increment(cr.getSize());
long secsSinceStart =
(System.currentTimeMillis()-startTime)/1000+1;
float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
float transferRate = mbs/secsSinceStart;
copyPhase.startNextPhase();
copyPhase.setStatus("copy (" + numCopied + " of " + numMaps
+ " at " +
mbpsFormat.format(transferRate) + " MB/s)");
// Note successful fetch for this mapId to invalidate
// (possibly) old fetch-failures
fetchFailedMaps.remove(cr.getLocation().getTaskId());
} else if (cr.isObsolete()) {
//ignore
LOG.info(reduceTask.getTaskID() +
" Ignoring obsolete copy result for Map Task: " +
cr.getLocation().getTaskAttemptId() + " from host: " +
cr.getHost());
} else {
retryFetches.add(cr.getLocation());
// note the failed-fetch
TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
TaskID mapId = cr.getLocation().getTaskId();
totalFailures++;
Integer noFailedFetches =
mapTaskToFailedFetchesMap.get(mapTaskId);
noFailedFetches =
(noFailedFetches == null) ? 1 : (noFailedFetches + 1);
mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
LOG.info("Task " + getTaskID() + ": Failed fetch #" +
noFailedFetches + " from " + mapTaskId);
// halve the max fetch retries per map near the end of the shuffle
int fetchRetriesPerMap = maxFetchRetriesPerMap;
int pendingCopies = numMaps - numCopied;
// The noFailedFetches != maxFetchRetriesPerMap check guards a
// corner case: if noFailedFetches has just reached
// maxFetchRetriesPerMap when the reducer hits the end of the
// shuffle, halving fetchRetriesPerMap could leave
// (noFailedFetches - fetchRetriesPerMap) odd, and we would
// miss sending a notification
if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
noFailedFetches != maxFetchRetriesPerMap) {
fetchRetriesPerMap = fetchRetriesPerMap >> 1;
}
// did the fetch fail too many times?
// using a hybrid technique for notifying the jobtracker.
// a. the first notification is sent after max-retries
// b. subsequent notifications are sent after 2 retries.
// c. send notification immediately if it is a read error and
// "mapreduce.reduce.shuffle.notify.readerror" set true.
if ((reportReadErrorImmediately && cr.getError().equals(
CopyOutputErrorType.READ_ERROR)) ||
((noFailedFetches >= fetchRetriesPerMap)
&& ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
synchronized (ReduceTask.this) {
taskStatus.addFetchFailedMap(mapTaskId);
reporter.progress();
LOG.info("Failed to fetch map-output from " + mapTaskId +
" even after MAX_FETCH_RETRIES_PER_MAP retries... "
+ " or it is a read error, "
+ " reporting to the JobTracker");
}
}
// note unique failed-fetch maps
if (noFailedFetches == maxFetchRetriesPerMap) {
fetchFailedMaps.add(mapId);
// did we have too many unique failed-fetch maps?
// and did we fail on too many fetch attempts?
// and did we progress enough
// or did we wait for too long without any progress?
// check if the reducer is healthy
boolean reducerHealthy =
(((float)totalFailures / (totalFailures + numCopied))
< MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
// check if the reducer has progressed enough
boolean reducerProgressedEnough =
(((float)numCopied / numMaps)
>= MIN_REQUIRED_PROGRESS_PERCENT);
// check if the reducer is stalled for a long time
// duration for which the reducer is stalled
int stallDuration =
(int)(System.currentTimeMillis() - lastProgressTime);
// duration for which the reducer ran with progress
int shuffleProgressDuration =
(int)(lastProgressTime - startTime);
// min time the reducer should run without getting killed
int minShuffleRunDuration =
(shuffleProgressDuration > maxMapRuntime)
? shuffleProgressDuration
: maxMapRuntime;
boolean reducerStalled =
(((float)stallDuration / minShuffleRunDuration)
>= MAX_ALLOWED_STALL_TIME_PERCENT);
// kill if not healthy and has insufficient progress
if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
&& !reducerHealthy
&& (!reducerProgressedEnough || reducerStalled)) {
LOG.fatal("Shuffle failed with too many fetch failures " +
"and insufficient progress!" +
"Killing task " + getTaskID() + ".");
umbilical.shuffleError(getTaskID(),
"Exceeded MAX_FAILED_UNIQUE_FETCHES;"
+ " bailing-out.");
}
}
// back off exponentially while num_retries <= max_retries;
// back off by max_backoff/2 on subsequent failed attempts
currentTime = System.currentTimeMillis();
int currentBackOff = noFailedFetches <= fetchRetriesPerMap
? BACKOFF_INIT
* (1 << (noFailedFetches - 1))
: (this.maxBackoff * 1000 / 2);
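// Illustrative arithmetic, assuming the 4 sec initial backoff from
// the derivation above: a 3rd failure backs off
// 4000 * (1 << 2) = 16 sec; past fetchRetriesPerMap the penalty is
// pinned at maxBackoff / 2.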
// On a read error, back off for maxMapRuntime/2; near the end of
// the shuffle, back off for min(maxMapRuntime/2, currentBackOff)
if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
int backOff = maxMapRuntime >> 1;
if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
backOff = Math.min(backOff, currentBackOff);
}
currentBackOff = backOff;
}
penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
LOG.warn(reduceTask.getTaskID() + " adding host " +
cr.getHost() + " to penalty box, next contact in " +
(currentBackOff/1000) + " seconds");
}
uniqueHosts.remove(cr.getHost());
numInFlight--;
}
}
// all done, inform the copiers to exit
exitGetMapEvents = true;
try {
getMapEventsThread.join();
LOG.info("getMapsEventsThread joined.");
} catch (InterruptedException ie) {
LOG.info("getMapsEventsThread threw an exception: " +
StringUtils.stringifyException(ie));
}
synchronized (copiers) {
synchronized (scheduledCopies) {
for (MapOutputCopier copier : copiers) {
copier.interrupt();
}
copiers.clear();
}
}
// copiers are done, exit and notify the waiting merge threads
synchronized (mapOutputFilesOnDisk) {
exitLocalFSMerge = true;
mapOutputFilesOnDisk.notify();
}
ramManager.close();
//Do a merge of in-memory files (if there are any)
if (mergeThrowable == null) {
try {
// Wait for the on-disk merge to complete
localFSMergerThread.join();
LOG.info("Interleaved on-disk merge complete: " +
mapOutputFilesOnDisk.size() + " files left.");
//wait for an ongoing merge (if it is in flight) to complete
inMemFSMergeThread.join();
LOG.info("In-memory merge complete: " +
mapOutputsFilesInMemory.size() + " files left.");
} catch (InterruptedException ie) {
LOG.warn(reduceTask.getTaskID() +
" Final merge of the inmemory files threw an exception: " +
StringUtils.stringifyException(ie));
// record the interrupt as the failure cause, unless the last
// merge already recorded a more specific error
if (mergeThrowable == null) {
mergeThrowable = ie;
}
return false;
}
}
return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
}
private long createInMemorySegments(
List<Segment<K, V>> inMemorySegments, long leaveBytes)
throws IOException {
long totalSize = 0L;
synchronized (mapOutputsFilesInMemory) {
// fullSize could come from the RamManager, but files may have been
// closed without yet appearing in mapOutputsFilesInMemory, so
// compute it from the list itself
long fullSize = 0L;
for (MapOutput mo : mapOutputsFilesInMemory) {
fullSize += mo.data.length;
}
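// Evict outputs from the head of the list, wrapping each one as an
// in-memory Segment, until at most leaveBytes remain resident.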
while (fullSize > leaveBytes) {
MapOutput mo = mapOutputsFilesInMemory.remove(0);
totalSize += mo.data.length;
fullSize -= mo.data.length;
Reader<K, V> reader =
new InMemoryReader<K, V>(ramManager, mo.mapAttemptId,
mo.data, 0, mo.data.length);
Segment<K, V> segment =
new Segment<K, V>(reader, true);
inMemorySegments.add(segment);
}
}
return totalSize;
}
/**
* Create a RawKeyValueIterator from copied map outputs. All copying
* threads have exited, so all of the map outputs are available either in
* memory or on disk. We also know that no merges are in progress, so
* synchronization is more lax here.
*
* The iterator returned must satisfy the following constraints:
* 1. Fewer than io.sort.factor files may be sources
* 2. No more than maxInMemReduce bytes of map outputs may be resident
* in memory when the reduce begins
*
* If we must perform an intermediate merge to satisfy (1), then we can
* keep the excluded outputs from (2) in memory and include them in the
* first merge pass. If not, then said outputs must be written to disk
* first.
*/
@SuppressWarnings("unchecked")
private RawKeyValueIterator createKVIterator(
JobConf job, FileSystem fs, Reporter reporter) throws IOException {
// merge config params
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
boolean keepInputs = job.getKeepFailedTaskFiles();
final Path tmpDir = new Path(getTaskID().toString());
final RawComparator<K> comparator =
(RawComparator<K>)job.getOutputKeyComparator();
// segments required to vacate memory
List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
long inMemToDiskBytes = 0;
if (mapOutputsFilesInMemory.size() > 0) {
TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
inMemToDiskBytes = createInMemorySegments(memDiskSegments,
maxInMemReduce);
final int numMemDiskSegments = memDiskSegments.size();
if (numMemDiskSegments > 0 &&
ioSortFactor > mapOutputFilesOnDisk.size()) {
// no intermediate disk merge will run to satisfy (1), so the
// in-memory segments evicted for (2) cannot ride along in a
// first merge pass and must be spilled to disk now
final Path outputPath =
mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null);
final Writer<K, V> writer = new Writer<K, V>(job, fs, outputPath,
keyClass, valueClass, codec, null);
try {
Merger.writeFile(rIter, writer, reporter, job);
addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
} catch (Exception e) {
if (null != outputPath) {
fs.delete(outputPath, true);
}
throw new IOException("Final merge failed", e);
} finally {
if (null != writer) {
writer.close();
}
}
LOG.info("Merged " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes to disk to satisfy " +
"reduce memory limit");
inMemToDiskBytes = 0;
memDiskSegments.clear();
} else if (inMemToDiskBytes != 0) {
LOG.info("Keeping " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes in memory for " +
"intermediate, on-disk merge");
}
}
// segments on disk
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
long onDiskBytes = inMemToDiskBytes;
Path[] onDisk = getMapFiles(fs, false);
for (Path file : onDisk) {
onDiskBytes += fs.getFileStatus(file).getLen();
diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
}
LOG.info("Merging " + onDisk.length + " files, " +
onDiskBytes + " bytes from disk");
Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
if (o1.getLength() == o2.getLength()) {
return 0;
}
return o1.getLength() < o2.getLength() ? -1 : 1;
}
});
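// Sorting by length lets the merger fold the smallest files first,
// which keeps the bytes re-read across intermediate merge passes low.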
// build the final list of segments from the merged disk-backed
// and in-memory segments
List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
long inMemBytes = createInMemorySegments(finalSegments, 0);
LOG.info("Merging " + finalSegments.size() + " segments, " +
inMemBytes + " bytes from memory into reduce");
if (0 != onDiskBytes) {
final int numInMemSegments = memDiskSegments.size();
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
RawKeyValueIterator diskMerge = Merger.merge(
job, fs, keyClass, valueClass, codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
reporter, false, spilledRecordsCounter, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
}
finalSegments.add(new Segment<K,V>(
new RawKVIteratorReader(diskMerge, onDiskBytes), true));
}
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
comparator, reporter, spilledRecordsCounter, null);
}
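/**
* Adapts a RawKeyValueIterator (such as the on-disk merge stream built
* above) to the IFile.Reader interface, so that it can be wrapped in a
* Segment and included in the final merge.
*/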
class RawKVIteratorReader extends IFile.Reader<K,V> {
private final RawKeyValueIterator kvIter;
public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
throws IOException {
super(null, null, size, null, spilledRecordsCounter);
this.kvIter = kvIter;
}
public boolean next(DataInputBuffer key, DataInputBuffer value)
throws IOException {
if (kvIter.next()) {
final DataInputBuffer kb = kvIter.getKey();
final DataInputBuffer vb = kvIter.getValue();
final int kp = kb.getPosition();
final int klen = kb.getLength() - kp;
key.reset(kb.getData(), kp, klen);
final int vp = vb.getPosition();
final int vlen = vb.getLength() - vp;
value.reset(vb.getData(), vp, vlen);
bytesRead += klen + vlen;
return true;
}
return false;
}
public long getPosition() throws IOException {
return bytesRead;
}
public void close() throws IOException {
kvIter.close();
}
}
private CopyResult getCopyResult(int numInFlight) {
synchronized (copyResults) {
while (copyResults.isEmpty()) {
try {
//The idea is that if we have scheduled enough, we can wait until
//we hear from one of the copiers.
if (busyEnough(numInFlight)) {
copyResults.wait();
} else {
return null;
}
} catch (InterruptedException e) { }
}
return copyResults.remove(0);
}
}
private void addToMapOutputFilesOnDisk(FileStatus status) {
synchronized (mapOutputFilesOnDisk) {
mapOutputFilesOnDisk.add(status);
mapOutputFilesOnDisk.notify();
}
}
/** Starts merging the local (on-disk) copies of the maps' output so
* that most of the reducer's input is already sorted, i.e. the
* shuffle and merge phases overlap.
*/
private class LocalFSMerger extends Thread {
private LocalFileSystem localFileSys;
public LocalFSMerger(LocalFileSystem fs) {
this.localFileSys = fs;
setName("Thread for merging on-disk files");
setDaemon(true);
}
@SuppressWarnings("unchecked")
public void run() {
try {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
while (!exitLocalFSMerge) {
synchronized (mapOutputFilesOnDisk) {
while (!exitLocalFSMerge &&
mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
mapOutputFilesOnDisk.wait();
}
}
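// Illustrative arithmetic: with io.sort.factor = 10 the merge fires
// once 2 * 10 - 1 = 19 files accumulate; folding 10 of them into one
// leaves 10 files, keeping the on-disk backlog bounded.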
if (exitLocalFSMerge) { // avoid running one extra time at the end
break;
}
List<Path> mapFiles = new ArrayList<Path>();
long approxOutputSize = 0;
int bytesPerSum =
reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
LOG.info(reduceTask.getTaskID() + "We have " +
mapOutputFilesOnDisk.size() + " map outputs on disk. " +
"Triggering merge of " + ioSortFactor + " files");
// 1. Prepare the list of files to be merged. This list is prepared
// using a list of map output files on disk. Currently we merge
// io.sort.factor files into 1.
synchronized (mapOutputFilesOnDisk) {
for (int i = 0; i < ioSortFactor; ++i) {
FileStatus filestatus = mapOutputFilesOnDisk.first();
mapOutputFilesOnDisk.remove(filestatus);
mapFiles.add(filestatus.getPath());
approxOutputSize += filestatus.getLen();
}
}
// sanity check
if (mapFiles.size() == 0) {
return;
}
// add the checksum length
approxOutputSize += ChecksumFileSystem
.getChecksumLength(approxOutputSize,
bytesPerSum);
// 2. Start the on-disk merge process
Path outputPath =
lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
approxOutputSize, conf)
.suffix(".merged");
Writer writer =
new Writer(conf, rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator iter = null;
Path tmpDir = new Path(reduceTask.getTaskID().toString());
try {
iter = Merger.merge(conf, rfs,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, mapFiles.toArray(new Path[mapFiles.size()]),
true, ioSortFactor, tmpDir,
conf.getOutputKeyComparator(), reporter,
spilledRecordsCounter, null);
Merger.writeFile(iter, writer, reporter, conf);
writer.close();
} catch (Exception e) {
localFileSys.delete(outputPath, true);
throw new IOException(StringUtils.stringifyException(e));
}
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
}
LOG.info(reduceTask.getTaskID() +
" Finished merging " + mapFiles.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFileSys.getFileStatus(outputPath).getLen());
}
} catch (Exception e) {
LOG.warn(reduceTask.getTaskID()
+ " Merging of the local FS files threw an exception: "
+ StringUtils.stringifyException(e));
if (mergeThrowable == null) {
mergeThrowable = e;
}
} catch (Throwable t) {
String msg = getTaskID() + " : Failed to merge on the local FS"
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
}
}
}
private class InMemFSMergeThread extends Thread {
public InMemFSMergeThread() {
setName("Thread for merging in memory files");
setDaemon(true);
}
public void run() {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
try {
boolean exit = false;
do {
exit = ramManager.waitForDataToMerge();
if (!exit) {
doInMemMerge();
}
} while (!exit);
} catch (Exception e) {
LOG.warn(reduceTask.getTaskID() +
" Merge of the inmemory files threw an exception: "
+ StringUtils.stringifyException(e));
ReduceCopier.this.mergeThrowable = e;
} catch (Throwable t) {
String msg = getTaskID() + " : Failed to merge in memory"
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
}
}
@SuppressWarnings("unchecked")
private void doInMemMerge() throws IOException{
if (mapOutputsFilesInMemory.size() == 0) {
return;
}
//Name this output file the same as the first file in the current
//list of in-memory files; that name is guaranteed to be absent on
//disk right now, so we don't overwrite a previously created spill.
//We also need to create the output file now, since it is not
//guaranteed to still be present after merge is called (we delete
//empty files as soon as we see them in the merge method).
//First figure out the mapId.
TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
Writer writer =
new Writer(conf, rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator rIter = null;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(conf, rfs,
(Class<K>)conf.getMapOutputKeyClass(),
(Class<V>)conf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceTask.getTaskID().toString()),
conf.getOutputKeyComparator(), reporter,
spilledRecordsCounter, null);
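// If a combiner is configured, run it while writing so that records
// are combined before this merged spill ever reaches disk.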
if (combinerRunner == null) {
Merger.writeFile(rIter, writer, reporter, conf);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(rIter, combineCollector);
}
writer.close();
LOG.info(reduceTask.getTaskID() +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFileSys.getFileStatus(outputPath).getLen());
} catch (Exception e) {
//make sure that we delete the on-disk file created earlier for
//this merge's output
localFileSys.delete(outputPath, true);
throw (IOException)new IOException
("Intermediate merge failed").initCause(e);
}
// Note the output of the merge
FileStatus status = localFileSys.getFileStatus(outputPath);
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(status);
}
}
}
private class GetMapEventsThread extends Thread {
private IntWritable fromEventId = new IntWritable(0);
private static final long SLEEP_TIME = 1000;
public GetMapEventsThread() {
setName("Thread for polling Map Completion Events");
setDaemon(true);
}
@Override
public void run() {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
do {
try {
int numNewMaps = getMapCompletionEvents();
if (LOG.isDebugEnabled()) {
if (numNewMaps > 0) {
LOG.debug(reduceTask.getTaskID() + ": " +
"Got " + numNewMaps + " new map-outputs");
}
}
Thread.sleep(SLEEP_TIME);
}
catch (InterruptedException e) {
LOG.warn(reduceTask.getTaskID() +
" GetMapEventsThread returning after an " +
" interrupted exception");
return;
}
catch (Throwable t) {
String msg = reduceTask.getTaskID()
+ " GetMapEventsThread Ignoring exception : "
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
}
} while (!exitGetMapEvents);
LOG.info("GetMapEventsThread exiting");
}
/**
* Queries the {@link TaskTracker} for a set of map-completion events
* starting from a given event ID.
* @return the number of new map outputs discovered
* @throws IOException
*/
private int getMapCompletionEvents() throws IOException {
int numNewMaps = 0;
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(reduceTask.getJobID(),
fromEventId.get(),
MAX_EVENTS_TO_FETCH,
reduceTask.getTaskID());
TaskCompletionEvent[] events = update.getMapTaskCompletionEvents();
// Check if the reset is required.
// Since there is no ordering of the task completion events at the
// reducer, the only option to sync with the new jobtracker is to reset
// the events index
if (update.shouldReset()) {
fromEventId.set(0);
obsoleteMapIds.clear(); // clear the obsolete map ids
mapLocations.clear(); // clear the map locations mapping
}
// Update the last seen event ID
fromEventId.set(fromEventId.get() + events.length);
// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in mapLocations to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteMapIds to stop
// fetching from those maps.
// 3. Mark TIPFAILED maps as copied in copiedMapOutputs since we
// don't need their outputs at all.
for (TaskCompletionEvent event : events) {
switch (event.getTaskStatus()) {
case SUCCEEDED:
{
URI u = URI.create(event.getTaskTrackerHttp());
String host = u.getHost();
TaskAttemptID taskId = event.getTaskAttemptId();
int duration = event.getTaskRunTime();
if (duration > maxMapRuntime) {
maxMapRuntime = duration;
// adjust max-fetch-retries based on max-map-run-time
maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
}
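// Illustrative: if the slowest map so far ran 120 sec and the initial
// backoff is 4 sec, (120000 / 4000) + 1 = 31 and
// getClosestPowerOf2(31) = 5 (31 rounds up toward 2^5 = 32), so allow
// up to 5 retries for that map.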
URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
"/mapOutput?job=" + taskId.getJobID() +
"&map=" + taskId +
"&reduce=" + getPartition());
List<MapOutputLocation> loc = mapLocations.get(host);
if (loc == null) {
loc = Collections.synchronizedList
(new LinkedList<MapOutputLocation>());
mapLocations.put(host, loc);
}
loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
numNewMaps++;
}
break;
case FAILED:
case KILLED:
case OBSOLETE:
{
obsoleteMapIds.add(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");
}
break;
case TIPFAILED:
{
copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");
}
break;
}
}
return numNewMaps;
}
}
}
/**
* Return the exponent of the power of two closest to the given
* positive value; throws an IllegalArgumentException for value <= 0.
* This follows the observation that the msb of a given value is
* also the closest power of two, unless the bit following it is
* set (in which case we round up to the next exponent).
*/
private static int getClosestPowerOf2(int value) {
if (value <= 0)
throw new IllegalArgumentException("Undefined for " + value);
final int hob = Integer.highestOneBit(value);
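// e.g. value = 76: hob = 64 (exponent 6) and bit 32 is clear, so the
// result stays 6; value = 31: hob = 16 (exponent 4) and bit 8 is set,
// so round up to 5 (illustrative walk-throughs)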
return Integer.numberOfTrailingZeros(hob) +
(((hob >>> 1) & value) == 0 ? 0 : 1);
}
}