Updated patch from 10/14/2010, adds Netty client support.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/branches/ZOOKEEPER-823@1022642 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/ivy.xml b/ivy.xml
index 96f6a9e..9dded65 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -41,7 +41,7 @@
<dependency org="log4j" name="log4j" rev="1.2.15" transitive="false" conf="default"/>
<dependency org="jline" name="jline" rev="0.9.94" transitive="false" conf="default"/>
- <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.1.5.GA">
+ <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.2.1.Final">
<artifact name="netty" type="jar" conf="default"/>
</dependency>
diff --git a/ivysettings.xml b/ivysettings.xml
index a120144..52cfa52 100644
--- a/ivysettings.xml
+++ b/ivysettings.xml
@@ -20,7 +20,7 @@
<property name="repo.maven.org"
value="http://repo1.maven.org/maven2/" override="false"/>
<property name="repo.jboss.org"
- value="http://repository.jboss.com/maven2/" override="false"/>
+ value="http://repository.jboss.org/nexus/content/groups/public/" override="false"/>
<property name="repo.sun.org"
value="http://download.java.net/maven/2/" override="false"/>
<property name="maven2.pattern"
diff --git a/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml b/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
index 822f5bf..42810e2 100644
--- a/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
+++ b/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
@@ -1050,13 +1050,18 @@
<para>Prior to version 3.4 ZooKeeper has always used NIO
directly, however in versions 3.4 and later Netty is
supported as an option to NIO (replaces). NIO continues to
- be the default, however Netty based communication can be
- used in place of NIO by setting the environment variable
+ be the default for both server and client, however Netty
+ based communication can be used in place of NIO by the
+ setting of environment variables. Set
"zookeeper.serverCnxnFactory" to
- "org.apache.zookeeper.server.NettyServerCnxnFactory". You
- have the option of setting this on either the client(s) or
- server(s), typically you would want to set this on both,
- however that is at your discretion.
+ "org.apache.zookeeper.server.NettyServerCnxnFactory" to
+ enable the server to use Netty. Set
+ "zookeeper.clientCnxnFactory" to
+ "org.apache.zookeeper.NettyClientCnxnFactory" to enable
+ the client to use Netty. You have the option to set this
+ on either the client(s) or server(s), typically you would
+ want to set this on both, however that is at your
+ discretion.
</para>
<para>
TBD - tuning options for netty - currently there are none that are netty specific but we should add some. Esp around max bound on the number of reader worker threads netty creates.
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxn.java b/src/java/main/org/apache/zookeeper/ClientCnxn.java
index d37ac27..d5d2364 100644
--- a/src/java/main/org/apache/zookeeper/ClientCnxn.java
+++ b/src/java/main/org/apache/zookeeper/ClientCnxn.java
@@ -25,9 +25,6 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -55,7 +52,6 @@
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.proto.AuthPacket;
import org.apache.zookeeper.proto.ConnectRequest;
-import org.apache.zookeeper.proto.ConnectResponse;
import org.apache.zookeeper.proto.CreateResponse;
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetACLResponse;
@@ -85,8 +81,6 @@
* option allows the client to turn off this behavior by setting
* the environment variable "zookeeper.disableAutoWatchReset" to "true" */
private static boolean disableAutoWatchReset;
-
- public static final int packetLen;
static {
// this var should not be public, but otw there is no easy way
// to test
@@ -96,7 +90,6 @@
LOG.debug("zookeeper.disableAutoWatchReset is "
+ disableAutoWatchReset);
}
- packetLen = Integer.getInteger("jute.maxbuffer", 4096 * 1024);
}
private final ArrayList<InetSocketAddress> serverAddrs =
@@ -154,15 +147,13 @@
final EventThread eventThread;
- final Selector selector = Selector.open();
-
/**
* Set to true when close is called. Latches the connection such that we
* don't attempt to re-connect to the server if in the middle of closing the
* connection (client sends session disconnect to server as part of close
* operation)
*/
- volatile boolean closing = false;
+ private volatile boolean closing = false;
public long getSessionId() {
return sessionId;
@@ -180,56 +171,22 @@
public String toString() {
StringBuilder sb = new StringBuilder();
- SocketAddress local = getLocalSocketAddress();
- SocketAddress remote = getRemoteSocketAddress();
+ SocketAddress local = sendThread.getSocket().getLocalSocketAddress();
+ SocketAddress remote = sendThread.getSocket().getRemoteSocketAddress();
sb
.append("sessionid:0x").append(Long.toHexString(getSessionId()))
.append(" local:").append(local)
.append(" remoteserver:").append(remote)
.append(" lastZxid:").append(lastZxid)
.append(" xid:").append(xid)
- .append(" sent:").append(sendThread.sentCount)
- .append(" recv:").append(sendThread.recvCount)
+ .append(" sent:").append(sendThread.getSocket().getSentCount())
+ .append(" recv:").append(sendThread.getSocket().getRecvCount())
.append(" queuedpkts:").append(outgoingQueue.size())
.append(" pendingresp:").append(pendingQueue.size())
.append(" queuedevents:").append(eventThread.waitingEvents.size());
return sb.toString();
}
-
- /**
- * Returns the address to which the socket is connected.
- * @return ip address of the remote side of the connection or null if
- * not connected
- */
- SocketAddress getRemoteSocketAddress() {
- // a lot could go wrong here, so rather than put in a bunch of code
- // to check for nulls all down the chain let's do it the simple
- // yet bulletproof way
- try {
- return ((SocketChannel)sendThread.sockKey.channel())
- .socket().getRemoteSocketAddress();
- } catch (NullPointerException e) {
- return null;
- }
- }
-
- /**
- * Returns the local address to which the socket is bound.
- * @return ip address of the remote side of the connection or null if
- * not connected
- */
- SocketAddress getLocalSocketAddress() {
- // a lot could go wrong here, so rather than put in a bunch of code
- // to check for nulls all down the chain let's do it the simple
- // yet bulletproof way
- try {
- return ((SocketChannel)sendThread.sockKey.channel())
- .socket().getLocalSocketAddress();
- } catch (NullPointerException e) {
- return null;
- }
- }
/**
* This class allows us to pass the headers and the relevant records around.
@@ -322,10 +279,9 @@
* @throws IOException
*/
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
- ClientWatchManager watcher)
- throws IOException
- {
- this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]);
+ ClientWatchManager watcher, ClientCnxnSocket socket)
+ throws IOException {
+ this(hosts, sessionTimeout, zooKeeper, watcher, socket, 0, new byte[16]);
}
/**
@@ -345,9 +301,8 @@
* @throws IOException
*/
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
- ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
- throws IOException
- {
+ ClientWatchManager watcher, ClientCnxnSocket socket,
+ long sessionId, byte[] sessionPasswd) throws IOException {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
@@ -389,7 +344,7 @@
connectTimeout = sessionTimeout / hostsList.length;
readTimeout = sessionTimeout * 2 / 3;
Collections.shuffle(serverAddrs);
- sendThread = new SendThread();
+ sendThread = new SendThread(socket);
eventThread = new EventThread();
}
@@ -412,9 +367,10 @@
eventThread.start();
}
- Object eventOfDeath = new Object();
+ private Object eventOfDeath = new Object();
- final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+ private final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+ @Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("from " + t.getName(), e);
}
@@ -618,7 +574,7 @@
if (p.replyHeader == null) {
return;
}
- switch(zooKeeper.state) {
+ switch(state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
@@ -631,15 +587,16 @@
finishPacket(p);
}
- volatile long lastZxid;
+ private volatile long lastZxid;
- private static class EndOfStreamException extends IOException {
+ static class EndOfStreamException extends IOException {
private static final long serialVersionUID = -5438877188796231422L;
public EndOfStreamException(String msg) {
super(msg);
}
-
+
+ @Override
public String toString() {
return "EndOfStreamException: " + getMessage();
}
@@ -652,7 +609,7 @@
super(msg);
}
}
-
+
private static class SessionExpiredException extends IOException {
private static final long serialVersionUID = -1388816932076193249L;
@@ -660,51 +617,275 @@
super(msg);
}
}
-
+
+ public static final int packetLen =
+ Integer.getInteger("jute.maxbuffer", 4096 * 1024);
+
/**
* This class services the outgoing request queue and generates the heart
* beats. It also spawns the ReadThread.
*/
class SendThread extends Thread {
- SelectionKey sockKey;
-
- final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
-
- ByteBuffer incomingBuffer = lenBuffer;
-
- boolean initialized;
-
private long lastPingSentNs;
+ private final ClientCnxnSocket socket;
+ private int lastConnectIndex = -1;
+ private int currentConnectIndex;
+ private Random r = new Random(System.nanoTime());
- long sentCount = 0;
- long recvCount = 0;
-
- void readLength() throws IOException {
- int len = incomingBuffer.getInt();
- if (len < 0 || len >= packetLen) {
- throw new IOException("Packet len" + len + " is out of range!");
- }
- incomingBuffer = ByteBuffer.allocate(len);
+ SendThread(ClientCnxnSocket socket) {
+ super(currentThread().getName() + "-SendThread()");
+ state = States.CONNECTING;
+ this.socket = socket;
+ socket.introduce(this, outgoingQueue, sessionId);
+ setUncaughtExceptionHandler(uncaughtExceptionHandler);
+ setDaemon(true);
}
- void readConnectResult() throws IOException {
- if (LOG.isTraceEnabled()) {
- StringBuffer buf = new StringBuffer("0x[");
- for (byte b : incomingBuffer.array()) {
- buf.append(Integer.toHexString(b) + ",");
- }
- buf.append("]");
- LOG.trace("readConnectRestult " + incomingBuffer.remaining()
- + " " + buf.toString());
+ @Override
+ public void run() {
+ socket.updateNow();
+ socket.updateLastSendAndHeard();
+
+ while (state.isAlive()) {
+ try {
+ if (!socket.isConnected()) {
+ // don't re-establish connection if we are closing
+ if (closing) {
+ break;
+ }
+ startConnect();
+ socket.updateLastSendAndHeard();
+ }
+
+ int to = readTimeout - socket.getIdleRecv();
+ if (state != States.CONNECTED) {
+ to = connectTimeout - socket.getIdleRecv();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("TO=" + to);
+ }
+ if (to <= 0) {
+ throw new SessionTimeoutException(
+ "Client session timed out, have not heard from server in "
+ + socket.getIdleRecv() + "ms"
+ + " for sessionid 0x"
+ + Long.toHexString(sessionId));
+ }
+ if (state == States.CONNECTED) {
+ int timeToNextPing = readTimeout / 2
+ - socket.getIdleSend();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("timeToNextPing=" + timeToNextPing);
+ }
+ if (timeToNextPing <= 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("timeToNextPing=" + timeToNextPing);
+ }
+ sendPing();
+ socket.updateLastSend();
+ socket.enableWrite();
+ } else {
+ if (timeToNextPing < to) {
+ to = timeToNextPing;
+ }
+ }
+ }
+
+ socket.doTransport(to, pendingQueue);
+ } catch (Exception e) {
+ if (closing) {
+ if (LOG.isDebugEnabled()) {
+ // closing so this is expected
+ LOG.debug("An exception was thrown while closing send thread for session 0x"
+ + Long.toHexString(getSessionId())
+ + " : " + e.getMessage());
+ }
+ break;
+ } else {
+ // this is ugly, you have a better way speak up
+ if (e instanceof SessionExpiredException) {
+ LOG.info(e.getMessage() + ", closing socket connection");
+ } else if (e instanceof SessionTimeoutException) {
+ LOG.info(e.getMessage() + RETRY_CONN_MSG);
+ } else if (e instanceof EndOfStreamException) {
+ LOG.info(e.getMessage() + RETRY_CONN_MSG);
+ } else {
+ LOG.warn("Session 0x"
+ + Long.toHexString(getSessionId())
+ + " for server "
+ // TODO: this is different in Netty
+ // and NIO
+ // Netty:
+ // + getRemoteSocketAddress()
+ // NIO:
+ // +
+ // ((SocketChannel)sockKey.channel())
+ // .socket().getRemoteSocketAddress()
+ + socket.getRemoteSocketAddress()
+ + ", unexpected error"
+ + RETRY_CONN_MSG,
+ e);
+ }
+ socket.cleanup();
+ if (state.isAlive()) {
+ eventThread.queueEvent(new WatchedEvent(
+ Event.EventType.None,
+ Event.KeeperState.Disconnected,
+ null));
+ }
+ socket.updateNow();
+ socket.updateLastSendAndHeard();
+ }
+ } // catch
+ } // while
+ socket.cleanup();
+ socket.close();
+ if (state.isAlive()) {
+ eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
+ Event.KeeperState.Disconnected, null));
}
- ByteBufferInputStream bbis = new ByteBufferInputStream(
- incomingBuffer);
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
- ConnectResponse conRsp = new ConnectResponse();
- conRsp.deserialize(bbia, "connect");
- negotiatedSessionTimeout = conRsp.getTimeOut();
+ ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
+ "SendThread exitedloop.");
+ }
+
+ // TODO: can not name this method getState since Thread.getState()
+ // already exists
+ // It would be cleaner to make class SendThread an implementation of
+ // Runnable
+ /**
+ * Used by ClientCnxnSocket
+ *
+ * @return
+ */
+ ZooKeeper.States getZkState() {
+ return state;
+ }
+
+ ClientCnxnSocket getSocket() {
+ return socket;
+ }
+
+ void primeConnection() throws IOException {
+ LOG.info("Socket connection established to "
+ + socket.getRemoteSocketAddress()
+ + ", initiating session");
+ lastConnectIndex = currentConnectIndex;
+ ConnectRequest conReq = new ConnectRequest(0, lastZxid,
+ sessionTimeout, sessionId, sessionPasswd);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ boa.writeInt(-1, "len");
+ conReq.serialize(boa, "connect");
+ baos.close();
+ ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+ bb.putInt(bb.capacity() - 4);
+ bb.rewind();
+ synchronized (outgoingQueue) {
+ // We add backwards since we are pushing into the front
+ // Only send if there's a pending watch
+ // TODO: here we have the only remaining use of zooKeeper in
+ // this class. It's to be eliminated!
+ if (!disableAutoWatchReset &&
+ (!zooKeeper.getDataWatches().isEmpty()
+ || !zooKeeper.getExistWatches().isEmpty()
+ || !zooKeeper.getChildWatches().isEmpty()))
+ {
+ SetWatches sw = new SetWatches(lastZxid,
+ zooKeeper.getDataWatches(),
+ zooKeeper.getExistWatches(),
+ zooKeeper.getChildWatches());
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.setWatches);
+ h.setXid(-8);
+ Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
+ null);
+ outgoingQueue.addFirst(packet);
+ }
+
+ synchronized (authInfo) {
+ for (AuthData id : authInfo) {
+ outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
+ OpCode.auth), null, new AuthPacket(0,
+ id.scheme, id.data), null, null, null));
+ }
+ }
+
+ outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
+ null)));
+ }
+ synchronized(socket) {
+ socket.enableReadWriteOnly();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Session establishment request sent on "
+ + socket.getRemoteSocketAddress());
+ }
+ }
+
+ private void sendPing() {
+ lastPingSentNs = System.nanoTime();
+ RequestHeader h = new RequestHeader(-2, OpCode.ping);
+ queuePacket(h, null, null, null, null, null, null, null, null);
+ }
+
+ private void startConnect() throws IOException {
+ if (lastConnectIndex == -1) {
+ // We don't want to delay the first try at a connect, so we
+ // start with -1 the first time around
+ lastConnectIndex = 0;
+ } else {
+ try {
+ Thread.sleep(r.nextInt(1000));
+ } catch (InterruptedException e1) {
+ LOG.warn("Unexpected exception", e1);
+ }
+ if (nextAddrToTry == lastConnectIndex) {
+ try {
+ // Try not to spin too fast!
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.warn("Unexpected exception", e);
+ }
+ }
+ }
+ state = States.CONNECTING;
+ currentConnectIndex = nextAddrToTry;
+ InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
+ nextAddrToTry++;
+ if (nextAddrToTry == serverAddrs.size()) {
+ nextAddrToTry = 0;
+ }
+ LOG.info("Opening socket connection to server " + addr);
+
+ setName(getName().replaceAll("\\(.*\\)",
+ "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
+
+ socket.connect(addr);
+ }
+
+ private static final String RETRY_CONN_MSG =
+ ", closing socket connection and attempting reconnect";
+
+ void cleanup() {
+ synchronized (pendingQueue) {
+ for (Packet p : pendingQueue) {
+ conLossPacket(p);
+ }
+ pendingQueue.clear();
+ }
+ synchronized (outgoingQueue) {
+ for (Packet p : outgoingQueue) {
+ conLossPacket(p);
+ }
+ outgoingQueue.clear();
+ }
+ }
+
+ void onConnected(int _negotiatedSessionTimeout, long _sessionId,
+ byte[] _sessionPasswd) throws IOException {
+ negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
- zooKeeper.state = States.CLOSED;
+ state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
@@ -716,12 +897,11 @@
}
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
- sessionId = conRsp.getSessionId();
- sessionPasswd = conRsp.getPasswd();
- zooKeeper.state = States.CONNECTED;
+ sessionId = _sessionId;
+ sessionPasswd = _sessionPasswd;
+ state = States.CONNECTED;
LOG.info("Session establishment complete on server "
- + ((SocketChannel)sockKey.channel())
- .socket().getRemoteSocketAddress()
+ + socket.getRemoteSocketAddress()
+ ", sessionid = 0x"
+ Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout);
@@ -729,7 +909,7 @@
Watcher.Event.KeeperState.SyncConnected, null));
}
- void readResponse() throws IOException {
+ void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
@@ -750,7 +930,7 @@
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
- zooKeeper.state = States.AUTH_FAILED;
+ state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
@@ -787,12 +967,12 @@
eventThread.queueEvent( we );
return;
}
- if (pendingQueue.size() == 0) {
- throw new IOException("Nothing in the queue, but got "
- + replyHdr.getXid());
- }
- Packet packet = null;
+ Packet packet;
synchronized (pendingQueue) {
+ if (pendingQueue.size() == 0) {
+ throw new IOException("Nothing in the queue, but got "
+ + replyHdr.getXid());
+ }
packet = pendingQueue.remove();
}
/*
@@ -803,9 +983,13 @@
if (packet.header.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
- throw new IOException("Xid out of order. Got "
- + replyHdr.getXid() + " expected "
- + packet.header.getXid());
+ throw new IOException("Xid out of order. Got Xid "
+ + replyHdr.getXid() + " with err " +
+ + replyHdr.getErr() +
+ " expected Xid "
+ + packet.header.getXid()
+ + " for a packet with details: "
+ + packet );
}
packet.replyHeader.setXid(replyHdr.getXid());
@@ -827,426 +1011,17 @@
}
}
- /**
- * @return true if a packet was received
- * @throws InterruptedException
- * @throws IOException
- */
- boolean doIO() throws InterruptedException, IOException {
- boolean packetReceived = false;
- SocketChannel sock = (SocketChannel) sockKey.channel();
- if (sock == null) {
- throw new IOException("Socket is null!");
+ void close() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("close called sessionId:0x"
+ + Long.toHexString(sessionId));
}
- if (sockKey.isReadable()) {
- int rc = sock.read(incomingBuffer);
- if (rc < 0) {
- throw new EndOfStreamException(
- "Unable to read additional data from server sessionid 0x"
- + Long.toHexString(sessionId)
- + ", likely server has closed socket");
- }
- if (!incomingBuffer.hasRemaining()) {
- incomingBuffer.flip();
- if (incomingBuffer == lenBuffer) {
- recvCount++;
- readLength();
- } else if (!initialized) {
- readConnectResult();
- enableRead();
- if (!outgoingQueue.isEmpty()) {
- enableWrite();
- }
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
- packetReceived = true;
- initialized = true;
- } else {
- readResponse();
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
- packetReceived = true;
- }
- }
- }
- if (sockKey.isWritable()) {
- synchronized (outgoingQueue) {
- if (!outgoingQueue.isEmpty()) {
- ByteBuffer pbb = outgoingQueue.getFirst().bb;
- sock.write(pbb);
- if (!pbb.hasRemaining()) {
- sentCount++;
- Packet p = outgoingQueue.removeFirst();
- if (p.header != null
- && p.header.getType() != OpCode.ping
- && p.header.getType() != OpCode.auth) {
- pendingQueue.add(p);
- }
- }
- }
- }
- }
- if (outgoingQueue.isEmpty()) {
- disableWrite();
- } else {
- enableWrite();
- }
- return packetReceived;
+ state = States.CLOSED;
+ socket.wakeupCnxn();
}
- synchronized private void enableWrite() {
- int i = sockKey.interestOps();
- if ((i & SelectionKey.OP_WRITE) == 0) {
- sockKey.interestOps(i | SelectionKey.OP_WRITE);
- }
- }
-
- synchronized private void disableWrite() {
- int i = sockKey.interestOps();
- if ((i & SelectionKey.OP_WRITE) != 0) {
- sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
- }
- }
-
- synchronized private void enableRead() {
- int i = sockKey.interestOps();
- if ((i & SelectionKey.OP_READ) == 0) {
- sockKey.interestOps(i | SelectionKey.OP_READ);
- }
- }
-
- synchronized private void disableRead() {
- int i = sockKey.interestOps();
- if ((i & SelectionKey.OP_READ) != 0) {
- sockKey.interestOps(i & (~SelectionKey.OP_READ));
- }
- }
-
- SendThread() {
- super(makeThreadName("-SendThread()"));
- zooKeeper.state = States.CONNECTING;
- setUncaughtExceptionHandler(uncaughtExceptionHandler);
- setDaemon(true);
- }
-
- private void primeConnection(SelectionKey k) throws IOException {
- LOG.info("Socket connection established to "
- + ((SocketChannel)sockKey.channel())
- .socket().getRemoteSocketAddress()
- + ", initiating session");
- lastConnectIndex = currentConnectIndex;
- ConnectRequest conReq = new ConnectRequest(0, lastZxid,
- sessionTimeout, sessionId, sessionPasswd);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
- boa.writeInt(-1, "len");
- conReq.serialize(boa, "connect");
- baos.close();
- ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
- bb.putInt(bb.capacity() - 4);
- bb.rewind();
- synchronized (outgoingQueue) {
- // We add backwards since we are pushing into the front
- // Only send if there's a pending watch
- if (!disableAutoWatchReset &&
- (!zooKeeper.getDataWatches().isEmpty()
- || !zooKeeper.getExistWatches().isEmpty()
- || !zooKeeper.getChildWatches().isEmpty()))
- {
- SetWatches sw = new SetWatches(lastZxid,
- zooKeeper.getDataWatches(),
- zooKeeper.getExistWatches(),
- zooKeeper.getChildWatches());
- RequestHeader h = new RequestHeader();
- h.setType(ZooDefs.OpCode.setWatches);
- h.setXid(-8);
- Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
- null);
- outgoingQueue.addFirst(packet);
- }
-
- for (AuthData id : authInfo) {
- outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
- OpCode.auth), null, new AuthPacket(0, id.scheme,
- id.data), null, null, null));
- }
- outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
- null)));
- }
- synchronized (this) {
- k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Session establishment request sent on "
- + ((SocketChannel)sockKey.channel())
- .socket().getRemoteSocketAddress());
- }
- }
-
- private void sendPing() {
- lastPingSentNs = System.nanoTime();
- RequestHeader h = new RequestHeader(-2, OpCode.ping);
- queuePacket(h, null, null, null, null, null, null, null, null);
- }
-
- int lastConnectIndex = -1;
-
- int currentConnectIndex;
-
- Random r = new Random(System.nanoTime());
-
- private void startConnect() throws IOException {
- if (lastConnectIndex == -1) {
- // We don't want to delay the first try at a connect, so we
- // start with -1 the first time around
- lastConnectIndex = 0;
- } else {
- try {
- Thread.sleep(r.nextInt(1000));
- } catch (InterruptedException e1) {
- LOG.warn("Unexpected exception", e1);
- }
- if (nextAddrToTry == lastConnectIndex) {
- try {
- // Try not to spin too fast!
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.warn("Unexpected exception", e);
- }
- }
- }
- zooKeeper.state = States.CONNECTING;
- currentConnectIndex = nextAddrToTry;
- InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
- nextAddrToTry++;
- if (nextAddrToTry == serverAddrs.size()) {
- nextAddrToTry = 0;
- }
- LOG.info("Opening socket connection to server " + addr);
- SocketChannel sock;
- sock = SocketChannel.open();
- sock.configureBlocking(false);
- sock.socket().setSoLinger(false, -1);
- sock.socket().setTcpNoDelay(true);
- setName(getName().replaceAll("\\(.*\\)",
- "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
- sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
- if (sock.connect(addr)) {
- primeConnection(sockKey);
- }
- initialized = false;
-
- /*
- * Reset incomingBuffer
- */
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
- }
-
- private static final String RETRY_CONN_MSG =
- ", closing socket connection and attempting reconnect";
-
- @Override
- public void run() {
- long now = System.currentTimeMillis();
- long lastHeard = now;
- long lastSend = now;
- while (zooKeeper.state.isAlive()) {
- try {
- if (sockKey == null) {
- // don't re-establish connection if we are closing
- if (closing) {
- break;
- }
- startConnect();
- lastSend = now;
- lastHeard = now;
- }
- int idleRecv = (int) (now - lastHeard);
- int idleSend = (int) (now - lastSend);
- int to = readTimeout - idleRecv;
- if (zooKeeper.state != States.CONNECTED) {
- to = connectTimeout - idleRecv;
- }
- if (to <= 0) {
- throw new SessionTimeoutException(
- "Client session timed out, have not heard from server in "
- + idleRecv + "ms"
- + " for sessionid 0x"
- + Long.toHexString(sessionId));
- }
- if (zooKeeper.state == States.CONNECTED) {
- int timeToNextPing = readTimeout/2 - idleSend;
- if (timeToNextPing <= 0) {
- sendPing();
- lastSend = now;
- enableWrite();
- } else {
- if (timeToNextPing < to) {
- to = timeToNextPing;
- }
- }
- }
-
- selector.select(to);
- Set<SelectionKey> selected;
- synchronized (this) {
- selected = selector.selectedKeys();
- }
- // Everything below and until we get back to the select is
- // non blocking, so time is effectively a constant. That is
- // Why we just have to do this once, here
- now = System.currentTimeMillis();
- for (SelectionKey k : selected) {
- SocketChannel sc = ((SocketChannel) k.channel());
- if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
- if (sc.finishConnect()) {
- lastHeard = now;
- lastSend = now;
- primeConnection(k);
- }
- } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
- if (outgoingQueue.size() > 0) {
- // We have something to send so it's the same
- // as if we do the send now.
- lastSend = now;
- }
- if (doIO()) {
- lastHeard = now;
- }
- }
- }
- if (zooKeeper.state == States.CONNECTED) {
- if (outgoingQueue.size() > 0) {
- enableWrite();
- } else {
- disableWrite();
- }
- }
- selected.clear();
- } catch (Exception e) {
- if (closing) {
- if (LOG.isDebugEnabled()) {
- // closing so this is expected
- LOG.debug("An exception was thrown while closing send thread for session 0x"
- + Long.toHexString(getSessionId())
- + " : " + e.getMessage());
- }
- break;
- } else {
- // this is ugly, you have a better way speak up
- if (e instanceof SessionExpiredException) {
- LOG.info(e.getMessage() + ", closing socket connection");
- } else if (e instanceof SessionTimeoutException) {
- LOG.info(e.getMessage() + RETRY_CONN_MSG);
- } else if (e instanceof EndOfStreamException) {
- LOG.info(e.getMessage() + RETRY_CONN_MSG);
- } else {
- LOG.warn("Session 0x"
- + Long.toHexString(getSessionId())
- + " for server "
- + ((SocketChannel)sockKey.channel())
- .socket().getRemoteSocketAddress()
- + ", unexpected error"
- + RETRY_CONN_MSG,
- e);
- }
- cleanup();
- if (zooKeeper.state.isAlive()) {
- eventThread.queueEvent(new WatchedEvent(
- Event.EventType.None,
- Event.KeeperState.Disconnected,
- null));
- }
-
- now = System.currentTimeMillis();
- lastHeard = now;
- lastSend = now;
- }
- }
- }
- cleanup();
- try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Doing client selector close");
- }
- selector.close();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Closed client selector");
- }
- } catch (IOException e) {
- LOG.warn("Ignoring exception during selector close", e);
- }
- if (zooKeeper.state.isAlive()) {
- eventThread.queueEvent(new WatchedEvent(
- Event.EventType.None,
- Event.KeeperState.Disconnected,
- null));
- }
- ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
- "SendThread exitedloop.");
- }
-
- private void cleanup() {
- if (sockKey != null) {
- SocketChannel sock = (SocketChannel) sockKey.channel();
- sockKey.cancel();
- try {
- sock.socket().shutdownInput();
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring exception during shutdown input", e);
- }
- }
- try {
- sock.socket().shutdownOutput();
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring exception during shutdown output", e);
- }
- }
- try {
- sock.socket().close();
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring exception during socket close", e);
- }
- }
- try {
- sock.close();
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring exception during channel close", e);
- }
- }
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("SendThread interrupted during sleep, ignoring");
- }
- }
- sockKey = null;
- synchronized (pendingQueue) {
- for (Packet p : pendingQueue) {
- conLossPacket(p);
- }
- pendingQueue.clear();
- }
- synchronized (outgoingQueue) {
- for (Packet p : outgoingQueue) {
- conLossPacket(p);
- }
- outgoingQueue.clear();
- }
- }
-
- public void close() {
- zooKeeper.state = States.CLOSED;
- synchronized (this) {
- selector.wakeup();
- }
+ void testableCloseSocket() throws IOException {
+ socket.testableCloseSocket();
}
}
@@ -1292,6 +1067,8 @@
private int xid = 1;
+ private volatile States state;
+
synchronized private int getXid() {
return xid++;
}
@@ -1325,7 +1102,7 @@
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
- if (!zooKeeper.state.isAlive() || closing) {
+ if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
@@ -1336,19 +1113,23 @@
outgoingQueue.add(packet);
}
}
- synchronized (sendThread) {
- selector.wakeup();
- }
+ sendThread.getSocket().wakeupCnxn();
return packet;
}
public void addAuthInfo(String scheme, byte auth[]) {
- if (!zooKeeper.state.isAlive()) {
+ if (!state.isAlive()) {
return;
}
- authInfo.add(new AuthData(scheme, auth));
+ synchronized (authInfo) {
+ authInfo.add(new AuthData(scheme, auth));
+ }
queuePacket(new RequestHeader(-4, OpCode.auth), null,
new AuthPacket(0, scheme, auth), null, null, null, null,
null, null);
}
+
+ States getState() {
+ return state;
+ }
}
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
new file mode 100644
index 0000000..de12b4f
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
@@ -0,0 +1,126 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+
+abstract class ClientCnxnSocket {
+ private static final Logger LOG = Logger.getLogger(ClientCnxnSocket.class);
+
+ protected boolean initialized;
+
+ protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
+
+ protected ByteBuffer incomingBuffer = lenBuffer;
+ protected long sentCount = 0;
+ protected long recvCount = 0;
+ protected long lastHeard;
+ protected long lastSend;
+ protected long now;
+ protected ClientCnxn.SendThread sendThread;
+ protected LinkedList<ClientCnxn.Packet> outgoingQueue;
+ /**
+ * The sessionId is only available here for Log and Exception messages.
+ * Otherwise the socket doesn't know it.
+ */
+ protected long sessionId;
+
+ void introduce(ClientCnxn.SendThread sendThread,
+ LinkedList<ClientCnxn.Packet> outgoingQueue, long sessionId) {
+ this.sendThread = sendThread;
+ this.outgoingQueue = outgoingQueue;
+ this.sessionId = sessionId;
+ }
+
+ void updateNow() {
+ now = System.currentTimeMillis();
+ }
+
+ int getIdleRecv() {
+ return (int) (now - lastHeard);
+ }
+
+ int getIdleSend() {
+ return (int) (now - lastSend);
+ }
+
+ long getSentCount() {
+ return sentCount;
+ }
+
+ long getRecvCount() {
+ return recvCount;
+ }
+
+ void updateLastHeard() {
+ this.lastHeard = now;
+ }
+
+ void updateLastSend() {
+ this.lastSend = now;
+ }
+
+ void updateLastSendAndHeard() {
+ this.lastSend = now;
+ this.lastHeard = now;
+ }
+
+ protected void readLength() throws IOException {
+ int len = incomingBuffer.getInt();
+ if (len < 0 || len >= ClientCnxn.packetLen) {
+ throw new IOException("Packet len" + len + " is out of range!");
+ }
+ incomingBuffer = ByteBuffer.allocate(len);
+ }
+
+ void readConnectResult() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ StringBuffer buf = new StringBuffer("0x[");
+ for (byte b : incomingBuffer.array()) {
+ buf.append(Integer.toHexString(b) + ",");
+ }
+ buf.append("]");
+ LOG.trace("readConnectRestult " + incomingBuffer.remaining() + " "
+ + buf.toString());
+ }
+ ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
+ BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
+ ConnectResponse conRsp = new ConnectResponse();
+ conRsp.deserialize(bbia, "connect");
+ this.sessionId = conRsp.getSessionId();
+ sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
+ conRsp.getPasswd());
+ }
+
+ abstract boolean isConnected();
+
+ abstract void connect(InetSocketAddress addr) throws IOException;
+
+ abstract SocketAddress getRemoteSocketAddress();
+
+ abstract SocketAddress getLocalSocketAddress();
+
+ abstract void cleanup();
+
+ abstract void close();
+
+ abstract void wakeupCnxn();
+
+ abstract void enableWrite();
+
+ abstract void enableReadWriteOnly();
+
+ abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue)
+ throws IOException;
+
+ abstract void testableCloseSocket() throws IOException;
+}
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
new file mode 100644
index 0000000..242e33a
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
@@ -0,0 +1,303 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper.States;
+
+public class ClientCnxnSocketNIO extends ClientCnxnSocket {
+ private static final Logger LOG = Logger
+ .getLogger(ClientCnxnSocketNIO.class);
+
+ private final Selector selector = Selector.open();
+
+ private SelectionKey sockKey;
+
+ ClientCnxnSocketNIO() throws IOException {
+ super();
+ }
+
+ @Override
+ boolean isConnected() {
+ return sockKey != null;
+ }
+
+ /**
+ * @return true if a packet was received
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ boolean doIO(List<Packet> pendingQueue) throws IOException {
+ boolean packetReceived = false;
+ SocketChannel sock = (SocketChannel) sockKey.channel();
+ if (sock == null) {
+ throw new IOException("Socket is null!");
+ }
+ if (sockKey.isReadable()) {
+ int rc = sock.read(incomingBuffer);
+ if (rc < 0) {
+ throw new EndOfStreamException(
+ "Unable to read additional data from server sessionid 0x"
+ + Long.toHexString(sessionId)
+ + ", likely server has closed socket");
+ }
+ if (!incomingBuffer.hasRemaining()) {
+ incomingBuffer.flip();
+ if (incomingBuffer == lenBuffer) {
+ recvCount++;
+ readLength();
+ } else if (!initialized) {
+ readConnectResult();
+ enableRead();
+ if (!outgoingQueue.isEmpty()) {
+ enableWrite();
+ }
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ packetReceived = true;
+ initialized = true;
+ } else {
+ sendThread.readResponse(incomingBuffer);
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ packetReceived = true;
+ }
+ }
+ }
+ if (sockKey.isWritable()) {
+ synchronized (outgoingQueue) {
+ if (!outgoingQueue.isEmpty()) {
+ ByteBuffer pbb = outgoingQueue.getFirst().bb;
+ sock.write(pbb);
+ if (!pbb.hasRemaining()) {
+ sentCount++;
+ Packet p = outgoingQueue.removeFirst();
+ if (p.header != null
+ && p.header.getType() != OpCode.ping
+ && p.header.getType() != OpCode.auth) {
+ pendingQueue.add(p);
+ }
+ }
+ }
+ }
+ }
+ if (outgoingQueue.isEmpty()) {
+ disableWrite();
+ } else {
+ enableWrite();
+ }
+ return packetReceived;
+ }
+
+ @Override
+ void cleanup() {
+ if (sockKey != null) {
+ SocketChannel sock = (SocketChannel) sockKey.channel();
+ sockKey.cancel();
+ try {
+ sock.socket().shutdownInput();
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring exception during shutdown input", e);
+ }
+ }
+ try {
+ sock.socket().shutdownOutput();
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring exception during shutdown output", e);
+ }
+ }
+ try {
+ sock.socket().close();
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring exception during socket close", e);
+ }
+ }
+ try {
+ sock.close();
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring exception during channel close", e);
+ }
+ }
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SendThread interrupted during sleep, ignoring");
+ }
+ }
+ sockKey = null;
+ sendThread.cleanup();
+ }
+
+ @Override
+ void close() {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Doing client selector close");
+ }
+ selector.close();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closed client selector");
+ }
+ } catch (IOException e) {
+ LOG.warn("Ignoring exception during selector close", e);
+ }
+ }
+
+ @Override
+ void connect(InetSocketAddress addr) throws IOException {
+ SocketChannel sock;
+ sock = SocketChannel.open();
+ sock.configureBlocking(false);
+ sock.socket().setSoLinger(false, -1);
+ sock.socket().setTcpNoDelay(true);
+ sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
+ if (sock.connect(addr)) {
+ sendThread.primeConnection();
+ }
+ initialized = false;
+
+ /*
+ * Reset incomingBuffer
+ */
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ }
+
+ /**
+ * Returns the address to which the socket is connected.
+ *
+ * @return ip address of the remote side of the connection or null if not
+ * connected
+ */
+ @Override
+ SocketAddress getRemoteSocketAddress() {
+ // a lot could go wrong here, so rather than put in a bunch of code
+ // to check for nulls all down the chain let's do it the simple
+ // yet bulletproof way
+ try {
+ return ((SocketChannel) sockKey.channel()).socket()
+ .getRemoteSocketAddress();
+ } catch (NullPointerException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Returns the local address to which the socket is bound.
+ *
+ * @return ip address of the remote side of the connection or null if not
+ * connected
+ */
+ @Override
+ SocketAddress getLocalSocketAddress() {
+ // a lot could go wrong here, so rather than put in a bunch of code
+ // to check for nulls all down the chain let's do it the simple
+ // yet bulletproof way
+ try {
+ return ((SocketChannel) sockKey.channel()).socket()
+ .getLocalSocketAddress();
+ } catch (NullPointerException e) {
+ return null;
+ }
+ }
+
+ @Override
+ void wakeupCnxn() {
+ synchronized (this) {
+ selector.wakeup();
+ }
+ }
+
+ @Override
+ void doTransport(int waitTimeOut, List<Packet> pendingQueue)
+ throws IOException {
+ selector.select(waitTimeOut);
+
+ Set<SelectionKey> selected;
+ synchronized (this) {
+ selected = selector.selectedKeys();
+ }
+ // Everything below and until we get back to the select is
+ // non blocking, so time is effectively a constant. That is
+ // Why we just have to do this once, here
+ updateNow();
+
+ for (SelectionKey k : selected) {
+ SocketChannel sc = ((SocketChannel) k.channel());
+ if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
+ if (sc.finishConnect()) {
+ updateLastSendAndHeard();
+ sendThread.primeConnection();
+ }
+ } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+ if (outgoingQueue.size() > 0) {
+ // We have something to send so it's the same
+ // as if we do the send now.
+ updateLastSend();
+ }
+ if (doIO(pendingQueue)) {
+ updateLastHeard();
+ }
+ }
+ }
+ if (sendThread.getZkState() == States.CONNECTED) {
+ if (outgoingQueue.size() > 0) {
+ enableWrite();
+ } else {
+ disableWrite();
+ }
+ }
+ selected.clear();
+ }
+
+ @Override
+ void testableCloseSocket() throws IOException {
+ LOG.info("testableCloseSocket() called");
+ ((SocketChannel) sockKey.channel()).socket().close();
+ }
+
+ @Override
+ synchronized void enableWrite() {
+ int i = sockKey.interestOps();
+ if ((i & SelectionKey.OP_WRITE) == 0) {
+ sockKey.interestOps(i | SelectionKey.OP_WRITE);
+ }
+ }
+
+ synchronized private void disableWrite() {
+ int i = sockKey.interestOps();
+ if ((i & SelectionKey.OP_WRITE) != 0) {
+ sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
+ }
+ }
+
+ synchronized private void enableRead() {
+ int i = sockKey.interestOps();
+ if ((i & SelectionKey.OP_READ) == 0) {
+ sockKey.interestOps(i | SelectionKey.OP_READ);
+ }
+ }
+
+ // TODO: Why isn't that synchronized like the others?
+ @Override
+ void enableReadWriteOnly() {
+ sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ }
+}
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
new file mode 100644
index 0000000..3c564f3
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
@@ -0,0 +1,299 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelState;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+public class ClientCnxnSocketNetty extends ClientCnxnSocket {
+ private static final Logger LOG = Logger
+ .getLogger(ClientCnxnSocketNetty.class);
+
+ private Channel channel;
+
+ private ChannelFactory factory;
+
+ private boolean disconnected;
+
+ @Override
+ boolean isConnected() {
+ return channel != null;
+ }
+
+ private boolean doWrites(List<Packet> pendingQueue) {
+ boolean written = false;
+ while (!outgoingQueue.isEmpty() && channel.isWritable()) {
+ Packet p;
+ synchronized(outgoingQueue){
+ p = outgoingQueue.removeFirst();
+ }
+
+ ByteBuffer pbb = p.bb;
+ ChannelFuture write;
+ synchronized(pendingQueue){
+ write = channel.write(ChannelBuffers
+ .copiedBuffer(pbb));
+ pbb.position(pbb.limit());
+ if (p.header != null && p.header.getType() != OpCode.ping
+ && p.header.getType() != OpCode.auth) {
+ pendingQueue.add(p);
+ }
+ }
+ if (p.header != null && p.header.getType() == OpCode.closeSession) {
+ // ensure that the close session is sent before
+ // we close the channel
+ write.awaitUninterruptibly();
+ }
+
+ written = true;
+ sentCount++;
+ }
+ return written;
+ }
+
+ @Override
+ void connect(InetSocketAddress addr) throws IOException {
+ factory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+
+ ClientBootstrap bootstrap = new ClientBootstrap(factory);
+
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ return Channels.pipeline(new ZKClientHandler());
+ }
+ });
+
+ bootstrap.setOption("soLinger", -1);
+ bootstrap.setOption("tcpNoDelay", true);
+
+ disconnected = false;
+ bootstrap.connect(addr);
+ }
+
+ @Override
+ void enableReadWriteOnly() {
+
+ }
+
+ /**
+ * Returns the address to which the socket is connected.
+ *
+ * @return ip address of the remote side of the connection or null if not
+ * connected
+ */
+ @Override
+ public SocketAddress getRemoteSocketAddress() {
+ if (channel == null) {
+ return null;
+ }
+
+ return channel.getRemoteAddress();
+ }
+
+ /**
+ * Returns the local address to which the socket is bound.
+ *
+ * @return ip address of the remote side of the connection or null if not
+ * connected
+ */
+ @Override
+ SocketAddress getLocalSocketAddress() {
+ if (channel == null) {
+ return null;
+ }
+
+ return channel.getLocalAddress();
+ }
+
+ @Override
+ void cleanup() {
+ if (channel != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("cleanup closing sessionId:0x"
+ + Long.toHexString(sessionId));
+ }
+ channel.close().awaitUninterruptibly();
+ }
+ channel = null;
+ if (factory != null) {
+ factory.releaseExternalResources();
+ }
+ sendThread.cleanup();
+ }
+
+ @Override
+ void close() {
+ // NO-OP
+ }
+
+ @Override
+ void testableCloseSocket() throws IOException {
+ LOG.info("testableCloseSocket() called");
+ channel.disconnect().awaitUninterruptibly();
+ }
+
+ @Override
+ void wakeupCnxn() {
+ synchronized (outgoingQueue) {
+ outgoingQueue.notifyAll();
+ }
+ }
+
+ @Override
+ void enableWrite() {
+
+ }
+
+ @Override
+ void doTransport(int waitTimeOut, List<Packet> pendingQueue)
+ throws EndOfStreamException {
+ if (disconnected) {
+ throw new EndOfStreamException("connection for sessionid 0x"
+ + Long.toHexString(sessionId)
+ + " lost, likely server has closed socket");
+
+ }
+
+ // channel may not have been connected yet
+ if (isConnected() && doWrites(pendingQueue)) {
+ updateLastSend();
+ }
+
+ if (sendThread.getZkState().isAlive()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("WAIT to=" + waitTimeOut + " sessionId:0x"
+ + Long.toHexString(sessionId));
+ }
+ try {
+ synchronized (outgoingQueue) {
+ outgoingQueue.wait(waitTimeOut);
+ }
+ } catch (InterruptedException e) {
+ LOG.trace("WOKE via interrupt sessionId:0x"
+ + Long.toHexString(sessionId));
+ } finally {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("WOKE sessionId:0x" + Long.toHexString(sessionId));
+ }
+ }
+
+ }
+ // Everything below and until we get back to the wait is
+ // non blocking, so time is effectively a constant. That is
+ // Why we just have to do this once, here
+ updateNow();
+ }
+
+ private class ZKClientHandler extends SimpleChannelHandler {
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception {
+ disconnected = true;
+ wakeupCnxn();
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel connected " + e);
+ }
+ channel = ctx.getChannel();
+
+ long now = System.currentTimeMillis();
+
+ lastHeard = now;
+ lastSend = now;
+
+ sendThread.primeConnection();
+
+ initialized = false;
+
+ /*
+ * Reset incomingBuffer
+ */
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+
+ wakeupCnxn();
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws IOException {
+ lastHeard = System.currentTimeMillis();
+
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ while (buf.readable()) {
+ if (incomingBuffer.remaining() > buf.readableBytes()) {
+ int newLimit = incomingBuffer.position()
+ + buf.readableBytes();
+ incomingBuffer.limit(newLimit);
+ }
+ buf.readBytes(incomingBuffer);
+ incomingBuffer.limit(incomingBuffer.capacity());
+
+ if (!incomingBuffer.hasRemaining()) {
+ incomingBuffer.flip();
+ if (incomingBuffer == lenBuffer) {
+ recvCount++;
+ readLength();
+ } else if (!initialized) {
+ readConnectResult();
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ initialized = true;
+ } else {
+ synchronized (outgoingQueue) {
+ sendThread.readResponse(incomingBuffer);
+ }
+
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void channelInterestChanged(ChannelHandlerContext ctx,
+ ChannelStateEvent e) {
+ if (e.getState() == ChannelState.INTEREST_OPS) {
+ // handle the case where OP_WRITE changes
+ wakeupCnxn();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("Exception caught " + e, e.getCause());
+ }
+ }
+}
diff --git a/src/java/main/org/apache/zookeeper/ZooKeeper.java b/src/java/main/org/apache/zookeeper/ZooKeeper.java
index 48aab01..eb3eb5c 100644
--- a/src/java/main/org/apache/zookeeper/ZooKeeper.java
+++ b/src/java/main/org/apache/zookeeper/ZooKeeper.java
@@ -105,6 +105,7 @@
*/
public class ZooKeeper {
private static final Logger LOG;
+ public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
static {
LOG = Logger.getLogger(ZooKeeper.class);
@@ -154,6 +155,7 @@
/* (non-Javadoc)
* @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
*/
+ @Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
@@ -322,8 +324,6 @@
}
}
- volatile States state;
-
protected final ClientCnxn cnxn;
/**
@@ -376,7 +376,8 @@
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
- cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager);
+ cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+ watchManager, getClientCnxnSocket());
cnxn.start();
}
@@ -443,8 +444,8 @@
+ (sessionPasswd == null ? "<null>" : "<hidden>"));
watchManager.defaultWatcher = watcher;
- cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager,
- sessionId, sessionPasswd);
+ cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+ watchManager, getClientCnxnSocket(), sessionId, sessionPasswd);
cnxn.start();
}
@@ -518,7 +519,7 @@
* @throws InterruptedException
*/
public synchronized void close() throws InterruptedException {
- if (!state.isAlive()) {
+ if (!cnxn.getState().isAlive()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Close called on already closed client");
}
@@ -1557,7 +1558,7 @@
}
public States getState() {
- return state;
+ return cnxn.getState();
}
/**
@@ -1617,7 +1618,7 @@
* not connected
*/
protected SocketAddress testableRemoteSocketAddress() {
- return cnxn.getRemoteSocketAddress();
+ return cnxn.sendThread.getSocket().getRemoteSocketAddress();
}
/**
@@ -1630,6 +1631,23 @@
* not connected
*/
protected SocketAddress testableLocalSocketAddress() {
- return cnxn.getLocalSocketAddress();
+ return cnxn.sendThread.getSocket().getLocalSocketAddress();
+ }
+
+ private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
+ String clientCnxnSocketName = System
+ .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+ if (clientCnxnSocketName == null) {
+ clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
+ }
+ try {
+ return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
+ .newInstance();
+ } catch (Exception e) {
+ IOException ioe = new IOException("Couldn't instantiate "
+ + clientCnxnSocketName);
+ ioe.initCause(e);
+ throw ioe;
+ }
}
}
diff --git a/src/java/main/org/apache/zookeeper/ZooKeeperMain.java b/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
index d8fad42..26a572a 100644
--- a/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
+++ b/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
@@ -653,7 +653,7 @@
}
// Below commands all need a live connection
- if (zk == null || !zk.state.isAlive()) {
+ if (zk == null || !zk.getState().isAlive()) {
System.out.println("Not connected");
return false;
}
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index 94d8393..9b1ce5f 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -24,7 +24,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
-import java.io.StringWriter;
import java.io.Writer;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index b66f400..0b2eb7d 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -34,7 +34,6 @@
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
@@ -61,7 +60,6 @@
* NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
* this class gets access to the member variables and methods.
*/
- @ChannelPipelineCoverage("all")
class CnxnChannelHandler extends SimpleChannelHandler {
@Override
diff --git a/src/java/test/config/findbugsExcludeFile.xml b/src/java/test/config/findbugsExcludeFile.xml
index c09eb16..789ed3f 100644
--- a/src/java/test/config/findbugsExcludeFile.xml
+++ b/src/java/test/config/findbugsExcludeFile.xml
@@ -27,7 +27,12 @@
<!-- We want to catch all exceptions and cleanup, regardless of source
(incl runtime) -->
<Match>
- <Class name="org.apache.zookeeper.ClientCnxn$SendThread" />
+ <Class name="org.apache.zookeeper.NIOClientCnxn$NIOSendThread" />
+ <Method name="run" />
+ <Bug pattern="REC_CATCH_EXCEPTION" />
+ </Match>
+ <Match>
+ <Class name="org.apache.zookeeper.NettyClientCnxn$NettySendThread" />
<Method name="run" />
<Bug pattern="REC_CATCH_EXCEPTION" />
</Match>
@@ -79,6 +84,14 @@
<Class name="org.apache.zookeeper.ClientCnxn"/>
<Bug code="EI, EI2" />
</Match>
+ <Match>
+ <Class name="org.apache.zookeeper.NIOClientCnxn"/>
+ <Bug code="EI, EI2" />
+ </Match>
+ <Match>
+ <Class name="org.apache.zookeeper.NettyClientCnxn"/>
+ <Bug code="EI, EI2" />
+ </Match>
<Match>
<Class name="org.apache.zookeeper.server.DataNode"/>
diff --git a/src/java/test/org/apache/zookeeper/TestableZooKeeper.java b/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
index 4de2098..f8344b6 100644
--- a/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
+++ b/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
@@ -59,7 +59,7 @@
synchronized(cnxn) {
try {
try {
- ((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
+ cnxn.sendThread.testableCloseSocket();
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/src/java/test/org/apache/zookeeper/ThreadUtil.java b/src/java/test/org/apache/zookeeper/ThreadUtil.java
new file mode 100644
index 0000000..c5045ca
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/ThreadUtil.java
@@ -0,0 +1,70 @@
+package org.apache.zookeeper;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ThreadUtil {
+
+ public static ThreadGroup getRootThreadGroup() {
+ ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+ ThreadGroup parentThreadGroup;
+ while ( null != (parentThreadGroup = threadGroup.getParent()) ){
+ threadGroup = parentThreadGroup;
+ }
+ return threadGroup;
+ }
+
+ public static Thread[] getAllThreads() {
+ final ThreadGroup root = getRootThreadGroup();
+ int arraySize = ManagementFactory.getThreadMXBean().getThreadCount();
+ int returnedThreads = 0;
+ Thread[] threads;
+ do {
+ arraySize *= 2;
+ threads = new Thread[arraySize];
+ returnedThreads = root.enumerate( threads, true );
+ } while ( returnedThreads >= arraySize );
+ return java.util.Arrays.copyOf( threads, returnedThreads );
+ }
+
+ public static List<Thread> getThreadsFiltered(String pattern) {
+ Thread[] allThreads = getAllThreads();
+ ArrayList<Thread> filteredThreads = new ArrayList<Thread>();
+
+
+ for(int i=0;i<allThreads.length;++i){
+ Thread currentThread = allThreads[i];
+ if(currentThread.getName().contains(pattern)){
+ filteredThreads.add(currentThread);
+ }
+ }
+ return filteredThreads;
+ }
+
+ public static List<Thread> getThreadsFiltered(String pattern, Thread exclude){
+ List<Thread> filteredThreads = getThreadsFiltered(pattern);
+ filteredThreads.remove(exclude);
+ return filteredThreads;
+ }
+
+ public static String formatThread(Thread thread){
+ StringBuilder out = new StringBuilder();
+ out.append("Name: ")
+ .append(thread.getName())
+ .append(" State: ")
+ .append(thread.getState())
+ .append(" Prio: ")
+ .append(thread.getPriority())
+ .append("\nTrace:\n");
+
+ StackTraceElement[] trace = thread.getStackTrace();
+
+ for(int i=0;i<trace.length;++i){
+ out.append(trace[i]).append("\n");
+ }
+
+ return out.toString();
+ }
+}
diff --git a/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java b/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
index e7bff22..d183e69 100644
--- a/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
+++ b/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
@@ -181,7 +181,7 @@
LOG.info("Stopping hammers");
for (int i = 0; i < hammers.length; i++) {
hammers[i].interrupt();
- verifyThreadTerminated(hammers[i], 60000);
+ verifyThreadTerminated(hammers[i], i, 60000);
Assert.assertFalse(hammers[i].failed);
}
@@ -210,7 +210,7 @@
bang = false;
for (int i = 0; i < hammers.length; i++) {
hammers[i].interrupt();
- verifyThreadTerminated(hammers[i], 60000);
+ verifyThreadTerminated(hammers[i], i, 60000);
}
// before restart
qb.verifyRootOfAllServersMatch(qb.hostPort);
diff --git a/src/java/test/org/apache/zookeeper/test/ClientBase.java b/src/java/test/org/apache/zookeeper/test/ClientBase.java
index 110a882..8655a17 100644
--- a/src/java/test/org/apache/zookeeper/test/ClientBase.java
+++ b/src/java/test/org/apache/zookeeper/test/ClientBase.java
@@ -43,6 +43,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.ThreadUtil;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -298,15 +299,27 @@
return false;
}
- static void verifyThreadTerminated(Thread thread, long millis)
+ static void verifyThreadTerminated(Thread thread, int index, long millis)
throws InterruptedException
{
+ long start = java.lang.System.currentTimeMillis();
thread.join(millis);
+ long end = java.lang.System.currentTimeMillis();
if (thread.isAlive()) {
- LOG.error("Thread " + thread.getName() + " : "
- + Arrays.toString(thread.getStackTrace()));
- Assert.assertFalse("thread " + thread.getName()
- + " still alive after join", true);
+ List<Thread> otherThreads = ThreadUtil.getThreadsFiltered(String.valueOf(index), thread);
+ StringBuilder err = new StringBuilder();
+ err.append("Thread that did not join:\n")
+ .append(ThreadUtil.formatThread(thread))
+ .append("other Threads:\n");
+ for(Thread otherThread : otherThreads){
+ err.append(ThreadUtil.formatThread(otherThread));
+ }
+
+ LOG.error(err);
+ Assert.fail("thread " + thread.getName()
+ + " still alive after waiting "
+ + (end - start)
+ +" milliseconds for join");
}
}
diff --git a/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java b/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java
index aa7bcf5..827c30a 100644
--- a/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java
+++ b/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java
@@ -210,9 +210,10 @@
LOG.info("Hammer threads completed creation operations");
}
- for (HammerThread h : threads) {
+ for (int i=0;i<threads.length;++i) {
+ HammerThread h = threads[i];
final int safetyFactor = 3;
- verifyThreadTerminated(h,
+ verifyThreadTerminated(h, i,
threads.length * childCount
* HAMMERTHREAD_LATENCY * safetyFactor);
}
diff --git a/src/java/test/org/apache/zookeeper/test/ClientTest.java b/src/java/test/org/apache/zookeeper/test/ClientTest.java
index 8c2b6cf..f316016 100644
--- a/src/java/test/org/apache/zookeeper/test/ClientTest.java
+++ b/src/java/test/org/apache/zookeeper/test/ClientTest.java
@@ -32,15 +32,15 @@
import org.apache.log4j.Priority;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.TestableZooKeeper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.InvalidACLException;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
@@ -726,6 +726,8 @@
Assert.assertTrue(threads[i].current == threads[i].count);
}
+ long currentlyOpen = unixos.getOpenFileDescriptorCount();
+ LOG.info("initial:" + initialFdCount + " currentlyOpen:" + currentlyOpen);
// if this Assert.fails it means we are not cleaning up after the closed
// sessions.
long currentCount = unixos.getOpenFileDescriptorCount();
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
new file mode 100644
index 0000000..e893cf3
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ClientCnxnSocketNetty;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@RunWith(Suite.class)
+public class NettyNettySuiteBase {
+ @BeforeClass
+ public static void setUp() {
+ System.setProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+ ClientCnxnSocketNetty.class.getName());
+ System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
+ NettyServerCnxnFactory.class.getName());
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ System.clearProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+ System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+ }
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java
new file mode 100644
index 0000000..c5805ea
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@Suite.SuiteClasses({
+ AsyncHammerTest.class
+ })
+public class NettyNettySuiteHammerTest extends NettyNettySuiteBase {
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
new file mode 100644
index 0000000..48c5e4d
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@Suite.SuiteClasses({
+ ACLTest.class,
+ AsyncOpsTest.class,
+ ChrootClientTest.class,
+ ClientTest.class,
+ FourLetterWordsTest.class,
+ NullDataTest.class,
+ SessionTest.class,
+ WatcherTest.class
+ })
+public class NettyNettySuiteTest extends NettyNettySuiteBase {
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNioSuiteBase.java b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteBase.java
new file mode 100644
index 0000000..c749220
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteBase.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ClientCnxnSocketNIO;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Nio server
+ */
+@RunWith(Suite.class)
+public class NettyNioSuiteBase {
+ @BeforeClass
+ public static void setUp() {
+ System.setProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+ ClientCnxnSocketNIO.class.getName());
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ System.clearProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+ }
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNioSuiteHammerTest.java b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteHammerTest.java
new file mode 100644
index 0000000..29b5122
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteHammerTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Nio server
+ */
+@Suite.SuiteClasses({
+ AsyncHammerTest.class
+ })
+public class NettyNioSuiteHammerTest extends NettyNioSuiteBase {
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNioSuiteTest.java b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteTest.java
new file mode 100644
index 0000000..36b765b
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/NettyNioSuiteTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Nio server
+ */
+@Suite.SuiteClasses({
+ ACLTest.class,
+ AsyncOpsTest.class,
+ ChrootClientTest.class,
+ ClientTest.class,
+ FourLetterWordsTest.class,
+ NullDataTest.class,
+ SessionTest.class,
+ WatcherTest.class
+ })
+public class NettyNioSuiteTest extends NettyNioSuiteBase {
+}