QPID-3603: resync -kgiusti tracking branch with gsim's work.

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-kgiusti@1208483 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/design_docs/hot-standby-design.txt b/qpid/cpp/design_docs/hot-standby-design.txt
deleted file mode 100644
index 99a5dc0..0000000
--- a/qpid/cpp/design_docs/hot-standby-design.txt
+++ /dev/null
@@ -1,239 +0,0 @@
--*-org-*-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-* Another new design for Qpid clustering.
-
-For background see [[./new-cluster-design.txt]] which describes the issues
-with the old design and a new active-active design that could replace it.
-
-This document describes an alternative hot-standby approach.
-
-** Delivery guarantee
-
-We guarantee N-way redundant, at least once delivey. Once a message
-from a client has been acknowledged by the broker, it will be
-delivered even if N-1 brokers subsequently fail. There may be
-duplicates in the event of a failure. We don't make duplicates 
-during normal operation (i.e when no brokers have failed)
-
-This is the same guarantee as the old cluster and the alternative
-active-active design.
-
-** Active-active vs. hot standby (aka primary-backup)
-
-An active-active cluster allows clients to connect to any broker in
-the cluster. If a broker fails, clients can fail-over to any other
-live broker.
-
-A hot-standby cluster has only one active broker at a time (the
-"primary") and one or more brokers on standby (the "backups"). Clients
-are only served by the leader, clients that connect to a backup are
-redirected to the leader. The backpus are kept up-to-date in real time
-by the primary, if the primary fails a backup is elected to be the new
-primary.
-
-Aside: A cold-standby cluster is possible using a standalone broker,
-CMAN and shared storage. In this scenario only one broker runs at a
-time writing to a shared store. If it fails, another broker is started
-(by CMAN) and recovers from the store. This bears investigation but
-the store recovery time is probably too long for failover.
-
-** Why hot standby?
-
-Active-active has some advantages:
-- Finding a broker on startup or failover is simple, just pick any live broker.
-- All brokers are always running in active mode, there's no
-- Distributing clients across brokers gives better performance, but see [1].
-- A broker failure affects only clients connected to that broker.
-
-The main problem with active-active is co-ordinating consumers of the
-same queue on multiple brokers such that there are no duplicates in
-normal operation. There are 2 approaches:
-
-Predictive: each broker predicts which messages others will take. This
-the main weakness of the old design so not appealing.
-
-Locking: brokers "lock" a queue in order to take messages. This is
-complex to implement, its not straighforward to determine the most
-performant strategie for passing the lock.
-
-Hot-standby removes this problem. Only the primary can modify queues
-so it just has to tell the backups what it is doing, there's no
-locking.
-
-The primary can enqueue messages and replicate asynchronously -
-exactly like the store does, but it "writes" to the replicas over the
-network rather than writing to disk.
-
-** Failover in a hot-standby cluster.
-
-Hot-standby has some potential performance issues around failover:
-
-- Failover "spike": when the primary fails every client will fail over
-  at the same time, putting strain on the system.
-
-- Until a new primary is elected, cluster cannot serve any clients or
-  redirect clients to the primary.
-
-We want to minimize the number of re-connect attempts that clients
-have to make. The cluster can use a well-known algorithm to choose the
-new primary (e.g. round robin on a known sequence of brokers) so that
-clients can guess the new primary correctly in most cases.
-
-Even if clients do guess correctly it may be that the new primary is
-not yet aware of the death of the old primary, which is may to cause
-multiple failed connect attempts before clients eventually get
-connected. We will need to prototype to see how much this happens in
-reality and how we can best get clients redirected.
-
-** Threading and performance.
-
-The primary-backup cluster operates analogously to the way the disk store does now:
-- use the same MessageStore interface as the store to interact with the broker
-- use the same asynchronous-completion model for replicating messages.
-- use the same recovery interfaces (?) for new backups joining.
-
-Re-using the well-established store design gives credibility to the new cluster design.
-
-The single CPG dispatch thread was a severe performance bottleneck for the old cluster.
-
-The primary has the same threading model as a a standalone broker with
-a store, which we know that this performs well.
-
-If we use CPG for replication of messages, the backups will receive
-messages in the CPG dispatch thread. To get more concurency, the CPG
-thread can dump work onto internal PollableQueues to be processed in
-parallel. 
-
-Messages from the same broker queue need to go onto the same
-PollableQueue. There could be a separate PollableQueue for each broker
-queue. If that's too resource intensive we can use a fixed set of
-PollableQueues and assign broker queues to PollableQueues via hashing
-or round robin.
-
-Another possible optimization is to use multiple CPG queues: one per
-queue or a hashed set, to get more concurrency in the CPG layer. The
-old cluster is not able to keep CPG busy.
-
-TODO: Transactions pose a challenge with these concurrent models: how
-to co-ordinate multiple messages being added (commit a publish or roll
-back an accept) to multiple queues so that all replicas end up with
-the same message sequence while respecting atomicity.
-
-** Use of CPG
-
-CPG provides several benefits in the old cluster:
-- tracking membership (essential for determining the primary)
-- handling "spit brain" (integrates with partition support from CMAN)
-- reliable multicast protocol to distribute messages.
-
-I believe we still need CPG for membership and split brain. We could
-experiment with sending the bulk traffic over AMQP conections.
-
-** Flow control
-
-Need to ensure that
-1) In-memory internal queues used by the cluster don't overflow.
-2) The backups don't fall too far behind on processing CPG messages
-
-** Recovery
-When a new backup joins an active cluster it must get a snapshot
-from one of the other backups, or the primary if there are none. In
-store terms this is "recovery" (old cluster called it an "update)
-
-Compared to old cluster we only replidate well defined data set of the store.
-This is the crucial sore spot of old cluster. 
-
-We can also replicated it more efficiently by recovering queues in
-reverse (LIFO) order. That means as clients actively consume messages
-from the front of the queue, they are redeucing the work we have to do
-in recovering from the back. (NOTE: this may not be compatible with
-using the same recovery interfaces as the store.)
-
-** Selective replication
-In this model it's easy to support selective replication of individual queues via
-configuration. 
-- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate. 
-  Treated analogously to persistent/durable properties for the store.
-- if not explicitly marked, provide a choice of default
-  - default is replicate (replicated message on replicated queue)
-  - default is don't replicate
-  - default is replicate persistent/durable messages.
-
-** Inconsistent errors
-
-The new design eliminates most sources of inconsistent errors in the
-old design (connections, sessions, security, management etc.) and
-eliminates the need to stall the whole cluster till an error is
-resolved. We still have to handle inconsistent store errors when store
-and cluster are used together.
-
-We also have to include error handling in the async completion loop to
-guarantee N-way at least once: we should only report success to the
-client when we know the message was replicated and stored on all N-1
-backups.
-
-TODO: We have a lot more options than the old cluster, need to figure
-out the best approach, or possibly allow mutliple approaches. Need to
-go thru the various failure cases. We may be able to do recovery on a
-per-queue basis rather than restarting an entire node.
-
-** New members joining
-
-We should be able to catch up much faster than the the old design. A
-new backup can catch up ("recover") the current cluster state on a
-per-queue basis.
-- queues can be updated in parallel
-- "live" updates avoid the the "endless chase"
-
-During a "live" update several things are happening on a queue:
-- clients are publishing messages to the back of the queue, replicated to the backup
-- clients are consuming messages from the front of the queue, replicated to the backup.
-- the primary is sending pre-existing messages to the new backup.
-
-The primary sends pre-existing messages in LIFO order - starting from
-the back of the queue, at the same time clients are consuming from the front.
-The active consumers actually reduce the amount of work to be done, as there's
-no need to replicate messages that are no longer on the queue.
-
-* Steps to get there
-
-** Baseline replication
-Validate the overall design get initial notion of performance. Just
-message+wiring replication, no update/recovery for new members joining,
-single CPG dispatch thread on backups, no failover, no transactions.
-
-** Failover
-Electing primary, backups redirect to primary. Measure failover time
-for large # clients.  Strategies to minimise number of retries after a
-failure.
-
-** Flow Control
-Keep internal queues from over-flowing. Similar to internal flow control in old cluster.
-Needed for realistic performance/stress tests
-
-** Concurrency
-Experiment with multiple threads on backups, multiple CPG groups.
-
-** Recovery/new member joining
-Initial status handshake for new member. Recovering queues from the back.
-
-** Transactions
-TODO: How to implement transactions with concurrency.  Worst solution:
-a global --cluster-use-transactions flag that forces single thread
-mode. Need to find a better solution.
diff --git a/qpid/cpp/design_docs/replicating-browser-design.txt b/qpid/cpp/design_docs/new-ha-design.txt
similarity index 63%
rename from qpid/cpp/design_docs/replicating-browser-design.txt
rename to qpid/cpp/design_docs/new-ha-design.txt
index e304258..9b6d7d6 100644
--- a/qpid/cpp/design_docs/replicating-browser-design.txt
+++ b/qpid/cpp/design_docs/new-ha-design.txt
@@ -16,15 +16,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-* FIXME - rewrite all old stuff from hot-standby.txt.
-
-* Another new design for Qpid clustering.
+* An active-passive, hot-standby design for Qpid clustering.
 
 For some background see [[./new-cluster-design.txt]] which describes the
 issues with the old design and a new active-active design that could
 replace it.
 
-This document describes an alternative active-passive approach.
+This document describes an alternative active-passive approach based on
+queue browsing to replicate message data.
 
 ** Active-active vs. active-passive (hot-standby)
 
@@ -59,28 +58,6 @@
 exactly like the store does, but it "writes" to the replicas over the
 network rather than writing to disk.
 
-** Failover in a hot-standby cluster.
-
-We want to delegate the failover management to an existing cluster
-resource manager. Initially this is rgmanager from Cluster Suite, but
-other managers (e.g. PaceMaker) could be supported in future.
-
-Rgmanager takes care of starting and stopping brokers and informing
-brokers of their roles as primary or backup. It ensures there's
-exactly one broker running at any time. It also tracks quorum and
-protects against split-brain.
-
-Rgmanger can also manage a virtual IP address so clients can just
-retry on a single address to fail over. Alternatively we will support
-configuring a fixed list of broker addresses when qpid is run outside
-of a resource manager.
-
-Aside: Cold-standby is also possible using rgmanager with shared
-storage for the message store (e.g. GFS). If the broker fails, another
-broker is started on a different node and and recovers from the
-store. This bears investigation but the store recovery times are
-likely too long for failover.
-
 ** Replicating browsers
 
 The unit of replication is a replicating browser. This is an AMQP
@@ -93,6 +70,28 @@
 all of the replicating browsers have signaled completion. Thus a completed
 message is guaranteed to be on the backups.
 
+** Failover and Cluster Resource Managers
+
+We want to delegate the failover management to an existing cluster
+resource manager. Initially this is rgmanager from Cluster Suite, but
+other managers (e.g. PaceMaker) could be supported in future.
+
+Rgmanager takes care of starting and stopping brokers and informing
+brokers of their roles as primary or backup. It ensures there's
+exactly one primary broker running at any time. It also tracks quorum
+and protects against split-brain.
+
+Rgmanger can also manage a virtual IP address so clients can just
+retry on a single address to fail over. Alternatively we will also
+support configuring a fixed list of broker addresses when qpid is run
+outside of a resource manager.
+
+Aside: Cold-standby is also possible using rgmanager with shared
+storage for the message store (e.g. GFS). If the broker fails, another
+broker is started on a different node and and recovers from the
+store. This bears investigation but the store recovery times are
+likely too long for failover.
+
 ** Replicating wiring
 
 New queues and exchanges and their bindings also need to be replicated.
@@ -105,9 +104,10 @@
 manager takes care of membership and quorum.
 
 ** Selective replication
+
 In this model it's easy to support selective replication of individual queues via
-configuration. 
-- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate. 
+configuration.
+- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate.
   Treated analogously to persistent/durable properties for the store.
 - if not explicitly marked, provide a choice of default
   - default is replicate (replicated message on replicated queue)
@@ -136,7 +136,15 @@
 go thru the various failure cases. We may be able to do recovery on a
 per-queue basis rather than restarting an entire node.
 
-** New members joining
+
+** New backups joining
+
+New brokers can join the cluster as backups. Note - if the new broker
+has a new IP address, then the existing cluster members must be
+updated with a new client and broker URLs by a sysadmin.
+
+
+They discover
 
 We should be able to catch up much faster than the the old design. A
 new backup can catch up ("recover") the current cluster state on a
@@ -154,6 +162,85 @@
 The active consumers actually reduce the amount of work to be done, as there's
 no need to replicate messages that are no longer on the queue.
 
+** Broker discovery and lifecycle.
+
+The cluster has a client URL that can contain a single virtual IP
+address or a list of real IP addresses for the cluster.
+
+In backup mode, brokers reject connections except from a special
+cluster-admin user.
+
+Clients discover the primary by re-trying connection to the client URL
+until the successfully connect to the primary. In the case of a
+virtual IP they re-try the same address until it is relocated to the
+primary. In the case of a list of IPs the client tries each in
+turn. Clients do multiple retries over a configured period of time
+before giving up.
+
+Backup brokers discover the primary in the same way as clients. There
+is a separate broker URL for brokers since they often will connect
+over a different network to the clients. The broker URL has to be a
+list of IPs rather than one virtual IP so broker members can connect
+to each other.
+
+Brokers have the following states:
+- connecting: backup broker trying to connect to primary - loops retrying broker URL.
+- catchup: connected to primary, catching up on pre-existing wiring & messages.
+- backup: fully functional backup.
+- primary: Acting as primary, serving clients.
+
+** Interaction with rgmanager
+
+rgmanager interacts with qpid via 2 service scripts: backup & primary. These
+scripts interact with the underlying qpidd service.
+
+*** Initial cluster start
+
+rgmanager starts the backup service on all nodes and the primary service on one node.
+
+On the backup nodes qpidd is in the connecting state. The primary node goes into
+the primary state. Backups discover the primary, connect and catch up.
+
+*** Failover
+
+primary broker or node fails. Backup brokers see disconnect and go
+back to connecting mode.
+
+rgmanager notices the failure and starts the primary service on a new node.
+This tells qpidd to go to primary mode. Backups re-connect and catch up.
+
+*** Failback
+
+Cluster of N brokers has suffered a failure, only N-1 brokers
+remain. We want to start a new broker (possibly on a new node) to
+restore redundancy.
+
+If the new broker has a new IP address, the sysadmin pushes a new URL
+to all the existing brokers.
+
+The new broker starts in connecting mode. It discovers the primary,
+connects and catches up.
+
+*** Failure during failback
+
+A second failure occurs before the new backup B can complete its catch
+up.  The backup B refuses to become primary by failing the primary
+start script if rgmanager chooses it, so rgmanager will try another
+(hopefully caught-up) broker to be primary.
+
+** Interaction with the store.
+
+# FIXME aconway 2011-11-16: TBD
+- how to identify the "best" store after a total cluster crash.
+- best = last to be primary.
+- primary "generation" - counter passed to backups and incremented by new primary.
+
+restart after crash:
+- clean/dirty flag on disk for admin shutdown vs. crash
+- dirty brokers refuse to be primary
+- sysamdin tells best broker to be primary
+- erase stores? delay loading?
+
 ** Current Limitations
 
 (In no particular order at present)
@@ -219,6 +306,48 @@
 LC5 - Need richer control over which queues/exchanges are propagated, and
 which are not.
 
-Question: is it possible to miss an event on subscribing for
-configuration propagation? are the initial snapshot and subsequent
-events correctly synchronised?
+LC6 - The events and query responses are not fully synchronized.
+
+      In particular it *is* possible to not receive a delete event but
+      for the deleted object to still show up in the query response
+      (meaning the deletion is 'lost' to the update).
+
+      It is also possible for an create event to be received as well
+      as the created object being in the query response. Likewise it
+      is possible to receive a delete event and a query response in
+      which the object no longer appears. In these cases the event is
+      essentially redundant.
+
+      It is not possible to miss a create event and yet not to have
+      the object in question in the query response however.
+
+* User Documentation Notes
+
+Notes to seed initial user documentation. Loosely tracking the implementation,
+some points mentioned in the doc may not be implemented yet.
+
+** High Availability Overview
+Explain basic concepts: hot standby, primary/backup, replicated queue/exchange.
+Network topology: backup links, corosync, separate client/cluster networks.
+Describe failover mechanisms.
+- Client view: URLs, failover, exclusion & discovery.
+- Broker view: similar.
+Role of rmganager & corosync.
+
+** Client view.
+Clients use multi-address URL in base case.
+Clients can't connect to backups, retry till they find primary.
+Only qpid.cluster-admin can connect to backup, must not mess with replicated queues.
+Note connection known-hosts returns client URL, as does amq.failover exchange.
+
+Creating replicated queues & exchanges:
+- qpid.replicate argument,
+- examples using addressing and qpid-config)
+
+** Configuring corosync
+Must be on same network as backup links.
+
+** Configuring rgmanager
+
+** Configuring qpidd
+HA related options.
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index 59e951f..b8a1d26 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -1498,6 +1498,10 @@
   def genNamePackageLower (self, stream, variables):
     stream.write (self.packageName.lower ())
 
