TEZ-4157: ShuffleHandler: upgrade to Netty4 and remove Netty3 dependency from tez (#118) (Laszlo Bodor reviewed by Ashutosh Chauhan, Jonathan Turner Eagles)

diff --git a/pom.xml b/pom.xml
index 19fd516..c1f0682 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,7 +59,7 @@
     <clover.license>${user.home}/clover.license</clover.license>
     <guava.version>27.0-jre</guava.version>
     <hadoop.version>3.1.3</hadoop.version>
-    <netty.version>3.10.5.Final</netty.version>
+    <netty.version>4.0.52.Final</netty.version>
     <pig.version>0.13.0</pig.version>
     <jersey.version>1.19</jersey.version>
     <slf4j.version>1.7.30</slf4j.version>
@@ -261,7 +261,7 @@
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
-        <artifactId>netty</artifactId>
+        <artifactId>netty-all</artifactId>
         <scope>compile</scope>
         <version>${netty.version}</version>
       </dependency>
@@ -340,12 +340,22 @@
             <groupId>commons-el</groupId>
             <artifactId>commons-el</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-auth</artifactId>
         <version>${hadoop.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
@@ -562,6 +572,10 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-server-common</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>
@@ -578,6 +592,10 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>
@@ -603,6 +621,10 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-common</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>
@@ -611,6 +633,12 @@
         <scope>test</scope>
         <type>test-jar</type>
         <version>${hadoop.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
@@ -649,6 +677,10 @@
             <groupId>tomcat</groupId>
             <artifactId>jasper-runtime</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
index c806bf7..8dfad0d 100644
--- a/tez-ext-service-tests/pom.xml
+++ b/tez-ext-service-tests/pom.xml
@@ -32,7 +32,7 @@
   <dependencies>
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>netty-all</artifactId>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
index 1d122be..43f24ba 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
@@ -14,17 +14,17 @@
 
 package org.apache.tez.shufflehandler;
 
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
 import javax.crypto.SecretKey;
 import java.io.File;
@@ -41,15 +41,15 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import com.google.common.base.Charsets;
+
 import org.apache.tez.common.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -64,36 +64,38 @@
 import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DefaultFileRegion;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,9 +112,13 @@
       Pattern.CASE_INSENSITIVE);
 
   private int port;
-  private final ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
-  protected HttpPipelineFactory pipelineFact;
+
+  // pipeline items
+  private Shuffle SHUFFLE;
+
+  private NioEventLoopGroup bossGroup;
+  private NioEventLoopGroup workerGroup;
+  private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
   private final Configuration conf;
 
   private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
@@ -171,17 +177,23 @@
       maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
     }
 
-    ThreadFactory bossFactory = new ThreadFactoryBuilder()
-        .setNameFormat("ShuffleHandler Netty Boss #%d")
-        .build();
-    ThreadFactory workerFactory = new ThreadFactoryBuilder()
-        .setNameFormat("ShuffleHandler Netty Worker #%d")
-        .build();
+    final String BOSS_THREAD_NAME_PREFIX = "ShuffleHandler Netty Boss #";
+    AtomicInteger bossThreadCounter = new AtomicInteger(0);
+    bossGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, BOSS_THREAD_NAME_PREFIX + bossThreadCounter.incrementAndGet());
+      }
+    });
 
-    selector = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(bossFactory),
-        Executors.newCachedThreadPool(workerFactory),
-        maxShuffleThreads);
+    final String WORKER_THREAD_NAME_PREFIX = "ShuffleHandler Netty Worker #";
+    AtomicInteger workerThreadCounter = new AtomicInteger(0);
+    workerGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, WORKER_THREAD_NAME_PREFIX + workerThreadCounter.incrementAndGet());
+      }
+    });
 
     connectionKeepAliveEnabled =
         conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
@@ -199,22 +211,44 @@
 
 
   public void start() throws Exception {
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    try {
-      pipelineFact = new HttpPipelineFactory(conf);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    bootstrap.setPipelineFactory(pipelineFact);
+    ServerBootstrap bootstrap = new ServerBootstrap()
+        .channel(NioServerSocketChannel.class)
+        .group(bossGroup, workerGroup)
+        .localAddress(port);
+    initPipeline(bootstrap, conf);
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    Channel ch = bootstrap.bind().sync().channel();
     accepted.add(ch);
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    port = ((InetSocketAddress)ch.localAddress()).getPort();
     conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
-    pipelineFact.SHUFFLE.setPort(port);
+    SHUFFLE.setPort(port);
     LOG.info("TezShuffleHandler" + " listening on port " + port);
   }
 
+  private void initPipeline(ServerBootstrap bootstrap, Configuration conf) throws Exception {
+    SHUFFLE = getShuffle(conf);
+
+    if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+        MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+      throw new UnsupportedOperationException(
+          "SSL Shuffle is not currently supported for the test shuffle handler");
+    }
+
+    ChannelInitializer<NioSocketChannel> channelInitializer =
+        new ChannelInitializer<NioSocketChannel>() {
+          @Override
+      public void initChannel(NioSocketChannel ch) throws Exception {
+        ChannelPipeline pipeline = ch.pipeline();
+        pipeline.addLast("decoder", new HttpRequestDecoder());
+        pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
+        pipeline.addLast("encoder", new HttpResponseEncoder());
+        pipeline.addLast("chunking", new ChunkedWriteHandler());
+        pipeline.addLast("shuffle", SHUFFLE);
+      }
+    };
+    bootstrap.childHandler(channelInitializer);
+  }
+
   public static void initializeAndStart(Configuration conf) throws Exception {
     if (!initing.getAndSet(true)) {
       INSTANCE = new ShuffleHandler(conf);
@@ -245,15 +279,13 @@
     removeJobShuffleInfo(applicationIdString);
   }
 
-
   public void stop() throws Exception {
     accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-    if (selector != null) {
-      ServerBootstrap bootstrap = new ServerBootstrap(selector);
-      bootstrap.releaseExternalResources();
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
     }
-    if (pipelineFact != null) {
-      pipelineFact.destroy();
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
     }
   }
 
@@ -261,7 +293,6 @@
     return new Shuffle(conf);
   }
 
