QPID-2920: Batch acquire/dequeue messages in cluster.

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1197749 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/include/qpid/RangeSet.h b/qpid/cpp/include/qpid/RangeSet.h
index 36991fd..b36030c 100644
--- a/qpid/cpp/include/qpid/RangeSet.h
+++ b/qpid/cpp/include/qpid/RangeSet.h
@@ -133,6 +133,8 @@
     explicit RangeSet(const Range<T>& r) { *this += r; }
     RangeSet(const T& a, const T& b) { *this += Range<T>(a,b); }
 
+    void swap(RangeSet& x) { ranges.swap(x.ranges); } // Exchange the contents of this set and x.
+
     bool contiguous() const { return ranges.size() <= 1; }
 
     bool contains(const T& t) const;
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 250491c..5e59331 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -530,6 +530,7 @@
   qpid/broker/ConnectionHandler.h \
   qpid/broker/ConnectionState.h \
   qpid/broker/ConnectionToken.h \
   qpid/broker/Consumer.h \
+  qpid/broker/Context.h \
   qpid/broker/Daemon.cpp \
   qpid/broker/Daemon.h \
diff --git a/qpid/cpp/src/qpid/broker/Context.h b/qpid/cpp/src/qpid/broker/Context.h
new file mode 100644
index 0000000..5770dd7
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Context.h
@@ -0,0 +1,38 @@
+#ifndef QPID_BROKER_CONTEXT_H
+#define QPID_BROKER_CONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for context objects attached to broker entities (e.g. the cluster context owned by Queue); owners delete them through the virtual destructor.
+ */
+class Context
+{
+  public:
+    virtual ~Context() {}
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CONTEXT_H*/
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index aae858f..b57ec46 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -24,6 +24,7 @@
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/OwnershipToken.h"
 #include "qpid/broker/Consumer.h"
+#include "qpid/broker/Context.h"
 #include "qpid/broker/Message.h"
 #include "qpid/broker/Messages.h"
 #include "qpid/broker/PersistableQueue.h"
@@ -130,7 +131,7 @@
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
     sys::Activity consuming; // Allow consumer threads to be stopped, used by cluster
+    std::auto_ptr<Context> clusterContext; // Clustering state, owned and deleted by this queue. NOTE(review): the cluster context's destructor uses this queue (e.g. its name), so members it uses must still be alive when this member is destroyed.
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -401,8 +402,8 @@
     bool isConsumingStopped();
 
     /** Context information used in a cluster. */
-    boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; }
-    void setClusterContext(boost::intrusive_ptr<RefCounted> context) { clusterContext = context; }
+    Context* getClusterContext() { return clusterContext.get(); } // Non-owning; null if no context has been set.
+    void setClusterContext(std::auto_ptr<Context> context) { clusterContext = context; } // Takes ownership; deletes any previous context.
 
 };
 }} // qpid::broker
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 9943b3d..5871810 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -26,8 +26,6 @@
 #include "QueueContext.h"
 #include "hash.h"
 #include "qpid/framing/ClusterMessageEnqueueBody.h"
-#include "qpid/framing/ClusterMessageAcquireBody.h"
-#include "qpid/framing/ClusterMessageDequeueBody.h"
 #include "qpid/framing/ClusterMessageRequeueBody.h"
 #include "qpid/framing/ClusterWiringCreateQueueBody.h"
 #include "qpid/framing/ClusterWiringCreateExchangeBody.h"
@@ -113,14 +111,13 @@
 void BrokerContext::acquire(const broker::QueuedMessage& qm) {
     if (tssReplicate) {
         assert(!qm.queue->isConsumingStopped());
+        QueueContext::get(*qm.queue)->localAcquire(qm.position); // Recorded locally, multicast later in a batch. NOTE(review): assumes the queue has a cluster context (get() may return null).
     }
 }
 
 void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
     if (tssReplicate)
-        mcaster(qm).mcast(
-            ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position));
+        QueueContext::get(*qm.queue)->localDequeue(qm.position); // Recorded locally, multicast later in a batch. NOTE(review): assumes a non-null cluster context.
 }
 
 void BrokerContext::requeue(const broker::QueuedMessage& qm) {
@@ -135,8 +132,7 @@
 void BrokerContext::create(broker::Queue& q) {
     if (!tssReplicate) return;
     assert(!QueueContext::get(q));
-    boost::intrusive_ptr<QueueContext> context(
-        new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks));
+    new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks); // The context attaches itself to q, which owns it.
     std::string data(q.encodedSize(), '\0');
     framing::Buffer buf(&data[0], data.size());
     q.encode(buf);