+  def genPackageNameUpper (self, stream, variables):
+    up = "_".join(self.packageName.split("."))
+    stream.write (up.upper())
+
   def genNameUpper (self, stream, variables):
     stream.write (self.name.upper ())
 
@@ -1642,6 +1646,7 @@
   def genNamespace (self, stream, variables):
     stream.write("::".join(self.packageName.split(".")))
 
+
   def genOpenNamespaces (self, stream, variables):
     for item in self.packageName.split("."):
       stream.write ("namespace %s {\n" % item)
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h
index 4bcd423..90f1b4d 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.h
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h
@@ -1,6 +1,6 @@
 /*MGEN:commentPrefix=//*/
-#ifndef _MANAGEMENT_/*MGEN:Class.NameUpper*/_
-#define _MANAGEMENT_/*MGEN:Class.NameUpper*/_
+#ifndef _MANAGEMENT_/*MGEN:Class.PackageNameUpper*/_/*MGEN:Class.NameUpper*/_
+#define _MANAGEMENT_/*MGEN:Class.PackageNameUpper*/_/*MGEN:Class.NameUpper*/_
 
 //
 // Licensed to the Apache Software Foundation (ASF) under one
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 8fde735..c525f8b 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -104,7 +104,8 @@
 	-c $(srcdir)/managementgen.cmake -q -b -o qmf \
 	$(top_srcdir)/../specs/management-schema.xml \
 	$(srcdir)/qpid/acl/management-schema.xml \
-	$(srcdir)/qpid/cluster/management-schema.xml
+	$(srcdir)/qpid/cluster/management-schema.xml \
+	$(srcdir)/qpid/ha/management-schema.xml
 
 $(srcdir)/managementgen.mk $(mgen_broker_cpp) $(dist_qpid_management_HEADERS): mgen.timestamp
 mgen.timestamp: $(mgen_generator)
@@ -209,6 +210,7 @@
 cmoduleexec_LTLIBRARIES =
 
 include cluster.mk
+include ha.mk
 include acl.mk
 include qmf.mk
 include qmfc.mk
@@ -533,6 +535,7 @@
   qpid/broker/ConnectionState.h \
   qpid/broker/ConnectionToken.h \
   qpid/broker/Consumer.h \
+  qpid/broker/ConsumerFactory.h \
   qpid/broker/Daemon.cpp \
   qpid/broker/Daemon.h \
   qpid/broker/Deliverable.h \
@@ -593,8 +596,6 @@
   qpid/broker/PriorityQueue.cpp \
   qpid/broker/NameGenerator.cpp \
   qpid/broker/NameGenerator.h \
-  qpid/broker/NodeClone.h \
-  qpid/broker/NodeClone.cpp \
   qpid/broker/NullMessageStore.cpp \
   qpid/broker/NullMessageStore.h \
   qpid/broker/OwnershipToken.h \
@@ -622,8 +623,6 @@
   qpid/broker/QueuedMessage.h \
   qpid/broker/QueueFlowLimit.h \
   qpid/broker/QueueFlowLimit.cpp \
-  qpid/broker/QueueReplicator.h \
-  qpid/broker/QueueReplicator.cpp \
   qpid/broker/RateFlowcontrol.h \
   qpid/broker/RecoverableConfig.h \
   qpid/broker/RecoverableExchange.h \
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
new file mode 100644
index 0000000..dc4e7c8
--- /dev/null
+++ b/qpid/cpp/src/ha.mk
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# HA plugin makefile fragment, to be included in Makefile.am
+#
+
+dmoduleexec_LTLIBRARIES += ha.la
+
+ha_la_SOURCES =					\
+  qpid/ha/Backup.cpp				\
+  qpid/ha/Backup.h				\
+  qpid/ha/HaBroker.cpp				\
+  qpid/ha/HaBroker.h				\
+  qpid/ha/HaPlugin.cpp				\
+  qpid/ha/Logging.h				\
+  qpid/ha/Logging.cpp				\
+  qpid/ha/Settings.h				\
+  qpid/ha/QueueReplicator.h			\
+  qpid/ha/QueueReplicator.cpp			\
+  qpid/ha/ReplicatingSubscription.h		\
+  qpid/ha/ReplicatingSubscription.cpp		\
+  qpid/ha/WiringReplicator.cpp			\
+  qpid/ha/WiringReplicator.h
+
+ha_la_LIBADD = libqpidbroker.la
+ha_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index b00cdd0..405482d 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,8 +24,7 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/LinkRegistry.h"
-#include "qpid/broker/NodeClone.h"
-#include "qpid/broker/QueueReplicator.h"
+#include "qpid/ha/WiringReplicator.h"
 #include "qpid/broker/SessionState.h"
 
 #include "qpid/management/ManagementAgent.h"
@@ -59,9 +58,11 @@
 }
 
 Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
-               const _qmf::ArgsLinkBridge& _args) : 
+               const _qmf::ArgsLinkBridge& _args,
+               InitializeCallback init) :
     link(_link), id(_id), args(_args), mgmtObject(0),
-    listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
+    listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0),
+    initialize(init)
 {
     std::stringstream title;
     title << id << "_" << link->getBroker()->getFederationTag();
@@ -77,9 +78,9 @@
     QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
 }
 
-Bridge::~Bridge() 
+Bridge::~Bridge()
 {
-    mgmtObject->resourceDestroy(); 
+    mgmtObject->resourceDestroy();
 }
 
 void Bridge::create(Connection& c)
@@ -98,7 +99,7 @@
 
         session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
         peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-        
+
         session->attach(name, false);
         session->commandPoint(0,0);
     } else {
@@ -108,60 +109,12 @@
     }
 
     if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
-    if (args.i_srcIsQueue) {
-        //TODO: something other than this which is nasty...
-        bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options);
-
-        peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink ? 1 : 0, false, "", 0, options);
+    if (initialize) initialize(*this, sessionHandler);
+    else if (args.i_srcIsQueue) {
+        peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
         QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
-    } else if (NodeClone::isNodeCloneDestination(args.i_dest)) {
-        //declare and bind an event queue
-        peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable());
-        peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable());
-        //subscribe to the queue
-        peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
-        peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-        peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
-        //issue a query request for queues and another for exchanges using event queue as the reply-to address
-        for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions
-            Variant::Map request;
-            request["_what"] = "OBJECT";
-            Variant::Map schema;
-            schema["_class_name"] = (i == 0 ? "queue" : "exchange");
-            schema["_package_name"] = "org.apache.qpid.broker";
-            request["_schema_id"] = schema;
-
-            AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0)));
-            method.setBof(true);
-            method.setEof(false);
-            method.setBos(true);
-            method.setEos(true);
-            AMQHeaderBody headerBody;
-            MessageProperties* props = headerBody.get<MessageProperties>(true);
-            props->setReplyTo(qpid::framing::ReplyTo("", queueName));
-            props->setAppId("qmf2");
-            props->getApplicationHeaders().setString("qmf.opcode", "_query_request");
-            headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
-            AMQFrame header(headerBody);
-            header.setBof(false);
-            header.setEof(false);
-            header.setBos(true);
-            header.setEos(true);
-            AMQContentBody data;
-            qpid::amqp_0_10::MapCodec::encode(request, data.getData());
-            AMQFrame content(data);
-            content.setBof(false);
-            content.setEof(true);
-            content.setBos(true);
-            content.setEos(true);
-            sessionHandler.out->handle(method);
-            sessionHandler.out->handle(header);
-            sessionHandler.out->handle(content);
-        }
-
     } else {
         FieldTable queueSettings;
 
@@ -236,11 +189,6 @@
     persistenceId = pId;
 }
 
-const string& Bridge::getName() const
-{
-    return name;
-}
-
 Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
 {
     string   host;
@@ -268,7 +216,7 @@
                          is_queue, is_local, id, excludes, dynamic, sync).first;
 }
 
-void Bridge::encode(Buffer& buffer) const 
+void Bridge::encode(Buffer& buffer) const
 {
     buffer.putShortString(string("bridge"));
     buffer.putShortString(link->getHost());
@@ -285,8 +233,8 @@
     buffer.putShort(args.i_sync);
 }
 
