blob: 9cb8617ae18cd0195d4b57946502340710b79844 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.shuffle.common;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
import com.google.common.base.Preconditions;
/**
* Responsible for fetching inputs served by the ShuffleHandler for a single
* host. Construct using {@link FetcherBuilder}
*/
public class Fetcher implements Callable<FetchResult> {

  private static final Log LOG = LogFactory.getLog(Fetcher.class);

  // Monotonic id generator; the id doubles as this fetcher's hashCode/equals key.
  private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
  private final Configuration conf;

  // Configurable fields.
  // Codec used to decompress fetched map output; null means data is uncompressed.
  private CompressionCodec codec;
  private boolean ifileReadAhead = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
  private int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;

  // Secret used by HttpConnection to authenticate with the shuffle handler.
  private final SecretKey shuffleSecret;
  // Receives fetchSucceeded/fetchFailed notifications per input attempt.
  private final FetcherCallback fetcherCallback;
  // Allocates memory/disk backing for fetched data.
  private final FetchedInputAllocator inputManager;
  private final ApplicationId appId;

  private final String logIdentifier;

  // Set once by shutdown(); checked at phase boundaries to stop work early.
  private final AtomicBoolean isShutDown = new AtomicBoolean(false);
  private final int fetcherIdentifier;

  // Parameters to track work. Populated via FetcherBuilder.assignWork().
  private List<InputAttemptIdentifier> srcAttempts;
  private String host;
  private int port;
  private int partition;

  // Maps from the pathComponents (unique per srcTaskId) to the specific taskId
  private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
  // Attempts not yet successfully fetched; shrinks as fetches complete.
  private LinkedHashSet<InputAttemptIdentifier> remaining;

  private URL url;
  // volatile: read by the fetch loop while shutdown may close it from another thread.
  private volatile DataInputStream input;

  private HttpConnection httpConnection;
  private HttpConnectionParams httpConnectionParams;

  // When true, outputs on this node are read directly from local disk,
  // bypassing the HTTP shuffle handler.
  private final boolean localDiskFetchEnabled;

