[REEF-1935] Improve logging in Wake NettyMessagingTransport and related classes

Summary of changes:
   * Implement `.toString()` for `LinkReference`
   * Implement `.toString()` for `RemoteEventEncoder`
   * Implement `.toString()` for `NettyMessagingTransport`
   * Improve logging and remove redundant code in `NettyMessagingTransport` constructor

JIRA:
  [REEF-1935](https://issues.apache.org/jira/browse/REEF-1935)

Pull Request:
  This closes #1401
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
index fdf10b2..f65646f 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
@@ -59,4 +59,8 @@
     }
   }
 
+  @Override
+  public String toString() {
+    return String.format("RemoteEventDecoder: { decoder: %s }", this.decoder);
+  }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
index 29e3be6..4a3fa0b 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
@@ -61,4 +61,8 @@
     return builder.build().toByteArray();
   }
 
+  @Override
+  public String toString() {
+    return String.format("RemoteEventEncoder: { encoder: %s }", this.encoder);
+  }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java
index 6e780b8..21cd4d5 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java
@@ -49,4 +49,9 @@
   AtomicInteger getConnectInProgress() {
     return this.connectInProgress;
   }
+
+  @Override
+  public String toString() {
+    return String.format("LinkReference: { link: %s }", this.link); // NettyLink has a good .toString() implementation
+  }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index 2643030..ea221ec 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -54,7 +54,6 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -86,14 +85,12 @@
   private final EventLoopGroup serverWorkerGroup;
 
   private final Bootstrap clientBootstrap;
-  private final ServerBootstrap serverBootstrap;
   private final Channel acceptor;
 
   private final ChannelGroup clientChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
   private final ChannelGroup serverChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
-  private final int serverPort;
-  private final SocketAddress localAddress;
+  private final InetSocketAddress localAddress;
 
   private final NettyClientEventListener clientEventListener;
   private final NettyServerEventListener serverEventListener;
@@ -105,7 +102,7 @@
    * Constructs a messaging transport.
    *
    * @param hostAddress   the server host address
-   * @param port          the server listening port; when it is 0, randomly assign a port number
+   * @param listenPort    the server listening port; when it is 0, randomly assign a port number
    * @param clientStage   the client-side stage that handles transport events
    * @param serverStage   the server-side stage that handles transport events
    * @param numberOfTries the number of tries of connection
@@ -115,7 +112,7 @@
   @Inject
   private NettyMessagingTransport(
       @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
-      @Parameter(RemoteConfiguration.Port.class) final int port,
+      @Parameter(RemoteConfiguration.Port.class) final int listenPort,
       @Parameter(RemoteConfiguration.RemoteClientStage.class) final EStage<TransportEvent> clientStage,
       @Parameter(RemoteConfiguration.RemoteServerStage.class) final EStage<TransportEvent> serverStage,
       @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
@@ -123,9 +120,8 @@
       final TcpPortProvider tcpPortProvider,
       final LocalAddressProvider localAddressProvider) {
 
-    int p = port;
-    if (p < 0) {
-      throw new RemoteRuntimeException("Invalid server port: " + p);
+    if (listenPort < 0) {
+      throw new RemoteRuntimeException("Invalid server port: " + listenPort);
     }
 
     final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress;
@@ -142,16 +138,16 @@
     this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS,
         new DefaultThreadFactory(CLASS_NAME + ":ClientWorker"));
 
-    this.clientBootstrap = new Bootstrap();
-    this.clientBootstrap.group(this.clientWorkerGroup)
+    this.clientBootstrap = new Bootstrap()
+        .group(this.clientWorkerGroup)
         .channel(NioSocketChannel.class)
         .handler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("client",
             this.clientChannelGroup, this.clientEventListener)))
         .option(ChannelOption.SO_REUSEADDR, true)
         .option(ChannelOption.SO_KEEPALIVE, true);
 
