blob: 64824a15cd89cf197df0f07dc0c3e472262a8248 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.retry;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
/**
* A {@link RpcInvocationHandler} which supports client side retry.
*/
@InterfaceAudience.Private
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
/** Logger used by this handler and by its nested helper classes. */
public static final Logger LOG = LoggerFactory.getLogger(
RetryInvocationHandler.class);
/**
* A single method invocation made through the enclosing handler, together
* with the state needed to retry it: the per-call {@link Counters} and the
* pending {@link RetryInfo}, if any.
*
* NOTE(review): the enclosing handler's newCall method may obtain a
* {@code Call} from {@code asyncCallHandler.newAsyncCall(...)}, so a subclass
* defined elsewhere presumably overrides some of the package-private methods
* below -- confirm before changing their signatures or call order.
*/
static class Call {
/** The method being invoked. */
private final Method method;
/** Arguments passed to {@link #method}; may be null (see {@link #toString()}). */
private final Object[] args;
/** Whether the call id and retry count are handed to {@link Client} before invoking. */
private final boolean isRpc;
/** Id of this call, as supplied by the creator. */
private final int callId;
/** Retry and failover tallies for this call only. */
private final Counters counters = new Counters();
/** Policy resolved once, at construction, from the handler for {@link #method}. */
private final RetryPolicy retryPolicy;
/** The handler that created this call and performs the actual invocation. */
private final RetryInvocationHandler<?> retryInvocationHandler;
/** Non-null while a retry decided by the policy is still pending. */
private RetryInfo retryInfo;
/**
* @param method the method to invoke.
* @param args the arguments for the method.
* @param isRpc whether the invocation is treated as an RPC call.
* @param callId the id of this call.
* @param retryInvocationHandler the handler owning this call; also used to
* look up the retry policy for the method.
*/
Call(Method method, Object[] args, boolean isRpc, int callId,
RetryInvocationHandler<?> retryInvocationHandler) {
this.method = method;
this.args = args;
this.isRpc = isRpc;
this.callId = callId;
this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
this.retryInvocationHandler = retryInvocationHandler;
}
/** @return the id of this call. */
int getCallId() {
return callId;
}
/** @return the retry/failover counters of this call. */
Counters getCounters() {
return counters;
}
/**
* @param now the current time, on the same clock as the retry time
* ({@link Time#monotonicNow()}).
* @return null if no retry is pending; otherwise the retry time minus
* {@code now}, which is zero or negative once the retry is due.
*/
synchronized Long getWaitTime(final long now) {
return retryInfo == null? null: retryInfo.retryTime - now;
}
/**
* Invoke the call once without retrying.
*
* If a retry is already pending it is processed instead of invoking the
* method. Otherwise the method is invoked; when it throws an exception the
* handler decides whether to retry and the resulting {@link RetryInfo} is
* processed right away.
*
* @return the outcome of this attempt. Any throwable escaping the attempt,
* including a failure decided by the retry policy, is wrapped in the
* returned {@link CallReturn} rather than thrown.
*/
synchronized CallReturn invokeOnce() {
try {
if (retryInfo != null) {
return processWaitTimeAndRetryInfo();
}
// The number of times this invocation handler has ever been failed over
// before this method invocation attempt. Used to prevent concurrent
// failed method invocations from triggering multiple failover attempts.
final long failoverCount = retryInvocationHandler.getFailoverCount();
try {
return invoke();
} catch (Exception e) {
if (LOG.isTraceEnabled()) {
LOG.trace(toString(), e);
}
if (Thread.currentThread().isInterrupted()) {
// If interrupted, do not retry.
throw e;
}
// handleException throws when the policy decides to fail; that exception
// is caught by the outer catch below and wrapped.
retryInfo = retryInvocationHandler.handleException(
method, callId, retryPolicy, counters, failoverCount, e);
return processWaitTimeAndRetryInfo();
}
} catch(Throwable t) {
return new CallReturn(t);
}
}
/**
* It first processes the wait time, if there is any,
* and then invokes {@link #processRetryInfo()}.
*
* If the wait time is positive, it either sleeps for synchronous calls
* or immediately returns for asynchronous calls.
*
* NOTE(review): this implementation always sleeps and then returns
* {@link CallReturn#RETRY}; the "immediately returns" / WAIT_RETRY behavior
* is presumably provided by an overriding subclass -- confirm.
*
* @return {@link CallReturn#RETRY} if the retryInfo is processed;
* otherwise, return {@link CallReturn#WAIT_RETRY}.
* @throws InterruptedIOException if the thread is interrupted while
* sleeping; the interrupt flag is restored before throwing.
*/
CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException {
final Long waitTime = getWaitTime(Time.monotonicNow());
LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}",
callId, retryInfo, waitTime);
if (waitTime != null && waitTime > 0) {
try {
// Sleeps for the full configured delay, not for the remaining waitTime
// computed above.
Thread.sleep(retryInfo.delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted while waiting to retry", e);
}
InterruptedIOException intIOE = new InterruptedIOException(
"Retry interrupted");
intIOE.initCause(e);
throw intIOE;
}
}
processRetryInfo();
return CallReturn.RETRY;
}
/**
* Consumes the pending {@link RetryInfo}: counts the retry, asks the proxy
* descriptor to fail over when the retry action requires it (counting that
* too), and clears the pending state.
*/
synchronized void processRetryInfo() {
counters.retries++;
if (retryInfo.isFailover()) {
retryInvocationHandler.proxyDescriptor.failover(
retryInfo.expectedFailoverCount, method, callId);
counters.failovers++;
}
retryInfo = null;
}
/**
* Invokes the method and wraps its result.
*
* @return a {@link CallReturn} holding the value returned by the method.
* @throws Throwable whatever {@link #invokeMethod()} throws.
*/
CallReturn invoke() throws Throwable {
return new CallReturn(invokeMethod());
}
/**
* Performs the actual invocation through the owning handler. For RPC calls
* the call id and the current retry count are first passed to
* {@link Client}.
*
* @return the value returned by the invoked method.
* @throws Throwable whatever the handler's invokeMethod throws.
*/
Object invokeMethod() throws Throwable {
if (isRpc) {
Client.setCallIdAndRetryCount(callId, counters.retries,
retryInvocationHandler.asyncCallHandler);
}
return retryInvocationHandler.invokeMethod(method, args);
}
/**
* @return a description of the form
* {@code <class>#<callId>: <DeclaringClass>.<method>(<args>)}, where the
* argument list is left empty when there are no arguments.
*/
@Override
public String toString() {
return getClass().getSimpleName() + "#" + callId + ": "
+ method.getDeclaringClass().getSimpleName() + "." + method.getName()
+ "(" + (args == null || args.length == 0? "": Arrays.toString(args))
+ ")";
}
}
/** Per-call tallies of retry attempts and of retries that asked for a failover. */
static class Counters {
  /** Number of times the call has been retried. */
  private int retries;
  /** Number of retries of the call that requested a failover. */
  private int failovers;

