[REEF-1729] Fix test job timeouts in Travis CI
Gracefully shutdown all worker groups and wait for them to complete in the `.close()` method
JIRA: [REEF-1729](https://issues.apache.org/jira/browse/REEF-1729)
Closes #1268
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index c3a910b..2643030 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -25,10 +25,12 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
@@ -51,6 +53,7 @@
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -217,18 +220,27 @@
LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress);
- this.clientChannelGroup.close().awaitUninterruptibly();
- this.serverChannelGroup.close().awaitUninterruptibly();
+ final ChannelGroupFuture clientChannelGroupFuture = this.clientChannelGroup.close();
+ final ChannelGroupFuture serverChannelGroupFuture = this.serverChannelGroup.close();
+ final ChannelFuture acceptorFuture = this.acceptor.close();
+
+ final ArrayList<Future> eventLoopGroupFutures = new ArrayList<>(3);
+ eventLoopGroupFutures.add(this.clientWorkerGroup.shutdownGracefully());
+ eventLoopGroupFutures.add(this.serverBossGroup.shutdownGracefully());
+ eventLoopGroupFutures.add(this.serverWorkerGroup.shutdownGracefully());
+
+ clientChannelGroupFuture.awaitUninterruptibly();
+ serverChannelGroupFuture.awaitUninterruptibly();
try {
- this.acceptor.close().sync();
+ acceptorFuture.sync();
} catch (final Exception ex) {
LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex);
}
- this.clientWorkerGroup.shutdownGracefully().awaitUninterruptibly();
- this.serverBossGroup.shutdownGracefully().awaitUninterruptibly();
- this.serverWorkerGroup.shutdownGracefully().awaitUninterruptibly();
+ for (final Future eventLoopGroupFuture : eventLoopGroupFutures) {
+ eventLoopGroupFuture.awaitUninterruptibly();
+ }
LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress);
}