@@ -188,7 +184,7 @@
 }
 
 void BrokerContext::stopped(broker::Queue& q) {
-    boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
+    QueueContext* qc = QueueContext::get(q); // Non-owning; null if q has no cluster context yet.
     // Don't forward the stopped call if the queue does not yet have a
     // cluster context - this when the queue is first created locally.
     if (qc) qc->stopped();
diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
index 68b9d50..52ae3ee 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
@@ -36,7 +36,7 @@
             addOptions()
                 ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join")
                 ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.")
-                ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in the cluster.")
+                ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in microseconds.") // Unit matches the "uS" value placeholder.
                 ("cluster2-consume-ticks", optValue(settings.consumeTicks, "N"), "Maximum number of ticks a broker can hold the consume lock on a shared queue.");
                 // FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h.
                 // FIXME aconway 2011-10-05: rename to final option names.
diff --git a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
index 2e86949..144831d 100644
--- a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
+++ b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
@@ -61,6 +61,8 @@
         }
     }
 
+    bool isRunning() const { return timerRunning; } // Reports timerRunning. NOTE(review): read without a lock; confirm callers synchronize.
+
   private:
 
     class Task : public sys::TimerTask {
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
index dc10548..3c19967 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
@@ -71,11 +71,11 @@
         try {
             handle(frame);
         } catch (const std::exception& e) {
-        // FIXME aconway 2011-10-19: error handling.
-        QPID_LOG(error, "cluster: error in deliver on " << cpg.getName()
-                     << " from " << PrettyId(sender, self)
-                     << ": " << frame
-                     << ": " << e.what());
+            // FIXME aconway 2011-10-19: error handling.
+            QPID_LOG(error, "cluster event: " << e.what()
+                     << " (sender=" << PrettyId(sender, self) << " group=" << cpg.getName()
+                     << " " << frame << ")");
+            // The exception is only logged here; it is not rethrown.
         }
     }
 }
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
index 9f9ae1a..f3a678f 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
@@ -78,6 +78,18 @@
     MemberId getSelf() { return self; }
     Cpg& getCpg() { return cpg; }
 
+    /**
+     * Find the first element of handlers that is a HandlerT.
+     *@return the handler, or a null pointer if there is none; callers must check.
+     */
+    template <class HandlerT> boost::intrusive_ptr<HandlerT> getHandler() {
+        for (size_t i = 0; i < handlers.size(); ++i) {
+            boost::intrusive_ptr<HandlerT> p(dynamic_cast<HandlerT*>(handlers[i].get()));
+            if (p) return p;
+        }
+        return boost::intrusive_ptr<HandlerT>();
+    }
+
   private:
     void handle(const framing::AMQFrame&);
 
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
index c6d9885..0bb805d 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
@@ -54,4 +54,7 @@
 
 void Group::mcast(const framing::AMQBody& b) { multicaster->mcast(b); }
 void Group::mcast(const framing::AMQFrame& f) { multicaster->mcast(f); }
+
+MemberId Group::getSelf() const { return eventHandler->getSelf(); } // Delegates to the EventHandler.
+
 }} // namespace qpid::cluster::exp
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h
index 49b33c6..cb216fa 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.h
@@ -22,6 +22,7 @@
  *
  */
 
+#include "qpid/cluster/types.h" // Presumably for MemberId, used by getSelf().
 #include "qpid/RefCounted.h"
 #include <memory>
 
@@ -57,6 +58,7 @@
     MessageHolder& getMessageHolder() { return *messageHolder; }
     MessageBuilders& getMessageBuilders() { return *messageBuilders; }
     Ticker& getTicker() { return *ticker; }
+    MemberId getSelf() const; // This member's own id, as reported by the EventHandler.
 
     void mcast(const framing::AMQBody&);
     void mcast(const framing::AMQFrame&);
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 21129b0..c899787 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -88,38 +88,26 @@
     // We only need to build message from other brokers, our own messages
     // are held by the MessageHolder.
     if (sender() != self()) {
-        boost::shared_ptr<Queue> queue = findQueue(q, "cluster: enqueue");
+        boost::shared_ptr<Queue> queue = findQueue(q, "cluster enqueue");
         messageBuilders.announce(sender(), channel, queue);
     }
 }
 
-// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
-// and scan queue once.
 void MessageHandler::acquire(const std::string& q, uint32_t position) {
     // FIXME aconway 2011-09-15: systematic logging across cluster module.
-    QPID_LOG(trace, "cluster: message " << q << "[" << position
+    QPID_LOG(trace, "cluster message " << q << "[" << position
              << "] acquired by " << PrettyId(sender(), self()));
     // Note acquires from other members. My own acquires were executed in
     // the broker thread
     if (sender() != self()) {
-        boost::shared_ptr<Queue> queue = findQueue(q, "cluster: acquire");
-        QueuedMessage qm;
-        BrokerContext::ScopedSuppressReplication ssr;
-        if (!queue->acquireMessageAt(position, qm))
-            throw Exception(QPID_MSG("cluster: acquire: message not found: "
-                                     << q << "[" << position << "]"));
-        assert(qm.position.getValue() == position);
-        assert(qm.payload);
-        // Save on context for possible requeue if released/rejected.
-        QueueContext::get(*queue)->acquire(qm);
-        // FIXME aconway 2011-09-19: need to record by member-ID to
-        // requeue if member leaves.
+        boost::shared_ptr<Queue> queue = findQueue(q, "cluster acquire");
+        QueueContext::get(*queue)->acquire(position); // NOTE(review): BrokerContext no longer multicasts ClusterMessageAcquire in this patch; confirm this handler is still reachable.
     }
 }
 
 void MessageHandler::dequeue(const std::string& q, uint32_t position) {
     // FIXME aconway 2011-09-15: systematic logging across cluster module.
-    QPID_LOG(trace, "cluster: message " << q << "[" << position
+    QPID_LOG(trace, "cluster message " << q << "[" << position
              << "] dequeued by " << PrettyId(sender(), self()));
 
     // FIXME aconway 2010-10-28: for local dequeues, we should
@@ -128,16 +116,14 @@
 
     // My own dequeues were processed in the broker thread before multicasting.
     if (sender() != self()) {
-        boost::shared_ptr<Queue> queue = findQueue(q, "cluster: dequeue");
-        QueuedMessage qm = QueueContext::get(*queue)->dequeue(position);
-        BrokerContext::ScopedSuppressReplication ssr;
-        if (qm.queue) queue->dequeue(0, qm);
+        boost::shared_ptr<Queue> queue = findQueue(q, "cluster dequeue");
+        QueueContext::get(*queue)->dequeue(position); // NOTE(review): BrokerContext no longer multicasts ClusterMessageDequeue in this patch; confirm this handler is still reachable.
     }
 }
 
 void MessageHandler::requeue(const std::string& q, uint32_t position, bool redelivered) {
     if (sender() != self()) {
-        boost::shared_ptr<Queue> queue = findQueue(q, "cluster: requeue");
+        boost::shared_ptr<Queue> queue = findQueue(q, "cluster requeue");
         QueueContext::get(*queue)->requeue(position, redelivered);
     }
 }
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index ba06ee8..ff9c050 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -21,34 +21,45 @@
  */
 
 #include "BrokerContext.h"
+#include "EventHandler.h"
 #include "Group.h"
 #include "Multicaster.h"
 #include "QueueContext.h"
+#include "QueueHandler.h"
 #include "hash.h"
-#include "qpid/cluster/types.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/ClusterQueueResubscribeBody.h"
-#include "qpid/framing/ClusterQueueSubscribeBody.h"
-#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/ClusterMessageAcquireBody.h"
+#include "qpid/framing/ClusterMessageDequeueBody.h"
+#include "qpid/framing/ClusterQueueConsumedBody.h"
+#include "qpid/framing/ClusterQueueSubscribeBody.h"
+#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
+#include "qpid/framing/ProtocolVersion.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace cluster {
 
+using framing::SequenceSet;
+const framing::ProtocolVersion pv;     // shorthand
+
 QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_)
-    : consumers(0), consuming(true), ticks(0),
+    : ownership(UNSUBSCRIBED), consumers(0), consuming(false), ticks(0),
       queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())),
-      maxTicks(maxTicks_)
+      maxTicks(maxTicks_), group(g)
 {
-    q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
+    q.setClusterContext(std::auto_ptr<broker::Context>(this));
     q.stopConsumers();          // Stop queue initially.
-    g.getTicker().add(this);
+    group.getTicker().add(this);
 }
 
