[REEF-1935] Improve logging in Wake NettyMessagingTransport and related classes
Summary of changes:
* Implement `.toString()` for `LinkReference`
* Implement `.toString()` for `RemoteEventEncoder`
* Implement `.toString()` for `NettyMessagingTransport`
* Improve logging and remove redundant code in `NettyMessagingTransport` constructor
JIRA:
[REEF-1935](https://issues.apache.org/jira/browse/REEF-1935)
Pull Request:
This closes #1401
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
index fdf10b2..f65646f 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java
@@ -59,4 +59,8 @@
}
}
+ @Override
+ public String toString() {
+ return String.format("RemoteEventDecoder: { decoder: %s }", this.decoder);
+ }
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
index 29e3be6..4a3fa0b 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java
@@ -61,4 +61,8 @@
return builder.build().toByteArray();
}
+ @Override
+ public String toString() {
+ return String.format("RemoteEventEncoder: { encoder: %s }", this.encoder);
+ }
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java
index 6e780b8..21cd4d5 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java
@@ -49,4 +49,9 @@
AtomicInteger getConnectInProgress() {
return this.connectInProgress;
}
+
+ @Override
+ public String toString() {
+ return String.format("LinkReference: { link: %s }", this.link); // NettyLink has a good .toString() implementation
+ }
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index 2643030..ea221ec 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -54,7 +54,6 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -86,14 +85,12 @@
private final EventLoopGroup serverWorkerGroup;
private final Bootstrap clientBootstrap;
- private final ServerBootstrap serverBootstrap;
private final Channel acceptor;
private final ChannelGroup clientChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private final ChannelGroup serverChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- private final int serverPort;
- private final SocketAddress localAddress;
+ private final InetSocketAddress localAddress;
private final NettyClientEventListener clientEventListener;
private final NettyServerEventListener serverEventListener;
@@ -105,7 +102,7 @@
* Constructs a messaging transport.
*
* @param hostAddress the server host address
- * @param port the server listening port; when it is 0, randomly assign a port number
+ * @param listenPort the server listening port; when it is 0, randomly assign a port number
* @param clientStage the client-side stage that handles transport events
* @param serverStage the server-side stage that handles transport events
* @param numberOfTries the number of tries of connection
@@ -115,7 +112,7 @@
@Inject
private NettyMessagingTransport(
@Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
- @Parameter(RemoteConfiguration.Port.class) final int port,
+ @Parameter(RemoteConfiguration.Port.class) final int listenPort,
@Parameter(RemoteConfiguration.RemoteClientStage.class) final EStage<TransportEvent> clientStage,
@Parameter(RemoteConfiguration.RemoteServerStage.class) final EStage<TransportEvent> serverStage,
@Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
@@ -123,9 +120,8 @@
final TcpPortProvider tcpPortProvider,
final LocalAddressProvider localAddressProvider) {
- int p = port;
- if (p < 0) {
- throw new RemoteRuntimeException("Invalid server port: " + p);
+ if (listenPort < 0) {
+ throw new RemoteRuntimeException("Invalid server port: " + listenPort);
}
final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress;
@@ -142,16 +138,16 @@
this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS,
new DefaultThreadFactory(CLASS_NAME + ":ClientWorker"));
- this.clientBootstrap = new Bootstrap();
- this.clientBootstrap.group(this.clientWorkerGroup)
+ this.clientBootstrap = new Bootstrap()
+ .group(this.clientWorkerGroup)
.channel(NioSocketChannel.class)
.handler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("client",
this.clientChannelGroup, this.clientEventListener)))
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, true);
- this.serverBootstrap = new ServerBootstrap();
- this.serverBootstrap.group(this.serverBossGroup, this.serverWorkerGroup)
+ final ServerBootstrap serverBootstrap = new ServerBootstrap()
+ .group(this.serverBossGroup, this.serverWorkerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("server",
this.serverChannelGroup, this.serverEventListener)))
@@ -159,56 +155,43 @@
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
- LOG.log(Level.FINE, "Binding to {0}", p);
+ LOG.log(Level.FINE, "Binding to {0}:{1}", new Object[] {host, listenPort});
- Channel acceptorFound = null;
try {
- if (p > 0) {
- acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel();
+ if (listenPort > 0) {
+ this.localAddress = new InetSocketAddress(host, listenPort);
+ this.acceptor = serverBootstrap.bind(this.localAddress).sync().channel();
} else {
- final Iterator<Integer> ports = tcpPortProvider.iterator();
- while (acceptorFound == null) {
- if (!ports.hasNext()) {
- throw new IllegalStateException("tcpPortProvider cannot find a free port.");
- }
- p = ports.next();
- LOG.log(Level.FINEST, "Try port {0}", p);
+ InetSocketAddress socketAddr = null;
+ Channel acceptorFound = null;
+ for (int port : tcpPortProvider) {
+ LOG.log(Level.FINEST, "Try port {0}", port);
try {
- acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel();
+ socketAddr = new InetSocketAddress(host, port);
+ acceptorFound = serverBootstrap.bind(socketAddr).sync().channel();
+ break;
} catch (final Exception ex) {
- if (ex instanceof BindException) {
- LOG.log(Level.FINEST, "The port {0} is already bound. Try again", p);
+ if (ex instanceof BindException) { // Not visible to catch :(
+ LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port);
} else {
throw ex;
}
}
}
+ if (acceptorFound == null) {
+ throw new IllegalStateException("TcpPortProvider could not find a free port.");
+ }
+ this.localAddress = socketAddr;
+ this.acceptor = acceptorFound;
}
- } catch (final IllegalStateException ex) {
- final RuntimeException transportException =
- new TransportRuntimeException("tcpPortProvider failed to return free ports.", ex);
- LOG.log(Level.SEVERE, "Cannot find a free port with " + tcpPortProvider, transportException);
-
+ } catch (final IllegalStateException | InterruptedException ex) {
+ LOG.log(Level.SEVERE, "Cannot bind to port " + listenPort, ex);
this.clientWorkerGroup.shutdownGracefully();
this.serverBossGroup.shutdownGracefully();
this.serverWorkerGroup.shutdownGracefully();
- throw transportException;
-
- } catch (final Exception ex) {
- final RuntimeException transportException =
- new TransportRuntimeException("Cannot bind to port " + p, ex);
- LOG.log(Level.SEVERE, "Cannot bind to port " + p, ex);
-
- this.clientWorkerGroup.shutdownGracefully();
- this.serverBossGroup.shutdownGracefully();
- this.serverWorkerGroup.shutdownGracefully();
- throw transportException;
+ throw new TransportRuntimeException("Cannot bind to port " + listenPort, ex);
}
- this.acceptor = acceptorFound;
- this.serverPort = p;
- this.localAddress = new InetSocketAddress(host, this.serverPort);
-
LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress);
}
@@ -372,7 +355,7 @@
*/
@Override
public int getListeningPort() {
- return this.serverPort;
+ return this.localAddress.getPort();
}
/**
@@ -385,4 +368,9 @@
this.clientEventListener.registerErrorHandler(handler);
this.serverEventListener.registerErrorHandler(handler);
}
+
+ @Override
+ public String toString() {
+ return String.format("NettyMessagingTransport: { address: %s }", this.localAddress);
+ }
}