Updated patch from 10/14/2010, adds Netty client support.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/branches/ZOOKEEPER-823@1022642 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/ivy.xml b/ivy.xml
index 96f6a9e..9dded65 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -41,7 +41,7 @@
     <dependency org="log4j" name="log4j" rev="1.2.15" transitive="false" conf="default"/>
     <dependency org="jline" name="jline" rev="0.9.94" transitive="false" conf="default"/>
 
-    <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.1.5.GA">
+    <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.2.1.Final">
       <artifact name="netty" type="jar" conf="default"/>
     </dependency>
 
diff --git a/ivysettings.xml b/ivysettings.xml
index a120144..52cfa52 100644
--- a/ivysettings.xml
+++ b/ivysettings.xml
@@ -20,7 +20,7 @@
   <property name="repo.maven.org"
     value="http://repo1.maven.org/maven2/" override="false"/>
   <property name="repo.jboss.org"
-    value="http://repository.jboss.com/maven2/" override="false"/>
+    value="http://repository.jboss.org/nexus/content/groups/public/" override="false"/>
   <property name="repo.sun.org"
     value="http://download.java.net/maven/2/" override="false"/>
   <property name="maven2.pattern"
diff --git a/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml b/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
index 822f5bf..42810e2 100644
--- a/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
+++ b/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
@@ -1050,13 +1050,18 @@
         <para>Prior to version 3.4 ZooKeeper has always used NIO
             directly, however in versions 3.4 and later Netty is
             supported as an option to NIO (replaces). NIO continues to
-            be the default, however Netty based communication can be
-            used in place of NIO by setting the environment variable
+            be the default for both server and client, however Netty
+            based communication can be used in place of NIO by the
+            setting of environment variables.  Set
             "zookeeper.serverCnxnFactory" to
-            "org.apache.zookeeper.server.NettyServerCnxnFactory". You
-            have the option of setting this on either the client(s) or
-            server(s), typically you would want to set this on both,
-            however that is at your discretion.
+            "org.apache.zookeeper.server.NettyServerCnxnFactory" to
+            enable the server to use Netty.  Set
+            "zookeeper.clientCnxnFactory" to
+            "org.apache.zookeeper.NettyClientCnxnFactory" to enable
+            the client to use Netty. You have the option to set this
+            on either the client(s) or server(s), typically you would
+            want to set this on both, however that is at your
+            discretion.
         </para>
         <para>
           TBD - tuning options for netty - currently there are none that are netty specific but we should add some. Esp around max bound on the number of reader worker threads netty creates.
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxn.java b/src/java/main/org/apache/zookeeper/ClientCnxn.java
index d37ac27..d5d2364 100644
--- a/src/java/main/org/apache/zookeeper/ClientCnxn.java
+++ b/src/java/main/org/apache/zookeeper/ClientCnxn.java
@@ -25,9 +25,6 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -55,7 +52,6 @@
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.proto.AuthPacket;
 import org.apache.zookeeper.proto.ConnectRequest;
-import org.apache.zookeeper.proto.ConnectResponse;
 import org.apache.zookeeper.proto.CreateResponse;
 import org.apache.zookeeper.proto.ExistsResponse;
 import org.apache.zookeeper.proto.GetACLResponse;
@@ -85,8 +81,6 @@
      * option allows the client to turn off this behavior by setting
      * the environment variable "zookeeper.disableAutoWatchReset" to "true" */
     private static boolean disableAutoWatchReset;
-
-    public static final int packetLen;
     static {
         // this var should not be public, but otw there is no easy way
         // to test
@@ -96,7 +90,6 @@
             LOG.debug("zookeeper.disableAutoWatchReset is "
                     + disableAutoWatchReset);
         }
-        packetLen = Integer.getInteger("jute.maxbuffer", 4096 * 1024);
     }
 
     private final ArrayList<InetSocketAddress> serverAddrs =
@@ -154,15 +147,13 @@
 
     final EventThread eventThread;
 
-    final Selector selector = Selector.open();
-
     /**
      * Set to true when close is called. Latches the connection such that we
      * don't attempt to re-connect to the server if in the middle of closing the
      * connection (client sends session disconnect to server as part of close
      * operation)
      */
-    volatile boolean closing = false;
+    private volatile boolean closing = false;
 
     public long getSessionId() {
         return sessionId;
@@ -180,56 +171,22 @@
     public String toString() {
         StringBuilder sb = new StringBuilder();
 
-        SocketAddress local = getLocalSocketAddress();
-        SocketAddress remote = getRemoteSocketAddress();
+        SocketAddress local = sendThread.getSocket().getLocalSocketAddress();
+        SocketAddress remote = sendThread.getSocket().getRemoteSocketAddress();
         sb
             .append("sessionid:0x").append(Long.toHexString(getSessionId()))
             .append(" local:").append(local)
             .append(" remoteserver:").append(remote)
             .append(" lastZxid:").append(lastZxid)
             .append(" xid:").append(xid)
-            .append(" sent:").append(sendThread.sentCount)
-            .append(" recv:").append(sendThread.recvCount)
+            .append(" sent:").append(sendThread.getSocket().getSentCount())
+            .append(" recv:").append(sendThread.getSocket().getRecvCount())
             .append(" queuedpkts:").append(outgoingQueue.size())
             .append(" pendingresp:").append(pendingQueue.size())
             .append(" queuedevents:").append(eventThread.waitingEvents.size());
 
         return sb.toString();
     }
-    
-    /**
-     * Returns the address to which the socket is connected.
-     * @return ip address of the remote side of the connection or null if
-     *         not connected
-     */
-    SocketAddress getRemoteSocketAddress() {
-        // a lot could go wrong here, so rather than put in a bunch of code
-        // to check for nulls all down the chain let's do it the simple
-        // yet bulletproof way
-        try {
-            return ((SocketChannel)sendThread.sockKey.channel())
-                .socket().getRemoteSocketAddress();
-        } catch (NullPointerException e) {
-            return null;
-        }
-    }
-
-    /** 
-     * Returns the local address to which the socket is bound.
-     * @return ip address of the remote side of the connection or null if
-     *         not connected
-     */
-    SocketAddress getLocalSocketAddress() {
-        // a lot could go wrong here, so rather than put in a bunch of code
-        // to check for nulls all down the chain let's do it the simple
-        // yet bulletproof way
-        try {
-            return ((SocketChannel)sendThread.sockKey.channel())
-                .socket().getLocalSocketAddress();
-        } catch (NullPointerException e) {
-            return null;
-        }
-    }
 
     /**
      * This class allows us to pass the headers and the relevant records around.
@@ -322,10 +279,9 @@
      * @throws IOException
      */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
-            ClientWatchManager watcher)
-        throws IOException
-    {
-        this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]);
+            ClientWatchManager watcher, ClientCnxnSocket socket)
+            throws IOException {
+        this(hosts, sessionTimeout, zooKeeper, watcher, socket, 0, new byte[16]);
     }
 
     /**
@@ -345,9 +301,8 @@
      * @throws IOException
      */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
-            ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
-        throws IOException
-    {
+            ClientWatchManager watcher, ClientCnxnSocket socket,
+            long sessionId, byte[] sessionPasswd) throws IOException {
         this.zooKeeper = zooKeeper;
         this.watcher = watcher;
         this.sessionId = sessionId;
@@ -389,7 +344,7 @@
         connectTimeout = sessionTimeout / hostsList.length;
         readTimeout = sessionTimeout * 2 / 3;
         Collections.shuffle(serverAddrs);
-        sendThread = new SendThread();
+        sendThread = new SendThread(socket);
         eventThread = new EventThread();
     }
 
@@ -412,9 +367,10 @@
         eventThread.start();
     }
 
-    Object eventOfDeath = new Object();
+    private Object eventOfDeath = new Object();
 
-    final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+    private final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+        @Override
         public void uncaughtException(Thread t, Throwable e) {
             LOG.error("from " + t.getName(), e);
         }
@@ -618,7 +574,7 @@
         if (p.replyHeader == null) {
             return;
         }
