Changed netty transport for async communication from NIO to EPOLL
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
index ba0266a..878c85f 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
@@ -20,24 +20,29 @@
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.*;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BSPNetUtils;
 
+import javax.net.SocketFactory;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.ConnectException;
@@ -51,20 +56,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.util.BSPNetUtils;
-
 /**
  * A client for an IPC service using netty. IPC calls take a single
  * {@link Writable} as a parameter, and return a {@link Writable} as their
@@ -186,7 +177,7 @@
      * @throws IOException
      */
     public Connection(ConnectionId remoteId) throws IOException {
-      group = new NioEventLoopGroup();
+      group = new EpollEventLoopGroup();
       bootstrap = new Bootstrap();
       this.remoteId = remoteId;
       this.serverAddress = remoteId.getAddress();
@@ -280,12 +271,12 @@
           }
 
           // Configure the client.
-          // NioEventLoopGroup is a multithreaded event loop that handles I/O
+          // EpollEventLoopGroup is a multithreaded event loop that handles I/O
           // operation
-          group = new NioEventLoopGroup();
+          group = new EpollEventLoopGroup();
           // Bootstrap is a helper class that sets up a client
           bootstrap = new Bootstrap();
-          bootstrap.group(group).channel(NioSocketChannel.class)
+          bootstrap.group(group).channel(EpollSocketChannel.class)
               .option(ChannelOption.TCP_NODELAY, this.tcpNoDelay)
               .option(ChannelOption.SO_KEEPALIVE, true)
               .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, pingInterval)
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
index 93627aa..67ad5d0 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
@@ -20,12 +20,12 @@
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.*;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,7 +48,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.*;
-import java.util.concurrent.Future;
 
 /**
  * An abstract IPC service using netty. IPC calls take a single {@link Writable}
@@ -82,8 +81,8 @@
   private int port; // port we listen on
   private Class<? extends Writable> paramClass; // class of call parameters
   // Configure the server.(constructor is thread num)
-  private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
-  private EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
+  private EventLoopGroup workerGroup = new EpollEventLoopGroup();
   private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();
   private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
 
@@ -192,7 +191,7 @@
       // ServerBootstrap is a helper class that sets up a server
       ServerBootstrap b = new ServerBootstrap();
       b.group(bossGroup, workerGroup)
-          .channel(NioServerSocketChannel.class)
+          .channel(EpollServerSocketChannel.class)
           .option(ChannelOption.SO_BACKLOG, backlogLength)
           .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
           .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)