QPID-3603: resync -kgiusti tracking branch with gsim's work.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-kgiusti@1208483 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/design_docs/hot-standby-design.txt b/qpid/cpp/design_docs/hot-standby-design.txt
deleted file mode 100644
index 99a5dc0..0000000
--- a/qpid/cpp/design_docs/hot-standby-design.txt
+++ /dev/null
@@ -1,239 +0,0 @@
--*-org-*-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-* Another new design for Qpid clustering.
-
-For background see [[./new-cluster-design.txt]] which describes the issues
-with the old design and a new active-active design that could replace it.
-
-This document describes an alternative hot-standby approach.
-
-** Delivery guarantee
-
-We guarantee N-way redundant, at least once delivey. Once a message
-from a client has been acknowledged by the broker, it will be
-delivered even if N-1 brokers subsequently fail. There may be
-duplicates in the event of a failure. We don't make duplicates
-during normal operation (i.e when no brokers have failed)
-
-This is the same guarantee as the old cluster and the alternative
-active-active design.
-
-** Active-active vs. hot standby (aka primary-backup)
-
-An active-active cluster allows clients to connect to any broker in
-the cluster. If a broker fails, clients can fail-over to any other
-live broker.
-
-A hot-standby cluster has only one active broker at a time (the
-"primary") and one or more brokers on standby (the "backups"). Clients
-are only served by the leader, clients that connect to a backup are
-redirected to the leader. The backpus are kept up-to-date in real time
-by the primary, if the primary fails a backup is elected to be the new
-primary.
-
-Aside: A cold-standby cluster is possible using a standalone broker,
-CMAN and shared storage. In this scenario only one broker runs at a
-time writing to a shared store. If it fails, another broker is started
-(by CMAN) and recovers from the store. This bears investigation but
-the store recovery time is probably too long for failover.
-
-** Why hot standby?
-
-Active-active has some advantages:
-- Finding a broker on startup or failover is simple, just pick any live broker.
-- All brokers are always running in active mode, there's no
-- Distributing clients across brokers gives better performance, but see [1].
-- A broker failure affects only clients connected to that broker.
-
-The main problem with active-active is co-ordinating consumers of the
-same queue on multiple brokers such that there are no duplicates in
-normal operation. There are 2 approaches:
-
-Predictive: each broker predicts which messages others will take. This
-the main weakness of the old design so not appealing.
-
-Locking: brokers "lock" a queue in order to take messages. This is
-complex to implement, its not straighforward to determine the most
-performant strategie for passing the lock.
-
-Hot-standby removes this problem. Only the primary can modify queues
-so it just has to tell the backups what it is doing, there's no
-locking.
-
-The primary can enqueue messages and replicate asynchronously -
-exactly like the store does, but it "writes" to the replicas over the
-network rather than writing to disk.
-
-** Failover in a hot-standby cluster.
-
-Hot-standby has some potential performance issues around failover:
-
-- Failover "spike": when the primary fails every client will fail over
- at the same time, putting strain on the system.
-
-- Until a new primary is elected, cluster cannot serve any clients or
- redirect clients to the primary.
-
-We want to minimize the number of re-connect attempts that clients
-have to make. The cluster can use a well-known algorithm to choose the
-new primary (e.g. round robin on a known sequence of brokers) so that
-clients can guess the new primary correctly in most cases.
-
-Even if clients do guess correctly it may be that the new primary is
-not yet aware of the death of the old primary, which is may to cause
-multiple failed connect attempts before clients eventually get
-connected. We will need to prototype to see how much this happens in
-reality and how we can best get clients redirected.
-
-** Threading and performance.
-
-The primary-backup cluster operates analogously to the way the disk store does now:
-- use the same MessageStore interface as the store to interact with the broker
-- use the same asynchronous-completion model for replicating messages.
-- use the same recovery interfaces (?) for new backups joining.
-
-Re-using the well-established store design gives credibility to the new cluster design.
-
-The single CPG dispatch thread was a severe performance bottleneck for the old cluster.
-
-The primary has the same threading model as a a standalone broker with
-a store, which we know that this performs well.
-
-If we use CPG for replication of messages, the backups will receive
-messages in the CPG dispatch thread. To get more concurency, the CPG
-thread can dump work onto internal PollableQueues to be processed in
-parallel.
-
-Messages from the same broker queue need to go onto the same
-PollableQueue. There could be a separate PollableQueue for each broker
-queue. If that's too resource intensive we can use a fixed set of
-PollableQueues and assign broker queues to PollableQueues via hashing
-or round robin.
-
-Another possible optimization is to use multiple CPG queues: one per
-queue or a hashed set, to get more concurrency in the CPG layer. The
-old cluster is not able to keep CPG busy.
-
-TODO: Transactions pose a challenge with these concurrent models: how
-to co-ordinate multiple messages being added (commit a publish or roll
-back an accept) to multiple queues so that all replicas end up with
-the same message sequence while respecting atomicity.
-
-** Use of CPG
-
-CPG provides several benefits in the old cluster:
-- tracking membership (essential for determining the primary)
-- handling "spit brain" (integrates with partition support from CMAN)
-- reliable multicast protocol to distribute messages.
-
-I believe we still need CPG for membership and split brain. We could
-experiment with sending the bulk traffic over AMQP conections.
-
-** Flow control
-
-Need to ensure that
-1) In-memory internal queues used by the cluster don't overflow.
-2) The backups don't fall too far behind on processing CPG messages
-
-** Recovery
-When a new backup joins an active cluster it must get a snapshot
-from one of the other backups, or the primary if there are none. In
-store terms this is "recovery" (old cluster called it an "update)
-
-Compared to old cluster we only replidate well defined data set of the store.
-This is the crucial sore spot of old cluster.
-
-We can also replicated it more efficiently by recovering queues in
-reverse (LIFO) order. That means as clients actively consume messages
-from the front of the queue, they are redeucing the work we have to do
-in recovering from the back. (NOTE: this may not be compatible with
-using the same recovery interfaces as the store.)
-
-** Selective replication
-In this model it's easy to support selective replication of individual queues via
-configuration.
-- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate.
- Treated analogously to persistent/durable properties for the store.
-- if not explicitly marked, provide a choice of default
- - default is replicate (replicated message on replicated queue)
- - default is don't replicate
- - default is replicate persistent/durable messages.
-
-** Inconsistent errors
-
-The new design eliminates most sources of inconsistent errors in the
-old design (connections, sessions, security, management etc.) and
-eliminates the need to stall the whole cluster till an error is
-resolved. We still have to handle inconsistent store errors when store
-and cluster are used together.
-
-We also have to include error handling in the async completion loop to
-guarantee N-way at least once: we should only report success to the
-client when we know the message was replicated and stored on all N-1
-backups.
-
-TODO: We have a lot more options than the old cluster, need to figure
-out the best approach, or possibly allow mutliple approaches. Need to
-go thru the various failure cases. We may be able to do recovery on a
-per-queue basis rather than restarting an entire node.
-
-** New members joining
-
-We should be able to catch up much faster than the the old design. A
-new backup can catch up ("recover") the current cluster state on a
-per-queue basis.
-- queues can be updated in parallel
-- "live" updates avoid the the "endless chase"
-
-During a "live" update several things are happening on a queue:
-- clients are publishing messages to the back of the queue, replicated to the backup
-- clients are consuming messages from the front of the queue, replicated to the backup.
-- the primary is sending pre-existing messages to the new backup.
-
-The primary sends pre-existing messages in LIFO order - starting from
-the back of the queue, at the same time clients are consuming from the front.
-The active consumers actually reduce the amount of work to be done, as there's
-no need to replicate messages that are no longer on the queue.
-
-* Steps to get there
-
-** Baseline replication
-Validate the overall design get initial notion of performance. Just
-message+wiring replication, no update/recovery for new members joining,
-single CPG dispatch thread on backups, no failover, no transactions.
-
-** Failover
-Electing primary, backups redirect to primary. Measure failover time
-for large # clients. Strategies to minimise number of retries after a
-failure.
-
-** Flow Control
-Keep internal queues from over-flowing. Similar to internal flow control in old cluster.
-Needed for realistic performance/stress tests
-
-** Concurrency
-Experiment with multiple threads on backups, multiple CPG groups.
-
-** Recovery/new member joining
-Initial status handshake for new member. Recovering queues from the back.
-
-** Transactions
-TODO: How to implement transactions with concurrency. Worst solution:
-a global --cluster-use-transactions flag that forces single thread
-mode. Need to find a better solution.
diff --git a/qpid/cpp/design_docs/replicating-browser-design.txt b/qpid/cpp/design_docs/new-ha-design.txt
similarity index 63%
rename from qpid/cpp/design_docs/replicating-browser-design.txt
rename to qpid/cpp/design_docs/new-ha-design.txt
index e304258..9b6d7d6 100644
--- a/qpid/cpp/design_docs/replicating-browser-design.txt
+++ b/qpid/cpp/design_docs/new-ha-design.txt
@@ -16,15 +16,14 @@
# specific language governing permissions and limitations
# under the License.
-* FIXME - rewrite all old stuff from hot-standby.txt.
-
-* Another new design for Qpid clustering.
+* An active-passive, hot-standby design for Qpid clustering.
For some background see [[./new-cluster-design.txt]] which describes the
issues with the old design and a new active-active design that could
replace it.
-This document describes an alternative active-passive approach.
+This document describes an alternative active-passive approach based on
+queue browsing to replicate message data.
** Active-active vs. active-passive (hot-standby)
@@ -59,28 +58,6 @@
exactly like the store does, but it "writes" to the replicas over the
network rather than writing to disk.
-** Failover in a hot-standby cluster.
-
-We want to delegate the failover management to an existing cluster
-resource manager. Initially this is rgmanager from Cluster Suite, but
-other managers (e.g. PaceMaker) could be supported in future.
-
-Rgmanager takes care of starting and stopping brokers and informing
-brokers of their roles as primary or backup. It ensures there's
-exactly one broker running at any time. It also tracks quorum and
-protects against split-brain.
-
-Rgmanger can also manage a virtual IP address so clients can just
-retry on a single address to fail over. Alternatively we will support
-configuring a fixed list of broker addresses when qpid is run outside
-of a resource manager.
-
-Aside: Cold-standby is also possible using rgmanager with shared
-storage for the message store (e.g. GFS). If the broker fails, another
-broker is started on a different node and and recovers from the
-store. This bears investigation but the store recovery times are
-likely too long for failover.
-
** Replicating browsers
The unit of replication is a replicating browser. This is an AMQP
@@ -93,6 +70,28 @@
all of the replicating browsers have signaled completion. Thus a completed
message is guaranteed to be on the backups.
+** Failover and Cluster Resource Managers
+
+We want to delegate the failover management to an existing cluster
+resource manager. Initially this is rgmanager from Cluster Suite, but
+other managers (e.g. PaceMaker) could be supported in future.
+
+Rgmanager takes care of starting and stopping brokers and informing
+brokers of their roles as primary or backup. It ensures there's
+exactly one primary broker running at any time. It also tracks quorum
+and protects against split-brain.
+
+Rgmanger can also manage a virtual IP address so clients can just
+retry on a single address to fail over. Alternatively we will also
+support configuring a fixed list of broker addresses when qpid is run
+outside of a resource manager.
+
+Aside: Cold-standby is also possible using rgmanager with shared
+storage for the message store (e.g. GFS). If the broker fails, another
+broker is started on a different node and and recovers from the
+store. This bears investigation but the store recovery times are
+likely too long for failover.
+
** Replicating wiring
New queues and exchanges and their bindings also need to be replicated.
@@ -105,9 +104,10 @@
manager takes care of membership and quorum.
** Selective replication
+
In this model it's easy to support selective replication of individual queues via
-configuration.
-- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate.
+configuration.
+- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate.
Treated analogously to persistent/durable properties for the store.
- if not explicitly marked, provide a choice of default
- default is replicate (replicated message on replicated queue)
@@ -136,7 +136,15 @@
go thru the various failure cases. We may be able to do recovery on a
per-queue basis rather than restarting an entire node.
-** New members joining
+
+** New backups joining
+
+New brokers can join the cluster as backups. Note - if the new broker
+has a new IP address, then the existing cluster members must be
+updated with a new client and broker URLs by a sysadmin.
+
+
+They discover
We should be able to catch up much faster than the the old design. A
new backup can catch up ("recover") the current cluster state on a
@@ -154,6 +162,85 @@
The active consumers actually reduce the amount of work to be done, as there's
no need to replicate messages that are no longer on the queue.
+** Broker discovery and lifecycle.
+
+The cluster has a client URL that can contain a single virtual IP
+address or a list of real IP addresses for the cluster.
+
+In backup mode, brokers reject connections except from a special
+cluster-admin user.
+
+Clients discover the primary by re-trying connection to the client URL
+until the successfully connect to the primary. In the case of a
+virtual IP they re-try the same address until it is relocated to the
+primary. In the case of a list of IPs the client tries each in
+turn. Clients do multiple retries over a configured period of time
+before giving up.
+
+Backup brokers discover the primary in the same way as clients. There
+is a separate broker URL for brokers since they often will connect
+over a different network to the clients. The broker URL has to be a
+list of IPs rather than one virtual IP so broker members can connect
+to each other.
+
+Brokers have the following states:
+- connecting: backup broker trying to connect to primary - loops retrying broker URL.
+- catchup: connected to primary, catching up on pre-existing wiring & messages.
+- backup: fully functional backup.
+- primary: Acting as primary, serving clients.
+
+** Interaction with rgmanager
+
+rgmanager interacts with qpid via 2 service scripts: backup & primary. These
+scripts interact with the underlying qpidd service.
+
+*** Initial cluster start
+
+rgmanager starts the backup service on all nodes and the primary service on one node.
+
+On the backup nodes qpidd is in the connecting state. The primary node goes into
+the primary state. Backups discover the primary, connect and catch up.
+
+*** Failover
+
+primary broker or node fails. Backup brokers see disconnect and go
+back to connecting mode.
+
+rgmanager notices the failure and starts the primary service on a new node.
+This tells qpidd to go to primary mode. Backups re-connect and catch up.
+
+*** Failback
+
+Cluster of N brokers has suffered a failure, only N-1 brokers
+remain. We want to start a new broker (possibly on a new node) to
+restore redundancy.
+
+If the new broker has a new IP address, the sysadmin pushes a new URL
+to all the existing brokers.
+
+The new broker starts in connecting mode. It discovers the primary,
+connects and catches up.
+
+*** Failure during failback
+
+A second failure occurs before the new backup B can complete its catch
+up. The backup B refuses to become primary by failing the primary
+start script if rgmanager chooses it, so rgmanager will try another
+(hopefully caught-up) broker to be primary.
+
+** Interaction with the store.
+
+# FIXME aconway 2011-11-16: TBD
+- how to identify the "best" store after a total cluster crash.
+- best = last to be primary.
+- primary "generation" - counter passed to backups and incremented by new primary.
+
+restart after crash:
+- clean/dirty flag on disk for admin shutdown vs. crash
+- dirty brokers refuse to be primary
+- sysamdin tells best broker to be primary
+- erase stores? delay loading?
+
** Current Limitations
(In no particular order at present)
@@ -219,6 +306,48 @@
LC5 - Need richer control over which queues/exchanges are propagated, and
which are not.
-Question: is it possible to miss an event on subscribing for
-configuration propagation? are the initial snapshot and subsequent
-events correctly synchronised?
+LC6 - The events and query responses are not fully synchronized.
+
+ In particular it *is* possible to not receive a delete event but
+ for the deleted object to still show up in the query response
+ (meaning the deletion is 'lost' to the update).
+
+ It is also possible for an create event to be received as well
+ as the created object being in the query response. Likewise it
+ is possible to receive a delete event and a query response in
+ which the object no longer appears. In these cases the event is
+ essentially redundant.
+
+ It is not possible to miss a create event and yet not to have
+ the object in question in the query response however.
+
+* User Documentation Notes
+
+Notes to seed initial user documentation. Loosely tracking the implementation,
+some points mentioned in the doc may not be implemented yet.
+
+** High Availability Overview
+Explain basic concepts: hot standby, primary/backup, replicated queue/exchange.
+Network topology: backup links, corosync, separate client/cluster networks.
+Describe failover mechanisms.
+- Client view: URLs, failover, exclusion & discovery.
+- Broker view: similar.
+Role of rmganager & corosync.
+
+** Client view.
+Clients use multi-address URL in base case.
+Clients can't connect to backups, retry till they find primary.
+Only qpid.cluster-admin can connect to backup, must not mess with replicated queues.
+Note connection known-hosts returns client URL, as does amq.failover exchange.
+
+Creating replicated queues & exchanges:
+- qpid.replicate argument,
+- examples using addressing and qpid-config)
+
+** Configuring corosync
+Must be on same network as backup links.
+
+** Configuring rgmanager
+
+** Configuring qpidd
+HA related options.
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index 59e951f..b8a1d26 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -1498,6 +1498,10 @@
def genNamePackageLower (self, stream, variables):
stream.write (self.packageName.lower ())
+ def genPackageNameUpper (self, stream, variables):
+ up = "_".join(self.packageName.split("."))
+ stream.write (up.upper())
+
def genNameUpper (self, stream, variables):
stream.write (self.name.upper ())
@@ -1642,6 +1646,7 @@
def genNamespace (self, stream, variables):
stream.write("::".join(self.packageName.split(".")))
+
def genOpenNamespaces (self, stream, variables):
for item in self.packageName.split("."):
stream.write ("namespace %s {\n" % item)
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h
index 4bcd423..90f1b4d 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.h
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h
@@ -1,6 +1,6 @@
/*MGEN:commentPrefix=//*/
-#ifndef _MANAGEMENT_/*MGEN:Class.NameUpper*/_
-#define _MANAGEMENT_/*MGEN:Class.NameUpper*/_
+#ifndef _MANAGEMENT_/*MGEN:Class.PackageNameUpper*/_/*MGEN:Class.NameUpper*/_
+#define _MANAGEMENT_/*MGEN:Class.PackageNameUpper*/_/*MGEN:Class.NameUpper*/_
//
// Licensed to the Apache Software Foundation (ASF) under one
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 8fde735..c525f8b 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -104,7 +104,8 @@
-c $(srcdir)/managementgen.cmake -q -b -o qmf \
$(top_srcdir)/../specs/management-schema.xml \
$(srcdir)/qpid/acl/management-schema.xml \
- $(srcdir)/qpid/cluster/management-schema.xml
+ $(srcdir)/qpid/cluster/management-schema.xml \
+ $(srcdir)/qpid/ha/management-schema.xml
$(srcdir)/managementgen.mk $(mgen_broker_cpp) $(dist_qpid_management_HEADERS): mgen.timestamp
mgen.timestamp: $(mgen_generator)
@@ -209,6 +210,7 @@
cmoduleexec_LTLIBRARIES =
include cluster.mk
+include ha.mk
include acl.mk
include qmf.mk
include qmfc.mk
@@ -533,6 +535,7 @@
qpid/broker/ConnectionState.h \
qpid/broker/ConnectionToken.h \
qpid/broker/Consumer.h \
+ qpid/broker/ConsumerFactory.h \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
qpid/broker/Deliverable.h \
@@ -593,8 +596,6 @@
qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
- qpid/broker/NodeClone.h \
- qpid/broker/NodeClone.cpp \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
@@ -622,8 +623,6 @@
qpid/broker/QueuedMessage.h \
qpid/broker/QueueFlowLimit.h \
qpid/broker/QueueFlowLimit.cpp \
- qpid/broker/QueueReplicator.h \
- qpid/broker/QueueReplicator.cpp \
qpid/broker/RateFlowcontrol.h \
qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
new file mode 100644
index 0000000..dc4e7c8
--- /dev/null
+++ b/qpid/cpp/src/ha.mk
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# HA plugin makefile fragment, to be included in Makefile.am
+#
+
+dmoduleexec_LTLIBRARIES += ha.la
+
+ha_la_SOURCES = \
+ qpid/ha/Backup.cpp \
+ qpid/ha/Backup.h \
+ qpid/ha/HaBroker.cpp \
+ qpid/ha/HaBroker.h \
+ qpid/ha/HaPlugin.cpp \
+ qpid/ha/Logging.h \
+ qpid/ha/Logging.cpp \
+ qpid/ha/Settings.h \
+ qpid/ha/QueueReplicator.h \
+ qpid/ha/QueueReplicator.cpp \
+ qpid/ha/ReplicatingSubscription.h \
+ qpid/ha/ReplicatingSubscription.cpp \
+ qpid/ha/WiringReplicator.cpp \
+ qpid/ha/WiringReplicator.h
+
+ha_la_LIBADD = libqpidbroker.la
+ha_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index b00cdd0..405482d 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,8 +24,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
-#include "qpid/broker/NodeClone.h"
-#include "qpid/broker/QueueReplicator.h"
+#include "qpid/ha/WiringReplicator.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
@@ -59,9 +58,11 @@
}
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
- const _qmf::ArgsLinkBridge& _args) :
+ const _qmf::ArgsLinkBridge& _args,
+ InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
+ listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0),
+ initialize(init)
{
std::stringstream title;
title << id << "_" << link->getBroker()->getFederationTag();
@@ -77,9 +78,9 @@
QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
}
-Bridge::~Bridge()
+Bridge::~Bridge()
{
- mgmtObject->resourceDestroy();
+ mgmtObject->resourceDestroy();
}
void Bridge::create(Connection& c)
@@ -98,7 +99,7 @@
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-
+
session->attach(name, false);
session->commandPoint(0,0);
} else {
@@ -108,60 +109,12 @@
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
- if (args.i_srcIsQueue) {
- //TODO: something other than this which is nasty...
- bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options);
-
- peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink ? 1 : 0, false, "", 0, options);
+ if (initialize) initialize(*this, sessionHandler);
+ else if (args.i_srcIsQueue) {
+ peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
- } else if (NodeClone::isNodeCloneDestination(args.i_dest)) {
- //declare and bind an event queue
- peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable());
- peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable());
- //subscribe to the queue
- peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
- //issue a query request for queues and another for exchanges using event queue as the reply-to address
- for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions
- Variant::Map request;
- request["_what"] = "OBJECT";
- Variant::Map schema;
- schema["_class_name"] = (i == 0 ? "queue" : "exchange");
- schema["_package_name"] = "org.apache.qpid.broker";
- request["_schema_id"] = schema;
-
- AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0)));
- method.setBof(true);
- method.setEof(false);
- method.setBos(true);
- method.setEos(true);
- AMQHeaderBody headerBody;
- MessageProperties* props = headerBody.get<MessageProperties>(true);
- props->setReplyTo(qpid::framing::ReplyTo("", queueName));
- props->setAppId("qmf2");
- props->getApplicationHeaders().setString("qmf.opcode", "_query_request");
- headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
- AMQFrame header(headerBody);
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- AMQContentBody data;
- qpid::amqp_0_10::MapCodec::encode(request, data.getData());
- AMQFrame content(data);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- sessionHandler.out->handle(method);
- sessionHandler.out->handle(header);
- sessionHandler.out->handle(content);
- }
-
} else {
FieldTable queueSettings;
@@ -236,11 +189,6 @@
persistenceId = pId;
}
-const string& Bridge::getName() const
-{
- return name;
-}
-
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
@@ -268,7 +216,7 @@
is_queue, is_local, id, excludes, dynamic, sync).first;
}
-void Bridge::encode(Buffer& buffer) const
+void Bridge::encode(Buffer& buffer) const
{
buffer.putShortString(string("bridge"));
buffer.putShortString(link->getHost());
@@ -285,8 +233,8 @@
buffer.putShort(args.i_sync);
}
-uint32_t Bridge::encodedSize() const
-{
+uint32_t Bridge::encodedSize() const
+{
return link->getHost().size() + 1 // short-string (host)
+ 7 // short-string ("bridge")
+ 2 // port
@@ -311,7 +259,7 @@
management::Args& /*args*/,
string&)
{
- if (methodId == _qmf::Bridge::METHOD_CLOSE) {
+ if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
destroy();
return management::Manageable::STATUS_OK;
@@ -358,7 +306,7 @@
conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
queueName, args.i_src, args.i_key, bindArgs));
}
-bool Bridge::resetProxy()
+bool Bridge::resetProxy()
{
SessionHandler& sessionHandler = conn->getChannel(id);
if (!sessionHandler.getSession()) peer.reset();
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 8b4559a..b849b11 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,15 +42,19 @@
class ConnectionState;
class Link;
class LinkRegistry;
+class SessionHandler;
class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
typedef boost::function<void(Bridge*)> CancellationListener;
+ typedef boost::function<void(Bridge&, SessionHandler&)> InitializeCallback;
Bridge(Link* link, framing::ChannelId id, CancellationListener l,
- const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
+ InitializeCallback init
+ );
~Bridge();
void create(Connection& c);
@@ -70,8 +74,8 @@
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
- void encode(framing::Buffer& buffer) const;
- const std::string& getName() const;
+ void encode(framing::Buffer& buffer) const;
+ const std::string& getName() const { return name; }
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
// Exchange::DynamicBridge methods
@@ -81,6 +85,10 @@
bool containsLocalTag(const std::string& tagList) const;
const std::string& getLocalTag() const;
+ // Methods needed by initialization functions
+ std::string getQueueName() const { return queueName; }
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
+
private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
@@ -103,6 +111,7 @@
mutable uint64_t persistenceId;
ConnectionState* connState;
Connection* conn;
+ InitializeCallback initialize;
bool resetProxy();
};
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index b3b751b..e8ada49 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -37,6 +37,7 @@
#include "qpid/broker/Vhost.h"
#include "qpid/broker/System.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/ConsumerFactory.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -199,6 +200,7 @@
bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
+ ConsumerFactories consumerFactories;
public:
virtual ~Broker();
@@ -357,6 +359,8 @@
const std::string& key,
const std::string& userId,
const std::string& connectionId);
+
+ ConsumerFactories& getConsumerFactories() { return consumerFactories; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 8451f35..0e20719 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -130,7 +130,7 @@
{
ScopedLock<Mutex> l(ioCallbackLock);
ioCallbacks.push(callback);
- out.activateOutput();
+ if (isOpen()) out.activateOutput();
}
Connection::~Connection()
@@ -156,11 +156,14 @@
void Connection::received(framing::AMQFrame& frame) {
// Received frame on connection so delay timeout
restartTimeout();
+ bool wasOpen = isOpen();
adapter.handle(frame);
if (isLink) //i.e. we are acting as the client to another broker
recordFromServer(frame);
else
recordFromClient(frame);
+ if (!wasOpen && isOpen())
+ doIoCallbacks(); // Do any callbacks registered before we opened.
}
void Connection::sent(const framing::AMQFrame& frame)
@@ -329,17 +332,16 @@
}
void Connection::doIoCallbacks() {
- {
- ScopedLock<Mutex> l(ioCallbackLock);
- // Although IO callbacks execute in the connection thread context, they are
- // not cluster safe because they are queued for execution in non-IO threads.
- ClusterUnsafeScope cus;
- while (!ioCallbacks.empty()) {
- boost::function0<void> cb = ioCallbacks.front();
- ioCallbacks.pop();
- ScopedUnlock<Mutex> ul(ioCallbackLock);
- cb(); // Lend the IO thread for management processing
- }
+ if (!isOpen()) return; // Don't process IO callbacks until we are open.
+ ScopedLock<Mutex> l(ioCallbackLock);
+ // Although IO callbacks execute in the connection thread context, they are
+ // not cluster safe because they are queued for execution in non-IO threads.
+ ClusterUnsafeScope cus;
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front();
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock);
+ cb(); // Lend the IO thread for management processing
}
}
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index a3838fb..7ac7dba 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -31,6 +31,9 @@
class Queue;
class QueueListeners;
+/**
+ * Base class for consumers which represent a subscription to a queue.
+ */
class Consumer {
const bool acquires;
const bool browseAcquired;
diff --git a/qpid/cpp/src/qpid/broker/ConsumerFactory.h b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
new file mode 100644
index 0000000..abd39fb
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONSUMERFACTORY_H
+#define QPID_BROKER_CONSUMERFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public.
+// Refactor to use a more abstract interface.
+
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for consumer factoires. Plugins can register a
+ * ConsumerFactory via Broker:: getConsumerFactories() Each time a
+ * conumer is created, each factory is tried in turn till one returns
+ * non-0.
+ */
+class ConsumerFactory
+{
+ public:
+ virtual ~ConsumerFactory() {}
+
+ virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+ SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments) = 0;
+};
+
+/** A set of factories held by the broker
+ * THREAD UNSAFE: see notes on member functions.
+ */
+class ConsumerFactories {
+ public:
+ typedef std::vector<boost::shared_ptr<ConsumerFactory> > Factories;
+
+ /** Thread safety: May only be called during plug-in initialization. */
+ void add(const boost::shared_ptr<ConsumerFactory>& cf) { factories.push_back(cf); }
+
+ /** Thread safety: May only be called after plug-in initialization. */
+ const Factories& get() const { return factories; }
+
+ private:
+ Factories factories;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONSUMERFACTORY_H*/
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 43ca1ae..4fe76da 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -37,7 +37,7 @@
bool _acquired,
bool accepted,
bool _windowing,
- uint32_t _credit, bool _delayedCompletion) : msg(_msg),
+ uint32_t _credit, bool _isDelayedCompletion) : msg(_msg),
queue(_queue),
tag(_tag),
acquired(_acquired),
@@ -47,7 +47,7 @@
ended(accepted && acquired),
windowing(_windowing),
credit(msg.payload ? msg.payload->getRequiredCredit() : _credit),
- delayedCompletion(_delayedCompletion)
+ isDelayedCompletion(_isDelayedCompletion)
{}
bool DeliveryRecord::setEnded()
@@ -115,7 +115,7 @@
if (!ended) {
if (acquired) {
queue->dequeue(ctxt, msg);
- } else if (delayedCompletion) {
+ } else if (isDelayedCompletion) {
//TODO: this is a nasty way to do this; change it
msg.payload->getIngressCompletion().finishCompleter();
QPID_LOG(debug, "Completed " << msg.payload.get());
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index 90e72aa..ea33ed5 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -63,7 +63,7 @@
* after that).
*/
uint32_t credit;
- bool delayedCompletion;
+ bool isDelayedCompletion;
public:
QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -73,7 +73,7 @@
bool accepted,
bool windowing,
uint32_t credit=0, // Only used if msg is empty.
- bool delayedCompletion=false
+ bool isDelayedCompletion=false
);
bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 8010bf4..f92d543 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -47,13 +47,13 @@
Link::Link(LinkRegistry* _links,
MessageStore* _store,
- string& _host,
+ const string& _host,
uint16_t _port,
- string& _transport,
+ const string& _transport,
bool _durable,
- string& _authMechanism,
- string& _username,
- string& _password,
+ const string& _authMechanism,
+ const string& _username,
+ const string& _password,
Broker* _broker,
Manageable* parent)
: links(_links), store(_store), host(_host), port(_port),
@@ -79,6 +79,7 @@
}
}
setStateLH(STATE_WAITING);
+ startConnectionLH();
}
Link::~Link ()
@@ -213,28 +214,30 @@
{
Mutex::ScopedLock mutex(lock);
created.push_back (bridge);
+ if (connection)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+
}
void Link::cancel(Bridge::shared_ptr bridge)
{
- {
- Mutex::ScopedLock mutex(lock);
+ Mutex::ScopedLock mutex(lock);
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
- }
- }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- cancellations.push_back(bridge);
- bridge->closed();
- active.erase(i);
- break;
- }
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
}
}
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
+ }
+ }
+
if (!cancellations.empty()) {
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -282,6 +285,8 @@
Mutex::ScopedLock mutex(lock);
connection = c;
updateUrls = true;
+ // Process any IO tasks bridges added before setConnection.
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::maintenanceVisit ()
@@ -311,7 +316,7 @@
}
else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
+ }
void Link::reconnect(const qpid::Address& a)
{
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 4badd8b..acaa9d1 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -92,19 +92,21 @@
Link(LinkRegistry* links,
MessageStore* store,
- std::string& host,
+ const std::string& host,
uint16_t port,
- std::string& transport,
+ const std::string& transport,
bool durable,
- std::string& authMechanism,
- std::string& username,
- std::string& password,
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password,
Broker* broker,
management::Manageable* parent = 0);
virtual ~Link();
std::string getHost() { return host; }
uint16_t getPort() { return port; }
+ std::string getTransport() { return transport; }
+
bool isDurable() { return durable; }
void maintenanceVisit ();
uint nextChannel();
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index e9885f5..31b4f1b 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -124,13 +124,13 @@
}
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
uint16_t port,
- string& transport,
+ const string& transport,
bool durable,
- string& authMechanism,
- string& username,
- string& password)
+ const string& authMechanism,
+ const string& username,
+ const string& password)
{
Mutex::ScopedLock locker(lock);
@@ -151,18 +151,20 @@
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
uint16_t port,
bool durable,
- std::string& src,
- std::string& dest,
- std::string& key,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
bool isQueue,
bool isLocal,
- std::string& tag,
- std::string& excludes,
+ const std::string& tag,
+ const std::string& excludes,
bool dynamic,
- uint16_t sync)
+ uint16_t sync,
+ Bridge::InitializeCallback init
+)
{
Mutex::ScopedLock locker(lock);
QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
@@ -196,7 +198,8 @@
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key), args));
+ host, port, src, dest, key),
+ args, init));
bridges[bridgeKey] = bridge;
l->second->add(bridge);
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 4c97e4f..7e5b39f 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -84,28 +84,32 @@
~LinkRegistry();
std::pair<boost::shared_ptr<Link>, bool>
- declare(std::string& host,
+ declare(const std::string& host,
uint16_t port,
- std::string& transport,
+ const std::string& transport,
bool durable,
- std::string& authMechanism,
- std::string& username,
- std::string& password);
+ const std::string& authMechanism,
+ const std::string& username,
+ const std::string& password);
+
std::pair<Bridge::shared_ptr, bool>
- declare(std::string& host,
+ declare(const std::string& host,
uint16_t port,
bool durable,
- std::string& src,
- std::string& dest,
- std::string& key,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key,
bool isQueue,
bool isLocal,
- std::string& id,
- std::string& excludes,
+ const std::string& id,
+ const std::string& excludes,
bool dynamic,
- uint16_t sync);
+ uint16_t sync,
+ Bridge::InitializeCallback=0
+ );
void destroy(const std::string& host, const uint16_t port);
+
void destroy(const std::string& host,
const uint16_t port,
const std::string& src,
diff --git a/qpid/cpp/src/qpid/broker/NodeClone.cpp b/qpid/cpp/src/qpid/broker/NodeClone.cpp
deleted file mode 100644
index e8fc227..0000000
--- a/qpid/cpp/src/qpid/broker/NodeClone.cpp
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "NodeClone.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/log/Statement.h"
-#include "qpid/amqp_0_10/Codecs.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-
-using qmf::org::apache::qpid::broker::EventQueueDeclare;
-using qmf::org::apache::qpid::broker::EventQueueDelete;
-using qmf::org::apache::qpid::broker::EventExchangeDeclare;
-using qmf::org::apache::qpid::broker::EventExchangeDelete;
-
-namespace qpid {
-namespace broker {
-
-namespace{
-bool isQMFv2(const Message& message)
-{
- const qpid::framing::MessageProperties* props = message.getProperties<qpid::framing::MessageProperties>();
- return props && props->getAppId() == "qmf2";
-}
-
-template <class T> bool match(qpid::types::Variant::Map& schema)
-{
- return T::match(schema["_class_name"], schema["_package_name"]);
-}
-
-}
-
-NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name), broker(b) {}
-
-NodeClone::~NodeClone() {}
-
-void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const qpid::framing::FieldTable* headers)
-{
- if (isQMFv2(msg.getMessage()) && headers) {
- if (headers->getAsString("qmf.content") == "_event") {
- //decode as list
- std::string content = msg.getMessage().getFrames().getContent();
- qpid::types::Variant::List list;
- qpid::amqp_0_10::ListCodec::decode(content, list);
- if (list.empty()) {
- QPID_LOG(error, "Error parsing QMF event, expected non-empty list");
- } else {
- try {
- qpid::types::Variant::Map& map = list.front().asMap();
- qpid::types::Variant::Map& schema = map["_schema_id"].asMap();
- qpid::types::Variant::Map& values = map["_values"].asMap();
- if (match<EventQueueDeclare>(schema)) {
- if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["args"].asMap(), args);
- if (!broker.createQueue(
- values["qName"].asString(),
- values["durable"].asBool(),
- values["autoDel"].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- values["altEx"].asString(),
- args,
- values["user"].asString(),
- values["rhost"].asString()).second) {
- QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
- }
- }
- } else if (match<EventQueueDelete>(schema)) {
- std::string name = values["qName"].asString();
- QPID_LOG(debug, "Notified of deletion of queue " << name);
- boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) {
- broker.deleteQueue(
- name,
- values["user"].asString(),
- values["rhost"].asString());
- } else {
- if (queue) {
- QPID_LOG(debug, "Ignoring deletion notification for non-propagated queue " << name);
- } else {
- QPID_LOG(debug, "No such queue " << name);
- }
- }
- } else if (match<EventExchangeDeclare>(schema)) {
- if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["args"].asMap(), args);
- if (!broker.createExchange(
- values["exName"].asString(),
- values["exType"].asString(),
- values["durable"].asBool(),
- values["altEx"].asString(),
- args,
- values["user"].asString(),
- values["rhost"].asString()).second) {
- QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
- }
- }
- } else if (match<EventExchangeDelete>(schema)) {
- std::string name = values["exName"].asString();
- QPID_LOG(debug, "Notified of deletion of exchange " << name);
- try {
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
- if (exchange && exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) {
- broker.deleteExchange(
- name,
- values["user"].asString(),
- values["rhost"].asString());
- } else {
- if (exchange) {
- QPID_LOG(debug, "Ignoring deletion notification for non-propagated exchange " << name);
- } else {
- QPID_LOG(debug, "No such exchange " << name);
- }
- }
- } catch (const qpid::framing::NotFoundException&) {}
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Error propagating configuration: " << e.what());
- }
- }
- } else if (headers->getAsString("qmf.opcode") == "_query_response") {
- //decode as list
- std::string content = msg.getMessage().getFrames().getContent();
- qpid::types::Variant::List list;
- qpid::amqp_0_10::ListCodec::decode(content, list);
- QPID_LOG(debug, "Got query response (" << list.size() << ")");
- for (qpid::types::Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
- std::string type = i->asMap()["_schema_id"].asMap()["_class_name"];
- qpid::types::Variant::Map& values = i->asMap()["_values"].asMap();
- QPID_LOG(debug, "class: " << type << ", values: " << values);
- if (values["arguments"].asMap()["qpid.propagate"]) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["arguments"].asMap(), args);
- if (type == "queue") {
- if (!broker.createQueue(
- values["name"].asString(),
- values["durable"].asBool(),
- values["autoDelete"].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- ""/*TODO: need to include alternate-exchange*/,
- args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second) {
- QPID_LOG(warning, "Propagatable queue " << values["name"] << " already exists");
- }
- } else if (type == "exchange") {
- if (!broker.createExchange(
- values["name"].asString(),
- values["type"].asString(),
- values["durable"].asBool(),
- ""/*TODO: need to include alternate-exchange*/,
- args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second) {
- QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
- }
- } else {
- QPID_LOG(warning, "Ignoring unknow object class: " << type);
- }
- }
- }
- } else {
- QPID_LOG(debug, "Dropping QMFv2 message with headers: " << *headers);
- }
- } else {
- QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event or query response");
- }
-}
-
-bool NodeClone::isNodeCloneDestination(const std::string& target)
-{
- return target == "qpid.node-cloner";
-}
-
-boost::shared_ptr<Exchange> NodeClone::create(const std::string& target, Broker& broker)
-{
- boost::shared_ptr<Exchange> exchange;
- if (isNodeCloneDestination(target)) {
- //TODO: need to cache the exchange
- QPID_LOG(info, "Creating node cloner");
- exchange.reset(new NodeClone(target, broker));
- }
- return exchange;
-}
-
-bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-
-const std::string NodeClone::typeName("node-cloner");
-
-std::string NodeClone::getType() const
-{
- return typeName;
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/NodeClone.h b/qpid/cpp/src/qpid/broker/NodeClone.h
deleted file mode 100644
index 71cac61..0000000
--- a/qpid/cpp/src/qpid/broker/NodeClone.h
+++ /dev/null
@@ -1,54 +0,0 @@
-#ifndef QPID_BROKER_NODEPROPAGATOR_H
-#define QPID_BROKER_NODEPROPAGATOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/Exchange.h"
-
-namespace qpid {
-namespace broker {
-
-class Broker;
-
-/**
- * Pseudo-exchange for recreating local queues and/or exchanges on
- * receipt of QMF events indicating their creation on another node
- */
-class NodeClone : public Exchange
-{
- public:
- NodeClone(const std::string&, Broker&);
- ~NodeClone();
- std::string getType() const;
- bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
- bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const);
-
- static bool isNodeCloneDestination(const std::string&);
- static boost::shared_ptr<Exchange> create(const std::string&, Broker&);
- static const std::string typeName;
- private:
- Broker& broker;
-};
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_NODEPROPAGATOR_H*/
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index c47ba55..0e85fe1 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1378,7 +1378,7 @@
}
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
{
return broker;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index b66600e..a53916f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -403,7 +403,7 @@
void flush();
- const Broker* getBroker();
+ Broker* getBroker();
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp b/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
deleted file mode 100644
index 01c0c8e..0000000
--- a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/QueueReplicator.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/framing/SequenceSet.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace broker {
-
-QueueReplicator::QueueReplicator(const std::string& name, boost::shared_ptr<Queue> q) : Exchange(name, 0, 0), queue(q), current(queue->getPosition()) {}
-QueueReplicator::~QueueReplicator() {}
-
-namespace {
-const std::string DEQUEUE_EVENT("dequeue-event");
-const std::string REPLICATOR("qpid.replicator-");
-}
-
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
-{
- if (key == DEQUEUE_EVENT) {
- std::string content;
- msg.getMessage().getFrames().getContent(content);
- qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
- qpid::framing::SequenceSet latest;
- latest.decode(buffer);
-
- //TODO: should be able to optimise the following
- for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
- if (current < *i) {
- //haven't got that far yet, record the dequeue
- dequeued.add(*i);
- QPID_LOG(debug, "Recording dequeue of message at " << *i << " from " << queue->getName());
- } else {
- QueuedMessage message;
- if (queue->acquireMessageAt(*i, message)) {
- queue->dequeue(0, message);
- QPID_LOG(info, "Dequeued message at " << *i << " from " << queue->getName());
- } else {
- QPID_LOG(error, "Unable to dequeue message at " << *i << " from " << queue->getName());
- }
- }
- }
- } else {
- //take account of any gaps in sequence created by messages
- //dequeued before our subscription reached them
- while (dequeued.contains(++current)) {
- dequeued.remove(current);
- QPID_LOG(debug, "Skipping dequeued message at " << current << " from " << queue->getName());
- queue->setPosition(current);
- }
- QPID_LOG(info, "Enqueued message on " << queue->getName() << "; currently at " << current);
- msg.deliverTo(queue);
- }
-}
-
-bool QueueReplicator::isReplicatingLink(const std::string& name)
-{
- return name.find(REPLICATOR) == 0;
-}
-
-boost::shared_ptr<Exchange> QueueReplicator::create(const std::string& target, QueueRegistry& queues)
-{
- boost::shared_ptr<Exchange> exchange;
- if (isReplicatingLink(target)) {
- std::string queueName = target.substr(REPLICATOR.size());
- boost::shared_ptr<Queue> queue = queues.find(queueName);
- if (!queue) {
- QPID_LOG(warning, "Unable to create replicator, can't find " << queueName);
- } else {
- //TODO: need to cache the replicator
- QPID_LOG(info, "Creating replicator for " << queueName);
- exchange.reset(new QueueReplicator(target, queue));
- }
- }
- return exchange;
-}
-
-bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings)
-{
- if (isReplicatingLink(target)) {
- std::string queueName = target.substr(REPLICATOR.size());
- boost::shared_ptr<Queue> queue = queues.find(queueName);
- if (queue) {
- settings.setInt("qpid.replicating-subscription", 1);
- settings.setInt("qpid.high_sequence_number", queue->getPosition());
- qpid::framing::SequenceNumber oldest;
- if (queue->getOldest(oldest)) {
- settings.setInt("qpid.low_sequence_number", oldest);
- }
- }
- return true;
- } else {
- return false;
- }
-}
-
-bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-
-const std::string QueueReplicator::typeName("queue-replicator");
-
-std::string QueueReplicator::getType() const
-{
- return typeName;
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.h b/qpid/cpp/src/qpid/broker/QueueReplicator.h
deleted file mode 100644
index 679aa92..0000000
--- a/qpid/cpp/src/qpid/broker/QueueReplicator.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef QPID_BROKER_QUEUEREPLICATOR_H
-#define QPID_BROKER_QUEUEREPLICATOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/Exchange.h"
-#include "qpid/framing/SequenceSet.h"
-
-namespace qpid {
-namespace broker {
-
-class QueueRegistry;
-
-/**
- * Dummy exchange for processing replication messages
- */
-class QueueReplicator : public Exchange
-{
- public:
- QueueReplicator(const std::string& name, boost::shared_ptr<Queue>);
- ~QueueReplicator();
- std::string getType() const;
- bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
- void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
- bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const);
- static bool isReplicatingLink(const std::string&);
- static boost::shared_ptr<Exchange> create(const std::string&, QueueRegistry&);
- static bool initReplicationSettings(const std::string&, QueueRegistry&, qpid::framing::FieldTable&);
- static const std::string typeName;
- private:
- boost::shared_ptr<Queue> queue;
- qpid::framing::SequenceNumber current;
- qpid::framing::SequenceSet dequeued;
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 7d10322..43b5d99 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -25,9 +25,7 @@
#include "qpid/broker/DtxAck.h"
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/NodeClone.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
@@ -110,15 +108,25 @@
namespace {
const std::string SEPARATOR("::");
}
-
+
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
- bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
+ bool exclusive, const string& resumeId, uint64_t resumeTtl,
+ const FieldTable& arguments)
{
// "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
// Create a globally unique name so the broker can identify individual consumers
std::string name = session.getSessionId().str() + SEPARATOR + tag;
- ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ const ConsumerFactories::Factories& cf(
+ session.getBroker().getConsumerFactories().get());
+ ConsumerImpl::shared_ptr c;
+ for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i)
+ c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments);
+ if (!c) // Create plain consumer
+ c = ConsumerImpl::shared_ptr(
+ new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -268,224 +276,6 @@
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver
-{
- public:
- ReplicatingSubscription(SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
- ~ReplicatingSubscription();
-
- void init();
- void cancel();
- bool deliver(QueuedMessage& msg);
- void enqueued(const QueuedMessage&);
- void dequeued(const QueuedMessage&);
- void acquired(const QueuedMessage&) {}
- void requeued(const QueuedMessage&) {}
-
- protected:
- bool doDispatch();
- private:
- boost::shared_ptr<Queue> events;
- boost::shared_ptr<Consumer> consumer;
- qpid::framing::SequenceSet range;
-
- void generateDequeueEvent();
- class DelegatingConsumer : public Consumer
- {
- public:
- DelegatingConsumer(ReplicatingSubscription&);
- ~DelegatingConsumer();
- bool deliver(QueuedMessage& msg);
- void notify();
- //bool filter(boost::intrusive_ptr<Message>);
- //bool accept(boost::intrusive_ptr<Message>);
- Consumer::Action accept(const QueuedMessage&);
- OwnershipToken* getSession();
- private:
- ReplicatingSubscription& delegate;
- };
-};
-
-SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent,
- const string& name,
- Queue::shared_ptr queue,
- bool ack,
- bool acquire,
- bool exclusive,
- const string& tag,
- const string& resumeId,
- uint64_t resumeTtl,
- const framing::FieldTable& arguments)
-{
- if (arguments.isSet("qpid.replicating-subscription")) {
- shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
- boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init();
- return result;
- } else {
- return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
- }
-}
-
-std::string mask(const std::string& in)
-{
- return std::string("$") + in + std::string("_internal");
-}
-
-class ReplicationStateInitialiser
-{
- public:
- ReplicationStateInitialiser(qpid::framing::SequenceSet& results,
- const qpid::framing::SequenceNumber& start,
- const qpid::framing::SequenceNumber& end);
- void operator()(const QueuedMessage& m) { process(m); }
- private:
- qpid::framing::SequenceSet& results;
- const qpid::framing::SequenceNumber start;
- const qpid::framing::SequenceNumber end;
- void process(const QueuedMessage&);
-};
-
-ReplicatingSubscription::ReplicatingSubscription(SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
- bool ack,
- bool _acquire,
- bool _exclusive,
- const string& _tag,
- const string& _resumeId,
- uint64_t _resumeTtl,
- const framing::FieldTable& _arguments
-) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments),
- events(new Queue(mask(_name))),
- consumer(new DelegatingConsumer(*this))
-{
-
- if (_arguments.isSet("qpid.high_sequence_number")) {
- qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number");
- qpid::framing::SequenceNumber lwm;
- if (_arguments.isSet("qpid.low_sequence_number")) {
- lwm = _arguments.getAsInt("qpid.low_sequence_number");
- } else {
- lwm = hwm;
- }
- qpid::framing::SequenceNumber oldest;
- if (_queue->getOldest(oldest)) {
- if (oldest >= hwm) {
- range.add(lwm, --oldest);
- } else if (oldest >= lwm) {
- ReplicationStateInitialiser initialiser(range, lwm, hwm);
- _queue->eachMessage(initialiser);
- } else { //i.e. have older message on master than is reported to exist on replica
- QPID_LOG(warning, "Replica appears to be missing message on master");
- }
- } else {
- //local queue (i.e. master) is empty
- range.add(lwm, _queue->getPosition());
- }
- QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range
- << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")");
- //set position of 'cursor'
- position = hwm;
- }
-}
-
-bool ReplicatingSubscription::deliver(QueuedMessage& m)
-{
- return ConsumerImpl::deliver(m);
-}
-
-void ReplicatingSubscription::init()
-{
- getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-void ReplicatingSubscription::cancel()
-{
- getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-//called before we get notified of the message being available and
-//under the message lock in the queue
-void ReplicatingSubscription::enqueued(const QueuedMessage& m)
-{
- QPID_LOG(debug, "Enqueued message at " << m.position);
- //delay completion
- m.payload->getIngressCompletion().startCompleter();
- QPID_LOG(debug, "Delayed " << m.payload.get());
-}
-
-class Buffer : public qpid::framing::Buffer
-{
- public:
- Buffer(size_t size) : qpid::framing::Buffer(new char[size], size) {}
- ~Buffer() { delete[] getPointer(); }
-};
-
-void ReplicatingSubscription::generateDequeueEvent()
-{
- Buffer buffer(range.encodedSize());
- range.encode(buffer);
- range.clear();
- buffer.reset();
-
- //generate event message
- boost::intrusive_ptr<Message> event = new Message();
- AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content((AMQContentBody()));
- content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- event->getFrames().append(method);
- event->getFrames().append(header);
- event->getFrames().append(content);
-
- DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey("dequeue-event");
-
- events->deliver(event);
-}
-
-//called after the message has been removed from the deque and under
-//the message lock in the queue
-void ReplicatingSubscription::dequeued(const QueuedMessage& m)
-{
- {
- Mutex::ScopedLock l(lock);
- range.add(m.position);
- QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position);
- }
- notify();
- if (m.position > position) {
- m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue");
- }
-}
-
-bool ReplicatingSubscription::doDispatch()
-{
- {
- Mutex::ScopedLock l(lock);
- if (!range.empty()) {
- generateDequeueEvent();
- }
- }
- bool r1 = events->dispatch(consumer);
- bool r2 = ConsumerImpl::doDispatch();
- return r1 || r2;
-}
-
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
@@ -497,7 +287,6 @@
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
-
) :
Consumer(_name, _acquire, !_acquire), /** @todo KAG - allow configuration of 'browse acquired' */
parent(_parent),
@@ -512,7 +301,7 @@
tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
- msgCredit(0),
+ msgCredit(0),
byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -558,7 +347,7 @@
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, windowing, 0, dynamic_cast<const ReplicatingSubscription*>(this));
+ DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, windowing, 0, isDelayedCompletion());
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
@@ -714,10 +503,10 @@
msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) {
- cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues());
- if (!cacheExchange) cacheExchange = NodeClone::create(exchangeName, getSession().getBroker());
- if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName);
+ if (!cacheExchange || cacheExchange->getName() != exchangeName
+ || cacheExchange->isDestroyed())
+ {
+ cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->setProperties(msg);
@@ -1099,36 +888,4 @@
}
}
-ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
-ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
-{
- return delegate.deliver(m);
-}
-void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-//bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-//bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-Consumer::Action ReplicatingSubscription::DelegatingConsumer::accept(const QueuedMessage& msg) { return delegate.accept(msg); }
-OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
-ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r,
- const qpid::framing::SequenceNumber& s,
- const qpid::framing::SequenceNumber& e)
- : results(r), start(s), end(e)
-{
- results.add(start, end);
-}
-
-void ReplicationStateInitialiser::process(const QueuedMessage& message)
-{
- if (message.position < start) {
- //replica does not have a message that should still be on the queue
- QPID_LOG(warning, "Replica appears to be missing message at " << message.position);
- } else if (message.position >= start && message.position <= end) {
- //i.e. message is within the intial range and has not been dequeued, so remove it from the results
- results.remove(message.position);
- } //else message has not been seen by replica yet so can be ignored here
-
-}
-
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 62c829e..e467ec6 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -157,11 +157,10 @@
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- static shared_ptr create(SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
-
+ /** This consumer wants delayed completion.
+ * Overridden by ConsumerImpl subclasses.
+ */
+ virtual bool isDelayedCompletion() const { return false; }
};
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index ca6d6bb..8cd5072 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -23,6 +23,7 @@
*/
#include "qpid/amqp_0_10/SessionHandler.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index fe66b77..f56697c 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -208,6 +208,8 @@
void queueDequeueSincePurgeState(const std::string&, uint32_t);
+ bool isAnnounced() const { return announced; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 4bf03ee..2cd1cf9 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -97,7 +97,7 @@
}
void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) {
- if (parent.isLocal() && !sentDoOutput && !closing) {
+ if (parent.isLocal() && !sentDoOutput && !closing && parent.isAnnounced()) {
sentDoOutput = true;
parent.getCluster().getMulticast().mcastControl(
ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
new file mode 100644
index 0000000..17da13e
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "Settings.h"
+#include "WiringReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using types::Variant;
+using std::string;
+
+Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
+ // FIXME aconway 2011-11-24: identifying the primary.
+ if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
+ Url url(s.brokerUrl);
+ QPID_LOG(info, "HA: Acting as backup");
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+
+ // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
+ // Declare the link
+ std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ s.mechanism, s.username, s.password);
+ assert(result.second); // FIXME aconway 2011-11-23: error handling
+ link = result.first;
+ boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
+ broker.getExchanges().registerExchange(wr);
+ }
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
new file mode 100644
index 0000000..b4183a4
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -0,0 +1,56 @@
+#ifndef QPID_HA_BACKUP_H
+#define QPID_HA_BACKUP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Settings.h"
+#include "qpid/Url.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class Broker;
+class Link;
+}
+
+namespace ha {
+class Settings;
+
+/**
+ * State associated with a backup broker. Manages connections to primary.
+ *
+ * THREAD SAFE: trivially because currently it only has a constructor.
+ * May need locking as the functionality grows.
+ */
+class Backup
+{
+ public:
+ Backup(broker::Broker&, const Settings&);
+
+ private:
+ broker::Broker& broker;
+ Settings settings;
+ boost::shared_ptr<broker::Link> link;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BACKUP_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
new file mode 100644
index 0000000..22b7e46
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "HaBroker.h"
+#include "Settings.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+namespace _qmf = ::qmf::org::apache::qpid::ha;
+using namespace management;
+using namespace std;
+
+namespace {
+Url url(const std::string& s, const std::string& id) {
+ try {
+ return Url(s);
+ } catch (const std::exception& e) {
+ throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'");
+ }
+}
+} // namespace
+
+HaBroker::HaBroker(broker::Broker& b, const Settings& s)
+ : broker(b),
+ clientUrl(url(s.clientUrl, "ha-client-url")),
+ brokerUrl(url(s.brokerUrl, "ha-broker-url")),
+ mgmtObject(0)
+{
+ ManagementAgent* ma = broker.getManagementAgent();
+ if (ma) {
+ _qmf::Package packageInit(ma);
+ mgmtObject = new _qmf::HaBroker(ma, this);
+ // FIXME aconway 2011-11-11: Placeholder - initialize cluster role.
+ mgmtObject->set_status("solo");
+ ma->addObject(mgmtObject);
+ }
+ QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
+ << " broker-url=" << brokerUrl);
+ backup.reset(new Backup(broker, s));
+ // Register a factory for replicating subscriptions.
+ broker.getConsumerFactories().add(
+ boost::shared_ptr<ReplicatingSubscription::Factory>(
+ new ReplicatingSubscription::Factory()));
+}
+
+HaBroker::~HaBroker() {}
+
+Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
+ switch (methodId) {
+ case _qmf::HaBroker::METHOD_SETSTATUS: {
+ std::string status = dynamic_cast<_qmf::ArgsHaBrokerSetStatus&>(args).i_status;
+ // FIXME aconway 2011-11-11: placeholder, validate & execute status change.
+ mgmtObject->set_status(status);
+ break;
+ }
+ default:
+ return Manageable::STATUS_UNKNOWN_METHOD;
+ }
+ return Manageable::STATUS_OK;
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
new file mode 100644
index 0000000..1a835c9
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -0,0 +1,62 @@
+#ifndef QPID_HA_BROKER_H
+#define QPID_HA_BROKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Url.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
+#include "qpid/management/Manageable.h"
+
+namespace qpid {
+namespace broker {
+class Broker;
+}
+namespace ha {
+class Settings;
+class Backup;
+
+/**
+ * HA state and actions associated with a broker.
+ *
+ * THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+ */
+class HaBroker : public management::Manageable
+{
+ public:
+ HaBroker(broker::Broker&, const Settings&);
+ ~HaBroker();
+
+ // Implement Manageable.
+ qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; }
+ management::Manageable::status_t ManagementMethod (
+ uint32_t methodId, management::Args& args, std::string& text);
+
+ private:
+ broker::Broker& broker;
+ Url clientUrl, brokerUrl;
+ std::auto_ptr<Backup> backup;
+ qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BROKER_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
new file mode 100644
index 0000000..80f21e4
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "HaBroker.h"
+#include "Settings.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Broker.h"
+
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+struct Options : public qpid::Options {
+ Settings& settings;
+ Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
+ addOptions()
+ ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features")
+ ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.")
+ ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.")
+ ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
+ ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
+ ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
+ ;
+ }
+};
+
+struct HaPlugin : public Plugin {
+
+ Settings settings;
+ Options options;
+ auto_ptr<HaBroker> haBroker;
+
+ HaPlugin() : options(settings) {}
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize(Plugin::Target& ) {}
+
+ void initialize(Plugin::Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker && settings.enabled) {
+ haBroker.reset(new ha::HaBroker(*broker, settings));
+ } else
+ QPID_LOG(info, "HA: Disabled");
+ }
+};
+
+static HaPlugin instance; // Static initialization.
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Logging.cpp b/qpid/cpp/src/qpid/ha/Logging.cpp
new file mode 100644
index 0000000..7d8ee38
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Logging.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Logging.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace ha {
+
+QueuePos::QueuePos(const broker::QueuedMessage& qm)
+ : queue(qm.queue), position(qm.position) {}
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& qp) {
+ return o << qp.queue->getName() << "[" << qp.position << "]";
+}
+
+}} // namesopace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Logging.h b/qpid/cpp/src/qpid/ha/Logging.h
new file mode 100644
index 0000000..3b12baa
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Logging.h
@@ -0,0 +1,55 @@
+#ifndef QPID_HA_HAOSTREAM_H
+#define QPID_HA_HAOSTREAM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iosfwd>
+
+/**@file ostream helpers used in log messages. */
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace framing {
+class SequenceNumber;
+}
+
+namespace ha {
+
+// Other printable helpers
+
+struct QueuePos {
+ const broker::Queue* queue;
+ const framing::SequenceNumber& position;
+ QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos)
+ : queue(q), position(pos) {}
+ QueuePos(const broker::QueuedMessage& qm);
+};
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& h);
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_HAOSTREAM_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
new file mode 100644
index 0000000..2de9ec5
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "Logging.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace {
+const std::string QPID_REPLICATOR_("qpid.replicator-");
+}
+
+namespace qpid {
+namespace ha {
+using namespace broker;
+
+QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
+ : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
+ queue(q), link(l), current(queue->getPosition())
+{
+ // FIXME aconway 2011-11-24: consistent logging.
+ QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
+ queue->getBroker()->getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ queue->getName(), // src
+ getName(), // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&QueueReplicator::initializeBridge, this, _1, _2)
+ );
+}
+
+QueueReplicator::~QueueReplicator() {}
+
+// NB: This is called back ina broker connection thread when the
+// bridge is created.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ // No lock needed, no mutable member variables are used.
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+ framing::FieldTable settings;
+ // FIXME aconway 2011-11-28: string constants.
+ settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ // FIXME aconway 2011-11-28: inconsistent use of _ vs. -
+ settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
+ qpid::framing::SequenceNumber oldest;
+ if (queue->getOldest(oldest))
+ settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
+
+ peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings);
+ peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+ peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest);
+}
+
+
+namespace {
+const std::string DEQUEUE_EVENT("dequeue-event");
+const std::string REPLICATOR("qpid.replicator-");
+}
+
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
+{
+ if (key == DEQUEUE_EVENT) {
+ std::string content;
+ msg.getMessage().getFrames().getContent(content);
+ qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
+ qpid::framing::SequenceSet latest;
+ latest.decode(buffer);
+
+ //TODO: should be able to optimise the following
+ for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
+ if (current < *i) {
+ //haven't got that far yet, record the dequeue
+ dequeued.add(*i);
+ QPID_LOG(trace, "HA: Recording dequeue of message at " <<
+ QueuePos(queue.get(), *i));
+ } else {
+ QueuedMessage message;
+ if (queue->acquireMessageAt(*i, message)) {
+ queue->dequeue(0, message);
+ QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message));
+ } else {
+ // FIXME aconway 2011-11-29: error handling
+ QPID_LOG(error, "HA: Unable to dequeue message at "
+ << QueuePos(queue.get(), *i));
+ }
+ }
+ }
+ } else {
+ //take account of any gaps in sequence created by messages
+ //dequeued before our subscription reached them
+ while (dequeued.contains(++current)) {
+ dequeued.remove(current);
+ QPID_LOG(debug, "HA: Skipping dequeued message at " << current << " from " << queue->getName());
+ queue->setPosition(current);
+ }
+ QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; currently at " << current);
+ msg.deliverTo(queue);
+ }
+}
+
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
+
+// FIXME aconway 2011-11-28: rationalise string constants.
+static const std::string TYPE_NAME("qpid.queue-replicator");
+
+std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
new file mode 100644
index 0000000..8085c11
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -0,0 +1,72 @@
+#ifndef QPID_HA_QUEUEREPLICATOR_H
+#define QPID_HA_QUEUEREPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/SequenceSet.h"
+
+namespace qpid {
+
+namespace broker {
+class Bridge;
+class Link;
+class Queue;
+class QueueRegistry;
+class SessionHandler;
+class Deliverable;
+}
+
+namespace ha {
+
+/**
+ * Exchange created on a backup broker to replicate a queue on the primary.
+ *
+ * Puts replicated messages on the local queue, handles dequeue events.
+ * Creates a ReplicatingSubscription on the primary by passing special
+ * arguments to the consume command.
+ *
+ * THREAD SAFE.
+ */
+class QueueReplicator : public broker::Exchange
+{
+ public:
+ QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
+ ~QueueReplicator();
+ std::string getType() const;
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+ private:
+ void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+
+ sys::Mutex lock;
+ boost::shared_ptr<broker::Queue> queue;
+ boost::shared_ptr<broker::Link> link;
+ framing::SequenceNumber current;
+ framing::SequenceSet dequeued;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
new file mode 100644
index 0000000..aabbd43
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -0,0 +1,235 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+#include "Logging.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using namespace std;
+
+// FIXME aconway 2011-11-28: review all arugment names, prefixes etc.
+// Do we want a common HA prefix?
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number");
+const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
+
+const string DOLLAR("$");
+const string INTERNAL("-internal");
+
+class ReplicationStateInitialiser
+{
+ public:
+ ReplicationStateInitialiser(
+ qpid::framing::SequenceSet& r,
+ const qpid::framing::SequenceNumber& s,
+ const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e)
+ {
+ results.add(start, end);
+ }
+
+ void operator()(const QueuedMessage& message) {
+ if (message.position < start) {
+ //replica does not have a message that should still be on the queue
+ QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message));
+ } else if (message.position >= start && message.position <= end) {
+ //i.e. message is within the intial range and has not been dequeued, so remove it from the results
+ results.remove(message.position);
+ } //else message has not been seen by replica yet so can be ignored here
+ }
+
+ private:
+ qpid::framing::SequenceSet& results;
+ const qpid::framing::SequenceNumber start;
+ const qpid::framing::SequenceNumber end;
+};
+
+string mask(const string& in)
+{
+ return DOLLAR + in + INTERNAL;
+}
+
+boost::shared_ptr<broker::SemanticState::ConsumerImpl>
+ReplicatingSubscription::Factory::create(
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) {
+ return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
+ new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+}
+
+ReplicatingSubscription::ReplicatingSubscription(
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments),
+ events(new Queue(mask(name))),
+ consumer(new DelegatingConsumer(*this))
+{
+ QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
+ // FIXME aconway 2011-11-25: string constants.
+ if (arguments.isSet("qpid.high_sequence_number")) {
+ qpid::framing::SequenceNumber hwm = arguments.getAsInt("qpid.high_sequence_number");
+ qpid::framing::SequenceNumber lwm;
+ if (arguments.isSet("qpid.low_sequence_number")) {
+ lwm = arguments.getAsInt("qpid.low_sequence_number");
+ } else {
+ lwm = hwm;
+ }
+ qpid::framing::SequenceNumber oldest;
+ if (queue->getOldest(oldest)) {
+ if (oldest >= hwm) {
+ range.add(lwm, --oldest);
+ } else if (oldest >= lwm) {
+ ReplicationStateInitialiser initialiser(range, lwm, hwm);
+ queue->eachMessage(initialiser);
+ } else { //i.e. have older message on master than is reported to exist on replica
+ QPID_LOG(warning, "HA: Replica missing message on master");
+ }
+ } else {
+ //local queue (i.e. master) is empty
+ range.add(lwm, queue->getPosition());
+ }
+ QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range
+ << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")");
+ //set position of 'cursor'
+ position = hwm;
+ }
+}
+
+bool ReplicatingSubscription::deliver(QueuedMessage& m)
+{
+ return ConsumerImpl::deliver(m);
+}
+
+void ReplicatingSubscription::cancel()
+{
+ getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+//called before we get notified of the message being available and
+//under the message lock in the queue
+void ReplicatingSubscription::enqueued(const QueuedMessage& m)
+{
+ QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m));
+ //delay completion
+ m.payload->getIngressCompletion().startCompleter();
+}
+
+void ReplicatingSubscription::generateDequeueEvent()
+{
+ string buf(range.encodedSize(),'\0');
+ framing::Buffer buffer(&buf[0], buf.size());
+ range.encode(buffer);
+ range.clear();
+ buffer.reset();
+
+ //generate event message
+ boost::intrusive_ptr<Message> event = new Message();
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody()));
+ content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ event->getFrames().append(method);
+ event->getFrames().append(header);
+ event->getFrames().append(content);
+
+ DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ props->setRoutingKey("dequeue-event");
+
+ events->deliver(event);
+}
+
+//called after the message has been removed from the deque and under
+//the message lock in the queue
+void ReplicatingSubscription::dequeued(const QueuedMessage& m)
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ range.add(m.position);
+ // FIXME aconway 2011-11-29: q[pos]
+ QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position);
+ }
+ notify();
+ if (m.position > position) {
+ m.payload->getIngressCompletion().finishCompleter();
+ QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue");
+ }
+}
+
+bool ReplicatingSubscription::doDispatch()
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (!range.empty()) {
+ generateDequeueEvent();
+ }
+ }
+ bool r1 = events->dispatch(consumer);
+ bool r2 = ConsumerImpl::doDispatch();
+ return r1 || r2;
+}
+
+ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
+ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
+{
+ return delegate.deliver(m);
+}
+void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
+bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
+bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
new file mode 100644
index 0000000..b83842a
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -0,0 +1,109 @@
+#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/ConsumerFactory.h"
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Queue;
+class QueuedMessage;
+class OwnershipToken;
+}
+
+namespace ha {
+
+/**
+ * A susbcription that represents a backup replicating a queue.
+ *
+ * Runs on the primary. Delays completion of messages till the backup
+ * has acknowledged, informs backup of locally dequeued messages.
+ *
+ * THREAD UNSAFE: used only in broker connection thread.
+ */
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
+ public broker::QueueObserver
+{
+ public:
+ struct Factory : public broker::ConsumerFactory {
+ boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+ broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+ };
+
+ // Argument names for consume command.
+ static const std::string QPID_REPLICATING_SUBSCRIPTION;
+ static const std::string QPID_HIGH_SEQUENCE_NUMBER;
+ static const std::string QPID_LOW_SEQUENCE_NUMBER;
+
+ ReplicatingSubscription(broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+
+ ~ReplicatingSubscription();
+
+ void cancel();
+ bool deliver(broker::QueuedMessage& msg);
+ void enqueued(const broker::QueuedMessage&);
+ void dequeued(const broker::QueuedMessage&);
+ void acquired(const broker::QueuedMessage&) {}
+ void requeued(const broker::QueuedMessage&) {}
+
+ bool isDelayedCompletion() const { return true; }
+
+ protected:
+ bool doDispatch();
+ private:
+ boost::shared_ptr<broker::Queue> events;
+ boost::shared_ptr<broker::Consumer> consumer;
+ qpid::framing::SequenceSet range;
+
+ void generateDequeueEvent();
+ class DelegatingConsumer : public Consumer
+ {
+ public:
+ DelegatingConsumer(ReplicatingSubscription&);
+ ~DelegatingConsumer();
+ bool deliver(broker::QueuedMessage& msg);
+ void notify();
+ bool filter(boost::intrusive_ptr<broker::Message>);
+ bool accept(boost::intrusive_ptr<broker::Message>);
+ broker::OwnershipToken* getSession();
+ private:
+ ReplicatingSubscription& delegate;
+ };
+};
+
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
new file mode 100644
index 0000000..a2d2e89
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -0,0 +1,47 @@
+#ifndef QPID_HA_SETTINGS_H
+#define QPID_HA_SETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+namespace ha {
+
+using std::string;
+
+/**
+ * Configurable settings for HA.
+ */
+class Settings
+{
+ public:
+ Settings() : enabled(false) {}
+ bool enabled;
+ string clientUrl;
+ string brokerUrl;
+ string username, password, mechanism;
+ private:
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_SETTINGS_H*/
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
new file mode 100644
index 0000000..125e2c0
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -0,0 +1,463 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "WiringReplicator.h"
+#include "QueueReplicator.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+
+namespace qpid {
+namespace ha {
+
+using qmf::org::apache::qpid::broker::EventBind;
+using qmf::org::apache::qpid::broker::EventExchangeDeclare;
+using qmf::org::apache::qpid::broker::EventExchangeDelete;
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventSubscribe;
+using namespace framing;
+using std::string;
+using types::Variant;
+using namespace broker;
+
+namespace {
+
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string QPID_REPLICATE("qpid.replicate");
+
+const string CLASS_NAME("_class_name");
+const string EVENT("_event");
+const string OBJECT_NAME("_object_name");
+const string PACKAGE_NAME("_package_name");
+const string QUERY_RESPONSE("_query_response");
+const string SCHEMA_ID("_schema_id");
+const string VALUES("_values");
+
+const string ALTEX("altEx");
+const string ARGS("args");
+const string ARGUMENTS("arguments");
+const string AUTODEL("autoDel");
+const string AUTODELETE("autoDelete");
+const string BIND("bind");
+const string BINDING("binding");
+const string CREATED("created");
+const string DISP("disp");
+const string DURABLE("durable");
+const string EXCHANGE("exchange");
+const string EXNAME("exName");
+const string EXTYPE("exType");
+const string KEY("key");
+const string NAME("name");
+const string QNAME("qName");
+const string QUEUE("queue");
+const string RHOST("rhost");
+const string TYPE("type");
+const string USER("user");
+
+const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string QMF2("qmf2");
+const string QMF_CONTENT("qmf.content");
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string QMF_OPCODE("qmf.opcode");
+
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
+
+bool isQMFv2(const Message& message) {
+ const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
+ return props && props->getAppId() == QMF2;
+}
+
+template <class T> bool match(Variant::Map& schema) {
+ return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
+}
+
+// FIXME aconway 2011-11-24: this should be a class.
+enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL };
+const string S_NONE="none";
+const string S_WIRING="wiring";
+const string S_ALL="all";
+
+ReplicateLevel replicateLevel(const string& str) {
+ // FIXME aconway 2011-11-24: case insenstive comparison.
+ ReplicateLevel rl = RL_NONE;
+ if (str == S_WIRING) rl = RL_WIRING;
+ else if (str == S_ALL) rl = RL_ALL;
+ return rl;
+}
+
+ReplicateLevel replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else return RL_NONE;
+}
+
+ReplicateLevel replicateLevel(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end()) return replicateLevel(i->second.asString());
+ else return RL_NONE;
+}
+
+void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ Variant::Map request;
+ request[_WHAT] = OBJECT;
+ Variant::Map schema;
+ schema[_CLASS_NAME] = className;
+ schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ request[_SCHEMA_ID] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId(QMF2);
+ props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+ headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+}
+} // namespace
+
+WiringReplicator::~WiringReplicator() {}
+
+WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
+{
+ QPID_LOG(debug, "HA: Starting replication from " <<
+ link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
+ broker.getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ QPID_WIRING_REPLICATOR, // src
+ QPID_WIRING_REPLICATOR, // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&WiringReplicator::initializeBridge, this, _1, _2)
+ );
+}
+
+// This is called in the connection IO thread when the bridge is started.
+void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ string queueName = bridge.getQueueName();
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+
+ //declare and bind an event queue
+ peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+ //subscribe to the queue
+ peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event queue as the reply-to address
+ sendQuery(QUEUE, queueName, sessionHandler);
+ sendQuery(EXCHANGE, queueName, sessionHandler);
+ sendQuery(BINDING, queueName, sessionHandler);
+ QPID_LOG(debug, "HA: Activated wiring replicator")
+}
+
+void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
+ Variant::List list;
+ try {
+ if (!isQMFv2(msg.getMessage()) || !headers)
+ throw Exception("Unexpected message, not QMF2 event or query response.");
+ // decode as list
+ string content = msg.getMessage().getFrames().getContent();
+ amqp_0_10::ListCodec::decode(content, list);
+
+ if (headers->getAsString(QMF_CONTENT) == EVENT) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ Variant::Map& map = i->asMap();
+ Variant::Map& schema = map[SCHEMA_ID].asMap();
+ Variant::Map& values = map[VALUES].asMap();
+ QPID_LOG(trace, "HA: Configuration event: schema=" << schema << " values=" << values);
+ if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
+ else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
+ else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
+ else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
+ else if (match<EventBind>(schema)) doEventBind(values);
+ // FIXME aconway 2011-11-21: handle unbind & all other events.
+ else if (match<EventSubscribe>(schema)) {} // Deliberately ignored.
+ else throw(Exception(QPID_MSG("WiringReplicator received unexpected event, schema=" << schema)));
+ }
+ } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
+ Variant::Map& values = i->asMap()[VALUES].asMap();
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ QPID_LOG(trace, "HA: Configuration response type=" << type << " values=" << values);
+ if (type == QUEUE) doResponseQueue(values);
+ else if (type == EXCHANGE) doResponseExchange(values);
+ else if (type == BINDING) doResponseBind(values);
+ else throw Exception(QPID_MSG("HA: Unexpected response type: " << type));
+ }
+ } else {
+ QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers));
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "HA: Error replicating configuration: " << e.what());
+ QPID_LOG(debug, "HA: Error processing configuration message: " << list);
+ }
+}
+
+void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
+ string name = values[QNAME].asString();
+ Variant::Map argsMap = values[ARGS].asMap();
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+
+ QPID_LOG(debug, "HA: Creating queue from event " << name);
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODEL].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString());
+ if (result.second) {
+ // FIXME aconway 2011-11-22: should delete old queue and
+ // re-create from event.
+ // Events are always up to date, whereas responses may be
+ // out of date.
+ QPID_LOG(debug, "HA: New queue replica " << name);
+ startQueueReplicator(result.first);
+ } else {
+ QPID_LOG(warning, "HA: Replicated queue " << name << " already exists");
+ }
+ }
+}
+
+void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
+ string name = values[QNAME].asString();
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+ if (queue && replicateLevel(queue->getSettings())) {
+ QPID_LOG(debug, "HA: Deleting queue from event: " << name);
+ broker.deleteQueue(
+ name,
+ values[USER].asString(),
+ values[RHOST].asString());
+ }
+}
+
+void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
+ Variant::Map argsMap(values[ARGS].asMap());
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ string name = values[EXNAME].asString();
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ QPID_LOG(debug, "HA: New exchange replica " << name);
+ if (!broker.createExchange(
+ name,
+ values[EXTYPE].asString(),
+ values[DURABLE].asBool(),
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString()).second) {
+ // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+ // and re-create from event. See comment in doEventQueueDeclare.
+ QPID_LOG(warning, "HA: Replicated exchange " << name << " already exists");
+ }
+ }
+}
+
+void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
+ string name = values[EXNAME].asString();
+ try {
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
+ if (exchange && replicateLevel(exchange->getArgs())) {
+ QPID_LOG(debug, "HA: Deleting exchange:" << name);
+ broker.deleteExchange(
+ name,
+ values[USER].asString(),
+ values[RHOST].asString());
+ }
+ } catch (const framing::NotFoundException&) {}
+}
+
+void WiringReplicator::doEventBind(Variant::Map& values) {
+ try {
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString());
+ // We only replicated a binds for a replicated queue to replicated exchange.
+ if (replicateLevel(exchange->getArgs()) && replicateLevel(queue->getSettings())) {
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGS].asMap(), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
+ } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange.
+}
+
+void WiringReplicator::doResponseQueue(Variant::Map& values) {
+ // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
+ Variant::Map argsMap(values[ARGUMENTS].asMap());
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ string name(values[NAME].asString());
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODELETE].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/);
+ if (result.second) {
+ QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in catch-up)");
+ startQueueReplicator(result.first);
+ } else {
+ // FIXME aconway 2011-11-22: Normal to find queue already
+ // exists if we're failing over.
+ QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already exists (in catch-up)");
+ }
+}
+
+void WiringReplicator::doResponseExchange(Variant::Map& values) {
+ Variant::Map argsMap(values[ARGUMENTS].asMap());
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in catch-up)");
+ if (!broker.createExchange(
+ values[NAME].asString(),
+ values[TYPE].asString(),
+ values[DURABLE].asBool(),
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second) {
+ QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " already exists (in catch-up)");
+ }
+}
+
+namespace {
+const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
+const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
+
+std::string getRefName(const std::string& prefix, const Variant& ref) {
+ Variant::Map map(ref.asMap());
+ Variant::Map::const_iterator i = map.find(OBJECT_NAME);
+ if (i == map.end())
+ throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
+ const std::string name = i->second.asString();
+ if (name.compare(0, prefix.size(), prefix) != 0)
+ throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
+ std::string ret = name.substr(prefix.size());
+ return ret;
+}
+
+const std::string EXCHANGE_REF("exchangeRef");
+const std::string QUEUE_REF("queueRef");
+
+} // namespace
+
+void WiringReplicator::doResponseBind(Variant::Map& values) {
+ try {
+ std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
+ std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName);
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
+
+ // Automatically replicate exchange if queue and exchange are replicated
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
+ } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange.
+}
+
+void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
+ // FIXME aconway 2011-11-28: also need to remove these when queue is destroyed.
+ if (replicateLevel(queue->getSettings()) == RL_ALL) {
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ broker.getExchanges().registerExchange(qr);
+ }
+}
+
+bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
+
+string WiringReplicator::getType() const { return QPID_WIRING_REPLICATOR; }
+
+}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h
new file mode 100644
index 0000000..32109d8
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h
@@ -0,0 +1,81 @@
+#ifndef QPID_HA_REPLICATOR_H
+#define QPID_HA_REPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/types/Variant.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+class Link;
+class Bridge;
+class SessionHandler;
+}
+
+namespace ha {
+
+/**
+ * Replicate wiring on a backup broker.
+ *
+ * Implemented as an exchange that subscribes to receive QMF
+ * configuration events from the primary. It configures local queues
+ * exchanges and bindings to replicate the primary.
+ * It also creates QueueReplicators for newly replicated queues.
+ *
+ * THREAD SAFE: Has no mutable state.
+ *
+ */
+class WiringReplicator : public broker::Exchange
+{
+ public:
+ WiringReplicator(const boost::shared_ptr<broker::Link>&);
+ ~WiringReplicator();
+ std::string getType() const;
+
+ // Exchange methods
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+ private:
+ void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ void doEventQueueDeclare(types::Variant::Map& values);
+ void doEventQueueDelete(types::Variant::Map& values);
+ void doEventExchangeDeclare(types::Variant::Map& values);
+ void doEventExchangeDelete(types::Variant::Map& values);
+ void doEventBind(types::Variant::Map&);
+ void doResponseQueue(types::Variant::Map& values);
+ void doResponseExchange(types::Variant::Map& values);
+ void doResponseBind(types::Variant::Map& values);
+ void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+
+ broker::Broker& broker;
+ boost::shared_ptr<broker::Link> link;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_HA_REPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
new file mode 100644
index 0000000..bb06e77
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -0,0 +1,34 @@
+<schema package="org.apache.qpid.ha">
+
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+ <!-- Monitor and control HA status of a broker. -->
+ <class name="HaBroker">
+ <property name="status" type="sstr" desc="HA status: PRIMARY, BACKUP, SOLO"/>
+
+ <method name="setStatus" desc="Set HA status: PRIMARY, BACKUP, SOLO">
+ <arg name="status" type="sstr" dir="I"/>
+ </method>
+
+ <property name="clientUrl" type="sstr" desc="URL used by clients to connect to the cluster."/>
+ <property name="brokerUrl" type="sstr" desc="URL used by brokers to connect to other brokers in the cluster."/>
+ </class>
+
+</schema>
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 16d7fb0..91d5f6a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -440,6 +440,7 @@
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
cluster_lib = os.getenv("CLUSTER_LIB")
+ ha_lib = os.getenv("HA_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
@@ -514,6 +515,12 @@
actual_contents = self.browse(session, queue, timeout)
self.assertEqual(expect_contents, actual_contents)
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01):
+ """Wait up to timeout for contents of queue to match expect_contents"""
+ def test(): return self.browse(session, queue, 0) == expect_contents
+ retry(test, timeout, delay)
+ self.assertEqual(expect_contents, self.browse(session, queue, 0))
+
def join(thread, timeout=10):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 199d1e7..424d416 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -61,15 +61,25 @@
# You should do "newgrp ais" before running the tests to run these.
#
+# FIXME aconway 2011-11-14: Disable cluster tests on qpid-3603 branch
+# Some cluster tests are known to fail on this branch.
+# Immediate priority is to develop then new HA solution,
+# Cluster will brought up to date when thats done.
+#
+# gsim: its due to the keeping of deleted messages on the deque until they can be popped off either end
+# gsim: that is state that isn't available to new nodes of course
+# gsim: i.e. if you dequeue a message from the middle of the deque
+# gsim: it will not be on updatee but will be hidden on original node(s)
+# gsim: and is needed for the direct indexing
-# ais_check checks pre-requisites for cluster tests and runs them if ok.
-TESTS += \
- run_cluster_test \
- cluster_read_credit \
- test_watchdog \
- run_cluster_tests \
- federated_cluster_test \
- clustered_replication_test
+
+# TESTS += \
+# run_cluster_test \
+# cluster_read_credit \
+# test_watchdog \
+# run_cluster_tests \
+# federated_cluster_test \
+# clustered_replication_test
# Clean up after cluster_test and start_cluster
CLEANFILES += cluster_test.acl cluster.ports
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
new file mode 100755
index 0000000..9b52c2f
--- /dev/null
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
+from qpid.messaging import Message, NotFound
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger
+
+
+log = getLogger("qpid.ha-tests")
+
+class ShortTests(BrokerTest):
+ """Short HA functionality tests."""
+
+ def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs):
+ assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+ return Broker(self, args=["--load-module", BrokerTest.ha_lib,
+ "--ha-enable=yes",
+ "--ha-client-url", client_url,
+ "--ha-broker-url", broker_url,
+ ] + args,
+ **kwargs)
+
+ # FIXME aconway 2011-11-15: work around async replication.
+ def wait(self, session, address):
+ def check():
+ try:
+ session.sender(address)
+ return True
+ except NotFound: return False
+ assert retry(check), "Timed out waiting for %s"%(address)
+
+ def assert_missing(self,session, address):
+ try:
+ session.receiver(address)
+ self.fail("Should not have been replicated: %s"%(address))
+ except NotFound: pass
+
+ def test_replication(self):
+ def queue(name, replicate):
+ return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+
+ def exchange(name, replicate, bindq):
+ return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+ def setup(p, prefix):
+ """Create config, send messages on the primary p"""
+ p.sender(queue(prefix+"q1", "all")).send(Message("1"))
+ p.sender(queue(prefix+"q2", "wiring")).send(Message("2"))
+ p.sender(queue(prefix+"q3", "none")).send(Message("3"))
+ p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+ # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
+ p.sender(queue(prefix+"x", "wiring"))
+
+ def verify(b, prefix):
+ """Verify setup was replicated to backup b"""
+ # FIXME aconway 2011-11-21: wait for wiring to replicate.
+ self.wait(b, prefix+"x");
+ # Verify backup
+ # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
+ self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
+ self.assert_browse_retry(b, prefix+"q2", []) # wiring only
+ self.assert_missing(b, prefix+"q3")
+ b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
+ self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
+ b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=wiring
+ self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
+
+ # Create config, send messages before starting the backup, to test catch-up replication.
+ primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+ p = primary.connect().session()
+ setup(p, "1")
+ # Start the backup
+ backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ b = backup.connect().session()
+ verify(b, "1")
+
+ # Create config, send messages after starting the backup, to test steady-state replication.
+ setup(p, "2")
+ verify(b, "2")
+
+if __name__ == "__main__":
+ shutil.rmtree("brokertest.tmp", True)
+ os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 26be15b..100612f 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -63,6 +63,7 @@
exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
exportmodule ACL_LIB acl.so
exportmodule CLUSTER_LIB cluster.so
+exportmodule HA_LIB ha.so
exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
exportmodule SSLCONNECTOR_LIB sslconnector.so
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test
index 1a0f711..13f31fe 100755
--- a/qpid/python/qpid-python-test
+++ b/qpid/python/qpid-python-test
@@ -570,6 +570,8 @@
if opts.xml and not list_only:
xmlr = JunitXmlStyleReporter(opts.xml);
xmlr.begin();
+else:
+ xmlr = None
passed = 0
failed = 0
diff --git a/qpid/tools/setup.py b/qpid/tools/setup.py
index feae4bb..48e29ad 100755
--- a/qpid/tools/setup.py
+++ b/qpid/tools/setup.py
@@ -31,7 +31,8 @@
"src/py/qpid-route",
"src/py/qpid-stat",
"src/py/qpid-tool",
- "src/py/qmf-tool"],
+ "src/py/qmf-tool",
+ "src/py/qpid-ha-status"],
url="http://qpid.apache.org/",
license="Apache Software License",
description="Diagnostic and management tools for Apache Qpid brokers.")
diff --git a/qpid/tools/src/py/qpid-ha-status b/qpid/tools/src/py/qpid-ha-status
new file mode 100755
index 0000000..c70e4c9
--- /dev/null
+++ b/qpid/tools/src/py/qpid-ha-status
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import qmf.console, optparse, sys
+from qpid.management import managementChannel, managementClient
+
+usage="""
+Usage: qpid-ha-status [broker-address] [status]
+If status is specified, sets the HA status of the broker. Otherwise prints the current HA status. Status must be one of: primary, backup, solo.
+"""
+
+STATUS_VALUES=["primary", "backup", "solo"]
+
+def is_valid_status(value): return value in STATUS_VALUES
+
+def validate_status(value):
+ if not is_valid_status(value):
+ raise Exception("Invalid HA status value: %s"%(value))
+
+class HaBroker:
+ def __init__(self, broker, session):
+ self.session = session
+ try:
+ self.qmf_broker = self.session.addBroker(broker)
+ except Exception, e:
+ raise Exception("Can't connect to %s: %s"%(broker,e))
+ ha_brokers=self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha")
+ if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
+ self.ha_broker = ha_brokers[0];
+
+ def get_status(self):
+ return self.ha_broker.status
+
+ def set_status(self, value):
+ validate_status(value)
+ self.ha_broker.setStatus(value)
+
+def parse_args(args):
+ broker, status = "localhost:5672", None
+ if args and is_valid_status(args[-1]):
+ status = args[-1]
+ args.pop()
+ if args: broker = args[0]
+ return broker, status
+
+def main():
+ try:
+ session = qmf.console.Session()
+ try:
+ broker, status = parse_args(sys.argv[1:])
+ hb = HaBroker(broker, session)
+ if status: hb.set_status(status)
+ else: print hb.get_status()
+ finally:
+ session.close()
+ return 0
+ except Exception, e:
+ print e
+ return -1
+
+if __name__ == "__main__":
+ sys.exit(main())