/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.shuffle;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.io.WritableUtils;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.common.Preconditions;
/**
* Responsible for fetching inputs served by the ShuffleHandler for a single
* host. Construct using {@link FetcherBuilder}.
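*
* <p>A minimal construction sketch (hypothetical argument values; the
* callback, allocator, and connection params are assumed to come from the
* surrounding shuffle machinery):
* <pre>{@code
* Fetcher fetcher = new Fetcher.FetcherBuilder(fetcherCallback, httpParams,
*     inputManager, appId, dagIdentifier, jobTokenSecretMgr, "srcVertex",
*     conf, localDiskFetchEnabled, localHostname, shufflePort,
*     asyncHttp, verifyDiskChecksum, compositeFetch)
*   .setCompressionParameters(codec)
*   .assignWork(host, port, partition, partitionCount, srcAttempts)
*   .build();
* executorService.submit(fetcher); // completes with a FetchResult
* }</pre>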
*/
public class Fetcher extends CallableWithNdc<FetchResult> {
public static class PathPartition {
final String path;
final int partition;
PathPartition(String path, int partition) {
this.path = path;
this.partition = partition;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((path == null) ? 0 : path.hashCode());
result = prime * result + partition;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PathPartition other = (PathPartition) obj;
if (path == null) {
if (other.path != null)
return false;
} else if (!path.equals(other.path))
return false;
return partition == other.partition;
}
@Override
public String toString() {
return "PathPartition [path=" + path + ", partition=" + partition + "]";
}
}
private static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
private final Configuration conf;
private final int shufflePort;
// Configurable fields.
private CompressionCodec codec;
private boolean ifileReadAhead = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
private int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
private final JobTokenSecretManager jobTokenSecretMgr;
private final FetcherCallback fetcherCallback;
private final FetchedInputAllocator inputManager;
private final ApplicationId appId;
private final int dagIdentifier;
private final String logIdentifier;
private final String localHostname;
private final AtomicBoolean isShutDown = new AtomicBoolean(false);
private final int fetcherIdentifier;
// Parameters to track work.
private List<InputAttemptIdentifier> srcAttempts;
@VisibleForTesting
public List<InputAttemptIdentifier> getSrcAttempts() {
return srcAttempts;
}
@VisibleForTesting
Map<String, InputAttemptIdentifier> srcAttemptsRemaining;
private String host;
@VisibleForTesting
public String getHost() {
return host;
}
private int port;
private int partition;
private int partitionCount;
// Maps from the pathComponents (unique per srcTaskId) to the specific taskId
private final Map<PathPartition, InputAttemptIdentifier> pathToAttemptMap;
private URL url;
private volatile DataInputStream input;
BaseHttpConnection httpConnection;
private HttpConnectionParams httpConnectionParams;
private final boolean localDiskFetchEnabled;
private final boolean sharedFetchEnabled;
private final LocalDirAllocator localDirAllocator;
private final Path lockPath;
private final RawLocalFileSystem localFs;
// Initial value is 0, which means it hasn't retried yet.
private long retryStartTime = 0;
private final boolean asyncHttp;
private final boolean compositeFetch;
private final boolean verifyDiskChecksum;
private final boolean isDebugEnabled = LOG.isDebugEnabled();
private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator,
Path lockPath,
boolean localDiskFetchEnabled,
boolean sharedFetchEnabled,
String localHostname,
int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
this.asyncHttp = asyncHttp;
this.verifyDiskChecksum = verifyDiskChecksum;
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
this.appId = appId;
this.dagIdentifier = dagIdentifier;
this.pathToAttemptMap = new HashMap<PathPartition, InputAttemptIdentifier>();
this.httpConnectionParams = params;
this.conf = conf;
this.localDiskFetchEnabled = localDiskFetchEnabled;
this.sharedFetchEnabled = sharedFetchEnabled;
this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
this.logIdentifier = " fetcher [" + srcNameTrimmed + "] " + fetcherIdentifier;
this.localFs = localFs;
this.localDirAllocator = localDirAllocator;
this.lockPath = lockPath;
this.localHostname = localHostname;
this.shufflePort = shufflePort;
this.compositeFetch = compositeFetch;
try {
if (this.sharedFetchEnabled) {
this.localFs.mkdirs(this.lockPath);
}
} catch (Exception e) {
LOG.warn("Error initializing local dirs for shared transfer " + e);
}
}
// Helper method to populate the map of remaining inputs (insertion ordered).
void populateRemainingMap(List<InputAttemptIdentifier> origlist) {
if (srcAttemptsRemaining == null) {
srcAttemptsRemaining = new LinkedHashMap<String, InputAttemptIdentifier>(origlist.size());
}
for (InputAttemptIdentifier id : origlist) {
srcAttemptsRemaining.put(id.toString(), id);
}
}
@Override
public FetchResult callInternal() throws Exception {
boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled);
if (srcAttempts.isEmpty()) {
return new FetchResult(host, port, partition, partitionCount, srcAttempts);
}
populateRemainingMap(srcAttempts);
for (InputAttemptIdentifier in : srcAttemptsRemaining.values()) {
if (in instanceof CompositeInputAttemptIdentifier) {
CompositeInputAttemptIdentifier cin = (CompositeInputAttemptIdentifier)in;
for (int i = 0; i < cin.getInputIdentifierCount(); i++) {
pathToAttemptMap.put(new PathPartition(cin.getPathComponent(), partition + i), cin.expand(i));
}
} else {
pathToAttemptMap.put(new PathPartition(in.getPathComponent(), 0), in);
}
// do only if all of them are shared fetches
multiplex &= in.isShared();
}
if (multiplex) {
Preconditions.checkArgument(partition == 0,
"Shared fetches cannot be done for partitioned input"
+ " - partition is non-zero (%s)", partition);
}
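// Dispatch: read spill files directly off local disk when the source ran on
// this host with the same shuffle port; otherwise try a lock-coordinated
// shared fetch, falling back to a plain HTTP fetch.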
HostFetchResult hostFetchResult;
boolean isLocalFetch = localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort;
if (isLocalFetch) {
hostFetchResult = setupLocalDiskFetch();
} else if (multiplex) {
hostFetchResult = doSharedFetch();
} else {
hostFetchResult = doHttpFetch();
}
if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
if (!isShutDown.get()) {
LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
for (InputAttemptFetchFailure left : hostFetchResult.failedInputs) {
fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
}
} else {
if (isDebugEnabled) {
LOG.debug("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length +
" inputs since the fetcher has already been stopped");
}
}
}
shutdown();
// Sanity check
if (hostFetchResult.failedInputs == null && !srcAttemptsRemaining.isEmpty()) {
if (!multiplex) {
throw new IOException("server didn't return all expected map outputs: "
+ srcAttemptsRemaining.size() + " left.");
}
}
return hostFetchResult.fetchResult;
}
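/**
* Writes a successfully fetched shared input back to local disk in the
* standard map-output layout (data file plus index file), so later fetchers
* on this host can serve it locally instead of re-downloading it.
*/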
private final class CachingCallBack {
// this is a closure object wrapping this in an inner class
public void cache(String host,
InputAttemptIdentifier srcAttemptId, FetchedInput fetchedInput,
long compressedLength, long decompressedLength) {
try {
// this breaks badly on partitioned input - please use responsibly
Preconditions.checkArgument(partition == 0, "Partition == 0");
final String tmpSuffix = "" + System.currentTimeMillis() + ".tmp";
final String finalOutput = getMapOutputFile(srcAttemptId.getPathComponent());
final Path outputPath = localDirAllocator.getLocalPathForWrite(finalOutput, compressedLength, conf);
final TezSpillRecord spillRec = new TezSpillRecord(1);
final TezIndexRecord indexRec;
Path tmpIndex = outputPath.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING+tmpSuffix);
if (localFs.exists(tmpIndex)) {
LOG.warn("Found duplicate instance of input index file " + tmpIndex);
return;
}
Path tmpPath = null;
switch (fetchedInput.getType()) {
case DISK: {
DiskFetchedInput input = (DiskFetchedInput) fetchedInput;
indexRec = new TezIndexRecord(0, decompressedLength, compressedLength);
localFs.mkdirs(outputPath.getParent());
// avoid pit-falls of speculation
tmpPath = outputPath.suffix(tmpSuffix);
// JDK7 - TODO: use Files implementation to speed up this process
localFs.copyFromLocalFile(input.getInputPath(), tmpPath);
// rename is atomic
boolean renamed = localFs.rename(tmpPath, outputPath);
if (!renamed) {
LOG.warn("Could not rename to cached file name " + outputPath);
localFs.delete(tmpPath, false);
return;
}
}
break;
default:
LOG.warn("Incorrect use of CachingCallback for " + srcAttemptId);
return;
}
spillRec.putIndex(indexRec, 0);
spillRec.writeToFile(tmpIndex, conf, localFs);
// everything went well so far - rename it
boolean renamed = localFs.rename(tmpIndex, outputPath
.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
if (!renamed) {
localFs.delete(tmpIndex, false);
// invariant: outputPath was renamed from tmpPath
localFs.delete(outputPath, false);
LOG.warn("Could not rename the index file to "
+ outputPath
.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
return;
}
} catch (IOException ioe) {
// do mostly nothing
LOG.warn("Cache threw an error " + ioe);
}
}
}
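// Counts how many of the remaining inputs already have an index file on
// local disk, i.e. how many could be served without an HTTP fetch.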
private int findInputs() throws IOException {
int k = 0;
for (InputAttemptIdentifier src : srcAttemptsRemaining.values()) {
try {
if (getShuffleInputFileName(src.getPathComponent(),
Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING) != null) {
k++;
}
} catch (DiskErrorException de) {
// missing file, ignore
}
}
return k;
}
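/**
* Attempts to take an exclusive host-scoped file lock so that only one
* fetcher per host downloads (and caches) shared inputs at a time. Returns
* null if the lock is held elsewhere; the caller re-queues the work.
*/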
private FileLock getLock() throws OverlappingFileLockException, InterruptedException, IOException {
File lockFile = localFs.pathToFile(new Path(lockPath, host + ".lock"));
final boolean created = lockFile.createNewFile();
if (!created && !lockFile.exists()) {
// bail-out cleanly
return null;
}
// invariant - file created (winner writes to this file)
// caveat: closing lockChannel does close the file (do not double close)
// JDK7 - TODO: use AsynchronousFileChannel instead of RandomAccessFile
FileChannel lockChannel = new RandomAccessFile(lockFile, "rws")
.getChannel();
FileLock xlock = lockChannel.tryLock(0, Long.MAX_VALUE, false);
if (xlock != null) {
return xlock;
}
lockChannel.close();
return null;
}
private void releaseLock(FileLock lock) throws IOException {
if (lock != null && lock.isValid()) {
FileChannel lockChannel = lock.channel();
lock.release();
lockChannel.close();
}
}
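/**
* Shared fetch: serve everything already present on local disk; otherwise
* race for the host lock and, on winning it, fetch over HTTP while caching
* the results for other fetchers. Losing the race re-queues the work.
*/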
protected HostFetchResult doSharedFetch() throws IOException {
int inputs = findInputs();
if (inputs == srcAttemptsRemaining.size()) {
if (isDebugEnabled) {
LOG.debug("Using the copies found locally");
}
return doLocalDiskFetch(true);
}
if (inputs > 0) {
if (isDebugEnabled) {
LOG.debug("Found " + input
+ " local fetches right now, using them first");
}
return doLocalDiskFetch(false);
}
FileLock lock = null;
try {
lock = getLock();
if (lock == null) {
// re-queue until we get a lock
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount,
srcAttemptsRemaining.values(), "Requeuing as we didn't get a lock"), null, false);
} else {
if (findInputs() == srcAttemptsRemaining.size()) {
// double checked after lock
releaseLock(lock);
lock = null;
return doLocalDiskFetch(true);
}
// cache data if possible
return doHttpFetch(new CachingCallBack());
}
} catch (OverlappingFileLockException jvmCrossLock) {
// fall back to HTTP fetch below
LOG.warn("Double locking detected for " + host);
} catch (InterruptedException sleepInterrupted) {
Thread.currentThread().interrupt();
// fall back to HTTP fetch below
LOG.warn("Lock was interrupted for " + host);
} finally {
releaseLock(lock);
}
if (isShutDown.get()) {
// if any exception was due to shut-down don't bother firing any more
// requests
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount,
srcAttemptsRemaining.values()), null, false);
}
// no more caching
return doHttpFetch();
}
@VisibleForTesting
protected HostFetchResult doHttpFetch() {
return doHttpFetch(null);
}
private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> attempts) {
try {
StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
port, partition, partitionCount, appId.toString(), dagIdentifier, httpConnectionParams.isSslShuffle());
this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
httpConnectionParams.isKeepAlive());
httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams,
logIdentifier, jobTokenSecretMgr);
httpConnection.connect();
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
// ioErrs.increment(1);
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
InputAttemptFetchFailure[] failedFetches = null;
if (isShutDown.get()) {
if (isDebugEnabled) {
LOG.debug(
"Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
e.getClass().getName() + ", Message: " + e.getMessage());
}
} else {
failedFetches = InputAttemptFetchFailure.fromAttempts(srcAttemptsRemaining.values());
}
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), failedFetches, true);
}
if (isShutDown.get()) {
// shutdown would have no effect if in the process of establishing the connection.
shutdownInternal();
if (isDebugEnabled) {
LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
}
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null, false);
}
try {
input = httpConnection.getInputStream();
httpConnection.validate();
//validateConnectionResponse(msgToEncode, encHash);
} catch (IOException e) {
// ioErrs.increment(1);
// If we got a read error at this stage, it implies there was a problem
// with the first map, typically lost map. So, penalize only that map
// and add the rest
if (isShutDown.get()) {
if (isDebugEnabled) {
LOG.debug(
"Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
e.getClass().getName() + ", Message: " + e.getMessage());
}
} else {
InputAttemptIdentifier firstAttempt = attempts.iterator().next();
LOG.warn(String.format(
"Fetch Failure while connecting from %s to: %s:%d, attempt: %s Informing ShuffleManager: ",
localHostname, host, port, firstAttempt), e);
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()),
new InputAttemptFetchFailure[] { new InputAttemptFetchFailure(firstAttempt) }, true);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); //reset status
return null;
}
return null;
}
@VisibleForTesting
protected HostFetchResult doHttpFetch(CachingCallBack callback) {
HostFetchResult connectionsWithRetryResult =
setupConnection(srcAttemptsRemaining.values());
if (connectionsWithRetryResult != null) {
return connectionsWithRetryResult;
}
// By this point, the connection is setup and the response has been
// validated.
// Handle any shutdown which may have been invoked.
if (isShutDown.get()) {
// shutdown would have no effect if in the process of establishing the connection.
shutdownInternal();
if (isDebugEnabled) {
LOG.debug("Detected fetcher has been shutdown after opening stream. Returning");
}
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null, false);
}
// After this point, closing the stream and connection, should cause a
// SocketException,
// which will be ignored since shutdown has been invoked.
// Loop through available map-outputs and fetch them.
// On any error, failedInputs is non-null and we exit after
// putting back the remaining maps into the yet-to-be-fetched
// list and marking the failed tasks.
InputAttemptFetchFailure[] failedInputs = null;
while (!srcAttemptsRemaining.isEmpty() && failedInputs == null) {
InputAttemptIdentifier inputAttemptIdentifier =
srcAttemptsRemaining.entrySet().iterator().next().getValue();
if (isShutDown.get()) {
shutdownInternal(true);
if (isDebugEnabled) {
LOG.debug("Fetcher already shutdown. Aborting queued fetches for " +
srcAttemptsRemaining.size() + " inputs");
}
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null,
false);
}
try {
failedInputs = fetchInputs(input, callback, inputAttemptIdentifier);
} catch (FetcherReadTimeoutException e) {
//clean up connection
shutdownInternal(true);
if (isShutDown.get()) {
if (isDebugEnabled) {
LOG.debug("Fetcher already shutdown. Aborting reconnection and queued fetches for " +
srcAttemptsRemaining.size() + " inputs");
}
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null,
false);
}
// Connect again.
connectionsWithRetryResult = setupConnection(srcAttemptsRemaining.values());
if (connectionsWithRetryResult != null) {
break;
}
}
}
if (isShutDown.get() && failedInputs != null && failedInputs.length > 0) {
if (isDebugEnabled) {
LOG.debug("Fetcher already shutdown. Not reporting fetch failures for: " +
failedInputs.length + " failed inputs");
}
failedInputs = null;
}
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), failedInputs,
false);
}
@VisibleForTesting
protected HostFetchResult setupLocalDiskFetch() {
return doLocalDiskFetch(true);
}
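/**
* Reads map outputs straight from this host's local disks via the spill
* index files. When failMissing is true, inputs that cannot be read are
* reported as local fetch failures; otherwise they stay in
* srcAttemptsRemaining to be re-queued.
*/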
@VisibleForTesting
private HostFetchResult doLocalDiskFetch(boolean failMissing) {
Iterator<Entry<String, InputAttemptIdentifier>> iterator = srcAttemptsRemaining.entrySet().iterator();
while (iterator.hasNext()) {
boolean hasFailures = false;
if (isShutDown.get()) {
if (isDebugEnabled) {
LOG.debug(
"Already shutdown. Skipping fetch for " + srcAttemptsRemaining.size() + " inputs");
}
break;
}
InputAttemptIdentifier srcAttemptId = iterator.next().getValue();
for (int curPartition = 0; curPartition < partitionCount; curPartition++) {
int reduceId = curPartition + partition;
srcAttemptId = pathToAttemptMap.get(new PathPartition(srcAttemptId.getPathComponent(), reduceId));
long startTime = System.currentTimeMillis();
FetchedInput fetchedInput = null;
try {
TezIndexRecord idxRecord;
// for missing files, this will throw an exception
idxRecord = getTezIndexRecord(srcAttemptId, reduceId);
fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
idxRecord.getPartLength(), srcAttemptId,
getShuffleInputFileName(srcAttemptId.getPathComponent(), null),
conf,
new FetchedInputCallback() {
@Override
public void fetchComplete(FetchedInput fetchedInput) {
}
@Override
public void fetchFailed(FetchedInput fetchedInput) {
}
@Override
public void freeResources(FetchedInput fetchedInput) {
}
});
if (isDebugEnabled) {
LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
+ " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
+ " to " + fetchedInput.getType());
}
long endTime = System.currentTimeMillis();
fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
idxRecord.getRawLength(), (endTime - startTime));
} catch (IOException | InternalError e) {
hasFailures = true;
cleanupFetchedInput(fetchedInput);
if (isShutDown.get()) {
if (isDebugEnabled) {
LOG.debug(
"Already shutdown. Ignoring Local Fetch Failure for " +
srcAttemptId +
" from host " +
host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
}
break;
}
if (failMissing) {
LOG.warn(
"Failed to shuffle output of " + srcAttemptId + " from " +
host + "(local fetch)",
e);
}
}
}
if (!hasFailures) {
iterator.remove();
}
}
InputAttemptFetchFailure[] failedFetches = null;
if (failMissing && srcAttemptsRemaining.size() > 0) {
if (isShutDown.get()) {
if (isDebugEnabled) {
LOG.debug(
"Already shutdown, not reporting fetch failures for: " + srcAttemptsRemaining.size() +
" remaining inputs");
}
} else {
failedFetches =
InputAttemptFetchFailure.fromAttemptsLocalFetchFailure(srcAttemptsRemaining.values());
}
} else {
// nothing needs to be done to requeue remaining entries
}
return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()),
failedFetches, false);
}
@VisibleForTesting
protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId, int partition) throws
IOException {
TezIndexRecord idxRecord;
Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
TezSpillRecord spillRecord = new TezSpillRecord(indexFile, localFs);
idxRecord = spillRecord.getIndex(partition);
return idxRecord;
}
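// Builds the relative path of a map output file under the task output
// directory, prefixed with the DAG directory when the Tez shuffle handler
// is in use.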
private final String getMapOutputFile(String pathComponent) {
String outputPath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
pathComponent + Path.SEPARATOR +
Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
if (ShuffleUtils.isTezShuffleHandler(conf)) {
return Constants.DAG_PREFIX + this.dagIdentifier + Path.SEPARATOR +
outputPath;
}
return outputPath;
}
@VisibleForTesting
protected Path getShuffleInputFileName(String pathComponent, String suffix)
throws IOException {
suffix = suffix != null ? suffix : "";
String pathFromLocalDir = getMapOutputFile(pathComponent) + suffix;
return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf);
}
@VisibleForTesting
public Map<PathPartition, InputAttemptIdentifier> getPathToAttemptMap() {
return pathToAttemptMap;
}
static class HostFetchResult {
private final FetchResult fetchResult;
private final InputAttemptFetchFailure[] failedInputs;
private final boolean connectFailed;
public HostFetchResult(FetchResult fetchResult, InputAttemptFetchFailure[] failedInputs,
boolean connectFailed) {
this.fetchResult = fetchResult;
this.failedInputs = failedInputs;
this.connectFailed = connectFailed;
}
}
public void shutdown() {
if (!isShutDown.getAndSet(true)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Shutting down fetcher for host: " + host);
}
shutdownInternal();
}
}
private void shutdownInternal() {
shutdownInternal(false);
}
private void shutdownInternal(boolean disconnect) {
// Synchronizing on isShutDown to ensure we don't run into a parallel close
// Can't synchronize on the main class itself since that would cause the
// shutdown request to block
synchronized (isShutDown) {
try {
if (httpConnection != null) {
httpConnection.cleanup(disconnect);
}
} catch (IOException e) {
LOG.info("Exception while shutting down fetcher on " + logIdentifier + " : "
+ e.getMessage());
if (isDebugEnabled) {
LOG.debug(StringUtils.EMPTY, e);
}
}
}
}
private static class MapOutputStat {
final InputAttemptIdentifier srcAttemptId;
final long decompressedLength;
final long compressedLength;
final int forReduce;
MapOutputStat(InputAttemptIdentifier srcAttemptId, long decompressedLength, long compressedLength, int forReduce) {
this.srcAttemptId = srcAttemptId;
this.decompressedLength = decompressedLength;
this.compressedLength = compressedLength;
this.forReduce = forReduce;
}
@Override
public String toString() {
return "id: " + srcAttemptId + ", decompressed length: " + decompressedLength + ", compressed length: " + compressedLength + ", reduce: " + forReduce;
}
}
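/**
* Reads one response unit from the open shuffle stream: one shuffle header
* per partition (several for composite fetches) followed by the matching
* payloads, each shuffled to memory or disk via the allocator. Returns the
* failed attempts on error, or null on success.
*/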
@VisibleForTesting
InputAttemptFetchFailure[] fetchInputs(DataInputStream input, CachingCallBack callback,
InputAttemptIdentifier inputAttemptIdentifier)
throws FetcherReadTimeoutException {
FetchedInput fetchedInput = null;
InputAttemptIdentifier srcAttemptId = null;
long decompressedLength = 0;
long compressedLength = 0;
try {
long startTime = System.currentTimeMillis();
int partitionCount = 1;
if (this.compositeFetch) {
// Multiple partitions are fetched
partitionCount = WritableUtils.readVInt(input);
}
ArrayList<MapOutputStat> mapOutputStats = new ArrayList<>(partitionCount);
for (int mapOutputIndex = 0; mapOutputIndex < partitionCount; mapOutputIndex++) {
MapOutputStat mapOutputStat = null;
int responsePartition = -1;
// Read the shuffle header
String pathComponent = null;
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
pathComponent = header.getMapId();
if (!pathComponent.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
if (pathComponent.startsWith(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString())) {
LOG.warn("Invalid map id: " + header.getMapId() + ", expected to start with "
+ InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.getPartition()
+ " while fetching " + inputAttemptIdentifier);
// this should be treated as local fetch failure while reporting later
return new InputAttemptFetchFailure[] {
InputAttemptFetchFailure.fromDiskErrorAtSource(inputAttemptIdentifier) };
} else {
throw new IllegalArgumentException(
"Invalid map id: " + header.getMapId() + ", expected to start with "
+ InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.getPartition()
+ " while fetching " + inputAttemptIdentifier);
}
}
srcAttemptId = pathToAttemptMap.get(new PathPartition(pathComponent, header.getPartition()));
if (srcAttemptId == null) {
throw new IllegalArgumentException("Source attempt not found for map id: " + header.getMapId() +
", partition: " + header.getPartition() + " while fetching " + inputAttemptIdentifier);
}
if (header.getCompressedLength() == 0) {
// Empty partitions are already accounted for
continue;
}
mapOutputStat = new MapOutputStat(srcAttemptId,
header.getUncompressedLength(),
header.getCompressedLength(),
header.getPartition());
mapOutputStats.add(mapOutputStat);
responsePartition = header.getPartition();
} catch (IllegalArgumentException e) {
// badIdErrs.increment(1);
if (!isShutDown.get()) {
LOG.warn("Invalid src id ", e);
// Don't know which one was bad, so consider all of them as bad
return InputAttemptFetchFailure.fromAttempts(srcAttemptsRemaining.values());
} else {
if (isDebugEnabled) {
LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
}
return null;
}
}
// Do some basic sanity verification
if (!verifySanity(mapOutputStat.compressedLength, mapOutputStat.decompressedLength,
responsePartition, mapOutputStat.srcAttemptId, pathComponent)) {
if (!isShutDown.get()) {
srcAttemptId = mapOutputStat.srcAttemptId;
if (srcAttemptId == null) {
LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
srcAttemptId = getNextRemainingAttempt();
}
assert (srcAttemptId != null);
return new InputAttemptFetchFailure[] {
InputAttemptFetchFailure.fromAttempt(srcAttemptId) };
} else {
if (isDebugEnabled) {
LOG.debug("Already shutdown. Ignoring verification failure.");
}
return null;
}
}
if (isDebugEnabled) {
LOG.debug("header: " + mapOutputStat.srcAttemptId + ", len: " + mapOutputStat.compressedLength
+ ", decomp len: " + mapOutputStat.decompressedLength);
}
}
for (MapOutputStat mapOutputStat : mapOutputStats) {
// Get the location for the map output - either in-memory or on-disk
srcAttemptId = mapOutputStat.srcAttemptId;
decompressedLength = mapOutputStat.decompressedLength;
compressedLength = mapOutputStat.compressedLength;
// TODO TEZ-957. handle IOException here when Broadcast has better error checking
if (srcAttemptId.isShared() && callback != null) {
// force disk if input is being shared
fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength,
compressedLength, srcAttemptId);
} else {
fetchedInput = inputManager.allocate(decompressedLength,
compressedLength, srcAttemptId);
}
// No concept of WAIT at the moment.
// // Check if we can shuffle *now* ...
// if (fetchedInput.getType() == FetchedInput.WAIT) {
// LOG.info("fetcher#" + id +
// " - MergerManager returned Status.WAIT ...");
// //Not an error but wait to process data.
// return EMPTY_ATTEMPT_ID_ARRAY;
// }
// Go!
if (isDebugEnabled) {
LOG.debug("fetcher" + " about to shuffle output of srcAttempt "
+ fetchedInput.getInputAttemptIdentifier() + " decomp: "
+ decompressedLength + " len: " + compressedLength + " to "
+ fetchedInput.getType());
}
if (fetchedInput.getType() == Type.MEMORY) {
ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
input, (int) decompressedLength, (int) compressedLength, codec,
ifileReadAhead, ifileReadAheadLength, LOG,
fetchedInput.getInputAttemptIdentifier());
} else if (fetchedInput.getType() == Type.DISK) {
ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
(host + ":" + port), input, compressedLength, decompressedLength, LOG,
fetchedInput.getInputAttemptIdentifier(),
ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
} else {
throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
fetchedInput);
}
// offer the fetched input for caching
if (srcAttemptId.isShared() && callback != null) {
// this has to be before the fetchSucceeded, because that goes across
// threads into the reader thread and can potentially shutdown this thread
// while it is still caching.
callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
}
// Inform the shuffle scheduler
long endTime = System.currentTimeMillis();
// Reset retryStartTime since the map task made progress after a retry.
retryStartTime = 0;
fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
compressedLength, decompressedLength, (endTime - startTime));
// Note successful shuffle
// metrics.successFetch();
}
srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
} catch (IOException | InternalError ioe) {
if (isShutDown.get()) {
cleanupFetchedInput(fetchedInput);
if (isDebugEnabled) {
LOG.debug(
"Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName() +
", Message: " + ioe.getMessage());
}
return null;
}
if (shouldRetry(srcAttemptId, ioe)) {
//release mem/file handles
cleanupFetchedInput(fetchedInput);
throw new FetcherReadTimeoutException(ioe);
}
// ioErrs.increment(1);
if (srcAttemptId == null || fetchedInput == null) {
LOG.info("fetcher" + " failed to read map header" + srcAttemptId
+ " decomp: " + decompressedLength + ", " + compressedLength, ioe);
// Cleanup the fetchedInput before returning.
cleanupFetchedInput(fetchedInput);
if (srcAttemptId == null) {
return InputAttemptFetchFailure.fromAttempts(srcAttemptsRemaining.values());
} else {
return new InputAttemptFetchFailure[] {
new InputAttemptFetchFailure(srcAttemptId) };
}
}
LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host + " (to "
+ localHostname + ")", ioe);
// Cleanup the fetchedInput
cleanupFetchedInput(fetchedInput);
// metrics.failedFetch();
return new InputAttemptFetchFailure[] {
new InputAttemptFetchFailure(srcAttemptId) };
}
return null;
}
private void cleanupFetchedInput(FetchedInput fetchedInput) {
if (fetchedInput != null) {
try {
fetchedInput.abort();
} catch (IOException e) {
LOG.info("Failure to cleanup fetchedInput: " + fetchedInput);
}
}
}
/**
* Checks whether the connection needs to be re-established.
*
* @param srcAttemptId
* @param ioe
* @return true to indicate a connection retry; false otherwise.
*/
private boolean shouldRetry(InputAttemptIdentifier srcAttemptId, Throwable ioe) {
if (!(ioe instanceof SocketTimeoutException)) {
return false;
}
// First time to retry.
long currentTime = System.currentTimeMillis();
if (retryStartTime == 0) {
retryStartTime = currentTime;
}
if (currentTime - retryStartTime < httpConnectionParams.getReadTimeout()) {
LOG.warn("Shuffle output from " + srcAttemptId +
" failed (to "+ localHostname +"), retry it.");
//retry connecting to the host
return true;
} else {
// timed out; give up and fail the attempt.
LOG.warn("Timeout for copying MapOutput with retry on host " + host
+ " after " + httpConnectionParams.getReadTimeout() + " milliseconds.");
return false;
}
}
/**
* Do some basic verification on the input received -- Being defensive
*
* @param compressedLength
* @param decompressedLength
* @param fetchPartition
* @param srcAttemptId
* @param pathComponent
* @return true/false, based on if the verification succeeded or not
*/
private boolean verifySanity(long compressedLength, long decompressedLength,
int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
if (compressedLength < 0 || decompressedLength < 0) {
// wrongLengthErrs.increment(1);
LOG.warn(" invalid lengths in input header -> headerPathComponent: "
+ pathComponent + ", nextRemainingSrcAttemptId: "
+ getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
+ " len: " + compressedLength + ", decomp len: " + decompressedLength);
return false;
}
if (fetchPartition < this.partition || fetchPartition >= this.partition + this.partitionCount) {
// wrongReduceErrs.increment(1);
LOG.warn(" data for the wrong reduce -> headerPathComponent: "
+ pathComponent + "nextRemainingSrcAttemptId: "
+ getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
+ " len: " + compressedLength + " decomp len: " + decompressedLength
+ " for reduce " + fetchPartition);
return false;
}
return true;
}
private InputAttemptIdentifier getNextRemainingAttempt() {
if (srcAttemptsRemaining.size() > 0) {
return srcAttemptsRemaining.values().iterator().next();
} else {
return null;
}
}
/**
* Builder for the construction of Fetchers
*/
public static class FetcherBuilder {
private Fetcher fetcher;
private boolean workAssigned = false;
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch);
}
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp,
verifyDiskChecksum, compositeFetch);
}
public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
fetcher.httpConnectionParams = httpParams;
return this;
}
public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
fetcher.codec = codec;
return this;
}
public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
fetcher.ifileReadAhead = readAhead;
fetcher.ifileReadAheadLength = readAheadBytes;
return this;
}
public FetcherBuilder assignWork(String host, int port, int partition, int partitionCount,
List<InputAttemptIdentifier> inputs) {
fetcher.host = host;
fetcher.port = port;
fetcher.partition = partition;
fetcher.partitionCount = partitionCount;
fetcher.srcAttempts = inputs;
workAssigned = true;
return this;
}
public Fetcher build() {
Preconditions.checkState(workAssigned,
"Cannot build a fetcher without assigning work to it");
return fetcher;
}
}
@Override
public int hashCode() {
return fetcherIdentifier;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Fetcher other = (Fetcher) obj;
return fetcherIdentifier == other.fetcherIdentifier;
}
}