  /**
   * @return true if and only if both counters are still zero.
   */
  boolean isZeros() {
    if (retries != 0) {
      return false;
    }
    return failovers == 0;
  }
}
/**
 * Bundles a {@link FailoverProxyProvider} with the proxy it currently hands
 * out and with the number of failovers performed through this descriptor.
 * Access to the current proxy and to the failover count is synchronized on
 * the descriptor.
 */
private static class ProxyDescriptor<T> {
  private final FailoverProxyProvider<T> fpp;
  /** Number of failovers performed on the associated proxy provider. */
  private long failoverCount = 0;
  /** The proxy information most recently obtained from the provider. */
  private ProxyInfo<T> proxyInfo;

  /**
   * @param fpp the provider to wrap; its current proxy is fetched eagerly.
   */
  ProxyDescriptor(FailoverProxyProvider<T> fpp) {
    this.fpp = fpp;
    this.proxyInfo = fpp.getProxy();
  }

  /** @return the proxy information currently in use. */
  synchronized ProxyInfo<T> getProxyInfo() {
    return proxyInfo;
  }

  /** @return the proxy object currently in use. */
  synchronized T getProxy() {
    return proxyInfo.proxy;
  }

  /** @return how many failovers have been performed so far. */
  synchronized long getFailoverCount() {
    return failoverCount;
  }

