Merge pull request #26 from Happy0/stream_close
Prevent it being possible to close streams more than once and add logging
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
index b0f3639..eb3fadc 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -99,6 +99,7 @@
awaitingAsyncResponse.put(requestNumber, result);
Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
+ logOutgoingRequest(message);
sendBytes(bytes);
}
};
@@ -122,7 +123,16 @@
Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
Runnable closeStreamHandler = () -> {
- endStream(requestNumber);
+
+ // Run on vertx context because this callback may be called from a different
+ // thread by the caller
+ vertx.runOnContext(new Handler<Void>() {
+ @Override
+ public void handle(Void event) {
+ endStream(requestNumber);
+ }
+ });
+
};
ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler);
@@ -131,6 +141,7 @@
scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException());
} else {
streams.put(requestNumber, scuttlebuttStreamHandler);
+ logOutgoingRequest(message);
sendBytes(requestBytes);
}
@@ -140,6 +151,15 @@
vertx.runOnContext(synchronizedRequest);
}
+ private void logOutgoingRequest(RPCMessage rpcMessage) {
+ if (logger.isDebugEnabled()) {
+ String requestString = new String(rpcMessage.asString());
+ String logMessage = String.format("[%d] Outgoing request: %s", rpcMessage.requestNumber(), requestString);
+ logger.debug(logMessage);
+ }
+
+ }
+
@Override
public void close() {
vertx.runOnContext((event) -> {
@@ -192,14 +212,14 @@
private void handleRequest(RPCMessage rpcMessage) {
// Not yet implemented
logger.warn("Received incoming request, but we do not yet handle any requests: " + rpcMessage.asString());
-
}
private void handleResponse(RPCMessage response) {
int requestNumber = response.requestNumber() * -1;
if (logger.isDebugEnabled()) {
- logger.debug("Incoming response: " + response.asString());
+ String logMessage = String.format("[%d] incoming response: %s", requestNumber, response.asString());
+ logger.debug(logMessage);
}
byte rpcFlags = response.rpcFlags();
@@ -214,9 +234,8 @@
if (scuttlebuttStreamHandler != null) {
if (response.isSuccessfulLastMessage()) {
- streams.remove(requestNumber);
+ // Confirm our end of the stream close and inform the consumer of the stream that it is closed
endStream(requestNumber);
- scuttlebuttStreamHandler.onStreamEnd();
} else if (exception.isPresent()) {
scuttlebuttStreamHandler.onStreamError(exception.get());
} else {
@@ -260,10 +279,32 @@
messageSender.accept(bytes);
}
+ /**
+ * Sends an stream close message over the RPC channel to for the given request number if we have not already closed
+ * our end of the stream.
+ *
+ * Removes the stream handler from the state, so any newly incoming messages until the other side of the stream has
+ * closed its end will be ignored.
+ *
+ * @param requestNumber the request number of the stream to send a close message over RPC for
+ */
private void endStream(int requestNumber) {
try {
- Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
- sendBytes(streamEnd);
+ ScuttlebuttStreamHandler streamHandler = streams.remove(requestNumber);
+
+ // Only send the message if the stream hasn't already been closed at our end
+ if (streamHandler != null) {
+ Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
+
+ streamHandler.onStreamEnd();
+ if (logger.isDebugEnabled()) {
+ String logMessage = String.format("[%d] Sending close stream message.", requestNumber);
+ logger.debug(logMessage);
+ }
+
+ sendBytes(streamEnd);
+ }
+
} catch (JsonProcessingException e) {
logger.warn("Unexpectedly could not encode stream end message to JSON.");
}