Merge in trunk changes from r758432:768028

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/cmake@768053 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/INSTALL b/qpid/cpp/INSTALL
index 3134959..e7103d2 100644
--- a/qpid/cpp/INSTALL
+++ b/qpid/cpp/INSTALL
@@ -104,6 +104,9 @@
 For SASL and SSL, include
  # yum install cyrus-sasl-devel
 
+For the XML Exchange, include:
+
+ # yum install xqilla-devel xerces-c-devel
 
 Follow the manual installation instruction below for any packages not
 available through your distributions packaging tool.
diff --git a/qpid/cpp/rubygen/framing.0-10/Session.rb b/qpid/cpp/rubygen/framing.0-10/Session.rb
index 709491e..48a1608 100644
--- a/qpid/cpp/rubygen/framing.0-10/Session.rb
+++ b/qpid/cpp/rubygen/framing.0-10/Session.rb
@@ -77,9 +77,9 @@
 
 class ContentField               # For extra content parameters
   def cppname() "content"  end
-  def signature() "const MethodContent& content" end
-  def sig_default() signature+"="+"DefaultContent(std::string())" end
-  def unpack() "p[arg::content|DefaultContent(std::string())]"; end
+  def signature() "const Message& content" end
+  def sig_default() signature+"="+"Message(std::string())" end
+  def unpack() "p[arg::content|Message(std::string())]"; end
   def doc() "Message content"; end
 end
 
@@ -160,6 +160,10 @@
     cpp_file(@file) { 
       include @classname
       include "qpid/framing/all_method_bodies.h"
+      include "qpid/client/SessionImpl.h"
+      include "qpid/client/MessageImpl.h"
+      include "qpid/client/PrivateImplPrivate.h"
+      include "qpid/client/CompletionImpl.h"
       namespace(@namespace) {
         genl "using namespace framing;"
         session_methods(sync_default).each { |m|
@@ -171,8 +175,8 @@
             genl "#{m.body_name} body(#{args});";
             genl "body.setSync(sync);"
             sendargs="body"
-            sendargs << ", content" if m.content
-            async_retval="#{m.return_type(true)}(impl->send(#{sendargs}), impl)"
+            sendargs << ", *privateImplGetPtr(content)" if m.content
+            async_retval="#{m.return_type(true)}(new CompletionImpl(impl->send(#{sendargs}), impl))"
             if @async then
               genl "return #{async_retval};"
             else
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 3ac1992..457463e 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -343,7 +343,6 @@
   qpid/sys/AsynchIOHandler.cpp \
   qpid/sys/Dispatcher.cpp \
   qpid/sys/DispatchHandle.cpp \
-  qpid/sys/LatencyMetric.cpp \
   qpid/sys/Runnable.cpp \
   qpid/sys/Shlib.cpp \
   qpid/sys/Timer.cpp
@@ -437,21 +436,26 @@
   qpid/client/Bounds.cpp			\
   qpid/client/Connection.cpp			\
   qpid/client/ConnectionHandler.cpp		\
-  qpid/client/ConnectionImpl.cpp                \
+  qpid/client/ConnectionImpl.cpp		\
   qpid/client/ConnectionSettings.cpp		\
-  qpid/client/Connector.cpp	                \
+  qpid/client/Connector.cpp			\
   qpid/client/Demux.cpp				\
   qpid/client/Dispatcher.cpp			\
-  qpid/client/FailoverManager.cpp            \
+  qpid/client/FailoverManager.cpp		\
   qpid/client/FailoverListener.h		\
   qpid/client/FailoverListener.cpp		\
   qpid/client/Future.cpp			\
   qpid/client/FutureCompletion.cpp		\
+  qpid/client/Completion.cpp			\
+  qpid/client/CompletionImpl.h			\
   qpid/client/FutureResult.cpp			\
   qpid/client/HandlePrivate.h			\
+  qpid/client/PrivateImplPrivate.h		\
   qpid/client/LoadPlugins.cpp			\
   qpid/client/LocalQueue.cpp			\
   qpid/client/Message.cpp			\
+  qpid/client/MessageImpl.cpp			\
+  qpid/client/MessageImpl.h			\
   qpid/client/MessageListener.cpp		\
   qpid/client/MessageReplayTracker.cpp		\
   qpid/client/QueueOptions.cpp			\
@@ -597,6 +601,7 @@
   qpid/client/FutureCompletion.h \
   qpid/client/FutureResult.h \
   qpid/client/Handle.h \
+  qpid/client/PrivateImpl.h \
   qpid/client/LocalQueue.h \
   qpid/client/QueueOptions.h \
   qpid/client/Message.h \
@@ -697,7 +702,6 @@
   qpid/sys/FileSysDir.h \
   qpid/sys/IntegerTypes.h \
   qpid/sys/IOHandle.h \
-  qpid/sys/LatencyMetric.h \
   qpid/sys/LockFile.h \
   qpid/sys/LockPtr.h \
   qpid/sys/Monitor.h \
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index e2054d7..fdac229 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -56,6 +56,8 @@
   qpid/cluster/Dispatchable.h			\
   qpid/cluster/UpdateClient.cpp			\
   qpid/cluster/UpdateClient.h			\
+  qpid/cluster/ErrorCheck.cpp			\
+  qpid/cluster/ErrorCheck.h			\
   qpid/cluster/Event.cpp			\
   qpid/cluster/Event.h				\
   qpid/cluster/EventFrame.h			\
diff --git a/qpid/cpp/src/common.vcproj b/qpid/cpp/src/common.vcproj
index 96d67b9..a95e848 100644
--- a/qpid/cpp/src/common.vcproj
+++ b/qpid/cpp/src/common.vcproj
@@ -456,6 +456,9 @@
 				RelativePath="gen\qpid\framing\ClusterConnectionTxStartBody.cpp">

 			</File>

 			<File

+				RelativePath="gen\qpid\framing\ClusterErrorCheckBody.cpp">

+			</File>

+			<File

 				RelativePath="gen\qpid\framing\ClusterMessageExpiredBody.cpp">

 			</File>

 			<File

@@ -972,9 +975,6 @@
 				RelativePath="qpid\sys\Dispatcher.cpp">

 			</File>

 			<File

-				RelativePath="qpid\sys\LatencyMetric.cpp">

-			</File>

-			<File

 				RelativePath="qpid\sys\Runnable.cpp">

 			</File>

 			<File

@@ -1165,6 +1165,9 @@
 				RelativePath="gen\qpid\framing\ClusterConnectionTxStartBody.h">

 			</File>

 			<File

+				RelativePath="gen\qpid\framing\ClusterErrorCheckBody.h">

+			</File>

+			<File

 				RelativePath="gen\qpid\framing\ClusterMessageExpiredBody.h">

 			</File>

 			<File

@@ -1810,9 +1813,6 @@
 				RelativePath="qpid\sys\IOHandle.h">

 			</File>

 			<File

-				RelativePath="qpid\sys\LatencyMetric.h">

-			</File>

-			<File

 				RelativePath="qpid\sys\LockFile.h">

 			</File>

 			<File

diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index db95705..f927db0 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -88,6 +88,7 @@
     }
     catch(const SessionException& e) {
         QPID_LOG(error, "Execution exception: " << e.what());
+        executionException(e.code, e.what()); // Let subclass handle this first.
         framing::AMQP_AllProxy::Execution  execution(channel);
         AMQMethodBody* m = f.getMethod();
         SequenceNumber commandId;
@@ -98,6 +99,7 @@
     }
     catch(const ChannelException& e){
         QPID_LOG(error, "Channel exception: " << e.what());
+        channelException(e.code, e.what()); // Let subclass handle this first.
         peer.detached(name, e.code);
     }
     catch(const ConnectionException& e) {
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
index 0b158ec..0d9c72f 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -87,8 +87,9 @@
     QPID_COMMON_EXTERN virtual void invoke(const framing::AMQMethodBody& m);
 
     virtual void setState(const std::string& sessionName, bool force) = 0;
-    virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0;
     virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0;
+    virtual void channelException(framing::session::DetachCode, const std::string& msg) = 0;
+    virtual void executionException(framing::execution::ErrorCode, const std::string& msg) = 0;
     virtual void detaching() = 0;
 
     // Notification of events
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index b06e06d..365b3cc 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -57,7 +57,8 @@
     mgmtObject(0),
     links(broker_.getLinks()),
     agent(0),
-    timer(broker_.getTimer())
+    timer(broker_.getTimer()),
+    errorListener(0)
 {
     Manageable* parent = broker.GetVhostObject();
 
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index b659fe6..e67cdce 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -66,6 +66,17 @@
                    public RefCounted
 {
   public:
+    /**
+     * Listener that can be registered with a Connection (see setErrorListener) to be informed of errors.
+     */
+    class ErrorListener
+    {
+      public:
+        virtual ~ErrorListener() {}
+        virtual void sessionError(uint16_t channel, const std::string&) = 0;  ///< channel of the failing session plus the error message
+        virtual void connectionError(const std::string&) = 0;  ///< error message of a connection-level failure
+    };
+
     Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0);
     ~Connection ();
 
@@ -101,6 +112,9 @@
     const std::string& getMgmtId() const { return mgmtId; }
     management::ManagementAgent* getAgent() const { return agent; }
     void setFederationLink(bool b);
+    /** Connection never deletes the listener. Pass 0 to clear it. */
+    void setErrorListener(ErrorListener* l) { errorListener=l; }
+    ErrorListener* getErrorListener() { return errorListener; }
     
     void setHeartbeatInterval(uint16_t heartbeat);
     void sendHeartbeat();
@@ -112,6 +126,7 @@
 
     void sendClose();
     void setSecureConnection(SecureConnection* secured);
+
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -128,6 +143,8 @@
     management::ManagementAgent* agent;
     Timer& timer;
     boost::intrusive_ptr<TimerTask> heartbeatTimer;
+    ErrorListener* errorListener;
+
   public:
     qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
 };
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 63212c7..8b70836 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -64,13 +64,16 @@
 void ConnectionHandler::handle(framing::AMQFrame& frame)
 {
     AMQMethodBody* method=frame.getBody()->getMethod();
+    Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
     try{
         if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) {
             handler->connection.getChannel(frame.getChannel()).in(frame);
         }
     }catch(ConnectionException& e){
+        if (errorListener) errorListener->connectionError(e.what());
         handler->proxy.close(e.code, e.what());
     }catch(std::exception& e){
+        if (errorListener) errorListener->connectionError(e.what());
         handler->proxy.close(541/*internal error*/, e.what());
     }
 }
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
index 907f1e5..ffe0cc4 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
@@ -33,4 +33,6 @@
     return m.getExpiration() < sys::AbsTime::now();
 }
 
+void ExpiryPolicy::forget(Message&) {}
+
 }} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
index cefe9b7..eeb3ffd 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
@@ -39,6 +39,7 @@
     QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
     QPID_BROKER_EXTERN virtual void willExpire(Message&);
     QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
+    QPID_BROKER_EXTERN virtual void forget(Message&);
 };
 }} // namespace qpid::broker
 
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 40b5515..1e9eb9d 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -53,6 +53,8 @@
 
 Message::~Message()
 {
+    if (expiryPolicy)
+        expiryPolicy->forget(*this);
 }
 
 void Message::forcePersistent()
@@ -334,7 +336,7 @@
 
 void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
     expiryPolicy = e;
-    if (expiryPolicy)
+    if (expiryPolicy) 
         expiryPolicy->willExpire(*this);
 }
 
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 442c3eb..ca1f875 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -45,12 +45,18 @@
 MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
 } // namespace
 
-void SessionHandler::channelException(framing::session::DetachCode, const std::string&) {
-    handleDetach();
+void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+    // NOTE: must tell the error listener _before_ calling connection.close()
+    if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
+    connection.close(code, msg);
 }
 
-void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
-    connection.close(code, msg);
+void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {  // detach code is unused here
+    if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);  // only reports to the listener, if one is set
+}
+
+void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {  // error code is unused here
+    if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);  // only reports to the listener, if one is set
+}
 
 ConnectionState& SessionHandler::getConnection() { return connection; }
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index ffc032f6..ca6d6bb 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -73,8 +73,9 @@
     virtual void setState(const std::string& sessionName, bool force);
     virtual qpid::SessionState* getState();
     virtual framing::FrameHandler* getInHandler();
-    virtual void channelException(framing::session::DetachCode code, const std::string& msg);
     virtual void connectionException(framing::connection::CloseCode code, const std::string& msg);
+    virtual void channelException(framing::session::DetachCode, const std::string& msg);
+    virtual void executionException(framing::execution::ErrorCode, const std::string& msg);
     virtual void detaching();
     virtual void readyToSend();
 
diff --git a/qpid/cpp/src/qpid/client/Completion.cpp b/qpid/cpp/src/qpid/client/Completion.cpp
new file mode 100644
index 0000000..e3676b2
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/Completion.cpp
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Completion.h"
+#include "CompletionImpl.h"
+#include "HandlePrivate.h"
+
+namespace qpid {
+namespace client {
+
+Completion::Completion(CompletionImpl* i) : Handle<CompletionImpl>(i) {}  // i may be 0, giving a null handle
+Completion::~Completion() {}
+Completion::Completion(const Completion& c) : Handle<CompletionImpl>(c.impl) {}  // copies the handle only; both refer to the same CompletionImpl
+Completion& Completion::operator=(const Completion& c) { Handle<CompletionImpl>::operator=(c); return *this; }
+
+void Completion::wait() { impl->wait(); }  // impl is dereferenced unchecked: the handle must not be null
+bool Completion::isComplete() { return impl->isComplete(); }  // same precondition as wait()
+std::string Completion::getResult() { return impl->getResult(); }  // same precondition as wait()
+
+}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Completion.h b/qpid/cpp/src/qpid/client/Completion.h
index c4979d7..0b246b7 100644
--- a/qpid/cpp/src/qpid/client/Completion.h
+++ b/qpid/cpp/src/qpid/client/Completion.h
@@ -22,13 +22,14 @@
 #ifndef _Completion_
 #define _Completion_
 
-#include <boost/shared_ptr.hpp>
-#include "Future.h"
-#include "SessionImpl.h"
+#include "Handle.h"
+#include <string>
 
 namespace qpid {
 namespace client {
 
+class CompletionImpl;
+
 /** 
  * Asynchronous commands that do not return a result will return a
  * Completion. You can use the completion to wait for that specific
@@ -38,32 +39,26 @@
  *
  *\ingroup clientapi
  */
-class Completion
+class Completion : public Handle<CompletionImpl>
 {
-protected:
-    Future future;
-    shared_ptr<SessionImpl> session;
-
 public:
     ///@internal
-    Completion() {}
-
-    ///@internal
-    Completion(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {}
+    QPID_CLIENT_EXTERN Completion(CompletionImpl* =0);
+    QPID_CLIENT_EXTERN ~Completion();
+    QPID_CLIENT_EXTERN Completion(const Completion&);
+    QPID_CLIENT_EXTERN Completion& operator=(const Completion&);
 
     /** Wait for the asynchronous command that returned this
      *Completion to complete.
      *
-     *@exception If the command returns an error, get() throws an exception.
+     *@exception If the command returns an error.
      */
-    void wait()
-    {
-        future.wait(*session);
-    }
+    QPID_CLIENT_EXTERN void wait();
 
-    bool isComplete() {
-        return future.isComplete(*session);
-    }
+    QPID_CLIENT_EXTERN bool isComplete();
+
+  protected:
+    QPID_CLIENT_EXTERN std::string getResult();
 };
 
 }}
diff --git a/qpid/cpp/src/qpid/client/CompletionImpl.h b/qpid/cpp/src/qpid/client/CompletionImpl.h
new file mode 100644
index 0000000..119abc0
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/CompletionImpl.h
@@ -0,0 +1,50 @@
+#ifndef QPID_CLIENT_COMPLETIONIMPL_H
+#define QPID_CLIENT_COMPLETIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "Future.h"
+
+namespace qpid {
+namespace client {
+
+///@internal Ref-counted implementation behind the Completion handle: a Future plus the session it is checked against.
+class CompletionImpl : public RefCounted
+{
+public:
+    CompletionImpl() {}  // session is default-constructed; NOTE(review): the accessors below dereference it unchecked
+    CompletionImpl(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {}
+
+    bool isComplete() { return future.isComplete(*session); }  // forwards to Future::isComplete
+    void wait() { future.wait(*session); }  // forwards to Future::wait
+    std::string getResult() { return future.getResult(*session); }  // forwards to Future::getResult
+
+protected:
+    Future future;  // the future every call above is forwarded to
+    shared_ptr<SessionImpl> session;  // dereferenced and passed to each Future call
+};
+
+}} // namespace qpid::client
+
+
+#endif  /*!QPID_CLIENT_COMPLETIONIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index cc62d72..e841540 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -19,6 +19,7 @@
  *
  */
 #include "Connection.h"
+#include "ConnectionImpl.h"
 #include "ConnectionSettings.h"
 #include "Message.h"
 #include "SessionImpl.h"
diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h
index 846ac33..d898ea7 100644
--- a/qpid/cpp/src/qpid/client/Connection.h
+++ b/qpid/cpp/src/qpid/client/Connection.h
@@ -25,6 +25,7 @@
 #include <string>
 #include "qpid/client/Session.h"
 #include "qpid/client/ClientImportExport.h"
+#include "qpid/client/ConnectionSettings.h"
 
 namespace qpid {
 
@@ -32,7 +33,6 @@
 
 namespace client {
 
-struct ConnectionSettings;
 class ConnectionImpl;
 
 /**
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index 745bdb6..b1e8302 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -111,9 +111,11 @@
         Mutex::ScopedLock l(lock);
         s = sessions[frame.getChannel()].lock();
     }
-    if (!s)
-        throw NotAttachedException(QPID_MSG("Invalid channel: " << frame.getChannel()));
-    s->in(frame);
+    if (!s) {
+        QPID_LOG(info, "Dropping frame received on invalid channel: " << frame);
+    } else {
+        s->in(frame);
+    }
 }
 
 bool ConnectionImpl::isOpen() const 
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index 8d85745..5156031 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -26,6 +26,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/sys/BlockingQueue.h"
 #include "Message.h"
+#include "MessageImpl.h"
 
 #include <boost/state_saver.hpp>
 
@@ -49,6 +50,9 @@
         session.getExecution().getDemux().get(q); 
 }
 
+Dispatcher::~Dispatcher() {}
+    
+
 void Dispatcher::start()
 {
     worker = Thread(this);
@@ -71,7 +75,7 @@
             Mutex::ScopedUnlock u(lock);
             FrameSet::shared_ptr content = queue->pop();
             if (content->isA<MessageTransferBody>()) {
-                Message msg(*content);
+                Message msg(new MessageImpl(*content));
                 boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination());
                 if (!listener) {
                     QPID_LOG(error, "No listener found for destination " << msg.getDestination());
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.h b/qpid/cpp/src/qpid/client/Dispatcher.h
index e84f8f3..4dbb75d 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.h
+++ b/qpid/cpp/src/qpid/client/Dispatcher.h
@@ -30,7 +30,6 @@
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "MessageListener.h"
-#include "SubscriptionImpl.h"
 
 namespace qpid {
 namespace client {
@@ -61,6 +60,7 @@
 
 public:
     Dispatcher(const Session& session, const std::string& queue = "");
+    ~Dispatcher();
 
     void start();
     void wait();
diff --git a/qpid/cpp/src/qpid/client/FailoverListener.cpp b/qpid/cpp/src/qpid/client/FailoverListener.cpp
index 16370f8..ed9354d 100644
--- a/qpid/cpp/src/qpid/client/FailoverListener.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverListener.cpp
@@ -20,6 +20,9 @@
  */
 #include "FailoverListener.h"
 #include "SessionBase_0_10Access.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/SubscriptionImpl.h"
 #include "qpid/client/SubscriptionManager.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
diff --git a/qpid/cpp/src/qpid/client/FailoverListener.h b/qpid/cpp/src/qpid/client/FailoverListener.h
index fe73a26..7afee73 100644
--- a/qpid/cpp/src/qpid/client/FailoverListener.h
+++ b/qpid/cpp/src/qpid/client/FailoverListener.h
@@ -33,6 +33,7 @@
 namespace client {
 
 class SubscriptionManager;
+class ConnectionImpl;
 
 /**
  * @internal Listen for failover updates from the amq.failover exchange.
diff --git a/qpid/cpp/src/qpid/client/Future.cpp b/qpid/cpp/src/qpid/client/Future.cpp
index 6a0c78a..fda4021 100644
--- a/qpid/cpp/src/qpid/client/Future.cpp
+++ b/qpid/cpp/src/qpid/client/Future.cpp
@@ -20,6 +20,7 @@
  */
 
 #include "Future.h"
+#include "SessionImpl.h"
 
 namespace qpid {
 namespace client {
diff --git a/qpid/cpp/src/qpid/client/Future.h b/qpid/cpp/src/qpid/client/Future.h
index ea01522..28c9a2b 100644
--- a/qpid/cpp/src/qpid/client/Future.h
+++ b/qpid/cpp/src/qpid/client/Future.h
@@ -26,17 +26,15 @@
 #include <boost/shared_ptr.hpp>
 #include "qpid/Exception.h"
 #include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/StructHelper.h"
 #include "FutureCompletion.h"
 #include "FutureResult.h"
-#include "SessionImpl.h"
 #include "ClientImportExport.h"
 
 namespace qpid {
 namespace client {
 
 /**@internal */
-class Future : private framing::StructHelper
+class Future
 {
     framing::SequenceNumber command;
     boost::shared_ptr<FutureResult> result;
@@ -46,13 +44,9 @@
     Future() : complete(false) {}    
     Future(const framing::SequenceNumber& id) : command(id), complete(false) {}    
 
-    template <class T> void decodeResult(T& value, SessionImpl& session) 
-    {
-        if (result) {
-            decode(value, result->getResult(session));
-        } else {
-            throw Exception("Result not expected");
-        }
+    std::string getResult(SessionImpl& session) {  // returns the string produced by FutureResult::getResult for this session
+        if (result) return result->getResult(session);
+        else throw Exception("Result not expected");  // no FutureResult is attached to this Future
+    }
 
     QPID_CLIENT_EXTERN void wait(SessionImpl& session);
diff --git a/qpid/cpp/src/qpid/client/Handle.h b/qpid/cpp/src/qpid/client/Handle.h
index d8b822d..12fb4cf 100644
--- a/qpid/cpp/src/qpid/client/Handle.h
+++ b/qpid/cpp/src/qpid/client/Handle.h
@@ -30,9 +30,11 @@
 template <class T> class HandlePrivate;
 
 /**
- * A handle is like a pointer: it points to some underlying object.
+ * A handle is like a pointer: it points to some implementation object.
+ * Copying the handle does not copy the object.
+ * 
  * Handles can be null,  like a 0 pointer. Use isValid(), isNull() or the
- * implicit conversion to bool to test for a null handle.
+ * conversion to bool to test for a null handle.
  */
 template <class T> class Handle {
   public:
@@ -46,8 +48,11 @@
     /**@return true if handle is null. It is an error to call any function on a null handle. */
     QPID_CLIENT_EXTERN bool isNull() const { return !impl; }
 
+    /** Conversion to bool supports idiom if (handle) { handle->... } */
     QPID_CLIENT_EXTERN operator bool() const { return impl; }
-    QPID_CLIENT_EXTERN bool operator !() const { return impl; }
+
+    /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */
+    QPID_CLIENT_EXTERN bool operator !() const { return !impl; }
 
     QPID_CLIENT_EXTERN void swap(Handle<T>&);
 
diff --git a/qpid/cpp/src/qpid/client/HandlePrivate.h b/qpid/cpp/src/qpid/client/HandlePrivate.h
index 488ce48..46e4bff 100644
--- a/qpid/cpp/src/qpid/client/HandlePrivate.h
+++ b/qpid/cpp/src/qpid/client/HandlePrivate.h
@@ -21,14 +21,16 @@
  * under the License.
  *
  */
+#include "Handle.h"
+#include "qpid/RefCounted.h"
 #include <algorithm>
+#include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
 namespace client {
 
 /** @file
- * Private implementation of handle, include in .cpp file of handle
- * subclasses _after_ including the declaration of class T.
+ * Implementation of Handle; include this header in the .cpp file of each Handle subclass.
  * T can be any class that can be used with boost::intrusive_ptr.
  */
 
@@ -52,9 +54,13 @@
 template <class T>
 class HandlePrivate {
   public:
-    static boost::intrusive_ptr<T> get(Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); }
+    static boost::intrusive_ptr<T> get(const Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); }
+    static void set(Handle<T>& h, const boost::intrusive_ptr<T>& p) { Handle<T>(p.get()).swap(h); }
 };
 
+template<class T> boost::intrusive_ptr<T> handleGetPtr(Handle<T>& h) { return HandlePrivate<T>::get(h); }
+template<class T> boost::intrusive_ptr<const T> handleGetPtr(const Handle<T>& h) { return HandlePrivate<T>::get(h); }
+template<class T> void handleSetPtr(Handle<T>& h, const boost::intrusive_ptr<T>& p) { HandlePrivate<T>::set(h, p); }
 
 }} // namespace qpid::client
 
diff --git a/qpid/cpp/src/qpid/client/LocalQueue.cpp b/qpid/cpp/src/qpid/client/LocalQueue.cpp
index e449c9f..02fecf8 100644
--- a/qpid/cpp/src/qpid/client/LocalQueue.cpp
+++ b/qpid/cpp/src/qpid/client/LocalQueue.cpp
@@ -19,11 +19,14 @@
  *
  */
 #include "LocalQueue.h"
+#include "MessageImpl.h"
 #include "qpid/Exception.h"
 #include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "HandlePrivate.h"
 #include "SubscriptionImpl.h"
+#include "CompletionImpl.h"
 
 namespace qpid {
 namespace client {
@@ -49,7 +52,7 @@
     bool ok = queue->pop(content, timeout);
     if (!ok) return false;
     if (content->isA<MessageTransferBody>()) {
-        result = Message(*content);
+        result = Message(new MessageImpl(*content));
         boost::intrusive_ptr<SubscriptionImpl> si = HandlePrivate<SubscriptionImpl>::get(subscription);
         assert(si);
         if (si) si->received(result);
diff --git a/qpid/cpp/src/qpid/client/Message.cpp b/qpid/cpp/src/qpid/client/Message.cpp
index 13caaec..962ce26 100644
--- a/qpid/cpp/src/qpid/client/Message.cpp
+++ b/qpid/cpp/src/qpid/client/Message.cpp
@@ -20,52 +20,37 @@
  */
 
 #include "Message.h"
+#include "PrivateImplPrivate.h"
+#include "MessageImpl.h"
 
 namespace qpid {
 namespace client {
 
-Message::Message(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {}
+template class PrivateImpl<MessageImpl>;
 
-std::string Message::getDestination() const 
-{ 
-    return method.getDestination(); 
-}
+Message::Message(const std::string& data, const std::string& routingKey) : PrivateImpl<MessageImpl>(new MessageImpl(data, routingKey)) {}
+Message::Message(MessageImpl* i) : PrivateImpl<MessageImpl>(i) {}
+Message::~Message() {}
 
-bool Message::isRedelivered() const 
-{ 
-    return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); 
-}
+std::string Message::getDestination() const { return impl->getDestination(); }
+bool Message::isRedelivered() const { return impl->isRedelivered(); }
+void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); }
+framing::FieldTable& Message::getHeaders() { return impl->getHeaders(); }
+const framing::FieldTable& Message::getHeaders() const { return impl->getHeaders(); }
+const framing::SequenceNumber& Message::getId() const { return impl->getId(); }
 
-void Message::setRedelivered(bool redelivered) 
-{ 
-    getDeliveryProperties().setRedelivered(redelivered); 
-}
+void Message::setData(const std::string& s) { impl->setData(s); }
+const std::string& Message::getData() const { return impl->getData(); }
+std::string& Message::getData() { return impl->getData(); }
 
-framing::FieldTable& Message::getHeaders() 
-{ 
-    return getMessageProperties().getApplicationHeaders(); 
-}
+void Message::appendData(const std::string& s) { impl->appendData(s); }
 
-const framing::FieldTable& Message::getHeaders() const
-{ 
-    return getMessageProperties().getApplicationHeaders(); 
-}
+bool Message::hasMessageProperties() const { return impl->hasMessageProperties(); }
+framing::MessageProperties& Message::getMessageProperties() { return impl->getMessageProperties(); }
+const framing::MessageProperties& Message::getMessageProperties() const { return impl->getMessageProperties(); }
 
-const framing::MessageTransferBody& Message::getMethod() const
-{
-    return method;
-}
-
-const framing::SequenceNumber& Message::getId() const
-{
-    return id;
-}
-
-/**@internal for incoming messages */
-Message::Message(const framing::FrameSet& frameset) :
-    method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
-{
-    populate(frameset);
-}
+bool Message::hasDeliveryProperties() const { return impl->hasDeliveryProperties(); }
+framing::DeliveryProperties& Message::getDeliveryProperties() { return impl->getDeliveryProperties(); }
+const framing::DeliveryProperties& Message::getDeliveryProperties() const { return impl->getDeliveryProperties(); }
 
 }}
diff --git a/qpid/cpp/src/qpid/client/Message.h b/qpid/cpp/src/qpid/client/Message.h
index 235e20f..97238db 100644
--- a/qpid/cpp/src/qpid/client/Message.h
+++ b/qpid/cpp/src/qpid/client/Message.h
@@ -1,5 +1,5 @@
-#ifndef _client_Message_h
-#define _client_Message_h
+#ifndef QPID_CLIENT_MESSAGE_H
+#define QPID_CLIENT_MESSAGE_H
 
 /*
  *
@@ -21,15 +21,24 @@
  * under the License.
  *
  */
-#include <string>
-#include "qpid/client/Session.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/TransferContent.h"
+
+#include "qpid/client/PrivateImpl.h"
 #include "qpid/client/ClientImportExport.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include <string>
 
 namespace qpid {
+
+namespace framing {
+class FieldTable;
+class SequenceNumber;           // FIXME aconway 2009-04-17: remove with getID?
+}
+
 namespace client {
 
+class MessageImpl;
+
 /**
  * A message sent to or received from the broker.
  *
@@ -104,8 +113,7 @@
  * 
  * 
  */
-
-class Message : public framing::TransferContent 
+class Message : public PrivateImpl<MessageImpl>
 {
 public:
     /** Create a Message.
@@ -115,6 +123,23 @@
     QPID_CLIENT_EXTERN Message(const std::string& data=std::string(),
             const std::string& routingKey=std::string());
 
+    QPID_CLIENT_EXTERN ~Message();
+
+    QPID_CLIENT_EXTERN void setData(const std::string&);
+    QPID_CLIENT_EXTERN const std::string& getData() const;
+    QPID_CLIENT_EXTERN std::string& getData();
+
+    QPID_CLIENT_EXTERN void appendData(const std::string&);
+
+    QPID_CLIENT_EXTERN bool hasMessageProperties() const;
+    QPID_CLIENT_EXTERN framing::MessageProperties& getMessageProperties();
+    QPID_CLIENT_EXTERN const framing::MessageProperties& getMessageProperties() const;
+
+    QPID_CLIENT_EXTERN bool hasDeliveryProperties() const;
+    QPID_CLIENT_EXTERN framing::DeliveryProperties& getDeliveryProperties();
+    QPID_CLIENT_EXTERN const framing::DeliveryProperties& getDeliveryProperties() const;
+
+    
     /** The destination of messages sent to the broker is the exchange
      * name.  The destination of messages received from the broker is
      * the delivery tag identifyig the local subscription (often this
@@ -133,20 +158,14 @@
     /** Get a non-modifyable reference to the message headers. */
     QPID_CLIENT_EXTERN const framing::FieldTable& getHeaders() const;
 
-    ///@internal
-    QPID_CLIENT_EXTERN const framing::MessageTransferBody& getMethod() const;
+    // FIXME aconway 2009-04-17: does this need to be in public API?
     ///@internal
     QPID_CLIENT_EXTERN const framing::SequenceNumber& getId() const;
 
-    /**@internal for incoming messages */
-    QPID_CLIENT_EXTERN Message(const framing::FrameSet& frameset);
-    
-private:
-    //method and id are only set for received messages:
-    framing::MessageTransferBody method;
-    framing::SequenceNumber id;
+    ///@internal
+    Message(MessageImpl*);
 };
 
 }}
 
-#endif  /*!_client_Message_h*/
+#endif  /*!QPID_CLIENT_MESSAGE_H*/
diff --git a/qpid/cpp/src/qpid/client/MessageImpl.cpp b/qpid/cpp/src/qpid/client/MessageImpl.cpp
new file mode 100644
index 0000000..3d06fd1
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/MessageImpl.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageImpl.h"
+
+namespace qpid {
+namespace client {
+
+MessageImpl::MessageImpl(const std::string& data, const std::string& routingKey) : TransferContent(data, routingKey) {}
+
+std::string MessageImpl::getDestination() const 
+{ 
+    return method.getDestination(); 
+}
+
+bool MessageImpl::isRedelivered() const 
+{ 
+    return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); 
+}
+
+void MessageImpl::setRedelivered(bool redelivered) 
+{ 
+    getDeliveryProperties().setRedelivered(redelivered); 
+}
+
+framing::FieldTable& MessageImpl::getHeaders() 
+{ 
+    return getMessageProperties().getApplicationHeaders(); 
+}
+
+const framing::FieldTable& MessageImpl::getHeaders() const
+{ 
+    return getMessageProperties().getApplicationHeaders(); 
+}
+
+const framing::MessageTransferBody& MessageImpl::getMethod() const
+{
+    return method;
+}
+
+const framing::SequenceNumber& MessageImpl::getId() const
+{
+    return id;
+}
+
+/**@internal for incoming messages */
+MessageImpl::MessageImpl(const framing::FrameSet& frameset) :
+    method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
+{
+    populate(frameset);
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/client/MessageImpl.h b/qpid/cpp/src/qpid/client/MessageImpl.h
new file mode 100644
index 0000000..c06d9b5
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/MessageImpl.h
@@ -0,0 +1,76 @@
+#ifndef QPID_CLIENT_MESSAGEIMPL_H
+#define QPID_CLIENT_MESSAGEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/Session.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/TransferContent.h"
+
+namespace qpid {
+namespace client {
+
+class MessageImpl : public framing::TransferContent
+{
+public:
+    /** Create a Message.
+     *@param data Data for the message body.
+     *@param routingKey Passed to the exchange that routes the message.
+     */
+    MessageImpl(const std::string& data=std::string(),
+            const std::string& routingKey=std::string());
+
+    /** The destination of messages sent to the broker is the exchange
+     * name.  The destination of messages received from the broker is
+     * the delivery tag identifying the local subscription (often this
+     * is the name of the subscribed queue.)
+     */
+    std::string getDestination() const;
+
+    /** Check the redelivered flag. */
+    bool isRedelivered() const;
+    /** Set the redelivered flag. */
+    void setRedelivered(bool redelivered);
+
+    /** Get a modifiable reference to the message headers. */
+    framing::FieldTable& getHeaders();
+
+    /** Get a non-modifiable reference to the message headers. */
+    const framing::FieldTable& getHeaders() const;
+
+    ///@internal
+    const framing::MessageTransferBody& getMethod() const;
+    ///@internal
+    const framing::SequenceNumber& getId() const;
+
+    /**@internal for incoming messages */
+    MessageImpl(const framing::FrameSet& frameset);
+    
+private:
+    //method and id are only set for received messages:
+    framing::MessageTransferBody method;
+    framing::SequenceNumber id;
+};
+
+}}
+
+#endif  /*!QPID_CLIENT_MESSAGEIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/PrivateImpl.h b/qpid/cpp/src/qpid/client/PrivateImpl.h
new file mode 100644
index 0000000..6e5ea35
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/PrivateImpl.h
@@ -0,0 +1,54 @@
+#ifndef QPID_CLIENT_PRIVATEIMPL_H
+#define QPID_CLIENT_PRIVATEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientImportExport.h"
+
+namespace qpid {
+namespace client {
+
+template <class T> class PrivateImplPrivate;
+
+/**
+ * Base classes for objects with a private implementation.
+ * 
+ * PrivateImpl objects have value semantics: copying the object also
+ * makes a copy of the implementation.
+ */
+template <class T> class PrivateImpl {
+  public:
+    QPID_CLIENT_EXTERN ~PrivateImpl();
+    QPID_CLIENT_EXTERN PrivateImpl(const PrivateImpl&);
+    QPID_CLIENT_EXTERN PrivateImpl& operator=(const PrivateImpl&);
+    QPID_CLIENT_EXTERN void swap(PrivateImpl<T>&);
+
+  protected:
+    QPID_CLIENT_EXTERN PrivateImpl(T*);
+    T* impl;
+
+  friend class PrivateImplPrivate<T>;
+};
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_PRIVATEIMPL_H*/
diff --git a/qpid/cpp/src/qpid/client/PrivateImplPrivate.h b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h
new file mode 100644
index 0000000..021456e
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/PrivateImplPrivate.h
@@ -0,0 +1,66 @@
+#ifndef QPID_CLIENT_PRIVATEIMPLPRIVATE_H
+#define QPID_CLIENT_PRIVATEIMPLPRIVATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <algorithm>
+
+namespace qpid {
+namespace client {
+
+/** @file
+ * Implementation of PrivateImpl functions, to include in .cpp file of handle subclasses.
+ * T can be any class with value semantics.
+ */
+
+template <class T>
+PrivateImpl<T>::PrivateImpl(T* p) : impl(p) { assert(impl); }
+
+template <class T>
+PrivateImpl<T>::~PrivateImpl() { delete impl; }
+
+template <class T>
+PrivateImpl<T>::PrivateImpl(const PrivateImpl& h) : impl(new T(*h.impl)) {}
+
+template <class T>
+PrivateImpl<T>& PrivateImpl<T>::operator=(const PrivateImpl<T>& h) { PrivateImpl<T>(h).swap(*this); return *this; }
+
+template <class T>
+void PrivateImpl<T>::swap(PrivateImpl<T>& h) { std::swap(impl, h.impl); }
+
+
+/** Access to private impl of a PrivateImpl; set() takes ownership of p, which must be non-null. */
+template <class T>
+class PrivateImplPrivate {
+  public:
+    static T* get(const PrivateImpl<T>& h) { return h.impl; }
+    static void set(PrivateImpl<T>& h, T* p) { PrivateImpl<T>(p).swap(h); } // Temporary deletes the old impl.
+};
+
+template<class T> T* privateImplGetPtr(PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); }
+template<class T> T* privateImplGetPtr(const PrivateImpl<T>& h) { return PrivateImplPrivate<T>::get(h); }
+template<class T> void privateImplSetPtr(PrivateImpl<T>& h, T* p) { PrivateImplPrivate<T>::set(h, p); }
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_PRIVATEIMPLPRIVATE_H*/
+
diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
index e81b78e..8a33c73 100644
--- a/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
+++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
@@ -20,6 +20,8 @@
  */
 #include "SessionBase_0_10.h"
 #include "Connection.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/Future.h"
 #include "qpid/framing/all_method_bodies.h"
 
 namespace qpid {
diff --git a/qpid/cpp/src/qpid/client/SessionBase_0_10.h b/qpid/cpp/src/qpid/client/SessionBase_0_10.h
index 3ae2193..d375b3e 100644
--- a/qpid/cpp/src/qpid/client/SessionBase_0_10.h
+++ b/qpid/cpp/src/qpid/client/SessionBase_0_10.h
@@ -23,14 +23,11 @@
  */
 
 #include "qpid/SessionId.h"
-#include "qpid/framing/amqp_structs.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/TransferContent.h"
-#include "qpid/client/Completion.h"
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/Execution.h"
 #include "qpid/client/SessionImpl.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/Execution.h"
 #include "qpid/client/TypedResult.h"
 #include "qpid/shared_ptr.h"
 #include "qpid/client/ClientImportExport.h"
@@ -44,7 +41,6 @@
 using std::string;
 using framing::Content;
 using framing::FieldTable;
-using framing::MethodContent;
 using framing::SequenceNumber;
 using framing::SequenceSet;
 using framing::SequenceNumberSet;
@@ -62,8 +58,6 @@
  */
 class SessionBase_0_10 {
   public:
-    
-    typedef framing::TransferContent DefaultContent;
 
     ///@internal
     QPID_CLIENT_EXTERN SessionBase_0_10();
diff --git a/qpid/cpp/src/qpid/client/Subscription.cpp b/qpid/cpp/src/qpid/client/Subscription.cpp
index 1f1b5ac..37f5557 100644
--- a/qpid/cpp/src/qpid/client/Subscription.cpp
+++ b/qpid/cpp/src/qpid/client/Subscription.cpp
@@ -21,6 +21,7 @@
 
 #include "Subscription.h"
 #include "SubscriptionImpl.h"
+#include "CompletionImpl.h"
 #include "HandlePrivate.h"
 #include "qpid/framing/enum.h"
 
diff --git a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
index e09a4c1..82c920c 100644
--- a/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
@@ -20,8 +20,12 @@
  */
 
 #include "SubscriptionImpl.h"
+#include "MessageImpl.h"
+#include "CompletionImpl.h"
 #include "SubscriptionManager.h"
 #include "SubscriptionSettings.h"
+#include "HandlePrivate.h"
+#include "PrivateImplPrivate.h"
 
 namespace qpid {
 namespace client {
@@ -114,9 +118,9 @@
 
 void SubscriptionImpl::received(Message& m) {
     Mutex::ScopedLock l(lock);
-    if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) 
+    if (privateImplGetPtr(m)->getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) 
         unacquired.add(m.getId());
-    else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
+    else if (privateImplGetPtr(m)->getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
         unaccepted.add(m.getId());
 
     if (listener) {
diff --git a/qpid/cpp/src/qpid/client/TypedResult.h b/qpid/cpp/src/qpid/client/TypedResult.h
index 5306997..2e54f9f 100644
--- a/qpid/cpp/src/qpid/client/TypedResult.h
+++ b/qpid/cpp/src/qpid/client/TypedResult.h
@@ -23,6 +23,7 @@
 #define _TypedResult_
 
 #include "Completion.h"
+#include "qpid/framing/StructHelper.h"
 
 namespace qpid {
 namespace client {
@@ -39,7 +40,7 @@
 
 public:
     ///@internal
-    TypedResult(Future f, shared_ptr<SessionImpl> s) : Completion(f, s), decoded(false) {}
+    TypedResult(CompletionImpl* c) : Completion(c), decoded(false) {}
 
     /**
      * Wait for the asynchronous command that returned this TypedResult to complete
@@ -49,13 +50,12 @@
      *@exception If the command returns an error, get() throws an exception.
      *
      */
-    T& get() 
-    {
+    T& get() {
         if (!decoded) {
-            future.decodeResult(result, *session);
+            framing::StructHelper helper;
+            helper.decode(result, getResult());
             decoded = true;
         }
-        
         return result;
     }
 };
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index f8e412f..a17f540 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,6 +36,7 @@
 #include "qpid/framing/ClusterConfigChangeBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
 #include "qpid/framing/ClusterReadyBody.h"
 #include "qpid/framing/ClusterShutdownBody.h"
 #include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -46,7 +47,6 @@
 #include "qpid/management/ManagementBroker.h"
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
-#include "qpid/sys/LatencyMetric.h"
 #include "qpid/sys/Thread.h"
 
 #include <boost/bind.hpp>
@@ -63,6 +63,7 @@
 using namespace qpid::sys;
 using namespace std;
 using namespace qpid::cluster;
+using namespace qpid::framing::cluster;
 using qpid::management::ManagementAgent;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
@@ -77,9 +78,10 @@
 
     void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
     void ready(const std::string& url) { cluster.ready(member, url, l); }
-    void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
+    void configChange(const std::string& current) { cluster.configChange(member, current, l); }
     void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
     void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
+    void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
     void shutdown() { cluster.shutdown(member, l); }
 
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -112,7 +114,8 @@
     discarding(true),
     state(INIT),
     lastSize(0),
-    lastBroker(false)
+    lastBroker(false),
+    error(*this)
 {
     mAgent = ManagementAgent::Singleton::getInstance();
     if (mAgent != 0){
@@ -195,14 +198,19 @@
     leave(l);
 }
 
+#define LEAVE_TRY(STMT) try { STMT; } \
+    catch (const std::exception& e) { \
+        QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
+    } do {} while(0)
+
 void Cluster::leave(Lock&) { 
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
-        try { broker.shutdown(); }
-        catch (const std::exception& e) {
-            QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
-        }
+        // Finalize connections now to avoid problems later in destructor.
+        LEAVE_TRY(localConnections.clear());
+        LEAVE_TRY(connections.clear());
+        LEAVE_TRY(broker.shutdown());
     }
 }
 
@@ -218,8 +226,6 @@
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     Event e(Event::decodeCopy(from, buf));
-    if (from == self)  // Record self-deliveries for flow control.
-        mcast.selfDeliver(e);
     deliverEvent(e);
 }
 
@@ -254,10 +260,22 @@
         QPID_LOG(trace, *this << " DROP: " << e);
 }
 
+void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+    Mutex::ScopedLock l(lock);
+    error.error(connection, type, map.getFrameSeq(), map.getMembers());
+}
+
 // Handler for deliverFrameQueue.
 // This thread executes the main logic.
 void Cluster::deliveredFrame(const EventFrame& e) {
     Mutex::ScopedLock l(lock);
+    // Process each frame through the error checker.
+    error.delivered(e);
+    while (error.canProcess())  // There is a frame ready to process.
+        processFrame(error.getNext(), l);
+}
+
+void Cluster::processFrame(const EventFrame& e, Lock& l) {
     if (e.isCluster()) {
         QPID_LOG(trace, *this << " DLVR: " << e);
         ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
@@ -265,7 +283,8 @@
             throw Exception(QPID_MSG("Invalid cluster control"));
     }
     else if (state >= CATCHUP) {
-        QPID_LOG(trace, *this << " DLVR:  " << e);
+        map.incrementFrameSeq();
+        QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ":  " << e);
         ConnectionPtr connection = getConnection(e.connectionId, l);
         if (connection)
             connection->deliveredFrame(e);
@@ -316,11 +335,11 @@
     for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
         const char* reasonString;
         switch (p->reason) {
-          case CPG_REASON_JOIN: reasonString =  " (joined) "; break;
-          case CPG_REASON_LEAVE: reasonString =  " (left) "; break;
-          case CPG_REASON_NODEDOWN: reasonString =  " (node-down) "; break;
-          case CPG_REASON_NODEUP: reasonString =  " (node-up) "; break;
-          case CPG_REASON_PROCDOWN: reasonString =  " (process-down) "; break;
+          case CPG_REASON_JOIN: reasonString =  "(joined) "; break;
+          case CPG_REASON_LEAVE: reasonString =  "(left) "; break;
+          case CPG_REASON_NODEDOWN: reasonString =  "(node-down) "; break;
+          case CPG_REASON_NODEUP: reasonString =  "(node-up) "; break;
+          case CPG_REASON_PROCDOWN: reasonString =  "(process-down) "; break;
           default: reasonString = " ";
         }
         qpid::cluster::MemberId member(*p);
@@ -342,8 +361,8 @@
         broker.setRecovery(nCurrent == 1);
         initialized = true;
     }
-    QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) 
-             << AddrList(left, nLeft, "( ", ")"));
+    QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
+             << AddrList(left, nLeft, "left: "));
     std::string addresses;
     for (cpg_address* p = current; p < current+nCurrent; ++p) 
         addresses.append(MemberId(*p).str());
@@ -357,8 +376,8 @@
     broker.getQueueEvents().enable();
 }
 
-void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
-    bool memberChange = map.configChange(addresses);
+void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
+    bool memberChange = map.configChange(current);
     if (state == LEFT) return;
     
     if (!map.isAlive(self)) {  // Final config change.
@@ -589,19 +608,24 @@
         mgmtObject->set_memberIDs(idstr);
     }
 
-    // Erase connections belonging to members that have left the cluster.
+    // Close connections belonging to members that have left the cluster.
     ConnectionMap::iterator i = connections.begin();
     while (i != connections.end()) {
         ConnectionMap::iterator j = i++;
         MemberId m = j->second->getId().getMember();
         if (m != self && !map.isMember(m))
-            connections.erase(j);
+            j->second->deliverClose();
     }
 }
 
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
-    static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
-    return o << cluster.self << "(" << STATE[cluster.state] << ")";
+    static const char* STATE[] = {
+        "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+    };
+    assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
+    o << cluster.self << "(" << STATE[cluster.state];
+    if (cluster.error.isUnresolved()) o << "/error";
+    return o << ")";
 }
 
 MemberId Cluster::getId() const {
@@ -635,4 +659,13 @@
     expiryPolicy->deliverExpire(id);
 }
 
+void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+    // If we receive an errorCheck here, it's because we have processed past the point
+    // of the error, so respond with ERROR_TYPE_NONE.
+    assert(map.getFrameSeq() >= frameSeq);
+    if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if it's already NONE.
+        mcast.mcastControl(
+            ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+}
+
 }} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index b716e2d..8a94fc7 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -23,6 +23,7 @@
 #include "ClusterSettings.h"
 #include "Cpg.h"
 #include "Decoder.h"
+#include "ErrorCheck.h"
 #include "Event.h"
 #include "EventFrame.h"
 #include "ExpiryPolicy.h"
@@ -105,6 +106,10 @@
 
     void deliverFrame(const EventFrame&);
 
+    // Called in deliverFrame thread to indicate an error from the broker.
+    void flagError(Connection&, ErrorCheck::ErrorType);
+    void connectionError();
+
     // Called only during update by Connection::shadowReady
     Decoder& getDecoder() { return decoder; }
 
@@ -132,13 +137,15 @@
 
     // == Called in deliverFrameQueue thread
     void deliveredFrame(const EventFrame&); 
+    void processFrame(const EventFrame&, Lock&); 
 
     // Cluster controls implement XML methods from cluster.xml.
     void updateRequest(const MemberId&, const std::string&, Lock&);
     void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
-    void configChange(const MemberId&, const std::string& addresses, Lock& l);
+    void configChange(const MemberId&, const std::string& current, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
+    void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&);
     void shutdown(const MemberId&, Lock&);
 
     // Helper functions
@@ -216,11 +223,13 @@
     Decoder decoder;
     bool discarding;
     
+
     // Remaining members are protected by lock.
-    // FIXME aconway 2009-03-06: Most of these members are also only used in
+
+    // TODO aconway 2009-03-06: Most of these members are also only used in
     // deliverFrameQueue thread or during stall. Review and separate members
     // that require a lock, drop lock when not needed.
-    // 
+
     mutable sys::Monitor lock;
 
 
@@ -243,7 +252,7 @@
     bool lastBroker;
     sys::Thread updateThread;
     boost::optional<ClusterMap> updatedMap;
-
+    ErrorCheck error;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index 9e72321..0395ff6 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -33,6 +33,13 @@
 
 namespace cluster {
 
+ClusterMap::Set ClusterMap::decode(const std::string& s) { // Decode concatenated 8-byte member ids.
+    Set set;
+    for (std::string::size_type i = 0; i + 8 <= s.size(); i += 8) // Never read past the end of s.
+        set.insert(MemberId(s.substr(i, 8)));
+    return set;
+}
+
 namespace {
 
 void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) {
@@ -54,9 +61,9 @@
 
 }
 
-ClusterMap::ClusterMap() {}
+ClusterMap::ClusterMap() : frameSeq(0) {}
 
-ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
+ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) {
     alive.insert(id);
     if (isMember)
         members[id] = url;
@@ -64,7 +71,9 @@
         joiners[id] = url;
 }
 
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) {
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+  : frameSeq(frameSeq_)
+{
     std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
     std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
 }
@@ -78,22 +87,7 @@
     }
     b.getMembers().clear();
     std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
-}
-
-bool ClusterMap::configChange(
-    cpg_address *current, int nCurrent,
-    cpg_address *left, int nLeft,
-    cpg_address */*joined*/, int /*nJoined*/)
-{
-    cpg_address* a;
-    bool memberChange=false;
-    for (a = left; a != left+nLeft; ++a) {
-        memberChange = memberChange || members.erase(*a);
-        joiners.erase(*a);
-    }
-    alive.clear();
-    std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
-    return memberChange;
+    b.setFrameSeq(frameSeq);
 }
 
 Url ClusterMap::getUrl(const Map& map, const  MemberId& id) {
@@ -123,8 +117,13 @@
     return urls;
 }
 
-ClusterMap::Set ClusterMap::getAlive() const {
-    return alive;
+ClusterMap::Set ClusterMap::getAlive() const { return alive; }
+
+ClusterMap::Set ClusterMap::getMembers() const {
+    Set s;
+    std::transform(members.begin(), members.end(), std::inserter(s, s.begin()),
+                   boost::bind(&Map::value_type::first, _1));
+    return s;
 }
 
 std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
@@ -158,7 +157,7 @@
 
 bool ClusterMap::configChange(const std::string& addresses) {
     bool memberChange = false;
-    Set update;
+    Set update = decode(addresses);
     for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8)  
         update.insert(MemberId(std::string(i, i+8)));
     Set removed;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h
index 4548441..3359c7c 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h
@@ -38,26 +38,26 @@
 namespace qpid {
 namespace cluster {
 
+typedef std::set<MemberId> MemberSet;
+
 /**
- * Map of established cluster members and joiners waiting for an update.
+ * Map of established cluster members and joiners waiting for an update,
+ * along with other cluster state that must be updated.
  */
 class ClusterMap {
   public:
     typedef std::map<MemberId, Url> Map;
     typedef std::set<MemberId> Set;
 
+    static Set decode(const std::string&);
+        
     ClusterMap();
     ClusterMap(const MemberId& id, const Url& url, bool isReady);
-    ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states);
+    ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
 
     /** Update from config change.
      *@return true if member set changed.
      */
-    bool configChange(
-        cpg_address *current, int nCurrent,
-        cpg_address *left, int nLeft,
-        cpg_address *joined, int nJoined);
-
     bool configChange(const std::string& addresses);
 
     bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
@@ -78,6 +78,7 @@
     std::vector<std::string> memberIds() const;
     std::vector<Url> memberUrls() const;
     Set getAlive() const;
+    Set getMembers() const;
 
     bool updateRequest(const MemberId& id, const std::string& url);       
     /** Return non-empty Url if accepted */
@@ -90,11 +91,16 @@
      * Utility method to return intersection of two member sets
      */
     static Set intersection(const Set& a, const Set& b);
+
+    uint64_t getFrameSeq() const { return frameSeq; } // Read-only accessor; callable on a const ClusterMap.
+    uint64_t incrementFrameSeq() { return ++frameSeq; }
+    
   private:
     Url getUrl(const Map& map, const  MemberId& id);
     
     Map joiners, members;
     Set alive;
+    uint64_t frameSeq;
 
   friend std::ostream& operator<<(std::ostream&, const Map&);
   friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index aa7d082..97cafba 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -39,8 +39,6 @@
 #include "qpid/framing/ConnectionCloseBody.h"
 #include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyMetric.h"
-#include "qpid/sys/AtomicValue.h"
 
 #include <boost/current_function.hpp>
 
@@ -56,8 +54,16 @@
 namespace cluster {
 
 using namespace framing;
+using namespace framing::cluster;
 
-NoOpConnectionOutputHandler Connection::discardHandler;
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
+Connection::NullFrameHandler Connection::nullFrameHandler;
+
+struct NullFrameHandler : public framing::FrameHandler {
+    void handle(framing::AMQFrame&) {}
+};
+
 
 namespace {
 sys::AtomicValue<uint64_t> idCounter;
@@ -89,6 +95,8 @@
         connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
         connection.setClientThrottling(false);              // Disable client throttling, done by active node.
     }
+    if (!isCatchUp())
+        connection.setErrorListener(this);
 }
 
 void Connection::giveReadCredit(int credit) {
@@ -97,6 +105,7 @@
 }
 
 Connection::~Connection() {
+    connection.setErrorListener(0);
     QPID_LOG(debug, cluster << " deleted connection: " << *this);
 }
 
@@ -126,7 +135,7 @@
                 cluster.addShadowConnection(this);
             AMQFrame ok((ConnectionCloseOkBody()));
             connection.getOutput().send(ok);
-            output.closeOutput(discardHandler);
+            output.closeOutput();
             catchUp = false;
         }
         else
@@ -156,8 +165,8 @@
     {
         if (f.type == DATA) // incoming data frames to broker::Connection
             connection.received(const_cast<AMQFrame&>(f.frame)); 
-        else {                    // frame control, send frame via SessionState
-            broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+        else {           // frame control, send frame via SessionState
+            broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
             if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
         }
     }
@@ -180,7 +189,7 @@
             // This was a local replicated connection. Multicast a deliver
             // closed and process any outstanding frames from the cluster
             // until self-delivery of deliver-close.
-            output.closeOutput(discardHandler);
+            output.closeOutput();
             cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
         }
     }
@@ -275,13 +284,14 @@
     QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
     self = shadowId;
     connection.setUserId(username);
-    // OK to use decoder here because we are stalled for update.
+    // OK to use decoder here because cluster is stalled for update.
     cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
+    connection.setErrorListener(this);
 }
 
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
-    cluster.updateInDone(ClusterMap(joiners, members));
+    cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
     self.second = 0;        // Mark this as completed update connection.
 }
 
@@ -305,7 +315,9 @@
 }
 
 broker::QueuedMessage Connection::getUpdateMessage() {
-    broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+    shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
+    assert(!updateq->isDurable());
+    broker::QueuedMessage m = updateq->get();
     if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
     return m;
 }
@@ -342,15 +354,15 @@
 
     // If the message was unacked, the newbie broker must place
     // it in its messageStore.
-    if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled )
+    if ( m.payload && m.payload->isPersistent() && acquired && !ended)
         queue->enqueue ( 0, m.payload );
 }
 
 void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
-    shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
-    if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
-    q->setPosition(position);
-}
+        shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+        if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+        q->setPosition(position);
+    }
 
 void Connection::expiryId(uint64_t id) {
     cluster.getExpiryPolicy().setId(id);
@@ -407,7 +419,14 @@
     QPID_LOG(debug, cluster << " decoded queue " << q->getName());    
 }
 
-qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+void Connection::sessionError(uint16_t , const std::string& ) {
+    cluster.flagError(*this, ERROR_TYPE_SESSION);
+    
+}
+
+void Connection::connectionError(const std::string& ) {
+    cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+}
 
 }} // namespace qpid::cluster
 
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 6434f76..414e5c9 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -25,7 +25,6 @@
 #include "types.h"
 #include "WriteEstimate.h"
 #include "OutputInterceptor.h"
-#include "NoOpConnectionOutputHandler.h"
 #include "EventFrame.h"
 #include "McastFrameHandler.h"
 
@@ -58,7 +57,8 @@
 class Connection :
         public RefCounted,
         public sys::ConnectionInputHandler,
-        public framing::AMQP_AllOperations::ClusterConnectionHandler
+        public framing::AMQP_AllOperations::ClusterConnectionHandler,
+        private broker::Connection::ErrorListener
         
 {
   public:
@@ -120,7 +120,7 @@
     
     void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
 
-    void membership(const framing::FieldTable&, const framing::FieldTable&);
+    void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
 
     void deliveryRecord(const std::string& queue,
                         const framing::SequenceNumber& position,
@@ -151,14 +151,22 @@
 
     void giveReadCredit(int credit);
 
+    void deliverClose();
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
     };
     
+
+    static NullFrameHandler nullFrameHandler;
+
+    // Error listener functions
+    void connectionError(const std::string&);
+    void sessionError(uint16_t channel, const std::string&);
+    
     void init();
     bool checkUnsupported(const framing::AMQBody& body);
-    void deliverClose();
     void deliverDoOutput(uint32_t requested);
     void sendDoOutput();
 
@@ -167,8 +175,6 @@
     broker::SemanticState& semanticState();
     broker::QueuedMessage getUpdateMessage();
 
-    static NoOpConnectionOutputHandler discardHandler;
-
     Cluster& cluster;
     ConnectionId self;
     bool catchUp;
@@ -181,7 +187,6 @@
     boost::shared_ptr<broker::TxBuffer> txBuffer;
     bool expectProtocolHeader;
     McastFrameHandler mcastFrameHandler;
-    NullFrameHandler nullFrameHandler;
 
     static qpid::sys::AtomicValue<uint64_t> catchUpId;
     
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index 915a578..f746eac 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -75,11 +75,14 @@
     ::memset(&callbacks, sizeof(callbacks), 0);
     callbacks.cpg_deliver_fn = &globalDeliver;
     callbacks.cpg_confchg_fn = &globalConfigChange;
+
+    QPID_LOG(info, "Initializing CPG");
     cpg_error_t err = cpg_initialize(&handle, &callbacks);
-    if (err == CPG_ERR_TRY_AGAIN) {
-        QPID_LOG(notice, "Waiting for CPG initialization.");
-        while (CPG_ERR_TRY_AGAIN == (err = cpg_initialize(&handle, &callbacks)))
-            sys::sleep(5);
+    int retries = 6;
+    while (err == CPG_ERR_TRY_AGAIN && --retries) {
+        QPID_LOG(notice, "Re-trying CPG initialization.");
+        sys::sleep(5);
+        err = cpg_initialize(&handle, &callbacks);
     }
     check(err, "Failed to initialize CPG.");
     check(cpg_context_set(handle, this), "Cannot set CPG context");
@@ -87,7 +90,6 @@
     // windows then this needs to be refactored into
     // qpid::sys::<platform>
     IOHandle::impl->fd = getFd();
-    QPID_LOG(debug, "Initialized CPG handle 0x" << std::hex << handle);
 }
 
 Cpg::~Cpg() {
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
new file mode 100644
index 0000000..6132d52
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ErrorCheck.h"
+#include "EventFrame.h"
+#include "ClusterMap.h"
+#include "Cluster.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+using namespace framing;
+using namespace framing::cluster;
+
+ErrorCheck::ErrorCheck(Cluster& c)
+    : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
+{}
+
+ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) { // Take by const&: no set copy per log statement.
+    copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); // Writes each member id followed by a space.
+    return o;
+}
+
+void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms)
+{
+    // Detected a local error: record the error state, then multicast an error-check control to the cluster.
+    assert(t != ERROR_TYPE_NONE); // Must be an error.
+    assert(type == ERROR_TYPE_NONE); // Can only be called while processing, i.e. with no error already unresolved.
+    type = t;
+    unresolved = ms; // Members are erased from this set in delivered() as their outcome is seen to agree.
+    frameSeq = seq; // Identifies the error; delivered() matches incoming error-check controls against it.
+    connection = &c;
+    QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
+             << " error " << frameSeq << " unresolved: " << unresolved);
+    mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
+}
+
+void ErrorCheck::delivered(const EventFrame& e) {
+    if (isUnresolved()) {
+        const ClusterErrorCheckBody* errorCheck =
+            dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod());
+        const ClusterConfigChangeBody* configChange =
+            dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+
+        if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+            if (errorCheck->getType() < type) { // my error is worse than his
+                QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
+                throw Exception("Aborted by local failure that did not occur on all replicas");
+            }
+            else {              // his error is worse/same as mine.
+                QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId());
+                unresolved.erase(e.getMemberId());
+                checkResolved();
+            }
+        }
+        else {
+            frames.push_back(e); // Only drop matching errorCheck controls.
+            if (configChange) {
+                MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+                MemberSet result;
+                set_intersection(members.begin(), members.end(),
+                                 unresolved.begin(), unresolved.end(),
+                                 inserter(result, result.begin()));
+                unresolved.swap(result);
+                checkResolved();
+            }
+        }
+    }
+    else 
+        frames.push_back(e);
+}
+
+void ErrorCheck::checkResolved() {
+    if (unresolved.empty()) {   // No more potentially conflicted members, we're clear.
+        type = ERROR_TYPE_NONE;
+        QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved.");
+    }
+    else 
+        QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved);
+}
+
+EventFrame ErrorCheck::getNext() {
+    assert(canProcess());
+    EventFrame e(frames.front());
+    frames.pop_front();
+    return e;
+}
+
+bool ErrorCheck::canProcess() const {
+    return type == ERROR_TYPE_NONE && !frames.empty();
+}
+
+bool ErrorCheck::isUnresolved() const {
+    return type != ERROR_TYPE_NONE;
+}
+    
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
new file mode 100644
index 0000000..97b5f2b
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
@@ -0,0 +1,80 @@
+#ifndef QPID_CLUSTER_ERRORCHECK_H
+#define QPID_CLUSTER_ERRORCHECK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/enum.h"
+#include <boost/function.hpp>
+#include <deque>
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+class EventFrame;
+class ClusterMap;
+class Cluster;
+class Multicaster;
+class Connection;
+
+/**
+ * Error checking logic.
+ * 
+ * When an error occurs stop processing frames and queue them until we
+ * can determine if all nodes experienced the error. If not, we shut down.
+ */
+class ErrorCheck
+{
+  public:
+    typedef std::set<MemberId> MemberSet;
+    typedef framing::cluster::ErrorType ErrorType;
+    
+    ErrorCheck(Cluster&);
+
+    /** A local error has occurred */
+    void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&);
+
+    /** Called when a frame is delivered */
+    void delivered(const EventFrame&);
+
+    EventFrame getNext();
+
+    bool canProcess() const;
+    bool isUnresolved() const;
+    
+  private:
+    void checkResolved();
+    
+    Cluster& cluster;
+    Multicaster& mcast;
+    std::deque<EventFrame> frames;
+    std::set<MemberId> unresolved;
+    uint64_t frameSeq;
+    ErrorType type;
+    Connection* connection;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_ERRORCHECK_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h
index e05ad60..f0e445a 100644
--- a/qpid/cpp/src/qpid/cluster/Event.h
+++ b/qpid/cpp/src/qpid/cluster/Event.h
@@ -25,7 +25,6 @@
 #include "types.h"
 #include "qpid/RefCountedBuffer.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/sys/LatencyMetric.h"
 #include <sys/uio.h>            // For iovec
 #include <iosfwd>
 
@@ -42,7 +41,7 @@
 namespace cluster {
 
 /** Header data for a multicast event */
-class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
+class EventHeader {
   public:
     EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
     void decode(const MemberId& m, framing::Buffer&);
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
index 9350c80..a48d134 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
@@ -28,9 +28,7 @@
 
 EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
     : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType())
-{
-    QPID_LATENCY_INIT(frame);
-}
+{}
 
 std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
     if (e.frame.getBody()) o << e.frame;
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h
index d6ff58d..e275aac 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.h
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.h
@@ -25,7 +25,6 @@
 #include "types.h"
 #include "Event.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/sys/LatencyMetric.h"
 #include <boost/intrusive_ptr.hpp>
 #include <iosfwd>
 
@@ -45,6 +44,7 @@
     bool isCluster() const { return connectionId.getNumber() == 0; }
     bool isConnection() const { return connectionId.getNumber() != 0; }
     bool isLastInEvent() const { return readCredit; }
+    MemberId getMemberId() const { return connectionId.getMember(); }
 
 
     ConnectionId connectionId;
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index 409180c..348963f 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -50,6 +50,13 @@
     timer.add(new ExpiryTask(this, id, m.getExpiration()));
 }
 
+void ExpiryPolicy::forget(broker::Message& m) {
+    MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+    assert(i != unexpiredByMessage.end());
+    unexpiredById.erase(i->second);
+    unexpiredByMessage.erase(i);
+}
+
 bool ExpiryPolicy::hasExpired(broker::Message& m) {
     return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
 }
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
index 9f8b1a9..c147e54 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -49,8 +49,8 @@
     ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
 
     void willExpire(broker::Message&);
-
     bool hasExpired(broker::Message&);
+    void forget(broker::Message&);
 
     // Send expiration notice to cluster.
     void sendExpire(uint64_t);
diff --git a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
index 8b2f6da..4df742d 100644
--- a/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
+++ b/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
@@ -52,6 +52,8 @@
             return 0;
     }
 
+    void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); }
+
   private:
     typedef std::map<ConnectionId, ConnectionPtr> Map;
     mutable sys::Mutex lock;
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
index f0738ab..3b9d3ac 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
@@ -22,7 +22,6 @@
 #include "Multicaster.h"
 #include "Cpg.h"
 #include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyMetric.h"
 #include "qpid/framing/AMQBody.h"
 #include "qpid/framing/AMQFrame.h"
 
@@ -64,7 +63,6 @@
             return;
         }
     }
-    QPID_LATENCY_INIT(e);
     queue.push(e);
 }
 
@@ -73,7 +71,6 @@
     try {
         PollableEventQueue::Queue::iterator i = values.begin();
         while( i != values.end()) {
-            QPID_LATENCY_RECORD("mcast send queue", *i);
             iovec iov = i->toIovec();
             if (!cpg.mcast(&iov, 1)) {
                 // cpg didn't send because of CPG flow control.
@@ -97,9 +94,4 @@
     holdingQueue.clear();
 }
 
-void Multicaster::selfDeliver(const Event& e) {
-    sys::Mutex::ScopedLock l(lock);
-    QPID_LATENCY_RECORD("cpg self deliver", e);
-}
-
 }} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h
index d1c3115..baa5b87 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.h
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.h
@@ -55,8 +55,6 @@
     void mcast(const Event& e);
     /** End holding mode, held events are mcast */
     void release();
-    /** Call when events are self-delivered to manage flow control. */
-    void selfDeliver(const Event&);
     
   private:
     typedef sys::PollableQueue<Event> PollableEventQueue;
diff --git a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
index 74a376a..6a30bdd 100644
--- a/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
+++ b/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
@@ -30,8 +30,7 @@
 namespace cluster {
 
 /**
- * Output handler for frames sent to noop connections.
- * Simply discards frames.
+ * Output handler for shadow connections; simply discards frames.
  */
 class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
 {
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index cd42446..6af114a 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -32,8 +32,9 @@
 
 using namespace framing;
 
-OutputInterceptor::OutputInterceptor(
-    cluster::Connection& p, sys::ConnectionOutputHandler& h)
+NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
+
+OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
     : parent(p), closing(false), next(&h), sent(),
       writeEstimate(p.getCluster().getWriteEstimate()),
       moreOutput(), doingOutput()
@@ -47,7 +48,6 @@
     }
     if (!parent.isCatchUp())
         sent += f.encodedSize();
-    QPID_LATENCY_RECORD("up to write queue", f);
 }
 
 void OutputInterceptor::activateOutput() {
@@ -98,7 +98,6 @@
 // Send a doOutput request if one is not already in flight.
 void OutputInterceptor::sendDoOutput() {
     if (!parent.isLocal()) return;
-    QPID_LATENCY_INIT(*this);
     doingOutput = true;
     size_t request = writeEstimate.sending(getBuffered());
     
@@ -111,10 +110,10 @@
     QPID_LOG(trace, parent << "Send doOutput request for " << request);
 }
 
-void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) {
+void OutputInterceptor::closeOutput() {
     sys::Mutex::ScopedLock l(lock);
     closing = true;
-    next = &h;
+    next = &discardHandler;
 }
 
 void OutputInterceptor::close() {
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
index c080a41..61e246b 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -23,9 +23,9 @@
  */
 
 #include "WriteEstimate.h"
+#include "NoOpConnectionOutputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/broker/ConnectionFactory.h"
-#include "qpid/sys/LatencyMetric.h"
 #include <boost/function.hpp>
 
 namespace qpid {
@@ -37,7 +37,7 @@
 /**
  * Interceptor for connection OutputHandler, manages outgoing message replication.
  */
-class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetricTimestamp {
+class OutputInterceptor : public sys::ConnectionOutputHandler {
   public:
     OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h);
 
@@ -53,7 +53,7 @@
     // Intercept doOutput requests on Connection.
     bool doOutput();
 
-    void closeOutput(sys::ConnectionOutputHandler& h);
+    void closeOutput();
 
     cluster::Connection& parent;
     
@@ -70,6 +70,7 @@
     WriteEstimate writeEstimate;
     bool moreOutput;
     bool doingOutput;
+    static NoOpConnectionOutputHandler discardHandler;
 };
 
 }} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 97eae7e..bb4df88 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -26,6 +26,9 @@
 #include "ExpiryPolicy.h"
 #include "qpid/client/SessionBase_0_10Access.h" 
 #include "qpid/client/ConnectionAccess.h" 
+#include "qpid/client/SessionImpl.h" 
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Future.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
@@ -98,10 +101,7 @@
       expiry(expiry_), connections(cons), decoder(decoder_),
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail), connectionSettings(cs)
-{
-    connection.open(url, cs);
-    session = connection.newSession(UPDATE);
-}
+{}
 
 UpdateClient::~UpdateClient() {}
 
@@ -110,6 +110,8 @@
 
 void UpdateClient::run() {
     try {
+        connection.open(updateeUrl, connectionSettings);
+        session = connection.newSession(UPDATE);
         update();
         done();
     } catch (const std::exception& e) {
@@ -126,15 +128,19 @@
     // Update queue is used to transfer acquired messages that are no longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
-    session.close();
 
     std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
 
+    session.queueDelete(arg::queue=UPDATE);
+    session.close();
+
+
     ClusterConnectionProxy(session).expiryId(expiry.getId());
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
     AMQFrame frame(membership);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
+
     connection.close();
     QPID_LOG(debug,  updaterId << " updated state to " << updateeId << " at " << updateeUrl);
 }
@@ -203,7 +209,6 @@
         sb.get()->send(transfer, message.payload->getFrames());
         if (message.payload->isContentReleased()){
             uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
-
             uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
             bool morecontent = true;
             for (uint64_t offset = 0; morecontent; offset += maxContentSize)
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 23d061b..96e2479 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -24,6 +24,7 @@
 
 #include "ClusterMap.h"
 #include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
 #include "qpid/client/AsyncSession.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/sys/Runnable.h"
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h
index 34319e7..2f0a01d 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.h
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.h
@@ -26,7 +26,6 @@
 #include "AMQContentBody.h"
 #include "AMQHeartbeatBody.h"
 #include "ProtocolVersion.h"
-#include "qpid/sys/LatencyMetric.h"
 #include <boost/intrusive_ptr.hpp>
 #include <boost/cast.hpp>
 #include "qpid/CommonImportExport.h"
@@ -34,7 +33,7 @@
 namespace qpid {
 namespace framing {
 
-class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
+class AMQFrame : public AMQDataBlock
 {
   public:
     QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0);
diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp b/qpid/cpp/src/qpid/sys/LatencyMetric.cpp
deleted file mode 100644
index caa221d..0000000
--- a/qpid/cpp/src/qpid/sys/LatencyMetric.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifdef QPID_LATENCY_METRIC
-
-#include "LatencyMetric.h"
-#include "Time.h"
-#include <iostream>
-
-namespace qpid {
-namespace sys {
-
-void LatencyMetricTimestamp::initialize(const LatencyMetricTimestamp& ts) {
-    const_cast<int64_t&>(ts.latency_metric_timestamp) = Duration(now());
-}
-
-void LatencyMetricTimestamp::clear(const LatencyMetricTimestamp& ts) {
-    const_cast<int64_t&>(ts.latency_metric_timestamp) = 0;
-}
-
-LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) : 
-    message(msg), count(0), total(0), skipped(0), skip(skip_)
-{}
-
-LatencyMetric::~LatencyMetric() { report(); }
-    
-void LatencyMetric::record(const LatencyMetricTimestamp& start) {
-    if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps.
-    if (skip) {
-        if (++skipped < skip) return;
-        else skipped = 0;
-    }
-    ++count;
-    int64_t now_ = Duration(now());
-    total += now_ - start.latency_metric_timestamp;
-    // Set start time for next leg of the journey 
-    const_cast<int64_t&>(start.latency_metric_timestamp) = now_; 
-}
-
-void LatencyMetric::report() {
-    using namespace std;
-    if (count) {
-        cout << "LATENCY: " << message << ": "
-             << total / (count * TIME_USEC) << " microseconds" << endl;
-    }
-    else {
-        cout << "LATENCY: " << message << ": no data." << endl;
-    }
-    count = 0;
-    total = 0; 
-}
-
-
-}} // namespace qpid::sys
-
-#endif 
diff --git a/qpid/cpp/src/qpid/sys/LatencyMetric.h b/qpid/cpp/src/qpid/sys/LatencyMetric.h
deleted file mode 100644
index 63b5020..0000000
--- a/qpid/cpp/src/qpid/sys/LatencyMetric.h
+++ /dev/null
@@ -1,85 +0,0 @@
-#ifndef QPID_SYS_LATENCYMETRIC_H
-#define QPID_SYS_LATENCYMETRIC_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifdef QPID_LATENCY_METRIC
-
-#include "qpid/sys/IntegerTypes.h"
-
-namespace qpid {
-namespace sys {
-
-/** Use this base class to add a timestamp for latency to an object */
-struct LatencyMetricTimestamp {
-    LatencyMetricTimestamp() : latency_metric_timestamp(0) {}
-    static void initialize(const LatencyMetricTimestamp&);
-    static void clear(const LatencyMetricTimestamp&);
-    int64_t latency_metric_timestamp;
-};
-    
-/**
- * Record average latencies, report on destruction.
- *
- * For debugging only, use via macros below so it can be compiled out
- * of production code.
- */
-class LatencyMetric {
-  public:
-    /** msg should be a string literal. */
-    LatencyMetric(const char* msg, int64_t skip_=0);
-    ~LatencyMetric();
-    
-    void record(const LatencyMetricTimestamp& start);
-
-  private:
-    void report();
-    const char* message;
-    int64_t count, total, skipped, skip;
-};
-
-}} // namespace qpid::sys
-
-#define QPID_LATENCY_INIT(x) ::qpid::sys::LatencyMetricTimestamp::initialize(x)
-#define QPID_LATENCY_CLEAR(x) ::qpid::sys::LatencyMetricTimestamp::clear(x)
-#define QPID_LATENCY_RECORD(msg, x) do {                                 \
-        static ::qpid::sys::LatencyMetric metric__(msg); metric__.record(x); \
-    } while (false)
-#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) do {                          \
-        static ::qpid::sys::LatencyMetric metric__(msg, skip); metric__.record(x); \
-    } while (false)
-
-
-#else  /* defined QPID_LATENCY_METRIC */
-
-namespace qpid { namespace sys {
-class LatencyMetricTimestamp {};
-}}
-
-#define QPID_LATENCY_INIT(x) (void)x
-#define QPID_LATENCY_CLEAR(x) (void)x
-#define QPID_LATENCY_RECORD(msg, x) (void)x
-#define QPID_LATENCY_RECORD_SKIP(msg, x, skip) (void)x
-
-#endif /* defined QPID_LATENCY_METRIC */
-
-#endif  /*!QPID_SYS_LATENCYMETRIC_H*/
diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
index 067c0fe..cd89f39 100755
--- a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
+++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
@@ -23,6 +23,7 @@
  */
 
 #include "AsynchIoResult.h"
+#include "qpid/CommonImportExport.h"
 
 #include <winsock2.h>
 
@@ -49,7 +50,7 @@
     AsynchIO::RequestCallback cbRequest;
 };
 
-SOCKET toFd(const IOHandlePrivate* h);
+QPID_COMMON_EXTERN SOCKET toFd(const IOHandlePrivate* h);
 
 }}
 
diff --git a/qpid/cpp/src/tests/AsyncCompletion.cpp b/qpid/cpp/src/tests/AsyncCompletion.cpp
index e33b2dc..41423d8 100644
--- a/qpid/cpp/src/tests/AsyncCompletion.cpp
+++ b/qpid/cpp/src/tests/AsyncCompletion.cpp
@@ -24,6 +24,8 @@
 #include "qpid/sys/BlockingQueue.h"
 #include "qpid/client/AsyncSession.h"
 #include "qpid/sys/Time.h"
+#include "qpid/framing/QueueQueryResult.h"
+#include "qpid/client/TypedResult.h"
 
 using namespace std;
 using namespace qpid;
@@ -97,4 +99,15 @@
     sync.wait();                // Should complete now, all messages are completed.
 }
 
+QPID_AUTO_TEST_CASE(testGetResult) {
+    SessionFixture fix;
+    AsyncSession s = fix.session;
+
+    s.queueDeclare("q", arg::durable=true);
+    TypedResult<QueueQueryResult> tr = s.queueQuery("q");
+    QueueQueryResult qq = tr.get();
+    BOOST_CHECK_EQUAL(qq.getQueue(), "q");
+    BOOST_CHECK_EQUAL(qq.getMessageCount(), 0U);
+}
+
 QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/BasicP2PTest.cpp b/qpid/cpp/src/tests/BasicP2PTest.cpp
deleted file mode 100644
index f4a4cce..0000000
--- a/qpid/cpp/src/tests/BasicP2PTest.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "BasicP2PTest.h"
-
-using namespace qpid;
-using namespace qpid::client;
-
-class BasicP2PTest::Receiver : public Worker, public MessageListener
-{
-    const std::string queue;
-    std::string tag;
-public:
-    Receiver(ConnectionOptions& options, const std::string& _queue, const int _messages) 
-        : Worker(options, _messages), queue(_queue){}
-    void init()
-    {
-        Queue q(queue, true);
-        channel.declareQueue(q);
-        framing::FieldTable args;
-        channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, q, queue, args);
-        channel.consume(q, tag, this);
-        channel.start();
-    }
-
-    void start()
-    {
-    }
-        
-    void received(Message&)
-    {
-        count++;
-    }
-};
-
-void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options)
-{
-    std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME");
-    int messages = params.getInt("P2P_NUM_MESSAGES");
-    if (role == "SENDER") {
-        worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_DIRECT_EXCHANGE, queue, messages));
-    } else if(role == "RECEIVER"){
-        worker = std::auto_ptr<Worker>(new Receiver(options, queue, messages));
-    } else {
-        throw Exception("unrecognised role");
-    }
-    worker->init();
-}
diff --git a/qpid/cpp/src/tests/BasicP2PTest.h b/qpid/cpp/src/tests/BasicP2PTest.h
deleted file mode 100644
index b2611f0..0000000
--- a/qpid/cpp/src/tests/BasicP2PTest.h
+++ /dev/null
@@ -1,46 +0,0 @@
-#ifndef _BasicP2PTest_
-#define _BasicP2PTest_
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <memory>
-#include <sstream>
-
-#include "qpid/Exception.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/MessageListener.h"
-#include "SimpleTestCaseBase.h"
-
-
-namespace qpid {
-
-class BasicP2PTest : public SimpleTestCaseBase
-{
-    class Receiver;
-public:
-    void assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options);
-};
-
-}
-
-#endif
diff --git a/qpid/cpp/src/tests/BasicPubSubTest.cpp b/qpid/cpp/src/tests/BasicPubSubTest.cpp
deleted file mode 100644
index 1e9ff36..0000000
--- a/qpid/cpp/src/tests/BasicPubSubTest.cpp
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "BasicPubSubTest.h"
-
-using namespace qpid;
-
-class BasicPubSubTest::Receiver : public Worker, public MessageListener
-{
-    const Exchange& exchange;
-    const std::string queue;
-    const std::string key;
-    std::string tag;
-public:
-    Receiver(ConnectionOptions& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages) 
-        : Worker(options, _messages), exchange(_exchange), queue(_queue), key(_key){}
-
-    void init()
-    {
-        Queue q(queue, true);
-        channel.declareQueue(q);
-        framing::FieldTable args;
-        channel.bind(exchange, q, key, args);
-        channel.consume(q, tag, this);
-        channel.start();
-    }
-
-    void start(){
-    }
-        
-    void received(Message&)
-    {
-        count++;
-    }
-};
-
-class BasicPubSubTest::MultiReceiver : public Worker, public MessageListener
-{
-    typedef boost::ptr_vector<Receiver> ReceiverList;
-    ReceiverList receivers;
-
-public:
-    MultiReceiver(ConnectionOptions& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount) 
-        : Worker(options, _messages) 
-    {
-        for (int i = 0; i != receiverCount; i++) {                
-            std::string queue = (boost::format("%1%_%2%") % options.clientid % i).str();
-            receivers.push_back(new Receiver(options, exchange, queue, key, _messages));
-        }
-    }
-
-    void init()
-    {
-        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
-            receivers[i].init();
-        }
-    }
-
-    void start()
-    {
-        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
-            receivers[i].start();
-        }
-    }
-        
-    void received(Message& msg)
-    {
-        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
-            receivers[i].received(msg);
-        }
-    }
-
-    virtual int getCount()
-    {
-        count = 0;
-        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
-            count += receivers[i].getCount();
-        }
-        return count;
-    }
-    virtual void stop()
-    {
-        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
-            receivers[i].stop();
-        }
-    }
-};
-
-void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options)
-{
-    std::string key = params.getString("PUBSUB_KEY");
-    int messages = params.getInt("PUBSUB_NUM_MESSAGES");
-    int receivers = params.getInt("PUBSUB_NUM_RECEIVERS");
-    if (role == "SENDER") {
-        worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages));
-    } else if(role == "RECEIVER"){
-        worker = std::auto_ptr<Worker>(new MultiReceiver(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages, receivers));
-    } else {
-        throw Exception("unrecognised role");
-    }
-    worker->init();
-}
-
diff --git a/qpid/cpp/src/tests/BasicPubSubTest.h b/qpid/cpp/src/tests/BasicPubSubTest.h
deleted file mode 100644
index 242d284..0000000
--- a/qpid/cpp/src/tests/BasicPubSubTest.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#ifndef _BasicPubSubTest_
-#define _BasicPubSubTest_
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <memory>
-#include <sstream>
-
-#include "qpid/Exception.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/MessageListener.h"
-#include "SimpleTestCaseBase.h"
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <boost/format.hpp>
-
-
-namespace qpid {
-
-using namespace qpid::client;
-
-class BasicPubSubTest : public SimpleTestCaseBase
-{
-    class Receiver;
-    class MultiReceiver;
-public:
-    void assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options);
-};
-
-}
-
-#endif
diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h
index f555607..b32b7f4 100644
--- a/qpid/cpp/src/tests/BrokerFixture.h
+++ b/qpid/cpp/src/tests/BrokerFixture.h
@@ -114,10 +114,12 @@
     SessionType session;
     qpid::client::SubscriptionManager subs;
     qpid::client::LocalQueue lq;
-    ClientT(uint16_t port, const std::string& name=std::string())
-        : connection(port), session(connection.newSession(name)), subs(session) {}
-    ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string())
-        : connection(settings), session(connection.newSession(name)), subs(session) {}
+    std::string name;
+
+    ClientT(uint16_t port, const std::string& name_=std::string())
+        : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {}
+    ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string())
+        : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
 
     ~ClientT() { connection.close(); }
 };
diff --git a/qpid/cpp/src/tests/ClientMessageTest.cpp b/qpid/cpp/src/tests/ClientMessageTest.cpp
new file mode 100644
index 0000000..bc09456
--- /dev/null
+++ b/qpid/cpp/src/tests/ClientMessageTest.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+/**@file Unit tests for the client::Message class. */
+
+#include "unit_test.h"
+#include "qpid/client/Message.h"
+
+using namespace qpid::client;
+
+QPID_AUTO_TEST_SUITE(ClientMessageTestSuite)
+
+QPID_AUTO_TEST_CASE(MessageCopyAssign) {
+    // Verify that message has normal copy semantics.
+    Message m("foo");
+    BOOST_CHECK_EQUAL("foo", m.getData());
+    Message c(m);
+    BOOST_CHECK_EQUAL("foo", c.getData());
+    Message a;
+    BOOST_CHECK_EQUAL("", a.getData());    
+    a = m;
+    BOOST_CHECK_EQUAL("foo", a.getData());
+    a.setData("a");
+    BOOST_CHECK_EQUAL("a", a.getData());
+    c.setData("c");
+    BOOST_CHECK_EQUAL("c", c.getData());
+    BOOST_CHECK_EQUAL("foo", m.getData());
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 589e115..1c719d1 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -28,7 +28,7 @@
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Time.h"
 #include "qpid/client/Session.h"
-#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Message.h"
 #include "qpid/framing/reply_exceptions.h"
 
 #include <boost/optional.hpp>
@@ -121,7 +121,7 @@
     fix.session =fix.connection.newSession();
     size_t count = 100;
     for (size_t i = 0; i < count; ++i) 
-        fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
+        fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue"));
     DummyListener listener(fix.session, "my-queue", count);
     listener.run();
     BOOST_CHECK_EQUAL(count, listener.messages.size());        
@@ -137,7 +137,7 @@
     DummyListener listener(fix.session, "my-queue", count);
     sys::Thread t(listener);
     for (size_t i = 0; i < count; ++i) {
-        fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
+        fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue"));
     }
     t.join();
     BOOST_CHECK_EQUAL(count, listener.messages.size());        
@@ -173,7 +173,7 @@
     fix.session.suspend();
     // Make sure we are still subscribed after resume.
     fix.connection.resume(fix.session);
-    fix.session.messageTransfer(arg::content=TransferContent("my-message", "my-queue"));
+    fix.session.messageTransfer(arg::content=Message("my-message", "my-queue"));
     FrameSet::shared_ptr msg = fix.session.get();
     BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
 }
diff --git a/qpid/cpp/src/tests/ClusterFailover.cpp b/qpid/cpp/src/tests/ClusterFailover.cpp
new file mode 100644
index 0000000..db2392b
--- /dev/null
+++ b/qpid/cpp/src/tests/ClusterFailover.cpp
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file Tests for client failover in a cluster.
+ * Failover here means that after a broker is killed, a client connects
+ * to a surviving cluster member, e.g. re-using the same session name.
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ClusterFixture.h"
+#include "qpid/client/FailoverManager.h"
+#include <boost/assign.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+
+QPID_AUTO_TEST_SUITE(ClusterFailoverTestSuite)
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace qpid::client::arg;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
+
+
+// Test re-connecting with same session name after a failure.
+QPID_AUTO_TEST_CASE(testReconnectSameSessionName) {
+    ClusterFixture cluster(2, -1);
+    Client c0(cluster[0], "foo");
+    cluster.kill(0, 9);
+    Client c1(cluster[1], "foo"); // Using same name, should be cleaned up.
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/ClusterFixture.cpp b/qpid/cpp/src/tests/ClusterFixture.cpp
index 5658957..70d60b1 100644
--- a/qpid/cpp/src/tests/ClusterFixture.cpp
+++ b/qpid/cpp/src/tests/ClusterFixture.cpp
@@ -61,8 +61,14 @@
 
 #include "ClusterFixture.h"
 
-ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_)
-    : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_)
+ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_, const string& clusterLib_)
+    : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_), clusterLib(clusterLib_)
+{
+    add(n);
+}
+
+ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_, const string& clusterLib_)
+    : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_), clusterLib(clusterLib_)
 {
     add(n);
 }
@@ -70,13 +76,14 @@
 const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS =
     list_of<string>("--auth=no")("--no-data-dir");
 
-ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) {
-    Args args = list_of<string>("qpidd " __FILE__)
+ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) {
+    Args args = list_of<string>("qpidd ")
         ("--no-module-dir")
-        ("--load-module=../.libs/cluster.so")
-        ("--cluster-name")(name) 
+        ("--load-module")(clusterLib)
+        ("--cluster-name")(name)
         ("--log-prefix")(prefix);
     args.insert(args.end(), userArgs.begin(), userArgs.end());
+    if (updateArgs) updateArgs(args, index);
     return args;
 }
 
@@ -84,7 +91,7 @@
     if (size() != size_t(localIndex))  { // fork a broker process.
         std::ostringstream os; os << "fork" << size();
         std::string prefix = os.str();
-        forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix))));
+        forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size()))));
         push_back(forkedBrokers.back()->getPort());
     }
     else {                      // Run in this process
@@ -106,7 +113,7 @@
     assert(int(size()) == localIndex);
     ostringstream os; os << "local" << localIndex;
     string prefix = os.str();
-    Args args(makeArgs(prefix));
+    Args args(makeArgs(prefix, localIndex));
     vector<const char*> argv(args.size());
     transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
     qpid::log::Logger::instance().setPrefix(prefix);
@@ -116,7 +123,7 @@
 }
 
 bool ClusterFixture::hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); }
-    
+
 /** Kill a forked broker with sig, or shutdown localBroker if n==0. */
 void ClusterFixture::kill(size_t n, int sig) {
     if (n == size_t(localIndex))
@@ -131,3 +138,22 @@
     kill(n,sig);
     try { c.close(); } catch(...) {}
 }
+
+/**
+ * Get the known broker ports from a Connection.
+ *@param n if >= 0, retry for up to ~10 seconds until exactly n brokers are known.
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) {
+    std::vector<qpid::Url> urls = source.getKnownBrokers();
+    if (n >= 0 && unsigned(n) != urls.size()) {
+        // Retry up to 10 secs in .1 second intervals.
+        for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
+            qpid::sys::usleep(1000*100); // 0.1 secs
+            urls = source.getKnownBrokers();
+        }
+    }
+    std::set<int> s;
+    for (std::vector<qpid::Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
+        s.insert((*i)[0].get<qpid::TcpAddress>()->port);
+    return s;
+}
diff --git a/qpid/cpp/src/tests/ClusterFixture.h b/qpid/cpp/src/tests/ClusterFixture.h
index 84fb9f2..353ec0c 100644
--- a/qpid/cpp/src/tests/ClusterFixture.h
+++ b/qpid/cpp/src/tests/ClusterFixture.h
@@ -38,6 +38,7 @@
 #include "qpid/log/Logger.h"
 
 #include <boost/bind.hpp>
+#include <boost/function.hpp>
 #include <boost/shared_ptr.hpp>
 
 #include <string>
@@ -59,43 +60,55 @@
 using boost::shared_ptr;
 using qpid::cluster::Cluster;
 
-
+#define DEFAULT_CLUSTER_LIB "../.libs/cluster.so"
 
 /** Cluster fixture is a vector of ports for the replicas.
- * 
+ *
  * At most one replica (by default replica 0) is in the current
  * process, all others are forked as children.
  */
 class ClusterFixture : public vector<uint16_t>  {
   public:
     typedef std::vector<std::string> Args;
+    static const Args DEFAULT_ARGS;
+
     /** @param localIndex can be -1 meaning don't automatically start a local broker.
      * A local broker can be started with addLocal().
      */
-    ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS);
+    ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS, const string& clusterLib = DEFAULT_CLUSTER_LIB);
+
+    /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */
+    ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs, const string& clusterLib = DEFAULT_CLUSTER_LIB);
+
     void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
     void add();                 // Add a broker.
     void setup();
 
     bool hasLocal() const;
-    
-    /** Kill a forked broker with sig, or shutdown localBroker if n==0. */
+
+    /** Kill a forked broker with sig, or shutdown localBroker. */
     void kill(size_t n, int sig=SIGINT);
 
     /** Kill a broker and suppressing errors from closing connection c. */
     void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT);
 
   private:
-    static const Args DEFAULT_ARGS;
-    
+
     void addLocal();            // Add a local broker.
-    Args makeArgs(const std::string& prefix);
+    Args makeArgs(const std::string& prefix, size_t index);
     string name;
     std::auto_ptr<BrokerFixture> localBroker;
     int localIndex;
     std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
     Args userArgs;
+    boost::function<void (Args&, size_t)> updateArgs;
+    string clusterLib;
 };
 
+/**
+ * Get the known broker ports from a Connection.
+ *@param n if >= 0, retry (up to a timeout) until exactly n brokers are known; -1 means don't wait.
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n=-1);
 
 #endif  /*!CLUSTER_FIXTURE_H*/
diff --git a/qpid/cpp/src/tests/ForkedBroker.cpp b/qpid/cpp/src/tests/ForkedBroker.cpp
index f90f76a..12175d3 100644
--- a/qpid/cpp/src/tests/ForkedBroker.cpp
+++ b/qpid/cpp/src/tests/ForkedBroker.cpp
@@ -20,20 +20,39 @@
  */
 
 #include "ForkedBroker.h"
+#include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
+#include <boost/algorithm/string.hpp>
 #include <algorithm>
 #include <stdlib.h>
 #include <sys/types.h>
 #include <signal.h>
 
-ForkedBroker::ForkedBroker(const Args& args) { init(args); }
+using namespace std;
+using qpid::ErrnoException;
 
-ForkedBroker::ForkedBroker(int argc, const char* const argv[]) { init(Args(argv, argc+argv)); }
+ForkedBroker::ForkedBroker(const Args& constArgs) {
+    Args args(constArgs);
+    Args::iterator i = find(args.begin(), args.end(), string("TMP_DATA_DIR"));
+    if (i != args.end()) {
+        args.erase(i);
+        char dd[] = "/tmp/ForkedBroker.XXXXXX";
+        if (!mkdtemp(dd))
+            throw qpid::ErrnoException("Can't create data dir");
+        dataDir = dd;
+        args.push_back("--data-dir");
+        args.push_back(dataDir);
+    }
+    init(args);
+}
 
 ForkedBroker::~ForkedBroker() {
-    try { kill(); } catch(const std::exception& e) {
+    try { kill(); }
+    catch (const std::exception& e) {
         QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what()));
     }
+    if (!dataDir.empty()) 
+        ::system(("rm -rf "+dataDir).c_str());
 }
 
 void ForkedBroker::kill(int sig) {
@@ -42,14 +61,25 @@
     pid = 0;                // Reset pid here in case of an exception.
     using qpid::ErrnoException;
     if (::kill(savePid, sig) < 0) 
-        throw ErrnoException("kill failed");
+            throw ErrnoException("kill failed");
     int status;
     if (::waitpid(savePid, &status, 0) < 0 && sig != 9) 
         throw ErrnoException("wait for forked process failed");
     if (WEXITSTATUS(status) != 0 && sig != 9) 
         throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status)));
 }
+        
+namespace std { // Args is a std::vector; NOTE(review): presumably kept in std so ADL finds this operator<<.
+static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) {
+    copy(a.begin(), a.end(), ostream_iterator<string>(o, " ")); // space-separated words
+    return o;
+}
 
+} // namespace std
+// True if s is a broker option that configures logging; kept out of namespace std, file-local.
+static bool isLogOption(const std::string& s) {
+    return boost::starts_with(s, "--log-enable") || boost::starts_with(s, "--trace");
+}
         
 void ForkedBroker::init(const Args& userArgs) {
     using qpid::ErrnoException;
@@ -70,17 +100,20 @@
     }
     else {                  // child
         ::close(pipeFds[0]);
-        // FIXME aconway 2009-02-12: 
         int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent.
         if (fd < 0) throw ErrnoException("dup2 failed");
-        const char* prog = "../qpidd";
+        const char* prog = ::getenv("QPID_FORKED_BROKER");
+        if (!prog) prog = "../qpidd";
         Args args(userArgs);
         args.push_back("--port=0");
-        if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE"))
-            args.push_back("--log-enable=error+"); // Keep quiet except for errors.
+        // Keep quiet except for errors.
+        if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")
+            && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end())
+            args.push_back("--log-enable=error+"); 
         std::vector<const char*> argv(args.size());
         std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
         argv.push_back(0);
+        QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args);
         execv(prog, const_cast<char* const*>(&argv[0]));
         throw ErrnoException("execv failed");
     }
diff --git a/qpid/cpp/src/tests/ForkedBroker.h b/qpid/cpp/src/tests/ForkedBroker.h
index 6f97fbd..45b5220 100644
--- a/qpid/cpp/src/tests/ForkedBroker.h
+++ b/qpid/cpp/src/tests/ForkedBroker.h
@@ -48,19 +48,26 @@
   public:
     typedef std::vector<std::string> Args;
 
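+    // The elements of argv are passed to the forked broker as arguments.
+    //
+    // The special argument "TMP_DATA_DIR" is removed and replaced by
+    // "--data-dir <dir>", where <dir> is a freshly created temporary
+    // directory that is deleted again when the ForkedBroker is destroyed.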
     ForkedBroker(const Args& argv);
-    ForkedBroker(int argc, const char* const argv[]);
     ~ForkedBroker();
 
     void kill(int sig=SIGINT);
+    int wait();                 // Wait for exit, return exit status.
     uint16_t getPort() { return port; }
     pid_t getPID() { return pid; }
 
   private:
+
     void init(const Args& args);
 
     pid_t pid;
     int port;
+    std::string dataDir;
 };
 
 #endif  /*!TESTS_FORKEDBROKER_H*/
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 9a81ef1..161428f 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -96,7 +96,9 @@
 	RetryList.cpp \
 	RateFlowcontrolTest.cpp \
 	FrameDecoder.cpp \
-	ReplicationTest.cpp
+	ReplicationTest.cpp \
+	ClientMessageTest.cpp \
+	PollableCondition.cpp
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -110,11 +112,17 @@
 # 	amqp_0_10/Map.cpp \
 # 	amqp_0_10/handlers.cpp 
 
+TESTLIBFLAGS = -module -rpath $(abs_builddir)
 
 check_LTLIBRARIES += libshlibtest.la
-libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir)
+libshlibtest_la_LDFLAGS = $(TESTLIBFLAGS)
 libshlibtest_la_SOURCES = shlibtest.cpp
 
+check_LTLIBRARIES += test_store.la
+test_store_la_SOURCES = test_store.cpp
+test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required?
+test_store_la_LDFLAGS = $(TESTLIBFLAGS)
+
 include cluster.mk
 if SSL
 include ssl.mk
@@ -236,24 +244,6 @@
 
 CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
 
-# FIXME aconway 2008-05-23: Disabled interop_runner because it uses
-# the obsolete Channel class.  Convert to Session and re-enable.
-# 
-# check_PROGRAMS += interop_runner
-
-# interop_runner_SOURCES = 	\
-#   interop_runner.cpp	 	\
-#   SimpleTestCaseBase.cpp	\
-#   BasicP2PTest.cpp		\
-#   BasicPubSubTest.cpp		\
-#   SimpleTestCaseBase.h		\
-#   BasicP2PTest.h		\
-#   BasicPubSubTest.h		\
-#   TestCase.h			\
-#   TestOptions.h ConnectionOptions.h
-# interop_runner_LDADD = $(lib_client) $(lib_common) $(extra_libs)
-
-
 # Longer running stability tests, not run by default check: target.
 # Not run under valgrind, too slow
 
diff --git a/qpid/cpp/src/tests/PartialFailure.cpp b/qpid/cpp/src/tests/PartialFailure.cpp
new file mode 100644
index 0000000..5137672
--- /dev/null
+++ b/qpid/cpp/src/tests/PartialFailure.cpp
@@ -0,0 +1,219 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file Tests for partial failure in a cluster.
+ * Partial failure means some nodes experience a failure while others do not.
+ * In this case the failed nodes must shut down.
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ClusterFixture.h"
+#include <boost/assign.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+
+QPID_AUTO_TEST_SUITE(PartialFailureTestSuite)
+
+    using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace qpid::client::arg;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
+
+static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); }
+
+void updateArgs(ClusterFixture::Args& args, size_t index) { // Adds per-broker test options; 'index' names the store.
+    ostringstream os;
+    os << "--test-store-name=s" << index;
+    args.push_back(os.str());
+    args.push_back("--load-module=.libs/test_store.so");
+    args.push_back("--auth=no");
+    args.push_back("TMP_DATA_DIR");
+
+    // These tests generate errors deliberately, disable error logging unless a log env var is set.
+    if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) {
+        args.erase(remove_if(args.begin(), args.end(), isLogOption), args.end()); // remove_if alone only reorders.
+        args.push_back("--log-enable=critical+:DISABLED"); // hacky way to disable logs.
+    }
+}
+
+Message pMessage(string data, string q) {
+    Message msg(data, q);
+    msg.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+    return msg;
+}
+
+void queueAndSub(Client& c) {
+    c.session.queueDeclare(c.name, durable=true);
+    c.subs.subscribe(c.lq, c.name);
+}
+
+// Verify normal cluster-wide errors.
+QPID_AUTO_TEST_CASE(testNormalErrors) {
+    // FIXME aconway 2009-04-10: Would like to put a scope just around
+    // the statements expected to fail (in BOOST_CHECK_THROW) but that
+    // sporadically lets out messages, possibly because they're in
+    // Connection thread.
+    ScopedSuppressLogging allQuiet; 
+
+    ClusterFixture cluster(3, -1, updateArgs);    
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+    Client c2(cluster[2], "c2");
+
+    queueAndSub(c0);
+    c0.session.messageTransfer(content=Message("x", "c0"));
+    BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
+
+    // Session error.
+    BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
+    c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.
+
+    // Connection error, kill c1 on all members.
+    queueAndSub(c1);
+    BOOST_CHECK_THROW(
+        c1.session.messageTransfer(
+            content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
+        ConnectionException);
+    c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.
+
+    BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
+    BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
+    BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
+}
+
+
+// Test errors after a new member joins to verify frame-sequence-numbers are ok in update.
+QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
+    ScopedSuppressLogging allQuiet;
+
+    ClusterFixture cluster(1, -1, updateArgs);
+    Client c0(cluster[0]);
+    c0.session.queueDeclare("q", durable=true);
+    c0.session.messageTransfer(content=pMessage("a", "q"));
+
+    // Kill the new guy
+    cluster.add();
+    Client c1(cluster[1]);
+    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
+    BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+
+    // Kill the old guy
+    cluster.add();
+    Client c2(cluster[2]);
+    c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
+    BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+
+    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
+}
+
+// Test that if one member fails and others do not, the failed member leaves the cluster.
+QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
+    ScopedSuppressLogging allQuiet;
+
+    ClusterFixture cluster(3, -1, updateArgs);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+    Client c2(cluster[2], "c2");
+    
+    c0.session.queueDeclare("q", durable=true);
+    c0.session.messageTransfer(content=pMessage("a", "q"));
+    // Cause partial failure on c1
+    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
+    BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+
+    c0.session.messageTransfer(content=pMessage("b", "q"));
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+
+    // Cause partial failure on c2
+    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
+    BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+    c0.session.messageTransfer(content=pMessage("c", "q"));
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
+    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+}
+
+// Test multiple partial failures: 2 fail 2 pass
+QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
+    ScopedSuppressLogging allQuiet;
+
+    ClusterFixture cluster(4, -1, updateArgs);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+    Client c2(cluster[2], "c2");
+    Client c3(cluster[3], "c3");
+    
+    c0.session.queueDeclare("q", durable=true);
+    c0.session.messageTransfer(content=pMessage("a", "q"));
+
+    // Cause partial failure on c1, c2
+    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
+    BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+    BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+    c0.session.messageTransfer(content=pMessage("b", "q"));
+    c3.session.messageTransfer(content=pMessage("c", "q"));
+    BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+}
+
+/** FIXME aconway 2009-04-10:
+ * The current approach to shutting down a process in test_store
+ * sometimes leads to assertion failures and errors in the shut-down
+ * process. Need a cleaner solution
+ */
+#if 0
+QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
+    ScopedSuppressLogging allQuiet;
+
+    ClusterFixture cluster(2, -1, updateArgs);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+
+    c0.session.queueDeclare("q", durable=true);
+    c0.session.messageTransfer(content=pMessage("a", "q"));
+
+    // Cause failure on member 0 and simultaneous crash on member 1.
+    BOOST_CHECK_THROW(
+        c0.session.messageTransfer(
+            content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
+        ConnectionException);
+    cluster.wait(1);
+
+    Client c00(cluster[0], "c00"); // Old connection is dead.
+    BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
+}
+#endif
+
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/PollableCondition.cpp b/qpid/cpp/src/tests/PollableCondition.cpp
new file mode 100644
index 0000000..33664d4
--- /dev/null
+++ b/qpid/cpp/src/tests/PollableCondition.cpp
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/PollableCondition.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
+#include <boost/bind.hpp>
+
+
+QPID_AUTO_TEST_SUITE(PollableConditionTest)
+
+using namespace qpid::sys;
+
+const Duration SHORT = TIME_SEC/100;
+const Duration LONG = TIME_SEC/10;
+
+class  Callback {
+  public:    
+    enum Action { NONE, DISARM, CLEAR, DISARM_CLEAR };
+
+    Callback() : count(), action(NONE) {}
+
+    void call(PollableCondition& pc) {
+        Mutex::ScopedLock l(lock);
+        ++count;
+        switch(action) {
+          case NONE: break; 
+          case DISARM:  pc.disarm(); break;
+          case CLEAR: pc.clear(); break;
+          case DISARM_CLEAR: pc.disarm(); pc.clear(); break;
+        }
+        action = NONE;
+        lock.notify();
+    }
+
+    bool isCalling() { Mutex::ScopedLock l(lock); return wait(LONG); }
+
+    bool isNotCalling() { Mutex::ScopedLock l(lock); return !wait(SHORT); }
+
+    bool nextCall(Action a=NONE) {
+        Mutex::ScopedLock l(lock);
+        action = a;
+        return wait(LONG);
+    }
+    
+  private:
+    bool wait(Duration timeout) {        
+        int n = count;
+        AbsTime deadline(now(), timeout);
+        while (n == count && lock.wait(deadline))
+               ;
+        return n != count;
+    }
+
+    Monitor lock;
+    int count;
+    Action action;
+};
+
+QPID_AUTO_TEST_CASE(testPollableCondition) {
+    boost::shared_ptr<Poller> poller(new Poller());
+    Callback callback;
+    PollableCondition pc(boost::bind(&Callback::call, &callback, _1), poller);
+
+    Thread runner = Thread(*poller);
+    
+    BOOST_CHECK(callback.isNotCalling()); // condition is not set or armed.
+
+    pc.rearm();                          
+    BOOST_CHECK(callback.isNotCalling()); // Armed but not set
+
+    pc.set();
+    BOOST_CHECK(callback.isCalling()); // Armed and set.
+    BOOST_CHECK(callback.isCalling()); // Still armed and set.
+
+    callback.nextCall(Callback::DISARM);
+    BOOST_CHECK(callback.isNotCalling()); // set but not armed
+
+    pc.rearm();
+    BOOST_CHECK(callback.isCalling()); // Armed and set.
+    callback.nextCall(Callback::CLEAR);    
+    BOOST_CHECK(callback.isNotCalling()); // armed but not set
+
+    pc.set();
+    BOOST_CHECK(callback.isCalling()); // Armed and set.
+    callback.nextCall(Callback::DISARM_CLEAR);    
+    BOOST_CHECK(callback.isNotCalling()); // not armed or set.
+
+    poller->shutdown();
+    runner.join();
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+
diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp b/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
deleted file mode 100644
index 2739734..0000000
--- a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "SimpleTestCaseBase.h"
-
-using namespace qpid;
-
-void SimpleTestCaseBase::start()
-{
-    if (worker.get()) {
-        worker->start();
-    }
-}
-
-void SimpleTestCaseBase::stop()
-{
-    if (worker.get()) {
-        worker->stop();
-    }
-}
-
-void SimpleTestCaseBase::report(client::Message& report)
-{
-    if (worker.get()) {
-        report.getHeaders().setInt("MESSAGE_COUNT", worker->getCount());
-        //add number of messages sent or received
-        std::stringstream reportstr;
-        reportstr << worker->getCount();
-        report.setData(reportstr.str());
-    }
-}
-
-SimpleTestCaseBase::Sender::Sender(ConnectionOptions& options, 
-                                   const Exchange& _exchange, 
-                                   const std::string& _key, 
-                                   const int _messages) 
-    : Worker(options, _messages), exchange(_exchange), key(_key) {}
-
-void SimpleTestCaseBase::Sender::init()
-{
-    channel.start();
-}
-
-void SimpleTestCaseBase::Sender::start(){
-    Message msg;
-    while (count < messages) {
-        channel.publish(msg, exchange, key);
-        count++;
-    }
-    stop();
-}
-
-SimpleTestCaseBase::Worker::Worker(ConnectionOptions& options, const int _messages) : 
-    messages(_messages), count(0)
-{
-    connection.open(options.host, options.port);
-    connection.openChannel(channel);
-}
-            
-void SimpleTestCaseBase::Worker::stop()
-{
-    channel.close();
-    connection.close();
-}
-
-int SimpleTestCaseBase::Worker::getCount()
-{
-    return count;
-}
-
diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.h b/qpid/cpp/src/tests/SimpleTestCaseBase.h
deleted file mode 100644
index 0c1052d..0000000
--- a/qpid/cpp/src/tests/SimpleTestCaseBase.h
+++ /dev/null
@@ -1,89 +0,0 @@
-#ifndef _SimpleTestCaseBase_
-#define _SimpleTestCaseBase_
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <memory>
-#include <sstream>
-
-#include "qpid/Exception.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/Connection.h"
-#include "ConnectionOptions.h"
-#include "qpid/client/MessageListener.h"
-#include "TestCase.h"
-
-
-namespace qpid {
-
-using namespace qpid::client;
-
-class SimpleTestCaseBase : public TestCase
-{
-protected:
-    class Worker
-    {
-    protected:
-        client::Connection connection;
-        client::Channel channel;
-        const int messages;
-        int count;
-
-    public:
-
-        Worker(ConnectionOptions& options, const int messages);
-        virtual ~Worker(){}
-
-        virtual void stop();
-        virtual int getCount();
-        virtual void init() = 0;
-        virtual void start() = 0;
-    };
-
-    class Sender : public Worker
-    {
-        const Exchange& exchange;
-        const std::string key;
-    public:
-        Sender(ConnectionOptions& options, 
-               const Exchange& exchange, 
-               const std::string& key, 
-               const int messages); 
-        void init();
-        void start();
-    };
-
-    std::auto_ptr<Worker> worker;
-
-public:
-    virtual void assign(const std::string& role, framing::FieldTable& params, ConnectionOptions& options) = 0;
-    
-    virtual ~SimpleTestCaseBase() {}
-
-    void start();
-    void stop();
-    void report(client::Message& report);
-};
-
-}
-
-#endif
diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h
index d2a93c9..ccce3c8 100644
--- a/qpid/cpp/src/tests/SocketProxy.h
+++ b/qpid/cpp/src/tests/SocketProxy.h
@@ -21,45 +21,58 @@
  *
  */
 
+#include "qpid/sys/IOHandle.h"
+#ifdef _WIN32
+#  include "qpid/sys/windows/IoHandlePrivate.h"
+   typedef SOCKET FdType;
+#else
+#  include "qpid/sys/posix/PrivatePosix.h"
+   typedef int FdType;
+#endif
 #include "qpid/sys/Socket.h"
-#include "qpid/sys/Poller.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/client/Connection.h"
 #include "qpid/log/Statement.h"
 
-#include <algorithm>
-
 /**
  * A simple socket proxy that forwards to another socket. 
  * Used between client & local broker to simulate network failures.
  */
 class SocketProxy : private qpid::sys::Runnable
 {
+    // Need a Socket we can get the fd from
+    class LowSocket : public qpid::sys::Socket {
+    public:
+        FdType getFd() { return toFd(impl); }
+    };
+
   public:
     /** Connect to connectPort on host, start a forwarding thread.
      * Listen for connection on getPort().
      */
     SocketProxy(int connectPort, const std::string host="localhost")
-        : closed(false), port(listener.listen()), dropClient(), dropServer()
+      : closed(false), joined(true),
+        port(listener.listen()), dropClient(), dropServer()
     {
         client.connect(host, connectPort);
+        joined = false;
         thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
     }
     
-    ~SocketProxy() { close(); }
+      ~SocketProxy() { close(); if (!joined) thread.join(); }
 
     /** Simulate a network disconnect. */
     void close() {
         {
             qpid::sys::Mutex::ScopedLock l(lock);
-            if (closed) return;
+            if (closed) { return; }
             closed=true;
         }
-        poller.shutdown();
-        if (thread.id() != qpid::sys::Thread::current().id())
-        thread.join();
+        if (thread.id() != qpid::sys::Thread::current().id()) {
+            thread.join();
+            joined = true;
+        }
         client.close();
     }
 
@@ -85,56 +98,72 @@
     }
 
     void run() {
-        std::auto_ptr<qpid::sys::Socket> server;
+        std::auto_ptr<LowSocket> server;
         try {
-            qpid::sys::PollerHandle listenerHandle(listener);
-            poller.addFd(listenerHandle, qpid::sys::Poller::INPUT);
-            qpid::sys::Poller::Event event = poller.wait();
-            throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
-            throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed");
-
-            poller.delFd(listenerHandle);
-            server.reset(listener.accept());
-
-            // Pump data between client & server sockets
-            qpid::sys::PollerHandle clientHandle(client);
-            qpid::sys::PollerHandle serverHandle(*server);
-            poller.addFd(clientHandle, qpid::sys::Poller::INPUT);
-            poller.addFd(serverHandle, qpid::sys::Poller::INPUT);
+            fd_set socks;
+            FdType maxFd = listener.getFd();
+            struct timeval tmo;
+            for (;;) {
+                FD_ZERO(&socks);
+                FD_SET(maxFd, &socks);
+                tmo.tv_sec = 0;
+                tmo.tv_usec = 500 * 1000;
+                if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) {
+                    qpid::sys::Mutex::ScopedLock l(lock);
+                    throwIf(closed, "SocketProxy: Closed by close()");
+                    continue;
+                }
+                throwIf(!FD_ISSET(maxFd, &socks), "SocketProxy: Accept failed");
+                break;   // Accept ready... go to next step
+            }
+            server.reset(reinterpret_cast<LowSocket *>(listener.accept()));
+            maxFd = server->getFd();
+            if (client.getFd() > maxFd)
+                maxFd = client.getFd();
             char buffer[1024];
             for (;;) {
-                qpid::sys::Poller::Event event = poller.wait();
-                throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
-                throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected");
-                if (event.handle == &serverHandle) {
-                    ssize_t n = server->read(buffer, sizeof(buffer));
-                    if (!dropServer) client.write(buffer, n);
-                    poller.rearmFd(serverHandle);
-                } else if (event.handle == &clientHandle) {
-                    ssize_t n = client.read(buffer, sizeof(buffer));
-                    if (!dropClient) server->write(buffer, n);
-                    poller.rearmFd(clientHandle);
-                } else {
-                    throwIf(true, "SocketProxy: No handle ready");
+                FD_ZERO(&socks);
+                tmo.tv_sec = 0;
+                tmo.tv_usec = 500 * 1000;
+                FD_SET(client.getFd(), &socks);
+                FD_SET(server->getFd(), &socks);
+                if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) {
+                    qpid::sys::Mutex::ScopedLock l(lock);
+                    throwIf(closed, "SocketProxy: Closed by close()");
+                    continue;
                 }
+                // Something is set; relay data as needed until something closes
+                if (FD_ISSET(server->getFd(), &socks)) {
+                    ssize_t n = server->read(buffer, sizeof(buffer));
+                    throwIf(n <= 0, "SocketProxy: server disconnected");
+                    if (!dropServer) client.write(buffer, n);
+                }
+                if (FD_ISSET(client.getFd(), &socks)) {
+                    ssize_t n = client.read(buffer, sizeof(buffer));
+                    throwIf(n <= 0, "SocketProxy: client disconnected");
+                    if (!dropClient) server->write(buffer, n); // data read from 'client' is gated by dropClient
+                }
+                if (!FD_ISSET(client.getFd(), &socks) &&
+                    !FD_ISSET(server->getFd(), &socks))
+                    throwIf(true, "SocketProxy: No handle ready");
             }
         }
         catch (const std::exception& e) {
             QPID_LOG(debug, "SocketProxy::run exception: " << e.what());
         }
         try {
-        if (server.get()) server->close();
-        close(); 
-    }
+            if (server.get()) server->close();
+            close(); 
+        }
         catch (const std::exception& e) {
             QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what());
         }
     }
 
     mutable qpid::sys::Mutex lock;
-    bool closed;
-    qpid::sys::Poller poller;
-    qpid::sys::Socket client, listener;
+    mutable bool closed;
+    bool joined;
+    LowSocket client, listener;
     uint16_t port;
     qpid::sys::Thread thread;
     bool dropClient, dropServer;
diff --git a/qpid/cpp/src/tests/TestCase.h b/qpid/cpp/src/tests/TestCase.h
deleted file mode 100644
index ba3330c..0000000
--- a/qpid/cpp/src/tests/TestCase.h
+++ /dev/null
@@ -1,64 +0,0 @@
-#ifndef _TestCase_
-#define _TestCase_
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ConnectionOptions.h"
-#include "qpid/client/Message.h"
-
-
-namespace qpid {
-
-/**
- * Interface to be implemented by test cases for use with the test
- * runner.
- */
-class TestCase
-{
-public:
-    /**
-     * Directs the test case to act in a particular role. Some roles
-     * may be 'activated' at this stage others may require an explicit
-     * start request.
-     */
-    virtual void assign(const std::string& role, framing::FieldTable& params, client::ConnectionOptions& options) = 0;
-    /**
-     * Each test will be started on its own thread, which should block
-     * until the test completes (this may or may not require an
-     * explicit stop() request).
-     */
-    virtual void start() = 0;
-    /**
-     * Requests that the test be stopped if still running.
-     */
-    virtual void stop() = 0;
-    /**
-     * Allows the test to fill in details on the final report
-     * message. Will be called only after start has returned.
-     */
-    virtual void report(client::Message& report) = 0;
-
-    virtual ~TestCase() {}
-};
-
-}
-
-#endif
diff --git a/qpid/cpp/src/tests/XmlClientSessionTest.cpp b/qpid/cpp/src/tests/XmlClientSessionTest.cpp
index 98558f0..aeb13c2 100644
--- a/qpid/cpp/src/tests/XmlClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/XmlClientSessionTest.cpp
@@ -26,7 +26,7 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Runnable.h"
-#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Message.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/client/Connection.h"
 #include "qpid/client/SubscriptionManager.h"
diff --git a/qpid/cpp/src/tests/client_test.cpp b/qpid/cpp/src/tests/client_test.cpp
index 204c2c4..05b42f6 100644
--- a/qpid/cpp/src/tests/client_test.cpp
+++ b/qpid/cpp/src/tests/client_test.cpp
@@ -32,8 +32,8 @@
 #include "qpid/client/Connection.h"
 #include "qpid/client/Message.h"
 #include "qpid/client/Session.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/all_method_bodies.h"
+#include "qpid/client/SubscriptionManager.h"
+
 
 using namespace qpid;
 using namespace qpid::client;
@@ -113,35 +113,18 @@
         session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1);
 	if (opts.verbose) print("Published message: ", msgOut);
 
-        //subscribe to the queue, add sufficient credit and then get
-        //incoming 'frameset', check that its a message transfer and
-        //then convert it to a message (see Dispatcher and
-        //SubscriptionManager utilties for common reusable patterns at
-        //a higher level)
-        session.messageSubscribe(arg::queue="MyQueue", arg::destination="MyId");
-        session.messageFlow(arg::destination="MyId", arg::unit=0, arg::value=1); //credit for one message
-        session.messageFlow(arg::destination="MyId", arg::unit=1, arg::value=0xFFFFFFFF); //credit for infinite bytes
-	if (opts.verbose) std::cout << "Subscribed to queue." << std::endl;
-        FrameSet::shared_ptr incoming = session.get();
-        if (incoming->isA<MessageTransferBody>()) {
-            Message msgIn(*incoming);
-            if (msgIn.getData() == msgOut.getData()) {
-                if (opts.verbose) std::cout << "Received the exepected message." << std::endl;
-                session.messageAccept(SequenceSet(msgIn.getId()));
-                session.markCompleted(msgIn.getId(), true, true);
-            } else {
-                print("Received an unexepected message: ", msgIn);
-            }
-        } else {
-            throw Exception("Unexpected command received");
-        }
-        
+        // Using the SubscriptionManager, get the message from the queue.
+        SubscriptionManager subs(session);
+        Message msgIn = subs.get("MyQueue");
+        if (msgIn.getData() == msgOut.getData()) 
+            if (opts.verbose) std::cout << "Received the exepected message." << std::endl;
+
         //close the session & connection
 	session.close();
 	if (opts.verbose) std::cout << "Closed session." << std::endl;
 	connection.close();	
 	if (opts.verbose) std::cout << "Closed connection." << std::endl;
-    return 0;
+        return 0;
     } catch(const std::exception& e) {
 	std::cout << e.what() << std::endl;
     }
diff --git a/qpid/cpp/src/tests/client_test.vcproj b/qpid/cpp/src/tests/client_test.vcproj
index 1623c29..32c90bc 100644
--- a/qpid/cpp/src/tests/client_test.vcproj
+++ b/qpid/cpp/src/tests/client_test.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="client_test"

-	ProjectGUID="{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="client_test"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 5d115de..e5e8030 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -34,8 +34,10 @@
   federated_cluster_test clustered_replication_test
 
 check_PROGRAMS+=cluster_test
-cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp
-cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework 
+cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \
+	cluster_test.cpp PartialFailure.cpp ClusterFailover.cpp
+
+cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework 
 
 unit_test_LDADD+=../cluster.la
 
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index eee2df5..d38d840 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -35,6 +35,7 @@
 #include "qpid/framing/Uuid.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/enum.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Logger.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Thread.h"
@@ -73,7 +74,7 @@
 
 
 ostream& operator<<(ostream& o, const cpg_name* n) {
-    return o << cluster::Cpg::str(*n);
+    return o << Cpg::str(*n);
 }
 
 ostream& operator<<(ostream& o, const cpg_address& a) {
@@ -89,29 +90,12 @@
     return o;
 }
 
-template <class C> set<uint16_t> makeSet(const C& c) {
-    set<uint16_t> s;
+template <class C> set<int> makeSet(const C& c) {
+    set<int> s;
     copy(c.begin(), c.end(), inserter(s, s.begin()));
     return s;
 }
 
-template <class T>  set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
-    vector<Url> urls = source.getKnownBrokers();
-    if (n >= 0 && unsigned(n) != urls.size()) {
-        BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
-        // Retry up to 10 secs in .1 second intervals.
-        for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
-            sys::usleep(1000*100); // 0.1 secs
-            urls = source.getKnownBrokers();
-        }
-    }
-    BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls);
-    set<uint16_t> s;
-    for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) 
-        s.insert((*i)[0].get<TcpAddress>()->port);
-    return s;
-}
-
 class Sender {
   public:
     Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
@@ -175,7 +159,6 @@
 
 QPID_AUTO_TEST_CASE(testAcl) {
     ofstream policyFile("cluster_test.acl");
-    // FIXME aconway 2009-02-12: guest -> qpidd?
     policyFile << "acl allow foo@QPID create queue name=foo" << endl
                << "acl allow foo@QPID create queue name=foo2" << endl
                << "acl deny foo@QPID create queue name=bar" << endl
@@ -446,13 +429,13 @@
 QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
     ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");
-    set<uint16_t> kb0 = knownBrokerPorts(c0.connection);
+    set<int> kb0 = knownBrokerPorts(c0.connection);
     BOOST_CHECK_EQUAL(kb0.size(), 1u);
     BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
 
     cluster.add();
     Client c1(cluster[1], "c1");
-    set<uint16_t> kb1 = knownBrokerPorts(c1.connection);
+    set<int> kb1 = knownBrokerPorts(c1.connection);
     kb0 = knownBrokerPorts(c0.connection, 2);
     BOOST_CHECK_EQUAL(kb1.size(), 2u);
     BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
@@ -460,7 +443,7 @@
 
     cluster.add();
     Client c2(cluster[2], "c2");
-    set<uint16_t> kb2 = knownBrokerPorts(c2.connection);
+    set<int> kb2 = knownBrokerPorts(c2.connection);
     kb1 = knownBrokerPorts(c1.connection, 3);
     kb0 = knownBrokerPorts(c0.connection, 3);
     BOOST_CHECK_EQUAL(kb2.size(), 3u);
diff --git a/qpid/cpp/src/tests/clustered_replication_test b/qpid/cpp/src/tests/clustered_replication_test
index 2a3e742..7afda87 100755
--- a/qpid/cpp/src/tests/clustered_replication_test
+++ b/qpid/cpp/src/tests/clustered_replication_test
@@ -23,6 +23,7 @@
 # failures:
 srcdir=`dirname $0`
 PYTHON_DIR=$srcdir/../../../python
+export PYTHONPATH=$PYTHON_DIR
 
 trap stop_brokers INT EXIT
 
diff --git a/qpid/cpp/src/tests/consume.vcproj b/qpid/cpp/src/tests/consume.vcproj
index 2e1f8e3..1fb72e2 100644
--- a/qpid/cpp/src/tests/consume.vcproj
+++ b/qpid/cpp/src/tests/consume.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="consume"

-	ProjectGUID="{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="consume"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/declare_queues.cpp b/qpid/cpp/src/tests/declare_queues.cpp
index 7f61bde..d17a72b 100644
--- a/qpid/cpp/src/tests/declare_queues.cpp
+++ b/qpid/cpp/src/tests/declare_queues.cpp
@@ -33,14 +33,15 @@
 int main(int argc, char ** argv) 
 {
     ConnectionSettings settings;
-    if ( argc != 3 )
+    if ( argc != 4 )
     {
-      cerr << "Usage: declare_queues host port\n";
+      cerr << "Usage: declare_queues host port durability\n";
       return 1;
     }
 
     settings.host = argv[1];
     settings.port = atoi(argv[2]);
+    int durability = atoi(argv[3]);
     
     FailoverManager connection(settings);
     try {
@@ -48,7 +49,10 @@
         while (!complete) {
             Session session = connection.connect().newSession();
             try {
-                session.queueDeclare(arg::queue="message_queue");
+                if ( durability )
+                  session.queueDeclare(arg::queue="message_queue", arg::durable=true);
+                else
+                  session.queueDeclare(arg::queue="message_queue");
                 complete = true;
             } catch (const qpid::TransportFailure&) {}
         }
diff --git a/qpid/cpp/src/tests/echotest.vcproj b/qpid/cpp/src/tests/echotest.vcproj
index 2848894..d41a0df 100644
--- a/qpid/cpp/src/tests/echotest.vcproj
+++ b/qpid/cpp/src/tests/echotest.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="echotest"

-	ProjectGUID="{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="echotest"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp
index 4f16e46..c8f67aa 100644
--- a/qpid/cpp/src/tests/failover_soak.cpp
+++ b/qpid/cpp/src/tests/failover_soak.cpp
@@ -220,63 +220,13 @@
         cout << "\n\n\n\n";
     }
 
-
-    /* 
-       Only call this if you already know there is at least 
-       one child still running.  Supply a time in seconds.
-       If it has been at least that long since a shild stopped
-       running, we judge the system to have hung.
-    */
-    int
-    hanging ( int hangTime )
-    {
-        struct timeval now,
-                       duration;
-        gettimeofday ( &now, 0 );
-
-        int how_many_hanging = 0;
-
-        vector<child *>::iterator i;
-        for ( i = begin(); i != end(); ++ i )
-        {
-            //Not in POSIX
-            //timersub ( & now, &((*i)->startTime), & duration );
-            duration.tv_sec = now.tv_sec - (*i)->startTime.tv_sec;
-            duration.tv_usec = now.tv_usec - (*i)->startTime.tv_usec;
-            if (duration.tv_usec < 0) {
-                --duration.tv_sec;
-                duration.tv_usec += 1000000;
-            }
-
-            if ( (COMPLETED != (*i)->status)     // child isn't done running
-                  &&
-                 ( duration.tv_sec >= hangTime ) // it's been too long
-               )
-            {
-                std::cerr << "Child of type " 
-                          << (*i)->type 
-                          << " hanging.   "
-                          << "PID is "
-                          << (*i)->pid
-                          << endl;
-                ++ how_many_hanging;
-            }
-        }
-        
-        return how_many_hanging;
-    }
-    
-
     int verbosity;
 };
 
 
-
 children allMyChildren;
 
 
-
-
 void 
 childExit ( int ) 
 {
@@ -361,36 +311,45 @@
   }
 }
 
-
+bool endsWith(const char* str, const char* suffix) { // True if str ends with suffix; str equal to suffix also counts.
+    return (strlen(suffix) <= strlen(str) && 0 == strcmp(str+strlen(str)-strlen(suffix), suffix)); // length guard keeps the pointer inside str
+}
 
 
 void
 startNewBroker ( brokerVector & brokers,
-                 char const * srcRoot,
-                 char const * moduleDir,
+                 char const * moduleOrDir,
                  string const clusterName,
-                 int verbosity ) 
+                 int verbosity,
+                 int durable ) 
 {
     static int brokerId = 0;
-    stringstream path, prefix, module;
-    module << moduleDir << "/cluster.so";
-    path << srcRoot << "/qpidd";
+    stringstream path, prefix;
     prefix << "soak-" << brokerId;
     std::vector<std::string> argv = list_of<string>
         ("qpidd")
-        ("--no-module-dir")
-        ("--load-module=cluster.so")
-        ("--cluster-name")
-        (clusterName)
+        ("--cluster-name")(clusterName)
         ("--auth=no")
-        ("--no-data-dir")
         ("--mgmt-enable=no")
-        ("--log-prefix")
-        (prefix.str())
-        ("--log-to-file")
-        (prefix.str()+".log");
+        ("--log-prefix")(prefix.str())
+        ("--log-to-file")(prefix.str()+".log")
+        ("--log-enable=error+")
+        ("TMP_DATA_DIR");
 
-    newbie = new ForkedBroker ( argv );
+    if (endsWith(moduleOrDir, "cluster.so")) {
+        // Module path specified, load only that module.
+        argv.push_back(string("--load-module=")+moduleOrDir);
+        argv.push_back("--no-module-dir");
+        if ( durable ) {
+          std::cerr << "failover_soak warning: durable arg hass no effect.  Use \"dir\" option of \"moduleOrDir\".\n";
+        }
+    }
+    else {
+        // Module directory specified, load all modules in dir.
+        argv.push_back(string("--module-dir=")+moduleOrDir);
+    }
+
+    newbie = new ForkedBroker (argv);
     newbie_port = newbie->getPort();
     ForkedBroker * broker = newbie;
 
@@ -473,7 +432,8 @@
 runDeclareQueuesClient ( brokerVector brokers, 
                             char const *  host,
                             char const *  path,
-                            int verbosity
+                            int verbosity,
+                            int durable
                           ) 
 {
     string name("declareQueues");
@@ -492,6 +452,10 @@
     argv.push_back ( "declareQueues" );
     argv.push_back ( host );
     argv.push_back ( portSs.str().c_str() );
+    if ( durable )
+      argv.push_back ( "1" );
+    else
+      argv.push_back ( "0" );
     argv.push_back ( 0 );
     pid_t pid = fork();
 
@@ -562,7 +526,8 @@
                        char const *  senderPath,
                        char const *  nMessages,
                        char const *  reportFrequency,
-                       int verbosity
+                       int verbosity,
+                       int durability
                      ) 
 {
     string name("sender");
@@ -586,6 +551,10 @@
     argv.push_back ( nMessages );
     argv.push_back ( reportFrequency );
     argv.push_back ( verbosityStr );
+    if ( durability )
+      argv.push_back ( "1" );
+    else
+      argv.push_back ( "0" );
     argv.push_back ( 0 );
 
     pid_t pid = fork();
@@ -613,27 +582,33 @@
 #define ERROR_KILLING_BROKER  8
 
 
+// If you want durability, pass a module directory (not a cluster.so path) as "moduleOrDir".
+
+
 int
 main ( int argc, char const ** argv ) 
 {    
-    if ( argc < 9 ) {
-        cerr << "Usage: failoverSoak srcRoot moduleDir host senderPath receiverPath nMessages verbosity\n";
-        cerr << "    ( argc was " << argc << " )\n";
+    if ( argc != 9 ) {
+        cerr << "Usage: "
+             << argv[0]
+             << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable"
+             << endl;
+        cerr << "\tverbosity is an integer, durable is 0 or 1\n";
         return BAD_ARGS;
     }
-
     signal ( SIGCHLD, childExit );
 
-    char const * srcRoot            = argv[1];
-    char const * moduleDir          = argv[2];
-    char const * host               = argv[3];
-    char const * declareQueuesPath  = argv[4];
-    char const * senderPath         = argv[5];
-    char const * receiverPath       = argv[6];
-    char const * nMessages          = argv[7];
-    char const * reportFrequency    = argv[8];
-    int          verbosity          = atoi(argv[9]);
+    int i = 1;
+    char const * moduleOrDir        = argv[i++];
+    char const * declareQueuesPath  = argv[i++];
+    char const * senderPath         = argv[i++];
+    char const * receiverPath       = argv[i++];
+    char const * nMessages          = argv[i++];
+    char const * reportFrequency    = argv[i++];
+    int          verbosity          = atoi(argv[i++]);
+    int          durable            = atoi(argv[i++]);
 
+    char const * host               = "127.0.0.1";
     int maxBrokers = 50;
 
     allMyChildren.verbosity = verbosity;
@@ -652,10 +627,10 @@
     int nBrokers = 3;
     for ( int i = 0; i < nBrokers; ++ i ) {
         startNewBroker ( brokers,
-                         srcRoot,
-                         moduleDir, 
+                         moduleOrDir, 
                          clusterName,
-                         verbosity ); 
+                         verbosity,
+                         durable ); 
     }
 
 
@@ -665,7 +640,7 @@
      // Run the declareQueues child.
      int childStatus;
      pid_t dqClientPid = 
-     runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity );
+     runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity, durable );
      if ( -1 == dqClientPid ) {
          cerr << "END_OF_TEST ERROR_START_DECLARE_1\n";
          return CANT_FORK_DQ;
@@ -701,7 +676,8 @@
                             senderPath, 
                             nMessages,
                             reportFrequency,
-                            verbosity );
+                            verbosity,
+                            durable );
      if ( -1 == sendingClientPid ) {
          cerr << "END_OF_TEST ERROR_START_SENDER\n";
          return CANT_FORK_SENDER;
@@ -745,10 +721,10 @@
              cout << "Starting new broker.\n\n";
 
          startNewBroker ( brokers,
-                          srcRoot,
-                          moduleDir, 
+                          moduleOrDir, 
                           clusterName,
-                          verbosity ); 
+                          verbosity,
+                          durable ); 
        
          if ( verbosity > 1 )
              printBrokers ( brokers );
@@ -787,16 +763,6 @@
              return ERROR_ON_CHILD;
          }
 
-         // If one is hanging, quit.
-         if ( allMyChildren.hanging ( 120 ) )
-         {
-             /*
-              * Don't kill any processes.  Leave alive for questioning.
-              * */
-             std::cerr << "END_OF_TEST ERROR_HANGING\n";
-             return HANGING;
-         }
-
          if ( verbosity > 1 ) {
            std::cerr << "------- next kill-broker loop --------\n";
            allMyChildren.print();
diff --git a/qpid/cpp/src/tests/header_test.vcproj b/qpid/cpp/src/tests/header_test.vcproj
index da761a6..cdfc984 100644
--- a/qpid/cpp/src/tests/header_test.vcproj
+++ b/qpid/cpp/src/tests/header_test.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="header_test"

-	ProjectGUID="{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="header_test"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/interop_runner.cpp b/qpid/cpp/src/tests/interop_runner.cpp
deleted file mode 100644
index 8c6e0a6..0000000
--- a/qpid/cpp/src/tests/interop_runner.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/Options.h"
-#include "qpid/ptr_map.h"
-#include "qpid/Exception.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/ConnectionOptions.h"
-#include "qpid/client/Exchange.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/client/Queue.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Time.h"
-#include <iostream>
-#include <memory>
-#include "BasicP2PTest.h"
-#include "BasicPubSubTest.h"
-#include "TestCase.h"
-#include <boost/ptr_container/ptr_map.hpp>
-
-/**
- * Framework for interop tests.
- * 
- * [see http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification for details].
- */
-
-using namespace qpid::client;
-using namespace qpid::sys;
-using qpid::TestCase;
-using qpid::framing::FieldTable;
-using qpid::framing::ReplyTo;
-using namespace std;
-
-class DummyRun : public TestCase
-{
-public:
-    DummyRun() {}
-    void assign(const string&, FieldTable&, ConnectionOptions&) {}
-    void start() {}
-    void stop() {}
-    void report(qpid::client::Message&) {}
-};
-
-string parse_next_word(const string& input, const string& delims, string::size_type& position);
-
-/**
- */
-class Listener : public MessageListener, private Runnable{    
-    typedef boost::ptr_map<string, TestCase> TestMap;
-
-    Channel& channel;
-    ConnectionOptions& options;
-    TestMap tests;
-    const string name;
-    const string topic;
-    TestCase* test;
-    auto_ptr<Thread> runner;
-    ReplyTo reportTo;
-    string reportCorrelator;    
-
-    void shutdown();
-    bool invite(const string& name);
-    void run();
-
-    void sendResponse(Message& response, ReplyTo replyTo);
-    void sendResponse(Message& response, Message& request);
-    void sendSimpleResponse(const string& type, Message& request);
-    void sendReport();
-public:
-    Listener(Channel& channel, ConnectionOptions& options);
-    void received(Message& msg);
-    void bindAndConsume();
-    void registerTest(string name, TestCase* test);
-};
-
-struct TestSettings : ConnectionOptions
-{
-    bool help;
-
-    TestSettings() : help(false)
-    {
-        addOptions()
-            ("help", qpid::optValue(help), "print this usage statement");
-    }
-};
-
-int main(int argc, char** argv) {
-    try {
-        TestSettings options;
-        options.parse(argc, argv);
-        if (options.help) {
-            cout << options;
-        } else {
-            Connection connection;
-            connection.open(options.host, options.port, "guest", "guest", options.virtualhost);
-            
-            Channel channel;
-            connection.openChannel(channel);
-            
-            Listener listener(channel, options);
-            listener.registerTest("TC1_DummyRun", new DummyRun());
-            listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest());
-            listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest());
-            
-            listener.bindAndConsume();
-            
-            channel.run();
-            connection.close();
-        }
-    } catch(const exception& error) {
-        cout << error.what() << endl << "Type " << argv[0] << " --help for help" << endl;
-    }
-}
-
-Listener::Listener(Channel& _channel, ConnectionOptions& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name)
-{}
-
-void Listener::registerTest(string name, TestCase* test)
-{
-    tests.insert(name, test);
-}
-
-void Listener::bindAndConsume()
-{
-    Queue control(name, true);
-    channel.declareQueue(control);
-    qpid::framing::FieldTable bindArgs;
-    //replace these separate binds with a wildcard once that is supported on java broker
-    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "iop.control", bindArgs);
-    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, topic, bindArgs);
-    
-    string tag;
-    channel.consume(control, tag, this);
-}
-
-void Listener::sendSimpleResponse(const string& type, Message& request)
-{
-    Message response;
-    response.getHeaders().setString("CONTROL_TYPE", type);
-    response.getHeaders().setString("CLIENT_NAME", name);
-    response.getHeaders().setString("CLIENT_PRIVATE_CONTROL_KEY", topic);
-    response.getMessageProperties().setCorrelationId(request.getMessageProperties().getCorrelationId());
-    sendResponse(response, request);
-}
-
-void Listener::sendResponse(Message& response, Message& request)
-{
-    sendResponse(response, request.getMessageProperties().getReplyTo()); 
-}
-
-void Listener::sendResponse(Message& response, ReplyTo replyTo)
-{
-    string exchange = replyTo.getExchange();
-    string routingKey = replyTo.getRoutingKey();
-    channel.publish(response, exchange, routingKey);
-}
-
-void Listener::received(Message& message)
-{
-    string type(message.getHeaders().getString("CONTROL_TYPE"));
-
-    if (type == "INVITE") {
-        string name(message.getHeaders().getString("TEST_NAME"));
-        if (name.empty() || invite(name)) {
-            sendSimpleResponse("ENLIST", message);
-        } else {
-            cout << "Can't take part in '" << name << "'" << endl;
-        }
-    } else if (type == "ASSIGN_ROLE") {        
-        test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options);
-        sendSimpleResponse("ACCEPT_ROLE", message);
-    } else if (type == "START") {        
-        reportTo = message.getMessageProperties().getReplyTo();
-        reportCorrelator = message.getMessageProperties().getCorrelationId();
-        runner = auto_ptr<Thread>(new Thread(this));
-    } else if (type == "STATUS_REQUEST") {
-        reportTo = message.getMessageProperties().getReplyTo();
-        reportCorrelator = message.getMessageProperties().getCorrelationId();
-        test->stop();
-        sendReport();
-    } else if (type == "TERMINATE") {
-        if (test) test->stop();
-        shutdown();
-    } else {        
-        cerr <<"ERROR!: Received unknown control message: " << type << endl;
-        shutdown();
-    }
-}
-
-void Listener::shutdown()
-{
-    channel.close();
-}
-
-bool Listener::invite(const string& name)
-{
-    TestMap::iterator i = tests.find(name);
-    test = (i != tests.end()) ? qpid::ptr_map_ptr(i) : 0;
-    return test;
-}
-
-void Listener::run()
-{
-    //NB: this method will be called in its own thread 
-    //start test and when start returns...
-    test->start();
-    sendReport();
-}
-
-void Listener::sendReport()
-{
-    Message report;
-    report.getHeaders().setString("CONTROL_TYPE", "REPORT");
-    test->report(report);
-    report.getMessageProperties().setCorrelationId(reportCorrelator);
-    sendResponse(report, reportTo);
-}
-
-string parse_next_word(const string& input, const string& delims, string::size_type& position)
-{
-    string::size_type start = input.find_first_not_of(delims, position);
-    if (start == string::npos) {
-        return "";
-    } else {
-        string::size_type end = input.find_first_of(delims, start);
-        if (end == string::npos) {
-            end = input.length();
-        }
-        position = end;
-        return input.substr(start, end - start);
-    }
-}
diff --git a/qpid/cpp/src/tests/latencytest.vcproj b/qpid/cpp/src/tests/latencytest.vcproj
index 7583020..da48e68 100644
--- a/qpid/cpp/src/tests/latencytest.vcproj
+++ b/qpid/cpp/src/tests/latencytest.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="latencytest"

-	ProjectGUID="{4A809018-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{4A809018-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="latencytest"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/perftest.vcproj b/qpid/cpp/src/tests/perftest.vcproj
index 3a2397a..83cd550 100644
--- a/qpid/cpp/src/tests/perftest.vcproj
+++ b/qpid/cpp/src/tests/perftest.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="perftest"

-	ProjectGUID="{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="perftest"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/publish.vcproj b/qpid/cpp/src/tests/publish.vcproj
index 7422606..849b845 100644
--- a/qpid/cpp/src/tests/publish.vcproj
+++ b/qpid/cpp/src/tests/publish.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="publish"

-	ProjectGUID="{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="publish"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/receiver.vcproj b/qpid/cpp/src/tests/receiver.vcproj
index ce6fad3..8744ac3 100644
--- a/qpid/cpp/src/tests/receiver.vcproj
+++ b/qpid/cpp/src/tests/receiver.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="receiver"

-	ProjectGUID="{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{7D314A98-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="receiver"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/replaying_sender.cpp b/qpid/cpp/src/tests/replaying_sender.cpp
index ea2a13b..3ee69ee 100644
--- a/qpid/cpp/src/tests/replaying_sender.cpp
+++ b/qpid/cpp/src/tests/replaying_sender.cpp
@@ -40,9 +40,10 @@
   public:
     Sender(const std::string& queue, uint count, uint reportFreq);
     void execute(AsyncSession& session, bool isRetry);
-    uint getSent();
+    uint getSent(); 
 
-    int verbosity;
+    void setVerbosity   ( int v ) { verbosity   = v; } // Progress is printed by execute() only when verbosity > 0.
+    void setPersistence ( int p ) { persistence = p; } // Non-zero makes execute() send messages with PERSISTENT delivery mode.
 
   private:
     MessageReplayTracker sender;
@@ -51,9 +52,11 @@
     const uint reportFrequency;
     Message message;
     
+    int verbosity;
+    int persistence;
 };
 
-Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq)
+Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq), verbosity(0), persistence(0)
 {
     message.getDeliveryProperties().setRoutingKey(queue);
 }
@@ -69,6 +72,9 @@
         message_data << ++sent;
         message.setData(message_data.str());
         message.getHeaders().setInt("sn", sent);
+        if ( persistence )
+          message.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+
         sender.send(message);
         if (count > reportFrequency && !(sent % reportFrequency)) {
             if ( verbosity > 0 )
@@ -91,9 +97,9 @@
 {
     ConnectionSettings settings;
 
-    if ( argc != 6 )
+    if ( argc != 7 )
     {
-      std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity\n";
+      std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity persistence\n";
       return 1;
     }
 
@@ -102,10 +108,12 @@
     int n_messages      = atoi(argv[3]);
     int reportFrequency = atoi(argv[4]);
     int verbosity       = atoi(argv[5]);
+    int persistence     = atoi(argv[6]);
 
     FailoverManager connection(settings);
     Sender sender("message_queue", n_messages, reportFrequency );
-    sender.verbosity = verbosity;
+    sender.setVerbosity   ( verbosity   );
+    sender.setPersistence ( persistence );
     try {
         connection.execute ( sender );
         if ( verbosity > 0 )
diff --git a/qpid/cpp/src/tests/run_failover_soak b/qpid/cpp/src/tests/run_failover_soak
index 36dfed7..3c9a558 100755
--- a/qpid/cpp/src/tests/run_failover_soak
+++ b/qpid/cpp/src/tests/run_failover_soak
@@ -45,12 +45,12 @@
 
 host=127.0.0.1
 
-src_root=..
-module_dir=$src_root/.libs
-
+MODULES=${MODULES:-../.libs}
 MESSAGES=${MESSAGES:-300000}
 REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`}
 VERBOSITY=${VERBOSITY:-1}
+DURABILITY=${DURABILITY:-1}
 
-exec ./failover_soak $src_root $module_dir $host ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY
+rm -f soak-*.log
+exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY
 
diff --git a/qpid/cpp/src/tests/sender.vcproj b/qpid/cpp/src/tests/sender.vcproj
index 616b665..fe564cb 100644
--- a/qpid/cpp/src/tests/sender.vcproj
+++ b/qpid/cpp/src/tests/sender.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="sender"

-	ProjectGUID="{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{09714CB8-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="sender"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/shlibtest.vcproj b/qpid/cpp/src/tests/shlibtest.vcproj
index 4a139a2..2052230 100644
--- a/qpid/cpp/src/tests/shlibtest.vcproj
+++ b/qpid/cpp/src/tests/shlibtest.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="shlibtest"

-	ProjectGUID="{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="shlibtest"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster
index 053b23d..585ba08 100755
--- a/qpid/cpp/src/tests/start_cluster
+++ b/qpid/cpp/src/tests/start_cluster
@@ -28,15 +28,17 @@
     echo $* | newgrp ais
 }
 
-rm -f cluster*.log
-SIZE=${1:-1}; shift
+rm -f cluster*.log cluster.ports qpidd.port
+
+SIZE=${1:-3}; shift
 CLUSTER=`pwd`		# Cluster name=pwd, avoid clashes.
-OPTS="-d --no-module-dir --load-module ../.libs/cluster.so  --cluster-name=$CLUSTER --no-data-dir --auth=no $@"
+OPTS="-d --no-module-dir --load-module ../.libs/cluster.so  --cluster-name=$CLUSTER --auth=no $@"
 
 for (( i=0; i<SIZE; ++i )); do
-    PORT=`with_ais_group ../qpidd  -p0 --log-to-file=cluster$i.log $OPTS`  || exit 1
+    DDIR=`mktemp -d /tmp/start_cluster.XXXXXXXXXX`
+    PORT=`with_ais_group ../qpidd  -p0 --log-to-file=cluster$i.log $OPTS  --data-dir=$DDIR`  || exit 1
     echo $PORT >> cluster.ports
 done
 
-head cluster.ports > qpidd.port	# First member's port for tests.
+head -n 1 cluster.ports > qpidd.port	# First member's port for tests.
 
diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp
new file mode 100644
index 0000000..e5c3522
--- /dev/null
+++ b/qpid/cpp/src/tests/test_store.cpp
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**@file
+ * Plug-in message store for tests.
+ * 
+ * Add functionality as required, build up a comprehensive set of
+ * features to support persistent behavior tests.
+ *
+ * Current features: special "action" messages can:
+ *  - raise exception from enqueue.
+ *  - force host process to exit.
+ *  - do async completion after a delay.
+ */
+
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include <boost/algorithm/string.hpp>
+#include <boost/cast.hpp>
+#include <boost/lexical_cast.hpp>
+
+using namespace qpid;
+using namespace broker;
+using namespace std;
+using namespace boost;
+using namespace qpid::sys;
+
+/** Options contributed by the test store plug-in. */
+struct TestStoreOptions : public Options {
+    string name;    // Filled from the "test-store-name" option; TestStorePlugin hands it to TestStore.
+
+    TestStoreOptions() : Options("Test Store Options") {
+        addOptions()
+            ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance.");
+    }
+};
+
+struct Completer : public Runnable {    // Completes a message's enqueue after a delay, then frees itself.
+    intrusive_ptr<PersistableMessage> message;  // Message whose enqueue run() completes.
+    int usecs;                                  // Delay handed to qpid::sys::usleep() before completing.
+    Completer(intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {}
+    void run() {
+        qpid::sys::usleep(usecs);
+        message->enqueueComplete();
+        delete this;    // Allocated with new in TestStore::enqueue; freed here once the work is done.
+    }
+};
+    
+/** Message store for tests; obeys "<name>[<action>]" instructions embedded in message content. */
+class TestStore : public NullMessageStore {
+  public:
+    TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {}
+    /** Joins every delayed-completion thread started by enqueue(). */
+    ~TestStore() {
+        for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
+    }
+
+    /** Completes msg at once unless its content starts with TEST_STORE_DO and contains
+     *  "<name>[<action>]". Then: "exception" throws Exception, "exit_process" calls exit(0),
+     *  "async <n>" completes from a new thread after sleeping for n. Unknown actions and
+     *  async delays that are not non-negative integers are logged, then completed at once. */
+    void enqueue(TransactionContext* ,
+                 const boost::intrusive_ptr<PersistableMessage>& msg,
+                 const PersistableQueue& )
+    {
+        string data = polymorphic_downcast<Message*>(msg.get())->getFrames().getContent();
+        size_t i = string::npos, j = string::npos;
+        if (starts_with(data, TEST_STORE_DO)
+            && (i = data.find(name+"[")) != string::npos
+            && (j = data.find("]", i)) != string::npos)
+        {
+            size_t start = i+name.size()+1;     // First character after "<name>[".
+            string action = data.substr(start, j-start);
+            if (action == EXCEPTION)
+                throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
+            if (action == EXIT_PROCESS) {
+                // FIXME aconway 2009-04-10: this is a dubious way to close the process
+                // at best, it can cause assertions or seg faults rather than clean exit.
+                QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data);
+                exit(0);
+            }
+            if (starts_with(action, ASYNC)) {
+                // Reject a malformed or negative delay instead of leaking bad_lexical_cast or sleeping on it.
+                int delay = -1;
+                try { delay = lexical_cast<int>(action.substr(ASYNC.size())); }
+                catch (const boost::bad_lexical_cast&) {}
+                if (delay >= 0) {
+                    threads.push_back(Thread(*new Completer(msg, delay)));
+                    return;                     // The Completer finishes the enqueue later.
+                }
+            }
+            QPID_LOG(error, "TestStore " << name << " unknown action " << action);
+        }
+        msg->enqueueComplete();
+    }
+
+  private:
+    static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+    string name;
+    Broker& broker;
+    vector<Thread> threads;
+};
+
+const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";  // Content prefix that marks an instruction message.
+const string TestStore::EXCEPTION = "exception";            // Action: enqueue() throws an Exception.
+const string TestStore::EXIT_PROCESS = "exit_process";      // Action: enqueue() calls exit(0).
+const string TestStore::ASYNC="async ";                      // Action prefix; the rest is parsed as an integer delay.
+
+/** Plug-in that installs a TestStore as the message store of a Broker target. */
+struct TestStorePlugin : public Plugin {
+    TestStoreOptions options;
+
+    /** The options object that carries "test-store-name". */
+    Options* getOptions() { return &options; }
+
+    /** Targets that are not a Broker are ignored; a Broker receives a new TestStore. */
+    void earlyInitialize (Plugin::Target& target)
+    {
+        if (Broker* b = dynamic_cast<Broker*>(&target))
+            b->setStore (new TestStore(options.name, *b));
+    }
+    void initialize(qpid::Plugin::Target&) {}    // Nothing to do: the store is set in earlyInitialize().
+};
+
+static TestStorePlugin pluginInstance;    // The single plug-in object; presumably registered by Plugin's constructor (confirm).
diff --git a/qpid/cpp/src/tests/tests.sln b/qpid/cpp/src/tests/tests.sln
index 273e90d..9e24487 100644
--- a/qpid/cpp/src/tests/tests.sln
+++ b/qpid/cpp/src/tests/tests.sln
@@ -8,37 +8,37 @@
 #

 # MPC Command:

 # C:\ace\MPC\mwc.pl -type vc9 -features boost=1 tests.mwc

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "client_test", "client_test.vcproj", "{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "client_test", "client_test.vcproj", "{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "consume", "consume.vcproj", "{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "consume", "consume.vcproj", "{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "echotest", "echotest.vcproj", "{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "echotest", "echotest.vcproj", "{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "header_test", "header_test.vcproj", "{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "header_test", "header_test.vcproj", "{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "latencytest", "latencytest.vcproj", "{4A809018-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "latencytest", "latencytest.vcproj", "{4A809018-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "perftest", "perftest.vcproj", "{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "perftest", "perftest.vcproj", "{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "publish", "publish.vcproj", "{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "publish", "publish.vcproj", "{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "receiver", "receiver.vcproj", "{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "receiver", "receiver.vcproj", "{7D314A98-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "sender", "sender.vcproj", "{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "sender", "sender.vcproj", "{09714CB8-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "shlibtest", "shlibtest.vcproj", "{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "shlibtest", "shlibtest.vcproj", "{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_listener", "topic_listener.vcproj", "{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_listener", "topic_listener.vcproj", "{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_publisher", "topic_publisher.vcproj", "{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "topic_publisher", "topic_publisher.vcproj", "{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txjob", "txjob.vcproj", "{09034A53-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txjob", "txjob.vcproj", "{09034A53-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txshift", "txshift.vcproj", "{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txshift", "txshift.vcproj", "{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txtest", "txtest.vcproj", "{697786BE-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "txtest", "txtest.vcproj", "{697786BE-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "unit_test", "unit_test.vcproj", "{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}"

+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "unit_test", "unit_test.vcproj", "{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}"

 EndProject

 Global

 	GlobalSection(SolutionConfigurationPlatforms) = preSolution

@@ -48,134 +48,134 @@
 		Release|x64 = Release|x64

 	EndGlobalSection

 	GlobalSection(ProjectConfigurationPlatforms) = postSolution

-		{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{9A95F0E4-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{7F5DE0A1-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{0A5AF6BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{1B23F05D-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{4A809018-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{1F2066BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{AE773E7F-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{7D314A98-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{09714CB8-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{37AB26B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{09034A53-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{697786BE-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

-		{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.ActiveCfg = Debug|Win32

-		{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|Win32.Build.0 = Debug|Win32

-		{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.ActiveCfg = Debug|x64

-		{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Debug|x64.Build.0 = Debug|x64

-		{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.ActiveCfg = Release|Win32

-		{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|Win32.Build.0 = Release|Win32

-		{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.ActiveCfg = Release|x64

-		{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}.Release|x64.Build.0 = Release|x64

+		{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{9A95F0E4-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{7F5DE0A1-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{0A5AF6BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{1B23F05D-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{4A809018-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{4A809018-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{1F2066BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{AE773E7F-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{7D314A98-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{09714CB8-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{37AB26B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{09034A53-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{09034A53-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{697786BE-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{697786BE-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

+		{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.ActiveCfg = Debug|Win32

+		{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|Win32.Build.0 = Debug|Win32

+		{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.ActiveCfg = Debug|x64

+		{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Debug|x64.Build.0 = Debug|x64

+		{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.ActiveCfg = Release|Win32

+		{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|Win32.Build.0 = Release|Win32

+		{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.ActiveCfg = Release|x64

+		{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}.Release|x64.Build.0 = Release|x64

 	EndGlobalSection

 	GlobalSection(SolutionProperties) = preSolution

 		HideSolutionNode = FALSE

diff --git a/qpid/cpp/src/tests/topic_listener.vcproj b/qpid/cpp/src/tests/topic_listener.vcproj
index 0be31b8..3f338bf 100644
--- a/qpid/cpp/src/tests/topic_listener.vcproj
+++ b/qpid/cpp/src/tests/topic_listener.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="topic_listener"

-	ProjectGUID="{9392D1EE-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{9392D1EE-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="topic_listener"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/topic_publisher.vcproj b/qpid/cpp/src/tests/topic_publisher.vcproj
index 016c6d4..5b0fd21 100644
--- a/qpid/cpp/src/tests/topic_publisher.vcproj
+++ b/qpid/cpp/src/tests/topic_publisher.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="topic_publisher"

-	ProjectGUID="{7D66FE10-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{7D66FE10-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="topic_publisher"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/txjob.vcproj b/qpid/cpp/src/tests/txjob.vcproj
index 19fe3fb..e002d58 100644
--- a/qpid/cpp/src/tests/txjob.vcproj
+++ b/qpid/cpp/src/tests/txjob.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="txjob"

-	ProjectGUID="{09034A53-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{09034A53-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="txjob"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/txshift.vcproj b/qpid/cpp/src/tests/txshift.vcproj
index 3212881..75ccfc6 100644
--- a/qpid/cpp/src/tests/txshift.vcproj
+++ b/qpid/cpp/src/tests/txshift.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="txshift"

-	ProjectGUID="{6E3B2A6B-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{6E3B2A6B-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="txshift"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/txtest.vcproj b/qpid/cpp/src/tests/txtest.vcproj
index 663291a..3549ba9 100644
--- a/qpid/cpp/src/tests/txtest.vcproj
+++ b/qpid/cpp/src/tests/txtest.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="txtest"

-	ProjectGUID="{697786BE-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{697786BE-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="txtest"

 	Keyword="Win32Proj"

 	SignManifests="true"

diff --git a/qpid/cpp/src/tests/unit_test.vcproj b/qpid/cpp/src/tests/unit_test.vcproj
index 8710b61..9653290 100644
--- a/qpid/cpp/src/tests/unit_test.vcproj
+++ b/qpid/cpp/src/tests/unit_test.vcproj
@@ -3,7 +3,7 @@
 	ProjectType="Visual C++"

 	Version="9.00"

 	Name="unit_test"

-	ProjectGUID="{51E5F6B9-FECA-1BAD-2431-8A11DB0D67CE}"

+	ProjectGUID="{51E5F6B9-FECA-1BAD-2431-8A11CDC7409E}"

 	RootNamespace="unit_test"

 	Keyword="Win32Proj"

 	SignManifests="true"

@@ -76,7 +76,7 @@
 			/>

 			<Tool

 				Name="VCLinkerTool"

-				AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib"

+				AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib ws2_32.lib"

 				OutputFile="$(OutDir)\unit_test.exe"

 				LinkIncremental="2"

 				SuppressStartupBanner="true"

@@ -155,7 +155,7 @@
 			/>

 			<Tool

 				Name="VCLinkerTool"

-				AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib"

+				AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib ws2_32.lib"

 				OutputFile="$(OutDir)\unit_test.exe"

 				LinkIncremental="1"

 				SuppressStartupBanner="true"

@@ -240,7 +240,7 @@
 			<Tool

 				Name="VCLinkerTool"

 				AdditionalOptions="/machine:AMD64"

-				AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib"

+				AdditionalDependencies="qpidcommond.lib qpidclientd.lib qpidbrokerd.lib qmfconsoled.lib ws2_32.lib"

 				OutputFile="$(OutDir)\unit_test.exe"

 				LinkIncremental="2"

 				SuppressStartupBanner="true"

@@ -320,7 +320,7 @@
 			<Tool

 				Name="VCLinkerTool"

 				AdditionalOptions="/machine:AMD64"

-				AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib"

+				AdditionalDependencies="qpidcommon.lib qpidclient.lib qpidbroker.lib qmfconsole.lib ws2_32.lib"

 				OutputFile="$(OutDir)\unit_test.exe"

 				LinkIncremental="1"

 				SuppressStartupBanner="true"

diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 8fdde0a..df90fc6 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -48,6 +48,19 @@
       <field name="id" type="uint64"/>
     </control>
     
+    <domain name="error-type" type="uint8" label="Types of error">
+      <enum>
+	<choice name="none" value="0"/>
+	<choice name="session" value="1"/>
+	<choice name="connection" value="2"/>
+      </enum>
+    </domain>
+	
+    <control name="error-check" code="0x13">
+      <field name="type" type="error-type"/>
+      <field name="frame-seq" type="uint64"/>
+    </control>
+    
     <control name="shutdown" code="0x20" label="Shut down entire cluster"/>
 
   </class>
@@ -132,6 +145,7 @@
     <control name="membership" code="0x21" label="Cluster membership details.">
       <field name="joiners" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
+      <field name="frame-seq" type="uint64"/>	 <!-- frame sequence number -->
     </control>
 
     <!-- Set the position of a replicated queue. -->
@@ -146,5 +160,6 @@
 
     <!-- Set expiry-id for subsequent messages. -->
     <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
+
   </class>
 </amqp>