Changed netty transport for async communication from NIO to EPOLL
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
index ba0266a..878c85f 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
@@ -20,24 +20,29 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.*;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BSPNetUtils;
+import javax.net.SocketFactory;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ConnectException;
@@ -51,20 +56,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.util.BSPNetUtils;
-
/**
* A client for an IPC service using netty. IPC calls take a single
* {@link Writable} as a parameter, and return a {@link Writable} as their
@@ -186,7 +177,7 @@
* @throws IOException
*/
public Connection(ConnectionId remoteId) throws IOException {
- group = new NioEventLoopGroup();
+ group = new EpollEventLoopGroup();
bootstrap = new Bootstrap();
this.remoteId = remoteId;
this.serverAddress = remoteId.getAddress();
@@ -280,12 +271,12 @@
}
// Configure the client.
- // NioEventLoopGroup is a multithreaded event loop that handles I/O
+ // EpollEventLoopGroup is a multithreaded event loop that handles I/O
// operation
- group = new NioEventLoopGroup();
+ group = new EpollEventLoopGroup();
// Bootstrap is a helper class that sets up a client
bootstrap = new Bootstrap();
- bootstrap.group(group).channel(NioSocketChannel.class)
+ bootstrap.group(group).channel(EpollSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, this.tcpNoDelay)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, pingInterval)
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
index 93627aa..67ad5d0 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
@@ -20,12 +20,12 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.*;
+import io.netty.util.concurrent.GenericFutureListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -48,7 +48,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
-import java.util.concurrent.Future;
/**
* An abstract IPC service using netty. IPC calls take a single {@link Writable}
@@ -82,8 +81,8 @@
private int port; // port we listen on
private Class<? extends Writable> paramClass; // class of call parameters
// Configure the server.(constructor is thread num)
- private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- private EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
+ private EventLoopGroup workerGroup = new EpollEventLoopGroup();
private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
@@ -192,7 +191,7 @@
// ServerBootstrap is a helper class that sets up a server
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
+ .channel(EpollServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, backlogLength)
.childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)