-uint32_t Bridge::encodedSize() const 
-{ 
+uint32_t Bridge::encodedSize() const
+{
     return link->getHost().size() + 1 // short-string (host)
         + 7                // short-string ("bridge")
         + 2                // port
@@ -311,7 +259,7 @@
                                                           management::Args& /*args*/,
                                                           string&)
 {
-    if (methodId == _qmf::Bridge::METHOD_CLOSE) {  
+    if (methodId == _qmf::Bridge::METHOD_CLOSE) {
         //notify that we are closed
         destroy();
         return management::Manageable::STATUS_OK;
@@ -358,7 +306,7 @@
     conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
                                           queueName, args.i_src, args.i_key, bindArgs));
 }
-bool Bridge::resetProxy() 
+bool Bridge::resetProxy()
 {
     SessionHandler& sessionHandler = conn->getChannel(id);
     if (!sessionHandler.getSession()) peer.reset();
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 8b4559a..b849b11 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,15 +42,19 @@
 class ConnectionState;
 class Link;
 class LinkRegistry;
+class SessionHandler;
 
 class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
 {
 public:
     typedef boost::shared_ptr<Bridge> shared_ptr;
     typedef boost::function<void(Bridge*)> CancellationListener;
+    typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
 
     Bridge(Link* link, framing::ChannelId id, CancellationListener l,
-           const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
+           const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
+           InitializeCallback init
+    );
     ~Bridge();
 
     void create(Connection& c);
@@ -70,8 +74,8 @@
     void     setPersistenceId(uint64_t id) const;
     uint64_t getPersistenceId() const { return persistenceId; }
     uint32_t encodedSize() const;
-    void     encode(framing::Buffer& buffer) const; 
-    const std::string& getName() const;
+    void     encode(framing::Buffer& buffer) const;
+    const std::string& getName() const { return name; }
     static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
 
     // Exchange::DynamicBridge methods
@@ -81,6 +85,10 @@
     bool containsLocalTag(const std::string& tagList) const;
     const std::string& getLocalTag() const;
 
+    // Methods needed by initialization functions
+    std::string getQueueName() const { return queueName; }
+    const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
+
 private:
     struct PushHandler : framing::FrameHandler {
         PushHandler(Connection* c) { conn = c; }
@@ -103,6 +111,7 @@
     mutable uint64_t  persistenceId;
     ConnectionState* connState;
     Connection* conn;
+    InitializeCallback initialize;
 
     bool resetProxy();
 };
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index b3b751b..e8ada49 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -37,6 +37,7 @@
 #include "qpid/broker/Vhost.h"
 #include "qpid/broker/System.h"
 #include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/ConsumerFactory.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
@@ -199,6 +200,7 @@
     bool inCluster, clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
+    ConsumerFactories consumerFactories;
 
   public:
     virtual ~Broker();
@@ -357,6 +359,8 @@
                 const std::string& key,
                 const std::string& userId,
                 const std::string& connectionId);
+
+    ConsumerFactories&  getConsumerFactories() { return consumerFactories; }
 };
 
 }}
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 8451f35..0e20719 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -130,7 +130,7 @@
 {
     ScopedLock<Mutex> l(ioCallbackLock);
     ioCallbacks.push(callback);
-    out.activateOutput();
+    if (isOpen()) out.activateOutput();
 }
 
 Connection::~Connection()
@@ -156,11 +156,14 @@
 void Connection::received(framing::AMQFrame& frame) {
     // Received frame on connection so delay timeout
     restartTimeout();
+    bool wasOpen = isOpen();
     adapter.handle(frame);
     if (isLink) //i.e. we are acting as the client to another broker
         recordFromServer(frame);
     else
         recordFromClient(frame);
+    if (!wasOpen && isOpen())
+        doIoCallbacks(); // Do any callbacks registered before we opened.
 }
 
 void Connection::sent(const framing::AMQFrame& frame)
@@ -329,17 +332,16 @@
 }
 
 void Connection::doIoCallbacks() {
-    {
-        ScopedLock<Mutex> l(ioCallbackLock);
-        // Although IO callbacks execute in the connection thread context, they are
-        // not cluster safe because they are queued for execution in non-IO threads.
-        ClusterUnsafeScope cus;
-        while (!ioCallbacks.empty()) {
-            boost::function0<void> cb = ioCallbacks.front();
-            ioCallbacks.pop();
-            ScopedUnlock<Mutex> ul(ioCallbackLock);
-            cb(); // Lend the IO thread for management processing
-        }
+    if (!isOpen()) return; // Don't process IO callbacks until we are open.
+    ScopedLock<Mutex> l(ioCallbackLock);
+    // Although IO callbacks execute in the connection thread context, they are
+    // not cluster safe because they are queued for execution in non-IO threads.
+    ClusterUnsafeScope cus;
+    while (!ioCallbacks.empty()) {
+        boost::function0<void> cb = ioCallbacks.front();
+        ioCallbacks.pop();
+        ScopedUnlock<Mutex> ul(ioCallbackLock);
+        cb(); // Lend the IO thread for management processing
     }
 }
 
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index a3838fb..7ac7dba 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -31,6 +31,9 @@
 class Queue;
 class QueueListeners;
 
+/**
+ * Base class for consumers which represent a subscription to a queue.
+ */
 class Consumer {
     const bool acquires;
     const bool browseAcquired;
diff --git a/qpid/cpp/src/qpid/broker/ConsumerFactory.h b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
new file mode 100644
index 0000000..abd39fb
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONSUMERFACTORY_H
+#define QPID_BROKER_CONSUMERFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public.
+// Refactor to use a more abstract interface.
+
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for consumer factoires. Plugins can register a
+ * ConsumerFactory via Broker:: getConsumerFactories() Each time a
+ * conumer is created, each factory is tried in turn till one returns
+ * non-0.
+ */
+class ConsumerFactory
+{
+  public:
+    virtual ~ConsumerFactory() {}
+
+    virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+        SemanticState* parent,
+        const std::string& name, boost::shared_ptr<Queue> queue,
+        bool ack, bool acquire, bool exclusive, const std::string& tag,
+        const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments) = 0;
+};
+
+/** A set of factories held by the broker
+ * THREAD UNSAFE: see notes on member functions.
+ */
+class ConsumerFactories {
+  public:
+    typedef std::vector<boost::shared_ptr<ConsumerFactory> > Factories;
+
+    /** Thread safety: May only be called during plug-in initialization. */
+    void add(const boost::shared_ptr<ConsumerFactory>& cf) { factories.push_back(cf); }
+
+    /** Thread safety: May only be called after plug-in initialization. */
+    const Factories& get() const { return factories; }
+
+  private:
+    Factories factories;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CONSUMERFACTORY_H*/
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 43ca1ae..4fe76da 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -37,7 +37,7 @@
                                bool _acquired,
                                bool accepted, 
                                bool _windowing,
-                               uint32_t _credit, bool _delayedCompletion) : msg(_msg),
+                               uint32_t _credit, bool _isDelayedCompletion) : msg(_msg),
                                                   queue(_queue), 
                                                   tag(_tag),
                                                   acquired(_acquired),
@@ -47,7 +47,7 @@
                                                   ended(accepted && acquired),
                                                   windowing(_windowing),
                                                   credit(msg.payload ? msg.payload->getRequiredCredit() : _credit),
-                                                  delayedCompletion(_delayedCompletion)
+                                                  isDelayedCompletion(_isDelayedCompletion)
 {}
 
 bool DeliveryRecord::setEnded()
@@ -115,7 +115,7 @@
     if (!ended) {
         if (acquired) {
             queue->dequeue(ctxt, msg);
-        } else if (delayedCompletion) {
+        } else if (isDelayedCompletion) {
             //TODO: this is a nasty way to do this; change it
             msg.payload->getIngressCompletion().finishCompleter();
             QPID_LOG(debug, "Completed " << msg.payload.get());
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index 90e72aa..ea33ed5 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -63,7 +63,7 @@
      * after that).
      */
     uint32_t credit;
-    bool delayedCompletion;
+    bool isDelayedCompletion;
 
   public:
     QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -73,7 +73,7 @@
                                       bool accepted,
                                       bool windowing,
                                       uint32_t credit=0,       // Only used if msg is empty.
-                                      bool delayedCompletion=false
+                                      bool isDelayedCompletion=false
     );
     
     bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 8010bf4..f92d543 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -47,13 +47,13 @@
 
 Link::Link(LinkRegistry*  _links,
            MessageStore*  _store,
-           string&        _host,
+           const string&        _host,
            uint16_t       _port,
-           string&        _transport,
+           const string&        _transport,
            bool           _durable,
-           string&        _authMechanism,
-           string&        _username,
-           string&        _password,
+           const string&        _authMechanism,
+           const string&        _username,
+           const string&        _password,
            Broker*        _broker,
            Manageable*    parent)
     : links(_links), store(_store), host(_host), port(_port),
@@ -79,6 +79,7 @@
         }
     }
     setStateLH(STATE_WAITING);
+    startConnectionLH();
 }
 
 Link::~Link ()
@@ -213,28 +214,30 @@
 {
     Mutex::ScopedLock mutex(lock);
     created.push_back (bridge);
+    if (connection)
+        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+
 }
 
 void Link::cancel(Bridge::shared_ptr bridge)
 {
-    {
-        Mutex::ScopedLock mutex(lock);
+    Mutex::ScopedLock mutex(lock);
 
-        for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
-            if ((*i).get() == bridge.get()) {
-                created.erase(i);
-                break;
-            }
-        }
-        for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-            if ((*i).get() == bridge.get()) {
-                cancellations.push_back(bridge);
-                bridge->closed();
-                active.erase(i);
-                break;
-            }
+    for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+        if ((*i).get() == bridge.get()) {
+            created.erase(i);
+            break;
         }
     }
+    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+        if ((*i).get() == bridge.get()) {
+            cancellations.push_back(bridge);
+            bridge->closed();
+            active.erase(i);
+            break;
+        }
+    }
+
     if (!cancellations.empty()) {
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
     }
@@ -282,6 +285,8 @@
     Mutex::ScopedLock mutex(lock);
     connection = c;
     updateUrls = true;
+    // Process any IO tasks bridges added before setConnection.
+    connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
 void Link::maintenanceVisit ()
@@ -311,7 +316,7 @@
     }
     else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
+    }
 
 void Link::reconnect(const qpid::Address& a)
 {
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 4badd8b..acaa9d1 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -92,19 +92,21 @@
 
             Link(LinkRegistry* links,
                  MessageStore* store,
-                 std::string&       host,
+                 const std::string&       host,
                  uint16_t      port,
-                 std::string&       transport,
+                 const std::string&       transport,
                  bool          durable,
-                 std::string&       authMechanism,
-                 std::string&       username,
-                 std::string&       password,
+                 const std::string&       authMechanism,
+                 const std::string&       username,
+                 const std::string&       password,
                  Broker*       broker,
                  management::Manageable* parent = 0);
             virtual ~Link();
 
             std::string getHost() { return host; }
             uint16_t    getPort() { return port; }
+            std::string getTransport() { return transport; }
+
             bool isDurable() { return durable; }
             void maintenanceVisit ();
             uint nextChannel();
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index e9885f5..31b4f1b 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -124,13 +124,13 @@
     }
 }
 
-pair<Link::shared_ptr, bool> LinkRegistry::declare(string&  host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string&  host,
                                                    uint16_t port,
-                                                   string&  transport,
+                                                   const string&  transport,
                                                    bool     durable,
-                                                   string&  authMechanism,
-                                                   string&  username,
-                                                   string&  password)
+                                                   const string&  authMechanism,
+                                                   const string&  username,
+                                                   const string&  password)
 
 {
     Mutex::ScopedLock   locker(lock);
@@ -151,18 +151,20 @@
     return std::pair<Link::shared_ptr, bool>(i->second, false);
 }
 
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
                                                      uint16_t     port,
                                                      bool         durable,
-                                                     std::string& src,
-                                                     std::string& dest,
-                                                     std::string& key,
+                                                     const std::string& src,
+                                                     const std::string& dest,
+                                                     const std::string& key,
                                                      bool         isQueue,
                                                      bool         isLocal,
-                                                     std::string& tag,
-                                                     std::string& excludes,
+                                                     const std::string& tag,
+                                                     const std::string& excludes,
                                                      bool         dynamic,
-                                                     uint16_t     sync)
+                                                     uint16_t     sync,
+                                                     Bridge::InitializeCallback init
+)
 {
     Mutex::ScopedLock locker(lock);
     QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
@@ -196,7 +198,8 @@
         bridge = Bridge::shared_ptr
             (new Bridge (l->second.get(), l->second->nextChannel(),
                          boost::bind(&LinkRegistry::destroy, this,
-                                     host, port, src, dest, key), args));
+                                     host, port, src, dest, key),
+                         args, init));
         bridges[bridgeKey] = bridge;
         l->second->add(bridge);
         return std::pair<Bridge::shared_ptr, bool>(bridge, true);
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 4c97e4f..7e5b39f 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -84,28 +84,32 @@
         ~LinkRegistry();
 
         std::pair<boost::shared_ptr<Link>, bool>
-            declare(std::string& host,
+            declare(const std::string& host,
                     uint16_t     port,
-                    std::string& transport,
+                    const std::string& transport,
                     bool         durable,
-                    std::string& authMechanism,
-                    std::string& username,
-                    std::string& password);
+                    const std::string& authMechanism,
+                    const std::string& username,
+                    const std::string& password);
+
         std::pair<Bridge::shared_ptr, bool>
-            declare(std::string& host,
+            declare(const std::string& host,
                     uint16_t     port,
                     bool         durable,
-                    std::string& src,
-                    std::string& dest,
-                    std::string& key,
+                    const std::string& src,
+                    const std::string& dest,
+                    const std::string& key,
                     bool         isQueue,
                     bool         isLocal,
-                    std::string& id,
-                    std::string& excludes,
+                    const std::string& id,
+                    const std::string& excludes,
                     bool         dynamic,
-                    uint16_t     sync);
+                    uint16_t     sync,
+                    Bridge::InitializeCallback=0
+            );
 
         void destroy(const std::string& host, const uint16_t port);