-        switch(zooKeeper.state) {
+        switch(state) {
         case AUTH_FAILED:
             p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
             break;
@@ -631,15 +587,16 @@
         finishPacket(p);
     }
 
-    volatile long lastZxid;
+    private volatile long lastZxid;
 
-    private static class EndOfStreamException extends IOException {
+    static class EndOfStreamException extends IOException {
         private static final long serialVersionUID = -5438877188796231422L;
 
         public EndOfStreamException(String msg) {
             super(msg);
         }
-        
+
+        @Override
         public String toString() {
             return "EndOfStreamException: " + getMessage();
         }
@@ -652,7 +609,7 @@
             super(msg);
         }
     }
-    
+
     private static class SessionExpiredException extends IOException {
         private static final long serialVersionUID = -1388816932076193249L;
 
@@ -660,51 +617,275 @@
             super(msg);
         }
     }
-    
+
+    public static final int packetLen =
+        Integer.getInteger("jute.maxbuffer", 4096 * 1024);
+
     /**
      * This class services the outgoing request queue and generates the heart
      * beats. It also spawns the ReadThread.
      */
     class SendThread extends Thread {
-        SelectionKey sockKey;
-
-        final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
-
-        ByteBuffer incomingBuffer = lenBuffer;
-
-        boolean initialized;
-
         private long lastPingSentNs;
+        private final ClientCnxnSocket socket;
+        private int lastConnectIndex = -1;
+        private int currentConnectIndex;
+        private Random r = new Random(System.nanoTime());
 
-        long sentCount = 0;
-        long recvCount = 0;
-
-        void readLength() throws IOException {
-            int len = incomingBuffer.getInt();
-            if (len < 0 || len >= packetLen) {
-                throw new IOException("Packet len" + len + " is out of range!");
-            }
-            incomingBuffer = ByteBuffer.allocate(len);
+        SendThread(ClientCnxnSocket socket) {
+            super(currentThread().getName() + "-SendThread()");
+            state = States.CONNECTING;
+            this.socket = socket;
+            socket.introduce(this, outgoingQueue, sessionId);
+            setUncaughtExceptionHandler(uncaughtExceptionHandler);
+            setDaemon(true);
         }
 
-        void readConnectResult() throws IOException {
-            if (LOG.isTraceEnabled()) {
-                StringBuffer buf = new StringBuffer("0x[");
-                for (byte b : incomingBuffer.array()) {
-                    buf.append(Integer.toHexString(b) + ",");
-                }
-                buf.append("]");
-                LOG.trace("readConnectRestult " + incomingBuffer.remaining() 
-                        + " " + buf.toString());
+        @Override
+        public void run() {
+            socket.updateNow();
+            socket.updateLastSendAndHeard();
+
+            while (state.isAlive()) {
+                try {
+                    if (!socket.isConnected()) {
+                        // don't re-establish connection if we are closing
+                        if (closing) {
+                            break;
+                        }
+                        startConnect();
+                        socket.updateLastSendAndHeard();
+                    }
+
+                    int to = readTimeout - socket.getIdleRecv();
+                    if (state != States.CONNECTED) {
+                        to = connectTimeout - socket.getIdleRecv();
+                    }
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("TO=" + to);
+                    }
+                    if (to <= 0) {
+                        throw new SessionTimeoutException(
+                                "Client session timed out, have not heard from server in "
+                                        + socket.getIdleRecv() + "ms"
+                                        + " for sessionid 0x"
+                                        + Long.toHexString(sessionId));
+                    }
+                    if (state == States.CONNECTED) {
+                        int timeToNextPing = readTimeout / 2
+                                - socket.getIdleSend();
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("timeToNextPing=" + timeToNextPing);
+                        }
+                        if (timeToNextPing <= 0) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("timeToNextPing=" + timeToNextPing);
+                            }
+                            sendPing();
+                            socket.updateLastSend();
+                            socket.enableWrite();
+                        } else {
+                            if (timeToNextPing < to) {
+                                to = timeToNextPing;
+                            }
+                        }
+                    }
+
+                    socket.doTransport(to, pendingQueue);
+                } catch (Exception e) {
+                    if (closing) {
+                        if (LOG.isDebugEnabled()) {
+                            // closing so this is expected
+                            LOG.debug("An exception was thrown while closing send thread for session 0x"
+                                    + Long.toHexString(getSessionId())
+                                    + " : " + e.getMessage());
+                        }
+                        break;
+                    } else {
+                        // this is ugly, you have a better way speak up
+                        if (e instanceof SessionExpiredException) {
+                            LOG.info(e.getMessage() + ", closing socket connection");
+                        } else if (e instanceof SessionTimeoutException) {
+                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
+                        } else if (e instanceof EndOfStreamException) {
+                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
+                        } else {
+                            LOG.warn("Session 0x"
+                                    + Long.toHexString(getSessionId())
+                                    + " for server "
+                                    // TODO: this is different in Netty
+                                            // and NIO
+                                            // Netty:
+                                            // + getRemoteSocketAddress()
+                                            // NIO:
+                                            // +
+                                            // ((SocketChannel)sockKey.channel())
+                                            // .socket().getRemoteSocketAddress()
+                                    + socket.getRemoteSocketAddress()
+                                    + ", unexpected error"
+                                    + RETRY_CONN_MSG,
+                                    e);
+                        }
+                        socket.cleanup();
+                        if (state.isAlive()) {
+                            eventThread.queueEvent(new WatchedEvent(
+                                    Event.EventType.None,
+                                    Event.KeeperState.Disconnected,
+                                    null));
+                        }
+                        socket.updateNow();
+                        socket.updateLastSendAndHeard();
+                    }
+                } // catch
+            } // while
+            socket.cleanup();
+            socket.close();
+            if (state.isAlive()) {
+                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
+                        Event.KeeperState.Disconnected, null));
             }
