| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ipc; |
| |
| import java.lang.reflect.Proxy; |
| import java.lang.reflect.Method; |
| |
| import java.net.ConnectException; |
| import java.net.InetSocketAddress; |
| import java.net.NoRouteToHostException; |
| import java.net.SocketTimeoutException; |
| import java.io.*; |
| import java.util.Map; |
| import java.util.HashMap; |
| |
| import javax.net.SocketFactory; |
| |
| import org.apache.commons.logging.*; |
| |
| import org.apache.hadoop.io.*; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.SaslRpcServer; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.SecretManager; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.conf.*; |
| import org.apache.hadoop.util.ReflectionUtils; |
| |
| /** A simple RPC mechanism. |
| * |
| * A <i>protocol</i> is a Java interface. All parameters and return types must |
| * be one of: |
| * |
| * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>, |
| * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>, |
| * <code>float</code>, <code>double</code>, or <code>void</code>; or</li> |
| * |
| * <li>a {@link String}; or</li> |
| * |
| * <li>a {@link Writable}; or</li> |
| * |
| * <li>an array of the above types</li> </ul> |
| * |
| * All methods in the protocol should throw only IOException. No field data of |
| * the protocol instance is transmitted. |
| */ |
| public class RPC { |
| static final Log LOG = LogFactory.getLog(RPC.class); |
| |
| private RPC() {} // no public ctor |
| |
| // cache of RpcEngines by protocol |
| private static final Map<Class,RpcEngine> PROTOCOL_ENGINES |
| = new HashMap<Class,RpcEngine>(); |
| |
| // track what RpcEngine is used by a proxy class, for stopProxy() |
| private static final Map<Class,RpcEngine> PROXY_ENGINES |
| = new HashMap<Class,RpcEngine>(); |
| |
| private static final String ENGINE_PROP = "rpc.engine"; |
| |
| /** |
| * Set a protocol to use a non-default RpcEngine. |
| * @param conf configuration to use |
| * @param protocol the protocol interface |
| * @param engine the RpcEngine impl |
| */ |
| public static void setProtocolEngine(Configuration conf, |
| Class protocol, Class engine) { |
| conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class); |
| } |
| |
| // return the RpcEngine configured to handle a protocol |
| private static synchronized RpcEngine getProtocolEngine(Class protocol, |
| Configuration conf) { |
| RpcEngine engine = PROTOCOL_ENGINES.get(protocol); |
| if (engine == null) { |
| Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), |
| WritableRpcEngine.class); |
| engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); |
| if (protocol.isInterface()) |
| PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), |
| protocol), |
| engine); |
| PROTOCOL_ENGINES.put(protocol, engine); |
| } |
| return engine; |
| } |
| |
| // return the RpcEngine that handles a proxy object |
| private static synchronized RpcEngine getProxyEngine(Object proxy) { |
| return PROXY_ENGINES.get(proxy.getClass()); |
| } |
| |
| /** |
| * A version mismatch for the RPC protocol. |
| */ |
| public static class VersionMismatch extends IOException { |
| private static final long serialVersionUID = 0; |
| |
| private String interfaceName; |
| private long clientVersion; |
| private long serverVersion; |
| |
| /** |
| * Create a version mismatch exception |
| * @param interfaceName the name of the protocol mismatch |
| * @param clientVersion the client's version of the protocol |
| * @param serverVersion the server's version of the protocol |
| */ |
| public VersionMismatch(String interfaceName, long clientVersion, |
| long serverVersion) { |
| super("Protocol " + interfaceName + " version mismatch. (client = " + |
| clientVersion + ", server = " + serverVersion + ")"); |
| this.interfaceName = interfaceName; |
| this.clientVersion = clientVersion; |
| this.serverVersion = serverVersion; |
| } |
| |
| /** |
| * Get the interface name |
| * @return the java class name |
| * (eg. org.apache.hadoop.mapred.InterTrackerProtocol) |
| */ |
| public String getInterfaceName() { |
| return interfaceName; |
| } |
| |
| /** |
| * Get the client's preferred version |
| */ |
| public long getClientVersion() { |
| return clientVersion; |
| } |
| |
| /** |
| * Get the server's agreed to version. |
| */ |
| public long getServerVersion() { |
| return serverVersion; |
| } |
| } |
| |
| /** |
| * Get a proxy connection to a remote server |
| * |
| * @param protocol protocol class |
| * @param clientVersion client version |
| * @param addr remote address |
| * @param conf configuration to use |
| * @return the proxy |
| * @throws IOException if the far end through a RemoteException |
| */ |
| public static <T> T waitForProxy( |
| Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, |
| Configuration conf |
| ) throws IOException { |
| return waitForProtocolProxy(protocol, clientVersion, addr, conf).getProxy(); |
| } |
| |
| /** |
| * Get a protocol proxy that contains a proxy connection to a remote server |
| * and a set of methods that are supported by the server |
| * |
| * @param protocol protocol class |
| * @param clientVersion client version |
| * @param addr remote address |
| * @param conf configuration to use |
| * @return the protocol proxy |
| * @throws IOException if the far end through a RemoteException |
| */ |
| public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, |
| Configuration conf) throws IOException { |
| return waitForProtocolProxy( |
| protocol, clientVersion, addr, conf, Long.MAX_VALUE); |
| } |
| |
| /** |
| * Get a proxy connection to a remote server |
| * |
| * @param protocol protocol class |
| * @param clientVersion client version |
| * @param addr remote address |
| * @param conf configuration to use |
| * @param connTimeout time in milliseconds before giving up |
| * @return the proxy |
| * @throws IOException if the far end through a RemoteException |
| */ |
| public static <T> T waitForProxy(Class<T> protocol, long clientVersion, |
| InetSocketAddress addr, Configuration conf, |
| long connTimeout) throws IOException { |
| return waitForProtocolProxy(protocol, clientVersion, addr, |
| conf, connTimeout).getProxy(); |
| } |
| |
| /** |
| * Get a protocol proxy that contains a proxy connection to a remote server |
| * and a set of methods that are supported by the server |
| * |
| * @param protocol protocol class |
| * @param clientVersion client version |
| * @param addr remote address |
| * @param conf configuration to use |
| * @param connTimeout time in milliseconds before giving up |
| * @return the protocol proxy |
| * @throws IOException if the far end through a RemoteException |
| */ |
| public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, Configuration conf, |
| long connTimeout) throws IOException { |
| return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, connTimeout); |
| } |
| |
| /** |
| * Get a proxy connection to a remote server |
| * |
| * @param protocol protocol class |
| * @param clientVersion client version |
| * @param addr remote address |
| * @param conf configuration to use |
| * @param rpcTimeout timeout for each RPC |
| * @param timeout time in milliseconds before giving up |
| * @return the proxy |
| * @throws IOException if the far end through a RemoteException |
| */ |
| public static <T> T waitForProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, Configuration conf, |
| int rpcTimeout, |
| long timeout) throws IOException { |
| return waitForProtocolProxy(protocol, clientVersion, addr, |
| conf, rpcTimeout, timeout).getProxy(); |
| } |
| |
| /** |
| * Get a protocol proxy that contains a proxy connection to a remote server |
| * and a set of methods that are supported by the server |
| * |
| * @param protocol protocol class |
| * @param clientVersion client version |
| * @param addr remote address |
| * @param conf configuration to use |
| * @param rpcTimeout timeout for each RPC |
| * @param timeout time in milliseconds before giving up |
| * @return the proxy |
| * @throws IOException if the far end through a RemoteException |
| */ |
| public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, Configuration conf, |
| int rpcTimeout, |
| long timeout) throws IOException { |
| long startTime = System.currentTimeMillis(); |
| IOException ioe; |
| while (true) { |
| try { |
| return getProtocolProxy(protocol, clientVersion, addr, |
| UserGroupInformation.getCurrentUser(), conf, NetUtils |
| .getDefaultSocketFactory(conf), rpcTimeout); |
| } catch(ConnectException se) { // namenode has not been started |
| LOG.info("Server at " + addr + " not available yet, Zzzzz..."); |
| ioe = se; |
| } catch(SocketTimeoutException te) { // namenode is busy |
| LOG.info("Problem connecting to server: " + addr); |
| ioe = te; |
| } catch(NoRouteToHostException nrthe) { // perhaps a VIP is failing over |
| LOG.info("No route to host for server: " + addr); |
| ioe = nrthe; |
| } |
| // check if timed out |
| if (System.currentTimeMillis()-timeout >= startTime) { |
| throw ioe; |
| } |
| |
| // wait for retry |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException ie) { |
| // IGNORE |
| } |
| } |
| } |
| |
| /** Construct a client-side proxy object that implements the named protocol, |
| * talking to a server at the named address. |
| * @param <T>*/ |
| public static <T> T getProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, Configuration conf, |
| SocketFactory factory) throws IOException { |
| return getProtocolProxy( |
| protocol, clientVersion, addr, conf, factory).getProxy(); |
| } |
| |
| /** |
| * Get a protocol proxy that contains a proxy connection to a remote server |
| * and a set of methods that are supported by the server |
| * |
| * @param protocol protocol class |
| * @param clientVersion client version |
| * @param addr remote address |
| * @param conf configuration to use |
| * @param factory socket factory |
| * @return the protocol proxy |
| * @throws IOException if the far end through a RemoteException |
| */ |
| public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, Configuration conf, |
| SocketFactory factory) throws IOException { |
| UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); |
| return getProtocolProxy(protocol, clientVersion, addr, ugi, conf, factory); |
| } |
| |
| /** Construct a client-side proxy object that implements the named protocol, |
| * talking to a server at the named address. |
| * @param <T>*/ |
| public static <T> T getProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, |
| UserGroupInformation ticket, |
| Configuration conf, |
| SocketFactory factory) throws IOException { |
| return getProtocolProxy( |
| protocol, clientVersion, addr, ticket, conf, factory).getProxy(); |
| } |
| |
| /** |
| * Get a protocol proxy that contains a proxy connection to a remote server |
| * and a set of methods that are supported by the server |
| * |
| * @param protocol protocol class |
| * @param clientVersion client version |
| * @param addr remote address |
| * @param ticket user group information |
| * @param conf configuration to use |
| * @param factory socket factory |
| * @return the protocol proxy |
| * @throws IOException if the far end through a RemoteException |
| */ |
| public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, |
| UserGroupInformation ticket, |
| Configuration conf, |
| SocketFactory factory) throws IOException { |
| return getProtocolProxy( |
| protocol, clientVersion, addr, ticket, conf, factory, 0); |
| } |
| |
| /** |
| * Construct a client-side proxy that implements the named protocol, |
| * talking to a server at the named address. |
| * @param <T> |
| * |
| * @param protocol protocol |
| * @param clientVersion client's version |
| * @param addr server address |
| * @param ticket security ticket |
| * @param conf configuration |
| * @param factory socket factory |
| * @param rpcTimeout max time for each rpc; 0 means no timeout |
| * @return the proxy |
| * @throws IOException if any error occurs |
| */ |
| public static <T> T getProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, |
| UserGroupInformation ticket, |
| Configuration conf, |
| SocketFactory factory, |
| int rpcTimeout) throws IOException { |
| return getProtocolProxy(protocol, clientVersion, addr, ticket, |
| conf, factory, rpcTimeout).getProxy(); |
| } |
| |
| /** |
| * Get a protocol proxy that contains a proxy connection to a remote server |
| * and a set of methods that are supported by the server |
| * |
| * @param protocol protocol |
| * @param clientVersion client's version |
| * @param addr server address |
| * @param ticket security ticket |
| * @param conf configuration |
| * @param factory socket factory |
| * @param rpcTimeout max time for each rpc; 0 means no timeout |
| * @return the proxy |
| * @throws IOException if any error occurs |
| */ |
| public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, |
| UserGroupInformation ticket, |
| Configuration conf, |
| SocketFactory factory, |
| int rpcTimeout) throws IOException { |
| if (UserGroupInformation.isSecurityEnabled()) { |
| SaslRpcServer.init(conf); |
| } |
| return getProtocolEngine(protocol,conf).getProxy(protocol, |
| clientVersion, addr, ticket, conf, factory, rpcTimeout); |
| } |
| |
| /** |
| * Construct a client-side proxy object with the default SocketFactory |
| * @param <T> |
| * |
| * @param protocol |
| * @param clientVersion |
| * @param addr |
| * @param conf |
| * @return a proxy instance |
| * @throws IOException |
| */ |
| public static <T> T getProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, Configuration conf) |
| throws IOException { |
| |
| return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy(); |
| } |
| |
| /** |
| * Get a protocol proxy that contains a proxy connection to a remote server |
| * and a set of methods that are supported by the server |
| * |
| * @param protocol |
| * @param clientVersion |
| * @param addr |
| * @param conf |
| * @return a protocol proxy |
| * @throws IOException |
| */ |
| public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, |
| long clientVersion, |
| InetSocketAddress addr, Configuration conf) |
| throws IOException { |
| |
| return getProtocolProxy(protocol, clientVersion, addr, conf, NetUtils |
| .getDefaultSocketFactory(conf)); |
| } |
| |
| /** |
| * Stop this proxy and release its invoker's resource |
| * @param proxy the proxy to be stopped |
| */ |
| public static void stopProxy(Object proxy) { |
| RpcEngine rpcEngine; |
| if (proxy!=null && (rpcEngine = getProxyEngine(proxy)) != null) { |
| rpcEngine.stopProxy(proxy); |
| } |
| } |
| |
| /** |
| * Expert: Make multiple, parallel calls to a set of servers. |
| * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead |
| */ |
| @Deprecated |
| public static Object[] call(Method method, Object[][] params, |
| InetSocketAddress[] addrs, Configuration conf) |
| throws IOException, InterruptedException { |
| return call(method, params, addrs, null, conf); |
| } |
| |
| /** Expert: Make multiple, parallel calls to a set of servers. */ |
| public static Object[] call(Method method, Object[][] params, |
| InetSocketAddress[] addrs, |
| UserGroupInformation ticket, Configuration conf) |
| throws IOException, InterruptedException { |
| |
| return getProtocolEngine(method.getDeclaringClass(), conf) |
| .call(method, params, addrs, ticket, conf); |
| } |
| |
| /** Construct a server for a protocol implementation instance listening on a |
| * port and address. |
| * @deprecated protocol interface should be passed. |
| */ |
| @Deprecated |
| public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) |
| throws IOException { |
| return getServer(instance, bindAddress, port, 1, false, conf); |
| } |
| |
| /** Construct a server for a protocol implementation instance listening on a |
| * port and address. |
| * @deprecated protocol interface should be passed. |
| */ |
| @Deprecated |
| public static Server getServer(final Object instance, final String bindAddress, final int port, |
| final int numHandlers, |
| final boolean verbose, Configuration conf) |
| throws IOException { |
| return getServer(instance.getClass(), // use impl class for protocol |
| instance, bindAddress, port, numHandlers, false, conf, null); |
| } |
| |
| /** Construct a server for a protocol implementation instance. */ |
| public static Server getServer(Class protocol, |
| Object instance, String bindAddress, |
| int port, Configuration conf) |
| throws IOException { |
| return getServer(protocol, instance, bindAddress, port, 1, false, conf, null); |
| } |
| |
| /** Construct a server for a protocol implementation instance. |
| * @deprecated secretManager should be passed. |
| */ |
| @Deprecated |
| public static Server getServer(Class protocol, |
| Object instance, String bindAddress, int port, |
| int numHandlers, |
| boolean verbose, Configuration conf) |
| throws IOException { |
| |
| return getServer(protocol, instance, bindAddress, port, numHandlers, verbose, |
| conf, null); |
| } |
| |
| /** Construct a server for a protocol implementation instance. */ |
| public static Server getServer(Class<?> protocol, |
| Object instance, String bindAddress, int port, |
| int numHandlers, |
| boolean verbose, Configuration conf, |
| SecretManager<? extends TokenIdentifier> secretManager) |
| throws IOException { |
| |
| return getProtocolEngine(protocol, conf) |
| .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1, |
| verbose, conf, secretManager); |
| } |
| |
| /** Construct a server for a protocol implementation instance. */ |
| public static Server getServer(Class<?> protocol, |
| Object instance, String bindAddress, int port, |
| int numHandlers, int numReaders, int queueSizePerHandler, |
| boolean verbose, Configuration conf, |
| SecretManager<? extends TokenIdentifier> secretManager) |
| throws IOException { |
| |
| return getProtocolEngine(protocol, conf) |
| .getServer(protocol, instance, bindAddress, port, numHandlers, |
| numReaders, queueSizePerHandler, verbose, conf, secretManager); |
| } |
| |
| /** An RPC Server. */ |
| public abstract static class Server extends org.apache.hadoop.ipc.Server { |
| |
| protected Server(String bindAddress, int port, |
| Class<? extends Writable> paramClass, int handlerCount, |
| int numReaders, int queueSizePerHandler, |
| Configuration conf, String serverName, |
| SecretManager<? extends TokenIdentifier> secretManager) throws IOException { |
| super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, |
| conf, serverName, secretManager); |
| } |
| } |
| |
| } |