blob: 5e83fff6b784ab2b30cbb73a98816768a08e5151 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException;
/**
* A FailoverProxyProvider implementation that technically does not "failover"
* per-se. It constructs a wrapper proxy that sends the request to ALL
* underlying proxies simultaneously. It assumes that in an HA setup there
* will be only one Active, and that the active should respond faster than any
* configured standbys. Once it receives a response from any one of the
* configured proxies, outstanding requests to other proxies are immediately
* cancelled.
*/
public class RequestHedgingProxyProvider<T> extends
ConfiguredFailoverProxyProvider<T> {
/**
* Logger used by this provider and by its nested invocation handler to
* report the outcome of each proxied invocation.
*/
public static final Logger LOG =
LoggerFactory.getLogger(RequestHedgingProxyProvider.class);
class RequestHedgingInvocationHandler implements RpcInvocationHandler {
// Candidate proxies this handler may call. The enclosing provider keys the
// map by each proxy's proxyInfo string; entries are removed by that key.
final Map<String, ProxyInfo<T>> targetProxies;
// The proxy that calls are currently routed to (presumably the active nn).
// Stays null until the first invocation selects one.
private volatile ProxyInfo<T> currentUsedProxy = null;
/**
* Creates a handler over the given candidate proxies.
*
* @param targetProxies candidate proxies; the map is copied, so later
* removals made by this handler do not affect the caller's map
*/
public RequestHedgingInvocationHandler(
Map<String, ProxyInfo<T>> targetProxies) {
this.targetProxies = new HashMap<>(targetProxies);
}
/**
* Invokes {@code method} on the underlying proxies.
* <p>
* While no proxy has been selected yet ({@code currentUsedProxy} is null),
* the first caller to enter the synchronized section removes the proxy named
* by {@code toIgnore} from the candidates and then either calls the single
* remaining proxy directly, or creates an Executor and invokes all remaining
* proxies concurrently, returning the first result that completes without an
* exception. Once a proxy has been selected, calls go straight to it. This
* implementation assumes that Clients have configured proper socket
* timeouts, else the call can block forever.
*
* @param proxy the proxy instance the method was invoked on; not used by
* this implementation
* @param method the interface method to invoke on the target proxies
* @param args the arguments passed on to {@code method}
* @return the value returned by the proxy whose invocation succeeded
* @throws Throwable the unwrapped exception when exactly one failure was
* recorded or a directly-called proxy fails, a {@link MultiException} when
* several hedged invocations all fail, a {@link RemoteException} when no
* candidate proxies are left, or anything else raised while invoking
*/
@Override
public Object
invoke(Object proxy, final Method method, final Object[] args)
throws Throwable {
// Need double check locking to guarantee thread-safe since
// currentUsedProxy is lazily initialized.
if (currentUsedProxy == null) {
synchronized (this) {
if (currentUsedProxy == null) {
// Maps each submitted call back to the proxy it was sent to.
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
// Number of submitted calls whose result has not been consumed yet.
int numAttempts = 0;
ExecutorService executor = null;
CompletionService<Object> completionService;
try {
// Drop the proxy recorded by the provider's last performFailover(),
// if any, so it is not tried again by this handler.
// Optimization: if only 2 proxies are configured and one had failed
// over, then we don't need to create a threadpool etc.
targetProxies.remove(toIgnore);
if (targetProxies.size() == 0) {
LOG.trace("No valid proxies left");
throw new RemoteException(IOException.class.getName(),
"No valid proxies left. "
+ "All NameNode proxies have failed over.");
}
if (targetProxies.size() == 1) {
ProxyInfo<T> proxyInfo =
targetProxies.values().iterator().next();
try {
// Selected before the call is made, so this proxy stays selected
// even if the invocation below throws.
currentUsedProxy = proxyInfo;
Object retVal = method.invoke(proxyInfo.proxy, args);
LOG.debug("Invocation successful on [{}]",
currentUsedProxy.proxyInfo);
return retVal;
} catch (InvocationTargetException ex) {
Exception unwrappedException =
unwrapInvocationTargetException(ex);
logProxyException(unwrappedException,
currentUsedProxy.proxyInfo);
LOG.trace("Unsuccessful invocation on [{}]",
currentUsedProxy.proxyInfo);
throw unwrappedException;
}
}
// Note: the pool is sized by the provider's full proxy list, not by
// the (possibly smaller) targetProxies map.
executor = Executors.newFixedThreadPool(proxies.size());
completionService = new ExecutorCompletionService<>(executor);
// Capture the callId and other information from the current thread.
final int callId = Client.getCallId();
final int retryCount = Client.getRetryCount();
final Object externalHandler = Client.getExternalHandler();
for (final Map.Entry<String, ProxyInfo<T>> pEntry : targetProxies
.entrySet()) {
Callable<Object> c = new Callable<Object>() {
@Override
public Object call() throws Exception {
// Apply the call id and other information captured from the
// parent thread to the worker thread.
Client.setCallIdAndRetryCount(callId, retryCount,
externalHandler);
LOG.trace("Invoking method {} on proxy {}", method,
pEntry.getValue().proxyInfo);
return method.invoke(pEntry.getValue().proxy, args);
}
};
proxyMap.put(completionService.submit(c), pEntry.getValue());
numAttempts++;
}
// Current thread's callId will not be cleared as RPC happens in
// separate threads. Reset the CallId information forcefully.
Client.setCallIdAndRetryCountUnprotected(null, 0, null);
// Failures collected so far, keyed by the failing proxy's proxyInfo.
Map<String, Exception> badResults = new HashMap<>();
while (numAttempts > 0) {
// Blocks until the next hedged call finishes, in completion order.
Future<Object> callResultFuture = completionService.take();
Object retVal;
try {
// Recorded before get(): if get() throws, the field keeps pointing
// at this proxy until a later iteration overwrites it.
currentUsedProxy = proxyMap.get(callResultFuture);
retVal = callResultFuture.get();
LOG.debug("Invocation successful on [{}]",
currentUsedProxy.proxyInfo);
return retVal;
} catch (ExecutionException ex) {
Exception unwrappedException = unwrapExecutionException(ex);
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
logProxyException(unwrappedException, tProxyInfo.proxyInfo);
badResults.put(tProxyInfo.proxyInfo, unwrappedException);
LOG.trace("Unsuccessful invocation on [{}]",
tProxyInfo.proxyInfo);
numAttempts--;
}
}
// At this point we should have all bad results (Exceptions),
// or should have returned with a successful result.
if (badResults.size() == 1) {
throw badResults.values().iterator().next();
} else {
throw new MultiException(badResults);
}
} finally {
if (executor != null) {
LOG.trace("Shutting down threadpool executor");
// Also reached on the success path: attempts to stop the hedged
// calls that are still in flight.
executor.shutdownNow();
}
}
}
}
}
// Because the above synchronized block will return or throw an exception,
// the thread that performed the initialization never reaches the code
// below, so no extra check is needed here.
try {
Object retVal = method.invoke(currentUsedProxy.proxy, args);
LOG.debug("Invocation successful on [{}]", currentUsedProxy.proxyInfo);
return retVal;
} catch (InvocationTargetException ex) {
Exception unwrappedException = unwrapInvocationTargetException(ex);
logProxyException(unwrappedException, currentUsedProxy.proxyInfo);
LOG.trace("Unsuccessful invocation on [{}]",
currentUsedProxy.proxyInfo);
throw unwrappedException;
}
}
/**
* No-op: this handler releases nothing here and leaves the target proxies
* untouched.
*/
@Override
public void close() throws IOException {
}
/**
 * Returns the connection id of the proxy currently in use.
 *
 * @return the id reported by {@link RPC#getConnectionIdForProxy} for the
 *     selected proxy, or null while no proxy has been selected yet
 */
@Override
public ConnectionId getConnectionId() {
  return currentUsedProxy == null
      ? null
      : RPC.getConnectionIdForProxy(currentUsedProxy.proxy);
}
}
/**
* A proxy wrapping {@link RequestHedgingInvocationHandler}. Built lazily by
* {@link #getProxy()} and reset to null by {@link #performFailover}.
*/
private ProxyInfo<T> currentUsedHandler = null;
/**
* The {@code proxyInfo} string of the proxy recorded by
* {@link #performFailover}, or null if none has been recorded. The
* invocation handler removes the matching entry from its candidate proxies
* before selecting one.
*/
private volatile String toIgnore = null;
/**
* Creates the provider by handing every argument to the
* {@link ConfiguredFailoverProxyProvider} constructor; no additional state
* is initialized here.
*
* @param conf configuration passed to the superclass
* @param uri URI passed to the superclass (presumably the HA nameservice
* URI; confirm against the superclass)
* @param xface the protocol interface that proxies implement
* @param proxyFactory factory passed to the superclass
*/
public RequestHedgingProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> proxyFactory) {
super(conf, uri, xface, proxyFactory);
}
/**
 * Returns the hedging proxy, building it when none is cached.
 * <p>
 * Building walks once over every configured proxy (one
 * {@code super.getProxy()} / {@code incrementProxyIndex()} step per entry in
 * {@code proxies}), collects them by their {@code proxyInfo} string and wraps
 * them in a dynamic proxy backed by a {@link RequestHedgingInvocationHandler}.
 * The result is cached until {@link #performFailover} discards it.
 *
 * @return the cached or newly built wrapper; its {@code proxyInfo} is the
 *     bracketed, comma-terminated list of the wrapped proxies' names
 */
