blob: 9cb8617ae18cd0195d4b57946502340710b79844 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tez.runtime.library.shuffle.common;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
/**
 * Responsible for fetching inputs served by the ShuffleHandler for a single
 * host. Construct using {@link FetcherBuilder}.
 */
public class Fetcher implements Callable<FetchResult> {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
// Source of the per-instance fetcherIdentifier assigned in the constructor.
private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
// Handed to TezSpillRecord and LocalDirAllocator when reading local shuffle files.
private final Configuration conf;
// Configurable fields. Set through FetcherBuilder (setCompressionParameters / setIFileParams).
private CompressionCodec codec;
private boolean ifileReadAhead = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
private int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
// Passed to HttpConnection when the connection to the shuffle handler is created.
private final SecretKey shuffleSecret;
// Receives fetchSucceeded / fetchFailed notifications for individual inputs.
private final FetcherCallback fetcherCallback;
// Allocates the FetchedInput that an incoming map output is written into.
private final FetchedInputAllocator inputManager;
private final ApplicationId appId;
// "fetcher [<srcNameTrimmed>] <fetcherIdentifier>"; used in log output and by HttpConnection.
private final String logIdentifier;
// Flipped once by shutdown(); also serves as the monitor object in shutdownInternal().
private final AtomicBoolean isShutDown = new AtomicBoolean(false);
// Unique id of this fetcher; the sole basis of hashCode() and equals().
private final int fetcherIdentifier;
// Parameters to track work. Assigned through FetcherBuilder.assignWork().
private List<InputAttemptIdentifier> srcAttempts;
private String host;
private int port;
private int partition;
// Maps from the pathComponents (unique per srcTaskId) to the specific taskId
private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
// Inputs not yet fetched; initialised from srcAttempts at the start of call().
private LinkedHashSet<InputAttemptIdentifier> remaining;
// Shuffle URL built in doHttpFetch().
private URL url;
// Response stream obtained from httpConnection in doHttpFetch().
private volatile DataInputStream input;
private HttpConnection httpConnection;
private HttpConnectionParams httpConnectionParams;
// When true, call() reads from local disk if host equals the NM_HOST environment value.
private final boolean localDiskFetchEnabled;
private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled) {
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.shuffleSecret = shuffleSecret;
this.appId = appId;
this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
this.httpConnectionParams = params;
this.conf = conf;
this.localDiskFetchEnabled = localDiskFetchEnabled;
this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
public FetchResult call() throws Exception {
if (srcAttempts.size() == 0) {
return new FetchResult(host, port, partition, srcAttempts);
for (InputAttemptIdentifier in : srcAttempts) {
pathToAttemptMap.put(in.getPathComponent(), in);
remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
HostFetchResult hostFetchResult;
if (localDiskFetchEnabled &&
host.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString()))) {
hostFetchResult = setupLocalDiskFetch();
} else {
hostFetchResult = doHttpFetch();
if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
// Sanity check
if (hostFetchResult.failedInputs == null && !remaining.isEmpty()) {
throw new IOException("server didn't return all expected map outputs: "
+ remaining.size() + " left.");
return hostFetchResult.fetchResult;
/**
 * Fetches the remaining inputs from the shuffle handler over HTTP.
 *
 * Flow visible in this block:
 * 1. Build the shuffle URL and create the HttpConnection. On IOException all
 *    remaining inputs are returned as failed with connectFailed = true, unless
 *    the fetcher has been shut down, in which case no failures are reported.
 * 2. Open the response stream. On IOException only the first source attempt
 *    is blamed, with connectFailed = false.
 * 3. Call fetchInputs repeatedly until nothing remains or a failure is returned.
 * The shutdown flag is re-checked after steps 1 and 2, since a shutdown request
 * has no effect while the connection is being established.
 *
 * NOTE(review): this copy is damaged. The argument lists that continue past
 * the constructInputURL(...) call and the final return are cut off, several
 * logging statements have lost their "LOG.xxx(" prefix, and closing braces
 * are missing. Restore from the upstream file before compiling.
 *
 * @return the inputs still remaining wrapped in a FetchResult, together with
 *         any inputs to report as failed
 */
protected HostFetchResult doHttpFetch() {
try {
StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
// NOTE(review): the trailing argument(s) and closing parenthesis of this call are missing.
this.url = ShuffleUtils.constructInputURL(baseURI.toString(), srcAttempts,
httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, shuffleSecret);
} catch (IOException e) {
// ioErrs.increment(1);
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
InputAttemptIdentifier[] failedFetches = null;
// NOTE(review): the logging call that owned this string literal is missing.
if (isShutDown.get()) {"Not reporting fetch failure, since an Exception was caught after shutdown");
} else {
failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
// Third argument true marks this as a connection failure.
return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedFetches, true);
if (isShutDown.get()) {
// shutdown would have no effect if in the process of establishing the connection.
shutdownInternal();"Detected fetcher has been shutdown after connection establishment. Returning");
return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false);
try {
input = httpConnection.getInputStream();
//validateConnectionResponse(msgToEncode, encHash);
} catch (IOException e) {
// ioErrs.increment(1);
// If we got a read error at this stage, it implies there was a problem
// with the first map, typically lost map. So, penalize only that map
// and add the rest
if (isShutDown.get()) {"Not reporting fetch failure, since an Exception was caught after shutdown");
} else {
InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
+ " Informing ShuffleManager: ", e);
return new HostFetchResult(new FetchResult(host, port, partition, remaining),
new InputAttemptIdentifier[] { firstAttempt }, false);
// By this point, the connection is setup and the response has been
// validated.
// Handle any shutdown which may have been invoked.
if (isShutDown.get()) {
// shutdown would have no effect if in the process of establishing the connection.
shutdownInternal();"Detected fetcher has been shutdown after opening stream. Returning");
return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false);
// After this point, closing the stream and connection, should cause a
// SocketException,
// which will be ignored since shutdown has been invoked.
// Loop through available map-outputs and fetch them
// On any error, failedInputs is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
InputAttemptIdentifier[] failedInputs = null;
// fetchInputs returns null on success, so the loop stops at the first failure.
while (!remaining.isEmpty() && failedInputs == null) {
failedInputs = fetchInputs(input);
// NOTE(review): the final argument and closing parenthesis of this return are missing.
return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs,
/**
 * Serves the remaining inputs from local disk instead of over HTTP.
 *
 * For each remaining source attempt the index record is looked up, wrapped in
 * a LocalDiskFetchedInput (with a no-op FetchedInputCallback), and reported to
 * the fetcher callback via fetchSucceeded. An IOException for one attempt is
 * logged as a warning. Whatever is still in remaining after the loop is
 * returned as failed, with connectFailed = false.
 *
 * NOTE(review): this copy is damaged. The right-hand side of the srcAttemptId
 * assignment is missing, no removal from remaining is visible after a
 * successful fetch, the cleanup inside the inner try is missing, and some
 * logging statements have lost their "LOG.xxx(" prefix or trailing arguments.
 * Restore from the upstream file before compiling.
 *
 * @return the inputs still remaining wrapped in a FetchResult, plus those same
 *         inputs as the failed list (null when nothing remains)
 */
protected HostFetchResult setupLocalDiskFetch() {
Iterator<InputAttemptIdentifier> iterator = remaining.iterator();
while (iterator.hasNext()) {
// NOTE(review): the expression assigned here is missing; presumably the iterator's next element.
InputAttemptIdentifier srcAttemptId =;
//TODO: check for shutdown? - See TEZ-1480
long startTime = System.currentTimeMillis();
FetchedInput fetchedInput = null;
try {
TezIndexRecord idxRecord;
idxRecord = getTezIndexRecord(srcAttemptId);
fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId,
getShuffleInputFileName(srcAttemptId.getPathComponent(), null), conf,
// All three callback methods are intentionally empty.
new FetchedInputCallback() {
public void fetchComplete(FetchedInput fetchedInput) {}
public void fetchFailed(FetchedInput fetchedInput) {}
public void freeResources(FetchedInput fetchedInput) {}
});"fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
+ " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
+ " to " + fetchedInput.getType());
long endTime = System.currentTimeMillis();
// Reports part length, then raw length, then elapsed milliseconds.
fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
idxRecord.getRawLength(), (endTime - startTime));
} catch (IOException e) {
// NOTE(review): the remaining argument(s) of this warn call are missing.
LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host + "(local fetch)",
if (fetchedInput != null) {
// NOTE(review): the body of this try (cleanup of fetchedInput) is missing.
try {
} catch (IOException e1) {"Failed to cleanup fetchedInput " + fetchedInput);
InputAttemptIdentifier[] failedFetches = null;
if (remaining.size() > 0) {
failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
return new HostFetchResult(new FetchResult(host, port, partition, remaining),
failedFetches, false);
/**
 * Loads the spill index file for the given source attempt and returns the
 * index record of this fetcher's partition.
 *
 * NOTE(review): the second argument of the getShuffleInputFileName call is
 * missing from this copy; restore from the upstream file.
 *
 * @param srcAttemptId source attempt whose path component locates the index file
 * @return the index record for this fetcher's partition
 * @throws IOException declared by this method; propagated from resolving or reading the index file
 */
protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId) throws
IOException {
TezIndexRecord idxRecord;
Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
idxRecord = spillRecord.getIndex(partition);
return idxRecord;
/**
 * Resolves a file below the task output directory of the given path component
 * to a readable local path, using a LocalDirAllocator over the configured
 * local directories.
 *
 * NOTE(review): the tail of the pathFromLocalDir expression is missing from
 * this copy; restore from the upstream file.
 *
 * @param pathComponent path component of the source attempt
 * @param suffix file name suffix; null is treated as the empty string
 * @return the local path returned by the allocator
 * @throws IOException declared by this method; propagated from the allocator lookup
 */
protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
suffix = suffix != null ? suffix : "";
String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + pathComponent +
return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf);
static class HostFetchResult {
private final FetchResult fetchResult;
private final InputAttemptIdentifier[] failedInputs;
private final boolean connectFailed;
public HostFetchResult(FetchResult fetchResult, InputAttemptIdentifier[] failedInputs,
boolean connectFailed) {
this.fetchResult = fetchResult;
this.failedInputs = failedInputs;
this.connectFailed = connectFailed;
/**
 * Requests shutdown of this fetcher. The isShutDown flag is set atomically,
 * so the guarded body runs only for the first caller.
 *
 * NOTE(review): the guarded body is missing from this copy; presumably it
 * delegates to shutdownInternal() - confirm against the upstream file.
 */
public void shutdown() {
if (!isShutDown.getAndSet(true)) {
/**
 * Performs the shutdown work while holding the isShutDown monitor. An
 * IOException raised during that work is caught here rather than propagated.
 *
 * NOTE(review): the statement inside the httpConnection null check, the
 * "LOG.xxx(" prefix of the message in the catch clause, and the body of the
 * debug-enabled check are missing from this copy; restore from the upstream
 * file.
 */
private void shutdownInternal() {
// Synchronizing on isShutDown to ensure we don't run into a parallel close
// Can't synchronize on the main class itself since that would cause the
// shutdown request to block
synchronized (isShutDown) {
try {
if (httpConnection != null) {
} catch (IOException e) {"Exception while shutting down fetcher on " + logIdentifier + " : "
+ e.getMessage());
if (LOG.isDebugEnabled()) {
/**
 * Fetches a single map output from the given stream.
 *
 * Flow visible in this block:
 * 1. Read the shuffle header and map its path component to a source attempt
 *    via pathToAttemptMap. An IllegalArgumentException fails all remaining
 *    inputs, since the bad one cannot be identified.
 * 2. Run verifySanity. On failure a single attempt is returned as failed; when
 *    the header did not map to a known attempt, the next remaining attempt is
 *    blamed instead.
 * 3. Allocate a FetchedInput from inputManager and shuffle the data to memory
 *    or disk depending on its type; any other type raises TezUncheckedException.
 * 4. Report success through fetcherCallback.fetchSucceeded and return null.
 * On IOException: if the attempt is unknown, all remaining inputs are failed;
 * otherwise only that attempt is.
 *
 * NOTE(review): this copy is damaged. The statement that reads the header
 * from the stream, the trailing arguments of the shuffleToMemory /
 * shuffleToDisk calls and of the TezUncheckedException, the cleanup call in
 * the final try, any removal from remaining, and several "LOG.xxx(" prefixes
 * are missing. Restore from the upstream file before compiling.
 *
 * @param input stream positioned at the next shuffle header
 * @return null on success, otherwise the inputs to report as failed
 */
private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
FetchedInput fetchedInput = null;
InputAttemptIdentifier srcAttemptId = null;
long decompressedLength = -1;
long compressedLength = -1;
try {
long startTime = System.currentTimeMillis();
int responsePartition = -1;
// Read the shuffle header
String pathComponent = null;
try {
// NOTE(review): no read of the header from the input stream is visible between these two lines.
ShuffleHeader header = new ShuffleHeader();
pathComponent = header.getMapId();
// May be null when the header names a path component that was not requested.
srcAttemptId = pathToAttemptMap.get(pathComponent);
compressedLength = header.getCompressedLength();
decompressedLength = header.getUncompressedLength();
responsePartition = header.getPartition();
} catch (IllegalArgumentException e) {
// badIdErrs.increment(1);
LOG.warn("Invalid src id ", e);
// Don't know which one was bad, so consider all of them as bad
return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength,
responsePartition, srcAttemptId, pathComponent)) {
if (srcAttemptId == null) {
LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
// Blame the attempt that was expected next, since the header did not identify one.
srcAttemptId = getNextRemainingAttempt();
assert(srcAttemptId != null);
return new InputAttemptIdentifier[] { srcAttemptId };
if (LOG.isDebugEnabled()) {
LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
+ ", decomp len: " + decompressedLength);
// Get the location for the map output - either in-memory or on-disk
// TODO TEZ-957. handle IOException here when Broadcast has better error checking
fetchedInput = inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
// TODO NEWTEZ No concept of WAIT at the moment.
// // Check if we can shuffle *now* ...
// if (fetchedInput.getType() == FetchedInput.WAIT) {
//"fetcher#" + id +
// " - MergerManager returned Status.WAIT ...");
// //Not an error but wait to process data.
// }
// Go!"fetcher" + " about to shuffle output of srcAttempt "
+ fetchedInput.getInputAttemptIdentifier() + " decomp: "
+ decompressedLength + " len: " + compressedLength + " to "
+ fetchedInput.getType());
if (fetchedInput.getType() == Type.MEMORY) {
// Both lengths are narrowed to int for the in-memory path.
ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
input, (int) decompressedLength, (int) compressedLength, codec,
ifileReadAhead, ifileReadAheadLength, LOG,
} else if (fetchedInput.getType() == Type.DISK) {
ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
(host +":" +port), input, compressedLength, LOG,
} else {
throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
// Inform the shuffle scheduler
long endTime = System.currentTimeMillis();
fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
compressedLength, decompressedLength, (endTime - startTime));
// Note successful shuffle
// metrics.successFetch();
return null;
} catch (IOException ioe) {
// ZZZ Add some shutdown code here
// ZZZ Make sure any assigned memory inputs are aborted
// ioErrs.increment(1);
// Failure before an input was allocated: treated as a failure to read the header.
if (srcAttemptId == null || fetchedInput == null) {"fetcher" + " failed to read map header" + srcAttemptId
+ " decomp: " + decompressedLength + ", " + compressedLength, ioe);
if (srcAttemptId == null) {
return remaining
.toArray(new InputAttemptIdentifier[remaining.size()]);
} else {
return new InputAttemptIdentifier[] { srcAttemptId };
// NOTE(review): the remaining argument(s) of this warn call are missing.
LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host,
// Inform the shuffle-scheduler
// NOTE(review): the body of this try (cleanup of fetchedInput) is missing.
try {
} catch (IOException e) {"Failure to cleanup fetchedInput: " + fetchedInput);
// metrics.failedFetch();
return new InputAttemptIdentifier[] { srcAttemptId };
* Do some basic verification on the input received -- Being defensive
* @param compressedLength
* @param decompressedLength
* @param fetchPartition
* @param srcAttemptId
* @param pathComponent
* @return true/false, based on if the verification succeeded or not
private boolean verifySanity(long compressedLength, long decompressedLength,
int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
if (compressedLength < 0 || decompressedLength < 0) {
// wrongLengthErrs.increment(1);
LOG.warn(" invalid lengths in input header -> headerPathComponent: "
+ pathComponent + ", nextRemainingSrcAttemptId: "
+ getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
+ " len: " + compressedLength + ", decomp len: " + decompressedLength);
return false;
if (fetchPartition != this.partition) {
// wrongReduceErrs.increment(1);
LOG.warn(" data for the wrong reduce -> headerPathComponent: "
+ pathComponent + "nextRemainingSrcAttemptId: "
+ getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
+ " len: " + compressedLength + " decomp len: " + decompressedLength
+ " for reduce " + fetchPartition);
return false;
// Sanity check
if (!remaining.contains(srcAttemptId)) {
// wrongMapErrs.increment(1);
LOG.warn("Invalid input. Received output for headerPathComponent: "
+ pathComponent + "nextRemainingSrcAttemptId: "
+ getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId);
return false;
return true;
private InputAttemptIdentifier getNextRemainingAttempt() {
if (remaining.size() > 0) {
return remaining.iterator().next();
} else {
return null;
* Builder for the construction of Fetchers
public static class FetcherBuilder {
private Fetcher fetcher;
private boolean workAssigned = false;
public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params,
FetchedInputAllocator inputManager, ApplicationId appId,
SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf,
boolean localDiskFetchEnabled) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
shuffleSecret, srcNameTrimmed, conf, localDiskFetchEnabled);
public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
fetcher.httpConnectionParams = httpParams;
return this;
public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
fetcher.codec = codec;
return this;
public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
fetcher.ifileReadAhead = readAhead;
fetcher.ifileReadAheadLength = readAheadBytes;
return this;
public FetcherBuilder assignWork(String host, int port, int partition,
List<InputAttemptIdentifier> inputs) { = host;
fetcher.port = port;
fetcher.partition = partition;
fetcher.srcAttempts = inputs;
workAssigned = true;
return this;
public Fetcher build() {
Preconditions.checkState(workAssigned == true,
"Cannot build a fetcher withot assigning work to it");
return fetcher;
public int hashCode() {
return fetcherIdentifier;
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Fetcher other = (Fetcher) obj;
if (fetcherIdentifier != other.fetcherIdentifier)
return false;
return true;