Current state of event channel code.

git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/event-queue-2006-12-20@489159 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/cpp/configure.ac b/cpp/configure.ac
index 0334b00..b0947bd 100644
--- a/cpp/configure.ac
+++ b/cpp/configure.ac
@@ -25,14 +25,6 @@
 
 AM_MISSING_PROG([HELP2MAN], [help2man])
 
-AC_ARG_ENABLE(warnings,
-[  --enable-warnings   turn on lots of compiler warnings (recommended)],
-[case "${enableval}" in
-   yes|no) ;;
-   *)      AC_MSG_ERROR([bad value ${enableval} for warnings option]) ;;
- esac],
- [enableval=yes])
-
 # Turn on this automake conditional if we are in a qpid
 # hierarchy (i.e. with gentools/ and specs/ sibling directories),
 # and if we have working java + javac.
@@ -53,7 +45,12 @@
 # -Wunreachable-code -Wpadded -Winline
 # -Wshadow - warns about boost headers.
 
-if test "${enableval}" = yes; then
+AC_ARG_ENABLE(warnings,
+  [AS_HELP_STRING([--disable-warnings],
+	          [Disable compiler warnings (default enabled)])],
+    [], [enable_warnings=yes])
+
+if test "${enable_warnings}" = yes; then
   gl_COMPILER_FLAGS(-Werror)
   gl_COMPILER_FLAGS(-pedantic)
   gl_COMPILER_FLAGS(-Wall)
@@ -106,7 +103,7 @@
    yes|no) ;;
    *)      AC_MSG_ERROR([invalid APR enable/disable value: $enable_APR]) ;;
   esac],
-[enable_APR=yes])
+[enable_APR=no])
 
 APR_MINIMUM_VERSION=1.2.2
 AC_SUBST(APR_MINIMUM_VERSION)