@SuppressWarnings("unchecked")
@Override
public synchronized ProxyInfo<T> getProxy() {
  if (currentUsedHandler == null) {
    final Map<String, ProxyInfo<T>> candidates = new HashMap<>();
    final StringBuilder names = new StringBuilder("[");
    int visited = 0;
    while (visited < proxies.size()) {
      // Fetch the proxy at the superclass's current position, then advance
      // so the next pass sees the following one.
      final ProxyInfo<T> candidate = super.getProxy();
      incrementProxyIndex();
      candidates.put(candidate.proxyInfo, candidate);
      names.append(candidate.proxyInfo).append(',');
      visited++;
    }
    names.append(']');
    final RequestHedgingInvocationHandler handler =
        new RequestHedgingInvocationHandler(candidates);
    final T hedgingProxy = (T) Proxy.newProxyInstance(
        RequestHedgingInvocationHandler.class.getClassLoader(),
        new Class<?>[]{xface},
        handler);
    currentUsedHandler = new ProxyInfo<T>(hedgingProxy, names.toString());
  }
  return currentUsedHandler;
}
/**
 * Records the proxy that the cached hedging handler last used, so that the
 * next handler leaves it out, and discards the cached handler so that
 * {@link #getProxy()} builds a fresh one.
 * <p>
 * Tolerates being called when no handler is cached (nothing happens) and
 * when the cached handler never selected a proxy (nothing is marked to be
 * ignored); both cases would otherwise fail with a NullPointerException.
 *
 * @param currentProxy the proxy the caller is failing over from; not
 *     consulted, the handler cached by this provider is inspected instead
 */