+
         void destroy(const std::string& host,
                      const uint16_t     port,
                      const std::string& src,
diff --git a/qpid/cpp/src/qpid/broker/NodeClone.cpp b/qpid/cpp/src/qpid/broker/NodeClone.cpp
deleted file mode 100644
index e8fc227..0000000
--- a/qpid/cpp/src/qpid/broker/NodeClone.cpp
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "NodeClone.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/log/Statement.h"
-#include "qpid/amqp_0_10/Codecs.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-
-using qmf::org::apache::qpid::broker::EventQueueDeclare;
-using qmf::org::apache::qpid::broker::EventQueueDelete;
-using qmf::org::apache::qpid::broker::EventExchangeDeclare;
-using qmf::org::apache::qpid::broker::EventExchangeDelete;
-
-namespace qpid {
-namespace broker {
-
-namespace{
-bool isQMFv2(const Message& message)
-{
-    const qpid::framing::MessageProperties* props = message.getProperties<qpid::framing::MessageProperties>();
-    return props && props->getAppId() == "qmf2";
-}
-
-template <class T> bool match(qpid::types::Variant::Map& schema)
-{
-    return T::match(schema["_class_name"], schema["_package_name"]);
-}
-
-}
-
-NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name), broker(b) {}
-
-NodeClone::~NodeClone() {}
-
-void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const qpid::framing::FieldTable* headers)
-{
-    if (isQMFv2(msg.getMessage()) && headers) {
-        if (headers->getAsString("qmf.content") == "_event") {
-            //decode as list
-            std::string content = msg.getMessage().getFrames().getContent();
-            qpid::types::Variant::List list;
-            qpid::amqp_0_10::ListCodec::decode(content, list);
-            if (list.empty()) {
-                QPID_LOG(error, "Error parsing QMF event, expected non-empty list");
-            } else {
-                try {
-                    qpid::types::Variant::Map& map = list.front().asMap();
-                    qpid::types::Variant::Map& schema = map["_schema_id"].asMap();
-                    qpid::types::Variant::Map& values = map["_values"].asMap();
-                    if (match<EventQueueDeclare>(schema)) {
-                        if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
-                            qpid::framing::FieldTable args;
-                            qpid::amqp_0_10::translate(values["args"].asMap(), args);
-                            if (!broker.createQueue(
-                                    values["qName"].asString(),
-                                    values["durable"].asBool(),
-                                    values["autoDel"].asBool(),
-                                    0 /*i.e. no owner regardless of exclusivity on master*/,
-                                    values["altEx"].asString(),
-                                    args,
-                                    values["user"].asString(),
-                                    values["rhost"].asString()).second) {
-                                QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
-                            }
-                        }
-                    } else if (match<EventQueueDelete>(schema)) {
-                        std::string name = values["qName"].asString();
-                        QPID_LOG(debug, "Notified of deletion of queue " << name);
-                        boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-                        if (queue && queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) {
-                            broker.deleteQueue(
-                                name,
-                                values["user"].asString(),
-                                values["rhost"].asString());
-                        } else {
-                            if (queue) {
-                                QPID_LOG(debug, "Ignoring deletion notification for non-propagated queue " << name);
-                            } else {
-                                QPID_LOG(debug, "No such queue " << name);
-                            }
-                        }
-                    } else if (match<EventExchangeDeclare>(schema)) {
-                        if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
-                            qpid::framing::FieldTable args;
-                            qpid::amqp_0_10::translate(values["args"].asMap(), args);
-                            if (!broker.createExchange(
-                                    values["exName"].asString(),
-                                    values["exType"].asString(),
-                                    values["durable"].asBool(),
-                                    values["altEx"].asString(),
-                                    args,
-                                    values["user"].asString(),
-                                    values["rhost"].asString()).second) {
-                                QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
-                            }
-                        }
-                    } else if (match<EventExchangeDelete>(schema)) {
-                        std::string name = values["exName"].asString();
-                        QPID_LOG(debug, "Notified of deletion of exchange " << name);
-                        try {
-                            boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
-                            if (exchange && exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) {
-                                broker.deleteExchange(
-                                    name,
-                                    values["user"].asString(),
-                                    values["rhost"].asString());
-                            } else {
-                                if (exchange) {
-                                    QPID_LOG(debug, "Ignoring deletion notification for non-propagated exchange " << name);
-                                } else {
-                                    QPID_LOG(debug, "No such exchange " << name);
-                                }
-                            }
-                        } catch (const qpid::framing::NotFoundException&) {}
-                    }
-                } catch (const std::exception& e) {
-                    QPID_LOG(error, "Error propagating configuration: " << e.what());
-                }
-            }
-        } else if (headers->getAsString("qmf.opcode") == "_query_response") {
-            //decode as list
-            std::string content = msg.getMessage().getFrames().getContent();
-            qpid::types::Variant::List list;
-            qpid::amqp_0_10::ListCodec::decode(content, list);
-            QPID_LOG(debug, "Got query response (" << list.size() << ")");
-            for (qpid::types::Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
-                std::string type = i->asMap()["_schema_id"].asMap()["_class_name"];
-                qpid::types::Variant::Map& values = i->asMap()["_values"].asMap();
-                QPID_LOG(debug, "class: " << type << ", values: " << values);
-                if (values["arguments"].asMap()["qpid.propagate"]) {
-                    qpid::framing::FieldTable args;
-                    qpid::amqp_0_10::translate(values["arguments"].asMap(), args);
-                    if (type == "queue") {
-                        if (!broker.createQueue(
-                                values["name"].asString(),
-                                values["durable"].asBool(),
-                                values["autoDelete"].asBool(),
-                                0 /*i.e. no owner regardless of exclusivity on master*/,
-                                ""/*TODO: need to include alternate-exchange*/,
-                                args,
-                                ""/*TODO: who is the user?*/,
-                                ""/*TODO: what should we use as connection id?*/).second) {
-                            QPID_LOG(warning, "Propagatable queue " << values["name"] << " already exists");
-                        }
-                    } else if (type == "exchange") {
-                        if (!broker.createExchange(
-                                values["name"].asString(),
-                                values["type"].asString(),
-                                values["durable"].asBool(),
-                                ""/*TODO: need to include alternate-exchange*/,
-                                args,
-                                ""/*TODO: who is the user?*/,
-                                ""/*TODO: what should we use as connection id?*/).second) {
-                            QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
-                        }
-                    } else {
-                        QPID_LOG(warning, "Ignoring unknow object class: " << type);
-                    }
-                }
-            }
-        } else {
-            QPID_LOG(debug, "Dropping QMFv2 message with headers: " << *headers);
-        }
-    } else {
-        QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event or query response");
-    }
-}
-
-bool NodeClone::isNodeCloneDestination(const std::string& target)
-{
-    return target == "qpid.node-cloner";
-}
-
-boost::shared_ptr<Exchange> NodeClone::create(const std::string& target, Broker& broker)
-{
-    boost::shared_ptr<Exchange> exchange;
-    if (isNodeCloneDestination(target)) {
-        //TODO: need to cache the exchange
-        QPID_LOG(info, "Creating node cloner");
-        exchange.reset(new NodeClone(target, broker));
-    }
-    return exchange;
-}
-
-bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-
-const std::string NodeClone::typeName("node-cloner");
-
-std::string NodeClone::getType() const
-{
-    return typeName;
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/NodeClone.h b/qpid/cpp/src/qpid/broker/NodeClone.h
deleted file mode 100644
index 71cac61..0000000
--- a/qpid/cpp/src/qpid/broker/NodeClone.h
+++ /dev/null
@@ -1,54 +0,0 @@
-#ifndef QPID_BROKER_NODEPROPAGATOR_H
-#define QPID_BROKER_NODEPROPAGATOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/Exchange.h"
-
-namespace qpid {
-namespace broker {
-
-class Broker;
-
-/**
- * Pseudo-exchange for recreating local queues and/or exchanges on
- * receipt of QMF events indicating their creation on another node
- */
-class NodeClone : public Exchange
-{
-  public:
-    NodeClone(const std::string&, Broker&);
-    ~NodeClone();
-    std::string getType() const;
-    bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
-    bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
-    void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
-    bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const);
-
-    static bool isNodeCloneDestination(const std::string&);
-    static boost::shared_ptr<Exchange> create(const std::string&, Broker&);
-    static const std::string typeName;
-  private:
-    Broker& broker;
-};
-}} // namespace qpid::broker
-
-#endif  /*!QPID_BROKER_NODEPROPAGATOR_H*/
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index c47ba55..0e85fe1 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1378,7 +1378,7 @@
 }
 
 
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
 {
     return broker;
 }
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index b66600e..a53916f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -403,7 +403,7 @@
 
     void flush();
 
-    const Broker* getBroker();
+    Broker* getBroker();
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     void setDequeueSincePurge(uint32_t value);
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp b/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
deleted file mode 100644
index 01c0c8e..0000000
--- a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/QueueReplicator.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/framing/SequenceSet.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace broker {
-
-QueueReplicator::QueueReplicator(const std::string& name, boost::shared_ptr<Queue> q) : Exchange(name, 0, 0), queue(q), current(queue->getPosition()) {}
-QueueReplicator::~QueueReplicator() {}
-
-namespace {
-const std::string DEQUEUE_EVENT("dequeue-event");
-const std::string REPLICATOR("qpid.replicator-");
-}
-
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
-{
-    if (key == DEQUEUE_EVENT) {
-        std::string content;
-        msg.getMessage().getFrames().getContent(content);
-        qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
-        qpid::framing::SequenceSet latest;
-        latest.decode(buffer);
-
-        //TODO: should be able to optimise the following
-        for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
-            if (current < *i) {
-                //haven't got that far yet, record the dequeue
-                dequeued.add(*i);
-                QPID_LOG(debug, "Recording dequeue of message at " << *i << " from " << queue->getName());
-            } else {
-                QueuedMessage message;
-                if (queue->acquireMessageAt(*i, message)) {
-                    queue->dequeue(0, message);
-                    QPID_LOG(info, "Dequeued message at " << *i << " from " << queue->getName());
-                } else {
-                    QPID_LOG(error, "Unable to dequeue message at " << *i << " from " << queue->getName());
-                }
-            }
-        }
-    } else {
-        //take account of any gaps in sequence created by messages
-        //dequeued before our subscription reached them
-        while (dequeued.contains(++current)) {
-            dequeued.remove(current);
-            QPID_LOG(debug, "Skipping dequeued message at " << current << " from " << queue->getName());
-            queue->setPosition(current);
-        }
-        QPID_LOG(info, "Enqueued message on " << queue->getName() << "; currently at " << current);
-        msg.deliverTo(queue);
-    }
-}
-
-bool QueueReplicator::isReplicatingLink(const std::string& name)
-{
-    return name.find(REPLICATOR) == 0;
-}
-
-boost::shared_ptr<Exchange> QueueReplicator::create(const std::string& target, QueueRegistry& queues)
-{
-    boost::shared_ptr<Exchange> exchange;
-    if (isReplicatingLink(target)) {
-        std::string queueName = target.substr(REPLICATOR.size());
-        boost::shared_ptr<Queue> queue = queues.find(queueName);
-        if (!queue) {
-            QPID_LOG(warning, "Unable to create replicator, can't find " << queueName);
-        } else {
-            //TODO: need to cache the replicator
-            QPID_LOG(info, "Creating replicator for " << queueName);
-            exchange.reset(new QueueReplicator(target, queue));
-        }
-    }
-    return exchange;
-}
-
-bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings)
-{
-    if (isReplicatingLink(target)) {
-        std::string queueName = target.substr(REPLICATOR.size());
-        boost::shared_ptr<Queue> queue = queues.find(queueName);
-        if (queue) {
-            settings.setInt("qpid.replicating-subscription", 1);
-            settings.setInt("qpid.high_sequence_number", queue->getPosition());
-            qpid::framing::SequenceNumber oldest;
-            if (queue->getOldest(oldest)) {
-                settings.setInt("qpid.low_sequence_number", oldest);
-            }
-        }
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-
-const std::string QueueReplicator::typeName("queue-replicator");
-
-std::string QueueReplicator::getType() const
-{
-    return typeName;
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.h b/qpid/cpp/src/qpid/broker/QueueReplicator.h
deleted file mode 100644
index 679aa92..0000000
--- a/qpid/cpp/src/qpid/broker/QueueReplicator.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef QPID_BROKER_QUEUEREPLICATOR_H
-#define QPID_BROKER_QUEUEREPLICATOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/Exchange.h"
-#include "qpid/framing/SequenceSet.h"
-
-namespace qpid {
-namespace broker {
-
-class QueueRegistry;
-
-/**
- * Dummy exchange for processing replication messages
- */
-class QueueReplicator : public Exchange
-{
-  public:
-    QueueReplicator(const std::string& name, boost::shared_ptr<Queue>);
-    ~QueueReplicator();
-    std::string getType() const;
-    bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
-    bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
-    void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
-    bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const);
-    static bool isReplicatingLink(const std::string&);
-    static boost::shared_ptr<Exchange> create(const std::string&, QueueRegistry&);
-    static bool initReplicationSettings(const std::string&, QueueRegistry&, qpid::framing::FieldTable&);
-    static const std::string typeName;
-  private:
-    boost::shared_ptr<Queue> queue;
-    qpid::framing::SequenceNumber current;
-    qpid::framing::SequenceSet dequeued;
-};
-
-}} // namespace qpid::broker
-
-#endif  /*!QPID_BROKER_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 7d10322..43b5d99 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -25,9 +25,7 @@
 #include "qpid/broker/DtxAck.h"
 #include "qpid/broker/DtxTimeout.h"
 #include "qpid/broker/Message.h"
-#include "qpid/broker/NodeClone.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueReplicator.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SessionOutputException.h"
 #include "qpid/broker/TxAccept.h"
@@ -110,15 +108,25 @@
 namespace {
     const std::string SEPARATOR("::");
 }
-    
+
 void SemanticState::consume(const string& tag,
                             Queue::shared_ptr queue, bool ackRequired, bool acquire,
-                            bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
+                            bool exclusive, const string& resumeId, uint64_t resumeTtl,
+                            const FieldTable& arguments)
 {
     // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
     // Create a globally unique name so the broker can identify individual consumers
     std::string name = session.getSessionId().str() + SEPARATOR + tag;
-    ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+    const ConsumerFactories::Factories& cf(
+        session.getBroker().getConsumerFactories().get());
+    ConsumerImpl::shared_ptr c;
+    for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i)
+        c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag,
+                         resumeId, resumeTtl, arguments);
+    if (!c)                     // Create plain consumer
+        c = ConsumerImpl::shared_ptr(
+            new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+                             resumeId, resumeTtl, arguments));
     queue->consume(c, exclusive);//may throw exception
     consumers[tag] = c;
 }
