blob: 4dfeb7ce3cef2983aef3deebb712b8a4c7ff99f3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.ipc;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.List;
import java.util.Map;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.Protocol.Message;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.ByteBufferInputStream;
import org.apache.avro.util.ByteBufferOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Base class for the client side of a protocol interaction. */
public abstract class Requestor {
/** Logger shared by this class and its nested helper classes. */
private static final Logger LOG = LoggerFactory.getLogger(Requestor.class);
/** Schema of per-call and handshake metadata: a map whose values are bytes. */
private static final Schema META =
Schema.createMap(Schema.create(Schema.Type.BYTES));
/** Reads the response call metadata map (see Response#getResponse). */
private static final GenericDatumReader<Map<String,ByteBuffer>>
META_READER = new GenericDatumReader<Map<String,ByteBuffer>>(META);
/** Writes the request call metadata map (see Request#getBytes). */
private static final GenericDatumWriter<Map<String,ByteBuffer>>
META_WRITER = new GenericDatumWriter<Map<String,ByteBuffer>>(META);
/** The client-side protocol; used to look up messages and to fill the handshake. */
private final Protocol local;
/**
* The server-side protocol. Assigned while writing a handshake (cached value
* or a guess that it equals the local protocol) and when a handshake
* response carries the server's protocol. May be null before any handshake.
*/
private volatile Protocol remote;
/**
* When true, the next handshake request includes the full text of the local
* protocol. Set after a handshake response with match NONE and cleared after
* BOTH or CLIENT.
*/
private volatile boolean sendLocalText;
/** The transport used to exchange request and response buffers. */
private final Transceiver transceiver;
/** Held while a thread performs the handshake in request() or getRemote(). */
private final ReentrantLock handshakeLock = new ReentrantLock();
/** Plugins that manipulate RPC metadata; extended through addRPCPlugin. */
protected final List<RPCPlugin> rpcMetaPlugins;
/**
 * Gets the local (client-side) protocol.
 * @return the protocol supplied when this requestor was constructed.
 */
public Protocol getLocal() {
  return this.local;
}
/**
 * Gets the transceiver used to talk to the remote side.
 * @return the transceiver supplied when this requestor was constructed.
 */
public Transceiver getTransceiver() {
  return this.transceiver;
}
/**
 * Creates a requestor for the given local protocol and transport. The list
 * of RPC plugins starts out empty.
 * @param local the client-side protocol.
 * @param transceiver the transport used to send requests.
 * @throws IOException declared by the signature; no statement in this
 *   constructor throws it.
 */
protected Requestor(Protocol local, Transceiver transceiver)
  throws IOException {
  this.rpcMetaPlugins = new CopyOnWriteArrayList<RPCPlugin>();
  this.transceiver = transceiver;
  this.local = local;
}
/**
 * Registers an additional plugin that manipulates RPC metadata. Plugins are
 * appended to the plugin list and run in registration order.
 * @param plugin a plugin that will manipulate RPC metadata
 */
public void addRPCPlugin(RPCPlugin plugin) {
  this.rpcMetaPlugins.add(plugin);
}
/**
* Factory for the encoders used by this class: the direct encoder of the
* forced handshake in getRemote() and the encoders created by Request.
*/
private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory();
/**
 * Writes a request message and reads a response or error message.
 * For a one-way message nothing is awaited and {@code null} is returned.
 * @param messageName the name of the message to invoke.
 * @param request the request data to send.
 * @return the response of a two-way message, or {@code null} for a one-way
 *   message.
 * @throws Exception if sending fails, or the cause of a failed call; a cause
 *   that is not an Exception is wrapped in an AvroRemoteException.
 */
public Object request(String messageName, Object request)
  throws Exception {
  Request rpcRequest = new Request(messageName, request, new RPCContext());

  // Only a two-way message produces a result, so only it needs a future.
  final boolean oneWay = rpcRequest.getMessage().isOneWay();
  final CallFuture<Object> future = oneWay ? null : new CallFuture<Object>();

  request(rpcRequest, future);
  if (oneWay) {
    return null;
  }

  try {
    return future.get();
  } catch (ExecutionException e) {
    // Surface the underlying failure rather than the wrapper.
    Throwable cause = e.getCause();
    if (cause instanceof Exception) {
      throw (Exception) cause;
    }
    throw new AvroRemoteException(cause);
  }
}
/**
 * Writes a request message and delivers the outcome to a Callback.
 * Callers that prefer a Future can create a {@code CallFuture<T>}, pass it
 * as the callback, and wait on it.
 * @param <T> the return type of the message.
 * @param messageName the name of the message to invoke.
 * @param request the request data to send.
 * @param callback the callback which will be invoked when the response is
 *   returned or an error occurs.
 * @throws Exception if an error occurs sending the message.
 */
public <T> void request(String messageName, Object request, Callback<T> callback)
  throws Exception {
  RPCContext context = new RPCContext();
  Request rpcRequest = new Request(messageName, request, context);
  request(rpcRequest, callback);
}
/**
 * Writes a request message and returns the result through a Callback.
 *
 * <p>While the transceiver is not connected, the request is sent under the
 * handshake lock and this method blocks until that exchange has completed,
 * so that only one thread performs the handshake. Once connected, a one-way
 * message is written directly and a two-way message is handed to the
 * transceiver with a callback.
 *
 * @param <T> the return type of the message.
 * @param request the request to send.
 * @param callback the callback to notify; it is null-checked before being
 *   invoked directly for a one-way message.
 * @throws Exception if an error occurs sending the message, or if a one-way
 *   message sent during the handshake completed with an error.
 */
<T> void request(Request request, Callback<T> callback)
  throws Exception {
  Transceiver t = getTransceiver();
  if (!t.isConnected()) {
    // Acquire handshake lock so that only one thread is performing the
    // handshake and other threads block until the handshake is completed
    handshakeLock.lock();
    try {
      // Re-check under the lock: another thread may have completed the
      // handshake while this one was waiting.
      if (!t.isConnected()) {
        CallFuture<T> callFuture = new CallFuture<T>(callback);
        t.transceive(request.getBytes(),
                     new TransceiverCallback<T>(request, callFuture));
        // Block until handshake complete
        callFuture.await();
        if (request.getMessage().isOneWay()) {
          // A one-way call has no result to deliver, so report a failed
          // exchange to the caller by throwing.
          Throwable error = callFuture.getError();
          if (error != null) {
            if (error instanceof Exception) {
              throw (Exception) error;
            }
            throw new AvroRemoteException(error);
          }
        }
        return;
      }
    } finally {
      // Exactly one unlock per lock() keeps the hold count balanced even
      // when the current thread already held the lock on entry.
      handshakeLock.unlock();
    }
  }

  if (request.getMessage().isOneWay()) {
    t.lockChannel();
    try {
      t.writeBuffers(request.getBytes());
      if (callback != null) {
        callback.handleResult(null);
      }
    } finally {
      t.unlockChannel();
    }
  } else {
    t.transceive(request.getBytes(),
                 new TransceiverCallback<T>(request, callback));
  }
}
/**
* Maps a transceiver's remote name to the server protocol hash learned from a
* handshake response. Static, so shared by all Requestor instances.
*/
private static final ConcurrentMap<String,MD5> REMOTE_HASHES =
new ConcurrentHashMap<String,MD5>();
/**
* Maps a server protocol hash to the parsed server protocol. Static, so
* shared by all Requestor instances.
*/
private static final ConcurrentMap<MD5,Protocol> REMOTE_PROTOCOLS =
new ConcurrentHashMap<MD5,Protocol>();
/** Serializes the handshake request that precedes a call. */
private static final SpecificDatumWriter<HandshakeRequest> HANDSHAKE_WRITER =
new SpecificDatumWriter<HandshakeRequest>(HandshakeRequest.class);
/** Deserializes the handshake response that precedes a call response. */
private static final SpecificDatumReader<HandshakeResponse> HANDSHAKE_READER =
new SpecificDatumReader<HandshakeResponse>(HandshakeResponse.class);
/**
 * Writes a handshake request to the encoder unless the transceiver is
 * already connected, in which case nothing is written.
 *
 * <p>The server hash is taken from the cache entry for the transceiver's
 * remote name; without an entry, the server is guessed to use the local
 * protocol. The {@code remote} field is updated to match that choice.
 *
 * @param out the encoder that receives the handshake request.
 * @throws IOException if writing the handshake fails.
 */
private void writeHandshake(Encoder out) throws IOException {
  if (getTransceiver().isConnected()) {
    return;
  }

  MD5 clientHash = new MD5();
  clientHash.bytes(local.getMD5());

  MD5 serverHash = REMOTE_HASHES.get(transceiver.getRemoteName());
  if (serverHash != null) {
    remote = REMOTE_PROTOCOLS.get(serverHash);
  } else {
    // guess remote is local
    serverHash = clientHash;
    remote = local;
  }

  HandshakeRequest handshake = new HandshakeRequest();
  handshake.clientHash = clientHash;
  handshake.serverHash = serverHash;
  if (sendLocalText) {
    handshake.clientProtocol = local.toString();
  }

  // Let plugins contribute handshake metadata before it is serialized.
  RPCContext context = new RPCContext();
  context.setHandshakeRequest(handshake);
  for (RPCPlugin plugin : rpcMetaPlugins) {
    plugin.clientStartConnect(context);
  }
  handshake.meta = context.requestHandshakeMeta();

  HANDSHAKE_WRITER.write(handshake, out);
}
/**
 * Reads a handshake response from the decoder unless the transceiver is
 * already connected.
 *
 * <p>A match of BOTH or CLIENT establishes the connection; CLIENT and NONE
 * also adopt the protocol sent by the server. After NONE the next handshake
 * request will include the local protocol text.
 *
 * @param in the decoder positioned at the handshake response.
 * @return true if the transceiver was already connected or the handshake
 *   established the connection; false if the handshake must be resent.
 * @throws IOException if reading the handshake fails.
 */
private boolean readHandshake(Decoder in) throws IOException {
  if (getTransceiver().isConnected()) {
    return true;
  }

  HandshakeResponse response = HANDSHAKE_READER.read(null, in);
  boolean connected = false;
  switch (response.match) {
    case BOTH: {
      connected = true;
      sendLocalText = false;
      break;
    }
    case CLIENT: {
      LOG.debug("Handshake match = CLIENT");
      setRemote(response);
      connected = true;
      sendLocalText = false;
      break;
    }
    case NONE: {
      LOG.debug("Handshake match = NONE");
      setRemote(response);
      sendLocalText = true;
      break;
    }
    default:
      throw new AvroRuntimeException("Unexpected match: "+response.match);
  }

  // Give plugins a look at the handshake response.
  RPCContext context = new RPCContext();
  context.setHandshakeResponse(response);
  for (RPCPlugin plugin : rpcMetaPlugins) {
    plugin.clientFinishConnect(context);
  }

  if (connected) {
    getTransceiver().setRemote(remote);
  }
  return connected;
}
/**
 * Adopts the server protocol carried by a handshake response: parses it into
 * the {@code remote} field and records it in the static hash and protocol
 * caches under the transceiver's remote name.
 * @param handshake the handshake response holding the server protocol and
 *   its hash.
 * @throws IOException declared by the signature.
 */
private void setRemote(HandshakeResponse handshake) throws IOException {
  String serverProtocolText = handshake.serverProtocol.toString();
  remote = Protocol.parse(serverProtocolText);

  MD5 serverHash = (MD5) handshake.serverHash;
  String remoteName = transceiver.getRemoteName();
  REMOTE_HASHES.put(remoteName, serverHash);
  REMOTE_PROTOCOLS.putIfAbsent(serverHash, remote);
}
/**
 * Returns the remote protocol, forcing a handshake if it is neither known
 * to this requestor nor present in the shared cache.
 * @return the value of the {@code remote} field after the lookup or the
 *   forced handshake.
 * @throws IOException if the forced handshake exchange fails.
 */
public Protocol getRemote() throws IOException {
  if (remote != null) {
    return remote;                                // already have it
  }

  MD5 cachedHash = REMOTE_HASHES.get(transceiver.getRemoteName());
  if (cachedHash != null) {
    remote = REMOTE_PROTOCOLS.get(cachedHash);
    if (remote != null) {
      return remote;                              // already cached
    }
  }

  handshakeLock.lock();
  try {
    // Force a handshake by sending a handshake followed by an empty call.
    ByteBufferOutputStream requestBuffers = new ByteBufferOutputStream();
    // direct because the payload is tiny.
    Encoder out = ENCODER_FACTORY.directBinaryEncoder(requestBuffers, null);
    writeHandshake(out);
    out.writeInt(0);                              // empty metadata
    out.writeString("");                          // bogus message name

    List<ByteBuffer> responseBuffers =
      getTransceiver().transceive(requestBuffers.getBufferList());
    BinaryDecoder in = DecoderFactory.get()
      .binaryDecoder(new ByteBufferInputStream(responseBuffers), null);
    readHandshake(in);
    return this.remote;
  } finally {
    handshakeLock.unlock();
  }
}
/**
* Writes a request message.
* Called while a Request serializes itself, with the request schema of the
* local message.
* @param schema the schema of the request.
* @param request the request data to write.
* @param out the encoder to write to.
* @throws IOException if writing fails.
*/
public abstract void writeRequest(Schema schema, Object request,
Encoder out) throws IOException;
/**
* Reads a response message using one schema as both the writer's and the
* reader's schema.
* @param schema the schema passed as both writer and reader schema.
* @param in the decoder to read from.
* @return the result of {@link #readResponse(Schema, Schema, Decoder)}.
* @throws IOException if reading fails.
* @deprecated use {@link #readResponse(Schema, Schema, Decoder)}.
*/
@Deprecated // for compatibility in 1.5
public Object readResponse(Schema schema, Decoder in) throws IOException {
return readResponse(schema, schema, in);
}
/**
* Reads a response message.
* Response#getResponse passes the remote message's response schema as the
* writer and the local message's response schema as the reader.
* @param writer the schema the response was written with.
* @param reader the schema the caller expects.
* @param in the decoder to read from.
* @return the decoded response.
* @throws IOException if reading fails.
*/
public abstract Object readResponse(Schema writer, Schema reader, Decoder in)
throws IOException;
/**
* Reads an error message using one schema as both the writer's and the
* reader's schema.
* @param schema the schema passed as both writer and reader schema.
* @param in the decoder to read from.
* @return the result of {@link #readError(Schema, Schema, Decoder)}.
* @throws IOException if reading fails.
* @deprecated use {@link #readError(Schema, Schema, Decoder)}.
*/
@Deprecated // for compatibility in 1.5
public Object readError(Schema schema, Decoder in) throws IOException {
return readError(schema, schema, in);
}
/**
* Reads an error message.
* Response#getResponse passes the remote message's error schema as the
* writer and the local message's error schema as the reader, and throws the
* returned exception.
* @param writer the schema the error was written with.
* @param reader the schema the caller expects.
* @param in the decoder to read from.
* @return the decoded error.
* @throws IOException if reading fails.
*/
public abstract Exception readError(Schema writer, Schema reader, Decoder in)
throws IOException;
/**
* Handles callbacks from transceiver invocations.
*/
protected class TransceiverCallback<T> implements Callback<List<ByteBuffer>> {
private final Request request;
private final Callback<T> callback;
/**
* Creates a TransceiverCallback.
* @param request the request to set.
* @param callback the callback to set.
*/
public TransceiverCallback(Request request, Callback<T> callback) {
this.request = request;
this.callback = callback;
}
@Override
@SuppressWarnings("unchecked")
public void handleResult(List<ByteBuffer> responseBytes) {
ByteBufferInputStream bbi = new ByteBufferInputStream(responseBytes);
BinaryDecoder in = DecoderFactory.get().binaryDecoder(bbi, null);
try {
if (!readHandshake(in)) {
// Resend the handshake and return
Request handshake = new Request(request);
getTransceiver().transceive
(handshake.getBytes(),
new TransceiverCallback<T>(handshake, callback));
return;
}
} catch (Exception e) {
LOG.error("Error handling transceiver callback: " + e, e);
}
// Read response; invoke callback
Response response = new Response(request, in);
Object responseObject;
try {
try {
responseObject = response.getResponse();
} catch (Exception e) {
if (callback != null) {
callback.handleError(e);
}
return;
}
if (callback != null) {
callback.handleResult((T)responseObject);
}
} catch (Throwable t) {
LOG.error("Error in callback handler: " + t, t);
}
}
@Override
public void handleError(Throwable error) {
callback.handleError(error);
}
}
/**
* Encapsulates/generates a request.
* The serialized form is produced on the first call to getBytes() and cached
* afterwards. Being an inner class, it uses the enclosing Requestor's local
* protocol, plugins and handshake state.
*/
class Request {
// Name of the message to invoke; looked up in the local protocol.
private final String messageName;
// The request data handed to writeRequest.
private final Object request;
// RPC context passed to the plugins for this call.
private final RPCContext context;
// Encoder handed to the factory as its second argument in getBytes().
private final BinaryEncoder encoder;
// Lazily resolved by getMessage().
private Message message;
// Lazily generated by getBytes().
private List<ByteBuffer> requestBytes;
/**
* Creates a Request.
* Delegates to the four-argument constructor with a null encoder.
* @param messageName the name of the message to invoke.
* @param request the request data to send.
* @param context the RPC context to use.
*/
public Request(String messageName, Object request, RPCContext context) {
this(messageName, request, context, null);
}
/**
* Creates a Request.
* @param messageName the name of the message to invoke.
* @param request the request data to send.
* @param context the RPC context to use.
* @param encoder the BinaryEncoder to use to serialize the request.
*/
public Request(String messageName, Object request, RPCContext context,
BinaryEncoder encoder) {
this.messageName = messageName;
this.request = request;
this.context = context;
// The stream given here is a placeholder: getBytes() passes this encoder
// back to the factory together with the stream that is actually used.
// NOTE(review): presumably the factory's second argument is an encoder to
// reuse -- confirm against EncoderFactory.binaryEncoder.
this.encoder =
ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), encoder);
}
/**
* Copy constructor.
* The cached message and the cached request bytes are not copied, so the
* copy serializes itself again on its first getBytes() call.
* @param other Request from which to copy fields.
*/
public Request(Request other) {
this.messageName = other.messageName;
this.request = other.request;
this.context = other.context;
this.encoder = other.encoder;
}
/**
* Gets the message name.
* @return the message name.
*/
public String getMessageName() {
return messageName;
}
/**
* Gets the RPC context.
* @return the RPC context.
*/
public RPCContext getContext() {
return context;
}
/**
* Gets the Message associated with this request.
* The message is looked up in the local protocol on first use and cached.
* @return this request's message.
* @throws AvroRuntimeException if the local protocol has no message with
* this request's name.
*/
public Message getMessage() {
if (message == null) {
message = getLocal().getMessages().get(messageName);
if (message == null) {
throw new AvroRuntimeException("Not a local message: "+messageName);
}
}
return message;
}
/**
* Gets the request data, generating it first if necessary.
* The payload is serialized first so that it can be handed to the plugins;
* the handshake (if any), call metadata and message name are written
* afterwards and the payload is appended behind them.
* @return the request data.
* @throws Exception if an error occurs generating the request data.
*/
public List<ByteBuffer> getBytes()
throws Exception {
if (requestBytes == null) {
ByteBufferOutputStream bbo = new ByteBufferOutputStream();
BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
// use local protocol to write request
Message m = getMessage();
context.setMessage(m);
writeRequest(m.getRequest(), request, out); // write request payload
out.flush();
// Take the payload buffers out of the stream.
// NOTE(review): this relies on getBufferList() also emptying bbo, so that
// what is written next precedes the payload -- confirm against
// ByteBufferOutputStream.
List<ByteBuffer> payload = bbo.getBufferList();
writeHandshake(out); // prepend handshake if needed
context.setRequestPayload(payload);
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.clientSendRequest(context); // get meta-data from plugins
}
META_WRITER.write(context.requestCallMeta(), out);
out.writeString(m.getName()); // write message name
out.flush();
// Re-attach the payload behind the handshake, metadata and message name.
bbo.append(payload);
requestBytes = bbo.getBufferList();
}
return requestBytes;
}
}
/**
 * Encapsulates/parses a response.
 */
