HAMA-966: NioServerListener doesn't throw any exceptions (JongYoon Lim via edwardyoon)
git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1692461 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index ead276d..cdfa028 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
BUG FIXES
HAMA-965: Infinite loop because of recursive function call (JongYoon Lim via edwardyoon)
+ HAMA-966: NioServerListener doesn't throw any exceptions (JongYoon Lim via edwardyoon)
IMPROVEMENTS
diff --git a/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java b/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
index 913d20b..8f66a0f 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
@@ -25,6 +25,7 @@
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -100,12 +101,20 @@
server.start();
LOG.info("BSPPeer address:" + server.getAddress().getHostName()
+ " port:" + server.getAddress().getPort());
- } catch (BindException e) {
- LOG.warn("Address already in use. Retrying " + hostName + ":" + port + 1);
- if (retry++ >= MAX_RETRY) {
- throw new RuntimeException("RPC Server could not be launched!");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ if (e.getCause() instanceof BindException) {
+ final int nextPort = port + 1;
+ LOG.warn("Address already in use. Retrying " + hostName + ":"
+ + nextPort);
+ if (retry++ >= MAX_RETRY) {
+ throw new RuntimeException("RPC Server could not be launched!");
+ }
+ startServer(hostName, nextPort);
}
- startServer(hostName, port + 1);
}
}
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
index 9f4b4e4..93627aa 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
@@ -19,34 +19,13 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.ReferenceCountUtil;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
+import io.netty.util.concurrent.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -60,6 +39,17 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.Future;
+
/**
* An abstract IPC service using netty. IPC calls take a single {@link Writable}
* as a parameter, and return a {@link Writable}*
@@ -171,54 +161,62 @@
}
/** start server listener */
- public void start() {
- new NioServerListener().start();
+ public void start() throws ExecutionException, InterruptedException {
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ Future<ChannelFuture> future = es.submit(new NioServerListener());
+ try {
+ ChannelFuture closeFuture = future.get();
+ closeFuture
+ .addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
+ @Override
+ public void operationComplete(
+ io.netty.util.concurrent.Future<Void> voidFuture)
+ throws Exception {
+ // Stop the server gracefully if it's not terminated.
+ stop();
+ }
+ });
+ } finally {
+ es.shutdown();
+ }
}
- private class NioServerListener extends Thread {
+ private class NioServerListener implements Callable<ChannelFuture> {
/**
* Configure and start nio server
*/
@Override
- public void run() {
+ public ChannelFuture call() throws Exception {
SERVER.set(AsyncServer.this);
- try {
- // ServerBootstrap is a helper class that sets up a server
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, backlogLength)
- .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
- .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024)
- .childOption(ChannelOption.RCVBUF_ALLOCATOR,
- new FixedRecvByteBufAllocator(100 * 1024))
+ // ServerBootstrap is a helper class that sets up a server
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, backlogLength)
+ .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
+ .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024)
+ .childOption(ChannelOption.RCVBUF_ALLOCATOR,
+ new FixedRecvByteBufAllocator(100 * 1024))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
- // Register accumulation processing handler
- p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0));
- // Register message processing handler
- p.addLast(new NioServerInboundHandler());
- }
- });
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ // Register accumulation processing handler
+ p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0));
+ // Register message processing handler
+ p.addLast(new NioServerInboundHandler());
+ }
+ });
- // Bind and start to accept incoming connections.
- ChannelFuture f = b.bind(port).sync();
- LOG.info("AsyncServer startup");
- // Wait until the server socket is closed.
- f.channel().closeFuture().sync();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- // Shut down Server gracefully
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
+ // Bind and start to accept incoming connections.
+ ChannelFuture f = b.bind(port).sync();
+ LOG.info("AsyncServer startup");
+
+ return f.channel().closeFuture();
}
}