-
   private void addJobToken(String appIdString, String user,
       Token<JobTokenIdentifier> jobToken) {
     String jobIdString = appIdString.replace("application", "job");
@@ -280,40 +311,8 @@
     userRsrc.remove(appIdString);
   }
 
-  class HttpPipelineFactory implements ChannelPipelineFactory {
-
-    final Shuffle SHUFFLE;
-
-    public HttpPipelineFactory(Configuration conf) throws Exception {
-      SHUFFLE = getShuffle(conf);
-      // TODO Setup SSL Shuffle
-      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
-          MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
-        throw new UnsupportedOperationException(
-            "SSL Shuffle is not currently supported for the test shuffle handler");
-      }
-    }
-
-    public void destroy() {
-    }
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
-      pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
-      pipeline.addLast("encoder", new HttpResponseEncoder());
-      pipeline.addLast("chunking", new ChunkedWriteHandler());
-      pipeline.addLast("shuffle", SHUFFLE);
-      return pipeline;
-      // TODO factor security manager into pipeline
-      // TODO factor out encode/decode to permit binary shuffle
-      // TODO factor out decode of index to permit alt. models
-    }
-
-  }
-
-  class Shuffle extends SimpleChannelUpstreamHandler {
+  @Sharable
+  class Shuffle extends ChannelInboundHandlerAdapter {
 
     private final Configuration conf;
     private final IndexCache indexCache;
@@ -343,24 +342,24 @@
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
+
       if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
-        LOG.info(String.format("Current number of shuffle connections (%d) is " + 
-            "greater than or equal to the max allowed shuffle connections (%d)", 
+        LOG.info(String.format("Current number of shuffle connections (%d) is " +
+            "greater than or equal to the max allowed shuffle connections (%d)",
             accepted.size(), maxShuffleConnections));
-        evt.getChannel().close();
+        ctx.channel().close();
         return;
       }
-      accepted.add(evt.getChannel());
-      super.channelOpen(ctx, evt);
-     
+      accepted.add(ctx.channel());
+      super.channelActive(ctx);
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+    public void channelRead(ChannelHandlerContext ctx, Object message)
         throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
+      HttpRequest request = (HttpRequest) message;
       if (request.getMethod() != GET) {
           sendError(ctx, METHOD_NOT_ALLOWED);
           return;
@@ -372,8 +371,7 @@
               request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
         sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
       }
-      final Map<String,List<String>> q =
-        new QueryStringDecoder(request.getUri()).getParameters();
+      final Map<String, List<String>> q = new QueryStringDecoder(request.getUri()).parameters();
       final List<String> keepAliveList = q.get("keepAlive");
       boolean keepAliveParam = false;
       if (keepAliveList != null && keepAliveList.size() == 1) {
@@ -432,7 +430,7 @@
 
       Map<String, MapOutputInfo> mapOutputInfoMap =
           new HashMap<String, MapOutputInfo>();
-      Channel ch = evt.getChannel();
+      Channel ch = ctx.channel();
       String user = userRsrc.get(jobId);
 
       // $x/$user/appcache/$appId/output/$mapId
@@ -444,13 +442,13 @@
         populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
           response, keepAliveParam, mapOutputInfoMap);
       } catch(IOException e) {
-        ch.write(response);
+        ch.writeAndFlush(response);
         LOG.error("Shuffle error in populating headers :", e);
         String errorMessage = getErrorMessage(e);
         sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
         return;
       }
-      ch.write(response);
+      ch.writeAndFlush(response);
       // TODO refactor the following into the pipeline
       ChannelFuture lastMap = null;
       for (String mapId : mapIds) {
@@ -621,7 +619,7 @@
         new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
       final DataOutputBuffer dob = new DataOutputBuffer();
       header.write(dob);
-      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+      ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
       final File spillfile =
           new File(mapOutputInfo.mapOutputFileName.toString());
       RandomAccessFile spill;
@@ -634,15 +632,7 @@
       ChannelFuture writeFuture;
       final DefaultFileRegion partition =
           new DefaultFileRegion(spill.getChannel(), info.getStartOffset(), info.getPartLength());
-      writeFuture = ch.write(partition);
-      writeFuture.addListener(new ChannelFutureListener() {
-        // TODO error handling; distinguish IO/connection failures,
-        //      attribute to appropriate spill output
-        @Override
-        public void operationComplete(ChannelFuture future) {
-          partition.releaseExternalResources();
-        }
-      });
+      writeFuture = ch.writeAndFlush(partition);
       return writeFuture;
     }
 
@@ -653,25 +643,22 @@
 
     protected void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status);
       response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
       // Put shuffle version into http header
       response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
       response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      response.setContent(
-        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+      response.content().writeBytes(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
 
       // Close the connection as soon as the error message is sent.
-      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+      ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
-      Channel ch = e.getChannel();
-      Throwable cause = e.getCause();
       if (cause instanceof TooLongFrameException) {
         sendError(ctx, BAD_REQUEST);
         return;
@@ -688,8 +675,8 @@
       }
 
       LOG.error("Shuffle error: ", cause);
-      if (ch.isConnected()) {
-        LOG.error("Shuffle error " + e);
+      if (ctx.channel().isActive()) {
+        LOG.error("Shuffle error", cause);
         sendError(ctx, INTERNAL_SERVER_ERROR);
       }
     }
diff --git a/tez-plugins/tez-aux-services/pom.xml b/tez-plugins/tez-aux-services/pom.xml
index 97096c8..7279eaf 100644
--- a/tez-plugins/tez-aux-services/pom.xml
+++ b/tez-plugins/tez-aux-services/pom.xml
@@ -112,7 +112,7 @@
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>netty-all</artifactId>
     </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
@@ -237,7 +237,7 @@
                   <shadedPattern>org.apache.tez.shaded.$0</shadedPattern>
                 </relocation>
                 <relocation>
-                  <pattern>org.jboss.netty</pattern>
+                  <pattern>io.netty</pattern>
                   <shadedPattern>org.apache.tez.shaded.$0</shadedPattern>
                 </relocation>
                 <relocation>
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
index cc3f762..162feb9 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
@@ -31,7 +31,9 @@
 
 import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
 
-import org.jboss.netty.handler.stream.ChunkedFile;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedFile;
 
 public class FadvisedChunkedFile extends ChunkedFile {
 
@@ -57,13 +59,13 @@
   }
 
   @Override
-  public Object nextChunk() throws Exception {
+  public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
     if (manageOsCache && readaheadPool != null) {
       readaheadRequest = readaheadPool
-          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
-              getEndOffset(), readaheadRequest);
+          .readaheadStream(identifier, fd, currentOffset(), readaheadLength,
+              endOffset(), readaheadRequest);
     }
-    return super.nextChunk();
+    return super.readChunk(ctx);
   }
 
   @Override
@@ -71,11 +73,11 @@
     if (readaheadRequest != null) {
       readaheadRequest.cancel();
     }
-    if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
+    if (manageOsCache && endOffset() - startOffset() > 0) {
       try {
         NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
             fd,
-            getStartOffset(), getEndOffset() - getStartOffset(),
+            startOffset(), endOffset() - startOffset(),
             POSIX_FADV_DONTNEED);
       } catch (Throwable t) {
         LOG.warn("Failed to manage OS cache for " + identifier, t);
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
index 40789d8..2366363 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
@@ -34,7 +34,7 @@
 
 import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
 
-import org.jboss.netty.channel.DefaultFileRegion;
+import io.netty.channel.DefaultFileRegion;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -54,6 +54,7 @@
   private final FileChannel fileChannel;
 
   private ReadaheadRequest readaheadRequest;
+  private boolean transferred = false;
 
   public FadvisedFileRegion(RandomAccessFile file, long position, long count,
                             boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
@@ -77,15 +78,40 @@
       throws IOException {
     if (readaheadPool != null && readaheadLength > 0) {
       readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
-          getPosition() + position, readaheadLength,
-          getPosition() + getCount(), readaheadRequest);
+          position() + position, readaheadLength,
+          position() + count(), readaheadRequest);
     }
 
+    long written = 0;
     if(this.shuffleTransferToAllowed) {
-      return super.transferTo(target, position);
+      written = super.transferTo(target, position);
     } else {
-      return customShuffleTransfer(target, position);
+      written = customShuffleTransfer(target, position);
     }
+    /*
+     * At this point, we can assume that the transfer was successful.
+     */
+    transferred = true;
+    return written;
+  }
+
+  /**
+   * Since Netty4, deallocate() is called automatically during cleanup, but before the
+   * ChannelFutureListeners. Deallocate calls FileChannel.close() and makes the file descriptor
+   * invalid, so every OS cache operation (e.g. posix_fadvice) with the original file descriptor
+   * will fail after this operation, so we need to take care of cleanup operations here (before
+   * deallocating) instead of listeners outside.
+   */
+  @Override
+  protected void deallocate() {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+
+    if (transferred) {
+      transferSuccessful();
+    }
+    super.deallocate();
   }
 
   /**
@@ -142,24 +168,19 @@
     return actualCount - trans;
   }
 
-
-  @Override
-  public void releaseExternalResources() {
-    if (readaheadRequest != null) {
-      readaheadRequest.cancel();
-    }
-    super.releaseExternalResources();
-  }
-
   /**
    * Call when the transfer completes successfully so we can advise the OS that
    * we don't need the region to be cached anymore.
    */
   public void transferSuccessful() {
-    if (manageOsCache && getCount() > 0) {
+    if (manageOsCache && count() > 0) {
       try {
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
-            fd, getPosition(), getCount(), POSIX_FADV_DONTNEED);
+        if (fd.valid()) {
+          NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, fd, position(),
+              count(), POSIX_FADV_DONTNEED);
+        } else {
+          LOG.debug("File descriptor is not valid anymore, skipping posix_fadvise: " + identifier);
+        }
       } catch (Throwable t) {
         LOG.warn("Failed to manage OS cache for " + identifier, t);
       }
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 55389ea..e73805a 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -21,17 +21,17 @@
 import org.apache.hadoop.util.DiskChecker;
 import static org.fusesource.leveldbjni.JniDBFactory.asString;
 import static org.fusesource.leveldbjni.JniDBFactory.bytes;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -48,7 +48,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
@@ -102,47 +102,47 @@
 import org.iq80.leveldb.DBException;
 import org.iq80.leveldb.Logger;
 import org.iq80.leveldb.Options;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerBossPool;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.handler.timeout.IdleState;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
-import org.jboss.netty.util.CharsetUtil;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.Timer;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -183,11 +183,15 @@
   private static final String INDEX_FILE_NAME = "file.out.index";
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
-  protected HttpPipelineFactory pipelineFact;
+  private NioEventLoopGroup bossGroup;
+  private NioEventLoopGroup workerGroup;
+  private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
   private int sslFileBufferSize;
 
+  // pipeline items
+  private Shuffle SHUFFLE;
+  private SSLFactory sslFactory;
+
   /**
    * Should the shuffle use posix_fadvise calls to manage the OS cache during
    * sendfile
@@ -263,7 +267,6 @@
   boolean connectionKeepAliveEnabled = false;
   private int connectionKeepAliveTimeOut;
   private int mapOutputMetaInfoCacheSize;
-  private Timer timer;
 
   @Metrics(about="Shuffle output metrics", context="mapred", name="tez")
   static class ShuffleMetrics implements ChannelFutureListener {
@@ -300,7 +303,7 @@
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
       if (!future.isSuccess()) {
-        future.getChannel().close();
+        future.channel().close();
         return;
       }
       int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
@@ -308,21 +311,21 @@
         metrics.operationComplete(future);
         // Let the idle timer handler close keep-alive connections
         if (reduceContext.getKeepAlive()) {
-          ChannelPipeline pipeline = future.getChannel().getPipeline();
+          ChannelPipeline pipeline = future.channel().pipeline();
           TimeoutHandler timeoutHandler =
               (TimeoutHandler) pipeline.get(TIMEOUT_HANDLER);
           timeoutHandler.setEnabledTimeout(true);
         } else {
-          future.getChannel().close();
+          future.channel().close();
         }
       } else {
-        pipelineFact.getSHUFFLE().sendMap(reduceContext);
+        SHUFFLE.sendMap(reduceContext);
       }
     }
   }
 
   /**
-   * Maintain parameters per messageReceived() Netty context.
+   * Maintain parameters per channelRead() Netty context.
    * Allows sendMapOutput calls from operationComplete()
    */
   private static class ReduceContext {
@@ -419,9 +422,11 @@
    */
   public static ByteBuffer serializeMetaData(int port) throws IOException {
     //TODO these bytes should be versioned
-    DataOutputBuffer port_dob = new DataOutputBuffer();
-    port_dob.writeInt(port);
-    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+    DataOutputBuffer portDob = new DataOutputBuffer();
+    portDob.writeInt(port);
+    ByteBuffer buf = ByteBuffer.wrap(portDob.getData(), 0, portDob.getLength());
+    portDob.close();
+    return buf;
   }
 
   /**
@@ -434,6 +439,7 @@
     DataInputByteBuffer in = new DataInputByteBuffer();
     in.reset(meta);
     int port = in.readInt();
+    in.close();
     return port;
   }
 
@@ -516,22 +522,23 @@
         DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
 
     final String BOSS_THREAD_NAME_PREFIX = "Tez Shuffle Handler Boss #";
-    NioServerBossPool bossPool = new NioServerBossPool(Executors.newCachedThreadPool(), 1, new ThreadNameDeterminer() {
+    AtomicInteger bossThreadCounter = new AtomicInteger(0);
+    bossGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
       @Override
-      public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
-        return BOSS_THREAD_NAME_PREFIX + currentThreadName.substring(currentThreadName.lastIndexOf('-') + 1);
+      public Thread newThread(Runnable r) {
+        return new Thread(r, BOSS_THREAD_NAME_PREFIX + bossThreadCounter.incrementAndGet());
       }
     });
 
     final String WORKER_THREAD_NAME_PREFIX = "Tez Shuffle Handler Worker #";
-    NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(), maxShuffleThreads, new ThreadNameDeterminer() {
+    AtomicInteger workerThreadCounter = new AtomicInteger(0);
+    workerGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
       @Override
-      public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
-        return WORKER_THREAD_NAME_PREFIX + currentThreadName.substring(currentThreadName.lastIndexOf('-') + 1);
+      public Thread newThread(Runnable r) {
+        return new Thread(r, WORKER_THREAD_NAME_PREFIX + workerThreadCounter.incrementAndGet());
       }
     });
 
-    selector = new NioServerSocketChannelFactory(bossPool, workerPool);
     super.serviceInit(new YarnConfiguration(conf));
   }
 
@@ -542,25 +549,24 @@
     userRsrc = new ConcurrentHashMap<String,String>();
     secretManager = new JobTokenSecretManager();
     recoverState(conf);
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    // Timer is shared across entire factory and must be released separately
-    timer = new HashedWheelTimer();
-    try {
-      pipelineFact = new HttpPipelineFactory(conf, timer);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    bootstrap.setOption("backlog", conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
-        DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE));
-    bootstrap.setOption("child.keepAlive", true);
-    bootstrap.setPipelineFactory(pipelineFact);
+    ServerBootstrap bootstrap = new ServerBootstrap()
+        .channel(NioServerSocketChannel.class)
+        .group(bossGroup, workerGroup)
+        .localAddress(port)
+        .option(ChannelOption.SO_BACKLOG,
+            conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE, DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE))
+        .childOption(ChannelOption.SO_KEEPALIVE, true);
+    initPipeline(bootstrap, conf);
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    Channel ch = bootstrap.bind().sync().channel();
     accepted.add(ch);
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+
+    // setup port
+    port = ((InetSocketAddress)ch.localAddress()).getPort();
     conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
-    pipelineFact.SHUFFLE.setPort(port);
+    SHUFFLE.setPort(port);
     LOG.info(getName() + " listening on port " + port);
+
     super.serviceStart();
 
     sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
@@ -576,20 +582,50 @@
           DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
   }
 
+  private void initPipeline(ServerBootstrap bootstrap, Configuration conf) throws Exception {
+    SHUFFLE = getShuffle(conf);
+    if (conf.getBoolean(SHUFFLE_SSL_ENABLED_KEY, SHUFFLE_SSL_ENABLED_DEFAULT)) {
+      LOG.info("Encrypted shuffle is enabled.");
+      sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+      sslFactory.init();
+    }
+
+    ChannelInitializer<NioSocketChannel> channelInitializer =
+        new ChannelInitializer<NioSocketChannel>() {
+          @Override
+      public void initChannel(NioSocketChannel ch) throws Exception {
+        ChannelPipeline pipeline = ch.pipeline();
+        if (sslFactory != null) {
+          pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+        }
+        pipeline.addLast("decoder", new HttpRequestDecoder());
+        pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
+        pipeline.addLast("encoder", new HttpResponseEncoder());
+        pipeline.addLast("chunking", new ChunkedWriteHandler());
+        pipeline.addLast("shuffle", SHUFFLE);
+        pipeline.addLast("idle", new IdleStateHandler(0, connectionKeepAliveTimeOut, 0));
+        pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
+      }
+    };
+    bootstrap.childHandler(channelInitializer);
+  }
+
+  private void destroyPipeline() {
+    if (sslFactory != null) {
+      sslFactory.destroy();
+    }
+  }
+
   @Override
   protected void serviceStop() throws Exception {
     accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-    if (selector != null) {
-      ServerBootstrap bootstrap = new ServerBootstrap(selector);
-      bootstrap.releaseExternalResources();
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
     }
-    if (pipelineFact != null) {
-      pipelineFact.destroy();
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
     }
-    if (timer != null) {
-      // Release this shared timer resource
-      timer.stop();
-    }
+    destroyPipeline();
     if (stateDb != null) {
       stateDb.close();
     }
@@ -800,7 +836,7 @@
     }
   }
 
-  static class TimeoutHandler extends IdleStateAwareChannelHandler {
+  static class TimeoutHandler extends ChannelDuplexHandler {
 
     private boolean enabledTimeout;
 
@@ -809,61 +845,16 @@
     }
 
     @Override
-    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
-      if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
-        e.getChannel().close();
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (evt instanceof IdleStateEvent) {
+        IdleStateEvent e = (IdleStateEvent) evt;
+        if (e.state() == IdleState.WRITER_IDLE && enabledTimeout) {
+          ctx.channel().close();
+        }
       }
     }
   }
 
-  class HttpPipelineFactory implements ChannelPipelineFactory {
-
-    final Shuffle SHUFFLE;
-    private SSLFactory sslFactory;
-    private final ChannelHandler idleStateHandler;
-
-    public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
-      SHUFFLE = getShuffle(conf);
-      if (conf.getBoolean(SHUFFLE_SSL_ENABLED_KEY,
-                          SHUFFLE_SSL_ENABLED_DEFAULT)) {
-        LOG.info("Encrypted shuffle is enabled.");
-        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-        sslFactory.init();
-      }
-      this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
-    }
-
-    public Shuffle getSHUFFLE() {
-      return SHUFFLE;
-    }
-
-    public void destroy() {
-      if (sslFactory != null) {
-        sslFactory.destroy();
-      }
-    }
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
-      if (sslFactory != null) {
-        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
-      }
-      pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
-      pipeline.addLast("encoder", new HttpResponseEncoder());
-      pipeline.addLast("chunking", new ChunkedWriteHandler());
-      pipeline.addLast("shuffle", SHUFFLE);
-      pipeline.addLast("idle", idleStateHandler);
-      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
-      return pipeline;
-      // TODO factor security manager into pipeline
-      // TODO factor out encode/decode to permit binary shuffle
-      // TODO factor out decode of index to permit alt. models
-    }
-
-  }
-
   protected static class Range {
     final int first;
     final int last;
@@ -887,7 +878,8 @@
     }
   }
 
-  class Shuffle extends SimpleChannelUpstreamHandler {
+  @Sharable
+  class Shuffle extends ChannelInboundHandlerAdapter {
 
     private static final int MAX_WEIGHT = 10 * 1024 * 1024;
     private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
@@ -973,24 +965,30 @@
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
 
       if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
         LOG.info(String.format("Current number of shuffle connections (%d) is " +
             "greater than or equal to the max allowed shuffle connections (%d)",
             accepted.size(), maxShuffleConnections));
-        evt.getChannel().close();
+        ctx.channel().close();
         return;
       }
-      accepted.add(evt.getChannel());
-      super.channelOpen(ctx, evt);
+      accepted.add(ctx.channel());
+      super.channelActive(ctx);
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+    public void channelRead(ChannelHandlerContext ctx, Object message)
         throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
+      FullHttpRequest request = (FullHttpRequest) message;
+      handleRequest(ctx, request);
+      request.release();
+    }
+
+    private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request)
+        throws IOException, Exception {
       if (request.getMethod() != GET) {
           sendError(ctx, METHOD_NOT_ALLOWED);
           return;
@@ -1001,9 +999,9 @@
           || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
               request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
         sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+        return;
       }
-      final Map<String,List<String>> q =
-        new QueryStringDecoder(request.getUri()).getParameters();
+      final Map<String, List<String>> q = new QueryStringDecoder(request.getUri()).parameters();
       final List<String> keepAliveList = q.get("keepAlive");
       final List<String> dagCompletedQ = q.get("dagAction");
       boolean keepAliveParam = false;
@@ -1024,7 +1022,7 @@
             "\n  keepAlive: " + keepAliveParam);
       }
       // If the request is for Dag Deletion, process the request and send OK.
-      if (deleteDagDirectories(evt, dagCompletedQ, jobQ, dagIdQ))  {
+      if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ))  {
         return;
       }
       if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) {
@@ -1073,8 +1071,8 @@
 
       Map<String, MapOutputInfo> mapOutputInfoMap =
           new HashMap<String, MapOutputInfo>();
-      Channel ch = evt.getChannel();
-      ChannelPipeline pipeline = ch.getPipeline();
+      Channel ch = ctx.channel();
+      ChannelPipeline pipeline = ch.pipeline();
       TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
       timeoutHandler.setEnabledTimeout(false);
       String user = userRsrc.get(jobId);
@@ -1098,19 +1096,23 @@
         return;
       }
       ch.write(response);
-      //Initialize one ReduceContext object per messageReceived call
+      //Initialize one ReduceContext object per channelRead call
       boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
       ReduceContext reduceContext = new ReduceContext(mapIds, reduceRange, ctx,
           user, mapOutputInfoMap, jobId, dagId, keepAlive);
       for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
         ChannelFuture nextMap = sendMap(reduceContext);
         if(nextMap == null) {
+          // by this special message flushed, we can make sure the whole response is finished
+          ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
           return;
         }
       }
+      // by this special message flushed, we can make sure the whole response is finished
+      ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
     }
 
-    private boolean deleteDagDirectories(MessageEvent evt,
+    private boolean deleteDagDirectories(Channel channel,
                                          List<String> dagCompletedQ, List<String> jobQ,
                                          List<String> dagIdQ) {
       if (jobQ == null || jobQ.isEmpty()) {
@@ -1127,8 +1129,8 @@
         } catch (IOException e) {
           LOG.warn("Encountered exception during dag delete "+ e);
         }
-        evt.getChannel().write(new DefaultHttpResponse(HTTP_1_1, OK));
-        evt.getChannel().close();
+        channel.writeAndFlush(new DefaultHttpResponse(HTTP_1_1, OK))
+            .addListener(ChannelFutureListener.CLOSE);
         return true;
       }
       return false;
@@ -1136,7 +1138,7 @@
 
     /**
      * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
-     * and increments it. This method is first called by messageReceived()
+     * and increments it. This method is first called by channelRead()
      * maxSessionOpenFiles times and then on the completion of every
      * sendMapOutput operation. This limits the number of open files on a node,
      * which can get really large(exhausting file descriptors on the NM) if all
@@ -1146,7 +1148,6 @@
      */
     public ChannelFuture sendMap(ReduceContext reduceContext)
         throws Exception {
-
       ChannelFuture nextMap = null;
       if (reduceContext.getMapsToSend().get() <
           reduceContext.getMapIds().size()) {
@@ -1155,14 +1156,16 @@
 
         try {
           MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
+
           if (info == null) {
             info = getMapOutputInfo(reduceContext.dagId, mapId, reduceContext.getReduceRange(),
                 reduceContext.getJobId(),
                 reduceContext.getUser());
           }
+
           nextMap = sendMapOutput(
               reduceContext.getCtx(),
-              reduceContext.getCtx().getChannel(),
+              reduceContext.getCtx().channel(),
               reduceContext.getUser(), mapId,
               reduceContext.getReduceRange(), info);
           if (null == nextMap) {
@@ -1417,23 +1420,12 @@
         return null;
       }
       ChannelFuture writeFuture;
-      if (ch.getPipeline().get(SslHandler.class) == null) {
+      if (ch.pipeline().get(SslHandler.class) == null) {
         final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
             rangeOffset, rangePartLength, manageOsCache, readaheadLength,
             readaheadPool, spillFile.getAbsolutePath(),
             shuffleBufferSize, shuffleTransferToAllowed);
         writeFuture = ch.write(partition);
-        writeFuture.addListener(new ChannelFutureListener() {
-            // TODO error handling; distinguish IO/connection failures,
-            //      attribute to appropriate spill output
-          @Override
-          public void operationComplete(ChannelFuture future) {
-            if (future.isSuccess()) {
-              partition.transferSuccessful();
-            }
-            partition.releaseExternalResources();
-          }
-        });
       } else {
         // HTTPS cannot be done with zero copy.
         final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
@@ -1452,42 +1444,46 @@
     }
 
     protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status);
       sendError(ctx, message, response);
+      response.release();
     }
 
-    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponse response) {
-      sendError(ctx, ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8), response);
+    protected void sendError(ChannelHandlerContext ctx, String message, FullHttpResponse response) {
+      sendError(ctx, Unpooled.copiedBuffer(message, CharsetUtil.UTF_8), response);
     }
 
     private void sendFakeShuffleHeaderWithError(ChannelHandlerContext ctx, String message,
         HttpResponse response) throws IOException {
+      FullHttpResponse fullResponse =
+          new DefaultFullHttpResponse(response.getProtocolVersion(), response.getStatus());
+      fullResponse.headers().set(response.headers());
+
       ShuffleHeader header = new ShuffleHeader(message, -1, -1, -1);
       DataOutputBuffer out = new DataOutputBuffer();
       header.write(out);
 
-      sendError(ctx, wrappedBuffer(out.getData(), 0, out.getLength()), response);
+      sendError(ctx, wrappedBuffer(out.getData(), 0, out.getLength()), fullResponse);
+      fullResponse.release();
     }
 
-    protected void sendError(ChannelHandlerContext ctx, ChannelBuffer content,
-        HttpResponse response) {
+    protected void sendError(ChannelHandlerContext ctx, ByteBuf content,
+        FullHttpResponse response) {
       response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
       // Put shuffle version into http header
       response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
       response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      response.setContent(content);
+      response.content().writeBytes(content);
 
       // Close the connection as soon as the error message is sent.
-      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+      ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
-      Channel ch = e.getChannel();
-      Throwable cause = e.getCause();
       if (cause instanceof TooLongFrameException) {
         sendError(ctx, BAD_REQUEST);
         return;
@@ -1504,8 +1500,8 @@
       }
 
       LOG.error("Shuffle error: ", cause);
-      if (ch.isConnected()) {
-        LOG.error("Shuffle error " + e);
+      if (ctx.channel().isActive()) {
+        LOG.error("Shuffle error", cause);
         sendError(ctx, INTERNAL_SERVER_ERROR);
       }
     }
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index 5ca4ed8..21addd3 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -22,9 +22,7 @@
 //import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import static org.junit.Assert.assertTrue;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.mock;
@@ -44,6 +42,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.Checksum;
 
 import org.apache.hadoop.conf.Configuration;
@@ -55,8 +54,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -79,20 +76,21 @@
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.AbstractChannel;
-import org.jboss.netty.handler.codec.http.DefaultHttpHeaders;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.AbstractChannel;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.HttpMethod;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -141,12 +139,12 @@
               new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
           DataOutputBuffer dob = new DataOutputBuffer();
           header.write(dob);
-          ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+          ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
           dob = new DataOutputBuffer();
           for (int i = 0; i < 100; ++i) {
             header.write(dob);
           }
-          return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+          return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
         }
       };
     }
@@ -162,8 +160,8 @@
         protected void verifyRequest(String appid, ChannelHandlerContext ctx,
             HttpRequest request, HttpResponse response, URL requestUri)
             throws IOException {
-          SocketChannel channel = (SocketChannel)(ctx.getChannel());
-          socketKeepAlive = channel.getConfig().isKeepAlive();
+          SocketChannel channel = (SocketChannel)(ctx.channel());
+          socketKeepAlive = channel.config().isKeepAlive();
         }
       };
     }
@@ -258,6 +256,7 @@
     sh.metrics.operationComplete(cf);
 
     checkShuffleMetrics(ms, 3*MiB, 1, 1, 0);
+    sh.close();
   }
 
   static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed,
