Repair Netty 4 benchmarks tests, update Netty 4 version and include benchmarks2 in build
diff --git a/benchmarks2/pom.xml b/benchmarks2/pom.xml
index 2b03467..292e957 100755
--- a/benchmarks2/pom.xml
+++ b/benchmarks2/pom.xml
@@ -34,7 +34,7 @@
   <properties>
      <!-- defined in order to run against a different MINA version -->
      <mina.version>${project.version}</mina.version>
-     <netty.version>4.0.0.CR3</netty.version>
+     <netty.version>4.0.4.Final</netty.version>
   </properties>
   
   <dependencies>
diff --git a/benchmarks2/src/test/java/org/apache/mina/core/BenchmarkClientFactory.java b/benchmarks2/src/test/java/org/apache/mina/core/BenchmarkClientFactory.java
index b866d18..ecf00f7 100755
--- a/benchmarks2/src/test/java/org/apache/mina/core/BenchmarkClientFactory.java
+++ b/benchmarks2/src/test/java/org/apache/mina/core/BenchmarkClientFactory.java
@@ -19,7 +19,6 @@
  */
 package org.apache.mina.core;
 
-import org.apache.mina.core.nio.tcp.Mina3TcpBenchmarkClient;
 import org.apache.mina.core.nio.tcp.Netty4TcpBenchmarkClient;
 import org.apache.mina.core.nio.udp.Netty4UdpBenchmarkClient;
 
@@ -36,8 +35,6 @@
             return new Netty4TcpBenchmarkClient();
         case Netty4_udp:
             return new Netty4UdpBenchmarkClient();
-        case Mina3_tcp:
-            return new Mina3TcpBenchmarkClient();
         default:
             throw new IllegalArgumentException("Invalid type " + type);
         }
diff --git a/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkClient.java b/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkClient.java
index 7edde48..fcc8782 100644
--- a/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkClient.java
+++ b/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkClient.java
@@ -19,13 +19,19 @@
  */
 package org.apache.mina.core.nio.tcp;
 
-import io.netty.bootstrap.ChannelFactory;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.channels.Channels;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.mina.core.BenchmarkClient;
@@ -36,8 +42,8 @@
  */
 public class Netty4TcpBenchmarkClient implements BenchmarkClient {
 
-    private ChannelFactory factory;
-
+    private EventLoopGroup group = new NioEventLoopGroup();
+  
     /**
      * 
      */
@@ -48,42 +54,48 @@
      * {@inheritedDoc}
      */
     public void start(final int port, final CountDownLatch counter, final byte[] data) throws IOException {
-        factory = new NioClientSocketChannelFactory();
-        ClientBootstrap bootstrap = new ClientBootstrap(factory);
-        bootstrap.setOption("sendBufferSize", 64 * 1024);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-            public ChannelPipeline getPipeline() throws Exception {
-                return Channels.pipeline(new SimpleChannelUpstreamHandler() {
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(group);
+        bootstrap.option(ChannelOption.SO_SNDBUF, 64 * 1024);
+        bootstrap.option(ChannelOption.TCP_NODELAY, true);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+
+            @Override
+            protected void initChannel(SocketChannel ch) throws Exception {
+                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                     private void sendMessage(ChannelHandlerContext ctx, byte[] data) {
-                        ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
-                        ctx.getChannel().write(buffer);
+                        ByteBuf buf = ctx.alloc().buffer(data.length);
+                        buf.writeBytes(data);
+                        ctx.writeAndFlush(buf);
                     }
 
                     @Override
-                    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-                        if (e.getMessage() instanceof ChannelBuffer) {
-                            ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
-                            for (int i = 0; i < buffer.readableBytes(); ++i) {
-                                counter.countDown();
-                                if (counter.getCount() > 0) {
-                                    sendMessage(ctx, data);
-                                } else {
-                                    ctx.getChannel().close();
-                                }
+                    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
+                        ByteBuf buf = (ByteBuf)message;
+                        for(int i=0; i < buf.readableBytes();i++) {
+                            counter.countDown();
+                            if (counter.getCount() > 0) {
+                                sendMessage(ctx, data);
+                            } else {
+                                ctx.channel().close();
                             }
-                        } else {
-                            throw new IllegalArgumentException(e.getMessage().getClass().getName());
                         }
+                        buf.release();
                     }
 
                     @Override
-                    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-                        sendMessage(ctx, data);
+                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+                      sendMessage(ctx, data);
                     }
-
                 });
             }
+
+            @Override
+            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+                cause.printStackTrace();
+                ctx.close();
+            }
         });
         bootstrap.connect(new InetSocketAddress(port));
     }