-    this.serverBootstrap = new ServerBootstrap();
-    this.serverBootstrap.group(this.serverBossGroup, this.serverWorkerGroup)
+    final ServerBootstrap serverBootstrap = new ServerBootstrap()
+        .group(this.serverBossGroup, this.serverWorkerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("server",
             this.serverChannelGroup, this.serverEventListener)))
@@ -159,56 +155,43 @@
         .option(ChannelOption.SO_REUSEADDR, true)
         .childOption(ChannelOption.SO_KEEPALIVE, true);
 
-    LOG.log(Level.FINE, "Binding to {0}", p);
+    LOG.log(Level.FINE, "Binding to {0}:{1}", new Object[] {host, listenPort});
 
-    Channel acceptorFound = null;
     try {
-      if (p > 0) {
-        acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel();
+      if (listenPort > 0) {
+        this.localAddress = new InetSocketAddress(host, listenPort);
+        this.acceptor = serverBootstrap.bind(this.localAddress).sync().channel();
       } else {
-        final Iterator<Integer> ports = tcpPortProvider.iterator();
-        while (acceptorFound == null) {
-          if (!ports.hasNext()) {
-            throw new IllegalStateException("tcpPortProvider cannot find a free port.");
-          }
-          p = ports.next();
-          LOG.log(Level.FINEST, "Try port {0}", p);
+        InetSocketAddress socketAddr = null;
+        Channel acceptorFound = null;
+        for (int port : tcpPortProvider) {
+          LOG.log(Level.FINEST, "Try port {0}", port);
           try {
-            acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel();
+            socketAddr = new InetSocketAddress(host, port);
+            acceptorFound = serverBootstrap.bind(socketAddr).sync().channel();
+            break;
           } catch (final Exception ex) {
-            if (ex instanceof BindException) {
-              LOG.log(Level.FINEST, "The port {0} is already bound. Try again", p);
+            if (ex instanceof BindException) { // Not visible to catch :(
+              LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port);
             } else {
               throw ex;
             }
           }
         }
+        if (acceptorFound == null) {
+          throw new IllegalStateException("TcpPortProvider could not find a free port.");
+        }
+        this.localAddress = socketAddr;
+        this.acceptor = acceptorFound;
       }
-    } catch (final IllegalStateException ex) {
-      final RuntimeException transportException =
-                new TransportRuntimeException("tcpPortProvider failed to return free ports.", ex);
-      LOG.log(Level.SEVERE, "Cannot find a free port with " + tcpPortProvider, transportException);
-
+    } catch (final IllegalStateException | InterruptedException ex) {
+      LOG.log(Level.SEVERE, "Cannot bind to port " + listenPort, ex);
       this.clientWorkerGroup.shutdownGracefully();
       this.serverBossGroup.shutdownGracefully();
       this.serverWorkerGroup.shutdownGracefully();
-      throw transportException;
-
-    } catch (final Exception ex) {
-      final RuntimeException transportException =
-          new TransportRuntimeException("Cannot bind to port " + p, ex);
-      LOG.log(Level.SEVERE, "Cannot bind to port " + p, ex);
-
-      this.clientWorkerGroup.shutdownGracefully();
-      this.serverBossGroup.shutdownGracefully();
-      this.serverWorkerGroup.shutdownGracefully();
-      throw transportException;
+      throw new TransportRuntimeException("Cannot bind to port " + listenPort, ex);
     }
 
-    this.acceptor = acceptorFound;
-    this.serverPort = p;
-    this.localAddress = new InetSocketAddress(host, this.serverPort);
-
     LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress);
   }
 
@@ -372,7 +355,7 @@
    */
   @Override
   public int getListeningPort() {
-    return this.serverPort;
+    return this.localAddress.getPort();
   }
 
   /**
@@ -385,4 +368,9 @@
     this.clientEventListener.registerErrorHandler(handler);
     this.serverEventListener.registerErrorHandler(handler);
   }
+
+  @Override
+  public String toString() {
+    return String.format("NettyMessagingTransport: { address: %s }", this.localAddress);
+  }
 }