| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode.ha; |
| |
| import java.io.IOException; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Proxy; |
| import java.net.URI; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletionService; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorCompletionService; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.retry.MultiException; |
| import org.apache.hadoop.ipc.Client; |
| import org.apache.hadoop.ipc.Client.ConnectionId; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.ipc.RpcInvocationHandler; |
| import org.apache.hadoop.ipc.StandbyException; |
| |
| /** |
| * A FailoverProxyProvider implementation that technically does not "failover" |
| * per-se. It constructs a wrapper proxy that sends the request to ALL |
| * underlying proxies simultaneously. It assumes the in an HA setup, there will |
| * be only one Active, and the active should respond faster than any configured |
| * standbys. Once it receive a response from any one of the configured proxies, |
| * outstanding requests to other proxies are immediately cancelled. |
| */ |
| public class RequestHedgingProxyProvider<T> extends |
| ConfiguredFailoverProxyProvider<T> { |
| |
| public static final Logger LOG = |
| LoggerFactory.getLogger(RequestHedgingProxyProvider.class); |
| |
| class RequestHedgingInvocationHandler implements RpcInvocationHandler { |
| |
| final Map<String, ProxyInfo<T>> targetProxies; |
| // Proxy of the active nn |
| private volatile ProxyInfo<T> currentUsedProxy = null; |
| |
| public RequestHedgingInvocationHandler( |
| Map<String, ProxyInfo<T>> targetProxies) { |
| this.targetProxies = new HashMap<>(targetProxies); |
| } |
| |
| /** |
| * Creates a Executor and invokes all proxies concurrently. This |
| * implementation assumes that Clients have configured proper socket |
| * timeouts, else the call can block forever. |
| * |
| * @param proxy |
| * @param method |
| * @param args |
| * @return |
| * @throws Throwable |
| */ |
| @Override |
| public Object |
| invoke(Object proxy, final Method method, final Object[] args) |
| throws Throwable { |
| // Need double check locking to guarantee thread-safe since |
| // currentUsedProxy is lazily initialized. |
| if (currentUsedProxy == null) { |
| synchronized (this) { |
| if (currentUsedProxy == null) { |
| Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>(); |
| int numAttempts = 0; |
| |
| ExecutorService executor = null; |
| CompletionService<Object> completionService; |
| try { |
| // Optimization : if only 2 proxies are configured and one had |
| // failed |
| // over, then we dont need to create a threadpool etc. |
| targetProxies.remove(toIgnore); |
| if (targetProxies.size() == 0) { |
| LOG.trace("No valid proxies left"); |
| throw new RemoteException(IOException.class.getName(), |
| "No valid proxies left. " |
| + "All NameNode proxies have failed over."); |
| } |
| if (targetProxies.size() == 1) { |
| ProxyInfo<T> proxyInfo = |
| targetProxies.values().iterator().next(); |
| try { |
| currentUsedProxy = proxyInfo; |
| Object retVal = method.invoke(proxyInfo.proxy, args); |
| LOG.debug("Invocation successful on [{}]", |
| currentUsedProxy.proxyInfo); |
| return retVal; |
| } catch (InvocationTargetException ex) { |
| Exception unwrappedException = |
| unwrapInvocationTargetException(ex); |
| logProxyException(unwrappedException, |
| currentUsedProxy.proxyInfo); |
| LOG.trace("Unsuccessful invocation on [{}]", |
| currentUsedProxy.proxyInfo); |
| throw unwrappedException; |
| } |
| } |
| executor = Executors.newFixedThreadPool(proxies.size()); |
| completionService = new ExecutorCompletionService<>(executor); |
| // Set the callId and other informations from current thread. |
| final int callId = Client.getCallId(); |
| final int retryCount = Client.getRetryCount(); |
| final Object externalHandler = Client.getExternalHandler(); |
| for (final Map.Entry<String, ProxyInfo<T>> pEntry : targetProxies |
| .entrySet()) { |
| Callable<Object> c = new Callable<Object>() { |
| @Override |
| public Object call() throws Exception { |
| // Call Id and other informations from parent thread. |
| Client.setCallIdAndRetryCount(callId, retryCount, |
| externalHandler); |
| LOG.trace("Invoking method {} on proxy {}", method, |
| pEntry.getValue().proxyInfo); |
| return method.invoke(pEntry.getValue().proxy, args); |
| } |
| }; |
| proxyMap.put(completionService.submit(c), pEntry.getValue()); |
| numAttempts++; |
| } |
| // Current thread's callId will not be cleared as RPC happens in |
| // separate threads. Reset the CallId information Forcefully. |
| Client.setCallIdAndRetryCountUnprotected(null, 0, null); |
| Map<String, Exception> badResults = new HashMap<>(); |
| while (numAttempts > 0) { |
| Future<Object> callResultFuture = completionService.take(); |
| Object retVal; |
| try { |
| currentUsedProxy = proxyMap.get(callResultFuture); |
| retVal = callResultFuture.get(); |
| LOG.debug("Invocation successful on [{}]", |
| currentUsedProxy.proxyInfo); |
| return retVal; |
| } catch (ExecutionException ex) { |
| Exception unwrappedException = unwrapExecutionException(ex); |
| ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture); |
| logProxyException(unwrappedException, tProxyInfo.proxyInfo); |
| badResults.put(tProxyInfo.proxyInfo, unwrappedException); |
| LOG.trace("Unsuccessful invocation on [{}]", |
| tProxyInfo.proxyInfo); |
| numAttempts--; |
| } |
| } |
| |
| // At this point we should have All bad results (Exceptions) |
| // Or should have returned with successful result. |
| if (badResults.size() == 1) { |
| throw badResults.values().iterator().next(); |
| } else { |
| throw new MultiException(badResults); |
| } |
| } finally { |
| if (executor != null) { |
| LOG.trace("Shutting down threadpool executor"); |
| executor.shutdownNow(); |
| } |
| } |
| } |
| } |
| } |
| // Because the above synchronized block will return or throw an exception, |
| // so we don't need to do any check to prevent the first initialized |
| // thread from stepping to following codes. |
| try { |
| Object retVal = method.invoke(currentUsedProxy.proxy, args); |
| LOG.debug("Invocation successful on [{}]", currentUsedProxy.proxyInfo); |
| return retVal; |
| } catch (InvocationTargetException ex) { |
| Exception unwrappedException = unwrapInvocationTargetException(ex); |
| logProxyException(unwrappedException, currentUsedProxy.proxyInfo); |
| LOG.trace("Unsuccessful invocation on [{}]", |
| currentUsedProxy.proxyInfo); |
| throw unwrappedException; |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| } |
| |
| @Override |
| public ConnectionId getConnectionId() { |
| if (currentUsedProxy == null) { |
| return null; |
| } |
| return RPC.getConnectionIdForProxy(currentUsedProxy.proxy); |
| } |
| } |
| |
| /** A proxy wrapping {@link RequestHedgingInvocationHandler}. */ |
| private ProxyInfo<T> currentUsedHandler = null; |
| private volatile String toIgnore = null; |
| |
| public RequestHedgingProxyProvider(Configuration conf, URI uri, |
| Class<T> xface, HAProxyFactory<T> proxyFactory) { |
| super(conf, uri, xface, proxyFactory); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public synchronized ProxyInfo<T> getProxy() { |
| if (currentUsedHandler != null) { |
| return currentUsedHandler; |
| } |
| Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>(); |
| StringBuilder combinedInfo = new StringBuilder("["); |
| for (int i = 0; i < proxies.size(); i++) { |
| ProxyInfo<T> pInfo = super.getProxy(); |
| incrementProxyIndex(); |
| targetProxyInfos.put(pInfo.proxyInfo, pInfo); |
| combinedInfo.append(pInfo.proxyInfo).append(','); |
| } |
| combinedInfo.append(']'); |
| T wrappedProxy = (T) Proxy.newProxyInstance( |
| RequestHedgingInvocationHandler.class.getClassLoader(), |
| new Class<?>[]{xface}, |
| new RequestHedgingInvocationHandler(targetProxyInfos)); |
| currentUsedHandler = |
| new ProxyInfo<T>(wrappedProxy, combinedInfo.toString()); |
| return currentUsedHandler; |
| } |
| |
| @Override |
| public synchronized void performFailover(T currentProxy) { |
| toIgnore = ((RequestHedgingInvocationHandler) Proxy.getInvocationHandler( |
| currentUsedHandler.proxy)).currentUsedProxy.proxyInfo; |
| this.currentUsedHandler = null; |
| } |
| |
| /** |
| * Check the exception returned by the proxy log a warning message if it's |
| * not a StandbyException (expected exception). |
| * @param ex Exception to evaluate. |
| * @param proxyInfo Information of the proxy reporting the exception. |
| */ |
| private void logProxyException(Exception ex, String proxyInfo) { |
| if (isStandbyException(ex)) { |
| LOG.debug("Invocation returned standby exception on [{}]", proxyInfo, ex); |
| } else { |
| LOG.warn("Invocation returned exception on [{}]", proxyInfo, ex); |
| } |
| } |
| |
| /** |
| * Check if the returned exception is caused by an standby namenode. |
| * @param exception Exception to check. |
| * @return If the exception is caused by an standby namenode. |
| */ |
| private boolean isStandbyException(Exception exception) { |
| if (exception instanceof RemoteException) { |
| return ((RemoteException) exception).unwrapRemoteException() |
| instanceof StandbyException; |
| } |
| return false; |
| } |
| |
| /** |
| * Unwraps the ExecutionException. <p> |
| * Example: |
| * <blockquote><pre> |
| * if ex is |
| * ExecutionException(InvocationTargetException(SomeException)) |
| * returns SomeException |
| * </pre></blockquote> |
| * |
| * @return unwrapped exception |
| */ |
| private Exception unwrapExecutionException(ExecutionException ex) { |
| if (ex != null) { |
| Throwable cause = ex.getCause(); |
| if (cause instanceof InvocationTargetException) { |
| return |
| unwrapInvocationTargetException((InvocationTargetException)cause); |
| } |
| } |
| return ex; |
| |
| } |
| |
| /** |
| * Unwraps the InvocationTargetException. <p> |
| * Example: |
| * <blockquote><pre> |
| * if ex is InvocationTargetException(SomeException) |
| * returns SomeException |
| * </pre></blockquote> |
| * |
| * @return unwrapped exception |
| */ |
| private Exception unwrapInvocationTargetException( |
| InvocationTargetException ex) { |
| if (ex != null) { |
| Throwable cause = ex.getCause(); |
| if (cause instanceof Exception) { |
| return (Exception) cause; |
| } |
| } |
| return ex; |
| } |
| } |