@@ -268,224 +276,6 @@
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver
-{
-  public:
-    ReplicatingSubscription(SemanticState* parent,
-                            const std::string& name, boost::shared_ptr<Queue> queue,
-                            bool ack, bool acquire, bool exclusive, const std::string& tag,
-                            const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
-    ~ReplicatingSubscription();
-
-    void init();
-    void cancel();
-    bool deliver(QueuedMessage& msg);
-    void enqueued(const QueuedMessage&);
-    void dequeued(const QueuedMessage&);
-    void acquired(const QueuedMessage&) {}
-    void requeued(const QueuedMessage&) {}
-
-  protected:
-    bool doDispatch();
-  private:
-    boost::shared_ptr<Queue> events;
-    boost::shared_ptr<Consumer> consumer;
-    qpid::framing::SequenceSet range;
-
-    void generateDequeueEvent();
-    class DelegatingConsumer : public Consumer
-    {
-      public:
-        DelegatingConsumer(ReplicatingSubscription&);
-        ~DelegatingConsumer();
-        bool deliver(QueuedMessage& msg);
-        void notify();
-        //bool filter(boost::intrusive_ptr<Message>);
-        //bool accept(boost::intrusive_ptr<Message>);
-        Consumer::Action accept(const QueuedMessage&);
-        OwnershipToken* getSession();
-      private:
-        ReplicatingSubscription& delegate;
-    };
-};
-
-SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent,
-                                                                            const string& name,
-                                                                            Queue::shared_ptr queue,
-                                                                            bool ack,
-                                                                            bool acquire,
-                                                                            bool exclusive,
-                                                                            const string& tag,
-                                                                            const string& resumeId,
-                                                                            uint64_t resumeTtl,
-                                                                            const framing::FieldTable& arguments)
-{
-    if (arguments.isSet("qpid.replicating-subscription")) {
-        shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
-        boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init();
-        return result;
-    } else {
-        return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
-    }
-}
-
-std::string mask(const std::string& in)
-{
-    return std::string("$") + in + std::string("_internal");
-}
-
-class ReplicationStateInitialiser
-{
-  public:
-    ReplicationStateInitialiser(qpid::framing::SequenceSet& results,
-                                const qpid::framing::SequenceNumber& start,
-                                const qpid::framing::SequenceNumber& end);
-    void operator()(const QueuedMessage& m) { process(m); }
-  private:
-    qpid::framing::SequenceSet& results;
-    const qpid::framing::SequenceNumber start;
-    const qpid::framing::SequenceNumber end;
-    void process(const QueuedMessage&);
-};
-
-ReplicatingSubscription::ReplicatingSubscription(SemanticState* _parent,
-                                                                const string& _name,
-                                                                Queue::shared_ptr _queue,
-                                                                bool ack,
-                                                                bool _acquire,
-                                                                bool _exclusive,
-                                                                const string& _tag,
-                                                                const string& _resumeId,
-                                                                uint64_t _resumeTtl,
-                                                                const framing::FieldTable& _arguments
-) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments),
-    events(new Queue(mask(_name))),
-    consumer(new DelegatingConsumer(*this))
-{
-
-    if (_arguments.isSet("qpid.high_sequence_number")) {
-        qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number");
-        qpid::framing::SequenceNumber lwm;
-        if (_arguments.isSet("qpid.low_sequence_number")) {
-            lwm = _arguments.getAsInt("qpid.low_sequence_number");
-        } else {
-            lwm = hwm;
-        }
-        qpid::framing::SequenceNumber oldest;
-        if (_queue->getOldest(oldest)) {
-            if (oldest >= hwm) {
-                range.add(lwm, --oldest);
-            } else if (oldest >= lwm) {
-                ReplicationStateInitialiser initialiser(range, lwm, hwm);
-                _queue->eachMessage(initialiser);
-            } else { //i.e. have older message on master than is reported to exist on replica
-                QPID_LOG(warning, "Replica appears to be missing message on master");
-            }
-        } else {
-            //local queue (i.e. master) is empty
-            range.add(lwm, _queue->getPosition());
-        }
-        QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range
-                 << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")");
-        //set position of 'cursor'
-        position = hwm;
-    }
-}
-
-bool ReplicatingSubscription::deliver(QueuedMessage& m)
-{
-    return ConsumerImpl::deliver(m);
-}
-
-void ReplicatingSubscription::init()
-{
-    getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-void ReplicatingSubscription::cancel()
-{
-    getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-//called before we get notified of the message being available and
-//under the message lock in the queue
-void ReplicatingSubscription::enqueued(const QueuedMessage& m)
-{
-    QPID_LOG(debug, "Enqueued message at " << m.position);
-    //delay completion
-    m.payload->getIngressCompletion().startCompleter();
-    QPID_LOG(debug, "Delayed " << m.payload.get());
-}
-
-class Buffer : public qpid::framing::Buffer
-{
-  public:
-    Buffer(size_t size) : qpid::framing::Buffer(new char[size], size) {}
-    ~Buffer() { delete[] getPointer(); }
-};
-
-void ReplicatingSubscription::generateDequeueEvent()
-{
-    Buffer buffer(range.encodedSize());
-    range.encode(buffer);
-    range.clear();
-    buffer.reset();
-
-    //generate event message
-    boost::intrusive_ptr<Message> event = new Message();
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0)));
-    AMQFrame header((AMQHeaderBody()));
-    AMQFrame content((AMQContentBody()));
-    content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
-    header.setBof(false);
-    header.setEof(false);
-    header.setBos(true);
-    header.setEos(true);
-    content.setBof(false);
-    content.setEof(true);
-    content.setBos(true);
-    content.setEos(true);
-    event->getFrames().append(method);
-    event->getFrames().append(header);
-    event->getFrames().append(content);
-
-    DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
-    props->setRoutingKey("dequeue-event");
-
-    events->deliver(event);
-}
-
-//called after the message has been removed from the deque and under
-//the message lock in the queue
-void ReplicatingSubscription::dequeued(const QueuedMessage& m)
-{
-    {
-        Mutex::ScopedLock l(lock);
-        range.add(m.position);
-        QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position);
-    }
-    notify();
-    if (m.position > position) {
-        m.payload->getIngressCompletion().finishCompleter();
-        QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue");
-    }
-}
-
-bool ReplicatingSubscription::doDispatch()
-{
-    {
-        Mutex::ScopedLock l(lock);
-        if (!range.empty()) {
-            generateDequeueEvent();
-        }
-    }
-    bool r1 = events->dispatch(consumer);
-    bool r2 = ConsumerImpl::doDispatch();
-    return r1 || r2;
-}
-
 SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
                                           const string& _name,
                                           Queue::shared_ptr _queue,
@@ -497,7 +287,6 @@
                                           uint64_t _resumeTtl,
                                           const framing::FieldTable& _arguments
 
-
 ) :
     Consumer(_name, _acquire, !_acquire),   /** @todo KAG - allow configuration of 'browse acquired' */
     parent(_parent),
@@ -512,7 +301,7 @@
     tag(_tag),
     resumeTtl(_resumeTtl),
     arguments(_arguments),
-    msgCredit(0),
+     msgCredit(0),
     byteCredit(0),
     notifyEnabled(true),
     syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -558,7 +347,7 @@
 {
     assertClusterSafe();
     allocateCredit(msg.payload);
-    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, windowing, 0, dynamic_cast<const ReplicatingSubscription*>(this));
+    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, windowing, 0, isDelayedCompletion());
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
     parent->deliver(record, sync);
@@ -714,10 +503,10 @@
     msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
 
     std::string exchangeName = msg->getExchangeName();
-    if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) {
-        cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues());
-        if (!cacheExchange) cacheExchange = NodeClone::create(exchangeName, getSession().getBroker());
-        if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName);
+    if (!cacheExchange || cacheExchange->getName() != exchangeName
+        || cacheExchange->isDestroyed())
+    {
+        cacheExchange = session.getBroker().getExchanges().get(exchangeName);
     }
     cacheExchange->setProperties(msg);
 
@@ -1099,36 +888,4 @@
     }
 }
 
-ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
-ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
-{
-    return delegate.deliver(m);
-}
-void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-//bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-//bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-Consumer::Action ReplicatingSubscription::DelegatingConsumer::accept(const QueuedMessage& msg) { return delegate.accept(msg); }
-OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
-ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r,
-                                                         const qpid::framing::SequenceNumber& s,
-                                                         const qpid::framing::SequenceNumber& e)
-    : results(r), start(s), end(e)
-{
-    results.add(start, end);
-}
-
-void ReplicationStateInitialiser::process(const QueuedMessage& message)
-{
-    if (message.position < start) {
-        //replica does not have a message that should still be on the queue
-        QPID_LOG(warning, "Replica appears to be missing message at " << message.position);
-    } else if (message.position >= start && message.position <= end) {
-        //i.e. message is within the intial range and has not been dequeued, so remove it from the results
-        results.remove(message.position);
-    } //else message has not been seen by replica yet so can be ignored here
-
-}
-
 }} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 62c829e..e467ec6 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -157,11 +157,10 @@
         management::ManagementObject* GetManagementObject (void) const;
         management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
 
-        static shared_ptr create(SemanticState* parent,
-                                 const std::string& name, boost::shared_ptr<Queue> queue,
-                                 bool ack, bool acquire, bool exclusive, const std::string& tag,
-                                 const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
-
+        /** This consumer wants delayed completion.
+         * Overridden by ConsumerImpl subclasses.
+         */
+        virtual bool isDelayedCompletion() const { return false; }
     };
 
     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index ca6d6bb..8cd5072 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/amqp_0_10/SessionHandler.h"