  /**
   * Fails over to the next proxy unless another call already did so.
   *
   * @param expectedFailoverCount the failover count observed by the caller
   *     before its failed attempt.
   * @param method the method whose invocation failed (used for logging).
   * @param callId the id of the failed call (used for logging).
   */
  synchronized void failover(long expectedFailoverCount, Method method,
      int callId) {
    // Concurrent failed invocations must trigger only one real failover:
    // a caller whose snapshot is stale merely picks up the new proxy.
    final boolean alreadyFailedOver = failoverCount != expectedFailoverCount;
    if (alreadyFailedOver) {
      LOG.warn("A failover has occurred since the start of call #" + callId
          + " " + proxyInfo.getString(method.getName()));
    } else {
      fpp.performFailover(proxyInfo.proxy);
      failoverCount++;
    }
    // In both cases refresh the proxy from the provider.
    proxyInfo = fpp.getProxy();
  }

  /**
   * Looks the method up on the provider's interface and inspects its
   * annotations.
   *
   * @param method the method being invoked.
   * @return true if the interface method carries {@link Idempotent} or
   *     {@link AtMostOnce}.
   * @throws NoSuchMethodException if the provider's interface has no method
   *     with the same name and parameter types.
   */
  boolean idempotentOrAtMostOnce(Method method) throws NoSuchMethodException {
    final Class<?> protocol = fpp.getInterface();
    final Method declared =
        protocol.getMethod(method.getName(), method.getParameterTypes());
    if (declared.isAnnotationPresent(Idempotent.class)) {
      return true;
    }
    return declared.isAnnotationPresent(AtMostOnce.class);
  }

  /**
   * Closes the wrapped proxy provider.
   *
   * @throws IOException if the provider fails to close.
   */
  void close() throws IOException {
    fpp.close();
  }
}
/**
 * Immutable outcome of consulting a {@link RetryPolicy} about a failed call:
 * which action to take, how long to wait before taking it, and which
 * exception to surface if the decision is to fail.
 */
private static class RetryInfo {
  /** {@link Time#monotonicNow()} at creation plus {@link #delay}. */
  private final long retryTime;
  /** Delay before the retry, in milliseconds. */
  private final long delay;
  /** The strongest action the policy returned for the failure. */
  private final RetryAction action;
  /** Failover count observed before the failed attempt. */
  private final long expectedFailoverCount;
  /** The exception to throw when {@link #isFail()} is true. */
  private final Exception failException;

  /**
   * @param delay delay before retrying, in milliseconds.
   * @param action the action chosen by the retry policy.
   * @param expectedFailoverCount failover count observed before the attempt.
   * @param failException the exception to throw if the action is to fail.
   */
  RetryInfo(long delay, RetryAction action, long expectedFailoverCount,
      Exception failException) {
    this.delay = delay;
    this.retryTime = Time.monotonicNow() + delay;
    this.action = action;
    this.expectedFailoverCount = expectedFailoverCount;
    this.failException = failException;
  }

  /** @return true if the action is to fail over and then retry. */
  boolean isFailover() {
    return action != null
        && action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY;
  }

  /** @return true if the action is to give up. */
  boolean isFail() {
    return action != null
        && action.action == RetryAction.RetryDecision.FAIL;
  }