@@ -279,7 +278,7 @@
    */
   @Test (timeout = 10000)
   public void testClientClosesConnection() throws Exception {
-    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    final AtomicBoolean failureEncountered = new AtomicBoolean(false);
     Configuration conf = new Configuration();
     conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
     conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
@@ -321,27 +320,25 @@
                 new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
             DataOutputBuffer dob = new DataOutputBuffer();
             header.write(dob);
-            ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
             dob = new DataOutputBuffer();
             for (int i = 0; i < 100000; ++i) {
               header.write(dob);
             }
-            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
           }
           @Override
           protected void sendError(ChannelHandlerContext ctx,
               HttpResponseStatus status) {
-            if (failures.size() == 0) {
-              failures.add(new Error());
-              ctx.getChannel().close();
+            if (failureEncountered.compareAndSet(false, true)) {
+              ctx.channel().close();
             }
           }
           @Override
           protected void sendError(ChannelHandlerContext ctx, String message,
               HttpResponseStatus status) {
-            if (failures.size() == 0) {
-              failures.add(new Error());
-              ctx.getChannel().close();
+            if (failureEncountered.compareAndSet(false, true)) {
+              ctx.channel().close();
             }
           }
         };
@@ -368,9 +365,9 @@
     header.readFields(input);
     input.close();
 
-    shuffleHandler.stop();
+    shuffleHandler.close();
     Assert.assertTrue("sendError called when client closed connection",
-        failures.size() == 0);
+        !failureEncountered.get());
   }
 
   static class LastSocketAddress {
@@ -378,14 +375,14 @@
     void setAddress(SocketAddress lastAddress) {
       this.lastAddress = lastAddress;
     }
-    SocketAddress getSocketAddres() {
+    SocketAddress getSocketAddress() {
       return lastAddress;
     }
   }
 
   @Test(timeout = 10000)
   public void testKeepAlive() throws Exception {
-    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    final AtomicBoolean failureEncountered = new AtomicBoolean(false);
     Configuration conf = new Configuration();
     conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
     conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
@@ -443,8 +440,7 @@
           protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
                                                 Channel ch, String user, String mapId, Range reduceRange,
                                                 MapOutputInfo info) throws IOException {
-            lastSocketAddress.setAddress(ch.getRemoteAddress());
-            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+            lastSocketAddress.setAddress(ch.remoteAddress());
 
             // send a shuffle header and a lot of data down the channel
             // to trigger a broken pipe
@@ -452,29 +448,27 @@
                 new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
             DataOutputBuffer dob = new DataOutputBuffer();
             header.write(dob);
-            ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
             dob = new DataOutputBuffer();
             for (int i = 0; i < 100000; ++i) {
               header.write(dob);
             }
-            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
           }
 
           @Override
           protected void sendError(ChannelHandlerContext ctx,
               HttpResponseStatus status) {
-            if (failures.size() == 0) {
-              failures.add(new Error());
-              ctx.getChannel().close();
+            if (failureEncountered.compareAndSet(false, true)) {
+              ctx.channel().close();
             }
           }
 
           @Override
           protected void sendError(ChannelHandlerContext ctx, String message,
               HttpResponseStatus status) {
-            if (failures.size() == 0) {
-              failures.add(new Error());
-              ctx.getChannel().close();
+            if (failureEncountered.compareAndSet(false, true)) {
+              ctx.channel().close();
             }
           }
         };
@@ -505,7 +499,7 @@
     header.readFields(input);
     byte[] buffer = new byte[1024];
     while (input.read(buffer) != -1) {}
-    SocketAddress firstAddress = lastSocketAddress.getSocketAddres();
+    SocketAddress firstAddress = lastSocketAddress.getSocketAddress();
     input.close();
 
     // For keepAlive via URL
@@ -527,11 +521,12 @@
     header = new ShuffleHeader();
     header.readFields(input);
     input.close();
-    SocketAddress secondAddress = lastSocketAddress.getSocketAddres();
+    SocketAddress secondAddress = lastSocketAddress.getSocketAddress();
     Assert.assertNotNull("Initial shuffle address should not be null", firstAddress);
     Assert.assertNotNull("Keep-Alive shuffle address should not be null", secondAddress);
     Assert.assertEquals("Initial shuffle address and keep-alive shuffle "
         + "address should be the same", firstAddress, secondAddress);
+    shuffleHandler.close();
   }
 
   @Test
@@ -567,7 +562,7 @@
       if (conn != null) {
         conn.disconnect();
       }
-      shuffleHandler.stop();
+      shuffleHandler.close();
     }
   }
 