-            ByteBufferInputStream bbis = new ByteBufferInputStream(
-                    incomingBuffer);
-            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
-            ConnectResponse conRsp = new ConnectResponse();
-            conRsp.deserialize(bbia, "connect");
-            negotiatedSessionTimeout = conRsp.getTimeOut();
+            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
+                    "SendThread exitedloop.");
+        }
+
+        // TODO: can not name this method getState since Thread.getState()
+        // already exists
+        // It would be cleaner to make class SendThread an implementation of
+        // Runnable
+        /**
+         * Used by ClientCnxnSocket
+         * 
+         * @return
+         */
+        ZooKeeper.States getZkState() {
+            return state;
+        }
+
+        ClientCnxnSocket getSocket() {
+            return socket;
+        }
+
+        void primeConnection() throws IOException {
+            LOG.info("Socket connection established to "
+                    + socket.getRemoteSocketAddress() 
+                    + ", initiating session");
+            lastConnectIndex = currentConnectIndex;
+            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
+                    sessionTimeout, sessionId, sessionPasswd);
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+            boa.writeInt(-1, "len");
+            conReq.serialize(boa, "connect");
+            baos.close();
+            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+            bb.putInt(bb.capacity() - 4);
+            bb.rewind();
+            synchronized (outgoingQueue) {
+                // We add backwards since we are pushing into the front
+                // Only send if there's a pending watch
+                // TODO: here we have the only remaining use of zooKeeper in
+                // this class. It's to be eliminated!
+                if (!disableAutoWatchReset &&
+                        (!zooKeeper.getDataWatches().isEmpty()
+                         || !zooKeeper.getExistWatches().isEmpty()
+                         || !zooKeeper.getChildWatches().isEmpty()))
+                {
+                    SetWatches sw = new SetWatches(lastZxid,
+                            zooKeeper.getDataWatches(),
+                            zooKeeper.getExistWatches(),
+                            zooKeeper.getChildWatches());
+                    RequestHeader h = new RequestHeader();
+                    h.setType(ZooDefs.OpCode.setWatches);
+                    h.setXid(-8);
+                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
+                                null);
+                    outgoingQueue.addFirst(packet);
+                }
+
+                synchronized (authInfo) {
+                    for (AuthData id : authInfo) {
+                        outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
+                                OpCode.auth), null, new AuthPacket(0,
+                                id.scheme, id.data), null, null, null));
+                    }
+                }
+
+                outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
+                        null)));
+            }
+            synchronized(socket) {
+                socket.enableReadWriteOnly();
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Session establishment request sent on "
+                        + socket.getRemoteSocketAddress());
+            }
+        }
+
+        private void sendPing() {
+            lastPingSentNs = System.nanoTime();
+            RequestHeader h = new RequestHeader(-2, OpCode.ping);
+            queuePacket(h, null, null, null, null, null, null, null, null);
+        }
+
+        private void startConnect() throws IOException {
+            if (lastConnectIndex == -1) {
+                // We don't want to delay the first try at a connect, so we
+                // start with -1 the first time around
+                lastConnectIndex = 0;
+            } else {
+                try {
+                    Thread.sleep(r.nextInt(1000));
+                } catch (InterruptedException e1) {
+                    LOG.warn("Unexpected exception", e1);
+                }
+                if (nextAddrToTry == lastConnectIndex) {
+                    try {
+                        // Try not to spin too fast!
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Unexpected exception", e);
+                    }
+                }
+            }
+            state = States.CONNECTING;
+            currentConnectIndex = nextAddrToTry;
+            InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
+            nextAddrToTry++;
+            if (nextAddrToTry == serverAddrs.size()) {
+                nextAddrToTry = 0;
+            }
+            LOG.info("Opening socket connection to server " + addr);
+            
+            setName(getName().replaceAll("\\(.*\\)",
+                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
+
+            socket.connect(addr);
+        }
+
+        private static final String RETRY_CONN_MSG =
+            ", closing socket connection and attempting reconnect";
+
+        void cleanup() {
+            synchronized (pendingQueue) {
+                for (Packet p : pendingQueue) {
+                    conLossPacket(p);
+                }
+                pendingQueue.clear();
+            }
+            synchronized (outgoingQueue) {
+                for (Packet p : outgoingQueue) {
+                    conLossPacket(p);
+                }
+                outgoingQueue.clear();
+            }
+        }
+
+        void onConnected(int _negotiatedSessionTimeout, long _sessionId,
+                byte[] _sessionPasswd) throws IOException {
+            negotiatedSessionTimeout = _negotiatedSessionTimeout;
             if (negotiatedSessionTimeout <= 0) {
-                zooKeeper.state = States.CLOSED;
+                state = States.CLOSED;
 
                 eventThread.queueEvent(new WatchedEvent(
                         Watcher.Event.EventType.None,
@@ -716,12 +897,11 @@
             }
             readTimeout = negotiatedSessionTimeout * 2 / 3;
             connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
-            sessionId = conRsp.getSessionId();
-            sessionPasswd = conRsp.getPasswd();
-            zooKeeper.state = States.CONNECTED;
+            sessionId = _sessionId;
+            sessionPasswd = _sessionPasswd;
+            state = States.CONNECTED;
             LOG.info("Session establishment complete on server "
-                    + ((SocketChannel)sockKey.channel())
-                        .socket().getRemoteSocketAddress()
+                    + socket.getRemoteSocketAddress() 
                     + ", sessionid = 0x"
                     + Long.toHexString(sessionId)
                     + ", negotiated timeout = " + negotiatedSessionTimeout);
@@ -729,7 +909,7 @@
                     Watcher.Event.KeeperState.SyncConnected, null));
         }
 
-        void readResponse() throws IOException {
+        void readResponse(ByteBuffer incomingBuffer) throws IOException {
             ByteBufferInputStream bbis = new ByteBufferInputStream(
                     incomingBuffer);
             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
@@ -750,7 +930,7 @@
             if (replyHdr.getXid() == -4) {
                 // -4 is the xid for AuthPacket               
                 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
-                    zooKeeper.state = States.AUTH_FAILED;                    
+                    state = States.AUTH_FAILED;                    
                     eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                             Watcher.Event.KeeperState.AuthFailed, null) );            		            		
                 }
@@ -787,12 +967,12 @@
                 eventThread.queueEvent( we );
                 return;
             }
-            if (pendingQueue.size() == 0) {
-                throw new IOException("Nothing in the queue, but got "
-                        + replyHdr.getXid());
-            }
-            Packet packet = null;
+            Packet packet;
             synchronized (pendingQueue) {
+                if (pendingQueue.size() == 0) {
+                    throw new IOException("Nothing in the queue, but got "
+                            + replyHdr.getXid());
+                }
                 packet = pendingQueue.remove();
             }
             /*
@@ -803,9 +983,13 @@
                 if (packet.header.getXid() != replyHdr.getXid()) {
                     packet.replyHeader.setErr(
                             KeeperException.Code.CONNECTIONLOSS.intValue());
-                    throw new IOException("Xid out of order. Got "
-                            + replyHdr.getXid() + " expected "
-                            + packet.header.getXid());
+                    throw new IOException("Xid out of order. Got Xid "
+                            + replyHdr.getXid() + " with err " +
+                            + replyHdr.getErr() +
+                            " expected Xid "
+                            + packet.header.getXid()
+                            + " for a packet with details: "
+                            + packet );
                 }
 
                 packet.replyHeader.setXid(replyHdr.getXid());
@@ -827,426 +1011,17 @@
             }
         }
 
-        /**
-         * @return true if a packet was received
-         * @throws InterruptedException
-         * @throws IOException
-         */
-        boolean doIO() throws InterruptedException, IOException {
-            boolean packetReceived = false;
-            SocketChannel sock = (SocketChannel) sockKey.channel();
-            if (sock == null) {
-                throw new IOException("Socket is null!");
+        void close() {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("close called sessionId:0x"
+                        + Long.toHexString(sessionId));
             }
-            if (sockKey.isReadable()) {
-                int rc = sock.read(incomingBuffer);
-                if (rc < 0) {
-                    throw new EndOfStreamException(
-                            "Unable to read additional data from server sessionid 0x"
-                            + Long.toHexString(sessionId)
-                            + ", likely server has closed socket");
-                }
-                if (!incomingBuffer.hasRemaining()) {
-                    incomingBuffer.flip();
-                    if (incomingBuffer == lenBuffer) {
-                        recvCount++;
-                        readLength();
-                    } else if (!initialized) {
-                        readConnectResult();
-                        enableRead();
-                        if (!outgoingQueue.isEmpty()) {
-                            enableWrite();
-                        }
-                        lenBuffer.clear();
-                        incomingBuffer = lenBuffer;
-                        packetReceived = true;
-                        initialized = true;
-                    } else {
-                        readResponse();
-                        lenBuffer.clear();
-                        incomingBuffer = lenBuffer;
-                        packetReceived = true;
-                    }
-                }
-            }
-            if (sockKey.isWritable()) {
-                synchronized (outgoingQueue) {
-                    if (!outgoingQueue.isEmpty()) {
-                        ByteBuffer pbb = outgoingQueue.getFirst().bb;
-                        sock.write(pbb);
-                        if (!pbb.hasRemaining()) {
-                            sentCount++;
-                            Packet p = outgoingQueue.removeFirst();
-                            if (p.header != null
-                                    && p.header.getType() != OpCode.ping
-                                    && p.header.getType() != OpCode.auth) {
-                                pendingQueue.add(p);
-                            }
-                        }
-                    }
-                }
-            }
-            if (outgoingQueue.isEmpty()) {
-                disableWrite();
-            } else {
-                enableWrite();
-            }
-            return packetReceived;
+            state = States.CLOSED;
+            socket.wakeupCnxn();
         }
 
-        synchronized private void enableWrite() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_WRITE) == 0) {
-                sockKey.interestOps(i | SelectionKey.OP_WRITE);
-            }
-        }
-
-        synchronized private void disableWrite() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_WRITE) != 0) {
-                sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
-            }
-        }
-
-        synchronized private void enableRead() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_READ) == 0) {
-                sockKey.interestOps(i | SelectionKey.OP_READ);
-            }
-        }
-
-        synchronized private void disableRead() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_READ) != 0) {
-                sockKey.interestOps(i & (~SelectionKey.OP_READ));
-            }
-        }
-
-        SendThread() {
-            super(makeThreadName("-SendThread()"));
-            zooKeeper.state = States.CONNECTING;
-            setUncaughtExceptionHandler(uncaughtExceptionHandler);
-            setDaemon(true);
-        }
-
-        private void primeConnection(SelectionKey k) throws IOException {
-            LOG.info("Socket connection established to "
-                    + ((SocketChannel)sockKey.channel())
-                        .socket().getRemoteSocketAddress()
-                    + ", initiating session");
-            lastConnectIndex = currentConnectIndex;
-            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
-                    sessionTimeout, sessionId, sessionPasswd);
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-            boa.writeInt(-1, "len");
-            conReq.serialize(boa, "connect");
-            baos.close();
-            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
-            bb.putInt(bb.capacity() - 4);
-            bb.rewind();
-            synchronized (outgoingQueue) {
-                // We add backwards since we are pushing into the front
-                // Only send if there's a pending watch
-                if (!disableAutoWatchReset &&
-                        (!zooKeeper.getDataWatches().isEmpty()
-                         || !zooKeeper.getExistWatches().isEmpty()
-                         || !zooKeeper.getChildWatches().isEmpty()))
-                {
-                    SetWatches sw = new SetWatches(lastZxid,
-                            zooKeeper.getDataWatches(),
-                            zooKeeper.getExistWatches(),
-                            zooKeeper.getChildWatches());
-                    RequestHeader h = new RequestHeader();
-                    h.setType(ZooDefs.OpCode.setWatches);
-                    h.setXid(-8);
-                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
-                                null);
-                    outgoingQueue.addFirst(packet);
-                }
-
-                for (AuthData id : authInfo) {
-                    outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
-                            OpCode.auth), null, new AuthPacket(0, id.scheme,
-                            id.data), null, null, null));
-                }
-                outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
-                        null)));
-            }
-            synchronized (this) {
-                k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Session establishment request sent on "
-                        + ((SocketChannel)sockKey.channel())
-                            .socket().getRemoteSocketAddress());
-            }
-        }
-
-        private void sendPing() {
-            lastPingSentNs = System.nanoTime();
-            RequestHeader h = new RequestHeader(-2, OpCode.ping);
-            queuePacket(h, null, null, null, null, null, null, null, null);
-        }
-
-        int lastConnectIndex = -1;
-
-        int currentConnectIndex;
-
-        Random r = new Random(System.nanoTime());
-
-        private void startConnect() throws IOException {
-            if (lastConnectIndex == -1) {
-                // We don't want to delay the first try at a connect, so we
-                // start with -1 the first time around
-                lastConnectIndex = 0;
-            } else {
-                try {
-                    Thread.sleep(r.nextInt(1000));
-                } catch (InterruptedException e1) {
-                    LOG.warn("Unexpected exception", e1);
-                }
-                if (nextAddrToTry == lastConnectIndex) {
-                    try {
-                        // Try not to spin too fast!
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e) {
-                        LOG.warn("Unexpected exception", e);
-                    }
-                }
-            }
-            zooKeeper.state = States.CONNECTING;
-            currentConnectIndex = nextAddrToTry;
-            InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
-            nextAddrToTry++;
-            if (nextAddrToTry == serverAddrs.size()) {
-                nextAddrToTry = 0;
-            }
-            LOG.info("Opening socket connection to server " + addr);
-            SocketChannel sock;
-            sock = SocketChannel.open();
-            sock.configureBlocking(false);
-            sock.socket().setSoLinger(false, -1);
-            sock.socket().setTcpNoDelay(true);
-            setName(getName().replaceAll("\\(.*\\)",
-                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
-            sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
-            if (sock.connect(addr)) {
-                primeConnection(sockKey);
-            }
-            initialized = false;
-
-            /*
-             * Reset incomingBuffer
-             */
-            lenBuffer.clear();
-            incomingBuffer = lenBuffer;
-        }
-
-        private static final String RETRY_CONN_MSG =
-            ", closing socket connection and attempting reconnect";
-        
-        @Override
-        public void run() {
-            long now = System.currentTimeMillis();
-            long lastHeard = now;
-            long lastSend = now;
-            while (zooKeeper.state.isAlive()) {
-                try {
-                    if (sockKey == null) {
-                        // don't re-establish connection if we are closing
-                        if (closing) {
-                            break;
-                        }
-                        startConnect();
-                        lastSend = now;
-                        lastHeard = now;
-                    }
-                    int idleRecv = (int) (now - lastHeard);
-                    int idleSend = (int) (now - lastSend);
-                    int to = readTimeout - idleRecv;
-                    if (zooKeeper.state != States.CONNECTED) {
-                        to = connectTimeout - idleRecv;
-                    }
-                    if (to <= 0) {
-                        throw new SessionTimeoutException(
-                                "Client session timed out, have not heard from server in "
-                                + idleRecv + "ms"
-                                + " for sessionid 0x"
-                                + Long.toHexString(sessionId));
-                    }
-                    if (zooKeeper.state == States.CONNECTED) {
-                        int timeToNextPing = readTimeout/2 - idleSend;
-                        if (timeToNextPing <= 0) {
-                            sendPing();
-                            lastSend = now;
-                            enableWrite();
-                        } else {
-                            if (timeToNextPing < to) {
-                                to = timeToNextPing;
-                            }
-                        }
-                    }
-
-                    selector.select(to);
-                    Set<SelectionKey> selected;
-                    synchronized (this) {
-                        selected = selector.selectedKeys();
-                    }
-                    // Everything below and until we get back to the select is
-                    // non blocking, so time is effectively a constant. That is
-                    // Why we just have to do this once, here
-                    now = System.currentTimeMillis();
-                    for (SelectionKey k : selected) {
-                        SocketChannel sc = ((SocketChannel) k.channel());
-                        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
-                            if (sc.finishConnect()) {
-                                lastHeard = now;
-                                lastSend = now;
-                                primeConnection(k);
-                            }
-                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-                            if (outgoingQueue.size() > 0) {
-                                // We have something to send so it's the same
-                                // as if we do the send now.
-                                lastSend = now;
-                            }
-                            if (doIO()) {
-                                lastHeard = now;
-                            }
-                        }
-                    }
-                    if (zooKeeper.state == States.CONNECTED) {
-                        if (outgoingQueue.size() > 0) {
-                            enableWrite();
-                        } else {
-                            disableWrite();
-                        }
-                    }
-                    selected.clear();
-                } catch (Exception e) {
-                    if (closing) {
-                        if (LOG.isDebugEnabled()) {
-                            // closing so this is expected
-                            LOG.debug("An exception was thrown while closing send thread for session 0x"
-                                    + Long.toHexString(getSessionId())
-                                    + " : " + e.getMessage());
-                        }
-                        break;
-                    } else {
-                        // this is ugly, you have a better way speak up
-                        if (e instanceof SessionExpiredException) {
-                            LOG.info(e.getMessage() + ", closing socket connection");
-                        } else if (e instanceof SessionTimeoutException) {
-                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
-                        } else if (e instanceof EndOfStreamException) {
-                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
-                        } else {
-                            LOG.warn("Session 0x"
-                                    + Long.toHexString(getSessionId())
-                                    + " for server "
-                                    + ((SocketChannel)sockKey.channel())
-                                        .socket().getRemoteSocketAddress()
-                                    + ", unexpected error"
-                                    + RETRY_CONN_MSG,
-                                    e);
-                        }
-                        cleanup();
-                        if (zooKeeper.state.isAlive()) {
-                            eventThread.queueEvent(new WatchedEvent(
-                                    Event.EventType.None,
-                                    Event.KeeperState.Disconnected,
-                                    null));
-                        }
-
-                        now = System.currentTimeMillis();
-                        lastHeard = now;
-                        lastSend = now;
-                    }
-                }
-            }
-            cleanup();
-            try {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Doing client selector close");
-                }
-                selector.close();
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Closed client selector");
-                }
-            } catch (IOException e) {
-                LOG.warn("Ignoring exception during selector close", e);
-            }
-            if (zooKeeper.state.isAlive()) {
-                eventThread.queueEvent(new WatchedEvent(
-                        Event.EventType.None,
-                        Event.KeeperState.Disconnected,
-                        null));
-            }
-            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
-                                     "SendThread exitedloop.");
-        }
-
-        private void cleanup() {
-            if (sockKey != null) {
-                SocketChannel sock = (SocketChannel) sockKey.channel();
-                sockKey.cancel();
-                try {
-                    sock.socket().shutdownInput();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during shutdown input", e);
-                    }
-                }
-                try {
-                    sock.socket().shutdownOutput();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during shutdown output", e);
-                    }
-                }
-                try {
-                    sock.socket().close();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during socket close", e);
-                    }
-                }
-                try {
-                    sock.close();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during channel close", e);
-                    }
-                }
-            }
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("SendThread interrupted during sleep, ignoring");
-                }
-            }
-            sockKey = null;
-            synchronized (pendingQueue) {
-                for (Packet p : pendingQueue) {
-                    conLossPacket(p);
-                }
-                pendingQueue.clear();
-            }
-            synchronized (outgoingQueue) {
-                for (Packet p : outgoingQueue) {
-                    conLossPacket(p);
-                }
-                outgoingQueue.clear();
-            }
-        }
-
-        public void close() {
-            zooKeeper.state = States.CLOSED;
-            synchronized (this) {
-                selector.wakeup();
-            }
+        void testableCloseSocket() throws IOException {
+            socket.testableCloseSocket();
         }
     }
 
