[REEF-1651] Improve logging in REEF Remote Manager and related classes

Summary of changes:
    * Improve logging in the `RemoteManager` and around
    * Implement `.toString()` methods for some remote messages
    * Log unhandled exceptions in `DefaultErrorHandler`
    * Multiple code readability improvements

JIRA:
  [REEF-1651](https://issues.apache.org/jira/browse/REEF-1651)

Pull request:
  This closes #1166
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index abdecd7..c090dec 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -38,29 +38,33 @@
  * This is bound to the start event of the clock and dispatches it to the appropriate application code.
  */
 public final class DriverStartHandler implements EventHandler<StartTime> {
+
   private static final Logger LOG = Logger.getLogger(DriverStartHandler.class.getName());
 
   private final Set<EventHandler<StartTime>> startHandlers;
   private final Set<EventHandler<DriverRestarted>> restartHandlers;
   private final Set<EventHandler<DriverRestarted>> serviceRestartHandlers;
+
   private final DriverRestartManager driverRestartManager;
 
   @Inject
-  DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
-                     final Set<EventHandler<StartTime>> startHandlers,
-                     @Parameter(DriverRestartHandler.class)
-                     final Set<EventHandler<DriverRestarted>> restartHandlers,
-                     @Parameter(ServiceDriverRestartedHandlers.class)
-                     final Set<EventHandler<DriverRestarted>> serviceRestartHandlers,
-                     final DriverRestartManager driverRestartManager) {
+  private DriverStartHandler(
+      @Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
+        final Set<EventHandler<StartTime>> startHandlers,
+      @Parameter(DriverRestartHandler.class)
+        final Set<EventHandler<DriverRestarted>> restartHandlers,
+      @Parameter(ServiceDriverRestartedHandlers.class)
+        final Set<EventHandler<DriverRestarted>> serviceRestartHandlers,
+      final DriverRestartManager driverRestartManager) {
+
     this.startHandlers = startHandlers;
     this.restartHandlers = restartHandlers;
     this.serviceRestartHandlers = serviceRestartHandlers;
     this.driverRestartManager = driverRestartManager;
-    LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers [{0}], RestartHandlers [{1}]," +
-            "and ServiceRestartHandlers [{2}].",
-        new String[] {this.startHandlers.toString(), this.restartHandlers.toString(),
-            this.serviceRestartHandlers.toString()});
+
+    LOG.log(Level.FINE,
+        "Instantiated DriverStartHandler: StartHandlers:{0} RestartHandlers:{1} ServiceRestartHandlers:{2}",
+        new Object[] {this.startHandlers, this.restartHandlers, this.serviceRestartHandlers});
   }
 
   @Override
@@ -73,19 +77,20 @@
   }
 
   private void onRestart(final StartTime startTime) {
-    if (this.restartHandlers.size() > 0) {
-      final List<EventHandler<DriverRestarted>> orderedRestartHandlers =
-          new ArrayList<>(this.serviceRestartHandlers.size() + this.restartHandlers.size());
 
-      orderedRestartHandlers.addAll(this.serviceRestartHandlers);
-      orderedRestartHandlers.addAll(this.restartHandlers);
-
-      // This can only be called after calling client restart handlers because REEF.NET
-      // JobDriver requires making this call to set up the InterOp handlers.
-      this.driverRestartManager.onRestart(startTime, orderedRestartHandlers);
-    } else {
-      throw new DriverFatalRuntimeException("Driver restart happened, but no ON_DRIVER_RESTART handler is bound.");
+    if (this.restartHandlers.isEmpty()) {
+      throw new DriverFatalRuntimeException("Driver restarted, but no ON_DRIVER_RESTART handler is bound.");
     }
+
+    final List<EventHandler<DriverRestarted>> orderedRestartHandlers =
+        new ArrayList<>(this.serviceRestartHandlers.size() + this.restartHandlers.size());
+
+    orderedRestartHandlers.addAll(this.serviceRestartHandlers);
+    orderedRestartHandlers.addAll(this.restartHandlers);
+
+    // This can only be called after calling client restart handlers because REEF.NET
+    // JobDriver requires making this call to set up the InterOp handlers.
+    this.driverRestartManager.onRestart(startTime, orderedRestartHandlers);
   }
 
   private void onStart(final StartTime startTime) {
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
index 476cc75..4b234a7 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
@@ -21,18 +21,23 @@
 import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * The default RemoteConfiguration.ErrorHandler.
  */
 final class DefaultErrorHandler implements EventHandler<Throwable> {
 
+  private static final Logger LOG = Logger.getLogger(DefaultErrorHandler.class.getName());
+
   @Inject
-  DefaultErrorHandler() {
+  private DefaultErrorHandler() {
   }
 
   @Override
   public void onNext(final Throwable value) {
+    LOG.log(Level.SEVERE, "No error handler in RemoteManager", value);
     throw new RuntimeException("No error handler bound for RemoteManager.", value);
   }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
index 17b9b13..310de6d 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.reef.wake.remote;
 
-
 import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Stage;
@@ -29,9 +28,6 @@
  */
 @DefaultImplementation(DefaultRemoteManagerImplementation.class)
 public interface RemoteManager extends Stage {
-  /**
-   * Constructor that takes a Codec<T>
-   */
 
   /**
    * Returns an event handler that can be used to send messages of type T to the
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index d525c51..6012ba1 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -52,60 +52,61 @@
    * The timeout used for the execute running in close().
    */
   private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms
-  private final AtomicBoolean closed = new AtomicBoolean(false);
-  private final String name;
-  private final Transport transport;
-  private final RemoteSenderStage reSendStage;
-  private final EStage<TransportEvent> reRecvStage;
-  private final HandlerContainer handlerContainer;
-  private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
-  private RemoteIdentifier myIdentifier;
+
   /**
    * Indicates a hostname that isn't set or known.
    */
   public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME;
 
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+  private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
+
+  private final String name;
+  private final Transport transport;
+  private final RemoteSenderStage reSendStage;
+  private final EStage<TransportEvent> reRecvStage;
+  private final HandlerContainer handlerContainer;
+
+  private RemoteIdentifier myIdentifier;
+
   @Inject
   private <T> DefaultRemoteManagerImplementation(
-            @Parameter(RemoteConfiguration.ManagerName.class) final String name,
-            @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
-            @Parameter(RemoteConfiguration.Port.class) final int listeningPort,
-            @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec,
-            @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler,
-            @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee,
-            @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
-            @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout,
-            final LocalAddressProvider localAddressProvider,
-            final TransportFactory tpFactory,
-            final TcpPortProvider tcpPortProvider) {
+        @Parameter(RemoteConfiguration.ManagerName.class) final String name,
+        @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
+        @Parameter(RemoteConfiguration.Port.class) final int listeningPort,
+        @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec,
+        @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler,
+        @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee,
+        @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
+        @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout,
+        final LocalAddressProvider localAddressProvider,
+        final TransportFactory tpFactory,
+        final TcpPortProvider tcpPortProvider) {
 
     this.name = name;
     this.handlerContainer = new HandlerContainer<>(name, codec);
 
     this.reRecvStage = orderingGuarantee ?
-                new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
-                new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
+        new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
+        new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
 
-    this.transport = tpFactory.newInstance(
-                hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout,
-                tcpPortProvider);
+    this.transport = tpFactory.newInstance(hostAddress, listeningPort,
+        this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider);
 
     this.handlerContainer.setTransport(this.transport);
 
-    this.myIdentifier = new SocketRemoteIdentifier(
-                (InetSocketAddress) this.transport.getLocalAddress());
+    this.myIdentifier = new SocketRemoteIdentifier((InetSocketAddress)this.transport.getLocalAddress());
 
     this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);
 
     StageManager.instance().register(this);
-    LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}. " +
-                "Binding address provided by {5}",
-                new Object[]{this.name, this.myIdentifier, COUNTER.incrementAndGet(),
-                        this.transport.getLocalAddress().toString(),
-                        this.transport.getListeningPort(), localAddressProvider}
-    );
-  }
 
+    final int counter = COUNTER.incrementAndGet();
+
+    LOG.log(Level.FINEST,
+        "RemoteManager {0} instantiated id {1} counter {2} listening on {3} Binding address provided by {4}",
+        new Object[] {this.name, this.myIdentifier, counter, this.transport.getLocalAddress(), localAddressProvider});
+  }
 
   /**
    * Returns a proxy event handler for a remote identifier and a message type.
@@ -116,7 +117,7 @@
 
     if (LOG.isLoggable(Level.FINE)) {
       LOG.log(Level.FINE, "RemoteManager: {0} destinationIdentifier: {1} messageType: {2}",
-          new Object[]{this.name, destinationIdentifier, messageType.getName()});
+          new Object[] {this.name, destinationIdentifier, messageType.getName()});
     }
 
     return new ProxyEventHandler<>(this.myIdentifier, destinationIdentifier,
@@ -131,11 +132,12 @@
   public <T, U extends T> AutoCloseable registerHandler(
       final RemoteIdentifier sourceIdentifier,
       final Class<U> messageType, final EventHandler<T> theHandler) {
+
     if (LOG.isLoggable(Level.FINE)) {
       LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2} handler: {3}",
-          new Object[]{this.name, sourceIdentifier, messageType.getName(),
-              theHandler.getClass().getName()});
+          new Object[] {this.name, sourceIdentifier, messageType.getName(), theHandler.getClass().getName()});
     }
+
     return this.handlerContainer.registerHandler(sourceIdentifier, messageType, theHandler);
   }
 
@@ -145,10 +147,12 @@
   @Override
   public <T, U extends T> AutoCloseable registerHandler(
       final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) {
+
     if (LOG.isLoggable(Level.FINE)) {
       LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}",
-          new Object[]{this.name, messageType.getName(), theHandler.getClass().getName()});
+          new Object[] {this.name, messageType.getName(), theHandler.getClass().getName()});
     }
+
     return this.handlerContainer.registerHandler(messageType, theHandler);
   }
 
@@ -162,10 +166,11 @@
 
   @Override
   public void close() {
-    if (closed.compareAndSet(false, true)) {
+
+    if (this.closed.compareAndSet(false, true)) {
 
       LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}",
-          new Object[]{this.name, this.myIdentifier});
+          new Object[] {this.name, this.myIdentifier});
 
       final Runnable closeRunnable = new Runnable() {
         @Override
@@ -194,12 +199,13 @@
             LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e);
           }
         }
-
       };
 
       final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
+
       closeExecutor.submit(closeRunnable);
       closeExecutor.shutdown();
+
       if (!closeExecutor.isShutdown()) {
         LOG.log(Level.SEVERE, "close executor did not shutdown properly.");
       }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
index d972d03..26a7747 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
@@ -52,8 +52,12 @@
   private Transport transport;
 
   HandlerContainer(final String name, final Codec<T> codec) {
+
     this.name = name;
     this.codec = codec;
+
+    LOG.log(Level.FINER, "Instantiated HandlerContainer {0} with codec {1}",
+        new String[] {this.name, this.codec.getClass().getCanonicalName()});
   }
 
   void setTransport(final Transport transport) {
@@ -141,7 +145,7 @@
       unsubscribeClass = new SubscriptionHandler.Unsubscriber<Class<? extends T>>() {
         @Override
         public void unsubscribe(final Class<? extends T> token) {
-          LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[] {name, token.getName()});
+          LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[] {name, token.getCanonicalName()});
           msgTypeToHandlerMap.remove(token);
         }
       };
@@ -152,7 +156,7 @@
         @Override
         public void unsubscribe(final Tuple2<RemoteIdentifier, Class<? extends T>> token) {
           LOG.log(Level.FINER, "Unsubscribe: {0} tuple {1},{2}",
-              new Object[] {name, token.getT1(), token.getT2().getName()});
+              new Object[] {name, token.getT1(), token.getT2().getCanonicalName()});
           tupleToHandlerMap.remove(token);
         }
       };
@@ -168,41 +172,49 @@
       };
 
   /**
-   * Dispatches a message.
-   *
-   * @param value
+   * Dispatch message received from the remote to proper event handler.
+   * @param value Remote message, encoded as byte[].
    */
-  @SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
   @Override
+  @SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
   public synchronized void onNext(final RemoteEvent<byte[]> value) {
 
-    LOG.log(Level.FINER, "RemoteManager: {0} value: {1}", new Object[]{this.name, value});
+    LOG.log(Level.FINER, "RemoteManager: {0} value: {1}", new Object[] {this.name, value});
 
     final T decodedEvent = this.codec.decode(value.getEvent());
     final Class<?> clazz = decodedEvent.getClass();
 
+    LOG.log(Level.FINEST, "RemoteManager: {0} decoded event {1} :: {2}",
+        new Object[] {this.name, clazz.getCanonicalName(), decodedEvent});
+
     // check remote identifier and message type
-    final SocketRemoteIdentifier id =
-        new SocketRemoteIdentifier((InetSocketAddress) value.remoteAddress());
+    final SocketRemoteIdentifier id = new SocketRemoteIdentifier((InetSocketAddress)value.remoteAddress());
 
     final Tuple2<RemoteIdentifier, Class<?>> tuple = new Tuple2<RemoteIdentifier, Class<?>>(id, clazz);
 
     final EventHandler<T> tupleHandler = (EventHandler<T>) this.tupleToHandlerMap.get(tuple);
+
     if (tupleHandler != null) {
-      LOG.log(Level.FINER, "Tuple handler: {0}", tuple);
+
+      LOG.log(Level.FINER, "Tuple handler: {0},{1}",
+          new Object[] {tuple.getT1(), tuple.getT2().getCanonicalName()});
+
       tupleHandler.onNext(decodedEvent);
+
     } else {
-      final EventHandler<RemoteMessage<? extends T>> messageHandler =
-          this.msgTypeToHandlerMap.get(clazz);
-      if (messageHandler != null) {
-        LOG.log(Level.FINER, "Message handler: {0}", clazz);
-        messageHandler.onNext(new DefaultRemoteMessage(id, decodedEvent));
-      } else {
+
+      final EventHandler<RemoteMessage<? extends T>> messageHandler = this.msgTypeToHandlerMap.get(clazz);
+
+      if (messageHandler == null) {
         final RuntimeException ex = new RemoteRuntimeException(
-            "Unknown message type in dispatch: " + clazz.getName() + " from " + id);
+            "Unknown message type in dispatch: " + clazz.getCanonicalName() + " from " + id);
         LOG.log(Level.WARNING, "Unknown message type in dispatch.", ex);
         throw ex;
       }
+
+      LOG.log(Level.FINER, "Message handler: {0}", clazz.getCanonicalName());
+
+      messageHandler.onNext(new DefaultRemoteMessage(id, decodedEvent));
     }
   }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
index 55adbc6..3ece567 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
@@ -29,7 +29,9 @@
 
   private final T event;
   private final long seq;
+
   //private static final AtomicLong curSeq = new AtomicLong(0);
+
   private SocketAddress localAddr;
   private SocketAddress remoteAddr;
 
@@ -108,17 +110,8 @@
    * @return a string representation of this object
    */
   public String toString() {
-    final StringBuilder builder = new StringBuilder();
-    builder.append("RemoteEvent");
-    builder.append(" localAddr=");
-    builder.append(localAddr);
-    builder.append(" remoteAddr=");
-    builder.append(remoteAddr);
-    builder.append(" seq=");
-    builder.append(seq);
-    builder.append(" event=");
-    builder.append(event);
-    return builder.toString();
+    return String.format(
+        "RemoteEvent localAddr=%s remoteAddr=%s seq=%d event=%s:%s",
+        this.localAddr, this.remoteAddr, this.seq, this.event.getClass().getCanonicalName(), this.event);
   }
-
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
index 5f6b15e..8565944 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
@@ -22,11 +22,11 @@
 
 import java.net.SocketAddress;
 
-
 /**
  * Event sent from a remote node.
  */
 public class TransportEvent {
+
   private final byte[] data;
   private final SocketAddress localAddr;
   private final SocketAddress remoteAddr;
@@ -65,6 +65,13 @@
     }
   }
 
+  @Override
+  public String toString() {
+    return String.format(
+        "TransportEvent: {local: %s remote: %s size: %d bytes}",
+        this.localAddr, this.remoteAddr, this.data.length);
+  }
+
   /**
    * Gets the data.
    *
@@ -102,5 +109,4 @@
   public SocketAddress getRemoteAddress() {
     return remoteAddr;
   }
-
 }