/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.task.reduce;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
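
/**
 * Fetcher is a shuffle worker thread. It repeatedly asks the
 * {@link ShuffleScheduler} for a host with pending map outputs, copies those
 * outputs over a single HTTP connection, and hands each one to the
 * {@link MergeManager} as an in-memory or on-disk {@link MapOutput}.
 */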
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
/** Number of ms before timing out a copy */
private static final int DEFAULT_STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
/** Basic/unit connection timeout (in milliseconds) */
private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
/** Default read timeout (in milliseconds) */
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
private final Progressable reporter;
private static enum ShuffleErrors { IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
    CONNECTION, WRONG_REDUCE }
private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
private final Counters.Counter connectionErrs;
private final Counters.Counter ioErrs;
private final Counters.Counter wrongLengthErrs;
private final Counters.Counter badIdErrs;
private final Counters.Counter wrongMapErrs;
private final Counters.Counter wrongReduceErrs;
private final MergeManager<K,V> merger;
private final ShuffleScheduler<K,V> scheduler;
private final ShuffleClientMetrics metrics;
private final ExceptionReporter exceptionReporter;
private final int id;
private static int nextId = 0;
private final int reduce;
private final int connectionTimeout;
private final int readTimeout;
// Decompression of map-outputs
private final CompressionCodec codec;
private final Decompressor decompressor;

public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter) {
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
this.metrics = metrics;
this.exceptionReporter = exceptionReporter;
this.id = ++nextId;
this.reduce = reduceId.getTaskID().getId();
ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.WRONG_LENGTH.toString());
badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.BAD_ID.toString());
wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.WRONG_MAP.toString());
connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.CONNECTION.toString());
wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.WRONG_REDUCE.toString());
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
decompressor = CodecPool.getDecompressor(codec);
} else {
codec = null;
decompressor = null;
}
this.connectionTimeout =
job.getInt(JobContext.SHUFFLE_CONNECT_TIMEOUT,
DEFAULT_STALLED_COPY_TIMEOUT);
this.readTimeout =
job.getInt(JobContext.SHUFFLE_READ_TIMEOUT, DEFAULT_READ_TIMEOUT);
setName("fetcher#" + id);
setDaemon(true);
}
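
// Main fetch loop: block while an in-memory merge is in progress, take a
// host from the scheduler, copy its available map outputs, then free the
// host. The loop exits when the thread is interrupted; any other Throwable
// is handed to the ExceptionReporter.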
@Override
public void run() {
try {
while (true) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForInMemoryMerge();
// Get a host to shuffle from
host = scheduler.getHost();
metrics.threadBusy();
// Shuffle
copyFromHost(host);
} finally {
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
}
}
}
} catch (InterruptedException ie) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}

/**
 * The crux of the matter: fetch all of the currently available map outputs
 * from a single host over one HTTP connection, handing each one to the
 * merger. Any outputs that could not be copied are put back on the
 * scheduler to be fetched again later.
 *
 * @param host the {@link MapHost} from which to shuffle the available
 *             map-outputs
 */
private void copyFromHost(MapHost host) throws IOException {
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (maps.size() == 0) {
return;
}
LOG.debug("Fetcher " + id + " going to fetch from " + host);
for (TaskAttemptID tmp: maps) {
LOG.debug(tmp);
}
// Maps that have not been fetched yet
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
// Construct the url and connect
DataInputStream input;
boolean connectSucceeded = false;
try {
URLConnection connection = getMapOutputURL(host, maps).openConnection();
connectSucceeded = true;
input =
new DataInputStream(getInputStream(connection, connectionTimeout,
readTimeout));
} catch (IOException ie) {
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
if (!connectSucceeded) {
for(TaskAttemptID left: remaining) {
scheduler.copyFailed(left, host, connectSucceeded);
}
} else {
// If we got a read error at this stage, it implies there was a problem
// with the first map, typically a lost map output. So, penalize only that
// map and put the rest back
TaskAttemptID firstMap = maps.get(0);
scheduler.copyFailed(firstMap, host, connectSucceeded);
}
// Add back all the remaining maps, WITHOUT marking them as failed
for(TaskAttemptID left: remaining) {
scheduler.putBackKnownMapOutput(host, left);
}
return;
}
try {
// Loop through available map-outputs and fetch them
// On any error, good becomes false and we exit after putting the
// remaining maps back on the scheduler's to-be-fetched list
boolean good = true;
while (!remaining.isEmpty() && good) {
good = copyMapOutput(host, input, remaining);
}
// Drain the buffer, just in case
final int DRAIN_BUF_SIZE = 4096;
byte[] drainBuf = new byte[DRAIN_BUF_SIZE];
int retVal = 0;
while (retVal != -1) {
retVal = input.read(drainBuf, 0, DRAIN_BUF_SIZE);
}
input.close();
// Sanity check
if (good && !remaining.isEmpty()) {
throw new IOException("server didn't return all expected map outputs: "
+ remaining.size() + " left.");
}
} finally {
for (TaskAttemptID left : remaining) {
scheduler.putBackKnownMapOutput(host, left);
}
}
}

private boolean copyMapOutput(MapHost host,
DataInputStream input,
Set<TaskAttemptID> remaining) {
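// Each map output on the stream is preceded by a ShuffleHeader carrying the
// map attempt id, the compressed and decompressed lengths, and the reduce
// partition it is destined for; the header is followed by compressedLength
// bytes of IFile data.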
MapOutput<K,V> mapOutput = null;
TaskAttemptID mapId = null;
long decompressedLength = -1;
long compressedLength = -1;
try {
long startTime = System.currentTimeMillis();
int forReduce = -1;
//Read the shuffle header
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
mapId = TaskAttemptID.forName(header.mapId);
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
return false;
}
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
return false;
}
LOG.debug("header: " + mapId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
// Get the location for the map output - either in-memory or on-disk
mapOutput = merger.reserve(mapId, decompressedLength, id);
// Check if we can shuffle *now* ...
if (mapOutput.getType() == Type.WAIT) {
LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
return false;
}
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map " +
mapOutput.getMapId() + " decomp: " +
decompressedLength + " len: " + compressedLength + " to " +
mapOutput.getType());
if (mapOutput.getType() == Type.MEMORY) {
shuffleToMemory(host, mapOutput, input,
(int) decompressedLength, (int) compressedLength);
} else {
shuffleToDisk(host, mapOutput, input, compressedLength);
}
// Inform the shuffle scheduler
long endTime = System.currentTimeMillis();
scheduler.copySucceeded(mapId, host, compressedLength,
endTime - startTime, mapOutput);
// Note successful shuffle
remaining.remove(mapId);
metrics.successFetch();
return true;
} catch (IOException ioe) {
ioErrs.increment(1);
if (mapId == null || mapOutput == null) {
LOG.info("fetcher#" + id + " failed to read map header" +
mapId + " decomp: " +
decompressedLength + ", " + compressedLength, ioe);
return false;
}
LOG.info("Failed to shuffle output of " + mapId +
" from " + host.getHostName(), ioe);
// Inform the shuffle-scheduler
mapOutput.abort();
scheduler.copyFailed(mapId, host, true);
metrics.failedFetch();
return false;
}
}

/**
 * Do some basic verification of the shuffle header we just read -- being
 * defensive about what the server sent us.
 * @param compressedLength compressed length of the map output, from the header
 * @param decompressedLength decompressed length of the map output, from the header
 * @param forReduce reduce partition the output claims to be destined for
 * @param remaining map attempts still expected from this host
 * @param mapId map attempt the output claims to come from
 * @return true if the verification succeeded, false otherwise
 */
private boolean verifySanity(long compressedLength, long decompressedLength,
int forReduce, Set<TaskAttemptID> remaining, TaskAttemptID mapId) {
if (compressedLength < 0 || decompressedLength < 0) {
wrongLengthErrs.increment(1);
LOG.warn(getName() + " invalid lengths in map output header: id: " +
mapId + " len: " + compressedLength + ", decomp len: " +
decompressedLength);
return false;
}
if (forReduce != reduce) {
wrongReduceErrs.increment(1);
LOG.warn(getName() + " data for the wrong reduce map: " +
mapId + " len: " + compressedLength + " decomp len: " +
decompressedLength + " for reduce " + forReduce);
return false;
}
// Sanity check
if (!remaining.contains(mapId)) {
wrongMapErrs.increment(1);
LOG.warn("Invalid map-output! Received output for " + mapId);
return false;
}
return true;
}

/**
 * Create the map-output URL for a host. The URL is the host's base URL with
 * all of the requested map attempt IDs appended, separated by commas.
 * @param host the host to fetch from
 * @param maps the map attempts whose output is to be fetched
 * @return the map-output URL
 * @throws MalformedURLException if the resulting URL is malformed
 */
private URL getMapOutputURL(MapHost host, List<TaskAttemptID> maps
) throws MalformedURLException {
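// Note: the base URL comes from the MapHost as provided by the shuffle
// scheduler (typically the server's map-output servlet address ending in a
// "map=" query parameter, though the exact format is determined elsewhere);
// this method only appends the comma-separated map attempt IDs to it.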
// Get the base url
StringBuilder url = new StringBuilder(host.getBaseUrl());
boolean first = true;
for (TaskAttemptID mapId : maps) {
if (!first) {
url.append(",");
}
url.append(mapId);
first = false;
}
LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
return new URL(url.toString());
}

/**
 * Connection establishment is attempted multiple times, and we give up only
 * after the total connection timeout has been spent. Instead of connecting
 * once with a timeout of X, we connect repeatedly with a smaller unit
 * timeout x <= X until X has been used up.
 */
private InputStream getInputStream(URLConnection connection,
int connectionTimeout,
int readTimeout)
throws IOException {
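// Worked example with the defaults: a connection timeout of 180 000 ms and
// a UNIT_CONNECT_TIMEOUT of 60 000 ms yield a unit of 60 000 ms, so up to
// three 60-second connect attempts are made before the final IOException
// is allowed to propagate.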
int unit = 0;
if (connectionTimeout < 0) {
throw new IOException("Invalid timeout "
+ "[timeout = " + connectionTimeout + " ms]");
} else if (connectionTimeout > 0) {
unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
}
// set the read timeout to the total timeout
connection.setReadTimeout(readTimeout);
// set the connect timeout to the unit-connect-timeout
connection.setConnectTimeout(unit);
while (true) {
try {
return connection.getInputStream();
} catch (IOException ioe) {
// update the total remaining connect-timeout
connectionTimeout -= unit;
// throw the exception if we have already waited for the full timeout
// note that the updated value of the timeout is used here
if (connectionTimeout == 0) {
throw ioe;
}
// reset the connect timeout for the last try
if (connectionTimeout < unit) {
unit = connectionTimeout;
// reset the connect timeout for the final connect attempt
connection.setConnectTimeout(unit);
}
}
}
}

private void shuffleToMemory(MapHost host, MapOutput<K,V> mapOutput,
InputStream input,
int decompressedLength,
int compressedLength) throws IOException {
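// Wrap the socket stream in an IFileInputStream so the IFile checksum can
// be verified over exactly compressedLength bytes, optionally layer the
// map-output codec on top, and then read the full decompressed payload into
// the buffer reserved by the MergeManager.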
IFileInputStream checksumIn =
new IFileInputStream(input, compressedLength);
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
decompressor.reset();
input = codec.createInputStream(input, decompressor);
}
// Copy map-output into an in-memory buffer
byte[] shuffleData = mapOutput.getMemory();
try {
IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
metrics.inputBytes(shuffleData.length);
reporter.progress();
LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
mapOutput.getMapId());
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
throw ioe;
}
}

private void shuffleToDisk(MapHost host, MapOutput<K,V> mapOutput,
InputStream input,
long compressedLength)
throws IOException {
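// Stream exactly compressedLength bytes from the socket to the on-disk
// output in 64 KB chunks, reporting progress as we go. The bytes are
// written as-is: still compressed and still carrying the IFile checksum;
// nothing is verified or decompressed here.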
// Copy data to local-disk
OutputStream output = mapOutput.getDisk();
long bytesLeft = compressedLength;
try {
final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) {
int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream reading " +
mapOutput.getMapId());
}
output.write(buf, 0, n);
bytesLeft -= n;
metrics.inputBytes(n);
reporter.progress();
}
LOG.info("Read " + (compressedLength - bytesLeft) +
" bytes from map-output for " +
mapOutput.getMapId());
output.close();
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input, output);
// Re-throw
throw ioe;
}
// Sanity check
if (bytesLeft != 0) {
throw new IOException("Incomplete map output received for " +
mapOutput.getMapId() + " from " +
host.getHostName() + " (" +
bytesLeft + " bytes missing of " +
compressedLength + ")"
);
}
}
}