diff --git a/cpp/lib/broker/AutoDelete.cpp b/cpp/lib/broker/AutoDelete.cpp
index ae48d10..6d87c98 100644
--- a/cpp/lib/broker/AutoDelete.cpp
+++ b/cpp/lib/broker/AutoDelete.cpp
@@ -63,7 +63,7 @@
     Monitor::ScopedLock l(monitor);
     while(!stopped){
         process();
-        monitor.wait(period*TIME_MSEC);
+        monitor.wait(now() + period*TIME_MSEC);
     }
 }
 
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp
index 6c0d7a3..e2265a2 100644
--- a/cpp/lib/broker/Broker.cpp
+++ b/cpp/lib/broker/Broker.cpp
@@ -47,7 +47,7 @@
 }    
         
 void Broker::run() {
-    acceptor->run(&factory);
+    acceptor->run(factory);
 }
 
 void Broker::shutdown() {
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 5d4f68a..979617b 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -18,11 +18,15 @@
  * under the License.
  *
  */
-#include <BrokerChannel.h>
-#include <QpidError.h>
+#include <assert.h>
+
 #include <iostream>
 #include <sstream>
-#include <assert.h>
+
+#include <boost/bind.hpp>
+
+#include "BrokerChannel.h"
+#include "QpidError.h"
 
 using std::mem_fun_ref;
 using std::bind2nd;
@@ -50,11 +54,18 @@
 }
 
 bool Channel::exists(const string& consumerTag){
+    Mutex::ScopedLock l(lock);
     return consumers.find(consumerTag) != consumers.end();
 }
 
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){
-	if(tag.empty()) tag = tagGenerator.generate();
+void Channel::consume(
+    string& tag, Queue::shared_ptr queue, bool acks,
+    bool exclusive, ConnectionToken* const connection, const FieldTable*)
+{
+    Mutex::ScopedLock l(lock);
+    if(tag.empty()) tag = tagGenerator.generate();
+    // TODO aconway 2006-12-13: enforce ownership of consumer
+    // with auto_ptr.
     ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
     try{
         queue->consume(c, exclusive);//may throw exception
@@ -65,7 +76,8 @@
     }
 }
 
-void Channel::cancel(consumer_iterator i){
+void Channel::cancel(consumer_iterator i) {
+    // Private, must be called with lock held.
     ConsumerImpl* c = i->second;
     consumers.erase(i);
     if(c){
@@ -75,6 +87,7 @@
 }
 
 void Channel::cancel(const string& tag){
+    Mutex::ScopedLock l(lock);
     consumer_iterator i = consumers.find(tag);
     if(i != consumers.end()){
         cancel(i);
@@ -82,11 +95,14 @@
 }
 
 void Channel::close(){
-    //cancel all consumers
-    for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
-        cancel(i);
+    {
+        Mutex::ScopedLock l(lock);
+        while(!consumers.empty()) {
+            cancel(consumers.begin());
+        }
     }
-    //requeue:
+    // TODO aconway 2006-12-13: does recovery need to be atomic with
+    // cancelling all consumers?
     recover(true);
 }
 
@@ -109,20 +125,21 @@
 }
 
 void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
-    Mutex::ScopedLock locker(deliveryLock);
-
-    u_int64_t deliveryTag = currentDeliveryTag++;
-    if(ackExpected){
-        unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
-        outstanding.size += msg->contentSize();
-        outstanding.count++;
+    u_int64_t deliveryTag;
+    {
+        Mutex::ScopedLock l(lock);
+        deliveryTag = currentDeliveryTag++;
+        if(ackExpected){
+            unacked.push_back(
+                DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+            outstanding.size += msg->contentSize();
+            outstanding.count++;
+        }
     }
-    //send deliver method, header and content(s)
     msg->deliver(out, id, consumerTag, deliveryTag, framesize);
 }
 
 bool Channel::checkPrefetch(Message::shared_ptr& msg){
-    Mutex::ScopedLock locker(deliveryLock);
     bool countOk = !prefetchCount || prefetchCount > unacked.size();
     bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
     return countOk && sizeOk;
@@ -187,71 +204,99 @@
         }
         exchange.reset();
     }else{
-        std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
+        std::cout << "Exchange not known in" << BOOST_CURRENT_FUNCTION
+                  << std::endl;
     }
 }
 
-void Channel::ack(u_int64_t deliveryTag, bool multiple){
+void Channel::ack(u_int64_t deliveryTag, bool multiple) {
     if(transactional){
+        Mutex::ScopedLock locker(lock);    
         accumulatedAck.update(deliveryTag, multiple);
-        //TODO: I think the outstanding prefetch size & count should be updated at this point...
+        //TODO: I think the outstanding prefetch size & count should
+        //be updated at this point...
         //TODO: ...this may then necessitate dispatching to consumers
-    }else{
-        Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-    
-        ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
-        if(i == unacked.end()){
-            throw InvalidAckException();
-        }else if(multiple){     
-            ack_iterator end = ++i;
-            for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
-            unacked.erase(unacked.begin(), end);
+    }
+    else {
+        {
+            Mutex::ScopedLock locker(lock);    
+            ack_iterator i = find_if(
+                unacked.begin(), unacked.end(),
+                boost::bind(&DeliveryRecord::matches, _1, deliveryTag));
+            if(i == unacked.end()) {
+                throw InvalidAckException();
+            }
+            else if(multiple) {     
+                ack_iterator end = ++i;
+                for_each(unacked.begin(), end,
+                         mem_fun_ref(&DeliveryRecord::discard));
+                unacked.erase(unacked.begin(), end);
 
-            //recalculate the prefetch:
-            outstanding.reset();
-            for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
-        }else{
-            i->discard();
-            i->subtractFrom(&outstanding);
-            unacked.erase(i);        
+                //recalculate the prefetch:
+                outstanding.reset();
+                for_each(
+                    unacked.begin(), unacked.end(),
+                    boost::bind(&DeliveryRecord::addTo, _1, &outstanding));
+            }
+            else {
+                i->discard();
+                i->subtractFrom(&outstanding);
+                unacked.erase(i);        
+            }
         }
-
         //if the prefetch limit had previously been reached, there may
         //be messages that can be now be delivered
-        for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+
+        // TODO aconway 2006-12-13: Does this need to be atomic?
+        // If so we need a redesign, requestDispatch re-enters
+        // Channel::dispatch.
+        // 
+       for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
             j->second->requestDispatch();
         }
     }
 }
 
-void Channel::recover(bool requeue){
-    Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
-    if(requeue){
-        outstanding.reset();
-        std::list<DeliveryRecord> copy = unacked;
-        unacked.clear();
-        for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
-    }else{
-        for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));        
+void Channel::recover(bool requeue) {
+    std::list<DeliveryRecord> copyUnacked;
+    boost::function1<void, DeliveryRecord&> recoverFn;
+    {
+        Mutex::ScopedLock l(lock);
+        if(requeue) {
+            outstanding.reset();
+            copyUnacked.swap(unacked);
+            recoverFn = boost::bind(&DeliveryRecord::requeue, _1);
+        }
+        else {
+            copyUnacked = unacked;
+            recoverFn = boost::bind(&DeliveryRecord::redeliver, _1, this);
+        }
     }
+    // TODO aconway 2006-12-13: Does recovery of copyUnacked have to
+    // be atomic with extracting the list?
+    for_each(copyUnacked.begin(), copyUnacked.end(), recoverFn);
 }
 
 bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+    Mutex::ScopedLock l(lock);
+    // TODO aconway 2006-12-13: Nasty to have all these external calls
+    // inside a critical.section but none appear to have blocking potential.
+    // sendGetOk does non-blocking IO
+    // 
     Message::shared_ptr msg = queue->dequeue();
-    if(msg){
-        Mutex::ScopedLock locker(deliveryLock);
+    if(msg) {
         u_int64_t myDeliveryTag = currentDeliveryTag++;
-        msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
+        u_int32_t count = queue->getMessageCount();
+        msg->sendGetOk(out, id, count + 1, myDeliveryTag, framesize);
         if(ackExpected){
             unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
         }
         return true;
-    }else{
-        return false;
     }
+    return false;
 }
 
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
+                      u_int64_t deliveryTag){
     msg->deliver(out, id, consumerTag, deliveryTag, framesize);
 }
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index fa3912c..776862d 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -79,10 +79,10 @@
             u_int32_t prefetchSize;    
             u_int16_t prefetchCount;    
             Prefetch outstanding;
-            u_int32_t framesize;
+            const u_int32_t framesize;
             NameGenerator tagGenerator;
             std::list<DeliveryRecord> unacked;
-            qpid::sys::Mutex deliveryLock;
+            qpid::sys::Mutex lock;
             TxBuffer txBuffer;
             AccumulatedAck accumulatedAck;
             MessageStore* const store;
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 598de2d..148dbef 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -143,7 +143,8 @@
 {    
     u_int64_t expected = expectedContentSize();
     if (expected != buffer.available()) {
-        std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl;
+        std::cout << "WARN: Expected " << expectedContentSize()
+                  << " bytes, got " << buffer.available() << std::endl;
         throw Exception("Cannot decode content, buffer not large enough.");
     }
 
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index 0e48d3b..deb7c38 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -83,7 +83,8 @@
         return false;
     }else if(exclusive){
         if(!exclusive->deliver(msg)){
-            std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
+            std::cout << "WARNING: Dropping undeliverable message "
+                      << "from queue with exclusive consumer." << std::endl;
         }
         return true;
     }else{
diff --git a/cpp/lib/broker/TopicExchange.cpp b/cpp/lib/broker/TopicExchange.cpp
index 3ebb3c8..73a6338 100644
--- a/cpp/lib/broker/TopicExchange.cpp
+++ b/cpp/lib/broker/TopicExchange.cpp
@@ -81,10 +81,7 @@
 
 
 namespace {
-// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
-// Need StringRef class that operates on a string in place witout copy.
-// Should be applied everywhere strings are extracted from frames.
-// 
+
 bool do_match(Tokens::const_iterator pattern_begin,  Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin,  Tokens::const_iterator target_end)
 {
     // Invariant: [pattern_begin..p) matches [target_begin..t)
diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp
index ac8b4a9..244831e 100644
--- a/cpp/lib/client/ResponseHandler.cpp
+++ b/cpp/lib/client/ResponseHandler.cpp
@@ -28,32 +28,33 @@
 
 qpid::client::ResponseHandler::~ResponseHandler(){}
 
-bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
+bool qpid::client::ResponseHandler::validate(
+    const qpid::framing::AMQMethodBody& expected)
+{
     return expected.match(response.get());
 }
 
 void qpid::client::ResponseHandler::waitForResponse(){
     Monitor::ScopedLock l(monitor);
-    if(waiting){
+    while(waiting)
 	monitor.wait();
-    }
 }
 
-void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){
-    response = _response;
+void qpid::client::ResponseHandler::signalResponse(
+    qpid::framing::AMQMethodBody::shared_ptr _response)
+{
     Monitor::ScopedLock l(monitor);
+    response = _response;
     waiting = false;
     monitor.notify();
 }
 
 void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
     Monitor::ScopedLock l(monitor);
-    if(waiting){
+    while(waiting)
 	monitor.wait();
-    }
-    if(!validate(expected)){
+    if(!validate(expected))
 	THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
-    }
 }
 
 void qpid::client::ResponseHandler::expect(){
diff --git a/cpp/lib/common/Exception.cpp b/cpp/lib/common/Exception.cpp
index 0161518..ef88c5c 100644
--- a/cpp/lib/common/Exception.cpp
+++ b/cpp/lib/common/Exception.cpp
@@ -20,6 +20,7 @@
  */
 
 #include <Exception.h>
+#include <iostream>
 
 namespace qpid {
 
@@ -29,14 +30,32 @@
 
 Exception::Exception(const char* str) throw() : whatStr(str) {}
 
+Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {}
+
 Exception::~Exception() throw() {}
 
 const char* Exception::what() const throw() { return whatStr.c_str(); }
 
 std::string Exception::toString() const throw() { return whatStr; }
 
-Exception* Exception::clone() const throw() { return new Exception(*this); }
+Exception::auto_ptr Exception::clone() const throw() {
+    return Exception::auto_ptr(new Exception(*this));
+}
 
 void Exception::throwSelf() const  { throw *this; }
 
+const char* Exception::defaultMessage = "Unexpected exception";
+
+void Exception::log(const char* what, const char* message) {
+    std::cout << message << ": " << what << std::endl;
+}
+
+void Exception::log(const std::exception& e, const char* message) {
+    log(e.what(), message);
+}
+
+void Exception::logUnknown(const char* message) {
+    log("unknown exception.", message);
+}
+
 } // namespace qpid
diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h
index f35d427..185c395 100644
--- a/cpp/lib/common/Exception.h
+++ b/cpp/lib/common/Exception.h
@@ -26,6 +26,7 @@
 #include <string>
 #include <memory>
 #include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
 
 namespace qpid
 {
@@ -38,6 +39,10 @@
     std::string whatStr;
 
   public:
+    typedef boost::shared_ptr<Exception> shared_ptr;
+    typedef boost::shared_ptr<const Exception> shared_ptr_const;
+    typedef std::auto_ptr<Exception> auto_ptr;
+
     Exception() throw();
     Exception(const std::string& str) throw();
     Exception(const char* str) throw();
@@ -48,14 +53,65 @@
     virtual const char* what() const throw();
     virtual std::string toString() const throw();
 
-    virtual Exception* clone() const throw();
+    virtual std::auto_ptr<Exception> clone() const throw();
     virtual void throwSelf() const;
 
-    typedef boost::shared_ptr<Exception> shared_ptr;
+
+    /** Default message: "Unknown exception" or something like it. */
+    static const char* defaultMessage;
+
+    /**
+     * Log a message of the form "message: what"
+     *@param what Exception's what() message.
+     *@param message Prefix message.
+     */
+    static void log(const char* what, const char* message = defaultMessage);
+
+    /**
+     * Log an exception.
+     *@param e Exception to log.
+
+     */
+    static void log(
+        const std::exception& e, const char* message = defaultMessage);
+    
+
+    /**
+     * Log an unknown exception - use in catch(...)
+     *@param message Prefix message.
+     */
+    static void logUnknown(const char* message = defaultMessage);
+
+    /**
+     * Wrapper template function to call another function inside
+     * try/catch and log any exception. Use boost::bind to wrap
+     * member function calls or functions with arguments.
+     * 
+     *@param f Function to call in try block.
+     *@param retrhow If true the exception is rethrown.
+     *@param message Prefix message.
+     */
+    template <class T>
+    static T tryCatchLog(boost::function0<T> f, bool rethrow=true,
+                         const char* message=defaultMessage)
+    {
+        try {
+            return f();
+        }
+        catch (const std::exception& e) {
+            log(e, message);
+            if (rethrow)
+                throw;
+        }
+        catch (...) {
+            logUnknown(message);
+            if (rethrow)
+                throw;
+        }
+    }
+    
 };
-
-
-
-}
+    
+} // namespace qpid
 
 #endif  /*!_Exception_*/
diff --git a/cpp/lib/common/ExceptionHolder.cpp b/cpp/lib/common/ExceptionHolder.cpp
index de8d7b2..e69de29 100644
--- a/cpp/lib/common/ExceptionHolder.cpp
+++ b/cpp/lib/common/ExceptionHolder.cpp
@@ -1,32 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "ExceptionHolder.h"
-
-namespace qpid {
-
-ExceptionHolder::ExceptionHolder(const std::exception& e) {
-    const Exception* ex = dynamic_cast<const Exception*>(&e);
-    if (ex) {
-        reset(ex->clone());
-    } else {
-        reset(new Exception(e.what()));
-    }
-}
-
-}
diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h
index 83d0884..e69de29 100644
--- a/cpp/lib/common/ExceptionHolder.h
+++ b/cpp/lib/common/ExceptionHolder.h
@@ -1,62 +0,0 @@
-#ifndef _qpid_ExceptionHolder_h
-#define _qpid_ExceptionHolder_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <Exception.h>
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-
-/**
- * Holder for a heap-allocated exc eption that can be stack allocated
- * and thrown safely.
- *
- * Basically this is a shared_ptr with the Exception functions added
- * so the catcher need not be aware that it is a pointer rather than a
- * reference.
- * 
- * shared_ptr is chosen over auto_ptr because it has normal 
- * copy semantics. 
- */
-class ExceptionHolder : public Exception, public boost::shared_ptr<Exception>
-{
-  public:
-    typedef boost::shared_ptr<Exception> shared_ptr;
-
-    ExceptionHolder() throw() {}
-    ExceptionHolder(Exception* p) throw() : shared_ptr(p) {}
-    ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {}
-
-    ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {}
-    ExceptionHolder(const std::exception& e);
-
-    ~ExceptionHolder() throw() {}
-
-    const char* what() const throw() { return (*this)->what(); }
-    std::string toString() const throw() { return (*this)->toString(); }
-    virtual Exception* clone() const throw() { return (*this)->clone(); }
-    virtual void throwSelf() const { (*this)->throwSelf(); }
-};
-
-} // namespace qpid
-
-
-
-#endif  /*!_qpid_ExceptionHolder_h*/
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
index e1f7503..997558f 100644
--- a/cpp/lib/common/Makefile.am
+++ b/cpp/lib/common/Makefile.am
@@ -24,21 +24,23 @@
   $(apr)/LFSessionContext.h	
 
 posix = sys/posix
-posix_src =				\
-  $(posix)/PosixAcceptor.cpp		\
-  $(posix)/Socket.cpp			\
-  $(posix)/Thread.cpp			\
-  $(posix)/check.cpp			\
-  $(posix)/EventChannel.cpp		\
-  $(posix)/EventChannelThreads.cpp		
-posix_hdr = 				\
-  $(posix)/check.h			\
-  $(posix)/EventChannel.h		\
-  $(posix)/EventChannelThreads.h
+posix_src =					\
+  $(posix)/EventChannelAcceptor.cpp		\
+  $(posix)/Socket.cpp				\
+  $(posix)/Thread.cpp				\
+  $(posix)/check.cpp				\
+  $(posix)/EventChannel.cpp			\
+  $(posix)/EventChannelThreads.cpp		\
+  $(posix)/EventChannelConnection.cpp		
+posix_hdr = 					\
+  $(posix)/check.h				\
+  $(posix)/EventChannel.h			\
+  $(posix)/EventChannelThreads.h		\
+  $(posix)/EventChannelConnection.h
 
-EXTRA_DIST=$(posix_src) $(posix_hdr)
-platform_src = $(apr_src)
-platform_hdr = $(apr_hdr)
+EXTRA_DIST=$(apr_src) $(apr_hdr)
+platform_src = $(posix_src)
+platform_hdr = $(posix_hdr)
 
 framing = framing
 gen     = $(srcdir)/../../gen
@@ -76,7 +78,6 @@
   $(gen)/AMQP_MethodVersionMap.cpp		\
   $(gen)/AMQP_ServerProxy.cpp			\
   Exception.cpp					\
-  ExceptionHolder.cpp				\
   QpidError.cpp					\
   sys/Runnable.cpp				\
   sys/Time.cpp
@@ -107,7 +108,6 @@
   $(framing)/amqp_types.h			\
   $(framing)/AMQP_HighestVersion.h			\
   Exception.h					\
-  ExceptionHolder.h				\
   QpidError.h					\
   SharedObject.h				\
   sys/Acceptor.h				\
diff --git a/cpp/lib/common/QpidError.cpp b/cpp/lib/common/QpidError.cpp
index 7f4f9e2..9cbd66c 100644
--- a/cpp/lib/common/QpidError.cpp
+++ b/cpp/lib/common/QpidError.cpp
@@ -22,23 +22,41 @@
 #include <QpidError.h>
 #include <sstream>
 
-using namespace qpid;
+namespace qpid {
 
 QpidError::QpidError() : code(0) {}
 
-QpidError::QpidError(int _code, const std::string& _msg,
-                     const SrcLine& _loc) throw()
+QpidError::QpidError(
+    int _code, const std::string& _msg, Location _loc) throw()
     : code(_code), msg(_msg), location(_loc)
 {
-    std::ostringstream os;
-    os << "Error [" << code << "] " << msg << " ("
-       << location.file << ":" << location.line << ")";
-    whatStr = os.str();
+    setWhat();
+}
+
+QpidError::QpidError(
+    int _code, const char* _msg, Location _loc) throw()
+    : code(_code), msg(_msg), location(_loc)
+{
+    setWhat();
 }
 
 QpidError::~QpidError() throw() {}
 
-Exception* QpidError::clone() const throw() { return new QpidError(*this); }
+Exception::auto_ptr QpidError::clone() const throw() {
+    return Exception::auto_ptr(new QpidError(*this));
+}
 
 void QpidError::throwSelf() const  { throw *this; }
 
+void QpidError::setWhat() {
+    std::ostringstream os;
+    os << "Error [" << code << "] " << msg;
+    if (location.file) {
+        os << " (" ;
+        os << location.file << ":" << location.line;
+        os << ")";
+    }
+    whatStr = os.str();
+}
+
+} // namespace qpid
diff --git a/cpp/lib/common/QpidError.h b/cpp/lib/common/QpidError.h
index 30d9d27..9a47aa5 100644
--- a/cpp/lib/common/QpidError.h
+++ b/cpp/lib/common/QpidError.h
@@ -24,37 +24,45 @@
 #include <memory>
 #include <ostream>
 #include <Exception.h>
+#include <boost/current_function.hpp>
 
 namespace qpid {
 
-struct SrcLine {
-  public:
-    SrcLine(const std::string& file_="", int line_=0) :
-        file(file_), line(line_) {}
-
-    std::string file;
-    int line;
-};
-    
 class QpidError : public Exception { 
   public:
+    // Use macro QPID_LOCATION to construct a location.
+    struct Location {
+        Location(const char* function_=0, const char* file_=0, int line_=0) :
+            function(function_), file(file_), line(line_) {}
+        const char* function;
+        const char* file;
+        int line;
+    };
+
     const int code;
     const std::string msg;
-    const SrcLine location;
+    const Location location;
 
     QpidError();
-    QpidError(int _code, const std::string& _msg, const SrcLine& _loc) throw();
+    QpidError(int _code, const char* _msg, const Location _loc) throw();
+    QpidError(int _code, const std::string& _msg, const Location _loc) throw();
+    
     ~QpidError() throw();
-    Exception* clone() const throw();
+    Exception::auto_ptr clone() const throw();
     void throwSelf() const;
+
+  private:
+    void setWhat();
 };
 
 
 } // namespace qpid
 
-#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__)
+#define QPID_ERROR_LOCATION \
+    ::qpid::QpidError::Location(BOOST_CURRENT_FUNCTION, __FILE__, __LINE__)
 
-#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE)
+#define QPID_ERROR(CODE, MESSAGE) \
+    ::qpid::QpidError((CODE), (MESSAGE), QPID_ERROR_LOCATION)
 
 #define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE)
 
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index bec1946..1ff3ff1 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -40,9 +40,9 @@
             static AMQP_MethodVersionMap versionMap;
             
             u_int16_t channel;
-            u_int8_t type;//used if the body is decoded separately from the 'head'
+            u_int8_t type;//used if body decoded separately from 'head'
             AMQBody::shared_ptr body;
-			AMQBody::shared_ptr createMethodBody(Buffer& buffer);
+            AMQBody::shared_ptr createMethodBody(Buffer& buffer);
             
         public:
             AMQFrame();
diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h
index e6bc27a..7aed068 100644
--- a/cpp/lib/common/sys/Acceptor.h
+++ b/cpp/lib/common/sys/Acceptor.h
@@ -33,10 +33,11 @@
 class Acceptor : public qpid::SharedObject<Acceptor>
 {
   public:
-    static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false);
-    virtual ~Acceptor() = 0;
-    virtual int16_t getPort() const = 0;
-    virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0;
+    static Acceptor::shared_ptr create(
+        int16_t port, int backlog, int threads, bool trace = false);
+    virtual ~Acceptor();
+    virtual int  getPort() const = 0;
+    virtual void run(SessionHandlerFactory& factory) = 0;
     virtual void shutdown() = 0;
 };
 
diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h
index b625b2c..7a95554 100644
--- a/cpp/lib/common/sys/AtomicCount.h
+++ b/cpp/lib/common/sys/AtomicCount.h
@@ -21,36 +21,52 @@
 
 #include <boost/detail/atomic_count.hpp>
 #include <boost/noncopyable.hpp>
+#include <boost/function.hpp>
 
 namespace qpid {
 namespace sys {
 
 /**
+ * Increment counter in constructor and decrement in destructor.
+ * Optionally call a function if the decremented counter value is 0.
+ * Note the function must not throw, it is called in the destructor.
+ */
+template <class Count>
+class ScopedIncrement : boost::noncopyable {
+  public:
+    ScopedIncrement(Count& c, boost::function0<void> f=0)
+        : count(c), callback(f) { ++count; }
+    ~ScopedIncrement() { if (--count == 0 && callback) callback(); }
+
+  private:
+    Count& count;
+    boost::function0<void> callback;
+};
+
+/** Decrement counter in constructor and increment in destructor. */
+template <class Count>
+class ScopedDecrement : boost::noncopyable {
+  public:
+    ScopedDecrement(Count& c) : count(c) { value = --count; }
+    ~ScopedDecrement() { ++count; }
+
+    /** Return the value after the decrement. */
+    operator long() { return value; }
+
+  private:
+    Count& count;
+    long value;
+};
+
+
+/**
  * Atomic counter.
  */
 class AtomicCount : boost::noncopyable {
   public:
-    class ScopedDecrement : boost::noncopyable {
-      public:
-        /** Decrement counter in constructor and increment in destructor. */
-        ScopedDecrement(AtomicCount& c) : count(c) { value = --count; }
-        ~ScopedDecrement() { ++count; }
-        /** Return the value returned by the decrement. */
-        operator long() { return value; }
-      private:
-        AtomicCount& count;
-        long value;
-    };
-
-    class ScopedIncrement : boost::noncopyable {
-      public:
-        /** Increment counter in constructor and increment in destructor. */
-        ScopedIncrement(AtomicCount& c) : count(c) { ++count; }
-        ~ScopedIncrement() { --count; }
-      private:
-        AtomicCount& count;
-    };
-
+    typedef ScopedIncrement<AtomicCount> ScopedIncrement;
+    typedef ScopedDecrement<AtomicCount> ScopedDecrement;
+    
     AtomicCount(long value = 0) : count(value) {}
     
     void operator++() { ++count ; }
diff --git a/cpp/lib/common/sys/Runnable.cpp b/cpp/lib/common/sys/Runnable.cpp
index 30122c6..5d4f48a 100644
--- a/cpp/lib/common/sys/Runnable.cpp
+++ b/cpp/lib/common/sys/Runnable.cpp
@@ -29,4 +29,8 @@
     return boost::bind(&Runnable::run, this);
 }
 
+void FunctorRunnable::run() {
+    f();
+}
+
 }}
diff --git a/cpp/lib/common/sys/Runnable.h b/cpp/lib/common/sys/Runnable.h
index fb3927c..ef18897 100644
--- a/cpp/lib/common/sys/Runnable.h
+++ b/cpp/lib/common/sys/Runnable.h
@@ -44,7 +44,16 @@
     Functor functor();
 };
 
