/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;

import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.sasl.SaslTimeoutException;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
/**
* Wraps another BlockFetcher or BlockPusher with the ability to automatically retry block
* transfers which fail due to IOExceptions, which we hope are due to transient network conditions.
*
* This transferor provides stronger guarantees regarding the parent BlockTransferListener. In
* particular, the listener will be invoked exactly once per blockId, with a success or failure.
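*
* A minimal usage sketch (hedged: {@code conf}, {@code starter} and {@code blockIds} are
* assumed to be prepared by the caller; the listener shown is a bare-bones example, not a
* prescribed implementation):
* <pre>{@code
* BlockFetchingListener listener = new BlockFetchingListener() {
*   public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
*     // Reported exactly once per blockId, possibly after several retries.
*   }
*   public void onBlockFetchFailure(String blockId, Throwable exception) {
*     // Reported only once retries are exhausted or the error is non-retriable.
*   }
* };
* new RetryingBlockTransferor(conf, starter, blockIds, listener).start();
* }</pre>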
*/
public class RetryingBlockTransferor {
/**
* Used to initiate the first transfer for all blocks, and subsequently for retrying the
* transfer on any remaining blocks.
*/
public interface BlockTransferStarter {
/**
* Creates a new BlockFetcher or BlockPusher to transfer the given block ids. The transfer
* may perform some synchronous bootstrapping followed by fully asynchronous block transfer.
* The BlockFetcher or BlockPusher must eventually invoke the Listener on every input blockId,
* or else this method must throw an exception.
*
* This method should always attempt to get a new TransportClient from the
* {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
* issues.
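*
* A minimal sketch of an implementation, modeled on how the external shuffle client wires
* this up (hedged: {@code clientFactory}, {@code host}, {@code port}, {@code appId},
* {@code execId} and {@code conf} are assumed captured by the caller, and error handling
* is elided):
* <pre>{@code
* BlockTransferStarter starter = (blockIds, listener) -> {
*   // Ask the factory for a client so a stale connection is replaced on retry.
*   TransportClient client = clientFactory.createClient(host, port);
*   new OneForOneBlockFetcher(client, appId, execId, blockIds,
*     (BlockFetchingListener) listener, conf).start();
* };
* }</pre>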
*/
void createAndStart(String[] blockIds, BlockTransferListener listener)
throws IOException, InterruptedException;
}
/** Shared executor service used for waiting and retrying. */
private static final ExecutorService executorService = Executors.newCachedThreadPool(
NettyUtils.createThreadFactory("Block Transfer Retry"));
private static final SparkLogger logger =
SparkLoggerFactory.getLogger(RetryingBlockTransferor.class);
/** Used to initiate a new block transfer on our remaining blocks. */
private final BlockTransferStarter transferStarter;
/** Parent listener which we delegate all successful or permanently failed block transfers to. */
private final BlockTransferListener listener;
/** Max number of times we are allowed to retry. */
private final int maxRetries;
/** Milliseconds to wait before each retry. */
private final int retryWaitTime;
// NOTE:
// All of our non-final fields are synchronized under 'this' and should only be accessed/mutated
// while inside a synchronized block.
/** Number of times we've attempted to retry so far. */
private int retryCount = 0;
// Number of times a SASL timeout has been retried without success.
// If we see maxRetries consecutive failures, the request is failed.
// On the other hand, if SASL succeeds and we are able to send other requests subsequently,
// we subtract the SASL failures from retryCount (since the SASL failures were part of
// connection bootstrap, which ultimately succeeded).
// spark.network.auth.rpcTimeout is much lower than spark.network.timeout and others,
// so SASL is more susceptible to failures when the remote service
// (like an external shuffle service) is under load; but once it succeeds, we do not want to
// count it as part of the request retries.
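//
// A worked example with illustrative numbers: with maxRetries = 3, two consecutive
// SaslTimeoutExceptions leave retryCount = saslRetryCount = 2. If the next attempt's SASL
// bootstrap succeeds but the request then hits an IOException, shouldRetry() first subtracts
// the two SASL failures (retryCount = 0, saslRetryCount = 0), so the IO failure still has
// all three retries available.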
private int saslRetryCount = 0;
/**
* Set of all block ids which have neither been transferred successfully nor permanently
* failed (i.e., with a non-IOException). A retry involves requesting every outstanding
* block. Note that since this is a LinkedHashSet, input ordering is preserved, so we always
* request blocks in the same order the user provided.
*/
private final LinkedHashSet<String> outstandingBlocksIds;
/**
* The BlockTransferListener that is active with our current BlockFetcher.
* When we start a retry, we immediately replace this with a new Listener, which causes any
* old Listeners to ignore all further responses.
*/
private RetryingBlockTransferListener currentListener;
/** Whether sasl retries are enabled. */
private final boolean enableSaslRetries;
private final ErrorHandler errorHandler;
public RetryingBlockTransferor(
TransportConf conf,
BlockTransferStarter transferStarter,
String[] blockIds,
BlockTransferListener listener,
ErrorHandler errorHandler) {
this.transferStarter = transferStarter;
this.listener = listener;
this.maxRetries = conf.maxIORetries();
this.retryWaitTime = conf.ioRetryWaitTimeMs();
this.outstandingBlocksIds = Sets.newLinkedHashSet();
Collections.addAll(outstandingBlocksIds, blockIds);
this.currentListener = new RetryingBlockTransferListener();
this.errorHandler = errorHandler;
this.enableSaslRetries = conf.enableSaslRetries();
this.saslRetryCount = 0;
}
public RetryingBlockTransferor(
TransportConf conf,
BlockTransferStarter transferStarter,
String[] blockIds,
BlockFetchingListener listener) {
this(conf, transferStarter, blockIds, listener, ErrorHandler.NOOP_ERROR_HANDLER);
}
@VisibleForTesting
synchronized void setCurrentListener(RetryingBlockTransferListener listener) {
this.currentListener = listener;
}
/**
* Initiates the transfer of all blocks provided in the constructor, with possible retries
* in the event of transient IOExceptions.
*/
public void start() {
transferAllOutstanding();
}
/**
* Fires off a request to transfer all blocks that have not been transferred successfully or
* permanently failed (i.e., with a non-IOException).
*/
private void transferAllOutstanding() {
// Start by retrieving our shared state within a synchronized block.
String[] blockIdsToTransfer;
int numRetries;
RetryingBlockTransferListener myListener;
synchronized (this) {
blockIdsToTransfer = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]);
numRetries = retryCount;
myListener = currentListener;
}
// Now initiate the transfer on all outstanding blocks, possibly initiating a retry if that
// fails.
try {
transferStarter.createAndStart(blockIdsToTransfer, myListener);
} catch (Exception e) {
if (numRetries > 0) {
logger.error("Exception while beginning {} of {} outstanding blocks (after {} retries)", e,
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length),
MDC.of(LogKeys.NUM_RETRY$.MODULE$, numRetries));
} else {
logger.error("Exception while beginning {} of {} outstanding blocks", e,
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length));
}
if (shouldRetry(e) && initiateRetry(e)) {
// successfully initiated a retry
return;
}
// retry is not possible, so fail remaining blocks
for (String bid : blockIdsToTransfer) {
listener.onBlockTransferFailure(bid, e);
}
}
}
/**
* Lightweight method which initiates a retry in a different thread. The retry will involve
* calling transferAllOutstanding() after a configured wait time.
* Returns true if the retry was successfully initiated, false otherwise.
*/
@VisibleForTesting
synchronized boolean initiateRetry(Throwable e) {
if (enableSaslRetries && e instanceof SaslTimeoutException) {
saslRetryCount += 1;
}
retryCount += 1;
currentListener = new RetryingBlockTransferListener();
logger.info("Retrying {} ({}/{}) for {} outstanding blocks after {} ms",
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.NUM_RETRY$.MODULE$, retryCount),
MDC.of(LogKeys.MAX_ATTEMPTS$.MODULE$, maxRetries),
MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, outstandingBlocksIds.size()),
MDC.of(LogKeys.RETRY_WAIT_TIME$.MODULE$, retryWaitTime));
try {
executorService.execute(() -> {
Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
transferAllOutstanding();
});
} catch (Throwable t) {
logger.error("Exception while trying to initiate retry", t);
return false;
}
return true;
}
/**
* Returns true if we should retry due to a block transfer failure. We will retry if and only
* if the exception was an IOException or SaslTimeoutException, we haven't already retried
* 'maxRetries' times, and the configured ErrorHandler permits retrying this error.
*/
private synchronized boolean shouldRetry(Throwable e) {
boolean isIOException = e instanceof IOException
|| e.getCause() instanceof IOException;
boolean isSaslTimeout = enableSaslRetries && e instanceof SaslTimeoutException;
// If this is a non-SASL request failure, subtract earlier SASL failures from retryCount,
// since some subsequent SASL attempt was successful.
if (!isSaslTimeout && saslRetryCount > 0) {
Preconditions.checkState(retryCount >= saslRetryCount,
"retryCount must be greater than or equal to saslRetryCount");
retryCount -= saslRetryCount;
saslRetryCount = 0;
}
boolean hasRemainingRetries = retryCount < maxRetries;
boolean shouldRetry = (isSaslTimeout || isIOException) &&
hasRemainingRetries && errorHandler.shouldRetryError(e);
return shouldRetry;
}
@VisibleForTesting
public int getRetryCount() {
return retryCount;
}
/**
* Our RetryListener intercepts block transfer responses and forwards them to our parent
* listener. Note that in the event of a retry, we will immediately replace the 'currentListener'
* field, indicating that any responses from non-current Listeners should be ignored.
*/
@VisibleForTesting
class RetryingBlockTransferListener implements
BlockFetchingListener, BlockPushingListener {
private void handleBlockTransferSuccess(String blockId, ManagedBuffer data) {
// We will only forward this success message to our parent listener if this block request is
// outstanding and we are still the active listener.
boolean shouldForwardSuccess = false;
synchronized (RetryingBlockTransferor.this) {
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
outstandingBlocksIds.remove(blockId);
shouldForwardSuccess = true;
// If there were SASL failures earlier, remove them from retryCount, as there was
// a SASL success (and some other request post bootstrap was also successful).
if (saslRetryCount > 0) {
Preconditions.checkState(retryCount >= saslRetryCount,
"retryCount must be greater than or equal to saslRetryCount");
retryCount -= saslRetryCount;
saslRetryCount = 0;
}
}
}
// Now actually invoke the parent listener, outside of the synchronized block.
if (shouldForwardSuccess) {
listener.onBlockTransferSuccess(blockId, data);
}
}
private void handleBlockTransferFailure(String blockId, Throwable exception) {
// We will only forward this failure to our parent listener if this block request is
// outstanding, we are still the active listener, AND we cannot retry the transfer.
boolean shouldForwardFailure = false;
synchronized (RetryingBlockTransferor.this) {
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
if (shouldRetry(exception)) {
if (!initiateRetry(exception)) {
// failed to initiate a retry, so fail this block
outstandingBlocksIds.remove(blockId);
shouldForwardFailure = true;
}
} else {
if (errorHandler.shouldLogError(exception)) {
logger.error("Failed to {} block {}, and will not retry ({} retries)", exception,
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.BLOCK_ID$.MODULE$, blockId),
MDC.of(LogKeys.NUM_RETRY$.MODULE$, retryCount));
} else {
logger.debug(
String.format("Failed to %s block %s, and will not retry (%s retries)",
listener.getTransferType(), blockId, retryCount), exception);
}
outstandingBlocksIds.remove(blockId);
shouldForwardFailure = true;
}
}
}
// Now actually invoke the parent listener, outside of the synchronized block.
if (shouldForwardFailure) {
listener.onBlockTransferFailure(blockId, exception);
}
}
@Override
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
handleBlockTransferSuccess(blockId, data);
}
@Override
public void onBlockFetchFailure(String blockId, Throwable exception) {
handleBlockTransferFailure(blockId, exception);
}
@Override
public void onBlockPushSuccess(String blockId, ManagedBuffer data) {
handleBlockTransferSuccess(blockId, data);
}
@Override
public void onBlockPushFailure(String blockId, Throwable exception) {
handleBlockTransferFailure(blockId, exception);
}
// RetryingBlockTransferListener's onBlockTransferSuccess and onBlockTransferFailure
// shouldn't be invoked; we only invoke these two methods on the parent listener.
@Override
public void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
throw new RuntimeException(
"Invocation on RetryingBlockTransferListener.onBlockTransferSuccess is unexpected.");
}
@Override
public void onBlockTransferFailure(String blockId, Throwable exception) {
throw new RuntimeException(
"Invocation on RetryingBlockTransferListener.onBlockTransferFailure is unexpected.");
}
@Override
public String getTransferType() {
throw new RuntimeException(
"Invocation on RetryingBlockTransferListener.getTransferType is unexpected.");
}
}
}