  /** @return the exception recorded for the fail decision, possibly null. */
  Exception getFailException() {
    return failException;
  }

  /**
   * Asks the policy what to do about a failure. For a {@link MultiException}
   * every nested exception is evaluated; the resulting action is the greatest
   * {@link RetryAction.RetryDecision} among them and the delay is the largest
   * delay among the non-fail actions.
   *
   * A {@link MultiException} that carries no nested exceptions is evaluated
   * as an ordinary exception. Without this, the policy would never be
   * consulted, the action would be null, and the call would be retried
   * immediately with no limit imposed by the policy.
   *
   * @param policy the retry policy to consult.
   * @param e the exception thrown by the failed attempt.
   * @param counters the retry/failover counters of the call.
   * @param idempotentOrAtMostOnce whether the invoked method is annotated as
   *     idempotent or at-most-once.
   * @param expectedFailoverCount failover count observed before the attempt.
   * @return the retry information describing the decision.
   * @throws Exception if the policy throws while deciding.
   */
  static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
      Counters counters, boolean idempotentOrAtMostOnce,
      long expectedFailoverCount) throws Exception {
    RetryAction max = null;
    long maxRetryDelay = 0;
    Exception ex = null;

    Iterable<Exception> exceptions = Collections.singletonList(e);
    if (e instanceof MultiException) {
      final Iterable<Exception> nested =
          ((MultiException) e).getExceptions().values();
      // Only unwrap when there is something to unwrap; otherwise fall back
      // to evaluating the MultiException itself.
      if (nested.iterator().hasNext()) {
        exceptions = nested;
      }
    }

    for (Exception exception : exceptions) {
      final RetryAction a = policy.shouldRetry(exception,
          counters.retries, counters.failovers, idempotentOrAtMostOnce);
      if (a.action != RetryAction.RetryDecision.FAIL) {
        // must be a retry or failover
        if (a.delayMillis > maxRetryDelay) {
          maxRetryDelay = a.delayMillis;
        }
      }

      if (max == null || max.action.compareTo(a.action) < 0) {
        max = a;
        if (a.action == RetryAction.RetryDecision.FAIL) {
          ex = exception;
        }
      }
    }

    return new RetryInfo(maxRetryDelay, max, expectedFailoverCount, ex);
  }

  @Override
  public String toString() {
    return "RetryInfo{" +
        "retryTime=" + retryTime +
        ", delay=" + delay +
        ", action=" + action +
        ", expectedFailoverCount=" + expectedFailoverCount +
        ", failException=" + failException +
        '}';
  }
}
/** Wraps the failover proxy provider and the proxy currently in use. */
private final ProxyDescriptor<T> proxyDescriptor;
/**
* Set to true once an invocation made by {@link #invokeMethod} returns
* normally; consulted when choosing the log level for retry messages.
*/
private volatile boolean hasSuccessfulCall = false;
/** Policy for methods that have no entry in {@link #methodNameToPolicyMap}. */
private final RetryPolicy defaultPolicy;
/** Method-specific retry policies, keyed by method name. */
private final Map<String,RetryPolicy> methodNameToPolicyMap;
/** Creates the calls used in asynchronous mode and is handed to the RPC client. */
private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
/**
 * Creates a handler that applies one retry policy to every method.
 *
 * @param proxyProvider supplies the proxies to invoke and fail over between.
 * @param retryPolicy the policy used for all methods.
 */
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
    RetryPolicy retryPolicy) {
  this(proxyProvider, retryPolicy,
      Collections.<String, RetryPolicy>emptyMap());
}