-QueueContext::~QueueContext() {}
+QueueContext::~QueueContext() {
+    // Lifecycle: remove every reference to this context so nothing can use it after deletion.
+    group.getTicker().remove(this);
+    boost::intrusive_ptr<QueueHandler> handler = group.getEventHandler().getHandler<QueueHandler>();
+    if (handler) handler->remove(queue); // getHandler() returns null if no QueueHandler is registered.
+}
 
 namespace {
 bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
@@ -57,72 +68,85 @@
 // Called by QueueReplica in CPG deliver thread when state changes.
 void QueueContext::replicaState(QueueOwnership before, QueueOwnership after)
 {
     // Interested in state changes which lead to ownership.
     // We voluntarily give up ownership before multicasting
     // the state change so we don't need to handle transitions
     // that lead to non-ownership.
-    if (before != after && isOwner(after)) {
-        bool start = false;
-        {
-            sys::Mutex::ScopedLock l(lock);
-            start = !consuming;
-            consuming = true;
-            ticks = 0;
-        }
-        if (start) queue.startConsumers();
-    }
+    bool start = false;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        if (before != after && isOwner(after)) {
+            assert(before == ownership);
+            start = !consuming;
+            consuming = true;
+            ticks = 0;
+        }
+        ownership = after;
+    }
+    // Call into the queue without holding our lock: code reached from the queue
+    // (stopped(), localAcquire(), localDequeue()) takes the same lock.
+    if (start) queue.startConsumers();
 }
 
 // FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
 
 // Called in broker threads when a consumer is added
 void QueueContext::consume(size_t n) {
-    {
-        sys::Mutex::ScopedLock l(lock);
-        consumers = n;
-    }
-    if (n == 1) mcast.mcast(
-        framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
+    sys::Mutex::ScopedLock l(lock);
+    if (consumers == 0 && n > 0 && ownership == UNSUBSCRIBED)
+        mcast.mcast(
+            framing::ClusterQueueSubscribeBody(pv, queue.getName()));
+    consumers = n;
 }
 
 // Called in broker threads when a consumer is cancelled
 void QueueContext::cancel(size_t n) {
     bool stop = false;
     {
         sys::Mutex::ScopedLock l(lock);
         consumers = n;
         stop = (n == 0 && consuming);
     }
+    // Stop outside the lock: stopConsumers() may call stopped() synchronously,
+    // and stopped() takes the same lock.
     if (stop) queue.stopConsumers();
 }
 
+// FIXME aconway 2011-11-03: review scope of locking around sendConsumed
+
 // Called in Ticker thread.
 void QueueContext::tick() {
     bool stop = false;
     {
         sys::Mutex::ScopedLock l(lock);
-        stop = (consuming && ++ticks >= maxTicks);
+        if (!consuming) return; // Nothing to do if we don't have the lock.
+        if (ownership == SHARED_OWNER) stop = (++ticks >= maxTicks);
+        else if (ownership == SOLE_OWNER) sendConsumed(l); // Status report on consumption
     }
-    // When all threads have stopped, queue will call stopped()
+    // Stop outside the lock: when all threads have stopped the queue calls
+    // stopped(), possibly synchronously, and stopped() takes the same lock.
     if (stop) queue.stopConsumers();
 }
 
 // Callback set up by queue.stopConsumers() called in connection or timer thread.
 // Called when no threads are dispatching from the queue.
 void QueueContext::stopped() {
-    bool resubscribe = false;
-    {
-        sys::Mutex::ScopedLock l(lock);
-        assert(consuming);
-        consuming = false;
-        resubscribe = consumers;
-    }
-    if (resubscribe)
-        mcast.mcast(framing::ClusterQueueResubscribeBody(
-                        framing::ProtocolVersion(), queue.getName()));
-    else
-        mcast.mcast(framing::ClusterQueueUnsubscribeBody(
-                        framing::ProtocolVersion(), queue.getName()));
+    sys::Mutex::ScopedLock l(lock);
+    if (!consuming) return; // !consuming => initial stopConsumers in ctor.
+    sendConsumed(l);
+    // Unsubscribe; ask to resubscribe at the back if we still have local consumers.
+    mcast.mcast(
+        framing::ClusterQueueUnsubscribeBody(pv, queue.getName(), consumers > 0));
+    consuming = false;
+}
+
+/** Multicast and clear the accumulated local acquires/dequeues. Caller holds lock. */
+void QueueContext::sendConsumed(const sys::Mutex::ScopedLock&) {
+    if (acquired.empty() && dequeued.empty()) return; // Nothing to send
+    mcast.mcast(
+        framing::ClusterQueueConsumedBody(pv, queue.getName(), acquired, dequeued));
+    acquired.clear();
+    dequeued.clear();
 }
 
 void QueueContext::requeue(uint32_t position, bool redelivered) {
@@ -135,17 +140,76 @@
     }
 }
 