class Response {
  private final Request request;
  private final BinaryDecoder in;

  /**
   * Creates a Response without a decoder.
   * @param request the Request associated with this response.
   */
  public Response(Request request) {
    this(request, null);
  }

  /**
   * Creates a Response.
   * @param request the Request associated with this response.
   * @param in the BinaryDecoder to use to deserialize the response.
   */
  public Response(Request request, BinaryDecoder in) {
    this.request = request;
    this.in = in;
  }

  /**
   * Gets the RPC response, reading/deserializing it first if necessary.
   * Plugins are notified after a response or an error has been decoded.
   * @return the RPC response, or null for a one-way message on a connected
   *   transceiver.
   * @throws Exception if an error occurs reading/deserializing the
   *   response, or the decoded error if the remote side reported one.
   */
  public Object getResponse()
    throws Exception {
    Message localMessage = request.getMessage();
    Message remoteMessage = remote.getMessages().get(request.getMessageName());
    if (remoteMessage == null) {
      throw new AvroRuntimeException
        ("Not a remote message: "+request.getMessageName());
    }

    Transceiver channel = getTransceiver();
    if ((localMessage.isOneWay() != remoteMessage.isOneWay())
        && channel.isConnected()) {
      throw new AvroRuntimeException
        ("Not both one-way messages: "+request.getMessageName());
    }
    if (localMessage.isOneWay() && channel.isConnected()) {
      return null;                                // one-way w/ handshake
    }

    RPCContext context = request.getContext();
    context.setResponseCallMeta(META_READER.read(null, in));

    boolean isError = in.readBoolean();
    if (isError) {
      Exception error =
        readError(remoteMessage.getErrors(), localMessage.getErrors(), in);
      context.setError(error);
      notifyPlugins(context);
      throw error;
    }

    Object response =
      readResponse(remoteMessage.getResponse(), localMessage.getResponse(), in);
    context.setResponse(response);
    notifyPlugins(context);
    return response;
  }

  /** Tells every registered plugin that a response has been received. */
  private void notifyPlugins(RPCContext context) {
    for (RPCPlugin plugin : rpcMetaPlugins) {
      plugin.clientReceiveResponse(context);
    }
  }
}
}