blob: c1e9da13f742e6f07b39ebe84ff3c823f907ebdd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* A FailoverProxyProvider implementation that technically does not "failover"
* per-se. It constructs a wrapper proxy that sends the request to ALL
* underlying proxies simultaneously. Each proxy inside the wrapper proxy will
* retry the corresponding target. It assumes the in an HA setup, there will be
* only one Active, and the active should respond faster than any configured
* standbys. Once it receives a response from any one of the configured proxies,
* outstanding requests to other proxies are immediately cancelled.
*/
public class RequestHedgingRMFailoverProxyProvider<T>
extends ConfiguredRMFailoverProxyProvider<T> {
private static final Log LOG =
LogFactory.getLog(RequestHedgingRMFailoverProxyProvider.class);
private volatile String successfulProxy = null;
private ProxyInfo<T> wrappedProxy = null;
private Map<String, T> nonRetriableProxy = new HashMap<>();
@Override
@SuppressWarnings("unchecked")
public void init(Configuration configuration, RMProxy<T> rmProxy,
Class<T> protocol) {
super.init(configuration, rmProxy, protocol);
Map<String, ProxyInfo<T>> retriableProxies = new HashMap<>();
String originalId = HAUtil.getRMHAId(conf);
for (String rmId : rmServiceIds) {
conf.set(YarnConfiguration.RM_HA_ID, rmId);
nonRetriableProxy.put(rmId, super.getProxyInternal());
T proxy = createRetriableProxy();
ProxyInfo<T> pInfo = new ProxyInfo<T>(proxy, rmId);
retriableProxies.put(rmId, pInfo);
}
conf.set(YarnConfiguration.RM_HA_ID, originalId);
T proxyInstance = (T) Proxy.newProxyInstance(
RMRequestHedgingInvocationHandler.class.getClassLoader(),
new Class<?>[] {protocol},
new RMRequestHedgingInvocationHandler(retriableProxies));
String combinedInfo = Arrays.toString(rmServiceIds);
wrappedProxy = new ProxyInfo<T>(proxyInstance, combinedInfo);
LOG.info("Created wrapped proxy for " + combinedInfo);
}
@SuppressWarnings("unchecked")
protected T createRetriableProxy() {
try {
// Create proxy that can retry exceptions properly.
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
T proxy = rmProxy.getProxy(conf, protocol, rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
} catch (IOException ioe) {
LOG.error("Unable to create proxy to the ResourceManager "
+ HAUtil.getRMHAId(conf), ioe);
return null;
}
}
class RMRequestHedgingInvocationHandler implements InvocationHandler {
final private Map<String, ProxyInfo<T>> allProxies;
public RMRequestHedgingInvocationHandler(
Map<String, ProxyInfo<T>> allProxies) {
this.allProxies = new HashMap<>(allProxies);
}
protected Object invokeMethod(Object proxy, Method method, Object[] args)
throws Throwable {
try {
return method.invoke(proxy, args);
} catch (InvocationTargetException ex) {
throw ex.getCause();
}
}
private Throwable extraRootException(Exception ex) {
Throwable rootCause = ex;
if (ex instanceof ExecutionException) {
Throwable cause = ex.getCause();
if (cause instanceof InvocationTargetException) {
rootCause = cause.getCause();
}
}
return rootCause;
}
/**
* Creates a Executor and invokes all proxies concurrently.
*/
@Override
public Object invoke(Object proxy, final Method method, final Object[] args)
throws Throwable {
if (successfulProxy != null) {
return invokeMethod(nonRetriableProxy.get(successfulProxy), method,
args);
}
LOG.info("Looking for the active RM in " + Arrays.toString(rmServiceIds)
+ "...");
ExecutorService executor = null;
CompletionService<Object> completionService;
try {
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
executor = HadoopExecutors.newFixedThreadPool(allProxies.size());
completionService = new ExecutorCompletionService<>(executor);
for (final ProxyInfo<T> pInfo : allProxies.values()) {
Callable<Object> c = new Callable<Object>() {
@Override
public Object call() throws Exception {
return method.invoke(pInfo.proxy, args);
}
};
proxyMap.put(completionService.submit(c), pInfo);
}
Future<Object> callResultFuture = completionService.take();
String pInfo = proxyMap.get(callResultFuture).proxyInfo;
successfulProxy = pInfo;
Object retVal;
try {
retVal = callResultFuture.get();
LOG.info("Found active RM [" + pInfo + "]");
return retVal;
} catch (Exception ex) {
// Throw exception from first responding RM so that clients can handle
// appropriately
Throwable rootCause = extraRootException(ex);
LOG.warn("Invocation returned exception: " + rootCause.toString()
+ " on " + "[" + pInfo + "], so propagating back to caller.");
throw rootCause;
}
} finally {
if (executor != null) {
executor.shutdownNow();
}
}
}
}
@Override
public ProxyInfo<T> getProxy() {
return wrappedProxy;
}
@Override
public void performFailover(T currentProxy) {
LOG.info("Connection lost with " + successfulProxy + ", trying to fail over.");
successfulProxy = null;
}
}