+#include "qpid/broker/SessionHandler.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 
 namespace qpid {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index fe66b77..f56697c 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -208,6 +208,8 @@
 
     void queueDequeueSincePurgeState(const std::string&, uint32_t);
 
+    bool isAnnounced() const { return announced; }
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 4bf03ee..2cd1cf9 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -97,7 +97,7 @@
 }
 
 void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) {
-    if (parent.isLocal() && !sentDoOutput && !closing) {
+    if (parent.isLocal() && !sentDoOutput && !closing && parent.isAnnounced()) {
         sentDoOutput = true;
         parent.getCluster().getMulticast().mcastControl(
             ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
new file mode 100644
index 0000000..17da13e
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "Settings.h"
+#include "WiringReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using types::Variant;
+using std::string;
+
+Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+    // FIXME aconway 2011-11-24: identifying the primary.
+    if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
+        Url url(s.brokerUrl);
+        QPID_LOG(info, "HA: Acting as backup");
+        string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+
+        // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
+        // Declare the link
+        std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+            url[0].host, url[0].port, protocol,
+            false,              // durable
+            s.mechanism, s.username, s.password);
+        assert(result.second);  // FIXME aconway 2011-11-23: error handling
+        link = result.first;
+        boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
+        broker.getExchanges().registerExchange(wr);
+    }
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
new file mode 100644
index 0000000..b4183a4
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -0,0 +1,56 @@
+#ifndef QPID_HA_BACKUP_H
+#define QPID_HA_BACKUP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Settings.h"
+#include "qpid/Url.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class Broker;
+class Link;
+}
+
+namespace ha {
+class Settings;
+
+/**
+ * State associated with a backup broker. Manages connections to primary.
+ *
+ * THREAD SAFE: trivially because currently it only has a constructor.
+ * May need locking as the functionality grows.
+ */
+class Backup
+{
+  public:
+    Backup(broker::Broker&, const Settings&);
+
+  private:
+    broker::Broker& broker;
+    Settings settings;
+    boost::shared_ptr<broker::Link> link;
+};
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_BACKUP_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
new file mode 100644
index 0000000..22b7e46
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "HaBroker.h"
+#include "Settings.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+namespace _qmf = ::qmf::org::apache::qpid::ha;
+using namespace management;
+using namespace std;
+
+namespace {
+Url url(const std::string& s, const std::string& id) {
+    try {
+        return Url(s);
+    } catch (const std::exception& e) {
+        throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'");
+    }
+}
+} // namespace
+
+HaBroker::HaBroker(broker::Broker& b, const Settings& s)
+    : broker(b), 
+      clientUrl(url(s.clientUrl, "ha-client-url")),
+      brokerUrl(url(s.brokerUrl, "ha-broker-url")),
+      mgmtObject(0)
+{
+    ManagementAgent* ma = broker.getManagementAgent();
+    if (ma) {
+        _qmf::Package  packageInit(ma);
+        mgmtObject = new _qmf::HaBroker(ma, this);
+        // FIXME aconway 2011-11-11: Placeholder - initialize cluster role.
+        mgmtObject->set_status("solo");
+        ma->addObject(mgmtObject);
+    }
+    QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
+             << " broker-url=" << brokerUrl);
+    backup.reset(new Backup(broker, s));
+    // Register a factory for replicating subscriptions.
+    broker.getConsumerFactories().add(
+        boost::shared_ptr<ReplicatingSubscription::Factory>(
+            new ReplicatingSubscription::Factory()));
+}
+
+HaBroker::~HaBroker() {}
+
+Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
+    switch (methodId) {
+      case _qmf::HaBroker::METHOD_SETSTATUS: {
+          std::string status = dynamic_cast<_qmf::ArgsHaBrokerSetStatus&>(args).i_status;
+          // FIXME aconway 2011-11-11: placeholder, validate & execute status change.
+          mgmtObject->set_status(status);
+          break;
+      }
+      default:
+        return Manageable::STATUS_UNKNOWN_METHOD;
+    }
+    return Manageable::STATUS_OK;
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
new file mode 100644
index 0000000..1a835c9
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -0,0 +1,62 @@
+#ifndef QPID_HA_BROKER_H
+#define QPID_HA_BROKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Url.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
+#include "qpid/management/Manageable.h"
+
+namespace qpid {
+namespace broker {
+class Broker;
+}
+namespace ha {
+class Settings;
+class Backup;
+
+/**
+ * HA state and actions associated with a broker.
+ *
+ * THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+ */
+class HaBroker : public management::Manageable
+{
+  public:
+    HaBroker(broker::Broker&, const Settings&);
+    ~HaBroker();
+
+    // Implement Manageable.
+    qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; }
+    management::Manageable::status_t ManagementMethod (
+        uint32_t methodId, management::Args& args, std::string& text);
+
+  private:
+    broker::Broker& broker;
+    Url clientUrl, brokerUrl;
+    std::auto_ptr<Backup> backup;
+    qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
+};
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_BROKER_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
new file mode 100644
index 0000000..80f21e4
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "HaBroker.h"
+#include "Settings.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Broker.h"
+
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+struct Options : public qpid::Options {
+    Settings& settings;
+    Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
+        addOptions()
+            ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features")
+            ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.")
+            ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.")
+            ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
+            ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
+            ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
+            ;
+    }
+};
+
+struct HaPlugin : public Plugin {
+
+    Settings settings;
+    Options options;
+    auto_ptr<HaBroker> haBroker;
+
+    HaPlugin() : options(settings) {}
+
+    Options* getOptions() { return &options; }
+
+    void earlyInitialize(Plugin::Target& ) {}
+
+    void initialize(Plugin::Target& target) {
+        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+        if (broker && settings.enabled) {
+            haBroker.reset(new ha::HaBroker(*broker, settings));
+        } else
+            QPID_LOG(info, "HA: Disabled");
+    }
+};
+
+static HaPlugin instance; // Static initialization.
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Logging.cpp b/qpid/cpp/src/qpid/ha/Logging.cpp
new file mode 100644
index 0000000..7d8ee38
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Logging.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Logging.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace ha {
+
+QueuePos::QueuePos(const broker::QueuedMessage& qm)
+    : queue(qm.queue), position(qm.position) {}
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& qp) {
+    return o << qp.queue->getName() << "[" << qp.position << "]";
+}
+
+}} // namesopace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Logging.h b/qpid/cpp/src/qpid/ha/Logging.h
new file mode 100644
index 0000000..3b12baa
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Logging.h
@@ -0,0 +1,55 @@
+#ifndef QPID_HA_HAOSTREAM_H
+#define QPID_HA_HAOSTREAM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iosfwd>
+
+/**@file ostream helpers used in log messages. */
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace framing {
+class SequenceNumber;
+}
+
+namespace ha {
+
+// Other printable helpers
+
+struct QueuePos {
+    const broker::Queue* queue;
+    const framing::SequenceNumber& position;
+    QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos)
+        : queue(q), position(pos) {}
+    QueuePos(const broker::QueuedMessage& qm);
+};
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& h);
+
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_HAOSTREAM_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
new file mode 100644
index 0000000..2de9ec5
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "Logging.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace {
+const std::string QPID_REPLICATOR_("qpid.replicator-");
+}
+
+namespace qpid {
+namespace ha {
+using namespace broker;
+
+QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
+    : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
+      queue(q), link(l), current(queue->getPosition())
+{
+    // FIXME aconway 2011-11-24: consistent logging.
+    QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
+    queue->getBroker()->getLinks().declare(
+        link->getHost(), link->getPort(),
+        false,              // durable
+        queue->getName(),   // src
+        getName(),          // dest
+        "",                 // key
+        false,              // isQueue
+        false,              // isLocal
+        "",                 // id/tag
+        "",                 // excludes
+        false,              // dynamic
+        0,                  // sync?
+        boost::bind(&QueueReplicator::initializeBridge, this, _1, _2)
+    );
+}
+
+QueueReplicator::~QueueReplicator() {}
+
+// NB: This is called back ina broker connection thread when the
+// bridge is created.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+    // No lock needed, no mutable member variables are used.
+    framing::AMQP_ServerProxy peer(sessionHandler.out);
+    const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+    framing::FieldTable settings;
+    // FIXME aconway 2011-11-28: string constants.
+    settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+    // FIXME aconway 2011-11-28: inconsistent use of _ vs. -
+    settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
+    qpid::framing::SequenceNumber oldest;
+    if (queue->getOldest(oldest))
+        settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
+
+    peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings);
+    peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+    peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+    QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest);
+}
+
+
+namespace {
+const std::string DEQUEUE_EVENT("dequeue-event");
+const std::string REPLICATOR("qpid.replicator-");
+}
+
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
+{
+    if (key == DEQUEUE_EVENT) {
+        std::string content;
+        msg.getMessage().getFrames().getContent(content);
+        qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
+        qpid::framing::SequenceSet latest;
+        latest.decode(buffer);
+
+        //TODO: should be able to optimise the following
+        for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
+            if (current < *i) {
+                //haven't got that far yet, record the dequeue
+                dequeued.add(*i);
+                QPID_LOG(trace, "HA: Recording dequeue of message at " <<
+                         QueuePos(queue.get(), *i));
+            } else {
+                QueuedMessage message;
+                if (queue->acquireMessageAt(*i, message)) {
+                    queue->dequeue(0, message);
+                    QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message));
+                } else {
+                    // FIXME aconway 2011-11-29: error handling
+                    QPID_LOG(error, "HA: Unable to dequeue message at "
+                             << QueuePos(queue.get(), *i));
+                }
+            }
+        }
+    } else {
+        //take account of any gaps in sequence created by messages
+        //dequeued before our subscription reached them
+        while (dequeued.contains(++current)) {
+            dequeued.remove(current);
+            QPID_LOG(debug, "HA: Skipping dequeued message at " << current << " from " << queue->getName());
+            queue->setPosition(current);
+        }
+        QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; currently at " << current);
+        msg.deliverTo(queue);
+    }
+}
+
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
+
+// FIXME aconway 2011-11-28: rationalise string constants.
+static const std::string TYPE_NAME("qpid.queue-replicator");
+
+std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
new file mode 100644
index 0000000..8085c11
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -0,0 +1,72 @@
+#ifndef QPID_HA_QUEUEREPLICATOR_H
+#define QPID_HA_QUEUEREPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/SequenceSet.h"
+
+namespace qpid {
+
+namespace broker {
+class Bridge;
+class Link;
+class Queue;
+class QueueRegistry;
+class SessionHandler;
+class Deliverable;
+}
+
+namespace ha {
+
+/**
+ * Exchange created on a backup broker to replicate a queue on the primary.
+ *
+ * Puts replicated messages on the local queue, handles dequeue events.
+ * Creates a ReplicatingSubscription on the primary by passing special
+ * arguments to the consume command.
+ *
+ * THREAD SAFE.
+ */
+class QueueReplicator : public broker::Exchange
+{
+  public:
+    QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
+    ~QueueReplicator();
+    std::string getType() const;
+    bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+    bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+    void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+    bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+  private:
+    void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+
+    sys::Mutex lock;
+    boost::shared_ptr<broker::Queue> queue;
+    boost::shared_ptr<broker::Link> link;
+    framing::SequenceNumber current;
+    framing::SequenceSet dequeued;
+};
+
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
new file mode 100644
index 0000000..aabbd43
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -0,0 +1,235 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+#include "Logging.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using namespace std;
+
+// FIXME aconway 2011-11-28: review all arugment names, prefixes etc.
+// Do we want a common HA prefix?
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number");
+const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
+
+const string DOLLAR("$");
+const string INTERNAL("-internal");
+
+class ReplicationStateInitialiser
+{
+  public:
+    ReplicationStateInitialiser(
+        qpid::framing::SequenceSet& r,
+        const qpid::framing::SequenceNumber& s,
+        const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e)
+    {
+        results.add(start, end);
+    }
+
+    void operator()(const QueuedMessage& message) {
+        if (message.position < start) {
+            //replica does not have a message that should still be on the queue
+            QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message));
+        } else if (message.position >= start && message.position <= end) {
+            //i.e. message is within the intial range and has not been dequeued, so remove it from the results
+            results.remove(message.position);
+        } //else message has not been seen by replica yet so can be ignored here
+    }
+
+  private:
+    qpid::framing::SequenceSet& results;
+    const qpid::framing::SequenceNumber start;
+    const qpid::framing::SequenceNumber end;
+};
+
+string mask(const string& in)
+{
+    return DOLLAR + in + INTERNAL;
+}
+
+boost::shared_ptr<broker::SemanticState::ConsumerImpl>
+ReplicatingSubscription::Factory::create(
+    SemanticState* parent,
+    const string& name,
+    Queue::shared_ptr queue,
+    bool ack,
+    bool acquire,
+    bool exclusive,
+    const string& tag,
+    const string& resumeId,
+    uint64_t resumeTtl,
+    const framing::FieldTable& arguments
+) {
+    return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
+        new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+}
+
+ReplicatingSubscription::ReplicatingSubscription(
+    SemanticState* parent,
+    const string& name,
+    Queue::shared_ptr queue,
+    bool ack,
+    bool acquire,
+    bool exclusive,
+    const string& tag,
+    const string& resumeId,
+    uint64_t resumeTtl,
+    const framing::FieldTable& arguments
+) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+                 resumeId, resumeTtl, arguments),
+    events(new Queue(mask(name))),
+    consumer(new DelegatingConsumer(*this))
+{
+    QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
+    // FIXME aconway 2011-11-25: string constants.
+    if (arguments.isSet("qpid.high_sequence_number")) {
+        qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number");
+        qpid::framing::SequenceNumber lwm;
+        if (arguments.isSet("qpid.low_sequence_number")) {
+            lwm = arguments.getAsInt("qpid.low_sequence_number");
+        } else {
+            lwm = hwm;
+        }
+        qpid::framing::SequenceNumber oldest;
+        if (queue->getOldest(oldest)) {
+            if (oldest >= hwm) {
+                range.add(lwm, --oldest);
+            } else if (oldest >= lwm) {
+                ReplicationStateInitialiser initialiser(range, lwm, hwm);
+                queue->eachMessage(initialiser);
+            } else { //i.e. have older message on master than is reported to exist on replica
+                QPID_LOG(warning, "HA: Replica  missing message on master");
+            }
+        } else {
+            //local queue (i.e. master) is empty
+            range.add(lwm, queue->getPosition());
+        }
+        QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range
+                 << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")");
+        //set position of 'cursor'
+        position = hwm;
+    }
+}
+
+bool ReplicatingSubscription::deliver(QueuedMessage& m)
+{
+    return ConsumerImpl::deliver(m);
+}
+
+void ReplicatingSubscription::cancel()
+{
+    getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+//called before we get notified of the message being available and
+//under the message lock in the queue
+void ReplicatingSubscription::enqueued(const QueuedMessage& m)
+{
+    QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m));
+    //delay completion
+    m.payload->getIngressCompletion().startCompleter();
+}
+
+void ReplicatingSubscription::generateDequeueEvent()
+{
+    string buf(range.encodedSize(),'\0');
+    framing::Buffer buffer(&buf[0], buf.size());
+    range.encode(buffer);
+    range.clear();
+    buffer.reset();
+
+    //generate event message
+    boost::intrusive_ptr<Message> event = new Message();
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
+    AMQFrame header((AMQHeaderBody()));
+    AMQFrame content((AMQContentBody()));
+    content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
+    header.setBof(false);
+    header.setEof(false);
+    header.setBos(true);
+    header.setEos(true);
+    content.setBof(false);
+    content.setEof(true);
+    content.setBos(true);
+    content.setEos(true);
+    event->getFrames().append(method);
+    event->getFrames().append(header);
+    event->getFrames().append(content);
+
+    DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+    props->setRoutingKey("dequeue-event");
+
+    events->deliver(event);
+}
+
+//called after the message has been removed from the deque and under
+//the message lock in the queue
+void ReplicatingSubscription::dequeued(const QueuedMessage& m)
+{
+    {
+        sys::Mutex::ScopedLock l(lock);
+        range.add(m.position);
+        // FIXME aconway 2011-11-29: q[pos]
+        QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position);
+    }
+    notify();
+    if (m.position > position) {
+        m.payload->getIngressCompletion().finishCompleter();
+        QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue");
+    }
+}
+
+bool ReplicatingSubscription::doDispatch()
+{
+    {
+        sys::Mutex::ScopedLock l(lock);
+        if (!range.empty()) {
+            generateDequeueEvent();
+        }
+    }
+    bool r1 = events->dispatch(consumer);
+    bool r2 = ConsumerImpl::doDispatch();
+    return r1 || r2;
+}
+
+ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
+ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
+{
+    return delegate.deliver(m);
+}
+void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
+bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
+bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
new file mode 100644
index 0000000..b83842a
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -0,0 +1,109 @@
+#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/ConsumerFactory.h"
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Queue;
+class QueuedMessage;
+class OwnershipToken;
+}
+
+namespace ha {
+
+/**
+ * A susbcription that represents a backup replicating a queue.
+ *
+ * Runs on the primary. Delays completion of messages till the backup
+ * has acknowledged, informs backup of locally dequeued messages.
+ *
+ * THREAD UNSAFE: used only in broker connection thread.
+ */
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
+                                public broker::QueueObserver
+{
+  public:
+    struct Factory : public broker::ConsumerFactory {
+        boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+            broker::SemanticState* parent,
+            const std::string& name, boost::shared_ptr<broker::Queue> ,
+            bool ack, bool acquire, bool exclusive, const std::string& tag,
+            const std::string& resumeId, uint64_t resumeTtl,
+            const framing::FieldTable& arguments);
+    };
+
+    // Argument names for consume command.
+    static const std::string QPID_REPLICATING_SUBSCRIPTION;
+    static const std::string QPID_HIGH_SEQUENCE_NUMBER;
+    static const std::string QPID_LOW_SEQUENCE_NUMBER;
+
+    ReplicatingSubscription(broker::SemanticState* parent,
+                            const std::string& name, boost::shared_ptr<broker::Queue> ,
+                            bool ack, bool acquire, bool exclusive, const std::string& tag,
+                            const std::string& resumeId, uint64_t resumeTtl,
+                            const framing::FieldTable& arguments);
+
+    ~ReplicatingSubscription();
+
+    void cancel();
+    bool deliver(broker::QueuedMessage& msg);
+    void enqueued(const broker::QueuedMessage&);
+    void dequeued(const broker::QueuedMessage&);
+    void acquired(const broker::QueuedMessage&) {}
+    void requeued(const broker::QueuedMessage&) {}
+
+    bool isDelayedCompletion() const { return true; }
+    
+  protected:
+    bool doDispatch();
+  private:
+    boost::shared_ptr<broker::Queue> events;
+    boost::shared_ptr<broker::Consumer> consumer;
+    qpid::framing::SequenceSet range;
+
+    void generateDequeueEvent();
+    class DelegatingConsumer : public Consumer
+    {
+      public:
+        DelegatingConsumer(ReplicatingSubscription&);
+        ~DelegatingConsumer();
+        bool deliver(broker::QueuedMessage& msg);
+        void notify();
+        bool filter(boost::intrusive_ptr<broker::Message>);
+        bool accept(boost::intrusive_ptr<broker::Message>);
+        broker::OwnershipToken* getSession();
+      private:
+        ReplicatingSubscription& delegate;
+    };
+};
+
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
new file mode 100644
index 0000000..a2d2e89
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -0,0 +1,47 @@
+#ifndef QPID_HA_SETTINGS_H
+#define QPID_HA_SETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+namespace ha {
+
+using std::string;
+
+/**
+ * Configurable settings for HA.
+ */
+class Settings
+{
+  public:
+    Settings() : enabled(false) {}
+    bool enabled;
+    string clientUrl;
+    string brokerUrl;
+    string username, password, mechanism;
+  private:
+};
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_SETTINGS_H*/
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
new file mode 100644
index 0000000..125e2c0
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -0,0 +1,463 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "WiringReplicator.h"
+#include "QueueReplicator.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+
+namespace qpid {
+namespace ha {
+
+using qmf::org::apache::qpid::broker::EventBind;
+using qmf::org::apache::qpid::broker::EventExchangeDeclare;
+using qmf::org::apache::qpid::broker::EventExchangeDelete;
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventSubscribe;
+using namespace framing;
+using std::string;
+using types::Variant;
+using namespace broker;
+
+namespace {
+
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string QPID_REPLICATE("qpid.replicate");
+
+const string CLASS_NAME("_class_name");
+const string EVENT("_event");
+const string OBJECT_NAME("_object_name");
+const string PACKAGE_NAME("_package_name");
+const string QUERY_RESPONSE("_query_response");
+const string SCHEMA_ID("_schema_id");
+const string VALUES("_values");
+
+const string ALTEX("altEx");
+const string ARGS("args");
+const string ARGUMENTS("arguments");
+const string AUTODEL("autoDel");
+const string AUTODELETE("autoDelete");
+const string BIND("bind");
+const string BINDING("binding");
+const string CREATED("created");
+const string DISP("disp");
+const string DURABLE("durable");
+const string EXCHANGE("exchange");
+const string EXNAME("exName");
+const string EXTYPE("exType");
+const string KEY("key");
+const string NAME("name");
+const string QNAME("qName");
+const string QUEUE("queue");
+const string RHOST("rhost");
+const string TYPE("type");
+const string USER("user");
+
+const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string QMF2("qmf2");
+const string QMF_CONTENT("qmf.content");
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string QMF_OPCODE("qmf.opcode");
+
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
+
+bool isQMFv2(const Message& message) {
+    const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
+    return props && props->getAppId() == QMF2;
+}
+
+template <class T> bool match(Variant::Map& schema) {
+    return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
+}
+
+// FIXME aconway 2011-11-24: this should be a class.
+enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
+const string S_NONE="none";
+const string S_WIRING="wiring";
+const string S_ALL="all";
+
+ReplicateLevel replicateLevel(const string& str) {
+    // FIXME aconway 2011-11-24: case insenstive comparison.
+    ReplicateLevel rl = RL_NONE;
+    if (str == S_WIRING) rl = RL_WIRING;
+    else if (str == S_ALL) rl = RL_ALL;
+    return rl;
+}
+
+ReplicateLevel replicateLevel(const framing::FieldTable& f) {
+    if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
+    else return RL_NONE;
+}
+
+ReplicateLevel replicateLevel(const Variant::Map& m) {
+    Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+    if (i != m.end()) return replicateLevel(i->second.asString());
+    else return RL_NONE;
+}
+
+void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+    framing::AMQP_ServerProxy peer(sessionHandler.out);
+    Variant::Map request;
+    request[_WHAT] = OBJECT;
+    Variant::Map schema;
+    schema[_CLASS_NAME] = className;
+    schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+    request[_SCHEMA_ID] = schema;
+
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
+    method.setBof(true);
+    method.setEof(false);
+    method.setBos(true);
+    method.setEos(true);
+    AMQHeaderBody headerBody;
+    MessageProperties* props = headerBody.get<MessageProperties>(true);
+    props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+    props->setAppId(QMF2);
+    props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+    headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+    AMQFrame header(headerBody);
+    header.setBof(false);
+    header.setEof(false);
+    header.setBos(true);
+    header.setEos(true);
+    AMQContentBody data;
+    qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+    AMQFrame content(data);
+    content.setBof(false);
+    content.setEof(true);
+    content.setBos(true);
+    content.setEos(true);
+    sessionHandler.out->handle(method);
+    sessionHandler.out->handle(header);
+    sessionHandler.out->handle(content);
+}
+} // namespace
+
+WiringReplicator::~WiringReplicator() {}
+
+WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
+    : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
+{
+    QPID_LOG(debug, "HA: Starting replication from " <<
+             link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
+    broker.getLinks().declare(
+        link->getHost(), link->getPort(),
+        false,              // durable
+        QPID_WIRING_REPLICATOR, // src
+        QPID_WIRING_REPLICATOR, // dest
+        "",                 // key
+        false,              // isQueue
+        false,              // isLocal
+        "",                 // id/tag
+        "",                 // excludes
+        false,              // dynamic
+        0,                  // sync?
+        boost::bind(&WiringReplicator::initializeBridge, this, _1, _2)
+    );
+}
+
+// This is called in the connection IO thread when the bridge is started.
+void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+    framing::AMQP_ServerProxy peer(sessionHandler.out);
+    string queueName = bridge.getQueueName();
+    const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+
+    //declare and bind an event queue
+    peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+    peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+    //subscribe to the queue
+    peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+    peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+    peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+    //issue a query request for queues and another for exchanges using event queue as the reply-to address
+    sendQuery(QUEUE, queueName, sessionHandler);
+    sendQuery(EXCHANGE, queueName, sessionHandler);
+    sendQuery(BINDING, queueName, sessionHandler);
+    QPID_LOG(debug, "HA: Activated wiring replicator")
+}
+
+void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
+    Variant::List list;
+    try {
+        if (!isQMFv2(msg.getMessage()) || !headers)
+            throw Exception("Unexpected message, not QMF2 event or query response.");
+        // decode as list
+        string content = msg.getMessage().getFrames().getContent();
+        amqp_0_10::ListCodec::decode(content, list);
+
+        if (headers->getAsString(QMF_CONTENT) == EVENT) {
+            for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+                Variant::Map& map = i->asMap();
+                Variant::Map& schema = map[SCHEMA_ID].asMap();
+                Variant::Map& values = map[VALUES].asMap();
+                QPID_LOG(trace, "HA: Configuration event: schema=" << schema << " values=" << values);
+                if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
+                else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
+                else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
+                else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
+                else if (match<EventBind>(schema)) doEventBind(values);
+                // FIXME aconway 2011-11-21: handle unbind & all other events.
+                else if (match<EventSubscribe>(schema)) {} // Deliberately ignored.
+                else throw(Exception(QPID_MSG("WiringReplicator received unexpected event, schema=" << schema)));
+            }
+        } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+            for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+                string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
+                Variant::Map& values = i->asMap()[VALUES].asMap();
+                framing::FieldTable args;
+                amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+                QPID_LOG(trace, "HA: Configuration response type=" << type << " values=" << values);
+                if      (type == QUEUE) doResponseQueue(values);
+                else if (type == EXCHANGE) doResponseExchange(values);
+                else if (type == BINDING) doResponseBind(values);
+                else throw Exception(QPID_MSG("HA: Unexpected response type: " << type));
+            }
+        } else {
+            QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers));
+        }
+    } catch (const std::exception& e) {
+        QPID_LOG(warning, "HA: Error replicating configuration: " << e.what());
+        QPID_LOG(debug, "HA: Error processing configuration message: " << list);
+    }
+}
+
+void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
+    string name = values[QNAME].asString();
+    Variant::Map argsMap = values[ARGS].asMap();
+    if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+         framing::FieldTable args;
+        amqp_0_10::translate(argsMap, args);
+
+        QPID_LOG(debug, "HA: Creating queue from event " << name);
+       std::pair<boost::shared_ptr<Queue>, bool> result =
+            broker.createQueue(
+                name,
+                values[DURABLE].asBool(),
+                values[AUTODEL].asBool(),
+                0 /*i.e. no owner regardless of exclusivity on master*/,
+                values[ALTEX].asString(),
+                args,
+                values[USER].asString(),
+                values[RHOST].asString());
+        if (result.second) {
+            // FIXME aconway 2011-11-22: should delete old queue and
+            // re-create from event.
+            // Events are always up to date, whereas responses may be
+            // out of date.
+            QPID_LOG(debug, "HA: New queue replica " << name);
+            startQueueReplicator(result.first);
+        } else {
+            QPID_LOG(warning, "HA: Replicated queue " << name << " already exists");
+        }
+    }
+}
+
+void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
+    string name = values[QNAME].asString();
+    boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+    if (queue && replicateLevel(queue->getSettings())) {
+        QPID_LOG(debug, "HA: Deleting queue from event: " << name);
+        broker.deleteQueue(
+            name,
+            values[USER].asString(),
+            values[RHOST].asString());
+    }
+}
+
+void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
+    Variant::Map argsMap(values[ARGS].asMap());
+    if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+        string name = values[EXNAME].asString();
+        framing::FieldTable args;
+        amqp_0_10::translate(argsMap, args);
+        QPID_LOG(debug, "HA: New exchange replica " << name);
+        if (!broker.createExchange(
+                name,
+                values[EXTYPE].asString(),
+                values[DURABLE].asBool(),
+                values[ALTEX].asString(),
+                args,
+                values[USER].asString(),
+                values[RHOST].asString()).second) {
+            // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+            // and re-create from event. See comment in doEventQueueDeclare.
+            QPID_LOG(warning, "HA: Replicated exchange " << name << " already exists");
+        }
+    }
+}
+
+void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
+    string name = values[EXNAME].asString();
+    try {
+        boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
+        if (exchange && replicateLevel(exchange->getArgs())) {
+            QPID_LOG(debug, "HA: Deleting exchange:" << name);
+            broker.deleteExchange(
+                name,
+                values[USER].asString(),
+                values[RHOST].asString());
+        }
+    } catch (const framing::NotFoundException&) {}
+}
+
+void WiringReplicator::doEventBind(Variant::Map& values) {
+    try {
+        boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString());
+        boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString());
+        // We only replicated a binds for a replicated queue to replicated exchange.
+        if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) {
+            framing::FieldTable args;
+            amqp_0_10::translate(values[ARGS].asMap(), args);
+            string key = values[KEY].asString();
+            QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+                     << " queue=" << queue->getName()
+                     << " key=" << key);
+            exchange->bind(queue, key, &args);
+        }
+    } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange.
+}
+
+void WiringReplicator::doResponseQueue(Variant::Map& values) {
+    // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
+    Variant::Map argsMap(values[ARGUMENTS].asMap());
+    if (!replicateLevel(argsMap)) return;
+    framing::FieldTable args;
+    amqp_0_10::translate(argsMap, args);
+    string name(values[NAME].asString());
+    std::pair<boost::shared_ptr<Queue>, bool> result =
+        broker.createQueue(
+            name,
+            values[DURABLE].asBool(),
+            values[AUTODELETE].asBool(),
+            0 /*i.e. no owner regardless of exclusivity on master*/,
+            ""/*TODO: need to include alternate-exchange*/,
+            args,
+            ""/*TODO: who is the user?*/,
+            ""/*TODO: what should we use as connection id?*/);
+    if (result.second) {
+        QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in catch-up)");
+        startQueueReplicator(result.first);
+    } else {
+        // FIXME aconway 2011-11-22: Normal to find queue already
+        // exists if we're failing over.
+        QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already exists (in catch-up)");
+    }
+}
+
+void WiringReplicator::doResponseExchange(Variant::Map& values) {
+    Variant::Map argsMap(values[ARGUMENTS].asMap());
+    if (!replicateLevel(argsMap)) return;
+    framing::FieldTable args;
+    amqp_0_10::translate(argsMap, args);
+    QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in catch-up)");
+    if (!broker.createExchange(
+            values[NAME].asString(),
+            values[TYPE].asString(),
+            values[DURABLE].asBool(),
+            ""/*TODO: need to include alternate-exchange*/,
+            args,
+            ""/*TODO: who is the user?*/,
+            ""/*TODO: what should we use as connection id?*/).second) {
+        QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " already exists (in catch-up)");
+    }
+}
+
+namespace {
+const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
+const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
+
+std::string getRefName(const std::string& prefix, const Variant& ref) {
+    Variant::Map map(ref.asMap());
+    Variant::Map::const_iterator i = map.find(OBJECT_NAME);
+    if (i == map.end())
+        throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
+    const std::string name = i->second.asString();
+    if (name.compare(0, prefix.size(), prefix) != 0)
+        throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
+    std::string ret = name.substr(prefix.size());
+    return ret;
+}
+
+const std::string EXCHANGE_REF("exchangeRef");
+const std::string QUEUE_REF("queueRef");
+
+} // namespace
+
+void WiringReplicator::doResponseBind(Variant::Map& values) {
+    try {
+        std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
+        std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+        boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
+        boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+        // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
+
+        // Automatically replicate exchange if queue and exchange are replicated
+        if (exchange && replicateLevel(exchange->getArgs()) &&
+            queue && replicateLevel(queue->getSettings()))
+        {
+            framing::FieldTable args;
+            amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+            string key = values[KEY].asString();
+            QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+                     << " queue=" << queue->getName()
+                     << " key=" << key);
+            exchange->bind(queue, key, &args);
+        }
+    } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange.
+}
+
+void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
+    // FIXME aconway 2011-11-28: also need to remove these when queue is destroyed.
+    if (replicateLevel(queue->getSettings()) == RL_ALL) {
+        boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+        broker.getExchanges().registerExchange(qr);
+    }
+}
+
+bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
+
+string WiringReplicator::getType() const { return QPID_WIRING_REPLICATOR; }
+
+}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h
new file mode 100644
index 0000000..32109d8
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h
@@ -0,0 +1,81 @@
+#ifndef QPID_HA_REPLICATOR_H
+#define QPID_HA_REPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/types/Variant.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+class Link;
+class Bridge;
+class SessionHandler;
+}
+
+namespace ha {
+
+/**
+ * Replicate wiring on a backup broker.
+ *
+ * Implemented as an exchange that subscribes to receive QMF
+ * configuration events from the primary. It configures local queues
+ * exchanges and bindings to replicate the primary.
+ * It also creates QueueReplicators for newly replicated queues.
+ *
+ * THREAD SAFE: Has no mutable state.
+ *
+ */
+class WiringReplicator : public broker::Exchange
+{
+  public:
+    WiringReplicator(const boost::shared_ptr<broker::Link>&);
+    ~WiringReplicator();
+    std::string getType() const;
+
+    // Exchange methods
+    bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+    bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+    void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+    bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+  private:
+    void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+    void doEventQueueDeclare(types::Variant::Map& values);
+    void doEventQueueDelete(types::Variant::Map& values);
+    void doEventExchangeDeclare(types::Variant::Map& values);
+    void doEventExchangeDelete(types::Variant::Map& values);
+    void doEventBind(types::Variant::Map&);
+    void doResponseQueue(types::Variant::Map& values);
+    void doResponseExchange(types::Variant::Map& values);
+    void doResponseBind(types::Variant::Map& values);
+    void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+
+    broker::Broker& broker;
+    boost::shared_ptr<broker::Link> link;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_HA_REPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
new file mode 100644
index 0000000..bb06e77
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -0,0 +1,34 @@
+<schema package="org.apache.qpid.ha">
+
+  <!--
+      Licensed to the Apache Software Foundation (ASF) under one
+      or more contributor license agreements.  See the NOTICE file
+      distributed with this work for additional information
+      regarding copyright ownership.  The ASF licenses this file
+      to you under the Apache License, Version 2.0 (the
+      "License"); you may not use this file except in compliance
+      with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+      Unless required by applicable law or agreed to in writing,
+      software distributed under the License is distributed on an
+      "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+      KIND, either express or implied.  See the License for the
+      specific language governing permissions and limitations
+      under the License.
+  -->
+
+  <!-- Monitor and control HA status of a broker. -->
+  <class name="HaBroker">
+    <property name="status" type="sstr" desc="HA status: PRIMARY, BACKUP, SOLO"/>
+
+    <method name="setStatus" desc="Set HA status: PRIMARY, BACKUP, SOLO">
+      <arg name="status" type="sstr" dir="I"/>
+    </method>
+
+    <property name="clientUrl" type="sstr" desc="URL used by clients to connect to the cluster."/>
+    <property name="brokerUrl" type="sstr" desc="URL used by brokers to connect to other brokers in the cluster."/>
+  </class>
+
+</schema>
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 16d7fb0..91d5f6a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -440,6 +440,7 @@
     # Environment settings.
     qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
     cluster_lib = os.getenv("CLUSTER_LIB")
+    ha_lib = os.getenv("HA_LIB")
     xml_lib = os.getenv("XML_LIB")
     qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
     qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
@@ -514,6 +515,12 @@
         actual_contents = self.browse(session, queue, timeout)
         self.assertEqual(expect_contents, actual_contents)
 
+    def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01):
+        """Wait up to timeout for contents of queue to match expect_contents"""
+        def test(): return self.browse(session, queue, 0) == expect_contents
+        retry(test, timeout, delay)
+        self.assertEqual(expect_contents, self.browse(session, queue, 0))
+
 def join(thread, timeout=10):
     thread.join(timeout)
     if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 199d1e7..424d416 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -61,15 +61,25 @@
 # You should do "newgrp ais" before running the tests to run these.
 # 
 
+# FIXME aconway 2011-11-14: Disable cluster tests on qpid-3603 branch
+# Some cluster tests are known to fail on this branch.
+# Immediate priority is to develop then new HA solution,
+# Cluster will brought up to date when thats done.
+#
+# gsim: its due to the keeping of deleted messages on the deque until they can be popped off either end
+# gsim: that is state that isn't available to new nodes of course
+# gsim: i.e. if you dequeue a message from the middle of the deque
+# gsim: it will not be on updatee but will be hidden on original node(s)
+# gsim: and is needed for the direct indexing
 
-# ais_check checks pre-requisites for cluster tests and runs them if ok.
-TESTS +=					\
-	run_cluster_test			\
-	cluster_read_credit			\
-	test_watchdog				\
-	run_cluster_tests			\
-	federated_cluster_test			\
-	clustered_replication_test
+
+# TESTS +=					\
+# 	run_cluster_test			\
+# 	cluster_read_credit			\
+# 	test_watchdog				\
+# 	run_cluster_tests			\
+# 	federated_cluster_test			\
+# 	clustered_replication_test
 
 # Clean up after cluster_test and start_cluster
 CLEANFILES += cluster_test.acl cluster.ports
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
new file mode 100755
index 0000000..9b52c2f
--- /dev/null
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
+from qpid.messaging import Message, NotFound
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger
+
+
+log = getLogger("qpid.ha-tests")
+
+class ShortTests(BrokerTest):
+    """Short HA functionality tests."""
+
+    def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs):
+        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+        return Broker(self, args=["--load-module", BrokerTest.ha_lib,
+                                  "--ha-enable=yes",
+                                  "--ha-client-url", client_url,
+                                  "--ha-broker-url", broker_url,
+                                  ] + args,
+                      **kwargs)
+
+    # FIXME aconway 2011-11-15: work around async replication.
+    def wait(self, session, address):
+        def check():
+            try:
+                session.sender(address)
+                return True
+            except NotFound: return False
+        assert retry(check), "Timed out waiting for %s"%(address)
+
+    def assert_missing(self,session, address):
+        try:
+            session.receiver(address)
+            self.fail("Should not have been replicated: %s"%(address))
+        except NotFound: pass
+
+    def test_replication(self):
+        def queue(name, replicate):
+            return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+
+        def exchange(name, replicate, bindq):
+            return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+        def setup(p, prefix):
+            """Create config, send messages on the primary p"""
+            p.sender(queue(prefix+"q1", "all")).send(Message("1"))
+            p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
+            p.sender(queue(prefix+"q3", "none")).send(Message("3"))
+            p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
+            p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+            # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
+            p.sender(queue(prefix+"x", "wiring"))
+
+        def verify(b, prefix):
+            """Verify setup was replicated to backup b"""
+            # FIXME aconway 2011-11-21: wait for wiring to replicate.
+            self.wait(b, prefix+"x");
+            # Verify backup
+            # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
+            self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
+            self.assert_browse_retry(b, prefix+"q2", []) # wiring only
+            self.assert_missing(b, prefix+"q3")
+            b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
+            self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
+            b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring
+            self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
+
+        # Create config, send messages before starting the backup, to test catch-up replication.
+        primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+        p = primary.connect().session()
+        setup(p, "1")
+        # Start the backup
+        backup  = self.ha_broker(name="backup", broker_url=primary.host_port())
+        b = backup.connect().session()
+        verify(b, "1")
+
+        # Create config, send messages after starting the backup, to test steady-state replication.
+        setup(p, "2")
+        verify(b, "2")
+
+if __name__ == "__main__":
+    shutil.rmtree("brokertest.tmp", True)
+    os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 26be15b..100612f 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -63,6 +63,7 @@
 exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
 exportmodule ACL_LIB acl.so
 exportmodule CLUSTER_LIB cluster.so