@@ -92,6 +104,6 @@
      * {@inheritedDoc}
      */
     public void stop() throws IOException {
-        factory.releaseExternalResources();
+        group.shutdownGracefully();
     }
 }
diff --git a/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkServer.java b/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkServer.java
index 1fc1f12..13a8a78 100644
--- a/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkServer.java
+++ b/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkServer.java
@@ -21,16 +21,14 @@
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.MessageBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundByteHandlerAdapter;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
@@ -59,7 +57,9 @@
 
     private static final AttributeKey<Integer> LENGTH_ATTRIBUTE = new AttributeKey<Integer>("length");
 
-    private class TestServerHandler extends ChannelInboundByteHandlerAdapter {
+    private ServerBootstrap bootstrap = null;
+
+    private class TestServerHandler extends ChannelInboundHandlerAdapter  {
         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
             System.out.println("childChannelOpen");
             ctx.attr(STATE_ATTRIBUTE).set(State.WAIT_FOR_FIRST_BYTE_LENGTH);
@@ -75,12 +75,13 @@
         }
 
         @Override
-        public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+        public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
+            ByteBuf buffer = (ByteBuf)message;
             State state = ctx.attr(STATE_ATTRIBUTE).get();
             int length = 0;
             Attribute<Integer> lengthAttribute = ctx.attr(LENGTH_ATTRIBUTE);
 
-            if (lengthAttribute != null) {
+            if (lengthAttribute.get() != null) {
                 length = lengthAttribute.get();
             }
 
@@ -106,9 +107,7 @@
                     state = State.READING;
 
                     if ((length == 0) && (buffer.readableBytes() == 0)) {
-                        MessageBuf<Object> messageOut = ctx.nextOutboundMessageBuffer();
-                        messageOut.add(ACK.slice());
-                        ctx.flush();
+                        ctx.writeAndFlush(ACK.retain(1).resetReaderIndex());
                         state = State.WAIT_FOR_FIRST_BYTE_LENGTH;
                     }
 
@@ -122,9 +121,7 @@
                         buffer.skipBytes(remaining);
                     } else {
                         buffer.skipBytes(length);
-                        MessageBuf<Object> messageOut = ctx.nextOutboundMessageBuffer();
-                        messageOut.add(ACK.slice());
-                        ctx.flush();
+                        ctx.writeAndFlush(ACK.retain(1).resetReaderIndex());
                         state = State.WAIT_FOR_FIRST_BYTE_LENGTH;
                         length = 0;
                     }
@@ -133,6 +130,7 @@
 
             ctx.attr(STATE_ATTRIBUTE).set(state);
             ctx.attr(LENGTH_ATTRIBUTE).set(length);
+            buffer.release();
         }
     }
 
@@ -141,9 +139,6 @@
      * @throws  
      */
     public void start(int port) throws IOException {
-        ServerBootstrap bootstrap = null;
-        ;
-
         try {
             bootstrap = new ServerBootstrap();
             bootstrap.option(ChannelOption.SO_RCVBUF, 128 * 1024);
@@ -158,15 +153,11 @@
                 };
             });
             ChannelFuture bindFuture = bootstrap.bind();