-}}
+/** Runnable wrapper for a functor */
+class FunctorRunnable : public Runnable {
+  public:
+    explicit FunctorRunnable(const Runnable::Functor& runMe) : f(runMe) {}
+    void run();
+  private:
+    Runnable::Functor f;
+};
+
+}} // namespace qpid::sys
 
 
 #endif
diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h
index d793a24..e35ed5b 100644
--- a/cpp/lib/common/sys/Socket.h
+++ b/cpp/lib/common/sys/Socket.h
@@ -70,8 +70,11 @@
      */
     int listen(int port = 0, int backlog = 10);
 
+    /** Accept a connection. This socket must be listening */
+    Socket accept();
+
     /** Get file descriptor */
-    int fd(); 
+    int fd() const; 
     
   private:
 #ifdef USE_APR    
diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h
index 47b95b6..9647dc2 100644
--- a/cpp/lib/common/sys/Thread.h
+++ b/cpp/lib/common/sys/Thread.h
@@ -116,7 +116,8 @@
 }
 
 void Thread::join(){
-    QPID_POSIX_THROW_IF(pthread_join(thread, 0));
+    if (thread != 0)
+        QPID_POSIX_THROW_IF(pthread_join(thread, 0));
 }
 
 long Thread::id() {
diff --git a/cpp/lib/common/sys/Time.h b/cpp/lib/common/sys/Time.h
index 3dd4674..4c6951b 100644
--- a/cpp/lib/common/sys/Time.h
+++ b/cpp/lib/common/sys/Time.h
@@ -22,6 +22,7 @@
  *
  */
 
+#include <limits>
 #include <stdint.h>
 
 #ifdef USE_APR
@@ -33,7 +34,7 @@
 namespace qpid {
 namespace sys {
 
-/** Time in nanoseconds */
+/** Time in nanoseconds. */
 typedef int64_t Time;
 
 Time now();
@@ -47,6 +48,9 @@
 /** Nanoseconds per nanosecond. */
 const Time TIME_NSEC = 1;
 
+/** Value to represent an infinite timeout */
+const Time TIME_INFINITE = std::numeric_limits<Time>::max();
+ 
 #ifndef USE_APR
 struct timespec toTimespec(const Time& t);
 struct timespec& toTimespec(struct timespec& ts, const Time& t);
diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp
index 6853833..1bd2381 100644
--- a/cpp/lib/common/sys/apr/APRAcceptor.cpp
+++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp
@@ -32,20 +32,16 @@
 {
   public:
     APRAcceptor(int16_t port, int backlog, int threads, bool trace);
-    virtual int16_t getPort() const;
-    virtual void run(qpid::sys::SessionHandlerFactory* factory);
+    virtual int getPort() const;
+    virtual void run(qpid::sys::SessionHandlerFactory& factory);
     virtual void shutdown();
 
   private:
-    void shutdownImpl();
-
-  private:
     int16_t port;
     bool trace;
     LFProcessor processor;
     apr_socket_t* socket;
     volatile bool running;
-    Mutex shutdownLock;
 };
 
 // Define generic Acceptor::create() to return APRAcceptor.
@@ -69,13 +65,13 @@
     CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
 }
 
-int16_t APRAcceptor::getPort() const {
+int APRAcceptor::getPort() const {
     apr_sockaddr_t* address;
     CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
     return address->port;
 }
 
-void APRAcceptor::run(SessionHandlerFactory* factory) {
+void APRAcceptor::run(SessionHandlerFactory& factory) {
     running = true;
     processor.start();
     std::cout << "Listening on port " << getPort() << "..." << std::endl;
@@ -90,32 +86,24 @@
             CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
             CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
             LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace);
-            session->init(factory->create(session));
+            session->init(factory.create(session));
         }else{
-            Mutex::ScopedLock locker(shutdownLock);                
-            if(running) {
-                if(status != APR_EINTR){
-                    std::cout << "ERROR: " << get_desc(status) << std::endl;
-                }
-                shutdownImpl();
+            running = false;
+            if(status != APR_EINTR){
+                std::cout << "ERROR: " << get_desc(status) << std::endl;
             }
         }
     }
+    shutdown();
 }
 
 void APRAcceptor::shutdown() {
-    Mutex::ScopedLock locker(shutdownLock);                
     if (running) {
-        shutdownImpl();
+        running = false;
+        processor.stop();
+        CHECK_APR_SUCCESS(apr_socket_close(socket));
     }
 }
 
-void APRAcceptor::shutdownImpl() {
-    Mutex::ScopedLock locker(shutdownLock);                
-    running = false;
-    processor.stop();
-    CHECK_APR_SUCCESS(apr_socket_close(socket));
-}
-
 
 }}
diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp
index 2b6fc92..f5d59e3 100644
--- a/cpp/lib/common/sys/apr/LFProcessor.cpp
+++ b/cpp/lib/common/sys/apr/LFProcessor.cpp
@@ -27,8 +27,6 @@
 using namespace qpid::sys;
 using qpid::QpidError;
 
-// TODO aconway 2006-10-12: stopped is read outside locks.
-//
 
 LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
     size(_size),
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp
index 16c7ec9..860ecd6 100644
--- a/cpp/lib/common/sys/posix/EventChannel.cpp
+++ b/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -1,4 +1,4 @@
-/*
+/* 
  *
  * Copyright (c) 2006 The Apache Software Foundation
  *
@@ -16,6 +16,13 @@
  *
  */
 