/**
 * Creates a handler with a default policy and optional per-method policies.
 *
 * @param proxyProvider supplies the proxies to invoke and fail over between.
 * @param defaultPolicy the policy for methods without a specific entry.
 * @param methodNameToPolicyMap retry policies keyed by method name.
 */
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
    RetryPolicy defaultPolicy,
    Map<String, RetryPolicy> methodNameToPolicyMap) {
  this.methodNameToPolicyMap = methodNameToPolicyMap;
  this.defaultPolicy = defaultPolicy;
  this.proxyDescriptor = new ProxyDescriptor<>(proxyProvider);
}
/**
 * Resolves the retry policy for a method.
 *
 * @param method the method about to be invoked.
 * @return the policy registered under the method's name, or the default
 *     policy when none is registered.
 */
private RetryPolicy getRetryPolicy(Method method) {
  final RetryPolicy specific = methodNameToPolicyMap.get(method.getName());
  if (specific == null) {
    return defaultPolicy;
  }
  return specific;
}
/**
* @return the failover count currently reported by the proxy descriptor.
*/
private long getFailoverCount() {
return proxyDescriptor.getFailoverCount();
}
/**
 * Creates the call object for one invocation.
 *
 * @param method the method to invoke.
 * @param args the arguments for the method.
 * @param isRpc whether the invocation is treated as an RPC call.
 * @param callId the id assigned to the call.
 * @return a plain {@link Call}, or the call produced by the async call
 *     handler when the client is in asynchronous mode.
 */
private Call newCall(Method method, Object[] args, boolean isRpc,
    int callId) {
  if (!Client.isAsynchronousMode()) {
    return new Call(method, args, isRpc, callId, this);
  }
  return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, this);
}
/**
 * Invokes the method on the current proxy, repeating the attempt for as long
 * as the call reports that it should be retried.
 *
 * @param proxy the proxy instance the method was invoked on (unused; the
 *     target is the proxy held by the proxy descriptor).
 * @param method the method to invoke.
 * @param args the arguments for the method.
 * @return null for asynchronously invoked calls; otherwise the return value
 *     carried by the final call outcome.
 * @throws Throwable presumably the throwable captured by the final call
 *     outcome, rethrown when its return value is requested -- confirm
 *     against {@code CallReturn}.
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
  final boolean rpc = isRpcInvocation(proxyDescriptor.getProxy());
  final int id;
  if (rpc) {
    id = Client.nextCallId();
  } else {
    id = RpcConstants.INVALID_CALL_ID;
  }

  final Call call = newCall(method, args, rpc, id);
  for (;;) {
    final CallReturn outcome = call.invokeOnce();
    if (outcome.getState() == CallReturn.State.ASYNC_INVOKED) {
      // Asynchronous calls have no value to hand back here.
      return null;
    }
    if (outcome.getState() != CallReturn.State.RETRY) {
      return outcome.getReturnValue();
    }
  }
}
/**
 * Decides what to do after a failed attempt.
 *
 * @param method the method whose invocation failed.
 * @param callId the id of the failed call.
 * @param policy the retry policy to consult.
 * @param counters the retry/failover counters of the call.
 * @param expectFailoverCount failover count observed before the attempt.
 * @param e the exception thrown by the attempt.
 * @return the retry information when the call is to be retried.
 * @throws Exception the recorded failure exception when the policy decides
 *     to fail, or whatever the policy or the annotation lookup throws.
 */
private RetryInfo handleException(final Method method, final int callId,
    final RetryPolicy policy, final Counters counters,
    final long expectFailoverCount, final Exception e) throws Exception {
  final boolean safeToRepeat = proxyDescriptor.idempotentOrAtMostOnce(method);
  final RetryInfo decision = RetryInfo.newRetryInfo(policy, e, counters,
      safeToRepeat, expectFailoverCount);

  if (!decision.isFail()) {
    log(method, decision.isFailover(), counters.failovers, decision.delay, e);
    return decision;
  }

  // The policy gave up: report why (when a reason is given) and rethrow.
  if (decision.action.reason != null && LOG.isDebugEnabled()) {
    LOG.debug("Exception while invoking call #" + callId + " "
        + proxyDescriptor.getProxyInfo().getString(method.getName())
        + ". Not retrying because " + decision.action.reason, e);
  }
  throw decision.getFailException();
}
/**
 * Logs that a failed invocation is about to be retried. The message goes to
 * INFO when this handler (or its async call handler) has already seen a
 * successful call or when the call has failed over before; otherwise it goes
 * to DEBUG, with the exception attached, if DEBUG is enabled.
 *
 * @param method the method whose invocation failed.
 * @param isFailover whether the retry involves a failover.
 * @param failovers number of failovers already counted for the call.
 * @param delay delay before the retry, in milliseconds.
 * @param ex the exception thrown by the failed attempt.
 */
