blob: eb3fadc8480f03ee98562fadb16d096a71c09907 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.tuweni.scuttlebutt.rpc.mux;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.concurrent.AsyncResult;
import org.apache.tuweni.concurrent.CompletableAsyncResult;
import org.apache.tuweni.scuttlebutt.handshake.vertx.ClientHandler;
import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
import org.apache.tuweni.scuttlebutt.rpc.RPCCodec;
import org.apache.tuweni.scuttlebutt.rpc.RPCFlag;
import org.apache.tuweni.scuttlebutt.rpc.RPCMessage;
import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import org.logl.Logger;
import org.logl.LoggerProvider;
/**
* Handles RPC requests and responses from an active connection to a scuttlebutt node.
*/
public class RPCHandler implements Multiplexer, ClientHandler {
private final Consumer<Bytes> messageSender;
private final Logger logger;
private final Runnable connectionCloser;
private final ObjectMapper objectMapper;
/**
* We run each each update on the vertx event loop to update the request state synchronously, and to handle the
* underlying connection closing by failing the in progress requests and not accepting future requests
*/
private final Vertx vertx;
private Map<Integer, CompletableAsyncResult<RPCResponse>> awaitingAsyncResponse = new HashMap<>();
private Map<Integer, ScuttlebuttStreamHandler> streams = new HashMap<>();
private boolean closed;
/**
* Makes RPC requests over a connection
*
* @param vertx The vertx instance to queue requests with
* @param messageSender sends the request to the node
* @param terminationFn closes the connection
* @param objectMapper the objectMapper to serialize and deserialize message request and response bodies
* @param logger
*/
public RPCHandler(
Vertx vertx,
Consumer<Bytes> messageSender,
Runnable terminationFn,
ObjectMapper objectMapper,
LoggerProvider logger) {
this.vertx = vertx;
this.messageSender = messageSender;
this.connectionCloser = terminationFn;
this.closed = false;
this.objectMapper = objectMapper;
this.logger = logger.getLogger("rpc handler");
}
@Override
public AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException {
Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper);
CompletableAsyncResult<RPCResponse> result = AsyncResult.incomplete();
Handler<Void> synchronizedAddRequest = (event) -> {
if (closed) {
result.completeExceptionally(new ConnectionClosedException());
} else {
RPCMessage message = new RPCMessage(bodyBytes);
int requestNumber = message.requestNumber();
awaitingAsyncResponse.put(requestNumber, result);
Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
logOutgoingRequest(message);
sendBytes(bytes);
}
};
vertx.runOnContext(synchronizedAddRequest);
return result;
}
@Override
public void openStream(RPCStreamRequest request, Function<Runnable, ScuttlebuttStreamHandler> responseSink)
throws JsonProcessingException {
Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper);
Handler<Void> synchronizedRequest = (event) -> {
RPCFlag[] rpcFlags = request.getRPCFlags();
RPCMessage message = new RPCMessage(bodyBytes);
int requestNumber = message.requestNumber();
Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
Runnable closeStreamHandler = () -> {
// Run on vertx context because this callback may be called from a different
// thread by the caller
vertx.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
endStream(requestNumber);
}
});
};
ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler);
if (closed) {
scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException());
} else {
streams.put(requestNumber, scuttlebuttStreamHandler);
logOutgoingRequest(message);
sendBytes(requestBytes);
}
};
vertx.runOnContext(synchronizedRequest);
}
private void logOutgoingRequest(RPCMessage rpcMessage) {
if (logger.isDebugEnabled()) {
String requestString = new String(rpcMessage.asString());
String logMessage = String.format("[%d] Outgoing request: %s", rpcMessage.requestNumber(), requestString);
logger.debug(logMessage);
}
}
@Override
public void close() {
vertx.runOnContext((event) -> {
connectionCloser.run();
});
}
@Override
public void receivedMessage(Bytes message) {
Handler<Void> synchronizedHandleMessage = (event) -> {
RPCMessage rpcMessage = new RPCMessage(message);
// A negative request number indicates that this is a response, rather than a request that this node
// should service
if (rpcMessage.requestNumber() < 0) {
handleResponse(rpcMessage);
} else {
handleRequest(rpcMessage);
}
};
vertx.runOnContext(synchronizedHandleMessage);
}
@Override
public void streamClosed() {
Handler<Void> synchronizedCloseStream = (event) -> {
closed = true;
streams.forEach((key, streamHandler) -> {
streamHandler.onStreamError(new ConnectionClosedException());
});
streams.clear();
awaitingAsyncResponse.forEach((key, value) -> {
if (!value.isDone()) {
value.completeExceptionally(new ConnectionClosedException());
}
});
awaitingAsyncResponse.clear();
};
vertx.runOnContext(synchronizedCloseStream);
}
private void handleRequest(RPCMessage rpcMessage) {
// Not yet implemented
logger.warn("Received incoming request, but we do not yet handle any requests: " + rpcMessage.asString());
}
private void handleResponse(RPCMessage response) {
int requestNumber = response.requestNumber() * -1;
if (logger.isDebugEnabled()) {
String logMessage = String.format("[%d] incoming response: %s", requestNumber, response.asString());
logger.debug(logMessage);
}
byte rpcFlags = response.rpcFlags();
boolean isStream = RPCFlag.Stream.STREAM.isApplied(rpcFlags);
Optional<RPCRequestFailedException> exception = response.getException(objectMapper);
if (isStream) {
ScuttlebuttStreamHandler scuttlebuttStreamHandler = streams.get(requestNumber);
if (scuttlebuttStreamHandler != null) {
if (response.isSuccessfulLastMessage()) {
// Confirm our end of the stream close and inform the consumer of the stream that it is closed
endStream(requestNumber);
} else if (exception.isPresent()) {
scuttlebuttStreamHandler.onStreamError(exception.get());
} else {
RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType());
scuttlebuttStreamHandler.onMessage(successfulResponse);
}
} else {
logger.warn(
"Couldn't find stream handler for RPC response with request number "
+ requestNumber
+ " "
+ response.asString());
}
} else {
CompletableAsyncResult<RPCResponse> rpcMessageFuture = awaitingAsyncResponse.remove(requestNumber);
if (rpcMessageFuture != null) {
if (exception.isPresent()) {
rpcMessageFuture.completeExceptionally(exception.get());
} else {
RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType());
rpcMessageFuture.complete(successfulResponse);
}
} else {
logger.warn(
"Couldn't find async handler for RPC response with request number "
+ requestNumber
+ " "
+ response.asString());
}
}
}
private void sendBytes(Bytes bytes) {
messageSender.accept(bytes);
}
/**
* Sends an stream close message over the RPC channel to for the given request number if we have not already closed
* our end of the stream.
*
* Removes the stream handler from the state, so any newly incoming messages until the other side of the stream has
* closed its end will be ignored.
*
* @param requestNumber the request number of the stream to send a close message over RPC for
*/
private void endStream(int requestNumber) {
try {
ScuttlebuttStreamHandler streamHandler = streams.remove(requestNumber);
// Only send the message if the stream hasn't already been closed at our end
if (streamHandler != null) {
Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
streamHandler.onStreamEnd();
if (logger.isDebugEnabled()) {
String logMessage = String.format("[%d] Sending close stream message.", requestNumber);
logger.debug(logMessage);
}
sendBytes(streamEnd);
}
} catch (JsonProcessingException e) {
logger.warn("Unexpectedly could not encode stream end message to JSON.");
}
}
}