  /**
   * Private constructor; instances are created through {@link FetcherBuilder}.
   * Stores collaborators and assigns a unique fetcher id used for logging and identity.
   */
  private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
      FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
      String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled) {
    this.fetcherCallback = fetcherCallback;
    this.inputManager = inputManager;
    this.shuffleSecret = shuffleSecret;
    this.appId = appId;
    this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
    this.httpConnectionParams = params;
    this.conf = conf;
    this.localDiskFetchEnabled = localDiskFetchEnabled;

    this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
    this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
  }
  /**
   * Executes the fetch for all assigned source attempts on this host.
   * Chooses local-disk fetch when enabled and the target host is this NM's host,
   * otherwise fetches over HTTP. Reports failures per attempt through
   * {@link FetcherCallback#fetchFailed}, then shuts the fetcher down.
   *
   * @return the {@link FetchResult} describing what remains un-fetched
   * @throws IOException if the server returned fewer outputs than expected
   *         without any reported failure
   */
  @Override
  public FetchResult call() throws Exception {
    // Nothing assigned: return an empty result immediately.
    if (srcAttempts.size() == 0) {
      return new FetchResult(host, port, partition, srcAttempts);
    }

    // Index attempts by pathComponent so shuffle headers can be mapped back.
    for (InputAttemptIdentifier in : srcAttempts) {
      pathToAttemptMap.put(in.getPathComponent(), in);
    }

    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);

    HostFetchResult hostFetchResult;

    // Fetch directly from local disk when the data lives on this node.
    if (localDiskFetchEnabled &&
        host.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString()))) {
      hostFetchResult = setupLocalDiskFetch();
    } else {
      hostFetchResult = doHttpFetch();
    }

    if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
      LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
      for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
        fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
      }
    }

    shutdown();

    // Sanity check
    if (hostFetchResult.failedInputs == null && !remaining.isEmpty()) {
      throw new IOException("server didn't return all expected map outputs: "
          + remaining.size() + " left.");
    }

    return hostFetchResult.fetchResult;
  }
  /**
   * Fetches the assigned outputs from the remote shuffle handler over HTTP.
   * Proceeds in phases -- connect, open/validate stream, read map outputs --
   * checking for a concurrent shutdown between phases so a shutdown is honored
   * promptly without reporting spurious fetch failures.
   *
   * @return a {@link HostFetchResult} carrying the remaining attempts and any
   *         failed inputs; {@code connectFailed} is true only when the initial
   *         connection could not be established
   */
  @VisibleForTesting
  protected HostFetchResult doHttpFetch() {
    try {
      StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
          port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
      this.url = ShuffleUtils.constructInputURL(baseURI.toString(), srcAttempts,
          httpConnectionParams.getKeepAlive());

      httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, shuffleSecret);
      httpConnection.connect();
    } catch (IOException e) {
      // ioErrs.increment(1);
      // If connect did not succeed, just mark all the maps as failed,
      // indirectly penalizing the host
      InputAttemptIdentifier[] failedFetches = null;
      if (isShutDown.get()) {
        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
      } else {
        failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
      }
      return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedFetches, true);
    }
    if (isShutDown.get()) {
      // shutdown would have no effect if in the process of establishing the connection.
      shutdownInternal();
      LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
      return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false);
    }

    try {
      input = httpConnection.getInputStream();
      httpConnection.validate();
      //validateConnectionResponse(msgToEncode, encHash);
    } catch (IOException e) {
      // ioErrs.increment(1);
      // If we got a read error at this stage, it implies there was a problem
      // with the first map, typically lost map. So, penalize only that map
      // and add the rest
      if (isShutDown.get()) {
        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
      } else {
        InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
        LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
            + " Informing ShuffleManager: ", e);
        return new HostFetchResult(new FetchResult(host, port, partition, remaining),
            new InputAttemptIdentifier[] { firstAttempt }, false);
      }
    }

    // By this point, the connection is setup and the response has been
    // validated.

    // Handle any shutdown which may have been invoked.
    if (isShutDown.get()) {
      // shutdown would have no effect if in the process of establishing the connection.
      shutdownInternal();
      LOG.info("Detected fetcher has been shutdown after opening stream. Returning");
      return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false);
    }
    // After this point, closing the stream and connection, should cause a
    // SocketException,
    // which will be ignored since shutdown has been invoked.

    // Loop through available map-outputs and fetch them
    // On any error, faildTasks is not null and we exit
    // after putting back the remaining maps to the
    // yet_to_be_fetched list and marking the failed tasks.
    InputAttemptIdentifier[] failedInputs = null;
    while (!remaining.isEmpty() && failedInputs == null) {
      failedInputs = fetchInputs(input);
    }

    return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs,
        false);
  }
  /**
   * Reads the assigned outputs directly from local disk, bypassing HTTP.
   * Each attempt's spill index is consulted for this partition's offset/length,
   * a {@link LocalDiskFetchedInput} is handed to the callback on success, and
   * the attempt is removed from {@code remaining}. Attempts that hit an
   * IOException are left in {@code remaining} and reported as failed.
   *
   * @return a {@link HostFetchResult} whose failedInputs holds whatever is
   *         still un-fetched; connectFailed is always false for local fetches
   */
  @VisibleForTesting
  protected HostFetchResult setupLocalDiskFetch() {

    Iterator<InputAttemptIdentifier> iterator = remaining.iterator();
    while (iterator.hasNext()) {
      InputAttemptIdentifier srcAttemptId = iterator.next();
      //TODO: check for shutdown? - See TEZ-1480
      long startTime = System.currentTimeMillis();

      FetchedInput fetchedInput = null;
      try {
        TezIndexRecord idxRecord;
        idxRecord = getTezIndexRecord(srcAttemptId);
        // No-op callbacks: local-disk inputs need no allocator bookkeeping here.
        fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
            idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId,
            getShuffleInputFileName(srcAttemptId.getPathComponent(), null), conf,
            new FetchedInputCallback() {
              @Override
              public void fetchComplete(FetchedInput fetchedInput) {}

              @Override
              public void fetchFailed(FetchedInput fetchedInput) {}

              @Override
              public void freeResources(FetchedInput fetchedInput) {}
            });
        LOG.info("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
            + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
            + " to " + fetchedInput.getType());

        long endTime = System.currentTimeMillis();
        fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
            idxRecord.getRawLength(), (endTime - startTime));
        // Remove via the iterator -- removing from the set directly while
        // iterating would throw ConcurrentModificationException.
        iterator.remove();
      } catch (IOException e) {
        LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host + "(local fetch)",
            e);
        // Best-effort cleanup of any partially-created input.
        if (fetchedInput != null) {
          try {
            fetchedInput.abort();
          } catch (IOException e1) {
            LOG.info("Failed to cleanup fetchedInput " + fetchedInput);
          }
        }
      }
    }

    InputAttemptIdentifier[] failedFetches = null;
    if (remaining.size() > 0) {
      failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
    }
    return new HostFetchResult(new FetchResult(host, port, partition, remaining),
        failedFetches, false);
  }