@@ -603,7 +598,6 @@
           HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
     }
 
-    shuffleHandler.stop();
     shuffleHandler.close();
   }
 
@@ -657,12 +651,12 @@
                 new ShuffleHeader("dummy_header", 5678, 5678, 1);
             DataOutputBuffer dob = new DataOutputBuffer();
             header.write(dob);
-            ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
             dob = new DataOutputBuffer();
             for (int i=0; i<100000; ++i) {
               header.write(dob);
             }
-            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
           }
         };
       }
@@ -689,6 +683,10 @@
 
     // Try to open numerous connections
     for (int i = 0; i < connAttempts; i++) {
+      // connections should be made in a bit relaxed way, otherwise
+      // non-synced channelActive method will mess them up
+      Thread.sleep(200);
+
       conns[i].connect();
     }
 
@@ -712,7 +710,7 @@
       Assert.fail("Expected a SocketException");
     }
 
-    shuffleHandler.stop();
+    shuffleHandler.close();
   }
 
   /**
@@ -807,7 +805,7 @@
       }
 
     } finally {
-      shuffleHandler.stop();
+      shuffleHandler.close();
       FileUtil.fullyDelete(absLogDir);
     }
   }
@@ -900,7 +898,7 @@
               + " did not match expected owner '" + user + "'";
       Assert.assertTrue((new String(byteArr)).contains(message));
     } finally {
-      shuffleHandler.stop();
+      shuffleHandler.close();
       FileUtil.fullyDelete(absLogDir);
     }
   }
@@ -953,7 +951,6 @@
   public void testRecovery() throws IOException {
     final String user = "someuser";
     final ApplicationId appId = ApplicationId.newInstance(12345, 1);
-    final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId));
     final File tmpDir = new File(System.getProperty("test.build.data",
         System.getProperty("java.io.tmpdir")),
         TestShuffleHandler.class.getName());
@@ -1131,7 +1128,7 @@
 
   @Test(timeout = 100000)
   public void testGetMapOutputInfo() throws Exception {
-    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    final AtomicBoolean failureEncountered = new AtomicBoolean(false);
     Configuration conf = new Configuration();
     conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
     conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
@@ -1173,9 +1170,8 @@
           @Override
           protected void sendError(ChannelHandlerContext ctx, String message,
               HttpResponseStatus status) {
-            if (failures.size() == 0) {
-              failures.add(new Error(message));
-              ctx.getChannel().close();
+            if (failureEncountered.compareAndSet(false, true)) {
+              ctx.channel().close();
             }
           }
           @Override
@@ -1187,7 +1183,7 @@
                 new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
             DataOutputBuffer dob = new DataOutputBuffer();
             header.write(dob);
-            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
           }
         };
       }
@@ -1227,16 +1223,16 @@
         // ignore
       }
       Assert.assertEquals("sendError called due to shuffle error",
-          0, failures.size());
+          false, failureEncountered.get());
     } finally {
-      shuffleHandler.stop();
+      shuffleHandler.close();
       FileUtil.fullyDelete(absLogDir);
     }
   }
 
   @Test(timeout = 5000)
   public void testDagDelete() throws Exception {
-    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    final AtomicBoolean failureEncountered = new AtomicBoolean(false);
     Configuration conf = new Configuration();
     conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
@@ -1261,9 +1257,8 @@
           @Override
           protected void sendError(ChannelHandlerContext ctx, String message,
                                    HttpResponseStatus status) {
-            if (failures.size() == 0) {
-              failures.add(new Error(message));
-              ctx.getChannel().close();
+            if (failureEncountered.compareAndSet(false, true)) {
+              ctx.channel().close();
             }
           }
         };
@@ -1309,9 +1304,9 @@
         // ignore
       }
       Assert.assertEquals("sendError called due to shuffle error",
-          0, failures.size());
+          false, failureEncountered.get());
     } finally {
-      shuffleHandler.stop();
+      shuffleHandler.close();
       FileUtil.fullyDelete(absLogDir);
     }
   }
@@ -1323,29 +1318,23 @@
 
     final ChannelHandlerContext mockCtx =
         mock(ChannelHandlerContext.class);
-    final MessageEvent mockEvt = mock(MessageEvent.class);
     final Channel mockCh = mock(AbstractChannel.class);
     final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);
 
     // Mock HttpRequest and ChannelFuture
-    final HttpRequest mockHttpRequest = createMockHttpRequest();
+    final FullHttpRequest httpRequest = createHttpRequest();
     final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
         listenerList);
     final ShuffleHandler.TimeoutHandler timerHandler =
         new ShuffleHandler.TimeoutHandler();
 
     // Mock Netty Channel Context and Channel behavior
-    Mockito.doReturn(mockCh).when(mockCtx).getChannel();
-    Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
+    Mockito.doReturn(mockCh).when(mockCtx).channel();
+    Mockito.when(mockCh.pipeline()).thenReturn(mockPipeline);
     Mockito.when(mockPipeline.get(Mockito.any(String.class))).thenReturn(timerHandler);
-    when(mockCtx.getChannel()).thenReturn(mockCh);
-    Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
-    when(mockCh.write(Object.class)).thenReturn(mockFuture);
-
-    //Mock MessageEvent behavior
-    Mockito.doReturn(mockCh).when(mockEvt).getChannel();
-    when(mockEvt.getChannel()).thenReturn(mockCh);
-    Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();
+    when(mockCtx.channel()).thenReturn(mockCh);
+    Mockito.doReturn(mockFuture).when(mockCh).writeAndFlush(Mockito.any(Object.class));
+    when(mockCh.writeAndFlush(Object.class)).thenReturn(mockFuture);
 
     final ShuffleHandler sh = new MockShuffleHandler();
     Configuration conf = new Configuration();
@@ -1356,7 +1345,7 @@
     sh.start();
     int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
         ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
-    sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
+    sh.getShuffle(conf).channelRead(mockCtx, httpRequest);
     assertTrue("Number of Open files should not exceed the configured " +
             "value!-Not Expected",
         listenerList.size() <= maxOpenFiles);
@@ -1419,9 +1408,9 @@
   public ChannelFuture createMockChannelFuture(Channel mockCh,
       final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
     final ChannelFuture mockFuture = mock(ChannelFuture.class);
-    when(mockFuture.getChannel()).thenReturn(mockCh);
+    when(mockFuture.channel()).thenReturn(mockCh);
     Mockito.doReturn(true).when(mockFuture).isSuccess();
-    Mockito.doAnswer(new Answer() {
+    Mockito.doAnswer(new Answer<Object>() {
       @Override
       public Object answer(InvocationOnMock invocation) throws Throwable {
         //Add ReduceMapFileCount listener to a list
@@ -1436,19 +1425,11 @@
     return mockFuture;
   }
 
-  public HttpRequest createMockHttpRequest() {
-    HttpRequest mockHttpRequest = mock(HttpRequest.class);
-    Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod();
-    Mockito.doReturn(new DefaultHttpHeaders()).when(mockHttpRequest).headers();
-    Mockito.doAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        String uri = "/mapOutput?job=job_12345_1&dag=1&reduce=1";
-        for (int i = 0; i < 100; i++)
-          uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
-        return uri;
-      }
-    }).when(mockHttpRequest).getUri();
-    return mockHttpRequest;
+  public FullHttpRequest createHttpRequest() {
+    String uri = "/mapOutput?job=job_12345_1&dag=1&reduce=1";
+    for (int i = 0; i < 100; i++) {
+      uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
+    }
+    return new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
   }
 }