blob: 10282037f6b274cad30c4523bb74c6079ab10014 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.shuffle.common.impl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.shuffle.common.FetchResult;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.Fetcher;
import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
import org.apache.tez.runtime.library.shuffle.common.InputHost;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
// This only knows how to deal with a single srcIndex for a given targetIndex.
// In case the src task generates multiple outputs for the same target Index
// (multiple src-indices), modifications will be required.
public class ShuffleManager implements FetcherCallback {
private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
private final TezInputContext inputContext;
private final Configuration conf;
private final int numInputs;
private final FetchedInputAllocator inputManager;
private final ListeningExecutorService fetcherExecutor;
private final ExecutorService schedulerRawExecutor;
private final ListeningExecutorService schedulerExecutor;
private final RunShuffleCallable schedulerCallable = new RunShuffleCallable();
private final BlockingQueue<FetchedInput> completedInputs;
private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
private final Set<InputIdentifier> completedInputSet;
private final ConcurrentMap<String, InputHost> knownSrcHosts;
private final BlockingQueue<InputHost> pendingHosts;
private final Set<InputAttemptIdentifier> obsoletedInputs;
private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
private final long startTime;
private long lastProgressTime;
// Required to be held when manipulating pendingHosts
private final ReentrantLock lock = new ReentrantLock();
private final Condition wakeLoop = lock.newCondition();
private final int numFetchers;
private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
// Parameters required by Fetchers
private final SecretKey shuffleSecret;
private final int connectionTimeout;
private final int readTimeout;
private final CompressionCodec codec;
private final int ifileBufferSize;
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final TezCounter shuffledInputsCounter;
private final TezCounter failedShufflesCounter;
private final TezCounter bytesShuffledCounter;
private final TezCounter decompressedDataSizeCounter;
private final TezCounter bytesShuffledToDiskCounter;
private final TezCounter bytesShuffledToMemCounter;
private volatile Throwable shuffleError;
// TODO More counters - FetchErrors, speed?
public ShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs,
int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength,
CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
this.inputContext = inputContext;
this.conf = conf;
this.numInputs = numInputs;
this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
this.ifileBufferSize = bufferSize;
this.ifileReadAhead = ifileReadAheadEnabled;
this.ifileReadAheadLength = ifileReadAheadLength;
this.codec = codec;
this.inputManager = inputAllocator;
completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
pendingHosts = new LinkedBlockingQueue<InputHost>();
obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
int maxConfiguredFetchers =
conf.getInt(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
numFetchers,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(
"Fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #%d")
.build());
this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
this.schedulerRawExecutor = Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(
"ShuffleRunner [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]")
.build());
this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
this.startTime = System.currentTimeMillis();
this.lastProgressTime = startTime;
this.shuffleSecret = ShuffleUtils
.getJobTokenSecretFromTokenBytes(inputContext
.getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
this.connectionTimeout = conf.getInt(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
this.readTimeout = conf.getInt(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
+ (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
+ numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
+ ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength);
}
public void run() throws IOException {
Preconditions.checkState(inputManager != null, "InputManager must be configured");
ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
// Shutdown this executor once this task, and the callback complete.
schedulerExecutor.shutdown();
}
private class RunShuffleCallable implements Callable<Void> {
@Override
public Void call() throws Exception {
while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
lock.lock();
try {
if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
if (numCompletedInputs.get() < numInputs) {
wakeLoop.await();
}
}
} finally {
lock.unlock();
}
if (shuffleError != null) {
// InputContext has already been informed of a fatal error. Relying on
// tez to kill the task.
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("NumCompletedInputs: " + numCompletedInputs);
}
if (numCompletedInputs.get() < numInputs) {
lock.lock();
try {
int maxFetchersToRun = numFetchers - numRunningFetchers.get();
int count = 0;
while (pendingHosts.peek() != null) {
InputHost inputHost = null;
try {
inputHost = pendingHosts.take();
} catch (InterruptedException e) {
if (isShutdown.get()) {
LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
break;
} else {
throw e;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Processing pending host: " + inputHost.toDetailedString());
}
if (inputHost.getNumPendingInputs() > 0) {
LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
Fetcher fetcher = constructFetcherForHost(inputHost);
numRunningFetchers.incrementAndGet();
if (isShutdown.get()) {
LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
}
ListenableFuture<FetchResult> future = fetcherExecutor
.submit(fetcher);
Futures.addCallback(future, fetchFutureCallback);
if (++count >= maxFetchersToRun) {
break;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping host: " + inputHost.getIdentifier()
+ " since it has no inputs to process");
}
}
}
} finally {
lock.unlock();
}
}
}
LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
// TODO NEWTEZ Maybe clean up inputs.
if (!fetcherExecutor.isShutdown()) {
fetcherExecutor.shutdownNow();
}
return null;
}
}
private Fetcher constructFetcherForHost(InputHost inputHost) {
FetcherBuilder fetcherBuilder = new FetcherBuilder(
ShuffleManager.this, inputManager,
inputContext.getApplicationId(), shuffleSecret, conf);
fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
if (codec != null) {
fetcherBuilder.setCompressionParameters(codec);
}
fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
// Remove obsolete inputs from the list being given to the fetcher. Also
// remove from the obsolete list.
List<InputAttemptIdentifier> pendingInputsForHost = inputHost
.clearAndGetPendingInputs();
for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
.iterator(); inputIter.hasNext();) {
InputAttemptIdentifier input = inputIter.next();
// Avoid adding attempts which have already completed.
if (completedInputSet.contains(input.getInputIdentifier())) {
inputIter.remove();
continue;
}
// Avoid adding attempts which have been marked as OBSOLETE
if (obsoletedInputs.contains(input)) {
inputIter.remove();
obsoletedInputs.remove(input);
}
}
// TODO NEWTEZ Maybe limit the number of inputs being given to a single
// fetcher, especially in the case where #hosts < #fetchers
fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
LOG.info("Created Fetcher for host: " + inputHost.getHost()
+ ", with inputs: " + pendingInputsForHost);
return fetcherBuilder.build();
}
/////////////////// Methods for InputEventHandler
public void addKnownInput(String hostName, int port,
InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
String identifier = InputHost.createIdentifier(hostName, port);
InputHost host = knownSrcHosts.get(identifier);
if (host == null) {
host = new InputHost(hostName, port, inputContext.getApplicationId(), srcPhysicalIndex);
assert identifier.equals(host.getIdentifier());
InputHost old = knownSrcHosts.putIfAbsent(identifier, host);
if (old != null) {
host = old;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
}
host.addKnownInput(srcAttemptIdentifier);
lock.lock();
try {
boolean added = pendingHosts.offer(host);
if (!added) {
String errorMessage = "Unable to add host: " + host.getIdentifier() + " to pending queue";
LOG.error(errorMessage);
throw new TezUncheckedException(errorMessage);
}
wakeLoop.signal();
} finally {
lock.unlock();
}
}
public void addCompletedInputWithNoData(
InputAttemptIdentifier srcAttemptIdentifier) {
InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
if (!completedInputSet.contains(inputIdentifier)) {
synchronized (completedInputSet) {
if (!completedInputSet.contains(inputIdentifier)) {
registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
}
}
}
// Awake the loop to check for termination.
lock.lock();
try {
wakeLoop.signal();
} finally {
lock.unlock();
}
}
public void addCompletedInputWithData(
InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
throws IOException {
InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to "
+ fetchedInput.getType());
// Count irrespective of whether this is a copy of an already fetched input
lock.lock();
try {
lastProgressTime = System.currentTimeMillis();
} finally {
lock.unlock();
}
boolean committed = false;
if (!completedInputSet.contains(inputIdentifier)) {
synchronized (completedInputSet) {
if (!completedInputSet.contains(inputIdentifier)) {
fetchedInput.commit();
committed = true;
registerCompletedInput(fetchedInput);
}
}
}
if (!committed) {
fetchedInput.abort(); // If this fails, the fetcher may attempt another
// abort.
} else {
lock.lock();
try {
// Signal the wakeLoop to check for termination.
wakeLoop.signal();
} finally {
lock.unlock();
}
}
}
public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
obsoletedInputs.add(srcAttemptIdentifier);
// TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
}
/////////////////// End of Methods for InputEventHandler
/////////////////// Methods from FetcherCallbackHandler
@Override
public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
throws IOException {
InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
// Count irrespective of whether this is a copy of an already fetched input
lock.lock();
try {
lastProgressTime = System.currentTimeMillis();
} finally {
lock.unlock();
}
boolean committed = false;
if (!completedInputSet.contains(inputIdentifier)) {
synchronized (completedInputSet) {
if (!completedInputSet.contains(inputIdentifier)) {
fetchedInput.commit();
committed = true;
// Processing counters for completed and commit fetches only. Need
// additional counters for excessive fetches - which primarily comes
// in after speculation or retries.
shuffledInputsCounter.increment(1);
bytesShuffledCounter.increment(fetchedBytes);
if (fetchedInput.getType() == Type.MEMORY) {
bytesShuffledToMemCounter.increment(fetchedBytes);
} else {
bytesShuffledToDiskCounter.increment(fetchedBytes);
}
decompressedDataSizeCounter.increment(decompressedLength);
registerCompletedInput(fetchedInput);
}
}
}
if (!committed) {
fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
} else {
lock.lock();
try {
// Signal the wakeLoop to check for termination.
wakeLoop.signal();
} finally {
lock.unlock();
}
}
// TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
}
@Override
public void fetchFailed(String host,
InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
// TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
// For now, reporting immediately.
LOG.info("Fetch failed for src: " + srcAttemptIdentifier
+ "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
+ connectFailed);
failedShufflesCounter.increment(1);
if (srcAttemptIdentifier == null) {
String message = "Received fetchFailure for an unknown src (null)";
LOG.fatal(message);
inputContext.fatalError(null, message);
} else {
InputReadErrorEvent readError = new InputReadErrorEvent(
"Fetch failure while fetching from "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
inputContext.getSourceVertexName(),
srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
srcAttemptIdentifier.getAttemptNumber()),
srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
srcAttemptIdentifier.getAttemptNumber());
List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
failedEvents.add(readError);
inputContext.sendEvents(failedEvents);
}
}
/////////////////// End of Methods from FetcherCallbackHandler
public void shutdown() throws InterruptedException {
isShutdown.set(true);
if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
this.schedulerExecutor.shutdownNow(); // Interrupt all running fetchers
}
if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers
}
}
private void registerCompletedInput(FetchedInput fetchedInput) {
lock.lock();
try {
completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
completedInputs.add(fetchedInput);
if (!inputReadyNotificationSent.getAndSet(true)) {
// TODO Should eventually be controlled by Inputs which are processing the data.
inputContext.inputIsReady();
}
int numComplete = numCompletedInputs.incrementAndGet();
if (numComplete == numInputs) {
LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
}
} finally {
lock.unlock();
}
}
/////////////////// Methods for walking the available inputs
/**
* @return true if there is another input ready for consumption.
*/
public boolean newInputAvailable() {
FetchedInput head = completedInputs.peek();
if (head == null || head instanceof NullFetchedInput) {
return false;
} else {
return true;
}
}
/**
* @return true if all of the required inputs have been fetched.
*/
public boolean allInputsFetched() {
lock.lock();
try {
return numCompletedInputs.get() == numInputs;
} finally {
lock.unlock();
}
}
/**
* @return the next available input, or null if there are no available inputs.
* This method will block if there are currently no available inputs,
* but more may become available.
*/
public FetchedInput getNextInput() throws InterruptedException {
FetchedInput input = null;
do {
// Check for no additional inputs
lock.lock();
try {
input = completedInputs.peek();
if (input == null && allInputsFetched()) {
break;
}
} finally {
lock.unlock();
}
input = completedInputs.take(); // block
} while (input instanceof NullFetchedInput);
return input;
}
/////////////////// End of methods for walking the available inputs
/**
* Fake input that is added to the completed input list in case an input does not have any data.
*
*/
private class NullFetchedInput extends FetchedInput {
public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
}
@Override
public OutputStream getOutputStream() throws IOException {
throw new UnsupportedOperationException("Not supported for NullFetchedInput");
}
@Override
public InputStream getInputStream() throws IOException {
throw new UnsupportedOperationException("Not supported for NullFetchedInput");
}
@Override
public void commit() throws IOException {
throw new UnsupportedOperationException("Not supported for NullFetchedInput");
}
@Override
public void abort() throws IOException {
throw new UnsupportedOperationException("Not supported for NullFetchedInput");
}
@Override
public void free() {
throw new UnsupportedOperationException("Not supported for NullFetchedInput");
}
}
private class SchedulerFutureCallback implements FutureCallback<Void> {
@Override
public void onSuccess(Void result) {
LOG.info("Scheduler thread completed");
}
@Override
public void onFailure(Throwable t) {
if (isShutdown.get()) {
LOG.info("Already shutdown. Ignoring error: " + t);
} else {
LOG.error("Scheduler failed with error: ", t);
inputContext.fatalError(t, "Shuffle Scheduler Failed");
}
}
}
private class FetchFutureCallback implements FutureCallback<FetchResult> {
private void doBookKeepingForFetcherComplete() {
numRunningFetchers.decrementAndGet();
lock.lock();
try {
wakeLoop.signal();
} finally {
lock.unlock();
}
}
@Override
public void onSuccess(FetchResult result) {
Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort()));
assert inputHost != null;
for (InputAttemptIdentifier input : pendingInputs) {
inputHost.addKnownInput(input);
}
pendingHosts.add(inputHost);
}
doBookKeepingForFetcherComplete();
}
@Override
public void onFailure(Throwable t) {
if (isShutdown.get()) {
LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
} else {
LOG.error("Fetcher failed with error: ", t);
shuffleError = t;
inputContext.fatalError(t, "Fetch failed");
doBookKeepingForFetcherComplete();
}
}
}
}