@VisibleForTesting
protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId) throws
IOException {
TezIndexRecord idxRecord;
Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
idxRecord = spillRecord.getIndex(partition);
return idxRecord;
}
@VisibleForTesting
protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
suffix = suffix != null ? suffix : "";
String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + pathComponent +
Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf);
}
static class HostFetchResult {
private final FetchResult fetchResult;
private final InputAttemptIdentifier[] failedInputs;
private final boolean connectFailed;
public HostFetchResult(FetchResult fetchResult, InputAttemptIdentifier[] failedInputs,
boolean connectFailed) {
this.fetchResult = fetchResult;
this.failedInputs = failedInputs;
this.connectFailed = connectFailed;
}
}
public void shutdown() {
if (!isShutDown.getAndSet(true)) {
shutdownInternal();
}
}
  /**
   * Closes the underlying HTTP connection, if any. IOExceptions during
   * cleanup are logged and swallowed -- shutdown is best-effort.
   */
  private void shutdownInternal() {
    // Synchronizing on isShutDown to ensure we don't run into a parallel close
    // Can't synchronize on the main class itself since that would cause the
    // shutdown request to block
    synchronized (isShutDown) {
      try {
        if (httpConnection != null) {
          // false: do not consume the remaining stream data before closing.
          httpConnection.cleanup(false);
        }
      } catch (IOException e) {
        LOG.info("Exception while shutting down fetcher on " + logIdentifier + " : "
            + e.getMessage());
        if (LOG.isDebugEnabled()) {
          LOG.debug(e);
        }
      }
    }
  }
  /**
   * Reads one map output from the open shuffle stream: parses the
   * {@link ShuffleHeader}, sanity-checks it, allocates a memory- or
   * disk-backed {@link FetchedInput}, copies the payload into it, and notifies
   * the callback. On success the attempt is removed from {@code remaining}.
   *
   * @param input the validated shuffle response stream
   * @return null on success; on failure, the attempts to report as failed
   *         (all remaining attempts when the culprit cannot be identified,
   *         otherwise just the offending attempt)
   */
  private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
    FetchedInput fetchedInput = null;
    InputAttemptIdentifier srcAttemptId = null;
    long decompressedLength = -1;
    long compressedLength = -1;

    try {
      long startTime = System.currentTimeMillis();
      int responsePartition = -1;
      // Read the shuffle header
      String pathComponent = null;
      try {
        ShuffleHeader header = new ShuffleHeader();
        header.readFields(input);
        pathComponent = header.getMapId();

        // Map the header's path component back to the attempt we requested.
        srcAttemptId = pathToAttemptMap.get(pathComponent);
        compressedLength = header.getCompressedLength();
        decompressedLength = header.getUncompressedLength();
        responsePartition = header.getPartition();
      } catch (IllegalArgumentException e) {
        // badIdErrs.increment(1);
        LOG.warn("Invalid src id ", e);
        // Don't know which one was bad, so consider all of them as bad
        return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
      }

      // Do some basic sanity verification
      if (!verifySanity(compressedLength, decompressedLength,
          responsePartition, srcAttemptId, pathComponent)) {
        if (srcAttemptId == null) {
          // Header referenced an unknown attempt; blame the next expected one.
          LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
          srcAttemptId = getNextRemainingAttempt();
        }
        assert(srcAttemptId != null);
        return new InputAttemptIdentifier[] { srcAttemptId };
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
            + ", decomp len: " + decompressedLength);
      }

      // Get the location for the map output - either in-memory or on-disk
      // TODO TEZ-957. handle IOException here when Broadcast has better error checking
      fetchedInput = inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);

      // TODO NEWTEZ No concept of WAIT at the moment.
      // // Check if we can shuffle *now* ...
      // if (fetchedInput.getType() == FetchedInput.WAIT) {
      // LOG.info("fetcher#" + id +
      // " - MergerManager returned Status.WAIT ...");
      // //Not an error but wait to process data.
      // return EMPTY_ATTEMPT_ID_ARRAY;
      // }

      // Go!
      LOG.info("fetcher" + " about to shuffle output of srcAttempt "
          + fetchedInput.getInputAttemptIdentifier() + " decomp: "
          + decompressedLength + " len: " + compressedLength + " to "
          + fetchedInput.getType());

      // Copy the payload into the allocated destination, honoring its type.
      if (fetchedInput.getType() == Type.MEMORY) {
        ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
            input, (int) decompressedLength, (int) compressedLength, codec,
            ifileReadAhead, ifileReadAheadLength, LOG,
            fetchedInput.getInputAttemptIdentifier().toString());
      } else if (fetchedInput.getType() == Type.DISK) {
        ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
            (host +":" +port), input, compressedLength, LOG,
            fetchedInput.getInputAttemptIdentifier().toString());
      } else {
        throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
            fetchedInput);
      }

      // Inform the shuffle scheduler
      long endTime = System.currentTimeMillis();
      fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
          compressedLength, decompressedLength, (endTime - startTime));

      // Note successful shuffle
      remaining.remove(srcAttemptId);
      // metrics.successFetch();
      return null;
    } catch (IOException ioe) {
      // ZZZ Add some shutdown code here
      // ZZZ Make sure any assigned memory inputs are aborted

      // ioErrs.increment(1);
      // Failed before or during allocation: either blame everything (header
      // unreadable) or just the attempt whose header we had parsed.
      if (srcAttemptId == null || fetchedInput == null) {
        LOG.info("fetcher" + " failed to read map header" + srcAttemptId
            + " decomp: " + decompressedLength + ", " + compressedLength, ioe);
        if (srcAttemptId == null) {
          return remaining
              .toArray(new InputAttemptIdentifier[remaining.size()]);
        } else {
          return new InputAttemptIdentifier[] { srcAttemptId };
        }
      }
      LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host,
          ioe);

      // Inform the shuffle-scheduler
      try {
        fetchedInput.abort();
      } catch (IOException e) {
        LOG.info("Failure to cleanup fetchedInput: " + fetchedInput);
      }
      // metrics.failedFetch();
      return new InputAttemptIdentifier[] { srcAttemptId };
    }
  }