-            bindFuture.sync();
-            Channel channel = bindFuture.channel();
-            ChannelFuture closeFuture = channel.closeFuture();
+            //bindFuture.sync();
+            //Channel channel = bindFuture.channel();
+            //ChannelFuture closeFuture = channel.closeFuture();
             //closeFuture.sync();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
         } finally {
-            bootstrap.shutdown();
         }
     }
 
@@ -174,5 +165,7 @@
      * {@inheritedDoc}
      */
     public void stop() throws IOException {
+        bootstrap.childGroup().shutdownGracefully();
+        bootstrap.group().shutdownGracefully();
     }
 }
diff --git a/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkClient.java b/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkClient.java
index 2030e5e..267dffb 100644
--- a/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkClient.java
+++ b/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkClient.java
@@ -19,13 +19,19 @@
  */
 package org.apache.mina.core.nio.udp;
 
-import io.netty.bootstrap.ChannelFactory;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.channels.Channels;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.mina.core.BenchmarkClient;
@@ -36,7 +42,7 @@
  */
 public class Netty4UdpBenchmarkClient implements BenchmarkClient {
 
-    private ChannelFactory factory;
+    private Bootstrap bootstrap;
 
     /**
      * 
@@ -48,50 +54,47 @@
      * {@inheritedDoc}
      */
     public void start(final int port, final CountDownLatch counter, final byte[] data) throws IOException {
-        factory = new NioDatagramChannelFactory();
-        ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(factory);
-        bootstrap.setOption("sendBufferSize", 65536);
-        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-            public ChannelPipeline getPipeline() throws Exception {
-                return Channels.pipeline(new SimpleChannelUpstreamHandler() {
-                    private void sendMessage(ChannelHandlerContext ctx, byte[] data) {
-                        ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
-                        ctx.getChannel().write(buffer);
+        bootstrap = new Bootstrap();
+        bootstrap.option(ChannelOption.SO_SNDBUF, 65536);
+        bootstrap.group(new NioEventLoopGroup());
+        bootstrap.channel(NioDatagramChannel.class);
+        bootstrap.handler(new ChannelInitializer<DatagramChannel>() {
+
+            @Override
+            protected void initChannel(DatagramChannel ch) throws Exception {
+                ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
+                    private void sendMessage(ChannelHandlerContext ctx, byte[] data, InetSocketAddress address) {
+                        ByteBuf buf = ctx.alloc().buffer(data.length);
+                        buf.writeBytes(data);
+                        ctx.writeAndFlush(new DatagramPacket(buf, address));
+                    }
+                  
+                    @Override
+                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+                        sendMessage(ctx, data, new InetSocketAddress("localhost", port));
                     }
 
                     @Override
-                    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-                        if (e.getMessage() instanceof ChannelBuffer) {
-                            ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
-                            for (int i = 0; i < buffer.readableBytes(); ++i) {
-                                counter.countDown();
-                                if (counter.getCount() > 0) {
-                                    sendMessage(ctx, data);
-                                } else {
-                                    ctx.getChannel().close();
-                                }
+                    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket message) throws Exception {
+                        for(int i=0; i < message.content().readableBytes();i++) {
+                            counter.countDown();
+                            if (counter.getCount() > 0) {
+                                sendMessage(ctx, data, message.sender());
+                            } else {
+                                ctx.channel().close();
                             }
-                        } else {
-                            throw new IllegalArgumentException(e.getMessage().getClass().getName());
                         }
                     }
-
-                    @Override
-                    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-                        sendMessage(ctx, data);
-                    }
-
                 });
             }
         });
-        bootstrap.connect(new InetSocketAddress(port));
+        bootstrap.bind(port+1);
     }
 
     /**
      * {@inheritedDoc}
      */
     public void stop() throws IOException {
-        factory.shutdown();
-        factory.releaseExternalResources();
+        bootstrap.group().shutdownGracefully();
     }
 }
diff --git a/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkServer.java b/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkServer.java
index a8c3b18..be1c625 100644
--- a/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkServer.java
+++ b/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkServer.java
@@ -19,23 +19,24 @@
  */
 package org.apache.mina.core.nio.udp;
 
-import io.netty.bootstrap.ChannelFactory;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.Channels;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.mina.core.BenchmarkServer;
 
-import com.sun.jdi.event.ExceptionEvent;
-
 /**
  * A Netty 3 based UDP server
  * 
@@ -47,137 +48,102 @@
         WAIT_FOR_FIRST_BYTE_LENGTH, WAIT_FOR_SECOND_BYTE_LENGTH, WAIT_FOR_THIRD_BYTE_LENGTH, WAIT_FOR_FOURTH_BYTE_LENGTH, READING
     }
 
-    private static final ChannelBuffer ACK = ChannelBuffers.buffer(1);
+    private static final ByteBuf ACK = Unpooled.buffer(1);
 
+    private static final AttributeKey<State> STATE_ATTRIBUTE = new AttributeKey<State>("state");
+
+    private static final AttributeKey<Integer> LENGTH_ATTRIBUTE = new AttributeKey<Integer>("length");
+    
     static {
-        ACK.writeByte(0);
-    }
+      ACK.writeByte(0);
+  }
 
-    private static final String STATE_ATTRIBUTE = Netty4UdpBenchmarkServer.class.getName() + ".state";
-
-    private static final String LENGTH_ATTRIBUTE = Netty4UdpBenchmarkServer.class.getName() + ".length";
-
-    private ChannelFactory factory;
-
-    private ChannelGroup allChannels = new DefaultChannelGroup();
-
-    /**
-     * Allocate a map as attachment for storing attributes.
-     * 
-     * @param ctx the channel context
-     * @return the map from the attachment
-     */
-    protected static Map<String, Object> getAttributesMap(ChannelHandlerContext ctx) {
-        Map<String, Object> map = (Map<String, Object>) ctx.getAttachment();
-        if (map == null) {
-            map = new HashMap<String, Object>();
-            ctx.setAttachment(map);
-        }
-        return map;
-    }
-
-    private static void setAttribute(ChannelHandlerContext ctx, String name, Object value) {
-        getAttributesMap(ctx).put(name, value);
-    }
-
-    private static Object getAttribute(ChannelHandlerContext ctx, String name) {
-        return getAttributesMap(ctx).get(name);
-    }
+    private Bootstrap bootstrap;
 
     /**
      * {@inheritDoc}
      */
     public void start(int port) throws IOException {
-        factory = new NioDatagramChannelFactory();
-        ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(factory);
-        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-            public ChannelPipeline getPipeline() throws Exception {
-                return Channels.pipeline(new SimpleChannelUpstreamHandler() {
+        bootstrap = new Bootstrap();
+        bootstrap.group(new NioEventLoopGroup());
+        bootstrap.channel(NioDatagramChannel.class);
+        bootstrap.option(ChannelOption.SO_RCVBUF, 65536);
+        bootstrap.handler(new ChannelInitializer<DatagramChannel>() {
+
+            @Override
+            protected void initChannel(DatagramChannel ch) throws Exception {
+                ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
                     @Override
-                    public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {
-                        System.out.println("childChannelOpen");
-                        setAttribute(ctx, STATE_ATTRIBUTE, State.WAIT_FOR_FIRST_BYTE_LENGTH);
+                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+                        ctx.attr(STATE_ATTRIBUTE).set(State.WAIT_FOR_FIRST_BYTE_LENGTH);
                     }
 
                     @Override
-                    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-                        System.out.println("channelOpen");
-                        setAttribute(ctx, STATE_ATTRIBUTE, State.WAIT_FOR_FIRST_BYTE_LENGTH);
-                        allChannels.add(ctx.getChannel());
-                    }
+                    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket message) throws Exception {
+                        ByteBuf buffer = message.content();
+                        State state = ctx.attr(STATE_ATTRIBUTE).get();
+                        int length = 0;
+                        Attribute<Integer> lengthAttribute = ctx.attr(LENGTH_ATTRIBUTE);
 
-                    @Override
-                    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-                        if (e.getMessage() instanceof ChannelBuffer) {
-                            ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+                        if (lengthAttribute.get() != null) {
+                            length = lengthAttribute.get();
+                        }
 
-                            State state = (State) getAttribute(ctx, STATE_ATTRIBUTE);
-                            int length = 0;
-                            if (getAttributesMap(ctx).containsKey(LENGTH_ATTRIBUTE)) {
-                                length = (Integer) getAttribute(ctx, LENGTH_ATTRIBUTE);
-                            }
-                            while (buffer.readableBytes() > 0) {
-                                switch (state) {
+                        while (buffer.readableBytes() > 0) {
+                            switch (state) {
                                 case WAIT_FOR_FIRST_BYTE_LENGTH:
                                     length = (buffer.readByte() & 255) << 24;
                                     state = State.WAIT_FOR_SECOND_BYTE_LENGTH;
                                     break;
+
                                 case WAIT_FOR_SECOND_BYTE_LENGTH:
                                     length += (buffer.readByte() & 255) << 16;
                                     state = State.WAIT_FOR_THIRD_BYTE_LENGTH;
                                     break;
+
                                 case WAIT_FOR_THIRD_BYTE_LENGTH:
                                     length += (buffer.readByte() & 255) << 8;
                                     state = State.WAIT_FOR_FOURTH_BYTE_LENGTH;
                                     break;
+
                                 case WAIT_FOR_FOURTH_BYTE_LENGTH:
                                     length += (buffer.readByte() & 255);
                                     state = State.READING;
+
                                     if ((length == 0) && (buffer.readableBytes() == 0)) {
-                                        ctx.getChannel().write(ACK.slice());
+                                        ctx.writeAndFlush(new DatagramPacket(ACK.retain(1).resetReaderIndex(), message.sender()));
                                         state = State.WAIT_FOR_FIRST_BYTE_LENGTH;
                                     }
                                     break;
+
                                 case READING:
                                     int remaining = buffer.readableBytes();
+
                                     if (length > remaining) {
                                         length -= remaining;
                                         buffer.skipBytes(remaining);
                                     } else {
                                         buffer.skipBytes(length);
-                                        SocketAddress remoteAddress = e.getRemoteAddress();
-                                        ctx.getChannel().write(ACK.slice(), remoteAddress);
+                                        ctx.writeAndFlush(new DatagramPacket(ACK.retain(1).resetReaderIndex(), message.sender()));
                                         state = State.WAIT_FOR_FIRST_BYTE_LENGTH;
                                         length = 0;
                                     }
-                                }
                             }
-                            setAttribute(ctx, STATE_ATTRIBUTE, state);
-                            setAttribute(ctx, LENGTH_ATTRIBUTE, length);
                         }
-                    }
 
-                    @Override
-                    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-                        allChannels.remove(ctx.getChannel());
-                    }
-
-                    @Override
-                    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-                        e.getCause().printStackTrace();
-                    }
+                        ctx.attr(STATE_ATTRIBUTE).set(state);
+                        ctx.attr(LENGTH_ATTRIBUTE).set(length);
+                   }
                 });
             }
         });
-        allChannels.add(bootstrap.bind(new InetSocketAddress(port)));
+        bootstrap.bind(port);
     }
 
     /**
      * {@inheritedDoc}
      */
     public void stop() throws IOException {
-        allChannels.disconnect().awaitUninterruptibly();
-        factory.shutdown();
-        factory.releaseExternalResources();
+        bootstrap.group().shutdownGracefully();
     }
 }
diff --git a/pom.xml b/pom.xml
index 99bc8af..5f94a3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,7 @@
     <module>thrift</module>
     <module>protobuf</module>
     <module>benchmarks</module>
+    <module>benchmarks2</module>
   </modules>
 
   <dependencyManagement>