private void log(final Method method, final boolean isFailover,
    final int failovers, final long delay, final Exception ex) {
  final boolean atInfo = hasSuccessfulCall || failovers != 0
      || asyncCallHandler.hasSuccessfulCall();
  if (!(atInfo || LOG.isDebugEnabled())) {
    return;
  }

  final StringBuilder msg = new StringBuilder();
  msg.append(ex).append(", while invoking ");
  msg.append(proxyDescriptor.getProxyInfo().getString(method.getName()));
  if (failovers > 0) {
    msg.append(" after ").append(failovers).append(" failover attempts");
  }
  if (isFailover) {
    msg.append(". Trying to failover ");
  } else {
    msg.append(". Retrying ");
  }
  if (delay > 0) {
    msg.append("after sleeping for " + delay + "ms.");
  } else {
    msg.append("immediately.");
  }

  final String text = msg.toString();
  if (atInfo) {
    LOG.info(text);
  } else {
    LOG.debug(text, ex);
  }
}
/**
 * Reflectively invokes the method on the proxy currently in use and records
 * that a call has succeeded.
 *
 * @param method the method to invoke; made accessible if it is not already.
 * @param args the arguments for the method.
 * @return the value returned by the method.
 * @throws Throwable the cause of the {@link InvocationTargetException} when
 *     the target method throws, or any other failure of the reflective call.
 */
protected Object invokeMethod(Method method, Object[] args) throws Throwable {
  final Object result;
  try {
    if (!method.isAccessible()) {
      method.setAccessible(true);
    }
    result = method.invoke(proxyDescriptor.getProxy(), args);
  } catch (InvocationTargetException wrapped) {
    // Surface the target method's own exception, not the reflection wrapper.
    throw wrapped.getCause();
  }
  hasSuccessfulCall = true;
  return result;
}
/**
 * Tells whether a proxy is backed by an {@link RpcInvocationHandler}.
 *
 * @param proxy the proxy to inspect; a {@link ProtocolTranslator} is first
 *     replaced by its underlying proxy object.
 * @return true if the (unwrapped) object is a dynamic proxy whose invocation
 *     handler is an {@link RpcInvocationHandler}.
 */
@VisibleForTesting
static boolean isRpcInvocation(Object proxy) {
  final Object target = proxy instanceof ProtocolTranslator
      ? ((ProtocolTranslator) proxy).getUnderlyingProxyObject()
      : proxy;
  return Proxy.isProxyClass(target.getClass())
      && Proxy.getInvocationHandler(target) instanceof RpcInvocationHandler;
}
/**
* Closes this handler by closing the proxy descriptor, which in turn closes
* the underlying failover proxy provider.
*
* @throws IOException if closing the proxy provider fails.
*/
@Override
public void close() throws IOException {
proxyDescriptor.close();
}
/**
* @return the connection id that {@code RPC.getConnectionIdForProxy} reports
* for the proxy currently in use.
*/
@Override //RpcInvocationHandler
public ConnectionId getConnectionId() {
return RPC.getConnectionIdForProxy(proxyDescriptor.getProxy());
}
/**
* @return the failover proxy provider this handler was constructed with.
*/
@VisibleForTesting
public FailoverProxyProvider<T> getProxyProvider() {
return proxyDescriptor.fpp;
}
}