Merge pull request #26 from Happy0/stream_close

Prevent it being possible to close streams more than once and add logging
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
index b0f3639..eb3fadc 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -99,6 +99,7 @@
 
         awaitingAsyncResponse.put(requestNumber, result);
         Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
+        logOutgoingRequest(message);
         sendBytes(bytes);
       }
     };
@@ -122,7 +123,16 @@
       Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
 
       Runnable closeStreamHandler = () -> {
-        endStream(requestNumber);
+
+        // Run on vertx context because this callback may be called from a different
+        // thread by the caller
+        vertx.runOnContext(new Handler<Void>() {
+          @Override
+          public void handle(Void event) {
+            endStream(requestNumber);
+          }
+        });
+
       };
 
       ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler);
@@ -131,6 +141,7 @@
         scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException());
       } else {
         streams.put(requestNumber, scuttlebuttStreamHandler);
+        logOutgoingRequest(message);
         sendBytes(requestBytes);
       }
 
@@ -140,6 +151,15 @@
     vertx.runOnContext(synchronizedRequest);
   }
 
+  private void logOutgoingRequest(RPCMessage rpcMessage) {
+    if (logger.isDebugEnabled()) {
+      String requestString = new String(rpcMessage.asString());
+      String logMessage = String.format("[%d] Outgoing request: %s", rpcMessage.requestNumber(), requestString);
+      logger.debug(logMessage);
+    }
+
+  }
+
   @Override
   public void close() {
     vertx.runOnContext((event) -> {
@@ -192,14 +212,14 @@
   private void handleRequest(RPCMessage rpcMessage) {
     // Not yet implemented
     logger.warn("Received incoming request, but we do not yet handle any requests: " + rpcMessage.asString());
-
   }
 
   private void handleResponse(RPCMessage response) {
     int requestNumber = response.requestNumber() * -1;
 
     if (logger.isDebugEnabled()) {
-      logger.debug("Incoming response: " + response.asString());
+      String logMessage = String.format("[%d] incoming response: %s", requestNumber, response.asString());
+      logger.debug(logMessage);
     }
 
     byte rpcFlags = response.rpcFlags();
@@ -214,9 +234,8 @@
       if (scuttlebuttStreamHandler != null) {
 
         if (response.isSuccessfulLastMessage()) {
-          streams.remove(requestNumber);
+          // Confirm our end of the stream close and inform the consumer of the stream that it is closed
           endStream(requestNumber);
-          scuttlebuttStreamHandler.onStreamEnd();
         } else if (exception.isPresent()) {
           scuttlebuttStreamHandler.onStreamError(exception.get());
         } else {
@@ -260,10 +279,32 @@
     messageSender.accept(bytes);
   }
 
+  /**
+   * Sends a stream close message over the RPC channel for the given request number if we have not already closed
+   * our end of the stream.
+   *
+   * Removes the stream handler from the state, so any messages that arrive before the other side of the stream has
+   * closed its end will be ignored.
+   *
+   * @param requestNumber the request number of the stream to send a close message over RPC for
+   */
   private void endStream(int requestNumber) {
     try {
-      Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
-      sendBytes(streamEnd);
+      ScuttlebuttStreamHandler streamHandler = streams.remove(requestNumber);
+
+      // Only send the message if the stream hasn't already been closed at our end
+      if (streamHandler != null) {
+        Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
+
+        streamHandler.onStreamEnd();
+        if (logger.isDebugEnabled()) {
+          String logMessage = String.format("[%d] Sending close stream message.", requestNumber);
+          logger.debug(logMessage);
+        }
+
+        sendBytes(streamEnd);
+      }
+
     } catch (JsonProcessingException e) {
       logger.warn("Unexpectedly could not encode stream end message to JSON.");
     }