+// TODO aconway 2006-12-15: Locking review.
+
+// TODO aconway 2006-12-15: use Descriptor pointers everywhere,
+// get them from channel, pass them to Event constructors.
+// Eliminate lookup.
+
+
 #include <mqueue.h>
 #include <string.h>
 #include <iostream>
@@ -29,10 +36,10 @@
 #include <queue>
 
 #include <boost/ptr_container/ptr_map.hpp>
-#include <boost/current_function.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/bind.hpp>
 
 #include <QpidError.h>
-#include <sys/Monitor.h>
 
 #include "check.h"
 #include "EventChannel.h"
@@ -40,127 +47,319 @@
 using namespace std;
 
 
-// Convenience template to zero out a struct.
-template <class S> struct ZeroStruct : public S {
-    ZeroStruct() { memset(this, 0, sizeof(*this)); }
-};
-    
 namespace qpid {
 namespace sys {
 
 
+// ================================================================
+// Private class declarations
+
+namespace {
+
+typedef enum { IN, OUT } Direction;
+typedef std::pair<Event*, Event*> EventPair;
+} // namespace
+
 /**
- * EventHandler wraps an epoll file descriptor. Acts as private
- * interface between EventChannel and subclasses.
- *
- * Also implements Event interface for events that are not associated
- * with a file descriptor and are passed via the message queue.
- */ 
-class EventHandler : public Event, private Monitor
+ * Queue of events corresponding to one IO direction (IN or OUT).
+ * Each Descriptor contains two Queues.
+ */
+class EventChannel::Queue : private boost::noncopyable
 {
   public:
-    EventHandler(int epollSize = 256);
-    ~EventHandler();
+    Queue(Descriptor& container, Direction dir);
 
-    int getEpollFd() { return epollFd; }
-    void epollAdd(int fd, uint32_t epollEvents, Event* event);
-    void epollMod(int fd, uint32_t epollEvents, Event* event);
-    void epollDel(int fd);
+    /** Called by Event classes in prepare() */ 
+    void push(Event* e);
 
-    void mqPut(Event* event);
-    Event* mqGet();
-    
-  protected:
-    // Should never be called, only complete.
-    void prepare(EventHandler&) { assert(0); }
-    Event* complete(EventHandler& eh);
+    /** Called when epoll wakes.
+     *@return The next completed event or 0.
+     */
+    Event* wake(uint32_t epollFlags);
+
+    void setBit(uint32_t &epollFlags);
+
+    void shutdown();
     
   private:
-    int epollFd;
-    std::string mqName;
-    int mqFd;
-    std::queue<Event*> mqEvents;
+    typedef std::deque<Event*> EventQ; 
+
+    inline bool isMyEvent(uint32_t flags) { return flags | myEvent; }
+
+    Mutex& lock;                // Shared with Descriptor.
+    Descriptor& descriptor;
+    uint32_t myEvent;           // Epoll event flag.
+    EventQ queue;
 };
 
-EventHandler::EventHandler(int epollSize)
-{
-    epollFd = epoll_create(epollSize);
-    if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
 
-    // Create a POSIX message queue for non-fd events.
-    // We write one byte and never read it is always ready for read
-    // when we add it to epoll.
+/**
+ * Manages a file descriptor in an epoll set.
+ *
+ * Can be shutdown and re-activated for the same file descriptor.
+ */
+class EventChannel::Descriptor : private boost::noncopyable {
+  public:
+    Descriptor() : epollFd(-1), myFd(-1),
+                   inQueue(*this, IN), outQueue(*this, OUT) {}
+
+    void activate(int epollFd_, int myFd_);
+
+    /** Epoll woke up for this descriptor. */
+    EventPair wake(uint32_t epollEvents);
+
+    /** Shut down: close and remove file descriptor.
+     * May be re-activated if fd is reused.
+     */
+    void shutdown();
+
+    // TODO aconway 2006-12-18: Nasty. Need to clean up interaction.
+    void shutdownUnsafe();      
+
+    bool isShutdown() { return epollFd == -1; }
+
+    Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; }
+
+  private:
+    void update();
+    void epollCtl(int op, uint32_t events);
+
+    Mutex lock;
+    int epollFd;
+    int myFd;
+    Queue inQueue, outQueue;
+
+  friend class Queue;
+};
+
+ 
+/**
+ * Holds the epoll fd, Descriptor map and dispatch queue.
+ * Most of the epoll work is done by the Descriptors.
+ */
+class EventChannel::Impl {
+  public:
+    Impl(int size = 256);
+
+    ~Impl();
+
+    /**
+     * Registers fd if not already registered.
+     */
+    Descriptor& getDescriptor(int fd);
+
+    /** Wait for an event, return 0 on timeout */
+    Event* wait(Time timeout);
+
+    Queue& getDispatchQueue() { return *dispatchQueue; }
+
+  private:
+
+    typedef boost::ptr_map<int, Descriptor> DescriptorMap;
+
+    Mutex lock;     
+    int epollFd;
+    DescriptorMap descriptors;
+    int pipe[2];
+    Queue* dispatchQueue;
+};
+
+
+
+// ================================================================
+// EventChannel::Queue::implementation.
+
+static const char* shutdownMsg = "Event queue shut down.";
+
+EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d),
+    myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
+{}
+
+void EventChannel::Queue::push(Event* e) {
+    Mutex::ScopedLock l(lock);
+    if (descriptor.isShutdown())
+        THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg);
+    queue.push_back(e);
+    descriptor.update(); 
+}
+
+void EventChannel::Queue::setBit(uint32_t &epollFlags) {
+    if (queue.empty())
+        epollFlags &= ~myEvent;
+    else
+        epollFlags |= myEvent;
+}
+
+Event* EventChannel::Queue::wake(uint32_t epollFlags) {
+    // Called with lock held.
+    if (!queue.empty() && (isMyEvent(epollFlags))) {
+        Event* e = queue.front()->complete(descriptor);
+        if (e) {
+            queue.pop_front();
+            return e;
+        }
+    }
+    return 0;
+}
+        
+void EventChannel::Queue::shutdown() {
+    // Mark all pending events with a shutdown exception.
+    // The server threads will remove and dispatch the events.
     // 
-    ZeroStruct<struct mq_attr> attr;
-    attr.mq_maxmsg = 1;
-    attr.mq_msgsize = 1;
-    do {
-        char tmpnam[L_tmpnam];
-        tmpnam_r(tmpnam);
-        mqName = tmpnam + 4; // Skip "tmp/"
-        mqFd = mq_open(
-            mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
-        if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
-    } while (mqFd == EEXIST);  // Name already taken, try again.
+    qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, QPID_ERROR_LOCATION);
+    for_each(queue.begin(), queue.end(),
+             boost::bind(&Event::setException, _1, ex));
+}
 
+
+// ================================================================
+// Descriptor
+
+
+void EventChannel::Descriptor::activate(int epollFd_, int myFd_) {
+    Mutex::ScopedLock l(lock);
+    assert(myFd < 0 || (myFd == myFd_)); // Can't change fd.
+    if (epollFd < 0) {          // Means we're not polling.
+        epollFd = epollFd_;
+        myFd = myFd_;
+        epollCtl(EPOLL_CTL_ADD, 0);
+    }
+}
+
+void EventChannel::Descriptor::shutdown() {
+    Mutex::ScopedLock l(lock);
+    shutdownUnsafe();
+}
+
+void EventChannel::Descriptor::shutdownUnsafe() {
+    // Caller holds lock.
+    ::close(myFd);
+    epollFd = -1;               // Indicate we are not polling.
+    inQueue.shutdown();
+    outQueue.shutdown();
+    epollCtl(EPOLL_CTL_DEL, 0);
+}
+
+void EventChannel::Descriptor::update() {
+    // Caller holds lock.
+    uint32_t events =  EPOLLONESHOT | EPOLLERR | EPOLLHUP;
+    inQueue.setBit(events);
+    outQueue.setBit(events);
+    epollCtl(EPOLL_CTL_MOD, events);
+}
+    
+void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
+    // Caller holds lock
+    assert(!isShutdown());
+    struct epoll_event ee;
+    memset(&ee, 0, sizeof(ee));
+    ee.data.ptr = this;
+    ee.events = events;
+    int status = ::epoll_ctl(epollFd, op, myFd, &ee);
+    if (status < 0)
+        throw QPID_POSIX_ERROR(errno);
+    }
+}
+    
+
+EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
+    Mutex::ScopedLock l(lock);
+    cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl;
+    // If we have an error:
+    if (epollEvents & (EPOLLERR | EPOLLHUP)) {
+        shutdownUnsafe();
+        // Complete both sides on error so the event can fail and
+        // mark itself with an exception.
+        epollEvents |= EPOLLIN | EPOLLOUT;
+    }
+    EventPair ready(inQueue.wake(epollEvents), outQueue.wake(epollEvents));
+    update();
+    return ready;
+}
+
+
+// ================================================================
+// EventChannel::Impl
+
+
+EventChannel::Impl::Impl(int epollSize):
+    epollFd(-1), dispatchQueue(0)
+{
+    // Create the epoll file descriptor.
+    epollFd = epoll_create(epollSize);
+    QPID_POSIX_CHECK(epollFd);
+
+    // Create a pipe and write a single byte.  The byte is never
+    // read so the pipes read fd is always ready for read.
+    // We activate the FD when there are messages in the queue.
+    QPID_POSIX_CHECK(::pipe(pipe));
     static char zero = '\0';
-    mq_send(mqFd, &zero, 1, 0);
-    epollAdd(mqFd, 0, this);
+    QPID_POSIX_CHECK(::write(pipe[1], &zero, 1));
+    dispatchQueue = &getDescriptor(pipe[0]).getQueue(IN);
 }
 
-EventHandler::~EventHandler() {
-    mq_close(mqFd);
-    mq_unlink(mqName.c_str());
+EventChannel::Impl::~Impl() {
+    close(epollFd);
+    close(pipe[0]);
+    close(pipe[1]);
 }
 
-void EventHandler::mqPut(Event* event) {
-    ScopedLock l(*this);
-    assert(event != 0);
-    mqEvents.push(event);
-    epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
-}
 
-Event* EventHandler::mqGet() {
-    ScopedLock l(*this);
-    if (mqEvents.empty()) 
-        return 0;
-    Event* event = mqEvents.front();
-    mqEvents.pop();
-    if(!mqEvents.empty())
-        epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+/**
+ * Wait for epoll to wake up, return the descriptor or 0 on timeout.
+ */
+Event* EventChannel::Impl::wait(Time timeoutNs)
+{
+    // No lock, all thread safe calls or local variables:
+    // 
+    const long timeoutMs =
+        (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
+    struct epoll_event ee;
+    Event* event = 0;
+    bool doSwap = true;
+
+    // Loop till we get a completed event. Some events may repost
+    // themselves and return 0, e.g. incomplete read or write events.
+    //
+    while (!event) {
+        int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
+        if (n == 0)             // Timeout
+            return 0;
+        if (n < 0 && errno != EINTR) // Interrupt, ignore it.
+            continue;
+        if (n < 0)              
+            throw QPID_POSIX_ERROR(errno);
+        assert(n == 1);
+        Descriptor* ed =
+            reinterpret_cast<Descriptor*>(ee.data.ptr);
+        assert(ed);
+        EventPair ready = ed->wake(ee.events); 
+
+        // We can only return one event so if both completed push one
+        // onto the dispatch queue to be dispatched in another thread.
+        if (ready.first && ready.second) {
+            // Keep it fair: in & out take turns to be returned first.
+            if (doSwap)
+                swap(ready.first, ready.second);
+            doSwap = !doSwap;
+            event = ready.first;
+            dispatchQueue->push(ready.second);
+        }
+        else {
+            event = ready.first ? ready.first : ready.second;
+        }
+    }
     return event;
 }
 
-void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
-{
-    ZeroStruct<struct epoll_event> ee;
-    ee.data.ptr = event;
-    ee.events = epollEvents;
-    if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) 
-        throw QPID_POSIX_ERROR(errno);
+EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
+    Mutex::ScopedLock l(lock);
+    Descriptor& ed = descriptors[fd];
+    ed.activate(epollFd, fd);
+    return ed;
 }
 
-void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
-{
-    ZeroStruct<struct epoll_event> ee;
-    ee.data.ptr = event;
-    ee.events = epollEvents;
-    if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) 
-        throw QPID_POSIX_ERROR(errno);
-}
 
-void EventHandler::epollDel(int fd) {
-    if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
-        throw QPID_POSIX_ERROR(errno);
-}
-
-Event* EventHandler::complete(EventHandler& eh)
-{
-    assert(&eh == this);
-    Event* event =  mqGet();
-    return event==0 ? 0 : event->complete(eh);
-}
-    
 // ================================================================
 // EventChannel
 
@@ -168,157 +367,138 @@
     return shared_ptr(new EventChannel());
 }
 
