// blob: 4f85e905cd34f9100a930c7cfe638b0538fc293a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.io.*;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.protobuf.BlockingService;
/** A simple RPC mechanism.
*
* A <i>protocol</i> is a Java interface. All parameters and return types must
* be one of:
*
* <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
* <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
* <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
*
* <li>a {@link String}; or</li>
*
* <li>a {@link Writable}; or</li>
*
* <li>an array of the above types</li> </ul>
*
* All methods in the protocol should throw only IOException. No field data of
* the protocol instance is transmitted.
*/
public class RPC {
interface RpcInvoker {
  /**
   * Process a client call on the server side.
   *
   * @param server the server within whose context this rpc call is made
   * @param protocol the protocol name (the class of the client proxy
   *        used to make calls to the rpc server)
   * @param rpcRequest the deserialized request
   * @param receiveTime time at which the call was received (for metrics)
   * @return the call's return value
   * @throws IOException if processing the call fails
   */
  Writable call(Server server, String protocol, Writable rpcRequest,
      long receiveTime) throws IOException;
}
// Logger for RPC; declared without an access modifier, so it is visible
// to the rest of this package and to the nested Server class below.
static final Log LOG = LogFactory.getLog(RPC.class);
/**
 * Collect, recursively, every interface reachable from the given interfaces
 * that is assignable to {@link VersionedProtocol}.
 *
 * Each accepted interface is followed in the result by whatever its own
 * super-interfaces contribute. Interfaces that do not extend
 * VersionedProtocol are skipped with a warning and are not descended into.
 *
 * @param childInterfaces the interfaces to start from
 * @return the accepted interfaces and their accepted super-interfaces
 */
static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
  List<Class<?>> collected = new ArrayList<Class<?>>();
  for (int i = 0; i < childInterfaces.length; i++) {
    Class<?> candidate = childInterfaces[i];
    if (!VersionedProtocol.class.isAssignableFrom(candidate)) {
      LOG.warn("Interface " + candidate +
          " ignored because it does not extend VersionedProtocol");
      continue;
    }
    collected.add(candidate);
    // Depth-first: append what the candidate's own parents contribute.
    Class<?>[] inherited = getSuperInterfaces(candidate.getInterfaces());
    for (Class<?> parent : inherited) {
      collected.add(parent);
    }
  }
  return collected.toArray(new Class[collected.size()]);
}
/**
 * Get all interfaces that the given protocol class implements or extends
 * and that themselves extend VersionedProtocol.
 *
 * @param protocol the class or interface whose declared interfaces are walked
 * @return the matching interfaces, as produced by getSuperInterfaces
 */
static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
  return getSuperInterfaces(protocol.getInterfaces());
}
/**
 * Get the protocol name.
 * If the protocol class carries a {@link ProtocolInfo} annotation, the name
 * comes from the annotation; otherwise the class name is the protocol name.
 *
 * @param protocol the protocol class, may be null
 * @return the protocol name, or null when protocol is null
 */
static public String getProtocolName(Class<?> protocol) {
  if (protocol == null) {
    return null;
  }
  ProtocolInfo info = protocol.getAnnotation(ProtocolInfo.class);
  if (info != null) {
    return info.protocolName();
  }
  return protocol.getName();
}
/**
 * Get the protocol version from the protocol class.
 * If the class carries a {@link ProtocolInfo} annotation whose
 * protocolVersion is not -1, that value is returned. Otherwise the version
 * is read reflectively from the class's public <code>versionID</code> field.
 *
 * @param protocol the protocol class
 * @return the protocol version
 * @throws IllegalArgumentException if protocol is null
 * @throws RuntimeException if the versionID field is missing or cannot be
 *         read
 */
static public long getProtocolVersion(Class<?> protocol) {
  if (protocol == null) {
    throw new IllegalArgumentException("Null protocol");
  }
  ProtocolInfo info = protocol.getAnnotation(ProtocolInfo.class);
  // -1 in the annotation means "not set here": fall back to the field.
  long annotated = (info == null) ? -1 : info.protocolVersion();
  if (annotated != -1) {
    return annotated;
  }
  try {
    Field idField = protocol.getField("versionID");
    idField.setAccessible(true);
    return idField.getLong(protocol);
  } catch (NoSuchFieldException missing) {
    throw new RuntimeException(missing);
  } catch (IllegalAccessException denied) {
    throw new RuntimeException(denied);
  }
}
// RPC exposes only static members; the private constructor prevents
// instantiation.
private RPC() {} // no public ctor
// cache of RpcEngines by protocol; read and written only inside the
// synchronized getProtocolEngine method in this file
private static final Map<Class<?>,RpcEngine> PROTOCOL_ENGINES
= new HashMap<Class<?>,RpcEngine>();
// Prefix of the per-protocol configuration key: the full key is
// ENGINE_PROP + "." + the protocol class name.
private static final String ENGINE_PROP = "rpc.engine";
/**
 * Set a protocol to use a non-default RpcEngine.
 * The engine class is stored in the configuration under the key
 * "rpc.engine." followed by the protocol's class name.
 *
 * @param conf configuration to update
 * @param protocol the protocol interface
 * @param engine the RpcEngine impl
 */
public static void setProtocolEngine(Configuration conf,
    Class<?> protocol, Class<?> engine) {
  String key = ENGINE_PROP + "." + protocol.getName();
  conf.setClass(key, engine, RpcEngine.class);
}
/**
 * Return the RpcEngine configured to handle a protocol.
 * Engines are cached per protocol class. On a cache miss the engine class
 * is read from the configuration (WritableRpcEngine when nothing is
 * configured), instantiated, and cached.
 *
 * @param protocol the protocol class
 * @param conf configuration to read the engine class from
 * @return the engine for the protocol
 */
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
    Configuration conf) {
  RpcEngine cached = PROTOCOL_ENGINES.get(protocol);
  if (cached != null) {
    return cached;
  }
  String key = ENGINE_PROP + "." + protocol.getName();
  Class<?> impl = conf.getClass(key, WritableRpcEngine.class);
  RpcEngine created = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
  PROTOCOL_ENGINES.put(protocol, created);
  return created;
}
/**
 * A version mismatch for the RPC protocol.
 * Carries the protocol name plus the client's and the server's version;
 * all three are fixed at construction time.
 */
public static class VersionMismatch extends IOException {
  private static final long serialVersionUID = 0;

  private final String interfaceName;
  private final long clientVersion;
  private final long serverVersion;

  /**
   * Create a version mismatch exception.
   *
   * @param interfaceName the name of the protocol mismatch
   * @param clientVersion the client's version of the protocol
   * @param serverVersion the server's version of the protocol
   */
  public VersionMismatch(String interfaceName, long clientVersion,
                         long serverVersion) {
    super("Protocol " + interfaceName + " version mismatch. (client = " +
          clientVersion + ", server = " + serverVersion + ")");
    this.interfaceName = interfaceName;
    this.clientVersion = clientVersion;
    this.serverVersion = serverVersion;
  }

  /**
   * Get the interface name.
   *
   * @return the java class name
   *         (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
   */
  public String getInterfaceName() {
    return interfaceName;
  }

  /**
   * Get the client's preferred version.
   *
   * @return the client version passed to the constructor
   */
  public long getClientVersion() {
    return clientVersion;
  }

  /**
   * Get the server's agreed to version.
   *
   * @return the server version passed to the constructor
   */
  public long getServerVersion() {
    return serverVersion;
  }
}
/**
 * Get a proxy connection to a remote server.
 * Obtains a protocol proxy through the four-argument
 * <code>waitForProtocolProxy</code> and returns the proxy object it wraps.
 *
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @return the proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> T waitForProxy(
    Class<T> protocol,
    long clientVersion,
    InetSocketAddress addr,
    Configuration conf
    ) throws IOException {
  ProtocolProxy<T> protocolProxy =
      waitForProtocolProxy(protocol, clientVersion, addr, conf);
  return protocolProxy.getProxy();
}
/**
 * Get a protocol proxy that contains a proxy connection to a remote server
 * and a set of methods that are supported by the server.
 * Uses Long.MAX_VALUE as the connection timeout.
 *
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @return the protocol proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
                           long clientVersion,
                           InetSocketAddress addr,
                           Configuration conf) throws IOException {
  final long connTimeout = Long.MAX_VALUE;
  return waitForProtocolProxy(protocol, clientVersion, addr, conf,
      connTimeout);
}
/**
 * Get a proxy connection to a remote server.
 * Obtains a protocol proxy through the five-argument
 * <code>waitForProtocolProxy</code> and returns the proxy object it wraps.
 *
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @param connTimeout time in milliseconds before giving up
 * @return the proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> T waitForProxy(Class<T> protocol, long clientVersion,
                           InetSocketAddress addr, Configuration conf,
                           long connTimeout) throws IOException {
  ProtocolProxy<T> protocolProxy = waitForProtocolProxy(
      protocol, clientVersion, addr, conf, connTimeout);
  return protocolProxy.getProxy();
}
/**
 * Get a protocol proxy that contains a proxy connection to a remote server
 * and a set of methods that are supported by the server.
 * Passes 0 as the per-RPC timeout to the six-argument overload.
 *
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @param connTimeout time in milliseconds before giving up
 * @return the protocol proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
                           long clientVersion,
                           InetSocketAddress addr, Configuration conf,
                           long connTimeout) throws IOException {
  final int rpcTimeout = 0;
  return waitForProtocolProxy(
      protocol, clientVersion, addr, conf, rpcTimeout, connTimeout);
}
/**
 * Get a proxy connection to a remote server.
 * Obtains a protocol proxy through the six-argument
 * <code>waitForProtocolProxy</code> and returns the proxy object it wraps.
 *
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @param rpcTimeout timeout for each RPC
 * @param timeout time in milliseconds before giving up
 * @return the proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> T waitForProxy(Class<T> protocol,
                           long clientVersion,
                           InetSocketAddress addr, Configuration conf,
                           int rpcTimeout,
                           long timeout) throws IOException {
  ProtocolProxy<T> protocolProxy = waitForProtocolProxy(
      protocol, clientVersion, addr, conf, rpcTimeout, timeout);
  return protocolProxy.getProxy();
}
/**
 * Get a protocol proxy that contains a proxy connection to a remote server
 * and a set of methods that are supported by the server.
 *
 * The connection attempt is retried once a second for as long as it fails
 * with a ConnectException, SocketTimeoutException or NoRouteToHostException.
 * Once <code>timeout</code> milliseconds have elapsed, the last of those
 * exceptions is rethrown. Any other IOException propagates immediately.
 *
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @param rpcTimeout timeout for each RPC
 * @param timeout time in milliseconds before giving up
 * @return the protocol proxy
 * @throws InterruptedIOException if the calling thread is interrupted while
 *         waiting between retries; the interrupt status is restored and the
 *         last connection failure is attached as the cause
 * @throws IOException if the connection still fails once the timeout has
 *         elapsed, or on any failure that is not retried
 */
public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
                             long clientVersion,
                             InetSocketAddress addr, Configuration conf,
                             int rpcTimeout,
                             long timeout) throws IOException {
  long startTime = System.currentTimeMillis();
  IOException ioe;
  while (true) {
    try {
      return getProtocolProxy(protocol, clientVersion, addr,
          UserGroupInformation.getCurrentUser(), conf, NetUtils
          .getDefaultSocketFactory(conf), rpcTimeout);
    } catch(ConnectException se) {  // namenode has not been started
      LOG.info("Server at " + addr + " not available yet, Zzzzz...");
      ioe = se;
    } catch(SocketTimeoutException te) {  // namenode is busy
      LOG.info("Problem connecting to server: " + addr);
      ioe = te;
    } catch(NoRouteToHostException nrthe) { // perhaps a VIP is failing over
      LOG.info("No route to host for server: " + addr);
      ioe = nrthe;
    }
    // check if timed out; comparing elapsed time avoids subtracting the
    // (possibly extreme) timeout from the current time
    if (System.currentTimeMillis() - startTime >= timeout) {
      throw ioe;
    }
    // wait for retry
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ie) {
      // Swallowing the interrupt would make this loop uninterruptible,
      // which matters most with the Long.MAX_VALUE default timeout.
      Thread.currentThread().interrupt();
      throw (IOException) new InterruptedIOException(
          "Interrupted waiting for the proxy").initCause(ioe);
    }
  }
}
/**
 * Construct a client-side proxy object that implements the named protocol,
 * talking to a server at the named address.
 *
 * @param <T> the protocol type
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @param factory socket factory
 * @return the proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> T getProxy(Class<T> protocol,
                              long clientVersion,
                              InetSocketAddress addr, Configuration conf,
                              SocketFactory factory) throws IOException {
  ProtocolProxy<T> protocolProxy =
      getProtocolProxy(protocol, clientVersion, addr, conf, factory);
  return protocolProxy.getProxy();
}
/**
 * Get a protocol proxy that contains a proxy connection to a remote server
 * and a set of methods that are supported by the server.
 * The current user, as reported by UserGroupInformation, is used as the
 * ticket.
 *
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @param factory socket factory
 * @return the protocol proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                              long clientVersion,
                              InetSocketAddress addr, Configuration conf,
                              SocketFactory factory) throws IOException {
  return getProtocolProxy(protocol, clientVersion, addr,
      UserGroupInformation.getCurrentUser(), conf, factory);
}
/**
 * Construct a client-side proxy object that implements the named protocol,
 * talking to a server at the named address.
 *
 * @param <T> the protocol type
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param ticket user group information
 * @param conf configuration to use
 * @param factory socket factory
 * @return the proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> T getProxy(Class<T> protocol,
                              long clientVersion,
                              InetSocketAddress addr,
                              UserGroupInformation ticket,
                              Configuration conf,
                              SocketFactory factory) throws IOException {
  ProtocolProxy<T> protocolProxy = getProtocolProxy(
      protocol, clientVersion, addr, ticket, conf, factory);
  return protocolProxy.getProxy();
}
/**
 * Get a protocol proxy that contains a proxy connection to a remote server
 * and a set of methods that are supported by the server.
 * Passes 0 as the per-RPC timeout to the seven-argument overload.
 *
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param ticket user group information
 * @param conf configuration to use
 * @param factory socket factory
 * @return the protocol proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                              long clientVersion,
                              InetSocketAddress addr,
                              UserGroupInformation ticket,
                              Configuration conf,
                              SocketFactory factory) throws IOException {
  final int rpcTimeout = 0;
  return getProtocolProxy(
      protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout);
}
/**
 * Construct a client-side proxy that implements the named protocol,
 * talking to a server at the named address.
 *
 * @param <T> the protocol type
 * @param protocol protocol
 * @param clientVersion client's version
 * @param addr server address
 * @param ticket security ticket
 * @param conf configuration
 * @param factory socket factory
 * @param rpcTimeout max time for each rpc; 0 means no timeout
 * @return the proxy
 * @throws IOException if any error occurs
 */
public static <T> T getProxy(Class<T> protocol,
                              long clientVersion,
                              InetSocketAddress addr,
                              UserGroupInformation ticket,
                              Configuration conf,
                              SocketFactory factory,
                              int rpcTimeout) throws IOException {
  ProtocolProxy<T> protocolProxy = getProtocolProxy(
      protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout);
  return protocolProxy.getProxy();
}
/**
 * Get a protocol proxy that contains a proxy connection to a remote server
 * and a set of methods that are supported by the server.
 * When security is enabled, SaslRpcServer is initialised first; the proxy
 * is then created by the RpcEngine configured for the protocol.
 *
 * @param protocol protocol
 * @param clientVersion client's version
 * @param addr server address
 * @param ticket security ticket
 * @param conf configuration
 * @param factory socket factory
 * @param rpcTimeout max time for each rpc; 0 means no timeout
 * @return the protocol proxy
 * @throws IOException if any error occurs
 */
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                              long clientVersion,
                              InetSocketAddress addr,
                              UserGroupInformation ticket,
                              Configuration conf,
                              SocketFactory factory,
                              int rpcTimeout) throws IOException {
  if (UserGroupInformation.isSecurityEnabled()) {
    SaslRpcServer.init(conf);
  }
  RpcEngine engine = getProtocolEngine(protocol, conf);
  return engine.getProxy(protocol, clientVersion, addr, ticket, conf,
      factory, rpcTimeout);
}
/**
 * Construct a client-side proxy object with the default SocketFactory.
 *
 * @param <T> the protocol type
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @return a proxy instance
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> T getProxy(Class<T> protocol,
                              long clientVersion,
                              InetSocketAddress addr, Configuration conf)
    throws IOException {
  ProtocolProxy<T> protocolProxy =
      getProtocolProxy(protocol, clientVersion, addr, conf);
  return protocolProxy.getProxy();
}
/**
 * Returns the server address for a given proxy.
 * The proxy's invocation handler is cast to RpcInvocationHandler and the
 * address is taken from its connection id.
 *
 * @param proxy the RPC proxy object
 * @return the address held by the handler's connection id
 */
public static InetSocketAddress getServerAddress(Object proxy) {
  InvocationHandler handler = Proxy.getInvocationHandler(proxy);
  return ((RpcInvocationHandler) handler).getConnectionId().getAddress();
}
/**
 * Get a protocol proxy that contains a proxy connection to a remote server
 * and a set of methods that are supported by the server.
 * Uses the default socket factory obtained from NetUtils for the given
 * configuration.
 *
 * @param protocol protocol class
 * @param clientVersion client version
 * @param addr remote address
 * @param conf configuration to use
 * @return a protocol proxy
 * @throws IOException if the proxy cannot be obtained
 */
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                              long clientVersion,
                              InetSocketAddress addr, Configuration conf)
    throws IOException {
  SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(conf);
  return getProtocolProxy(protocol, clientVersion, addr, conf,
      defaultFactory);
}
/**
 * Stop this proxy and release its invoker's resource by getting the
 * invocation handler for the given proxy object and calling
 * {@link Closeable#close} if that invocation handler implements
 * {@link Closeable}.
 *
 * This method never throws for a bad argument: a null proxy, an object
 * that is not a proxy, or a handler that is not Closeable is reported
 * through the error log instead.
 *
 * @param proxy the RPC proxy object to be stopped
 */
public static void stopProxy(Object proxy) {
  InvocationHandler invocationHandler = null;
  // Proxy.getInvocationHandler dereferences its argument, so a null proxy
  // must be filtered out here to reach the error log below instead of
  // failing with a NullPointerException.
  if (proxy != null) {
    try {
      invocationHandler = Proxy.getInvocationHandler(proxy);
    } catch (IllegalArgumentException e) {
      LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e);
    }
  }
  // instanceof is false for null, so this also covers "no handler found".
  if (invocationHandler instanceof Closeable) {
    try {
      ((Closeable)invocationHandler).close();
    } catch (IOException e) {
      LOG.error("Stopping RPC invocation handler caused exception", e);
    }
  } else {
    LOG.error("Could not get invocation handler " + invocationHandler +
        " for proxy class " + (proxy == null ? null : proxy.getClass()) +
        ", or invocation handler is not closeable.");
  }
}
/**
 * Expert: Make multiple, parallel calls to a set of servers.
 * Forwards to the overload that takes a ticket, passing a null ticket.
 *
 * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead
 */
@Deprecated
public static Object[] call(Method method, Object[][] params,
                            InetSocketAddress[] addrs, Configuration conf)
    throws IOException, InterruptedException {
  final UserGroupInformation noTicket = null;
  return call(method, params, addrs, noTicket, conf);
}
/**
 * Expert: Make multiple, parallel calls to a set of servers.
 * The call is carried out by the RpcEngine configured for the class that
 * declares <code>method</code>.
 */
public static Object[] call(Method method, Object[][] params,
                            InetSocketAddress[] addrs,
                            UserGroupInformation ticket, Configuration conf)
    throws IOException, InterruptedException {
  RpcEngine engine = getProtocolEngine(method.getDeclaringClass(), conf);
  return engine.call(method, params, addrs, ticket, conf);
}
/**
 * Construct a server for a protocol implementation instance listening on a
 * port and address. Uses one handler and non-verbose mode.
 *
 * @deprecated protocol interface should be passed.
 */
@Deprecated
public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
    throws IOException {
  final int numHandlers = 1;
  final boolean verbose = false;
  return getServer(instance, bindAddress, port, numHandlers, verbose, conf);
}
/**
 * Construct a server for a protocol implementation instance listening on a
 * port and address. The implementation's own class is used as the protocol
 * class and no secret manager is supplied.
 *
 * @param instance the protocol implementation
 * @param bindAddress the address to bind to
 * @param port the port to listen on
 * @param numHandlers the number of handler threads
 * @param verbose verbose flag, forwarded to the server
 * @param conf configuration to use
 * @return the server
 * @throws IOException if the server cannot be constructed
 * @deprecated protocol interface should be passed.
 */
@Deprecated
public static Server getServer(final Object instance, final String bindAddress, final int port,
                               final int numHandlers,
                               final boolean verbose, Configuration conf)
    throws IOException {
  // Forward the caller's verbose flag; it used to be replaced by a
  // hard-coded false, silently ignoring the parameter.
  return getServer(instance.getClass(),         // use impl class for protocol
      instance, bindAddress, port, numHandlers, verbose, conf, null);
}
/**
 * Construct a server for a protocol implementation instance.
 * Uses one handler, non-verbose mode and no secret manager.
 */
public static Server getServer(Class<?> protocol,
                               Object instance, String bindAddress,
                               int port, Configuration conf)
    throws IOException {
  final int numHandlers = 1;
  final boolean verbose = false;
  return getServer(protocol, instance, bindAddress, port, numHandlers,
      verbose, conf, null);
}
/**
 * Construct a server for a protocol implementation instance.
 * Forwards to the overload that takes a secret manager, passing null.
 *
 * @deprecated secretManager should be passed.
 */
@Deprecated
public static Server getServer(Class<?> protocol,
                               Object instance, String bindAddress, int port,
                               int numHandlers,
                               boolean verbose, Configuration conf)
    throws IOException {
  return getServer(
      protocol, instance, bindAddress, port, numHandlers, verbose, conf,
      null);
}
/**
 * Construct a server for a protocol implementation instance.
 * The server is built by the RpcEngine configured for the protocol, with
 * -1 passed for both the reader count and the per-handler queue size.
 */
public static Server getServer(Class<?> protocol,
                               Object instance, String bindAddress, int port,
                               int numHandlers,
                               boolean verbose, Configuration conf,
                               SecretManager<? extends TokenIdentifier> secretManager)
    throws IOException {
  // NOTE(review): -1 presumably means "let the engine/server pick its
  // default" for these two settings - confirm against RpcEngine.getServer.
  final int numReaders = -1;
  final int queueSizePerHandler = -1;
  RpcEngine engine = getProtocolEngine(protocol, conf);
  return engine.getServer(protocol, instance, bindAddress, port,
      numHandlers, numReaders, queueSizePerHandler, verbose, conf,
      secretManager);
}
/**
 * Construct a server for a protocol implementation instance.
 * The server is built by the RpcEngine configured for the protocol; every
 * argument is handed to that engine unchanged.
 */
public static <PROTO extends VersionedProtocol, IMPL extends PROTO>
    Server getServer(Class<PROTO> protocol,
                     IMPL instance, String bindAddress, int port,
                     int numHandlers, int numReaders, int queueSizePerHandler,
                     boolean verbose, Configuration conf,
                     SecretManager<? extends TokenIdentifier> secretManager)
    throws IOException {
  RpcEngine engine = getProtocolEngine(protocol, conf);
  return engine.getServer(protocol, instance, bindAddress, port,
      numHandlers, numReaders, queueSizePerHandler, verbose, conf,
      secretManager);
}
/** An RPC Server. */
public abstract static class Server extends org.apache.hadoop.ipc.Server {
boolean verbose;
/**
 * Return the part of a dotted class name that follows the last '.'.
 * A name without any dot is returned unchanged; a name ending in a dot
 * yields the empty string.
 *
 * @param className the (possibly package-qualified) class name
 * @return the text after the last '.', or the whole name if there is none
 */
static String classNameBase(String className) {
  // lastIndexOf returns -1 when there is no dot, so "+ 1" starts at index 0.
  return className.substring(className.lastIndexOf('.') + 1);
}
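// The nested types ProtoNameVer and ProtoClassProtoImpl below, together
// with protocolImplMapArray, store a map of (protocol name, version) to
// its implementation.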
/**
 * The key in the protocol implementation map: a protocol name paired with
 * a protocol version. Two keys are equal when both parts are equal.
 */
static class ProtoNameVer {
  final String protocol;
  final long version;

  ProtoNameVer(String protocol, long ver) {
    this.protocol = protocol;
    this.version = ver;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    // instanceof is false for null, so null is rejected here as well.
    if (!(o instanceof ProtoNameVer)) {
      return false;
    }
    ProtoNameVer other = (ProtoNameVer) o;
    if (!other.protocol.equals(protocol)) {
      return false;
    }
    return other.version == version;
  }

  @Override
  public int hashCode() {
    // Only the low 32 bits of the version take part in the hash.
    int versionBits = (int) version;
    return 37 * protocol.hashCode() + versionBits;
  }
}
/**
 * The value in the protocol implementation map: a protocol class paired
 * with the object that implements it. Both references are fixed at
 * construction time.
 */
static class ProtoClassProtoImpl {
  /** The protocol class under which the implementation was registered. */
  final Class<?> protocolClass;
  /** The implementation object registered for the protocol. */
  final Object protocolImpl;

  ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
    this.protocolImpl = protocolImpl;
    this.protocolClass = protocolClass;
  }
}
/**
 * One protocol map per rpc kind, indexed by the kind's ordinal. The list
 * starts empty and is filled on the first call to getProtocolImplMap.
 */
ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray =
    new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);

/**
 * Return the protocol map for the given rpc kind, creating the maps for
 * all kinds (indices 0 through RpcKind.MAX_INDEX) on first use.
 *
 * @param rpcKind the rpc kind whose map is wanted
 * @return the map stored at index rpcKind.ordinal()
 */
Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) {
  if (protocolImplMapArray.isEmpty()) {
    // initialize for all rpc kinds
    int slot = 0;
    while (slot <= RpcKind.MAX_INDEX) {
      protocolImplMapArray.add(
          new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
      slot++;
    }
  }
  int index = rpcKind.ordinal();
  return protocolImplMapArray.get(index);
}
/**
 * Register a protocol and its impl for rpc calls.
 * The entry is keyed by the protocol's name and version. If the version
 * cannot be determined, the protocol is not registered: a warning that
 * includes the cause is logged and the method returns normally.
 *
 * @param rpcKind the rpc kind under which to register
 * @param protocolClass the protocol class
 * @param protocolImpl the implementation that will serve the protocol
 * @throws IOException declared for callers; not thrown by the code here
 */
void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
    Object protocolImpl) throws IOException {
  String protocolName = RPC.getProtocolName(protocolClass);
  long version;
  try {
    version = RPC.getProtocolVersion(protocolClass);
  } catch (Exception ex) {
    // Keep the best-effort behaviour (skip, don't fail), but log the cause
    // so it is visible why the version lookup failed.
    LOG.warn("Protocol "  + protocolClass +
        " NOT registered as cannot get protocol version ", ex);
    return;
  }
  getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
      new ProtoClassProtoImpl(protocolClass, protocolImpl));
  LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName +  " version=" + version +
      " ProtocolImpl=" + protocolImpl.getClass().getName() +
      " protocolClass=" + protocolClass.getName());
}
/**
 * A protocol version paired with the registered class/implementation
 * entry that serves it.
 */
static class VerProtocolImpl {
  /** The protocol version of the target. */
  final long version;
  /** The registered protocol class and implementation. */
  final ProtoClassProtoImpl protocolTarget;

  VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
    this.protocolTarget = protocolTarget;
    this.version = ver;
  }
}
/**
 * List every registered version of the named protocol for an rpc kind.
 *
 * @param rpcKind the rpc kind whose registrations are searched
 * @param protocolName the protocol name to match
 * @return one entry per registered version, in map iteration order, or
 *         null when the protocol has no registration for this kind
 */
@SuppressWarnings("unused") // will be useful later.
VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind,
    String protocolName) {
  List<VerProtocolImpl> matches = new ArrayList<VerProtocolImpl>();
  for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> registration :
      getProtocolImplMap(rpcKind).entrySet()) {
    ProtoNameVer key = registration.getKey();
    if (key.protocol.equals(protocolName)) {
      matches.add(new VerProtocolImpl(key.version, registration.getValue()));
    }
  }
  if (matches.isEmpty()) {
    return null;
  }
  return matches.toArray(new VerProtocolImpl[matches.size()]);
}
/**
 * Find the registration with the highest version for the named protocol.
 *
 * @param rpcKind the rpc kind whose registrations are searched
 * @param protocolName the protocol name to match
 * @return the highest registered version and its target, or null when the
 *         protocol has no registration for this kind
 */
VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind,
    String protocolName) {
  // Primitive long: the original boxed Long was unboxed on every
  // comparison and re-boxed on every assignment for no benefit.
  long highestVersion = 0L;
  ProtoClassProtoImpl highest = null;
  Map<ProtoNameVer, ProtoClassProtoImpl> protoMap =
      getProtocolImplMap(rpcKind);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Size of protoMap for " + rpcKind + " ="
        + protoMap.size());
  }
  for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
      protoMap.entrySet()) {
    if (pv.getKey().protocol.equals(protocolName)) {
      // The first match always wins, whatever its version; later matches
      // must be strictly higher.
      if ((highest == null) || (pv.getKey().version > highestVersion)) {
        highest = pv.getValue();
        highestVersion = pv.getKey().version;
      }
    }
  }
  if (highest == null) {
    return null;
  }
  return new VerProtocolImpl(highestVersion, highest);
}
/**
 * Construct an RPC server. All arguments are handed unchanged to the
 * org.apache.hadoop.ipc.Server constructor; afterwards the protocol
 * meta-info service is registered via initProtocolMetaInfo.
 *
 * @throws IOException if the superclass constructor or the meta-info
 *         registration fails
 */
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
int numReaders, int queueSizePerHandler,
Configuration conf, String serverName,
SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
conf, serverName, secretManager);
// Every server registers the ProtocolMetaInfoPB protocol at construction.
initProtocolMetaInfo(conf);
}
/**
 * Register the protocol meta-info service on this server.
 * Sets ProtobufRpcEngine as the engine for ProtocolMetaInfoPB in the given
 * configuration, then adds a reflective blocking service, backed by a
 * server-side translator for this server, under RPC_PROTOCOL_BUFFER.
 *
 * @param conf the configuration in which the engine is set
 * @throws IOException if adding the protocol fails
 */
private void initProtocolMetaInfo(Configuration conf)
    throws IOException {
  RPC.setProtocolEngine(
      conf, ProtocolMetaInfoPB.class, ProtobufRpcEngine.class);
  BlockingService metaInfoService =
      ProtocolInfoService.newReflectiveBlockingService(
          new ProtocolMetaInfoServerSideTranslatorPB(this));
  addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, ProtocolMetaInfoPB.class,
      metaInfoService);
}
/**
 * Add a protocol to the existing server.
 *
 * @param rpcKind the rpc kind under which the protocol is registered
 * @param protocolClass the protocol class
 * @param protocolImpl the impl of the protocol that will be called
 * @return the server (for convenience)
 * @throws IOException declared by the underlying registration
 */
public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
    Object protocolImpl) throws IOException {
  this.registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
  return this;
}
/**
 * Dispatch a call to the invoker registered for the given rpc kind,
 * passing this server as the invocation context.
 */
@Override
public Writable call(RpcKind rpcKind, String protocol,
    Writable rpcRequest, long receiveTime) throws IOException {
  return getRpcInvoker(rpcKind)
      .call(this, protocol, rpcRequest, receiveTime);
}
}
}