+exportmodule HA_LIB ha.so
 exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
 exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
 exportmodule SSLCONNECTOR_LIB sslconnector.so
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test
index 1a0f711..13f31fe 100755
--- a/qpid/python/qpid-python-test
+++ b/qpid/python/qpid-python-test
@@ -570,6 +570,8 @@
 if opts.xml and not list_only:
    xmlr = JunitXmlStyleReporter(opts.xml);
    xmlr.begin();
+else:
+   xmlr = None
 
 passed = 0
 failed = 0
diff --git a/qpid/tools/setup.py b/qpid/tools/setup.py
index feae4bb..48e29ad 100755
--- a/qpid/tools/setup.py
+++ b/qpid/tools/setup.py
@@ -31,7 +31,8 @@
                "src/py/qpid-route",
                "src/py/qpid-stat",
                "src/py/qpid-tool",
-               "src/py/qmf-tool"],
+               "src/py/qmf-tool",
+               "src/py/qpid-ha-status"],
       url="http://qpid.apache.org/",
       license="Apache Software License",
       description="Diagnostic and management tools for Apache Qpid brokers.")
diff --git a/qpid/tools/src/py/qpid-ha-status b/qpid/tools/src/py/qpid-ha-status
new file mode 100755
index 0000000..c70e4c9
--- /dev/null
+++ b/qpid/tools/src/py/qpid-ha-status
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import qmf.console, optparse, sys
+from qpid.management import managementChannel, managementClient
+
+usage="""
+Usage: qpid-ha-status [broker-address] [status]
+If status is specified, sets the HA status of the broker. Otherwise prints the current HA status. Status must be one of: primary, backup, solo.
+"""
+
+STATUS_VALUES=["primary", "backup", "solo"]
+
+def is_valid_status(value): return value in STATUS_VALUES
+
+def validate_status(value):
+        if not is_valid_status(value):
+            raise Exception("Invalid HA status value: %s"%(value))
+
+class HaBroker:
+    def __init__(self, broker, session):
+        self.session = session
+        try:
+            self.qmf_broker = self.session.addBroker(broker)
+        except Exception, e:
+            raise Exception("Can't connect to %s: %s"%(broker,e))
+        ha_brokers=self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha")
+        if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
+        self.ha_broker = ha_brokers[0];
+
+    def get_status(self):
+        return self.ha_broker.status
+
+    def set_status(self, value):
+        validate_status(value)
+        self.ha_broker.setStatus(value)
+
+def parse_args(args):
+    broker, status = "localhost:5672", None
+    if args and is_valid_status(args[-1]):
+        status = args[-1]
+        args.pop()
+    if args: broker = args[0]
+    return broker, status
+
+def main():
+    try:
+        session = qmf.console.Session()
+        try:
+            broker, status = parse_args(sys.argv[1:])
+            hb = HaBroker(broker, session)
+            if status: hb.set_status(status)
+            else: print hb.get_status()
+        finally:
+            session.close()
+        return 0
+    except Exception, e:
+        print e
+        return -1
+
+if __name__ == "__main__":
+    sys.exit(main())