@@ -1292,6 +1067,8 @@
 
     private int xid = 1;
 
+    private volatile States state;
+
     synchronized private int getXid() {
         return xid++;
     }
@@ -1325,7 +1102,7 @@
             packet.ctx = ctx;
             packet.clientPath = clientPath;
             packet.serverPath = serverPath;
-            if (!zooKeeper.state.isAlive() || closing) {
+            if (!state.isAlive() || closing) {
                 conLossPacket(packet);
             } else {
                 // If the client is asking to close the session then
@@ -1336,19 +1113,23 @@
                 outgoingQueue.add(packet);
             }
         }
-        synchronized (sendThread) {
-            selector.wakeup();
-        }
+        sendThread.getSocket().wakeupCnxn();
         return packet;
     }
 
     public void addAuthInfo(String scheme, byte auth[]) {
-        if (!zooKeeper.state.isAlive()) {
+        if (!state.isAlive()) {
             return;
         }
-        authInfo.add(new AuthData(scheme, auth));
+        synchronized (authInfo) {
+            authInfo.add(new AuthData(scheme, auth));
+        }
         queuePacket(new RequestHeader(-4, OpCode.auth), null,
                 new AuthPacket(0, scheme, auth), null, null, null, null,
                 null, null);
     }
+
+    States getState() {
+        return state;
+    }
 }
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
new file mode 100644
index 0000000..de12b4f
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
@@ -0,0 +1,126 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+
+abstract class ClientCnxnSocket {
+    private static final Logger LOG = Logger.getLogger(ClientCnxnSocket.class);
+
+    protected boolean initialized;
+
+    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
+
+    protected ByteBuffer incomingBuffer = lenBuffer;
+    protected long sentCount = 0;
+    protected long recvCount = 0;
+    protected long lastHeard;
+    protected long lastSend;
+    protected long now;
+    protected ClientCnxn.SendThread sendThread;
+    protected LinkedList<ClientCnxn.Packet> outgoingQueue;
+    /**
+     * The sessionId is only available here for Log and Exception messages.
+     * Otherwise the socket doesn't know it.
+     */
+    protected long sessionId;
+
+    void introduce(ClientCnxn.SendThread sendThread,
+            LinkedList<ClientCnxn.Packet> outgoingQueue, long sessionId) {
+        this.sendThread = sendThread;
+        this.outgoingQueue = outgoingQueue;
+        this.sessionId = sessionId;
+    }
+
+    void updateNow() {
+        now = System.currentTimeMillis();
+    }
+
+    int getIdleRecv() {
+        return (int) (now - lastHeard);
+    }
+
+    int getIdleSend() {
+        return (int) (now - lastSend);
+    }
+
+    long getSentCount() {
+        return sentCount;
+    }
+
+    long getRecvCount() {
+        return recvCount;
+    }
+
+    void updateLastHeard() {
+        this.lastHeard = now;
+    }
+
+    void updateLastSend() {
+        this.lastSend = now;
+    }
+
+    void updateLastSendAndHeard() {
+        this.lastSend = now;
+        this.lastHeard = now;
+    }
+
+    protected void readLength() throws IOException {
+        int len = incomingBuffer.getInt();
+        if (len < 0 || len >= ClientCnxn.packetLen) {
+            throw new IOException("Packet len" + len + " is out of range!");
+        }
+        incomingBuffer = ByteBuffer.allocate(len);
+    }
+
+    void readConnectResult() throws IOException {
+        if (LOG.isTraceEnabled()) {
+            StringBuffer buf = new StringBuffer("0x[");
+            for (byte b : incomingBuffer.array()) {
+                buf.append(Integer.toHexString(b) + ",");
+            }
+            buf.append("]");
+            LOG.trace("readConnectRestult " + incomingBuffer.remaining() + " "
+                    + buf.toString());
+        }
+        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
+        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
+        ConnectResponse conRsp = new ConnectResponse();
+        conRsp.deserialize(bbia, "connect");
+        this.sessionId = conRsp.getSessionId();
+        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
+                conRsp.getPasswd());
+    }
+
+    abstract boolean isConnected();
+
+    abstract void connect(InetSocketAddress addr) throws IOException;
+
+    abstract SocketAddress getRemoteSocketAddress();
+
+    abstract SocketAddress getLocalSocketAddress();
+
+    abstract void cleanup();
+
+    abstract void close();
+
+    abstract void wakeupCnxn();
+
+    abstract void enableWrite();
+
+    abstract void enableReadWriteOnly();
+
+    abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue)
+            throws IOException;
+
+    abstract void testableCloseSocket() throws IOException;
+}
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
new file mode 100644
index 0000000..242e33a
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
@@ -0,0 +1,303 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper.States;
+
+public class ClientCnxnSocketNIO extends ClientCnxnSocket {
+    private static final Logger LOG = Logger
+            .getLogger(ClientCnxnSocketNIO.class);
+
+    private final Selector selector = Selector.open();
+
+    private SelectionKey sockKey;
+
+    ClientCnxnSocketNIO() throws IOException {
+        super();
+    }
+
+    @Override
+    boolean isConnected() {
+        return sockKey != null;
+    }
+    
+    /**
+     * @return true if a packet was received
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    boolean doIO(List<Packet> pendingQueue) throws IOException {
+        boolean packetReceived = false;
+        SocketChannel sock = (SocketChannel) sockKey.channel();
+        if (sock == null) {
+            throw new IOException("Socket is null!");
+        }
+        if (sockKey.isReadable()) {
+            int rc = sock.read(incomingBuffer);
+            if (rc < 0) {
+                throw new EndOfStreamException(
+                        "Unable to read additional data from server sessionid 0x"
+                        + Long.toHexString(sessionId)
+                        + ", likely server has closed socket");
+            }
+            if (!incomingBuffer.hasRemaining()) {
+                incomingBuffer.flip();
+                if (incomingBuffer == lenBuffer) {
+                    recvCount++;
+                    readLength();
+                } else if (!initialized) {
+                    readConnectResult();
+                    enableRead();
+                    if (!outgoingQueue.isEmpty()) {
+                        enableWrite();
+                    }
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+                    packetReceived = true;
+                    initialized = true;
+                } else {
+                    sendThread.readResponse(incomingBuffer);
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+                    packetReceived = true;
+                }
+            }
+        }
+        if (sockKey.isWritable()) {
+            synchronized (outgoingQueue) {
+                if (!outgoingQueue.isEmpty()) {
+                    ByteBuffer pbb = outgoingQueue.getFirst().bb;
+                    sock.write(pbb);
+                    if (!pbb.hasRemaining()) {
+                        sentCount++;
+                        Packet p = outgoingQueue.removeFirst();
+                        if (p.header != null
+                                && p.header.getType() != OpCode.ping
+                                && p.header.getType() != OpCode.auth) {
+                            pendingQueue.add(p);
+                        }
+                    }
+                }
+            }
+        }
+        if (outgoingQueue.isEmpty()) {
+            disableWrite();
+        } else {
+            enableWrite();
+        }
+        return packetReceived;
+    }
+
+    @Override
+    void cleanup() {
+        if (sockKey != null) {
+            SocketChannel sock = (SocketChannel) sockKey.channel();
+            sockKey.cancel();
+            try {
+                sock.socket().shutdownInput();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during shutdown input", e);
+                }
+            }
+            try {
+                sock.socket().shutdownOutput();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during shutdown output", e);
+                }
+            }
+            try {
+                sock.socket().close();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during socket close", e);
+                }
+            }
+            try {
+                sock.close();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during channel close", e);
+                }
+            }
+        }
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("SendThread interrupted during sleep, ignoring");
+            }
+        }
+        sockKey = null;
+        sendThread.cleanup();
+    }
+
+    @Override
+    void close() {
+        try {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Doing client selector close");
+            }
+            selector.close();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Closed client selector");
+            }
+        } catch (IOException e) {
+            LOG.warn("Ignoring exception during selector close", e);
+        }
+    }
+
+    @Override
+    void connect(InetSocketAddress addr) throws IOException {
+        SocketChannel sock;
+        sock = SocketChannel.open();
+        sock.configureBlocking(false);
+        sock.socket().setSoLinger(false, -1);
+        sock.socket().setTcpNoDelay(true);
+        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
+        if (sock.connect(addr)) {
+            sendThread.primeConnection();
+        }
+        initialized = false;
+
+        /*
+         * Reset incomingBuffer
+         */
+        lenBuffer.clear();
+        incomingBuffer = lenBuffer;
+    }
+
+    /**
+     * Returns the address to which the socket is connected.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getRemoteSocketAddress() {
+        // a lot could go wrong here, so rather than put in a bunch of code
+        // to check for nulls all down the chain let's do it the simple
+        // yet bulletproof way
+        try {
+            return ((SocketChannel) sockKey.channel()).socket()
+                    .getRemoteSocketAddress();
+        } catch (NullPointerException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the local address to which the socket is bound.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getLocalSocketAddress() {
+        // a lot could go wrong here, so rather than put in a bunch of code
+        // to check for nulls all down the chain let's do it the simple
+        // yet bulletproof way
+        try {
+            return ((SocketChannel) sockKey.channel()).socket()
+                    .getLocalSocketAddress();
+        } catch (NullPointerException e) {
+            return null;
+        }
+    }
+
+    @Override
+    void wakeupCnxn() {
+        synchronized (this) {
+            selector.wakeup();
+        }
+    }
+
+    @Override
+    void doTransport(int waitTimeOut, List<Packet> pendingQueue)
+            throws IOException {
+        selector.select(waitTimeOut);
+
+        Set<SelectionKey> selected;
+        synchronized (this) {
+            selected = selector.selectedKeys();
+        }
+        // Everything below and until we get back to the select is
+        // non blocking, so time is effectively a constant. That is
+        // Why we just have to do this once, here
+        updateNow();
+
+        for (SelectionKey k : selected) {
+            SocketChannel sc = ((SocketChannel) k.channel());
+            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
+                if (sc.finishConnect()) {
+                    updateLastSendAndHeard();
+                    sendThread.primeConnection();
+                }
+            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+                if (outgoingQueue.size() > 0) {
+                    // We have something to send so it's the same
+                    // as if we do the send now.
+                    updateLastSend();
+                }
+                if (doIO(pendingQueue)) {
+                    updateLastHeard();
+                }
+            }
+        }
+        if (sendThread.getZkState() == States.CONNECTED) {
+            if (outgoingQueue.size() > 0) {
+                enableWrite();
+            } else {
+                disableWrite();
+            }
+        }
+        selected.clear();
+    }
+
+    @Override
+    void testableCloseSocket() throws IOException {
+        LOG.info("testableCloseSocket() called");
+        ((SocketChannel) sockKey.channel()).socket().close();
+    }
+
+    @Override
+    synchronized void enableWrite() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_WRITE) == 0) {
+            sockKey.interestOps(i | SelectionKey.OP_WRITE);
+        }
+    }
+
+    synchronized private void disableWrite() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_WRITE) != 0) {
+            sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
+        }
+    }
+
+    synchronized private void enableRead() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_READ) == 0) {
+            sockKey.interestOps(i | SelectionKey.OP_READ);
+        }
+    }
+
+    // TODO: Why isn't that synchronized like the others?
+    @Override
+    void enableReadWriteOnly() {
+        sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+    }
+}
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
new file mode 100644
index 0000000..3c564f3
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
@@ -0,0 +1,299 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelState;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+public class ClientCnxnSocketNetty extends ClientCnxnSocket {
+    private static final Logger LOG = Logger
+            .getLogger(ClientCnxnSocketNetty.class);
+
+    private Channel channel;
+
+    private ChannelFactory factory;
+
+    private boolean disconnected;
+
+    @Override
+    boolean isConnected() {
+        return channel != null;
+    }
+
+    private boolean doWrites(List<Packet> pendingQueue) {
+        boolean written = false;
+        while (!outgoingQueue.isEmpty() && channel.isWritable()) {
+            Packet p;
+            synchronized(outgoingQueue){
+                p = outgoingQueue.removeFirst();                
+            }
+
+            ByteBuffer pbb = p.bb;
+            ChannelFuture write;
+            synchronized(pendingQueue){
+                write = channel.write(ChannelBuffers
+                        .copiedBuffer(pbb));
+                pbb.position(pbb.limit());
+                if (p.header != null && p.header.getType() != OpCode.ping
+                        && p.header.getType() != OpCode.auth) {
+                    pendingQueue.add(p);
+                }               
+            }
+            if (p.header != null && p.header.getType() == OpCode.closeSession) {
+                // ensure that the close session is sent before
+                // we close the channel
+                write.awaitUninterruptibly();
+            }
+
+            written = true;
+            sentCount++;
+        }
+        return written;
+    }
+
+    @Override
+    void connect(InetSocketAddress addr) throws IOException {
+        factory = new NioClientSocketChannelFactory(
+                Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool());
+
+        ClientBootstrap bootstrap = new ClientBootstrap(factory);
+
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            @Override
+            public ChannelPipeline getPipeline() {
+                return Channels.pipeline(new ZKClientHandler());
+            }
+        });
+
+        bootstrap.setOption("soLinger", -1);
+        bootstrap.setOption("tcpNoDelay", true);
+
+        disconnected = false;
+        bootstrap.connect(addr);
+    }
+
+    @Override
+    void enableReadWriteOnly() {
+
+    }
+
+    /**
+     * Returns the address to which the socket is connected.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    public SocketAddress getRemoteSocketAddress() {
+        if (channel == null) {
+            return null;
+        }
+
+        return channel.getRemoteAddress();
+    }
+
+    /**
+     * Returns the local address to which the socket is bound.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getLocalSocketAddress() {
+        if (channel == null) {
+            return null;
+        }
+
+        return channel.getLocalAddress();
+    }
+
+    @Override
+    void cleanup() {
+        if (channel != null) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("cleanup closing sessionId:0x"
+                        + Long.toHexString(sessionId));
+            }
+            channel.close().awaitUninterruptibly();
+        }
+        channel = null;
+        if (factory != null) {
+            factory.releaseExternalResources();
+        }
+        sendThread.cleanup();
+    }
+
+    @Override
+    void close() {
+        // NO-OP
+    }
+
+    @Override
+    void testableCloseSocket() throws IOException {
+        LOG.info("testableCloseSocket() called");
+        channel.disconnect().awaitUninterruptibly();
+    }
+
+    @Override
+    void wakeupCnxn() {
+        synchronized (outgoingQueue) {
+            outgoingQueue.notifyAll();
+        }
+    }
+
+    @Override
+    void enableWrite() {
+
+    }
+
+    @Override
+    void doTransport(int waitTimeOut, List<Packet> pendingQueue)
+            throws EndOfStreamException {
+        if (disconnected) {
+            throw new EndOfStreamException("connection for sessionid 0x"
+                    + Long.toHexString(sessionId)
+                    + " lost, likely server has closed socket");
+
+        }
+
+        // channel may not have been connected yet
+        if (isConnected() && doWrites(pendingQueue)) {
+            updateLastSend();
+        }
+
+        if (sendThread.getZkState().isAlive()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("WAIT to=" + waitTimeOut + " sessionId:0x"
+                        + Long.toHexString(sessionId));
+            }
+            try {
+                synchronized (outgoingQueue) {
+                    outgoingQueue.wait(waitTimeOut);
+                }
+            } catch (InterruptedException e) {
+                LOG.trace("WOKE via interrupt sessionId:0x"
+                        + Long.toHexString(sessionId));
+            } finally {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("WOKE sessionId:0x" + Long.toHexString(sessionId));
+                }
+            }
+
+        }
+        // Everything below and until we get back to the wait is
+        // non blocking, so time is effectively a constant. That is
+        // Why we just have to do this once, here
+        updateNow();
+    }
+
+    private class ZKClientHandler extends SimpleChannelHandler {
+
+        @Override
+        public void channelDisconnected(ChannelHandlerContext ctx,
+                ChannelStateEvent e) throws Exception {
+            disconnected = true;
+            wakeupCnxn();
+        }
+
+        @Override
+        public void channelConnected(ChannelHandlerContext ctx,
+                ChannelStateEvent e) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Channel connected " + e);
+            }
+            channel = ctx.getChannel();
+
+            long now = System.currentTimeMillis();
+
+            lastHeard = now;
+            lastSend = now;
+
+            sendThread.primeConnection();
+
+            initialized = false;
+
+            /*
+             * Reset incomingBuffer
+             */
+            lenBuffer.clear();
+            incomingBuffer = lenBuffer;
+
+            wakeupCnxn();
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+                throws IOException {
+            lastHeard = System.currentTimeMillis();
+
+            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+            while (buf.readable()) {
+                if (incomingBuffer.remaining() > buf.readableBytes()) {
+                    int newLimit = incomingBuffer.position()
+                            + buf.readableBytes();
+                    incomingBuffer.limit(newLimit);
+                }
+                buf.readBytes(incomingBuffer);
+                incomingBuffer.limit(incomingBuffer.capacity());
+
+                if (!incomingBuffer.hasRemaining()) {
+                    incomingBuffer.flip();
+                    if (incomingBuffer == lenBuffer) {
+                        recvCount++;
+                        readLength();
+                    } else if (!initialized) {
+                        readConnectResult();
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                        initialized = true;
+                    } else {
+                        synchronized (outgoingQueue) {
+                            sendThread.readResponse(incomingBuffer);                            
+                        }
+
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void channelInterestChanged(ChannelHandlerContext ctx,
+                ChannelStateEvent e) {
+            if (e.getState() == ChannelState.INTEREST_OPS) {
+                // handle the case where OP_WRITE changes
+                wakeupCnxn();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+            LOG.warn("Exception caught " + e, e.getCause());
+        }
+    }
+}
diff --git a/src/java/main/org/apache/zookeeper/ZooKeeper.java b/src/java/main/org/apache/zookeeper/ZooKeeper.java
index 48aab01..eb3eb5c 100644
--- a/src/java/main/org/apache/zookeeper/ZooKeeper.java
+++ b/src/java/main/org/apache/zookeeper/ZooKeeper.java
@@ -105,6 +105,7 @@
  */
 public class ZooKeeper {
     private static final Logger LOG;
+    public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
 
     static {
         LOG = Logger.getLogger(ZooKeeper.class);
@@ -154,6 +155,7 @@
         /* (non-Javadoc)
          * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
          */
+        @Override
         public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                         Watcher.Event.EventType type,
                                         String clientPath)
@@ -322,8 +324,6 @@
         }
     }
 
-    volatile States state;
-
     protected final ClientCnxn cnxn;
 
     /**
@@ -376,7 +376,8 @@
                 + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
 
         watchManager.defaultWatcher = watcher;
-        cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager);
+        cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+                watchManager, getClientCnxnSocket());
         cnxn.start();
     }
 
@@ -443,8 +444,8 @@
                 + (sessionPasswd == null ? "<null>" : "<hidden>"));
 
         watchManager.defaultWatcher = watcher;
-        cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager,
-                sessionId, sessionPasswd);
+        cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+                watchManager, getClientCnxnSocket(), sessionId, sessionPasswd);
         cnxn.start();
     }
 
@@ -518,7 +519,7 @@
      * @throws InterruptedException
      */
     public synchronized void close() throws InterruptedException {
-        if (!state.isAlive()) {
+        if (!cnxn.getState().isAlive()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Close called on already closed client");
             }
@@ -1557,7 +1558,7 @@
     }
 
     public States getState() {
-        return state;
+        return cnxn.getState();
     }
 
     /**
@@ -1617,7 +1618,7 @@
      *         not connected
      */
     protected SocketAddress testableRemoteSocketAddress() {
-        return cnxn.getRemoteSocketAddress();
+        return cnxn.sendThread.getSocket().getRemoteSocketAddress();
     }
 
     /** 
@@ -1630,6 +1631,23 @@
      *         not connected
      */
     protected SocketAddress testableLocalSocketAddress() {
-        return cnxn.getLocalSocketAddress();
+        return cnxn.sendThread.getSocket().getLocalSocketAddress();
+    }
+
+    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
+        String clientCnxnSocketName = System
+                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        if (clientCnxnSocketName == null) {
+            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
+        }
+        try {
+            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
+                    .newInstance();
+        } catch (Exception e) {
+            IOException ioe = new IOException("Couldn't instantiate "
+                    + clientCnxnSocketName);
+            ioe.initCause(e);
+            throw ioe;
+        }
     }
 }
diff --git a/src/java/main/org/apache/zookeeper/ZooKeeperMain.java b/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
index d8fad42..26a572a 100644
--- a/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
+++ b/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
@@ -653,7 +653,7 @@
         } 
         
         // Below commands all need a live connection