@Override
public synchronized void performFailover(T currentProxy) {
  if (currentUsedHandler == null) {
    // No hedging proxy has been handed out since the last failover.
    return;
  }
  RequestHedgingInvocationHandler handler =
      (RequestHedgingInvocationHandler) Proxy.getInvocationHandler(
          currentUsedHandler.proxy);
  // Read the volatile field once so the null check and the dereference see
  // the same value.
  ProxyInfo<T> lastUsed = handler.currentUsedProxy;
  toIgnore = (lastUsed == null) ? null : lastUsed.proxyInfo;
  this.currentUsedHandler = null;
}
/**
 * Logs an exception returned by a proxy. A StandbyException is expected and
 * is logged at debug level; anything else is logged as a warning.
 *
 * @param ex Exception to evaluate.
 * @param proxyInfo Information of the proxy reporting the exception.
 */
private void logProxyException(Exception ex, String proxyInfo) {
  final boolean fromStandby = isStandbyException(ex);
  if (!fromStandby) {
    LOG.warn("Invocation returned exception on [{}]", proxyInfo, ex);
    return;
  }
  LOG.debug("Invocation returned standby exception on [{}]", proxyInfo, ex);
}
/**
 * Tells whether an exception was caused by a standby namenode, i.e. whether
 * it is a {@link RemoteException} that unwraps to a
 * {@link StandbyException}.
 *
 * @param exception Exception to check.
 * @return true only for a RemoteException wrapping a StandbyException.
 */
private boolean isStandbyException(Exception exception) {
  return exception instanceof RemoteException
      && ((RemoteException) exception).unwrapRemoteException()
          instanceof StandbyException;
}
/**
 * Strips the ExecutionException (and the InvocationTargetException inside
 * it) from a failed hedged call. <p>
 * Example:
 * <blockquote><pre>
 * if ex is
 * ExecutionException(InvocationTargetException(SomeException))
 * returns SomeException
 * </pre></blockquote>
 * If the cause is not an InvocationTargetException, or {@code ex} is null,
 * {@code ex} itself is returned.
 *
 * @param ex the exception reported by the future, may be null
 * @return unwrapped exception
 */
private Exception unwrapExecutionException(ExecutionException ex) {
  final Throwable cause = (ex == null) ? null : ex.getCause();
  if (!(cause instanceof InvocationTargetException)) {
    return ex;
  }
  return unwrapInvocationTargetException((InvocationTargetException) cause);
}
/**
 * Strips the InvocationTargetException added by reflection. <p>
 * Example:
 * <blockquote><pre>
 * if ex is InvocationTargetException(SomeException)
 * returns SomeException
 * </pre></blockquote>
 * If the cause is not an {@link Exception} (for instance an Error, or no
 * cause at all), or {@code ex} is null, {@code ex} itself is returned.
 *
 * @param ex the reflection wrapper to unwrap, may be null
 * @return unwrapped exception
 */
private Exception unwrapInvocationTargetException(
    InvocationTargetException ex) {
  if (ex == null) {
    return null;
  }
  final Throwable cause = ex.getCause();
  return (cause instanceof Exception) ? (Exception) cause : ex;
}
}