-EventChannel::EventChannel() : handler(new EventHandler()) {}
+EventChannel::EventChannel() : impl(new EventChannel::Impl()) {}
 
 EventChannel::~EventChannel() {}
 
-void EventChannel::postEvent(Event& e) 
+void EventChannel::post(Event& e) 
 {
-    e.prepare(*handler);
+    e.prepare(*impl);
 }
 
-Event* EventChannel::getEvent()
-{
-    static const int infiniteTimeout = -1; 
-    ZeroStruct<struct epoll_event> epollEvent;
-
-    // Loop until we can complete the event. Some events may re-post
-    // themselves and return 0 from complete, e.g. partial reads. // 
-    Event* event = 0;
-    while (event == 0) {
-        int eventCount = epoll_wait(handler->getEpollFd(),
-                                    &epollEvent, 1, infiniteTimeout);
-        if (eventCount < 0) {
-            if (errno != EINTR) {
-                // TODO aconway 2006-11-28: Proper handling/logging of errors.
-                cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
-                     << PosixError::getMessage(errno) << endl;
-                assert(0);
-            }
-        }
-        else if (eventCount == 1) {
-            event = reinterpret_cast<Event*>(epollEvent.data.ptr);
-            assert(event != 0);
-            try {
-                event = event->complete(*handler);
-            }
-            catch (const Exception& e) {
-                if (event)
-                    event->setError(e);
-            }
-            catch (const std::exception& e) {
-                if (event)
-                    event->setError(e);
-            }
-        }
-    }
-    return event;
+void EventChannel::post(Event* e) {
+    assert(e);
+    post(*e);
 }
 
+Event* EventChannel::wait(Time timeoutNs)
+{
+    return impl->wait(timeoutNs);
+}
+
+
+// ================================================================
+// Event and subclasses.
+
 Event::~Event() {}
     
-void Event::prepare(EventHandler& handler)
-{
-    handler.mqPut(this);
+Exception::shared_ptr_const Event::getException() const {
+    return exception;
 }
 
-bool Event::hasError() const {
-    return error;
-}
-
-void Event::throwIfError() throw (Exception) {
-    if (hasError())
-        error.throwSelf();
-}
-
-Event* Event::complete(EventHandler&)
-{
-    return this;
+void Event::throwIfException() {
+    if (getException())
+        exception->throwSelf();
 }
 
 void Event::dispatch()
 {
+    if (!callback.empty())
+        callback();
+}
+
+void Event::setException(const std::exception& e) {
+    const Exception* ex = dynamic_cast<const Exception*>(&e);
+    if (ex) 
+        exception.reset(ex->clone().release());
+    else 
+        exception.reset(new Exception(e));
+#ifndef NDEBUG
+    // Throw and re-catch the exception. Has no effect on the
+    // program but it triggers debuggers watching for throw.  The
+    // context that sets the exception is more informative for
+    // debugging purposes than the one that ultimately throws it.
+    // 
     try {
-        if (!callback.empty())
-            callback();
-    } catch (const std::exception&) {
-        throw;
-    } catch (...) {
-        throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+        throwIfException();
     }
+    catch (...) { }             // Ignored.
+#endif
 }
 
-void Event::setError(const ExceptionHolder& e) {
-    error = e;
+
+void ReadEvent::prepare(EventChannel::Impl& impl) {
+    impl.getDescriptor(descriptor).getQueue(IN).push(this);
 }
 
-void ReadEvent::prepare(EventHandler& handler)
+Event* ReadEvent::complete(EventChannel::Descriptor& ed)
 {
-    handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
-}
+    ssize_t n = ::read(descriptor,
+                       static_cast<char*>(buffer) + bytesRead,
+                       size - bytesRead);
 
-ssize_t ReadEvent::doRead() {
-    ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
-                       size - received);
-    if (n > 0) received += n;
-    return n;
-}
-
-Event* ReadEvent::complete(EventHandler& handler)
-{
-    // Read as much as possible without blocking.
-    ssize_t n = doRead();
-    while (n > 0 && received < size) doRead();
-
-    if (received == size) {
-        handler.epollDel(descriptor);
-        received = 0;           // Reset for re-use.
-        return this;
+    if (n < 0 && errno != EAGAIN) { // Error
+        setException(QPID_POSIX_ERROR(errno));
+        ed.shutdownUnsafe(); // Called with lock held.
     }
-    else if (n <0 && (errno == EAGAIN)) {
-        // Keep polling for more.
-        handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
-        return 0;
+    else if (n == 0) {               // End of file
+        // TODO aconway 2006-12-13: Don't treat EOF as exception
+        // unless we're partway thru a !noWait read.
+        setException(QPID_POSIX_ERROR(ENODATA));
+        ed.shutdownUnsafe(); // Called with lock held.
     }
     else {
-        // Unexpected EOF or error. Throw ENODATA for EOF.
-        handler.epollDel(descriptor);
-        received = 0;           // Reset for re-use.
-        throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+        if (n > 0)              // possible that n < 0 && errno == EAGAIN
+            bytesRead += n;
+        if (bytesRead  < size && !noWait) {
+            // Continue reading, not enough data.
+            return 0;
+        }
     }
-}
-
-void WriteEvent::prepare(EventHandler& handler)
-{
-    handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
-}
-
-Event* WriteEvent::complete(EventHandler& handler)
-{
-    ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
-                      size - written);
-    if (n < 0) throw QPID_POSIX_ERROR(errno);
-    written += n;
-    if(written < size) {
-        // Keep polling.
-        handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
-        return 0;
-    }
-    written = 0;                // Reset for re-use.
-    handler.epollDel(descriptor);
     return this;
 }
 
-void AcceptEvent::prepare(EventHandler& handler)
-{
-    handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+
+void WriteEvent::prepare(EventChannel::Impl& impl) {
+    impl.getDescriptor(descriptor).getQueue(OUT).push(this);
 }
 
-Event* AcceptEvent::complete(EventHandler& handler)
+
+Event* WriteEvent::complete(EventChannel::Descriptor& ed)
 {
-    handler.epollDel(descriptor);
+    ssize_t n = ::write(descriptor,
+                        static_cast<const char*>(buffer) + bytesWritten,
+                        size - bytesWritten);
+    if(n < 0 && errno == EAGAIN && noWait) {
+        return 0;
+    }
+    if (n < 0 || (bytesWritten += n) < size) {
+        setException(QPID_POSIX_ERROR(errno));
+        ed.shutdownUnsafe(); // Called with lock held.
+    }
+    return this;
+}
+
+void AcceptEvent::prepare(EventChannel::Impl& impl) {
+    impl.getDescriptor(descriptor).getQueue(IN).push(this);
+}
+
+Event* AcceptEvent::complete(EventChannel::Descriptor& ed)
+{
     accepted = ::accept(descriptor, 0, 0);
-    if (accepted < 0) throw QPID_POSIX_ERROR(errno);
+    if (accepted < 0) {
+        setException(QPID_POSIX_ERROR(errno));
+        ed.shutdownUnsafe(); // Called with lock held.
+    }
+    return this;
+}
+
+void DispatchEvent::prepare(EventChannel::Impl& impl) {
+    impl.getDispatchQueue().push(this);
+}
+
+Event* DispatchEvent::complete(EventChannel::Descriptor&)
+{
     return this;
 }
 
diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h
index 49c7fce..60c4026 100644
--- a/cpp/lib/common/sys/posix/EventChannel.h
+++ b/cpp/lib/common/sys/posix/EventChannel.h
@@ -19,8 +19,11 @@
  *
  */
 
-#include <SharedObject.h>
-#include <ExceptionHolder.h>
+#include "SharedObject.h"
+#include "Exception.h"
+#include "sys/Monitor.h"
+#include "sys/Time.h"
+
 #include <boost/function.hpp>
 #include <memory>
 
@@ -28,118 +31,6 @@
 namespace sys {
 
 class Event;
-class EventHandler;
-class EventChannel;
-
-/**
- * Base class for all Events.
- */
-class Event
-{
-  public:
-    /** Type for callback when event is dispatched */
-    typedef boost::function0<void> Callback;
-
-    /**
-     * Create an event with optional callback.
-     * Instances of Event are sent directly through the channel.
-     * Derived classes define additional waiting behaviour.
-     *@param cb A callback functor that is invoked when dispatch() is called.
-     */
-    Event(Callback cb = 0) : callback(cb) {}
-
-    virtual ~Event();
-
-    /** Call the callback provided to the constructor, if any. */
-    void dispatch();
-
-    /** True if there was an error processing this event */
-    bool hasError() const;
-
-    /** If hasError() throw the corresponding exception. */
-    void throwIfError() throw(Exception);
-
-  protected:
-    virtual void prepare(EventHandler&);
-    virtual Event* complete(EventHandler&);
-    void setError(const ExceptionHolder& e);
-
-    Callback callback;
-    ExceptionHolder error;
-
-  friend class EventChannel;
-  friend class EventHandler;
-};
-
-template <class BufT>
-class IOEvent : public Event {
-  public:
-    void getDescriptor() const { return descriptor; }
-    size_t getSize() const { return size; }
-    BufT getBuffer() const { return buffer; }
-  
-  protected:
-    IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
-        Event(cb), descriptor(fd), buffer(buf), size(sz) {}
-
-    int descriptor;
-    BufT buffer;
-    size_t size;
-};
-
-/** Asynchronous read event */
-class ReadEvent : public IOEvent<void*>
-{
-  public:
-    explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
-        IOEvent<void*>(fd, cb, sz, buf), received(0) {}
-
-  private:
-    void prepare(EventHandler&);
-    Event* complete(EventHandler&);
-    ssize_t doRead();
-
-    size_t received;
-};
-
-/** Asynchronous write event */
-class WriteEvent : public IOEvent<const void*>
-{
-  public:
-    explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
-                        Callback cb=0) :
-        IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
-
-  protected:
-    void prepare(EventHandler&);
-    Event* complete(EventHandler&);
-
-  private:
-    ssize_t doWrite();
-    size_t written;
-};
-
-/** Asynchronous socket accept event */
-class AcceptEvent : public Event
-{
-  public:
-    /** Accept a connection on fd. */
-    explicit AcceptEvent(int fd=-1, Callback cb=0) :
-        Event(cb), descriptor(fd), accepted(0) {}
-
-    /** Get descriptor for server socket */
-    int getAcceptedDesscriptor() const { return accepted; }
-
-  private:
-    void prepare(EventHandler&);
-    Event* complete(EventHandler&);
-
-    int descriptor;
-    int accepted;
-};
-
-
-class QueueSet;
 
 /**
  * Channel to post and wait for events.
@@ -148,24 +39,174 @@
 {
   public:
     static shared_ptr create();
-    
+
     ~EventChannel();
     
     /** Post an event to the channel. */
-    void postEvent(Event& event);
+    void post(Event& event);
 
     /** Post an event to the channel. Must not be 0. */
-    void postEvent(Event* event) { postEvent(*event); }
+    void post(Event* event);
         
     /**
-     * Wait for the next complete event.
-     *@return Pointer to event. Will never return 0.
+     * Wait for the next complete event, up to timeout.
+     *@return Pointer to event or 0 if timeout elapses.
      */
-    Event* getEvent();
+    Event* wait(Time timeout = TIME_INFINITE);
+
+    class Impl;
+    class Queue;
+    class Descriptor;
+    
+  private:
+
+    EventChannel();
+
+    Mutex lock;
+    boost::shared_ptr<Impl> impl;
+};
+
+/**
+ * Base class for all Events.
+ * Derived classes define events representing various async IO operations.
+ * When an event is complete, it is returned by the EventChannel to
+ * a thread calling wait. The thread will call Event::dispatch() to
+ * execute code associated with event completion.
+ */
+class Event
+{
+  public:
+    /** Type for callback when event is dispatched */
+    typedef boost::function0<void> Callback;
+
+    virtual ~Event();
+
+    /** Call the callback provided to the constructor, if any. */
+    void dispatch();
+
+    /**
+     *If there was an exception processing this Event, return it.
+     *@return 0 if there was no exception. Caller must not delete.
+     */
+    qpid::Exception::shared_ptr_const getException() const;
+
+    /** If getException() throw the corresponding exception. */
+    void throwIfException();
+
+    /** Set the dispatch callback. */
+    void setCallback(Callback cb) { callback = cb; }
+
+    /** Set the exception. */
+    void setException(const std::exception& e);
+
+  protected:
+    Event(Callback cb=0) : callback(cb) {}
+
+    virtual void prepare(EventChannel::Impl&) = 0;
+    virtual Event* complete(EventChannel::Descriptor&) = 0;
+
+    Callback callback;
+    Exception::shared_ptr_const exception;
+
+  friend class EventChannel;
+  friend class EventChannel::Queue;
+};
+
+/**
+ * An event that does not wait for anything, it is processed
+ * immediately by one of the channel threads.
+ */
+class DispatchEvent : public Event {
+  public:
+    DispatchEvent(Callback cb=0) : Event(cb) {}
+
+  protected:
+    void prepare(EventChannel::Impl&);
+    Event* complete(EventChannel::Descriptor&);
+};
+
+// Utility base class.
+class FDEvent : public Event {
+  public:
+    int getDescriptor() const { return descriptor; }
+
+  protected:
+    FDEvent(Callback cb = 0, int fd = 0)
+        : Event(cb), descriptor(fd) {}
+    int descriptor;
+};
+
+// Utility base class
+class IOEvent : public FDEvent {
+  public:
+    size_t getSize() const { return size; }
+    
+  protected:
+    IOEvent(Callback cb, int fd, size_t sz, bool noWait_) :
+        FDEvent(cb, fd), size(sz), noWait(noWait_) {}
+
+    size_t size;
+    bool noWait;
+};
+    
+/** Asynchronous read event */
+class ReadEvent : public IOEvent
+{
+  public:
+    explicit ReadEvent(
+        int fd=-1, void* buf=0, size_t sz=0,
+        Callback cb=0, bool noWait=false
+    ) : IOEvent(cb, fd, sz, noWait), buffer(buf), bytesRead(0) {}
+
+    void* getBuffer() const { return buffer; }
+    size_t getBytesRead() const { return bytesRead; }
+    
+  private:
+    void prepare(EventChannel::Impl&);
+    Event* complete(EventChannel::Descriptor&);
+    ssize_t doRead();
+
+    void* buffer;
+    size_t bytesRead;
+};
+
+/** Asynchronous write event */
+class WriteEvent : public IOEvent
+{
+  public:
+    explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
+                        Callback cb=0) :
+        IOEvent(cb, fd, sz, noWait), buffer(buf), bytesWritten(0) {}
+
+    const void* getBuffer() const { return buffer; }
+    size_t getBytesWritten() const { return bytesWritten; }
 
   private:
-    EventChannel();
-    boost::shared_ptr<EventHandler> handler;
+    void prepare(EventChannel::Impl&);
+    Event* complete(EventChannel::Descriptor&);
+    ssize_t doWrite();
+
+    const void* buffer;
+    size_t bytesWritten;
+};
+
+
+/** Asynchronous socket accept event */
+class AcceptEvent : public FDEvent
+{
+  public:
+    /** Accept a connection on fd. */
+    explicit AcceptEvent(int fd=-1, Callback cb=0) :
+        FDEvent(cb, fd), accepted(0) {}
+    
+    /** Get descriptor for accepted server socket */
+    int getAcceptedDesscriptor() const { return accepted; }
+
+  private:
+    void prepare(EventChannel::Impl&);
+    Event* complete(EventChannel::Descriptor&);
+
+    int accepted;
 };
 
 
diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
index 7cd6f60..28f9beb 100644
--- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
+++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
@@ -139,11 +139,11 @@
         shutdown();
         return;
     }
-    // TODO aconway 2006-11-29: Need to reap closed connections also.
     int fd = acceptEvent.getAcceptedDesscriptor();
+    threads->post(acceptEvent); // Keep accepting.
+    // TODO aconway 2006-11-29: Need to reap closed connections also.
     connections.push_back(
         new EventChannelConnection(threads, *factory, fd, fd, isTrace));
-    threads->post(acceptEvent); // Keep accepting.
 }
 
 }} // namespace qpid::sys
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
index 95e699e..787da72 100644
--- a/cpp/lib/common/sys/posix/EventChannelThreads.cpp
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
@@ -16,26 +16,40 @@
  *
  */
 