-        if (zk == null || !zk.state.isAlive()) {
+        if (zk == null || !zk.getState().isAlive()) {
             System.out.println("Not connected");
             return false;
         }
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index 94d8393..9b1ce5f 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -24,7 +24,6 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.io.Writer;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index b66f400..0b2eb7d 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -34,7 +34,6 @@
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
@@ -61,7 +60,6 @@
      * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
      * this class gets access to the member variables and methods.
      */
-    @ChannelPipelineCoverage("all")
     class CnxnChannelHandler extends SimpleChannelHandler {
 
         @Override
diff --git a/src/java/test/config/findbugsExcludeFile.xml b/src/java/test/config/findbugsExcludeFile.xml
index c09eb16..789ed3f 100644
--- a/src/java/test/config/findbugsExcludeFile.xml
+++ b/src/java/test/config/findbugsExcludeFile.xml
@@ -27,7 +27,12 @@
   <!-- We want to catch all exceptions and cleanup, regardless of source
        (incl runtime) -->
   <Match>
-    <Class name="org.apache.zookeeper.ClientCnxn$SendThread" />
+    <Class name="org.apache.zookeeper.NIOClientCnxn$NIOSendThread" />
+    <Method name="run" />
+    <Bug pattern="REC_CATCH_EXCEPTION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.zookeeper.NettyClientCnxn$NettySendThread" />
     <Method name="run" />
     <Bug pattern="REC_CATCH_EXCEPTION" />
   </Match>
@@ -79,6 +84,14 @@
     <Class name="org.apache.zookeeper.ClientCnxn"/>
       <Bug code="EI, EI2" />
   </Match>
+  <Match>
+    <Class name="org.apache.zookeeper.NIOClientCnxn"/>
+      <Bug code="EI, EI2" />
+  </Match>
+  <Match>
+    <Class name="org.apache.zookeeper.NettyClientCnxn"/>
+      <Bug code="EI, EI2" />
+  </Match>
 
   <Match>
     <Class name="org.apache.zookeeper.server.DataNode"/>
diff --git a/src/java/test/org/apache/zookeeper/TestableZooKeeper.java b/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
index 4de2098..f8344b6 100644
--- a/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
+++ b/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
@@ -59,7 +59,7 @@
                 synchronized(cnxn) {
                     try {
                         try {
-                            ((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
+                            cnxn.sendThread.testableCloseSocket();
                         } catch (IOException e) {
                             e.printStackTrace();
                         }
diff --git a/src/java/test/org/apache/zookeeper/ThreadUtil.java b/src/java/test/org/apache/zookeeper/ThreadUtil.java
new file mode 100644
index 0000000..c5045ca
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/ThreadUtil.java
@@ -0,0 +1,70 @@
+package org.apache.zookeeper;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ThreadUtil {
+
+    public static ThreadGroup getRootThreadGroup() {
+        ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+        ThreadGroup parentThreadGroup;
+        while ( null != (parentThreadGroup = threadGroup.getParent()) ){
+            threadGroup = parentThreadGroup;          
+        }
+        return threadGroup;
+    }
+    
+    public static Thread[] getAllThreads() {
+        final ThreadGroup root = getRootThreadGroup();
+        int arraySize = ManagementFactory.getThreadMXBean().getThreadCount();
+        int returnedThreads = 0;
+        Thread[] threads;
+        do {
+            arraySize *= 2;
+            threads = new Thread[arraySize];
+            returnedThreads = root.enumerate( threads, true );
+        } while ( returnedThreads >= arraySize );
+        return java.util.Arrays.copyOf( threads, returnedThreads );
+    }
+    
+    public static List<Thread> getThreadsFiltered(String pattern) {
+        Thread[] allThreads = getAllThreads();
+        ArrayList<Thread> filteredThreads = new ArrayList<Thread>();
+        
+ 
+        for(int i=0;i<allThreads.length;++i){
+            Thread currentThread = allThreads[i];
+            if(currentThread.getName().contains(pattern)){
+                filteredThreads.add(currentThread);
+            }
+        }
+        return filteredThreads;
+    }
+    
+    public static List<Thread> getThreadsFiltered(String pattern, Thread exclude){
+        List<Thread> filteredThreads = getThreadsFiltered(pattern);
+        filteredThreads.remove(exclude);
+        return filteredThreads;
+    }
+    
+    public static String formatThread(Thread thread){
+        StringBuilder out = new StringBuilder();
+        out.append("Name: ")
+           .append(thread.getName())
+           .append(" State: ")
+           .append(thread.getState())
+           .append(" Prio: ")
+           .append(thread.getPriority())
+           .append("\nTrace:\n");
+        
+        StackTraceElement[] trace = thread.getStackTrace();
+        
+        for(int i=0;i<trace.length;++i){
+            out.append(trace[i]).append("\n");
+        }
+           
+        return out.toString();
+    }
+}
diff --git a/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java b/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
index e7bff22..d183e69 100644
--- a/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
+++ b/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
@@ -181,7 +181,7 @@
         LOG.info("Stopping hammers");
         for (int i = 0; i < hammers.length; i++) {
             hammers[i].interrupt();
-            verifyThreadTerminated(hammers[i], 60000);
+            verifyThreadTerminated(hammers[i], i, 60000);
             Assert.assertFalse(hammers[i].failed);
         }
 
@@ -210,7 +210,7 @@
         bang = false;
         for (int i = 0; i < hammers.length; i++) {
             hammers[i].interrupt();
-            verifyThreadTerminated(hammers[i], 60000);
+            verifyThreadTerminated(hammers[i], i, 60000);
         }
         // before restart
         qb.verifyRootOfAllServersMatch(qb.hostPort);
diff --git a/src/java/test/org/apache/zookeeper/test/ClientBase.java b/src/java/test/org/apache/zookeeper/test/ClientBase.java
index 110a882..8655a17 100644
--- a/src/java/test/org/apache/zookeeper/test/ClientBase.java
+++ b/src/java/test/org/apache/zookeeper/test/ClientBase.java
@@ -43,6 +43,7 @@
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.ThreadUtil;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -298,15 +299,27 @@
         return false;
     }
 
-    static void verifyThreadTerminated(Thread thread, long millis)
+    static void verifyThreadTerminated(Thread thread, int index, long millis)
         throws InterruptedException
     {
+        long start = java.lang.System.currentTimeMillis();
         thread.join(millis);
+        long end = java.lang.System.currentTimeMillis();
         if (thread.isAlive()) {
-            LOG.error("Thread " + thread.getName() + " : "
-                    + Arrays.toString(thread.getStackTrace()));
-            Assert.assertFalse("thread " + thread.getName()
-                    + " still alive after join", true);
+            List<Thread> otherThreads = ThreadUtil.getThreadsFiltered(String.valueOf(index), thread);
+            StringBuilder err = new StringBuilder();
+            err.append("Thread that did not join:\n")
+               .append(ThreadUtil.formatThread(thread))
+               .append("other Threads:\n");
+            for(Thread otherThread : otherThreads){
+                err.append(ThreadUtil.formatThread(otherThread));
+            }
+            
+            LOG.error(err);
+            Assert.fail("thread " + thread.getName()
+                    + " still alive after waiting "
+                    + (end - start)
+                    +" milliseconds for join");
         }
     }
 
diff --git a/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java b/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java
index aa7bcf5..827c30a 100644
--- a/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java
+++ b/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java
@@ -210,9 +210,10 @@
             LOG.info("Hammer threads completed creation operations");
         }
 
-        for (HammerThread h : threads) {
+        for (int i=0;i<threads.length;++i) {
+            HammerThread h = threads[i];
             final int safetyFactor = 3;
-            verifyThreadTerminated(h,
+            verifyThreadTerminated(h, i,
                     threads.length * childCount
                     * HAMMERTHREAD_LATENCY * safetyFactor);
         }
diff --git a/src/java/test/org/apache/zookeeper/test/ClientTest.java b/src/java/test/org/apache/zookeeper/test/ClientTest.java
index 8c2b6cf..f316016 100644
--- a/src/java/test/org/apache/zookeeper/test/ClientTest.java
+++ b/src/java/test/org/apache/zookeeper/test/ClientTest.java
@@ -32,15 +32,15 @@
 import org.apache.log4j.Priority;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.TestableZooKeeper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.InvalidACLException;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
@@ -726,6 +726,8 @@
             Assert.assertTrue(threads[i].current == threads[i].count);
         }
 
+        long currentlyOpen = unixos.getOpenFileDescriptorCount();
+        LOG.info("initial:" + initialFdCount + " currentlyOpen:" + currentlyOpen);
         // if this Assert.fails it means we are not cleaning up after the closed
         // sessions.
         long currentCount = unixos.getOpenFileDescriptorCount();
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
new file mode 100644
index 0000000..e893cf3
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ClientCnxnSocketNetty;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@RunWith(Suite.class)
+public class NettyNettySuiteBase {
+    @BeforeClass
+    public static void setUp() {
+        System.setProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+                ClientCnxnSocketNetty.class.getName());
+        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
+                NettyServerCnxnFactory.class.getName());
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        System.clearProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+    }
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java
new file mode 100644
index 0000000..c5805ea
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@Suite.SuiteClasses({
+        AsyncHammerTest.class
+        })
+public class NettyNettySuiteHammerTest extends NettyNettySuiteBase {
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
new file mode 100644
index 0000000..48c5e4d
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@Suite.SuiteClasses({
+        ACLTest.class,
+        AsyncOpsTest.class,
+        ChrootClientTest.class,
+        ClientTest.class,
+        FourLetterWordsTest.class,
+        NullDataTest.class,
+        SessionTest.class,
+        WatcherTest.class
+        })
+public class NettyNettySuiteTest extends NettyNettySuiteBase {
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNioSuiteBase.java b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteBase.java
new file mode 100644
index 0000000..c749220
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteBase.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ClientCnxnSocketNIO;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Nio server
+ */
+@RunWith(Suite.class)
+public class NettyNioSuiteBase {
+    @BeforeClass
+    public static void setUp() {
+        System.setProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+                ClientCnxnSocketNIO.class.getName());
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        System.clearProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+    }
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNioSuiteHammerTest.java b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteHammerTest.java
new file mode 100644
index 0000000..29b5122
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteHammerTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Nio server
+ */
+@Suite.SuiteClasses({
+        AsyncHammerTest.class
+        })
+public class NettyNioSuiteHammerTest extends NettyNioSuiteBase {
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNioSuiteTest.java b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteTest.java
new file mode 100644
index 0000000..36b765b
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Nio server
+ */
+@Suite.SuiteClasses({
+        ACLTest.class,
+        AsyncOpsTest.class,
+        ChrootClientTest.class,
+        ClientTest.class,
+        FourLetterWordsTest.class,
+        NullDataTest.class,
+        SessionTest.class,
+        WatcherTest.class
+        })
+public class NettyNioSuiteTest extends NettyNioSuiteBase {
+}