| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.server.federation.router; |
| |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT; |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY; |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY; |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS; |
| import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; |
| |
| import java.io.EOFException; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.net.ConnectException; |
| import java.net.InetSocketAddress; |
| import java.net.SocketException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.TreeMap; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.LongAdder; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.SnapshotException; |
| import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; |
| import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; |
| import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; |
| import org.apache.hadoop.io.retry.RetryPolicies; |
| import org.apache.hadoop.io.retry.RetryPolicy; |
| import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; |
| import org.apache.hadoop.ipc.CallerContext; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.ipc.RetriableException; |
| import org.apache.hadoop.ipc.Server; |
| import org.apache.hadoop.ipc.Server.Call; |
| import org.apache.hadoop.ipc.StandbyException; |
| import org.apache.hadoop.net.ConnectTimeoutException; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.StringUtils; |
| import org.eclipse.jetty.util.ajax.JSON; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * A client proxy for Router to NN communication using the NN ClientProtocol. |
| * <p> |
| * Provides routers to invoke remote ClientProtocol methods and handle |
| * retries/failover. |
| * <ul> |
| * <li>invokeSingle Make a single request to a single namespace |
| * <li>invokeSequential Make a sequential series of requests to multiple |
| * ordered namespaces until a condition is met. |
| * <li>invokeConcurrent Make concurrent requests to multiple namespaces and |
| * return all of the results. |
| * </ul> |
| * Also maintains a cached pool of connections to NNs. Connections are managed |
| * by the ConnectionManager and are unique to each user + NN. The size of the |
| * connection pool can be configured. Larger pools allow for more simultaneous |
| * requests to a single NN from a single user. |
| */ |
| public class RouterRpcClient { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(RouterRpcClient.class); |
| |
| |
| /** Router using this RPC client. */ |
| private final Router router; |
| |
| /** Interface to identify the active NN for a nameservice or blockpool ID. */ |
| private final ActiveNamenodeResolver namenodeResolver; |
| |
| /** Connection pool to the Namenodes per user for performance. */ |
| private final ConnectionManager connectionManager; |
| /** Service to run asynchronous calls. */ |
| private final ThreadPoolExecutor executorService; |
| /** Retry policy for router -> NN communication. */ |
| private final RetryPolicy retryPolicy; |
| /** Optional perf monitor. */ |
| private final RouterRpcMonitor rpcMonitor; |
| /** Field separator of CallerContext. */ |
| private final String contextFieldSeparator; |
| |
| /** Pattern to parse a stack trace line. */ |
| private static final Pattern STACK_TRACE_PATTERN = |
| Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)"); |
| |
| /** Fairness manager to control handlers assigned per NS. */ |
| private volatile RouterRpcFairnessPolicyController routerRpcFairnessPolicyController; |
| private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>(); |
| private Map<String, LongAdder> acceptedPermitsPerNs = new ConcurrentHashMap<>(); |
| |
| private final boolean enableProxyUser; |
| |
| /** |
| * Create a router RPC client to manage remote procedure calls to NNs. |
| * |
| * @param conf Hdfs Configuration. |
| * @param router A router using this RPC client. |
| * @param resolver A NN resolver to determine the currently active NN in HA. |
| * @param monitor Optional performance monitor. |
| */ |
| public RouterRpcClient(Configuration conf, Router router, |
| ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { |
| this.router = router; |
| |
| this.namenodeResolver = resolver; |
| |
| Configuration clientConf = getClientConfiguration(conf); |
| this.contextFieldSeparator = |
| clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY, |
| HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT); |
| this.connectionManager = new ConnectionManager(clientConf); |
| this.connectionManager.start(); |
| this.routerRpcFairnessPolicyController = |
| FederationUtil.newFairnessPolicyController(conf); |
| |
| int numThreads = conf.getInt( |
| RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, |
| RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT); |
| ThreadFactory threadFactory = new ThreadFactoryBuilder() |
| .setNameFormat("RPC Router Client-%d") |
| .build(); |
| BlockingQueue<Runnable> workQueue; |
| if (conf.getBoolean( |
| RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, |
| RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) { |
| workQueue = new ArrayBlockingQueue<>(numThreads); |
| } else { |
| workQueue = new LinkedBlockingQueue<>(); |
| } |
| this.executorService = new ThreadPoolExecutor(numThreads, numThreads, |
| 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory); |
| |
| this.rpcMonitor = monitor; |
| |
| int maxFailoverAttempts = conf.getInt( |
| HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, |
| HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); |
| int maxRetryAttempts = conf.getInt( |
| RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS, |
| RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT); |
| int failoverSleepBaseMillis = conf.getInt( |
| HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, |
| HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); |
| int failoverSleepMaxMillis = conf.getInt( |
| HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY, |
| HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT); |
| this.retryPolicy = RetryPolicies.failoverOnNetworkException( |
| RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts, |
| failoverSleepBaseMillis, failoverSleepMaxMillis); |
| String[] ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS); |
| this.enableProxyUser = ipProxyUsers != null && ipProxyUsers.length > 0; |
| } |
| |
| /** |
| * Get the configuration for the RPC client. It takes the Router |
| * configuration and transforms it into regular RPC Client configuration. |
| * @param conf Input configuration. |
| * @return Configuration for the RPC client. |
| */ |
| private Configuration getClientConfiguration(final Configuration conf) { |
| Configuration clientConf = new Configuration(conf); |
| int maxRetries = conf.getInt( |
| RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT, |
| RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT); |
| if (maxRetries >= 0) { |
| clientConf.setInt( |
| IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, maxRetries); |
| } |
| long connectTimeOut = conf.getTimeDuration( |
| RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT, |
| RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| if (connectTimeOut >= 0) { |
| clientConf.setLong(IPC_CLIENT_CONNECT_TIMEOUT_KEY, connectTimeOut); |
| } |
| return clientConf; |
| } |
| |
| /** |
| * Get the active namenode resolver used by this client. |
| * @return Active namenode resolver. |
| */ |
| public ActiveNamenodeResolver getNamenodeResolver() { |
| return this.namenodeResolver; |
| } |
| |
| /** |
| * Shutdown the client. |
| */ |
| public void shutdown() { |
| if (this.connectionManager != null) { |
| this.connectionManager.close(); |
| } |
| if (this.executorService != null) { |
| this.executorService.shutdownNow(); |
| } |
| if (this.routerRpcFairnessPolicyController != null) { |
| this.routerRpcFairnessPolicyController.shutdown(); |
| } |
| } |
| |
| /** |
| * Total number of available sockets between the router and NNs. |
| * |
| * @return Number of namenode clients. |
| */ |
| public int getNumConnections() { |
| return this.connectionManager.getNumConnections(); |
| } |
| |
| /** |
| * Total number of available sockets between the router and NNs. |
| * |
| * @return Number of namenode clients. |
| */ |
| public int getNumActiveConnections() { |
| return this.connectionManager.getNumActiveConnections(); |
| } |
| |
| /** |
| * Total number of idle sockets between the router and NNs. |
| * |
| * @return Number of namenode clients. |
| */ |
| public int getNumIdleConnections() { |
| return this.connectionManager.getNumIdleConnections(); |
| } |
| |
| /** |
| * Total number of active sockets between the router and NNs. |
| * |
| * @return Number of recently active namenode clients. |
| */ |
| public int getNumActiveConnectionsRecently() { |
| return this.connectionManager.getNumActiveConnectionsRecently(); |
| } |
| |
| /** |
| * Total number of open connection pools to a NN. Each connection pool. |
| * represents one user + one NN. |
| * |
| * @return Number of connection pools. |
| */ |
| public int getNumConnectionPools() { |
| return this.connectionManager.getNumConnectionPools(); |
| } |
| |
| /** |
| * Number of connections between the router and NNs being created sockets. |
| * |
| * @return Number of connections waiting to be created. |
| */ |
| public int getNumCreatingConnections() { |
| return this.connectionManager.getNumCreatingConnections(); |
| } |
| |
| /** |
| * JSON representation of the connection pool. |
| * |
| * @return String representation of the JSON. |
| */ |
| public String getJSON() { |
| return this.connectionManager.getJSON(); |
| } |
| |
| /** |
| * JSON representation of the async caller thread pool. |
| * |
| * @return String representation of the JSON. |
| */ |
| public String getAsyncCallerPoolJson() { |
| final Map<String, Integer> info = new LinkedHashMap<>(); |
| info.put("active", executorService.getActiveCount()); |
| info.put("total", executorService.getPoolSize()); |
| info.put("max", executorService.getMaximumPoolSize()); |
| return JSON.toString(info); |
| } |
| |
| /** |
| * JSON representation of the rejected permits for each nameservice. |
| * |
| * @return String representation of the rejected permits for each nameservice. |
| */ |
| public String getRejectedPermitsPerNsJSON() { |
| return JSON.toString(rejectedPermitsPerNs); |
| } |
| |
| /** |
| * JSON representation of the accepted permits for each nameservice. |
| * |
| * @return String representation of the accepted permits for each nameservice. |
| */ |
| public String getAcceptedPermitsPerNsJSON() { |
| return JSON.toString(acceptedPermitsPerNs); |
| } |
| /** |
| * Get ClientProtocol proxy client for a NameNode. Each combination of user + |
| * NN must use a unique proxy client. Previously created clients are cached |
| * and stored in a connection pool by the ConnectionManager. |
| * |
| * @param ugi User group information. |
| * @param nsId Nameservice identifier. |
| * @param rpcAddress RPC server address of the NN. |
| * @param proto Protocol of the connection. |
| * @return ConnectionContext containing a ClientProtocol proxy client for the |
| * NN + current user. |
| * @throws IOException If we cannot get a connection to the NameNode. |
| */ |
| private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, |
| String rpcAddress, Class<?> proto) throws IOException { |
| ConnectionContext connection = null; |
| try { |
| // Each proxy holds the UGI info for the current user when it is created. |
| // This cache does not scale very well, one entry per user per namenode, |
| // and may need to be adjusted and/or selectively pruned. The cache is |
| // important due to the excessive overhead of creating a new proxy wrapper |
| // for each individual request. |
| |
| // TODO Add tokens from the federated UGI |
| UserGroupInformation connUGI = ugi; |
| if (UserGroupInformation.isSecurityEnabled() || this.enableProxyUser) { |
| UserGroupInformation routerUser = UserGroupInformation.getLoginUser(); |
| connUGI = UserGroupInformation.createProxyUser( |
| ugi.getUserName(), routerUser); |
| } |
| connection = this.connectionManager.getConnection( |
| connUGI, rpcAddress, proto); |
| LOG.debug("User {} NN {} is using connection {}", |
| ugi.getUserName(), rpcAddress, connection); |
| } catch (Exception ex) { |
| LOG.error("Cannot open NN client to address: {}", rpcAddress, ex); |
| } |
| |
| if (connection == null) { |
| throw new ConnectionNullException("Cannot get a connection to " |
| + rpcAddress); |
| } |
| return connection; |
| } |
| |
| /** |
| * Convert an exception to an IOException. |
| * |
| * For a non-IOException, wrap it with IOException. For a RemoteException, |
| * unwrap it. For an IOException which is not a RemoteException, return it. |
| * |
| * @param e Exception to convert into an exception. |
| * @return Created IO exception. |
| */ |
| private static IOException toIOException(Exception e) { |
| if (e instanceof RemoteException) { |
| return ((RemoteException) e).unwrapRemoteException(); |
| } |
| if (e instanceof IOException) { |
| return (IOException)e; |
| } |
| return new IOException(e); |
| } |
| |
| /** |
| * If we should retry the RPC call. |
| * |
| * @param ioe IOException reported. |
| * @param retryCount Number of retries. |
| * @param nsId Nameservice ID. |
| * @return Retry decision. |
| * @throws NoNamenodesAvailableException Exception that the retry policy |
| * generates for no available namenodes. |
| */ |
| private RetryDecision shouldRetry(final IOException ioe, final int retryCount, |
| final String nsId) throws IOException { |
| // check for the case of cluster unavailable state |
| if (isClusterUnAvailable(nsId)) { |
| // we allow to retry once if cluster is unavailable |
| if (retryCount == 0) { |
| return RetryDecision.RETRY; |
| } else { |
| throw new NoNamenodesAvailableException(nsId, ioe); |
| } |
| } |
| |
| try { |
| final RetryPolicy.RetryAction a = |
| this.retryPolicy.shouldRetry(ioe, retryCount, 0, true); |
| return a.action; |
| } catch (Exception ex) { |
| LOG.error("Re-throwing API exception, no more retries", ex); |
| throw toIOException(ex); |
| } |
| } |
| |
| /** |
| * Invokes a method against the ClientProtocol proxy server. If a standby |
| * exception is generated by the call to the client, retries using the |
| * alternate server. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param ugi User group information. |
| * @param namenodes A prioritized list of namenodes within the same |
| * nameservice. |
| * @param method Remote ClientProtocol method to invoke. |
| * @param params Variable list of parameters matching the method. |
| * @return The result of invoking the method. |
| * @throws ConnectException If it cannot connect to any Namenode. |
| * @throws StandbyException If all Namenodes are in Standby. |
| * @throws IOException If it cannot invoke the method. |
| */ |
| @VisibleForTesting |
| public Object invokeMethod( |
| final UserGroupInformation ugi, |
| final List<? extends FederationNamenodeContext> namenodes, |
| final Class<?> protocol, final Method method, final Object... params) |
| throws ConnectException, StandbyException, IOException { |
| |
| if (namenodes == null || namenodes.isEmpty()) { |
| throw new IOException("No namenodes to invoke " + method.getName() + |
| " with params " + Arrays.deepToString(params) + " from " |
| + router.getRouterId()); |
| } |
| |
| addClientInfoToCallerContext(); |
| |
| Object ret = null; |
| if (rpcMonitor != null) { |
| rpcMonitor.proxyOp(); |
| } |
| boolean failover = false; |
| Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>(); |
| for (FederationNamenodeContext namenode : namenodes) { |
| ConnectionContext connection = null; |
| String nsId = namenode.getNameserviceId(); |
| String rpcAddress = namenode.getRpcAddress(); |
| try { |
| connection = this.getConnection(ugi, nsId, rpcAddress, protocol); |
| ProxyAndInfo<?> client = connection.getClient(); |
| final Object proxy = client.getProxy(); |
| |
| ret = invoke(nsId, 0, method, proxy, params); |
| if (failover) { |
| // Success on alternate server, update |
| InetSocketAddress address = client.getAddress(); |
| namenodeResolver.updateActiveNamenode(nsId, address); |
| } |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.proxyOpComplete(true, nsId); |
| } |
| if (this.router.getRouterClientMetrics() != null) { |
| this.router.getRouterClientMetrics().incInvokedMethod(method); |
| } |
| return ret; |
| } catch (IOException ioe) { |
| ioes.put(namenode, ioe); |
| if (ioe instanceof StandbyException) { |
| // Fail over indicated by retry policy and/or NN |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.proxyOpFailureStandby(nsId); |
| } |
| failover = true; |
| } else if (isUnavailableException(ioe)) { |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.proxyOpFailureCommunicate(nsId); |
| } |
| failover = true; |
| } else if (ioe instanceof RemoteException) { |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.proxyOpComplete(true, nsId); |
| } |
| RemoteException re = (RemoteException) ioe; |
| ioe = re.unwrapRemoteException(); |
| ioe = getCleanException(ioe); |
| // RemoteException returned by NN |
| throw ioe; |
| } else if (ioe instanceof ConnectionNullException) { |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.proxyOpFailureCommunicate(nsId); |
| } |
| LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress, |
| ioe.getMessage()); |
| // Throw StandbyException so that client can retry |
| StandbyException se = new StandbyException(ioe.getMessage()); |
| se.initCause(ioe); |
| throw se; |
| } else if (ioe instanceof NoNamenodesAvailableException) { |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.proxyOpNoNamenodes(nsId); |
| } |
| LOG.error("Cannot get available namenode for {} {} error: {}", |
| nsId, rpcAddress, ioe.getMessage()); |
| // Throw RetriableException so that client can retry |
| throw new RetriableException(ioe); |
| } else { |
| // Other communication error, this is a failure |
| // Communication retries are handled by the retry policy |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.proxyOpFailureCommunicate(nsId); |
| this.rpcMonitor.proxyOpComplete(false, nsId); |
| } |
| throw ioe; |
| } |
| } finally { |
| if (connection != null) { |
| connection.release(); |
| } |
| } |
| } |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.proxyOpComplete(false, null); |
| } |
| |
| // All namenodes were unavailable or in standby |
| String msg = "No namenode available to invoke " + method.getName() + " " + |
| Arrays.deepToString(params) + " in " + namenodes + " from " + |
| router.getRouterId(); |
| LOG.error(msg); |
| int exConnect = 0; |
| for (Entry<FederationNamenodeContext, IOException> entry : |
| ioes.entrySet()) { |
| FederationNamenodeContext namenode = entry.getKey(); |
| String nnKey = namenode.getNamenodeKey(); |
| String addr = namenode.getRpcAddress(); |
| IOException ioe = entry.getValue(); |
| if (ioe instanceof StandbyException) { |
| LOG.error("{} at {} is in Standby: {}", |
| nnKey, addr, ioe.getMessage()); |
| } else if (isUnavailableException(ioe)) { |
| exConnect++; |
| LOG.error("{} at {} cannot be reached: {}", |
| nnKey, addr, ioe.getMessage()); |
| } else { |
| LOG.error("{} at {} error: \"{}\"", nnKey, addr, ioe.getMessage()); |
| } |
| } |
| if (exConnect == ioes.size()) { |
| throw new ConnectException(msg); |
| } else { |
| throw new StandbyException(msg); |
| } |
| } |
| |
| /** |
| * For tracking some information about the actual client. |
| * It adds trace info "clientIp:ip", "clientPort:port", |
| * "clientId:id" and "clientCallId:callId" |
| * in the caller context, removing the old values if they were |
| * already present. |
| */ |
| private void addClientInfoToCallerContext() { |
| CallerContext ctx = CallerContext.getCurrent(); |
| String origContext = ctx == null ? null : ctx.getContext(); |
| byte[] origSignature = ctx == null ? null : ctx.getSignature(); |
| CallerContext.Builder builder = |
| new CallerContext.Builder("", contextFieldSeparator) |
| .append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress()) |
| .append(CallerContext.CLIENT_PORT_STR, |
| Integer.toString(Server.getRemotePort())) |
| .append(CallerContext.CLIENT_ID_STR, |
| StringUtils.byteToHexString(Server.getClientId())) |
| .append(CallerContext.CLIENT_CALL_ID_STR, |
| Integer.toString(Server.getCallId())) |
| .setSignature(origSignature); |
| // Append the original caller context |
| if (origContext != null) { |
| for (String part : origContext.split(contextFieldSeparator)) { |
| String[] keyValue = |
| part.split(CallerContext.Builder.KEY_VALUE_SEPARATOR, 2); |
| if (keyValue.length == 2) { |
| builder.appendIfAbsent(keyValue[0], keyValue[1]); |
| } else if (keyValue.length == 1) { |
| builder.append(keyValue[0]); |
| } |
| } |
| } |
| CallerContext.setCurrent(builder.build()); |
| } |
| |
| /** |
| * Invokes a method on the designated object. Catches exceptions specific to |
| * the invocation. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param nsId Identifier for the namespace |
| * @param retryCount Current retry times |
| * @param method Method to invoke |
| * @param obj Target object for the method |
| * @param params Variable parameters |
| * @return Response from the remote server |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| private Object invoke(String nsId, int retryCount, final Method method, |
| final Object obj, final Object... params) throws IOException { |
| try { |
| return method.invoke(obj, params); |
| } catch (IllegalAccessException e) { |
| LOG.error("Unexpected exception while proxying API", e); |
| return null; |
| } catch (IllegalArgumentException e) { |
| LOG.error("Unexpected exception while proxying API", e); |
| return null; |
| } catch (InvocationTargetException e) { |
| Throwable cause = e.getCause(); |
| if (cause instanceof IOException) { |
| IOException ioe = (IOException) cause; |
| |
| // Check if we should retry. |
| RetryDecision decision = shouldRetry(ioe, retryCount, nsId); |
| if (decision == RetryDecision.RETRY) { |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.proxyOpRetries(); |
| } |
| |
| // retry |
| return invoke(nsId, ++retryCount, method, obj, params); |
| } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { |
| // failover, invoker looks for standby exceptions for failover. |
| if (ioe instanceof StandbyException) { |
| throw ioe; |
| } else if (isUnavailableException(ioe)) { |
| throw ioe; |
| } else { |
| throw new StandbyException(ioe.getMessage()); |
| } |
| } else { |
| throw ioe; |
| } |
| } else { |
| throw new IOException(e); |
| } |
| } |
| } |
| |
| /** |
| * Check if the exception comes from an unavailable subcluster. |
| * @param ioe IOException to check. |
| * @return If the exception comes from an unavailable subcluster. |
| */ |
| public static boolean isUnavailableException(IOException ioe) { |
| if (ioe instanceof ConnectTimeoutException || |
| ioe instanceof EOFException || |
| ioe instanceof SocketException || |
| ioe instanceof StandbyException) { |
| return true; |
| } |
| if (ioe instanceof RetriableException) { |
| Throwable cause = ioe.getCause(); |
| if (cause instanceof NoNamenodesAvailableException) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Check if the cluster of given nameservice id is available. |
| * @param nsId nameservice ID. |
| * @return |
| * @throws IOException |
| */ |
| private boolean isClusterUnAvailable(String nsId) throws IOException { |
| List<? extends FederationNamenodeContext> nnState = this.namenodeResolver |
| .getNamenodesForNameserviceId(nsId); |
| |
| if (nnState != null) { |
| for (FederationNamenodeContext nnContext : nnState) { |
| // Once we find one NN is in active state, we assume this |
| // cluster is available. |
| if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) { |
| return false; |
| } |
| } |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Get a clean copy of the exception. Sometimes the exceptions returned by the |
| * server contain the full stack trace in the message. |
| * |
| * @param ioe Exception to clean up. |
| * @return Copy of the original exception with a clean message. |
| */ |
| private static IOException getCleanException(IOException ioe) { |
| IOException ret = null; |
| |
| String msg = ioe.getMessage(); |
| Throwable cause = ioe.getCause(); |
| StackTraceElement[] stackTrace = ioe.getStackTrace(); |
| |
| // Clean the message by removing the stack trace |
| int index = msg.indexOf("\n"); |
| if (index > 0) { |
| String[] msgSplit = msg.split("\n"); |
| msg = msgSplit[0]; |
| |
| // Parse stack trace from the message |
| List<StackTraceElement> elements = new LinkedList<>(); |
| for (int i=1; i<msgSplit.length; i++) { |
| String line = msgSplit[i]; |
| Matcher matcher = STACK_TRACE_PATTERN.matcher(line); |
| if (matcher.find()) { |
| String declaringClass = matcher.group(1); |
| String methodName = matcher.group(2); |
| String fileName = matcher.group(3); |
| int lineNumber = Integer.parseInt(matcher.group(4)); |
| StackTraceElement element = new StackTraceElement( |
| declaringClass, methodName, fileName, lineNumber); |
| elements.add(element); |
| } |
| } |
| stackTrace = elements.toArray(new StackTraceElement[elements.size()]); |
| } |
| |
| // Create the new output exception |
| if (ioe instanceof RemoteException) { |
| RemoteException re = (RemoteException)ioe; |
| ret = new RemoteException(re.getClassName(), msg); |
| } else { |
| // Try the simple constructor and initialize the fields |
| Class<? extends IOException> ioeClass = ioe.getClass(); |
| try { |
| Constructor<? extends IOException> constructor = |
| ioeClass.getDeclaredConstructor(String.class); |
| ret = constructor.newInstance(msg); |
| } catch (ReflectiveOperationException e) { |
| // If there are errors, just use the input one |
| LOG.error("Could not create exception {}", ioeClass.getSimpleName(), e); |
| ret = ioe; |
| } |
| } |
| if (ret != null) { |
| ret.initCause(cause); |
| ret.setStackTrace(stackTrace); |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Invokes a ClientProtocol method. Determines the target nameservice via a |
| * provided block. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param block Block used to determine appropriate nameservice. |
| * @param method The remote method and parameters to invoke. |
| * @return The result of invoking the method. |
| * @throws IOException If the invoke generated an error. |
| */ |
| public Object invokeSingle(final ExtendedBlock block, RemoteMethod method) |
| throws IOException { |
| String bpId = block.getBlockPoolId(); |
| return invokeSingleBlockPool(bpId, method); |
| } |
| |
| /** |
| * Invokes a ClientProtocol method. Determines the target nameservice using |
| * the block pool id. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param bpId Block pool identifier. |
| * @param method The remote method and parameters to invoke. |
| * @return The result of invoking the method. |
| * @throws IOException If the invoke generated an error. |
| */ |
| public Object invokeSingleBlockPool(final String bpId, RemoteMethod method) |
| throws IOException { |
| String nsId = getNameserviceForBlockPoolId(bpId); |
| return invokeSingle(nsId, method); |
| } |
| |
| /** |
| * Invokes a ClientProtocol method against the specified namespace. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param nsId Target namespace for the method. |
| * @param method The remote method and parameters to invoke. |
| * @return The result of invoking the method. |
| * @throws IOException If the invoke generated an error. |
| */ |
| public Object invokeSingle(final String nsId, RemoteMethod method) |
| throws IOException { |
| UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); |
| RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); |
| acquirePermit(nsId, ugi, method, controller); |
| try { |
| List<? extends FederationNamenodeContext> nns = |
| getNamenodesForNameservice(nsId); |
| RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/"); |
| Class<?> proto = method.getProtocol(); |
| Method m = method.getMethod(); |
| Object[] params = method.getParams(loc); |
| return invokeMethod(ugi, nns, proto, m, params); |
| } finally { |
| releasePermit(nsId, ugi, method, controller); |
| } |
| } |
| |
| /** |
| * Invokes a remote method against the specified namespace. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param <T> The type of the remote method return. |
| * @param nsId Target namespace for the method. |
| * @param method The remote method and parameters to invoke. |
| * @param clazz Class for the return type. |
| * @return The result of invoking the method. |
| * @throws IOException If the invoke generated an error. |
| */ |
| public <T> T invokeSingle(final String nsId, RemoteMethod method, |
| Class<T> clazz) throws IOException { |
| @SuppressWarnings("unchecked") |
| T ret = (T)invokeSingle(nsId, method); |
| return ret; |
| } |
| |
| /** |
| * Invokes a remote method against the specified extendedBlock. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param <T> The type of the remote method return. |
| * @param extendedBlock Target extendedBlock for the method. |
| * @param method The remote method and parameters to invoke. |
| * @param clazz Class for the return type. |
| * @return The result of invoking the method. |
| * @throws IOException If the invoke generated an error. |
| */ |
| public <T> T invokeSingle(final ExtendedBlock extendedBlock, |
| RemoteMethod method, Class<T> clazz) throws IOException { |
| String nsId = getNameserviceForBlockPoolId(extendedBlock.getBlockPoolId()); |
| @SuppressWarnings("unchecked") |
| T ret = (T)invokeSingle(nsId, method); |
| return ret; |
| } |
| |
| /** |
| * Invokes a single proxy call for a single location. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param location RemoteLocation to invoke. |
| * @param remoteMethod The remote method and parameters to invoke. |
| * @return The result of invoking the method if successful. |
| * @throws IOException If the invoke generated an error. |
| */ |
| public <T> T invokeSingle(final RemoteLocationContext location, |
| RemoteMethod remoteMethod, Class<T> clazz) throws IOException { |
| List<RemoteLocationContext> locations = Collections.singletonList(location); |
| @SuppressWarnings("unchecked") |
| T ret = (T)invokeSequential(locations, remoteMethod); |
| return ret; |
| } |
| |
| /** |
| * Invokes sequential proxy calls to different locations. Continues to invoke |
| * calls until a call returns without throwing a remote exception. |
| * |
| * @param locations List of locations/nameservices to call concurrently. |
| * @param remoteMethod The remote method and parameters to invoke. |
| * @return The result of the first successful call, or if no calls are |
| * successful, the result of the last RPC call executed. |
| * @throws IOException if the success condition is not met and one of the RPC |
| * calls generated a remote exception. |
| */ |
| public Object invokeSequential( |
| final List<? extends RemoteLocationContext> locations, |
| final RemoteMethod remoteMethod) throws IOException { |
| return invokeSequential(locations, remoteMethod, null, null); |
| } |
| |
| /** |
| * Invokes sequential proxy calls to different locations. Continues to invoke |
| * calls until the success condition is met, or until all locations have been |
| * attempted. |
| * |
| * The success condition may be specified by: |
| * <ul> |
| * <li>An expected result class |
| * <li>An expected result value |
| * </ul> |
| * |
| * If no expected result class/values are specified, the success condition is |
| * a call that does not throw a remote exception. |
| * |
| * @param <T> The type of the remote method return. |
| * @param locations List of locations/nameservices to call concurrently. |
| * @param remoteMethod The remote method and parameters to invoke. |
| * @param expectedResultClass In order to be considered a positive result, the |
| * return type must be of this class. |
| * @param expectedResultValue In order to be considered a positive result, the |
| * return value must equal the value of this object. |
| * @return The result of the first successful call, or if no calls are |
| * successful, the result of the first RPC call executed. |
| * @throws IOException if the success condition is not met, return the first |
| * remote exception generated. |
| */ |
| public <T> T invokeSequential( |
| final List<? extends RemoteLocationContext> locations, |
| final RemoteMethod remoteMethod, Class<T> expectedResultClass, |
| Object expectedResultValue) throws IOException { |
| return (T) invokeSequential(remoteMethod, locations, expectedResultClass, |
| expectedResultValue).getResult(); |
| } |
| |
| /** |
| * Invokes sequential proxy calls to different locations. Continues to invoke |
| * calls until the success condition is met, or until all locations have been |
| * attempted. |
| * |
| * The success condition may be specified by: |
| * <ul> |
| * <li>An expected result class |
| * <li>An expected result value |
| * </ul> |
| * |
| * If no expected result class/values are specified, the success condition is |
| * a call that does not throw a remote exception. |
| * |
| * This returns RemoteResult, which contains the invoked location as well |
| * as the result. |
| * |
| * @param <R> The type of the remote location. |
| * @param <T> The type of the remote method return. |
| * @param remoteMethod The remote method and parameters to invoke. |
| * @param locations List of locations/nameservices to call concurrently. |
| * @param expectedResultClass In order to be considered a positive result, the |
| * return type must be of this class. |
| * @param expectedResultValue In order to be considered a positive result, the |
| * return value must equal the value of this object. |
| * @return The result of the first successful call, or if no calls are |
| * successful, the result of the first RPC call executed, along with |
| * the invoked location in form of RemoteResult. |
| * @throws IOException if the success condition is not met, return the first |
| * remote exception generated. |
| */ |
| public <R extends RemoteLocationContext, T> RemoteResult invokeSequential( |
| final RemoteMethod remoteMethod, final List<R> locations, |
| Class<T> expectedResultClass, Object expectedResultValue) |
| throws IOException { |
| |
| RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); |
| final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); |
| final Method m = remoteMethod.getMethod(); |
| List<IOException> thrownExceptions = new ArrayList<>(); |
| Object firstResult = null; |
| // Invoke in priority order |
| for (final RemoteLocationContext loc : locations) { |
| String ns = loc.getNameserviceId(); |
| acquirePermit(ns, ugi, remoteMethod, controller); |
| List<? extends FederationNamenodeContext> namenodes = |
| getNamenodesForNameservice(ns); |
| try { |
| Class<?> proto = remoteMethod.getProtocol(); |
| Object[] params = remoteMethod.getParams(loc); |
| Object result = invokeMethod(ugi, namenodes, proto, m, params); |
| // Check if the result is what we expected |
| if (isExpectedClass(expectedResultClass, result) && |
| isExpectedValue(expectedResultValue, result)) { |
| // Valid result, stop here |
| @SuppressWarnings("unchecked") R location = (R) loc; |
| @SuppressWarnings("unchecked") T ret = (T) result; |
| return new RemoteResult<>(location, ret); |
| } |
| if (firstResult == null) { |
| firstResult = result; |
| } |
| } catch (IOException ioe) { |
| // Localize the exception |
| |
| ioe = processException(ioe, loc); |
| |
| // Record it and move on |
| thrownExceptions.add(ioe); |
| } catch (Exception e) { |
| // Unusual error, ClientProtocol calls always use IOException (or |
| // RemoteException). Re-wrap in IOException for compatibility with |
| // ClientProtocol. |
| LOG.error("Unexpected exception {} proxying {} to {}", |
| e.getClass(), m.getName(), ns, e); |
| IOException ioe = new IOException( |
| "Unexpected exception proxying API " + e.getMessage(), e); |
| thrownExceptions.add(ioe); |
| } finally { |
| releasePermit(ns, ugi, remoteMethod, controller); |
| } |
| } |
| |
| if (!thrownExceptions.isEmpty()) { |
| // An unavailable subcluster may be the actual cause |
| // We cannot surface other exceptions (e.g., FileNotFoundException) |
| for (int i = 0; i < thrownExceptions.size(); i++) { |
| IOException ioe = thrownExceptions.get(i); |
| if (isUnavailableException(ioe)) { |
| throw ioe; |
| } |
| } |
| |
| // re-throw the first exception thrown for compatibility |
| throw thrownExceptions.get(0); |
| } |
| // Return the first result, whether it is the value or not |
| @SuppressWarnings("unchecked") T ret = (T) firstResult; |
| return new RemoteResult<>(locations.get(0), ret); |
| } |
| |
| /** |
| * Exception messages might contain local subcluster paths. This method |
| * generates a new exception with the proper message. |
| * @param ioe Original IOException. |
| * @param loc Location we are processing. |
| * @return Exception processed for federation. |
| */ |
| private IOException processException( |
| IOException ioe, RemoteLocationContext loc) { |
| |
| if (ioe instanceof RemoteException) { |
| RemoteException re = (RemoteException)ioe; |
| String newMsg = processExceptionMsg( |
| re.getMessage(), loc.getDest(), loc.getSrc()); |
| RemoteException newException = |
| new RemoteException(re.getClassName(), newMsg); |
| newException.setStackTrace(ioe.getStackTrace()); |
| return newException; |
| } |
| |
| if (ioe instanceof FileNotFoundException) { |
| String newMsg = processExceptionMsg( |
| ioe.getMessage(), loc.getDest(), loc.getSrc()); |
| FileNotFoundException newException = new FileNotFoundException(newMsg); |
| newException.setStackTrace(ioe.getStackTrace()); |
| return newException; |
| } |
| |
| if (ioe instanceof SnapshotException) { |
| String newMsg = processExceptionMsg( |
| ioe.getMessage(), loc.getDest(), loc.getSrc()); |
| SnapshotException newException = new SnapshotException(newMsg); |
| newException.setStackTrace(ioe.getStackTrace()); |
| return newException; |
| } |
| |
| return ioe; |
| } |
| |
| /** |
| * Process a subcluster message and make it federated. |
| * @param msg Original exception message. |
| * @param dst Path in federation. |
| * @param src Path in the subcluster. |
| * @return Message processed for federation. |
| */ |
| @VisibleForTesting |
| static String processExceptionMsg( |
| final String msg, final String dst, final String src) { |
| if (dst.equals(src) || !dst.startsWith("/") || !src.startsWith("/")) { |
| return msg; |
| } |
| |
| String newMsg = msg.replaceFirst(dst, src); |
| int minLen = Math.min(dst.length(), src.length()); |
| for (int i = 0; newMsg.equals(msg) && i < minLen; i++) { |
| // Check if we can replace sub folders |
| String dst1 = dst.substring(0, dst.length() - 1 - i); |
| String src1 = src.substring(0, src.length() - 1 - i); |
| newMsg = msg.replaceFirst(dst1, src1); |
| } |
| |
| return newMsg; |
| } |
| |
| /** |
| * Checks if a result matches the required result class. |
| * |
| * @param expectedClass Required result class, null to skip the check. |
| * @param clazz The result to check. |
| * @return True if the result is an instance of the required class or if the |
| * expected class is null. |
| */ |
| private static boolean isExpectedClass(Class<?> expectedClass, Object clazz) { |
| if (expectedClass == null) { |
| return true; |
| } else if (clazz == null) { |
| return false; |
| } else { |
| return expectedClass.isInstance(clazz); |
| } |
| } |
| |
| /** |
| * Checks if a result matches the expected value. |
| * |
| * @param expectedValue The expected value, null to skip the check. |
| * @param value The result to check. |
| * @return True if the result is equals to the expected value or if the |
| * expected value is null. |
| */ |
| private static boolean isExpectedValue(Object expectedValue, Object value) { |
| if (expectedValue == null) { |
| return true; |
| } else if (value == null) { |
| return false; |
| } else { |
| return value.equals(expectedValue); |
| } |
| } |
| |
| /** |
| * Invoke method in all locations and return success if any succeeds. |
| * |
| * @param <T> The type of the remote location. |
| * @param locations List of remote locations to call concurrently. |
| * @param method The remote method and parameters to invoke. |
| * @return If the call succeeds in any location. |
| * @throws IOException If any of the calls return an exception. |
| */ |
| public <T extends RemoteLocationContext> boolean invokeAll( |
| final Collection<T> locations, final RemoteMethod method) |
| throws IOException { |
| Map<T, Boolean> results = |
| invokeConcurrent(locations, method, false, false, Boolean.class); |
| return results.containsValue(true); |
| } |
| |
| /** |
| * Invoke multiple concurrent proxy calls to different clients. Returns an |
| * array of results. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param <T> The type of the remote location. |
| * @param <R> The type of the remote method return. |
| * @param locations List of remote locations to call concurrently. |
| * @param method The remote method and parameters to invoke. |
| * @throws IOException If all the calls throw an exception. |
| */ |
| public <T extends RemoteLocationContext, R> void invokeConcurrent( |
| final Collection<T> locations, final RemoteMethod method) |
| throws IOException { |
| invokeConcurrent(locations, method, void.class); |
| } |
| |
| /** |
| * Invoke multiple concurrent proxy calls to different clients. Returns an |
| * array of results. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param <T> The type of the remote location. |
| * @param <R> The type of the remote method return. |
| * @param locations List of remote locations to call concurrently. |
| * @param method The remote method and parameters to invoke. |
| * @return Result of invoking the method per subcluster: nsId to result. |
| * @throws IOException If all the calls throw an exception. |
| */ |
| public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent( |
| final Collection<T> locations, final RemoteMethod method, Class<R> clazz) |
| throws IOException { |
| return invokeConcurrent(locations, method, false, false, clazz); |
| } |
| |
| /** |
| * Invoke multiple concurrent proxy calls to different clients. Returns an |
| * array of results. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param <T> The type of the remote location. |
| * @param <R> The type of the remote method return. |
| * @param locations List of remote locations to call concurrently. |
| * @param method The remote method and parameters to invoke. |
| * @param requireResponse If true an exception will be thrown if all calls do |
| * not complete. If false exceptions are ignored and all data results |
| * successfully received are returned. |
| * @param standby If the requests should go to the standby namenodes too. |
| * @throws IOException If all the calls throw an exception. |
| */ |
| public <T extends RemoteLocationContext, R> void invokeConcurrent( |
| final Collection<T> locations, final RemoteMethod method, |
| boolean requireResponse, boolean standby) throws IOException { |
| invokeConcurrent(locations, method, requireResponse, standby, void.class); |
| } |
| |
| /** |
| * Invokes multiple concurrent proxy calls to different clients. Returns an |
| * array of results. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param <T> The type of the remote location. |
| * @param <R> The type of the remote method return. |
| * @param locations List of remote locations to call concurrently. |
| * @param method The remote method and parameters to invoke. |
| * @param requireResponse If true an exception will be thrown if all calls do |
| * not complete. If false exceptions are ignored and all data results |
| * successfully received are returned. |
| * @param standby If the requests should go to the standby namenodes too. |
| * @param clazz Type of the remote return type. |
| * @return Result of invoking the method per subcluster: nsId to result. |
| * @throws IOException If requiredResponse=true and any of the calls throw an |
| * exception. |
| */ |
| public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent( |
| final Collection<T> locations, final RemoteMethod method, |
| boolean requireResponse, boolean standby, Class<R> clazz) |
| throws IOException { |
| return invokeConcurrent( |
| locations, method, requireResponse, standby, -1, clazz); |
| } |
| |
| /** |
| * Invokes multiple concurrent proxy calls to different clients. Returns an |
| * array of results. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param <T> The type of the remote location. |
| * @param <R> The type of the remote method return. |
| * @param locations List of remote locations to call concurrently. |
| * @param method The remote method and parameters to invoke. |
| * @param requireResponse If true an exception will be thrown if all calls do |
| * not complete. If false exceptions are ignored and all data results |
| * successfully received are returned. |
| * @param standby If the requests should go to the standby namenodes too. |
| * @param timeOutMs Timeout for each individual call. |
| * @param clazz Type of the remote return type. |
| * @return Result of invoking the method per subcluster: nsId to result. |
| * @throws IOException If requiredResponse=true and any of the calls throw an |
| * exception. |
| */ |
| public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent( |
| final Collection<T> locations, final RemoteMethod method, |
| boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz) |
| throws IOException { |
| final List<RemoteResult<T, R>> results = invokeConcurrent( |
| locations, method, standby, timeOutMs, clazz); |
| |
| // Go over the results and exceptions |
| final Map<T, R> ret = new TreeMap<>(); |
| final List<IOException> thrownExceptions = new ArrayList<>(); |
| IOException firstUnavailableException = null; |
| for (final RemoteResult<T, R> result : results) { |
| if (result.hasException()) { |
| IOException ioe = result.getException(); |
| thrownExceptions.add(ioe); |
| // Track unavailable exceptions to throw them first |
| if (isUnavailableException(ioe)) { |
| firstUnavailableException = ioe; |
| } |
| } |
| if (result.hasResult()) { |
| ret.put(result.getLocation(), result.getResult()); |
| } |
| } |
| |
| // Throw exceptions if needed |
| if (!thrownExceptions.isEmpty()) { |
| // Throw if response from all servers required or no results |
| if (requireResponse || ret.isEmpty()) { |
| // Throw unavailable exceptions first |
| if (firstUnavailableException != null) { |
| throw firstUnavailableException; |
| } else { |
| throw thrownExceptions.get(0); |
| } |
| } |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Invokes multiple concurrent proxy calls to different clients. Returns an |
| * array of results. |
| * |
| * Re-throws exceptions generated by the remote RPC call as either |
| * RemoteException or IOException. |
| * |
| * @param <T> The type of the remote location. |
| * @param <R> The type of the remote method return |
| * @param locations List of remote locations to call concurrently. |
| * @param method The remote method and parameters to invoke. |
| * @param standby If the requests should go to the standby namenodes too. |
| * @param timeOutMs Timeout for each individual call. |
| * @param clazz Type of the remote return type. |
| * @return Result of invoking the method per subcluster (list of results). |
| * This includes the exception for each remote location. |
| * @throws IOException If there are errors invoking the method. |
| */ |
| @SuppressWarnings("unchecked") |
| public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> |
| invokeConcurrent(final Collection<T> locations, |
| final RemoteMethod method, boolean standby, long timeOutMs, |
| Class<R> clazz) throws IOException { |
| |
| final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); |
| final Method m = method.getMethod(); |
| |
| if (locations.isEmpty()) { |
| throw new IOException("No remote locations available"); |
| } else if (locations.size() == 1 && timeOutMs <= 0) { |
| // Shortcut, just one call |
| T location = locations.iterator().next(); |
| String ns = location.getNameserviceId(); |
| RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); |
| acquirePermit(ns, ugi, method, controller); |
| final List<? extends FederationNamenodeContext> namenodes = |
| getNamenodesForNameservice(ns); |
| try { |
| Class<?> proto = method.getProtocol(); |
| Object[] paramList = method.getParams(location); |
| R result = (R) invokeMethod(ugi, namenodes, proto, m, paramList); |
| RemoteResult<T, R> remoteResult = new RemoteResult<>(location, result); |
| return Collections.singletonList(remoteResult); |
| } catch (IOException ioe) { |
| // Localize the exception |
| throw processException(ioe, location); |
| } finally { |
| releasePermit(ns, ugi, method, controller); |
| } |
| } |
| |
| List<T> orderedLocations = new ArrayList<>(); |
| List<Callable<Object>> callables = new ArrayList<>(); |
| // transfer originCall & callerContext to worker threads of executor. |
| final Call originCall = Server.getCurCall().get(); |
| final CallerContext originContext = CallerContext.getCurrent(); |
| for (final T location : locations) { |
| String nsId = location.getNameserviceId(); |
| final List<? extends FederationNamenodeContext> namenodes = |
| getNamenodesForNameservice(nsId); |
| final Class<?> proto = method.getProtocol(); |
| final Object[] paramList = method.getParams(location); |
| if (standby) { |
| // Call the objectGetter to all NNs (including standby) |
| for (final FederationNamenodeContext nn : namenodes) { |
| String nnId = nn.getNamenodeId(); |
| final List<FederationNamenodeContext> nnList = |
| Collections.singletonList(nn); |
| T nnLocation = location; |
| if (location instanceof RemoteLocation) { |
| nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); |
| } |
| orderedLocations.add(nnLocation); |
| callables.add( |
| () -> { |
| transferThreadLocalContext(originCall, originContext); |
| return invokeMethod(ugi, nnList, proto, m, paramList); |
| }); |
| } |
| } else { |
| // Call the objectGetter in order of nameservices in the NS list |
| orderedLocations.add(location); |
| callables.add( |
| () -> { |
| transferThreadLocalContext(originCall, originContext); |
| return invokeMethod(ugi, namenodes, proto, m, paramList); |
| }); |
| } |
| } |
| |
| if (rpcMonitor != null) { |
| rpcMonitor.proxyOp(); |
| } |
| if (this.router.getRouterClientMetrics() != null) { |
| this.router.getRouterClientMetrics().incInvokedConcurrent(m); |
| } |
| |
| RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); |
| acquirePermit(CONCURRENT_NS, ugi, method, controller); |
| try { |
| List<Future<Object>> futures = null; |
| if (timeOutMs > 0) { |
| futures = executorService.invokeAll( |
| callables, timeOutMs, TimeUnit.MILLISECONDS); |
| } else { |
| futures = executorService.invokeAll(callables); |
| } |
| List<RemoteResult<T, R>> results = new ArrayList<>(); |
| for (int i=0; i<futures.size(); i++) { |
| T location = orderedLocations.get(i); |
| try { |
| Future<Object> future = futures.get(i); |
| R result = (R) future.get(); |
| results.add(new RemoteResult<>(location, result)); |
| } catch (CancellationException ce) { |
| T loc = orderedLocations.get(i); |
| String msg = "Invocation to \"" + loc + "\" for \"" |
| + method.getMethodName() + "\" timed out"; |
| LOG.error(msg); |
| IOException ioe = new SubClusterTimeoutException(msg); |
| results.add(new RemoteResult<>(location, ioe)); |
| } catch (ExecutionException ex) { |
| Throwable cause = ex.getCause(); |
| LOG.debug("Cannot execute {} in {}: {}", |
| m.getName(), location, cause.getMessage()); |
| |
| // Convert into IOException if needed |
| IOException ioe = null; |
| if (cause instanceof IOException) { |
| ioe = (IOException) cause; |
| } else { |
| ioe = new IOException("Unhandled exception while proxying API " + |
| m.getName() + ": " + cause.getMessage(), cause); |
| } |
| |
| // Store the exceptions |
| results.add(new RemoteResult<>(location, ioe)); |
| } |
| } |
| |
| return results; |
| } catch (RejectedExecutionException e) { |
| if (rpcMonitor != null) { |
| rpcMonitor.proxyOpFailureClientOverloaded(); |
| } |
| int active = executorService.getActiveCount(); |
| int total = executorService.getMaximumPoolSize(); |
| String msg = "Not enough client threads " + active + "/" + total; |
| LOG.error(msg); |
| throw new StandbyException( |
| "Router " + router.getRouterId() + " is overloaded: " + msg); |
| } catch (InterruptedException ex) { |
| LOG.error("Unexpected error while invoking API: {}", ex.getMessage()); |
| throw new IOException( |
| "Unexpected error while invoking API " + ex.getMessage(), ex); |
| } finally { |
| releasePermit(CONCURRENT_NS, ugi, method, controller); |
| } |
| } |
| |
| /** |
| * Transfer origin thread local context which is necessary to current |
| * worker thread when invoking method concurrently by executor service. |
| * |
| * @param originCall origin Call required for getting remote client ip. |
| * @param originContext origin CallerContext which should be transferred |
| * to server side. |
| */ |
| private void transferThreadLocalContext( |
| final Call originCall, final CallerContext originContext) { |
| Server.getCurCall().set(originCall); |
| CallerContext.setCurrent(originContext); |
| } |
| |
| /** |
| * Get a prioritized list of NNs that share the same nameservice ID (in the |
| * same namespace). NNs that are reported as ACTIVE will be first in the list. |
| * |
| * @param nsId The nameservice ID for the namespace. |
| * @return A prioritized list of NNs to use for communication. |
| * @throws IOException If a NN cannot be located for the nameservice ID. |
| */ |
| private List<? extends FederationNamenodeContext> getNamenodesForNameservice( |
| final String nsId) throws IOException { |
| |
| final List<? extends FederationNamenodeContext> namenodes = |
| namenodeResolver.getNamenodesForNameserviceId(nsId); |
| |
| if (namenodes == null || namenodes.isEmpty()) { |
| throw new IOException("Cannot locate a registered namenode for " + nsId + |
| " from " + router.getRouterId()); |
| } |
| return namenodes; |
| } |
| |
| /** |
| * Get a prioritized list of NNs that share the same block pool ID (in the |
| * same namespace). NNs that are reported as ACTIVE will be first in the list. |
| * |
| * @param bpId The blockpool ID for the namespace. |
| * @return A prioritized list of NNs to use for communication. |
| * @throws IOException If a NN cannot be located for the block pool ID. |
| */ |
| private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId( |
| final String bpId) throws IOException { |
| |
| List<? extends FederationNamenodeContext> namenodes = |
| namenodeResolver.getNamenodesForBlockPoolId(bpId); |
| |
| if (namenodes == null || namenodes.isEmpty()) { |
| throw new IOException("Cannot locate a registered namenode for " + bpId + |
| " from " + router.getRouterId()); |
| } |
| return namenodes; |
| } |
| |
| /** |
| * Get the nameservice identifier for a block pool. |
| * |
| * @param bpId Identifier of the block pool. |
| * @return Nameservice identifier. |
| * @throws IOException If a NN cannot be located for the block pool ID. |
| */ |
| private String getNameserviceForBlockPoolId(final String bpId) |
| throws IOException { |
| List<? extends FederationNamenodeContext> namenodes = |
| getNamenodesForBlockPoolId(bpId); |
| FederationNamenodeContext namenode = namenodes.get(0); |
| return namenode.getNameserviceId(); |
| } |
| |
| /** |
| * Acquire permit to continue processing the request for specific nsId. |
| * |
| * @param nsId Identifier of the block pool. |
| * @param ugi UserGroupIdentifier associated with the user. |
| * @param m Remote method that needs to be invoked. |
| * @param controller fairness policy controller to acquire permit from |
| * @throws IOException If permit could not be acquired for the nsId. |
| */ |
| private void acquirePermit(final String nsId, final UserGroupInformation ugi, |
| final RemoteMethod m, RouterRpcFairnessPolicyController controller) |
| throws IOException { |
| if (controller != null) { |
| if (!controller.acquirePermit(nsId)) { |
| // Throw StandByException, |
| // Clients could fail over and try another router. |
| if (rpcMonitor != null) { |
| rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected(); |
| } |
| incrRejectedPermitForNs(nsId); |
| LOG.debug("Permit denied for ugi: {} for method: {}", |
| ugi, m.getMethodName()); |
| String msg = |
| "Router " + router.getRouterId() + |
| " is overloaded for NS: " + nsId; |
| throw new StandbyException(msg); |
| } |
| incrAcceptedPermitForNs(nsId); |
| } |
| } |
| |
| /** |
| * Release permit for specific nsId after processing against downstream |
| * nsId is completed. |
| * @param nsId Identifier of the block pool. |
| * @param ugi UserGroupIdentifier associated with the user. |
| * @param m Remote method that needs to be invoked. |
| * @param controller fairness policy controller to release permit from |
| */ |
| private void releasePermit(final String nsId, final UserGroupInformation ugi, |
| final RemoteMethod m, RouterRpcFairnessPolicyController controller) { |
| if (controller != null) { |
| controller.releasePermit(nsId); |
| LOG.trace("Permit released for ugi: {} for method: {}", ugi, |
| m.getMethodName()); |
| } |
| } |
| |
| public RouterRpcFairnessPolicyController |
| getRouterRpcFairnessPolicyController() { |
| return routerRpcFairnessPolicyController; |
| } |
| |
| private void incrRejectedPermitForNs(String ns) { |
| rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment(); |
| } |
| |
| public Long getRejectedPermitForNs(String ns) { |
| return rejectedPermitsPerNs.containsKey(ns) ? |
| rejectedPermitsPerNs.get(ns).longValue() : 0L; |
| } |
| |
| private void incrAcceptedPermitForNs(String ns) { |
| acceptedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment(); |
| } |
| |
| public Long getAcceptedPermitForNs(String ns) { |
| return acceptedPermitsPerNs.containsKey(ns) ? |
| acceptedPermitsPerNs.get(ns).longValue() : 0L; |
| } |
| |
| /** |
| * Refreshes/changes the fairness policy controller implementation if possible |
| * and returns the controller class name |
| * @param conf Configuration |
| * @return New controller class name if successfully refreshed, else old controller class name |
| */ |
| public synchronized String refreshFairnessPolicyController(Configuration conf) { |
| RouterRpcFairnessPolicyController newController; |
| try { |
| newController = FederationUtil.newFairnessPolicyController(conf); |
| } catch (RuntimeException e) { |
| LOG.error("Failed to create router fairness policy controller", e); |
| return getCurrentFairnessPolicyControllerClassName(); |
| } |
| |
| if (newController != null) { |
| if (routerRpcFairnessPolicyController != null) { |
| routerRpcFairnessPolicyController.shutdown(); |
| } |
| routerRpcFairnessPolicyController = newController; |
| } |
| return getCurrentFairnessPolicyControllerClassName(); |
| } |
| |
| private String getCurrentFairnessPolicyControllerClassName() { |
| if (routerRpcFairnessPolicyController != null) { |
| return routerRpcFairnessPolicyController.getClass().getCanonicalName(); |
| } |
| return null; |
| } |
| } |