-#include "EventChannelThreads.h"
-#include <sys/Runnable.h>
 #include <iostream>
-using namespace std;
+#include <limits>
+
 #include <boost/bind.hpp>
 
+#include <sys/Runnable.h>
+
+#include "EventChannelThreads.h"
+
 namespace qpid {
 namespace sys {
 
+const size_t EventChannelThreads::unlimited =
+    std::numeric_limits<size_t>::max();
+
 EventChannelThreads::shared_ptr EventChannelThreads::create(
-    EventChannel::shared_ptr ec)
+    EventChannel::shared_ptr ec, size_t min, size_t max
+)
 {
-    return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+    return EventChannelThreads::shared_ptr(
+        new EventChannelThreads(ec, min, max));
 }
 
-EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
-    channel(ec), nWaiting(0), state(RUNNING)
+EventChannelThreads::EventChannelThreads(
+    EventChannel::shared_ptr ec, size_t min, size_t max) :
+    minThreads(std::max(size_t(1), min)),
+    maxThreads(std::min(min, max)),
+    channel(ec),
+    nWaiting(0),
+    state(RUNNING)
 {
-    // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
-    addThread();
+    Monitor::ScopedLock l(monitor);
+    while (workers.size() < minThreads) 
+        workers.push_back(Thread(*this));
 }
 
 EventChannelThreads::~EventChannelThreads() {
@@ -43,34 +57,37 @@
     join();
 }
 
+// Termination marker event.
+static DispatchEvent terminate;
+
 void EventChannelThreads::shutdown() 
 {
-    ScopedLock lock(*this);
+    Monitor::ScopedLock lock(monitor);
     if (state != RUNNING)       // Already shutting down.
         return;
+    state = TERMINATING;
     for (size_t i = 0; i < workers.size(); ++i) {
-        channel->postEvent(terminate);
+        channel->post(terminate);
     }
-    state = TERMINATE_SENT;
-    notify();                // Wake up one join() thread.
+    monitor.notify();           // Wake up one join() thread.
 }
 
 void EventChannelThreads::join() 
 {
     {
-        ScopedLock lock(*this);
+        Monitor::ScopedLock lock(monitor);
         while (state == RUNNING)    // Wait for shutdown to start.
-            wait();
+            monitor.wait();
         if (state == SHUTDOWN)      // Shutdown is complete
             return;
         if (state == JOINING) {
             // Someone else is doing the join.
             while (state != SHUTDOWN)
-                wait();
+                monitor.wait();
             return;
         }
         // I'm the  joining thread
-        assert(state == TERMINATE_SENT); 
+        assert(state == TERMINATING); 
         state = JOINING; 
     } // Drop the lock.
 
@@ -79,12 +96,13 @@
         workers[i].join();
     }
     state = SHUTDOWN;
-    notifyAll();                // Notify other join() threaeds.
+    monitor.notifyAll();        // Notify any other join() threads.
 }
 
 void EventChannelThreads::addThread() {
-    ScopedLock l(*this);
-    workers.push_back(Thread(*this));
+    Monitor::ScopedLock l(monitor);
+    if (workers.size() < maxThreads)
+        workers.push_back(Thread(*this));
 }
 
 void EventChannelThreads::run()
@@ -93,26 +111,22 @@
     AtomicCount::ScopedIncrement inc(nWaiting);
     try {
         while (true) {
-            Event* e = channel->getEvent(); 
+            Event* e = channel->wait(); 
             assert(e != 0);
-            if (e == &terminate) {
+            if (e == &terminate) 
                 return;
-            }
             AtomicCount::ScopedDecrement dec(nWaiting);
-            // I'm no longer waiting, make sure someone is.
-            if (dec == 0)
+            // Make sure there's at least one waiting thread.
+            if (dec == 0 && state == RUNNING)
                 addThread();
             e->dispatch();
         }
     }
     catch (const std::exception& e) {
-        // TODO aconway 2006-11-15: need better logging across the board.
-        std::cerr << "EventChannelThreads::run() caught: " << e.what()
-                  << std::endl;
+        Exception::log(e, "Exception in EventChannelThreads::run()");
     }
     catch (...) {
-        std::cerr << "EventChannelThreads::run() caught unknown exception."
-                  << std::endl;
+        Exception::logUnknown("Exception in EventChannelThreads::run()");
     }
 }
 
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h
index 98403c0..721a5e9 100644
--- a/cpp/lib/common/sys/posix/EventChannelThreads.h
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.h
@@ -20,11 +20,12 @@
  */
 #include <vector>
 