/**
* Do some basic verification on the input received -- Being defensive
*
* @param compressedLength
* @param decompressedLength
* @param fetchPartition
* @param srcAttemptId
* @param pathComponent
* @return true/false, based on if the verification succeeded or not
*/
private boolean verifySanity(long compressedLength, long decompressedLength,
int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
if (compressedLength < 0 || decompressedLength < 0) {
// wrongLengthErrs.increment(1);
LOG.warn(" invalid lengths in input header -> headerPathComponent: "
+ pathComponent + ", nextRemainingSrcAttemptId: "
+ getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
+ " len: " + compressedLength + ", decomp len: " + decompressedLength);
return false;
}
if (fetchPartition != this.partition) {
// wrongReduceErrs.increment(1);
LOG.warn(" data for the wrong reduce -> headerPathComponent: "
+ pathComponent + "nextRemainingSrcAttemptId: "
+ getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
+ " len: " + compressedLength + " decomp len: " + decompressedLength
+ " for reduce " + fetchPartition);
return false;
}
// Sanity check
if (!remaining.contains(srcAttemptId)) {
// wrongMapErrs.increment(1);
LOG.warn("Invalid input. Received output for headerPathComponent: "
+ pathComponent + "nextRemainingSrcAttemptId: "
+ getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId);
return false;
}
return true;
}
private InputAttemptIdentifier getNextRemainingAttempt() {
if (remaining.size() > 0) {
return remaining.iterator().next();
} else {
return null;
}
}
/**
* Builder for the construction of Fetchers
*/
public static class FetcherBuilder {
private Fetcher fetcher;
private boolean workAssigned = false;
public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params,
FetchedInputAllocator inputManager, ApplicationId appId,
SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf,
boolean localDiskFetchEnabled) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
shuffleSecret, srcNameTrimmed, conf, localDiskFetchEnabled);
}
public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
fetcher.httpConnectionParams = httpParams;
return this;
}
public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
fetcher.codec = codec;
return this;
}
public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
fetcher.ifileReadAhead = readAhead;
fetcher.ifileReadAheadLength = readAheadBytes;
return this;
}
public FetcherBuilder assignWork(String host, int port, int partition,
List<InputAttemptIdentifier> inputs) {
fetcher.host = host;
fetcher.port = port;
fetcher.partition = partition;
fetcher.srcAttempts = inputs;
workAssigned = true;
return this;
}
public Fetcher build() {
Preconditions.checkState(workAssigned == true,
"Cannot build a fetcher withot assigning work to it");
return fetcher;
}
}
  // Identity is the unique per-instance fetcher id; kept consistent with equals().
  @Override
  public int hashCode() {
    return fetcherIdentifier;
  }
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Fetcher other = (Fetcher) obj;
if (fetcherIdentifier != other.fetcherIdentifier)
return false;
return true;
}
}