blob: ff2c1a4a5328c23aa9cb79708f6dbf175934176f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputOutputStream;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto.ResponseStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
/**
* RPC Engine for for protobuf based RPCs.
*/
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
new Server.ProtoBufRpcInvoker());
}
private static final ClientCache CLIENTS = new ClientCache();
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(protocol
.getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
addr, ticket, conf, factory, rpcTimeout)), false);
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, connId, conf,
factory)), false);
}
private static class Invoker implements RpcInvocationHandler {
private final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
private final Client.ConnectionId remoteId;
private final Client client;
private final long clientProtocolVersion;
private final String protocolName;
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout) throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf), conf, factory);
}
/**
* This constructor takes a connectionId, instead of creating a new one.
*/
public Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
}
private HadoopRpcRequestProto constructRpcRequest(Method method,
Object[] params) throws ServiceException {
HadoopRpcRequestProto rpcRequest;
HadoopRpcRequestProto.Builder builder = HadoopRpcRequestProto
.newBuilder();
builder.setMethodName(method.getName());
if (params.length != 2) { // RpcController + Message
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ params.length);
}
if (params[1] == null) {
throw new ServiceException("null param while calling Method: ["
+ method.getName() + "]");
}
Message param = (Message) params[1];
builder.setRequest(param.toByteString());
// For protobuf, {@code protocol} used when creating client side proxy is
// the interface extending BlockingInterface, which has the annotations
// such as ProtocolName etc.
//
// Using Method.getDeclaringClass(), as in WritableEngine to get at
// the protocol interface will return BlockingInterface, from where
// the annotation ProtocolName and Version cannot be
// obtained.
//
// Hence we simply use the protocol class used to create the proxy.
// For PB this may limit the use of mixins on client side.
builder.setDeclaringClassProtocolName(protocolName);
builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
return rpcRequest;
}
/**
* This is the client side invoker of RPC method. It only throws
* ServiceException, since the invocation proxy expects only
* ServiceException to be thrown by the method in case protobuf service.
*
* ServiceException has the following causes:
* <ol>
* <li>Exceptions encountered on the client side in this method are
* set as cause in ServiceException as is.</li>
* <li>Exceptions from the server are wrapped in RemoteException and are
* set as cause in ServiceException</li>
* </ol>
*
* Note that the client calling protobuf RPC methods, must handle
* ServiceException by getting the cause from the ServiceException. If the
* cause is RemoteException, then unwrap it to get the exception thrown by
* the server.
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
}
HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
RpcResponseWritable val = null;
try {
val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWritable(rpcRequest), remoteId);
} catch (Throwable e) {
throw new ServiceException(e);
}
HadoopRpcResponseProto response = val.message;
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
// Wrap the received message
ResponseStatus status = response.getStatus();
if (status != ResponseStatus.SUCCESS) {
RemoteException re = new RemoteException(response.getException()
.getExceptionName(), response.getException().getStackTrace());
re.fillInStackTrace();
throw new ServiceException(re);
}
Message prototype = null;
try {
prototype = getReturnProtoType(method);
} catch (Exception e) {
throw new ServiceException(e);
}
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
.mergeFrom(response.getResponse()).build();
} catch (Throwable e) {
throw new ServiceException(e);
}
return returnMessage;
}
public void close() throws IOException {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
private Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
}
Class<?> returnType = method.getReturnType();
Method newInstMethod = returnType.getMethod("getDefaultInstance");
newInstMethod.setAccessible(true);
Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null);
returnTypes.put(method.getName(), prototype);
return prototype;
}
@Override //RpcInvocationHandler
public ConnectionId getConnectionId() {
return remoteId;
}
}
@Override
public Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) {
throw new UnsupportedOperationException();
}
/**
* Writable Wrapper for Protocol Buffer Requests
*/
private static class RpcRequestWritable implements Writable {
HadoopRpcRequestProto message;
@SuppressWarnings("unused")
public RpcRequestWritable() {
}
RpcRequestWritable(HadoopRpcRequestProto message) {
this.message = message;
}
@Override
public void write(DataOutput out) throws IOException {
((Message)message).writeDelimitedTo(
DataOutputOutputStream.constructOutputStream(out));
}
@Override
public void readFields(DataInput in) throws IOException {
int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length];
in.readFully(bytes);
message = HadoopRpcRequestProto.parseFrom(bytes);
}
}
/**
* Writable Wrapper for Protocol Buffer Responses
*/
private static class RpcResponseWritable implements Writable {
HadoopRpcResponseProto message;
@SuppressWarnings("unused")
public RpcResponseWritable() {
}
public RpcResponseWritable(HadoopRpcResponseProto message) {
this.message = message;
}
@Override
public void write(DataOutput out) throws IOException {
((Message)message).writeDelimitedTo(
DataOutputOutputStream.constructOutputStream(out));
}
@Override
public void readFields(DataInput in) throws IOException {
int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length];
in.readFully(bytes);
message = HadoopRpcResponseProto.parseFrom(bytes);
}
}
@VisibleForTesting
@InterfaceAudience.Private
@InterfaceStability.Unstable
static Client getClient(Configuration conf) {
return CLIENTS.getClient(conf, SocketFactory.getDefault(),
RpcResponseWritable.class);
}
@Override
public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
String bindAddress, int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return new Server(protocol, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
}
public static class Server extends RPC.Server {
/**
* Construct an RPC server.
*
* @param protocolClass the class of protocol
* @param protocolImpl the protocolImpl whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*/
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port, int numHandlers,
int numReaders, int queueSizePerHandler, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
super(bindAddress, port, null, numHandlers,
numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
.getClass().getName()), secretManager);
this.verbose = verbose;
registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl);
}
private static RpcResponseWritable handleException(Throwable e) {
HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
.setExceptionName(e.getClass().getName())
.setStackTrace(StringUtils.stringifyException(e)).build();
HadoopRpcResponseProto response = HadoopRpcResponseProto.newBuilder()
.setStatus(ResponseStatus.ERRROR).setException(exception).build();
return new RpcResponseWritable(response);
}
private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
Message message) {
HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
.setResponse(message.toByteString())
.setStatus(ResponseStatus.SUCCESS)
.build();
return res;
}
/**
* Protobuf invoker for {@link RpcInvoker}
*/
static class ProtoBufRpcInvoker implements RpcInvoker {
private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
String protoName, long version) throws IOException {
ProtoNameVer pv = new ProtoNameVer(protoName, version);
ProtoClassProtoImpl impl =
server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
if (impl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER,
protoName);
if (highest == null) {
throw new IOException("Unknown protocol: " + protoName);
}
// protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, version,
highest.version);
}
return impl;
}
@Override
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are return in the resposne.
* See {@link HadoopRpcResponseProto}
*
* In this method there three types of exceptions possible and they are
* returned in response as follows.
* <ol>
* <li> Exceptions encountered in this method that are returned
* as {@link RpcServerException} </li>
* <li> Exceptions thrown by the service is wrapped in ServiceException.
* In that this method returns in response the exception thrown by the
* service.</li>
* <li> Other exceptions thrown by the service. They are returned as
* it is.</li>
* </ol>
*/
public Writable call(RPC.Server server, String protocol,
Writable writableRequest, long receiveTime) throws IOException {
RpcRequestWritable request = (RpcRequestWritable) writableRequest;
HadoopRpcRequestProto rpcRequest = request.message;
String methodName = rpcRequest.getMethodName();
String protoName = rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
if (server.verbose)
LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
if (methodDescriptor == null) {
String msg = "Unknown method " + methodName + " called on " + protocol
+ " protocol.";
LOG.warn(msg);
return handleException(new RpcServerException(msg));
}
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType()
.mergeFrom(rpcRequest.getRequest()).build();
Message result;
try {
long startTime = System.currentTimeMillis();
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
result = service.callBlockingMethod(methodDescriptor, null, param);
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime - receiveTime);
if (LOG.isDebugEnabled()) {
LOG.info("Served: " + methodName + " queueTime= " + qTime +
" procesingTime= " + processingTime);
}
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(methodName,
processingTime);
} catch (ServiceException e) {
Throwable cause = e.getCause();
return handleException(cause != null ? cause : e);
} catch (Exception e) {
return handleException(e);
}
HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
return new RpcResponseWritable(response);
}
}
}
}