-#include <Exception.h>
-#include <sys/Time.h>
-#include <sys/Monitor.h>
-#include <sys/Thread.h>
-#include <sys/AtomicCount.h>
+#include "Exception.h"
+#include "sys/AtomicCount.h"
+#include "sys/Monitor.h"
+#include "sys/Thread.h"
+#include "sys/Time.h"
+
 #include "EventChannel.h"
 
 namespace qpid {
@@ -33,26 +34,36 @@
 /**
    Dynamic thread pool serving an EventChannel.
 
-   Threads run a loop { e = getEvent(); e->dispatch(); }
+   Threads run a loop { e = wait(); e->dispatch(); }
    The size of the thread pool is automatically adjusted to optimal size.
 */
 class EventChannelThreads :
         public qpid::SharedObject<EventChannelThreads>,
-        public sys::Monitor, private sys::Runnable
+        private sys::Runnable
 {
   public:
-    /** Create the thread pool and start initial threads. */
+    /** Constant to represent an unlimited number of threads */ 
+    static const size_t unlimited;
+    
+    /**
+     * Create the thread pool and start initial threads.
+     * @param minThreads Pool will initialy contain minThreads threads and
+     * will never shrink to less until shutdown.
+     * @param maxThreads Pool will never grow to more than maxThreads. 
+     */
     static EventChannelThreads::shared_ptr create(
-        EventChannel::shared_ptr channel
+        EventChannel::shared_ptr channel = EventChannel::create(),
+        size_t minThreads = 1,
+        size_t maxThreads = unlimited
     );
 
     ~EventChannelThreads();
 
     /** Post event to the underlying channel */
-    void postEvent(Event& event) { channel->postEvent(event); }
+    void post(Event& event) { channel->post(event); }
 
     /** Post event to the underlying channel Must not be 0. */
-    void postEvent(Event* event) { channel->postEvent(event); }
+    void post(Event* event) { channel->post(event); }
 
     /**
      * Terminate all threads.
@@ -68,21 +79,25 @@
   private:
     typedef std::vector<sys::Thread> Threads;
     typedef enum {
-        RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+        RUNNING, TERMINATING, JOINING, SHUTDOWN
     } State;
 
-    EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+    EventChannelThreads(
+        EventChannel::shared_ptr channel, size_t min, size_t max);
+    
     void addThread();
 
     void run();
     bool keepRunning();
     void adjustThreads();
 
+    Monitor monitor;
+    size_t minThreads;
+    size_t maxThreads;
     EventChannel::shared_ptr channel;
     Threads workers;
     sys::AtomicCount nWaiting;
     State state;
-    Event terminate;
 };
 
 
diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
index 842aa76..e69de29 100644
--- a/cpp/lib/common/sys/posix/PosixAcceptor.cpp
+++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <sys/Acceptor.h>
-#include <Exception.h>
-
-namespace qpid {
-namespace sys {
-
-namespace {
-void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
-}
-
-class PosixAcceptor : public Acceptor {
-  public:
-    virtual int16_t getPort() const { fail(); return 0; }
-    virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); }
-    virtual void shutdown() { fail(); }
-};
-
-// Define generic Acceptor::create() to return APRAcceptor.
-    Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool)
-{
-    return Acceptor::shared_ptr(new PosixAcceptor());
-}
-
-// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
-
-}}
diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp
index 5bd1374..fc82b4e 100644
--- a/cpp/lib/common/sys/posix/Socket.cpp
+++ b/cpp/lib/common/sys/posix/Socket.cpp
@@ -96,6 +96,8 @@
 int Socket::listen(int port, int backlog) 
 {
     struct sockaddr_in name;
+    static const int ON = 1;
+    setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &ON, sizeof(ON));
     name.sin_family = AF_INET;
     name.sin_port = htons(port);
     name.sin_addr.s_addr = 0;
@@ -111,8 +113,15 @@
     return ntohs(name.sin_port);
 }
 
+Socket Socket::accept() {
+    int accepted = ::accept(socket, 0, 0);
+    if (accepted < 0)
+        throw (QPID_POSIX_ERROR(errno));
+    return Socket(accepted);
+}
 
-int Socket::fd() 
+
+int Socket::fd()const
 {
     return socket;
 }
diff --git a/cpp/lib/common/sys/posix/check.cpp b/cpp/lib/common/sys/posix/check.cpp
index 408679c..4ddacb3 100644
--- a/cpp/lib/common/sys/posix/check.cpp
+++ b/cpp/lib/common/sys/posix/check.cpp
@@ -32,8 +32,8 @@
     return std::string(strerror_r(errNo, buf, sizeof(buf)));
 }
 
-PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
-    : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
+PosixError::PosixError(int errNo, const qpid::QpidError::Location& l) throw()
+    : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), l)
 { }
     
 }}
diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h
index 5afbe8f..052fb08 100644
--- a/cpp/lib/common/sys/posix/check.h
+++ b/cpp/lib/common/sys/posix/check.h
@@ -37,13 +37,15 @@
   public:
     static std::string getMessage(int errNo);
     
-    PosixError(int errNo, const qpid::SrcLine& location) throw();
+    PosixError(int errNo, const qpid::QpidError::Location& location) throw();
     
     ~PosixError() throw() {}
     
     int getErrNo() { return errNo; }
 
-    Exception* clone() const throw() { return new PosixError(*this); }
+    Exception::auto_ptr clone() const throw() {
+        return Exception::auto_ptr(new PosixError(*this));
+    }
         
     void throwSelf() { throw *this; }
 
@@ -54,9 +56,17 @@
 }}
 
 /** Create a PosixError for the current file/line and errno. */
-#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+#define QPID_POSIX_ERROR(ERRNO) \
+    ::qpid::sys::PosixError((ERRNO), QPID_ERROR_LOCATION)
 
-/** Throw a posix error if errNo is non-zero */
-#define QPID_POSIX_THROW_IF(ERRNO)              \
-    if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
+/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */
+#define QPID_POSIX_CHECK(RESULT)                        \
+    if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))
+
+/** Throw QPID_POSIX_ERROR(ERRNO) if ERRNO is non zero */
+#define QPID_POSIX_THROW_IF(ERRNO)           \
+    do { int e = (ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0)
+         
+        
+        
 #endif  /*!_posix_check_h*/
diff --git a/cpp/tests/EventChannelTest.cpp b/cpp/tests/EventChannelTest.cpp
index 8e5c724..67b8b03 100644
--- a/cpp/tests/EventChannelTest.cpp
+++ b/cpp/tests/EventChannelTest.cpp
@@ -47,8 +47,9 @@
 class EventChannelTest : public CppUnit::TestCase  
 {
     CPPUNIT_TEST_SUITE(EventChannelTest);
-    CPPUNIT_TEST(testEvent);
+    CPPUNIT_TEST(testDispatch);
     CPPUNIT_TEST(testRead);
+    CPPUNIT_TEST(testPartialRead);
     CPPUNIT_TEST(testFailedRead);
     CPPUNIT_TEST(testWrite);
     CPPUNIT_TEST(testFailedWrite);
@@ -72,26 +73,26 @@
         signal(SIGPIPE, SIG_IGN);
     }
 
-    // Verify that calling getEvent returns event.
+    // Verify that calling wait returns event.
     template <class T> bool isNextEvent(T& event)
     {
-        return &event == dynamic_cast<T*>(ec->getEvent());
+        return &event == dynamic_cast<T*>(ec->wait(5*TIME_SEC));
     }
 
     template <class T> bool isNextEventOk(T& event)
     {
-        Event* next = ec->getEvent();
-        if (next) next->throwIfError();
+        Event* next = ec->wait(TIME_SEC);
+        if (next) next->throwIfException();
         return &event == next;
     }
         
-    void testEvent()
+    void testDispatch()
     {
         RunMe runMe;
         CPPUNIT_ASSERT(!runMe.ran);
         // Instances of Event just pass thru the channel immediately.
-        Event e(runMe.functor());
-        ec->postEvent(e);
+        DispatchEvent e(runMe.functor());
+        ec->post(e);
         CPPUNIT_ASSERT(isNextEventOk(e));
         e.dispatch();
         CPPUNIT_ASSERT(runMe.ran);
@@ -99,42 +100,64 @@
 
     void testRead() {
         ReadEvent re(pipe[0], readBuf, size);
-        ec->postEvent(re);
+        ec->post(re);
         CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size));
         CPPUNIT_ASSERT(isNextEventOk(re));
-        CPPUNIT_ASSERT_EQUAL(size, re.getSize());
+        CPPUNIT_ASSERT_EQUAL(size, re.getBytesRead());
         CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
     }
 
+    void testPartialRead() {
+        ReadEvent re(pipe[0], readBuf, size, 0, true);
+        ec->post(re);
+        CPPUNIT_ASSERT_EQUAL(ssize_t(size/2), ::write(pipe[1], hello, size/2));
+        CPPUNIT_ASSERT(isNextEventOk(re));
+        CPPUNIT_ASSERT_EQUAL(size/2, re.getBytesRead());
+        CPPUNIT_ASSERT_EQUAL(std::string(hello, size/2),
+                             std::string(readBuf, size/2));
+    }
+    
+
     void testFailedRead() 
     {
         ReadEvent re(pipe[0], readBuf, size);
-        ec->postEvent(re);
+        ec->post(re);
 
         // EOF before all data read.
         ::close(pipe[1]);
         CPPUNIT_ASSERT(isNextEvent(re));
-        CPPUNIT_ASSERT(re.hasError());
+        CPPUNIT_ASSERT(re.getException());
         try {
-            re.throwIfError();
+            re.throwIfException();
             CPPUNIT_FAIL("Expected QpidError.");
         }
         catch (const qpid::QpidError&) { }
 
-        //  Bad file descriptor. Note in this case we fail
-        //  in postEvent and throw immediately.
+        
+        //  Try to read from closed file descriptor.
         try {
-            ReadEvent bad;
-            ec->postEvent(bad);
+            ec->post(re);
+            CPPUNIT_ASSERT(isNextEvent(re));
+            re.throwIfException();
+            CPPUNIT_FAIL("Expected an exception.");
+        }
+        catch (const qpid::QpidError&) {}
+        
+        //  Bad file descriptor. Note in this case we fail
+        //  in post and throw immediately.
+        try {
+            ReadEvent bad(-1, readBuf, size);
+            ec->post(bad);
             CPPUNIT_FAIL("Expected QpidError.");
         }
-        catch (const qpid::QpidError&) { }
+        catch (const qpid::QpidError&) {}
     }
 
     void testWrite() {
         WriteEvent wr(pipe[1], hello, size);
-        ec->postEvent(wr);
+        ec->post(wr);
         CPPUNIT_ASSERT(isNextEventOk(wr));
+        CPPUNIT_ASSERT_EQUAL(size, wr.getBytesWritten());
         CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));;
         CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
     }
@@ -142,19 +165,37 @@
     void testFailedWrite() {
         WriteEvent wr(pipe[1], hello, size);
         ::close(pipe[0]);
-        ec->postEvent(wr);
+        ec->post(wr);
         CPPUNIT_ASSERT(isNextEvent(wr));
-        CPPUNIT_ASSERT(wr.hasError());
+        CPPUNIT_ASSERT(wr.getException());
     }
 
     void testReadWrite()
     {
         ReadEvent re(pipe[0], readBuf, size);
         WriteEvent wr(pipe[1], hello, size);
-        ec->postEvent(re);
-        ec->postEvent(wr);
-        ec->getEvent();
-        ec->getEvent();
+        ec->post(re);
+        ec->post(wr);
+        ec->wait(TIME_SEC);
+        ec->wait(TIME_SEC);
+        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+    }
+
+    void connectSendRead(AcceptEvent& ae, int port, Socket client)
+    {
+        ec->post(ae);
+        // Connect a client, send some data, read the data.
+        client.connect("localhost", port);
+        CPPUNIT_ASSERT(isNextEvent(ae));
+        ae.throwIfException();
+
+        char readBuf[size];
+        ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size);
+        ec->post(re);
+        CPPUNIT_ASSERT_EQUAL(ssize_t(size),
+                             client.send(hello, sizeof(hello)));
+        CPPUNIT_ASSERT(isNextEvent(re));
+        re.throwIfException();
         CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
     }
 
@@ -164,20 +205,14 @@
         CPPUNIT_ASSERT(port != 0);
 
         AcceptEvent ae(s.fd());