-void QueueContext::acquire(const broker::QueuedMessage& qm) {
-    unacked.put(qm.position, qm);
+void QueueContext::localAcquire(uint32_t position) {
+    QPID_LOG(trace, "cluster queue " << queue.getName() << " acquired " << position);
+    sys::Mutex::ScopedLock l(lock);
+    assert(consuming);
+    acquired.add(position);
 }
 
-broker::QueuedMessage QueueContext::dequeue(uint32_t position) {
-    return unacked.pop(position);
+void QueueContext::localDequeue(uint32_t position) {
+    QPID_LOG(trace, "cluster queue " << queue.getName() << " dequeued " << position);
+    // FIXME aconway 2010-10-28: for local dequeues, we should
+    // complete the ack that initiated the dequeue at this point.
+    sys::Mutex::ScopedLock l(lock);
+
+    // FIXME aconway 2011-11-03: this assertion fails for explicit accept
+    // because it doesn't respect the consume lock.
+    // assert(consuming);
+    // NOTE(review): entries added while !consuming are only multicast by a later tick()/stopped() once consuming again.
+    dequeued.add(position);
 }
 
-boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
-    return boost::intrusive_ptr<QueueContext>(
-        static_cast<QueueContext*>(q.getClusterContext().get()));
+void QueueContext::consumed(
+    const MemberId& sender,
+    const SequenceSet& acquired,
+    const SequenceSet& dequeued)
+{
+    // No lock: uses only queue/group (immutable) and unacked (which has its own lock).
+    // NOTE(review): if acquire() throws, the remaining acquires and all dequeues of this batch are skipped.
+    // FIXME aconway 2011-09-15: systematic logging across cluster module.
+    // FIXME aconway 2011-09-23: pretty printing for identifier.
+    QPID_LOG(trace, "cluster: " << sender << " acquired: " << acquired
+             << " dequeued: " << dequeued << " on queue: " << queue.getName());
+
+    // Note acquires from other members. My own acquires were executed in
+    // the connection thread
+    if (sender != group.getSelf()) {
+        // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
+        for (SequenceSet::iterator i =  acquired.begin(); i != acquired.end(); ++i)
+            acquire(*i);
+    }
+    // Process all dequeues reported by the sender, including those from this member.
+    // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
+    for (SequenceSet::iterator i =  dequeued.begin(); i != dequeued.end(); ++i)
+        dequeue(*i);
 }
 
+// Remote acquire: acquire the message another member reported and hold it in unacked.
+void QueueContext::acquire(uint32_t position) {
+    // No lock: uses queue and unacked (which has its own lock).
+    broker::QueuedMessage qm;
+    BrokerContext::ScopedSuppressReplication ssr;
+    if (!queue.acquireMessageAt(position, qm))
+        // FIXME aconway 2011-10-31: error handling
+        throw Exception(QPID_MSG("cluster: acquire: message not found: "
+                                 << queue.getName() << "[" << position << "]"));
+    assert(qm.position.getValue() == position);
+    assert(qm.payload);
+    unacked.put(qm.position, qm); // unacked has its own lock.
+}
+
+void QueueContext::dequeue(uint32_t position) {
+    // No lock: uses queue and unacked (which has its own lock).
+    broker::QueuedMessage qm = unacked.pop(position);
+    BrokerContext::ScopedSuppressReplication ssr;
+    if (qm.queue) queue.dequeue(0, qm);
+}
+
+QueueContext* QueueContext::get(broker::Queue& q) {
+    return static_cast<QueueContext*>(q.getClusterContext()); // Null if q has no context. NOTE(review): assumes any cluster context on q is a QueueContext.
+}
+
+// FIXME aconway 2011-09-23: make unacked a plain map, use lock.
+
 }} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index 20c2aab..d7079ab 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -25,11 +25,12 @@
 #include "LockedMap.h"
 #include "Ticker.h"
 #include "qpid/RefCounted.h"
-#include "qpid/sys/AtomicValue.h"
-#include "qpid/sys/Time.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/broker/Context.h"
 #include "qpid/cluster/types.h"
