Merged Alan's cluster changes from trunk.


git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@929314 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp b/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
deleted file mode 100644
index 111d968..0000000
--- a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "DelegatingPeriodicTimer.h"
-
-namespace qpid {
-namespace broker {
-
-DelegatingPeriodicTimer::DelegatingPeriodicTimer() {}
-
-void DelegatingPeriodicTimer::add(
-    const Task& task, sys::Duration period, const std::string& taskName)
-{
-    if (delegate.get())
-        delegate->add(task, period, taskName);
-    else {
-        Entry e;
-        e.task = task;
-        e.period = period;
-        e.name = taskName;
-        entries.push_back(e);
-    }
-}
-
-void DelegatingPeriodicTimer::setDelegate(std::auto_ptr<PeriodicTimer> impl) {
-    assert(impl.get());
-    assert(!delegate.get());
-    delegate = impl;
-    for (Entries::iterator i = entries.begin(); i != entries.end(); ++i)
-        delegate->add(i->task, i->period, i->name);
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h b/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
deleted file mode 100644
index 5186f41..0000000
--- a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef QPID_BROKER_PERIODICTIMER_H
-#define QPID_BROKER_PERIODICTIMER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/PeriodicTimer.h"
-#include <vector>
-#include <memory>
-
-namespace qpid {
-namespace broker {
-
-/**
- * A PeriodicTimer implementation that delegates to another PeriodicTimer.
- *
- * Tasks added while there is no delegate timer are stored.
- * When a delgate timer is set, stored tasks are added to it.
- */
-class DelegatingPeriodicTimer : public sys::PeriodicTimer
-{
-  public:
-    DelegatingPeriodicTimer();
-    /** Add a task: if no delegate, store it. When delegate set, add stored tasks */
-    void add(const Task& task, sys::Duration period, const std::string& taskName);
-    /** Set the delegate, transfers ownership of delegate. */
-    void setDelegate(std::auto_ptr<PeriodicTimer> delegate);
-    bool hasDelegate() { return delegate.get(); }
-  private:
-    struct Entry { Task task; sys::Duration period; std::string name; };
-    typedef std::vector<Entry> Entries;
-    std::auto_ptr<PeriodicTimer> delegate;
-    Entries entries;
-
-};
-
-}} // namespace qpid::broker
-
-#endif  /*!QPID_BROKER_PERIODICTIMER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 6bb597d..858900b 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -217,8 +217,11 @@
     void ready(const std::string& url) {
         cluster.ready(member, url, l);
     }
-    void configChange(const std::string& current) {
-        cluster.configChange(member, current, l);
+    void configChange(const std::string& members,
+                      const std::string& left,
+                      const std::string& joined)
+    {
+        cluster.configChange(member, members, left, joined, l);
     }
     void updateOffer(uint64_t updatee) {
         cluster.updateOffer(member, updatee, l);
@@ -316,7 +319,9 @@
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     broker.setExpiryPolicy(expiryPolicy);
     dispatcher.start();
+    deliverEventQueue.bypassOff();
     deliverEventQueue.start();
+    deliverFrameQueue.bypassOff();
     deliverFrameQueue.start();
     mcast.start();
 
@@ -554,40 +559,28 @@
                    boost::bind(&ConnectionMap::value_type::second, _1));
     return result;
 }
-  
-struct AddrList {
-    const cpg_address* addrs;
-    int count;
-    const char *prefix;
-    AddrList(const cpg_address* a, int n, const char* p="")
-        : addrs(a), count(n), prefix(p) {}
-};
 
-ostream& operator<<(ostream& o, const AddrList& a) {
-    if (!a.count) return o;
-    o << a.prefix;
-    for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p)
-        o << qpid::cluster::MemberId(*p) << " ";
-    return o;
-}
-
+// CPG config-change callback.
 void Cluster::configChange ( 
     cpg_handle_t /*handle*/,
     const cpg_name */*group*/,
-    const cpg_address *current, int nCurrent,
+    const cpg_address *members, int nMembers,
     const cpg_address *left, int nLeft,
     const cpg_address *joined, int nJoined)
 {
     Mutex::ScopedLock l(lock);
-    QPID_LOG(notice, *this << " membership change: "
-             << AddrList(current, nCurrent) << "("
-             << AddrList(joined, nJoined, "joined: ")
-             << AddrList(left, nLeft, "left: ")
-             << ")");
-    string addresses;
-    for (const cpg_address* p = current; p < current+nCurrent; ++p) 
-        addresses.append(MemberId(*p).str());
-    deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+    string membersStr, leftStr, joinedStr;
+    // Encode members and enqueue as an event so the config change can
+    // be executed in the correct thread.
+    for (const cpg_address* p = members; p < members+nMembers; ++p)
+        membersStr.append(MemberId(*p).str());
+    for (const cpg_address* p = left; p < left+nLeft; ++p)
+        leftStr.append(MemberId(*p).str());
+    for (const cpg_address* p = joined; p < joined+nJoined; ++p)
+        joinedStr.append(MemberId(*p).str());
+    deliverEvent(Event::control(ClusterConfigChangeBody(
+                                    ProtocolVersion(), membersStr, leftStr, joinedStr),
+                                self));
 }
 
 void Cluster::setReady(Lock&) {
@@ -606,6 +599,7 @@
         // We decide here whether we want to recover from our store.
         // We won't recover if we are joining an active cluster or our store is dirty.
         if (store.hasStore() &&
+            store.getState() != STORE_STATE_EMPTY_STORE &&
             (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
             broker.setRecovery(false); // Ditch my current store.
         state = INIT;
@@ -654,22 +648,33 @@
     }
 }
 
-void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) {
+void Cluster::configChange(const MemberId&,
+                           const std::string& membersStr,
+                           const std::string& leftStr,
+                           const std::string& joinedStr,
+                           Lock& l)
+{
     if (state == LEFT) return;
+    MemberSet members = decodeMemberSet(membersStr);
+    MemberSet left = decodeMemberSet(leftStr);
+    MemberSet joined = decodeMemberSet(joinedStr);
+    QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": "
+             << members);
+    QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
+    QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
 
-    MemberSet config = decodeMemberSet(configStr);
-    elders = intersection(elders, config);
+    // Keep only the elders that are still members of the cluster.
+    elders = intersection(elders, members);
     if (elders.empty() && INIT < state && state < CATCHUP) {
         QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
         leave(l);
         return;
     }
-    bool memberChange = map.configChange(config);
-    QPID_LOG(debug, "Config sequence " << map.getConfigSeq());
+    bool memberChange = map.configChange(members);
     store.setConfigSeq(map.getConfigSeq());
 
     // Update initital status for members joining or leaving.
-    initMap.configChange(config);
+    initMap.configChange(members);
     if (initMap.isResendNeeded()) {
         mcast.mcastControl(
             ClusterInitialStatusBody(
@@ -965,8 +970,11 @@
 
     if (store.hasStore()) {
         // Mark store clean if I am the only broker, dirty otherwise.
-        if (size == 1) store.clean(Uuid(true));
-        else store.dirty(clusterId);
+        if (size == 1 ) {
+            if (!store.isClean()) store.clean(Uuid(true));
+        } else {
+            if (!store.isDirty()) store.dirty(clusterId);
+        }
     }
 
     if (size == 1 && lastSize > 1 && state >= CATCHUP) {
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 4a64ad7..343a664 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -163,7 +163,11 @@
                        const std::string& firstConfig,
                        Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
-    void configChange(const MemberId&, const std::string& current, Lock& l);
+    void configChange(const MemberId&,
+                      const std::string& members,
+                      const std::string& left,
+                      const std::string& joined,
+                      Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
     void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
     void timerWakeup(const MemberId&, const std::string& name, Lock&);
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
index d66db85..be671c0 100644
--- a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -110,7 +110,7 @@
         const ClusterConfigChangeBody* configChange =
             static_cast<const ClusterConfigChangeBody*>(method);
         if (configChange) {
-            MemberSet members(decodeMemberSet(configChange->getCurrent()));
+            MemberSet members(decodeMemberSet(configChange->getMembers()));
             QPID_LOG(debug, cluster << " apply config change to error "
                      << frameSeq << ": " << members);
             MemberSet intersect;
diff --git a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp b/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
deleted file mode 100644
index ced34b5..0000000
--- a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PeriodicTimerImpl.h"
-#include "Cluster.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/ClusterPeriodicTimerBody.h"
-
-namespace qpid {
-namespace cluster {
-
-PeriodicTimerImpl::PeriodicTimerImpl(Cluster& c) : cluster(c) {}
-
-PeriodicTimerImpl::TaskEntry::TaskEntry(
-    Cluster& c, const Task& t, sys::Duration d, const std::string& n)
-    : TimerTask(d), cluster(c), timer(c.getBroker().getTimer()),
-      task(t), name(n), inFlight(false)
-{
-    timer.add(this);
-}
-
-void PeriodicTimerImpl::TaskEntry::fire() {
-    setupNextFire();
-    timer.add(this);
-    bool isElder = cluster.isElder(); // Call outside lock to avoid deadlock.
-    sys::Mutex::ScopedLock l(lock);
-    // Only the elder mcasts.
-    // Don't mcast another if we haven't yet received the last one.
-    if (isElder && !inFlight) {
-        QPID_LOG(trace, "Sending periodic-timer control for " << name);
-        inFlight = true;
-        cluster.getMulticast().mcastControl(
-            framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name),
-            cluster.getId());
-    }
-}
-
-void PeriodicTimerImpl::TaskEntry::deliver() {
-    task();
-    sys::Mutex::ScopedLock l(lock);
-    inFlight = false;
-}
-
-
-void PeriodicTimerImpl::add(
-    const Task& task, sys::Duration period, const std::string& name)
-{
-    sys::Mutex::ScopedLock l(lock);
-    QPID_LOG(debug, "Periodic timer add entry for " << name);
-    if (map.find(name) != map.end())
-        throw Exception(QPID_MSG("Cluster timer task name added twice: " << name));
-    map[name] = new TaskEntry(cluster, task, period, name);
-}
-
-void PeriodicTimerImpl::deliver(const std::string& name) {
-    Map::iterator i;
-    {
-        sys::Mutex::ScopedLock l(lock);
-        i = map.find(name);
-        if (i == map.end())
-            throw Exception(QPID_MSG("Cluster timer unknown task: " << name));
-    }
-    QPID_LOG(debug, "Periodic timer execute " << name);
-    i->second->deliver();
-}
-
-}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h b/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h
deleted file mode 100644
index cb0034b..0000000
--- a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef QPID_CLUSTER_PERIODICTIMERIMPL_H
-#define QPID_CLUSTER_PERIODICTIMERIMPL_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/PeriodicTimer.h"
-#include "qpid/sys/Mutex.h"
-#include <map>
-
-namespace qpid {
-namespace cluster {
-
-class Cluster;
-
-/**
- * Cluster implementation of PeriodicTimer.
- *
- * All members run a periodic task, elder mcasts periodic-timer control.
- * Actual task is executed on delivery of periodic-timer.
- */
-class PeriodicTimerImpl : public sys::PeriodicTimer
-{
-  public:
-    PeriodicTimerImpl(Cluster& cluster);
-    void add(const Task& task, sys::Duration period, const std::string& taskName);
-    void deliver(const std::string& name);
-
-  private:
-
-    class TaskEntry : public sys::TimerTask {
-      public:
-        TaskEntry(Cluster&, const Task&, sys::Duration period, const std::string& name);
-        void fire();
-        void deliver();
-      private:
-        sys::Mutex lock;
-        Cluster& cluster;
-        sys::Timer& timer;
-        Task task;
-        std::string name;
-        bool inFlight;
-    };
-
-    typedef std::map<std::string, boost::intrusive_ptr<TaskEntry> > Map;
-    struct TaskImpl;
-
-    sys::Mutex lock;
-    Map map;
-    Cluster& cluster;
-};
-}} // namespace qpid::cluster
-
-#endif  /*!QPID_CLUSTER_PERIODICTIMER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h
index 59d0bcd..10e2ed6 100644
--- a/qpid/cpp/src/qpid/cluster/PollableQueue.h
+++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h
@@ -74,10 +74,7 @@
         else sys::PollableQueue<T>::push(t);
     }
 
-    void start() {
-        bypass = false;
-        sys::PollableQueue<T>::start();
-    }
+    void bypassOff() { bypass = false; }
 
   private:
     Callback callback;
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h
index 2371f04..b496fe0 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.h
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h
@@ -42,6 +42,9 @@
     StoreStatus(const std::string& dir);
 
     framing::cluster::StoreState getState() const { return state; }
+    bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; }
+    bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; }
+
     const Uuid& getClusterId() const { return clusterId; }
     const Uuid& getShutdownId() const { return shutdownId; }
     framing::SequenceNumber getConfigSeq() const { return configSeq; }
diff --git a/qpid/cpp/src/qpid/sys/PeriodicTimer.h b/qpid/cpp/src/qpid/sys/PeriodicTimer.h
deleted file mode 100644
index 290ffb6..0000000
--- a/qpid/cpp/src/qpid/sys/PeriodicTimer.h
+++ /dev/null
@@ -1,56 +0,0 @@
-#ifndef QPID_SYS_PERIODICTIMER_H
-#define QPID_SYS_PERIODICTIMER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Timer.h"
-#include <boost/function.hpp>
-
-namespace qpid {
-namespace sys {
-
-/**
- * Interface of a timer for periodic tasks that should be synchronized
- * across all brokers in a periodic. The standalone broker
- * implementation simply wraps qpid::sys::Timer. The clustered broker
- * implementation synchronizes execution of periodic tasks on all
- * periodic members.
- */
-class PeriodicTimer
-{
-  public:
-    typedef boost::function<void()> Task;
-
-    QPID_COMMON_EXTERN virtual ~PeriodicTimer() {}
-
-    /**
-     * Add a named task to be executed at the given period.
-     *
-     * The task registered under the same name will be executed on
-     * all brokers at the given period.
-     */
-    virtual void add(const Task& task, Duration period, const std::string& taskName) = 0;
-
-};
-}} // namespace qpid::sys
-
-#endif  /*!QPID_SYS_PERIODICTIMER_H*/
diff --git a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp b/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp
deleted file mode 100644
index e2a6dcc..0000000
--- a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PeriodicTimerImpl.h"
-
-namespace qpid {
-namespace sys {
-
-PeriodicTimerImpl::PeriodicTimerImpl(Timer& t) : timer(t) {}
-
-struct PeriodicTimerImpl::TaskImpl : public TimerTask {
-    Timer& timer;
-    Task task;
-
-    TaskImpl(Timer& timer_, const Task& task_, Duration period) :
-        TimerTask(period), timer(timer_), task(task_) {}
-
-    void fire() {
-        task();
-        setupNextFire();
-        timer.add(this);
-    }
-};
-
-void PeriodicTimerImpl::add(
-    const Task& task, Duration period, const std::string& /*taskName*/
-)
-{
-    timer.add(new TaskImpl(timer, task, period));
-}
-
-}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h b/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h
deleted file mode 100644
index 3fc9396..0000000
--- a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#ifndef QPID_SYS_PERIODICTIMERIMPL_H
-#define QPID_SYS_PERIODICTIMERIMPL_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "PeriodicTimer.h"
-#include "qpid/CommonImportExport.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * Standalone broker implementation of PeriodicTimer.
- */
-class PeriodicTimerImpl : public PeriodicTimer
-{
-  public:
-    QPID_COMMON_EXTERN PeriodicTimerImpl(Timer& timer);
-    QPID_COMMON_EXTERN void add(const Task& task,
-                                Duration period,
-                                const std::string& taskName);
-
-  private:
-    struct TaskImpl;
-    Timer& timer;
-};
-}} // namespace qpid::sys
-
-#endif  /*!QPID_SYS_PERIODICTIMER_H*/
diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h
index 0786b21..cb8c126 100644
--- a/qpid/cpp/src/qpid/sys/PollableQueue.h
+++ b/qpid/cpp/src/qpid/sys/PollableQueue.h
@@ -126,7 +126,7 @@
 
 template <class T> void PollableQueue<T>::push(const T& t) {
     ScopedLock l(lock);
-    if (queue.empty()) condition.set();
+    if (queue.empty() && !stopped) condition.set();
     queue.push_back(t);
 }
 
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index fe02078..0c4b035 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -404,12 +404,17 @@
         c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
         a.send_message("q", Message("x", durable=True))
         a.kill()
+        # FIXME aconway 2010-03-29: this test has too many sleeps.
+        # Need to tighten up status persistence to be more atomic and less
+        # prone to interruption.
+        time.sleep(0.1)   # pause for b to update status.
         b.kill()          # c is last man
         time.sleep(0.1)   # pause for c to find out hes last.
         a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
+        time.sleep(0.1)   # pause for c to find out he's no longer last.
         c.kill()          # a is now last man
         time.sleep(0.1)   # pause for a to find out hes last.
-        a.kill()          # really last
+        a.kill()          # really last, should be clean.
         # b & c should be dirty
         b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
         self.assert_dirty_store(b)
diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp
index 8bf6eca..cd7aaa6 100644
--- a/qpid/cpp/src/tests/failover_soak.cpp
+++ b/qpid/cpp/src/tests/failover_soak.cpp
@@ -337,7 +337,6 @@
                  int verbosity,
                  int durable )
 {
-        // ("--log-enable=notice+")
     static int brokerId = 0;
     stringstream path, prefix;
     prefix << "soak-" << brokerId;
@@ -348,7 +347,8 @@
         ("--mgmt-enable=no")
         ("--log-prefix")(prefix.str())
         ("--log-to-file")(prefix.str()+".log")
-        ("--log-enable=notice+")
+        ("--log-enable=info+")
+        ("--log-enable=debug+:cluster")
         ("TMP_DATA_DIR");
 
     if (endsWith(moduleOrDir, "cluster.so")) {
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 8f94eb3..c4750a3 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -73,7 +73,9 @@
     </control>
 
     <control name="config-change" code="0x11" label="Raw cluster membership.">
-      <field name="current" type="vbin16"/> <!-- packed member-id array -->
+      <field name="members" type="vbin16"/> <!-- packed member-id array -->
+      <field name="joined" type="vbin16"/> <!-- packed member-id array -->
+      <field name="left" type="vbin16"/> <!-- packed member-id array -->
     </control>
 
     <control name="message-expired" code="0x12">