-        ec->postEvent(ae);
         Socket client = Socket::createTcp();
-        client.connect("localhost", port);
-        CPPUNIT_ASSERT(isNextEvent(ae));
-        ae.dispatch();
-
-        // Verify client writes are read by the accepted descriptor.
-        char readBuf[size];
-        ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size);
-        ec->postEvent(re);
-        CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello)));
-        CPPUNIT_ASSERT(isNextEvent(re));
-        re.dispatch();
-        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+        connectSendRead(ae, port,  client);
+        Socket client2 = Socket::createTcp();
+        connectSendRead(ae, port,  client2);
+        client.close();
+        client2.close();
+        Socket client3 = Socket::createTcp();
+        connectSendRead(ae, port,  client3);
     }
 };
 
diff --git a/cpp/tests/EventChannelThreadsTest.cpp b/cpp/tests/EventChannelThreadsTest.cpp
index 285ed29..f8b4ad6 100644
--- a/cpp/tests/EventChannelThreadsTest.cpp
+++ b/cpp/tests/EventChannelThreadsTest.cpp
@@ -42,7 +42,7 @@
  * We count the total number of events, and the
  * number of reads and writes for each message number.
  */
-class TestResults : public Monitor {
+class TestResults {
   public:
     TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {}
 
@@ -62,20 +62,21 @@
     }
 
     void shutdown(const std::string& exceptionMsg = std::string()) {
-        ScopedLock lock(*this);
+        Monitor::ScopedLock lock(monitor);
         exception = exceptionMsg;
         isShutdown = true;
-        notifyAll();
+        monitor.notifyAll();
     }
     
     void wait() {
-        ScopedLock lock(*this);
+        Monitor::ScopedLock lock(monitor);
         Time deadline = now() + 10*TIME_SEC; 
         while (!isShutdown) {
-            CPPUNIT_ASSERT(Monitor::wait(deadline));
+            CPPUNIT_ASSERT(monitor.wait(deadline));
         }
     }
 
+    Monitor monitor;
     bool isShutdown;
     std::string exception;
     AtomicCount reads[nMessages];
@@ -113,30 +114,34 @@
 };
 
 /** Repost an event N times. */
-class Repost {
+template <class T>
+class Reposter {
   public:
-    Repost(int n) : count (n) {}
-    virtual ~Repost() {}
+    Reposter(T* event_, int n) : event(event_), original(*event_), count (n) {}
+    virtual ~Reposter() {}
     
-    void repost(Event* event) {
+    void repost() {
         if (--count==0) {
             delete event;
         } else {
-            threads->postEvent(event);
+            *event = original;
+            threads->post(event);
         }
     }
   private:
+    T* event;
+    T original;
     int count;
 };
     
             
 
 /** Repeating read event. */
-class TestReadEvent : public ReadEvent, public Runnable, private Repost {
+class TestReadEvent : public ReadEvent, public Runnable {
   public:
     explicit TestReadEvent(int fd=-1) :
         ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)),
-        Repost(nMessages)
+        reposter(this, nMessages)
     {}
     
     void run() {
@@ -144,47 +149,50 @@
         CPPUNIT_ASSERT(0 <= value);
         CPPUNIT_ASSERT(value < nMessages);
         results.countRead(value);
-        repost(this);
+        reposter.repost();
     }
     
   private:
     int value;
-    ReadEvent original;
+    Reposter<ReadEvent> reposter;
 };
 
 
 /** Fire and forget write event */
-class TestWriteEvent : public WriteEvent, public Runnable, private Repost {
+class TestWriteEvent : public WriteEvent, public Runnable {
   public:
     TestWriteEvent(int fd=-1) :
         WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)),
-        Repost(nMessages),
+        reposter(this, nMessages),
         value(0)
     {}
     
     void run() {
         CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize());
         results.countWrite(value++);
-        repost(this);
+        reposter.repost();
     }
 
   private:
+    Reposter<WriteEvent> reposter;
     int value;
 };
 
 /** Fire-and-forget Accept event, posts reads on the accepted connection. */
-class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost {
+class TestAcceptEvent : public AcceptEvent, public Runnable {
   public:
     TestAcceptEvent(int fd=-1) :
         AcceptEvent(fd, SafeCallback(*this)),
-        Repost(nConnections)
+        reposter(this, nConnections)
     {}
     
     void run() {
-        threads->postEvent(new TestReadEvent(getAcceptedDesscriptor()));
+        threads->post(new TestReadEvent(getAcceptedDesscriptor()));
         results.countEvent();
-        repost(this);
+        reposter.repost();
     }
+  private:
+    Reposter<AcceptEvent> reposter;
 };
 
 class EventChannelThreadsTest : public CppUnit::TestCase
@@ -207,10 +215,10 @@
     {
         Socket listener = Socket::createTcp();
         int port = listener.listen();
-
+        
         // Post looping accept events, will repost nConnections times.
         // The accept event will automatically post read events.
-        threads->postEvent(new TestAcceptEvent(listener.fd()));
+        threads->post(new TestAcceptEvent(listener.fd()));
 
         // Make connections.
         Socket connections[nConnections];
@@ -221,7 +229,7 @@
 
         // Post looping write events.
         for (int i = 0; i < nConnections; ++i) {
-            threads->postEvent(new TestWriteEvent(connections[i].fd()));
+            threads->post(new TestWriteEvent(connections[i].fd()));
         }
 
         // Wait for all events to be dispatched.
diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am
index 900bf47..943afde 100644
--- a/cpp/tests/Makefile.am
+++ b/cpp/tests/Makefile.am
@@ -16,9 +16,10 @@
   topicall		\
   topictest		\
   qpid_test_plugin.h	\
-  APRBaseTest.cpp
+  MockSessionHandler.h
 
-client_tests =		\
+
+client_exe_tests =	\
   client_test		\
   echo_service		\
   topic_listener	\
@@ -41,7 +42,8 @@
   TxAckTest		\
   TxBufferTest		\
   TxPublishTest		\
-  ValueTest
+  ValueTest		\
+  AcceptorTest
 
 framing_tests =		\
   BodyHandlerTest	\
@@ -54,17 +56,21 @@
 
 posix_tests =		\
   EventChannelTest	\
-  EventChannelThreadsTest
+  EventChannelThreadsTest \
+  EventChannelConnectionTest 
+
+apr_tests =   APRBaseTest.cpp
 
 unit_tests =		\
   $(broker_tests)	\
   $(framing_tests)	\
-  $(misc_tests)
+  $(misc_tests)         \
+  $(posix_tests)
 
 
-noinst_PROGRAMS = $(client_tests)
+noinst_PROGRAMS = $(client_exe_tests)
 
-TESTS = run-unit-tests run-python-tests
+TESTS = run-unit-tests run-system-tests
 EXTRA_DIST += $(TESTS)
 
 include gen.mk
@@ -77,7 +83,7 @@
 
 gen.mk: Makefile.am
 	(					\
-	  for i in $(client_tests); do		\
+	  for i in $(client_exe_tests); do		\
 	    echo $${i}_SOURCES = $$i.cpp;	\
 	    echo $${i}_LDADD = '$$(lib_client) $$(lib_common) $$(extra_libs)'; \
 	  done;					\
diff --git a/cpp/tests/run-python-tests b/cpp/tests/run-python-tests
index 57be07e..e69de29 100755
--- a/cpp/tests/run-python-tests
+++ b/cpp/tests/run-python-tests
@@ -1,15 +0,0 @@
-#!/bin/sh
-
-set -e
-log=`pwd`/qpidd.log
-# Start the daemon, recording its PID.
-../src/qpidd > $log 2>&1 & pid=$!
-
-# Arrange to kill the daemon upon any type of termination.
-trap 'status=$?; kill $pid; exit $status' 0
-trap '(exit $?); exit $?' 1 2 13 15
-
-# Run the tests.
-cd ../../python && ./run-tests -v -I cpp_failing.txt
-
-rm -f $log
diff --git a/cpp/tests/topic_publisher.cpp b/cpp/tests/topic_publisher.cpp
index b95abd9..6d17b70 100644
--- a/cpp/tests/topic_publisher.cpp
+++ b/cpp/tests/topic_publisher.cpp
@@ -138,13 +138,15 @@
             int64_t sum(0);
             for(int i = 0; i < batchSize; i++){
                 if(i > 0 && args.getDelay()) sleep(args.getDelay());
-                Time time = publisher.publish(
-                    args.getMessages(), args.getSubscribers(), args.getSize());
-                if(!max || time > max) max = time;
-                if(!min || time < min) min = time;
-                sum += time;
+                int64_t msecs =
+                    publisher.publish(args.getMessages(),
+                                      args.getSubscribers(),
+                                      args.getSize()) / TIME_MSEC;
+                if(!max || msecs > max) max = msecs;
+                if(!min || msecs < min) min = msecs;
+                sum += msecs;
                 std::cout << "Completed " << (i+1) << " of " << batchSize
-                          << " in " << time/TIME_MSEC << "ms" << std::endl;
+                          << " in " << msecs << "ms" << std::endl;
             }
             publisher.terminate();
             int64_t avg = sum / batchSize;
diff --git a/cpp/tests/topictest b/cpp/tests/topictest
index 792f063..da3a0c1 100755
--- a/cpp/tests/topictest
+++ b/cpp/tests/topictest
@@ -1,42 +1,41 @@
 #!/bin/bash
-# Run the c++ or java topic test
+# Run the c++ or topic test
 
-. `dirname $0`/env
-
-# Edit parameters here:
-
-# Big test:
-# LISTENERS=10
-# MESSAGES=10000
-# BATCHES=20
-
-LISTENERS=10
+# Defaults
+SUBSCRIBERS=10
 MESSAGES=2000
 BATCHES=10
 
-cppcmds() {
-    LISTEN_CMD=topic_listener
-    PUBLISH_CMD="topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $LISTENERS"
+while getopts "s:m:b:" opt ; do
+    case $opt in
+	s) SUBSCRIBERS=$OPTARG ;;
+	m) MESSAGES=$OPTARG ;;
+	b) BATCHES=$OPTARG ;;
+	?)
+	    echo "Usage: %0 [-l <subscribers>] [-m <messages.] [-b <batches>]"
+	    exit 1
+	    ;;
+    esac
+done
+
+subscribe() {
+    ID=$1
+    echo "subscriber $ID"
+    ./topic_listener > subscriber.$ID 2>&1 || {
+	echo "SUBSCRIBER %ID FAILED: " ;
+	cat subscriber.$ID
+    }
+    rm subscriber.$ID
 }
 
-javacmds() {
-    DEF=-Damqj.logging.level="error"
-    LISTEN_CMD="qpid-run $DEF org.apache.qpid.topic.Listener"
-    PUBLISH_CMD="qpid-run $DEF org.apache.qpid.topic.Publisher -messages $MESSAGES -batch $BATCHES -clients $LISTENERS"
+publish() {
+    ./topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $SUBSCRIBERS
 }
 
-case $1 in
-    c) cppcmds ;;
-    j) javacmds ;;
-    *) cppcmds ;;
-esac
-
-for ((i=$LISTENERS ; i--; )); do
-    $LISTEN_CMD  > /dev/null 2>&1 &
+for ((i=$SUBSCRIBERS ; i--; )); do
+    subscribe $i &
 done
 sleep 1
-echo $PUBLISH_CMD $OPTIONS
-
 STATS=~/bin/topictest.times
 echo "---- topictest `date`" >> $STATS
-$PUBLISH_CMD $OPTIONS | tee -a $STATS
+publish | tee -a $STATS