-#include <boost/intrusive_ptr.hpp>
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
 
 namespace qpid {
 namespace broker {
@@ -43,12 +44,13 @@
 class Group;
 
  /**
- * Queue state that is not replicated to the cluster.
- * Manages the local queue start/stop status.
+ * Local queue state: manages starting/stopping consumers on the queue.
+ * Owned by, and destroyed with, the queue; it must erase itself
+ * from any cluster data structures in its destructor.
  *
-* THREAD SAFE: Called by connection threads and Ticker dispatch threads.
+ * THREAD SAFE: Called by connection, Ticker dispatch and CPG deliver threads.
  */
-class QueueContext : public Ticker::Tickable {
+class QueueContext : public broker::Context, public Ticker::Tickable {
   public:
     QueueContext(broker::Queue&, Group&, size_t consumeTicks);
     ~QueueContext();
@@ -80,33 +82,50 @@
     /** Called by MessageHandler to requeue a message. */
     void requeue(uint32_t position, bool redelivered);
 
+    /** Called by BrokerContext when a message is acquired locally. */
+    void localAcquire(uint32_t position);
+
     /** Called by MessageHandler when a mesages is acquired. */
-    void acquire(const broker::QueuedMessage& qm);
+    void acquire(uint32_t position);
 
     /** Called by MesageHandler when a message is dequeued. */
-    broker::QueuedMessage dequeue(uint32_t position);
+    void dequeue(uint32_t position);
+
+    /** Called by BrokerContext when a message is dequeued locally. */
+    void localDequeue(uint32_t position);
+
+    /** Called in deliver thread to apply a member's reported acquires/dequeues. */
+    void consumed(const MemberId&,
+                  const framing::SequenceSet& acquired,
+                  const framing::SequenceSet& dequeued);
+
 
     size_t getHash() const { return hash; }
-
+    broker::Queue& getQueue() { return queue; }
 
     /** Get the cluster context for a broker queue. */
-    static boost::intrusive_ptr<QueueContext> get(broker::Queue&) ;
+    static QueueContext* get(broker::Queue&);
 
 private:
+    void sendConsumed(const sys::Mutex::ScopedLock&);
+
     sys::Mutex lock;
-    size_t consumers;           // Number of local consumers
-    bool consuming;             // True if we have the lock & local consumers are active
+    QueueOwnership ownership;   // Ownership status.
+    size_t consumers;           // Number of local consumers.
+    bool consuming;             // True if we hold the cluster consume lock on the queue.
     size_t ticks;               // Ticks since we got the lock.
+    framing::SequenceSet acquired, dequeued; // Local acquires/dequeues not yet multicast; guarded by lock.
 
     // Following members are immutable
-    broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
+    broker::Queue& queue;
     Multicaster& mcast;
     size_t hash;
     size_t maxTicks;            // Max ticks we are allowed.
+    Group& group;
 
     // Following members are safe to use without holding a lock
     typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap;
     UnackedMap unacked;
 };
 
 }} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 0c96e93..f763841 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -45,25 +45,36 @@
 void QueueHandler::subscribe(const std::string& queue) {
     find(queue)->subscribe(sender());
 }
-void QueueHandler::unsubscribe(const std::string& queue) {
-    find(queue)->unsubscribe(sender());
+
+void QueueHandler::unsubscribe(const std::string& queue,
+                               bool resubscribe) {
+    find(queue)->unsubscribe(sender(), resubscribe);
 }
-void QueueHandler::resubscribe(const std::string& queue) {
-    find(queue)->resubscribe(sender());
+
+void QueueHandler::consumed(const std::string& queue,
+                            const framing::SequenceSet& acquired,
+                            const framing::SequenceSet& dequeued)
+{
+    find(queue)->consumed(sender(), acquired, dequeued);
 }
 
 void QueueHandler::left(const MemberId& member) {
     // Unsubscribe for members that leave.
     for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i)
-        i->second->unsubscribe(member);
+        i->second->unsubscribe(member, false); // A departed member never resubscribes.
 }
 
-void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
+void QueueHandler::add(broker::Queue& q) {
     // Local queues already have a context, remote queues need one.
-    if (!QueueContext::get(*q))
-        new QueueContext(*q, group, consumeTicks); // Context attaches to the Queue
-    queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
-        new QueueReplica(q, self()));
+    if (!QueueContext::get(q))
+        new QueueContext(q, group, consumeTicks); // Context attaches to (and is owned by) the Queue
+    assert(QueueContext::get(q));
+    queues[q.getName()] = boost::intrusive_ptr<QueueReplica>(
+        new QueueReplica(*QueueContext::get(q), self())); // Replica references the context; ~QueueContext() erases it via remove().
+}
+
+void QueueHandler::remove(broker::Queue& q) {
+    queues.erase(q.getName()); // NOTE(review): called from ~QueueContext(), possibly outside the deliver thread that uses queues; confirm access is synchronized.
 }
 
 boost::intrusive_ptr<QueueReplica> QueueHandler::find(const std::string& queue) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
index 84e8b75..0f99376 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
@@ -60,15 +60,17 @@
 
     // Events
     void subscribe(const std::string& queue);
-    void unsubscribe(const std::string& queue);
-    void resubscribe(const std::string& queue);
+    /** Remove sender from the queue's subscribers; if resubscribe, append it again at the back. */
+    void unsubscribe(const std::string& queue, bool resubscribe);
+    /** Sender reports the messages it acquired and dequeued on the queue. */
+    void consumed(const std::string& queue,
+                  const framing::SequenceSet& acquired,
+                  const framing::SequenceSet& dequeued);
+    /** Unsubscribe a member that has left from every queue. */
     void left(const MemberId&);
 
-    void add(boost::shared_ptr<broker::Queue>);
-
-    // NB: These functions ar called in broker threads, not deliver threads.
-    void acquired(const broker::QueuedMessage& qm);
-    void empty(const broker::Queue& q);
+    void add(broker::Queue&); // Create q's QueueContext if needed and track a QueueReplica for q.
+    void remove(broker::Queue&); // Forget q's replica; called from ~QueueContext().
 
   private:
     typedef std::map<std::string, boost::intrusive_ptr<QueueReplica> > QueueMap;
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 11a7496..66a7a81 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -28,9 +28,8 @@
 namespace qpid {
 namespace cluster {
 
-QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q,
-                           const MemberId& self_)
-    : queue(q), self(self_), context(QueueContext::get(*q))
+QueueReplica::QueueReplica(QueueContext& qc, const MemberId& self_)
+    : self(self_), context(qc)
 {}
 
 struct PrintSubscribers {
@@ -56,27 +55,28 @@
     update(before);
 }
 
-// FIXME aconway 2011-09-20: need to requeue.
-void QueueReplica::unsubscribe(const MemberId& member) {
+void QueueReplica::unsubscribe(const MemberId& member, bool resubscribe)
+{
+    assert(!resubscribe || (!subscribers.empty() && member == subscribers.front())); // Only the front subscriber may resubscribe.
     QueueOwnership before = getState();
     MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
-    if (i != subscribers.end()) subscribers.erase(i, subscribers.end());
+    subscribers.erase(i, subscribers.end());
+    if (resubscribe) subscribers.push_back(member);
     update(before);
 }
 
-void QueueReplica::resubscribe(const MemberId& member) {
-    assert (member == subscribers.front());
-    QueueOwnership before = getState();
-    subscribers.pop_front();
-    subscribers.push_back(member);
-    update(before);
+void QueueReplica::consumed(const MemberId& member,
+                            const framing::SequenceSet& acquired,
+                            const framing::SequenceSet& dequeued)
+{
+    context.consumed(member, acquired, dequeued);
 }
 
 void QueueReplica::update(QueueOwnership before) {
     QueueOwnership after = getState();
-    QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": "
+    QPID_LOG(trace, "cluster: queue replica: " << context.getQueue().getName() << ": "
                  << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
-    context->replicaState(before, after);
+    context.replicaState(before, after);
 }
 
 QueueOwnership QueueReplica::getState() const {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
index 31faf48..ca92de1 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -50,20 +50,22 @@
 class QueueReplica : public RefCounted
 {
   public:
-    QueueReplica(boost::shared_ptr<broker::Queue> , const MemberId& );
+    QueueReplica(QueueContext&, const MemberId& ); // Keeps a reference to the context; does not own it.
     void subscribe(const MemberId&);
-    void unsubscribe(const MemberId&);
-    void resubscribe(const MemberId&);
+    void unsubscribe(const MemberId&, bool resubscribe); // If resubscribe, member is re-appended at the back.
+    void consumed(const MemberId&,
+                  const framing::SequenceSet& acquired,
+                  const framing::SequenceSet& dequeued);
 
     MemberId getSelf() const { return self; }
 
   private:
     typedef std::deque<MemberId> MemberQueue;
 
-    boost::shared_ptr<broker::Queue> queue;
+    std::string name; // NOTE(review): not set by the constructor in this patch; confirm it is used, else remove.
     MemberQueue subscribers;
     MemberId self;
-    boost::intrusive_ptr<QueueContext> context;
+    QueueContext& context; // Not owned; the replica must not be used after the QueueContext is destroyed.
 
     QueueOwnership getState() const;
     bool isOwner() const;
diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
index 9ff04f2..1210eb7 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
@@ -34,21 +34,24 @@
     timer.add(this);
 }
 
-void Ticker::add(boost::intrusive_ptr<Tickable> t) {
+// Ticker only stores the raw pointer and does not own t: the caller must
+// remove() t before destroying it. Adding t twice registers it twice.
+void Ticker::add(Tickable* t) {
     sys::Mutex::ScopedLock l(lock);
     tickables.push_back(t);
 }
 
-void Ticker::remove(boost::intrusive_ptr<Tickable> t) {
+// Unregister one occurrence of t; does nothing if t is not registered.
+void Ticker::remove(Tickable* t) {
     sys::Mutex::ScopedLock l(lock);
     Tickables::iterator i = std::find(tickables.begin(), tickables.end(), t);
     if (i != tickables.end()) tickables.erase(i);
 }
 
-// Called by timer thread, sets condition
+// Called by timer thread: sets the condition, then re-arms this timer task.
 void Ticker::fire() {
     condition.set();
-    setupNextFire();
+    setupNextFire();            // FIXME aconway 2011-11-03: restart()?
     timer.add(this);
 }
 
diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.h b/qpid/cpp/src/qpid/cluster/exp/Ticker.h
index 0a8d508..6910b7a 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Ticker.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.h
@@ -53,18 +53,20 @@
 class Ticker : public  sys::TimerTask
 {
   public:
-    struct Tickable : public RefCounted {
+    // Not reference counted: Ticker holds plain Tickable* (see Tickables).
+    struct Tickable {
         virtual ~Tickable();
         virtual void tick() = 0;
     };
 
     Ticker(sys::Duration tick, sys::Timer&, boost::shared_ptr<sys::Poller>);
 
-    void add(boost::intrusive_ptr<Tickable>);
-    void remove(boost::intrusive_ptr<Tickable>);
+    // Caller keeps ownership; a Tickable must be remove()d before it is destroyed.
+    void add(Tickable*);
+    void remove(Tickable*);
 
   private:
-    typedef std::vector<boost::intrusive_ptr<Tickable> > Tickables;
+    typedef std::vector<Tickable*> Tickables;
 
     void fire();                // Called in timer thread.
     void dispatch(sys::PollableCondition&); // Called in IO thread
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index da110fe..c2f52f2 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -72,7 +72,7 @@
     }
     boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name);
     assert(q);                  // FIXME aconway 2011-05-10: error handling.
-    queueHandler->add(q);
+    queueHandler->add(*q);      // NOTE(review): now a Queue&, not a shared_ptr; confirm q outlives queueHandler's use of it.
     QPID_LOG(debug, "cluster: create queue " << q->getName());
 }
 
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
index d5c9ffb..c5f6157 100755
--- a/qpid/cpp/src/tests/cluster2_tests.py
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -50,12 +50,6 @@
                 if time.time() >  timeout: fail("Time out in wait_for_queue(%s))"%queue)
                 time.sleep(0.01)
 
-    # FIXME aconway 2011-05-17: remove, use assert_browse.
-    def verify_content(self, expect, receiver):
-        actual = [receiver.fetch(1).content for x in expect]
-        self.assertEqual(expect, actual)
-        self.assertRaises(Empty, receiver.fetch, 0)
-
     def test_message_enqueue(self):
         """Test basic replication of enqueued messages.
         Verify that fanout messages are replicated correctly.
@@ -64,13 +58,12 @@
         cluster = self.cluster(2, cluster2=True)
 
         sn0 = cluster[0].connect().session()
-        r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
-        r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
         s0 = sn0.sender("amq.fanout");
-
         sn1 = cluster[1].connect().session()
-        r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
-        r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+
+        # Create queues p and q on member 0 and bind them to amq.fanout.
+        sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}")
+        sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}")
 
 
         # Send messages on member 0
@@ -78,10 +71,10 @@
         for m in content: s0.send(Message(m))
 
         # Browse on both members.
-        self.verify_content(content, r0p)
-        self.verify_content(content, r0q)
-        self.verify_content(content, r1p)
-        self.verify_content(content, r1q)
+        self.assert_browse(sn0, "p", content)
+        self.assert_browse(sn0, "q", content)
+        self.assert_browse(sn1, "p", content)
+        self.assert_browse(sn1, "q", content)
 
         sn1.connection.close()
         sn0.connection.close()
@@ -98,16 +91,16 @@
         content = ["a","b","c"]
         for m in content: s0.send(Message(m))
         # Verify enqueued on members 0 and 1
-        self.verify_content(content, sn0.receiver("q;{mode:browse}"))
-        self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+        self.assert_browse(sn0, "q", content)
+        self.assert_browse(sn1, "q", content)
 
         # Dequeue on cluster[0]
         self.assertEqual(r0.fetch(1).content, "a")
         sn0.acknowledge(sync=True)
 
         # Verify dequeued on cluster[0] and cluster[1]
-        self.verify_content(["b", "c"], sn0.receiver("q;{mode:browse}"))
-        self.verify_content(["b", "c"], sn1.receiver("q;{mode:browse}"))
+        self.assert_browse(sn0, "q", ["b", "c"])
+        self.assert_browse(sn1, "q", ["b", "c"])
 
     def test_wiring(self):
         """Test replication of wiring"""
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 26283ad..9a0ee6b 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -184,8 +184,7 @@
         # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
         for b in brokers:
             while queue_exists(q,b): time.sleep(0.1);
-        for q in queues:
-            s.sender("%s;{create:always}"%q)
+        s.sender("%s;{create:always}"%q) # re-create only this q, once it is gone from every broker
         # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
         for b in brokers:
             while not queue_exists(q,b): time.sleep(0.1);
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 0d325c4..18d4f9b 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -351,7 +351,6 @@
       <field name="position" type="uint32"/>
       <field name="redelivered" type="bit"/>
     </control>
-
   </class>
 
   <class name="cluster-wiring" code="0x83">
@@ -387,6 +386,9 @@
 
   </class>
 
+  <!--  FIXME aconway 2011-10-31: terminology. Use lock/acquire/release terminology
+  rather than subscription-->
+
   <!-- Manage subscriptions to a queue.
 
        Each queue has a "subscriber queue" of members waiting take
@@ -394,18 +396,24 @@
        is the only one allowed to take messages. -->
 
   <class name="cluster-queue" code="0x84">
+
     <!-- Join at the back of the subscriber queue -->
     <control name="subscribe" code="0x1">
       <field name="queue" type="queue.name"/>
     </control>
-    <!-- Leave the subscriber queue -->
+
+    <!-- Unsubscribe from queue to release the lock. -->
     <control name="unsubscribe" code="0x2">
       <field name="queue" type="queue.name"/>
+      <!-- Set this bit to re-join at the back of the subscriber queue. -->
+      <field name="resubscribe" type="bit"/>
     </control>
-    <!-- Move the member at the front to the back. -->
-    <control name="resubscribe" code="0x3">
+
+    <!-- Notify members of the acquired and dequeued sequence sets for the queue. -->
+    <control name="consumed" code="0x3">
       <field name="queue" type="queue.name"/>
+      <field name="acquired" type="sequence-set"/>
+      <field name="dequeued" type="sequence-set"/>
     </control>
   </class>
-
 </amqp>