QPID-3890: resync this branch to latest trunk

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3890@1303774 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp
index 1cebcfc..694751c 100644
--- a/qpid/cpp/src/posix/QpiddBroker.cpp
+++ b/qpid/cpp/src/posix/QpiddBroker.cpp
@@ -31,10 +31,11 @@
 #include <unistd.h>
 #include <sys/utsname.h>
 
-using namespace std;
-using namespace qpid;
-using qpid::broker::Broker;
-using qpid::broker::Daemon;
+using std::cout;
+using std::endl;
+
+namespace qpid {
+namespace broker {
 
 BootstrapOptions::BootstrapOptions(const char* argv0)
   : qpid::Options("Options"),
@@ -197,7 +198,9 @@
     return 0;
 }
 
+}} // namespace qpid::broker
+
 int main(int argc, char* argv[])
 {
-    return run_broker(argc, argv);
+    return qpid::broker::run_broker(argc, argv);
 }
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index f183ff8..3f854c7 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -31,9 +31,11 @@
 #include <fstream>
 #include <boost/lexical_cast.hpp>
 
+namespace qpid {
+namespace management {
+
 using namespace qpid::client;
 using namespace qpid::framing;
-using namespace qpid::management;
 using namespace qpid::sys;
 using namespace std;
 using std::stringstream;
@@ -1260,7 +1262,7 @@
              int totalSleep = 0;
              do {
                  sys::Mutex::ScopedUnlock _unlock(connLock);
-                 ::sleep(delayMin);
+                 qpid::sys::sleep(delayMin);
                  totalSleep += delayMin;
              } while (totalSleep < delay && !shutdown);
              sleeping = false;
@@ -1396,8 +1398,10 @@
             sleepTime = 1;
 
         while (totalSleep < agent.getInterval() && !shutdown) {
-            ::sleep(sleepTime);
+            qpid::sys::sleep(sleepTime);
             totalSleep += sleepTime;
         }
     }
 }
+
+}}
\ No newline at end of file
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index f311b79..ad688ba 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -32,7 +32,9 @@
 #include "qpid/sys/ExceptionHolder.h"
 #include <stdexcept>
 
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
+
 using namespace qpid::framing;
 using qpid::framing::Buffer;
 using qpid::framing::FieldTable;
@@ -408,3 +410,5 @@
     }
     return msg.delivered;
 }
+
+}}
\ No newline at end of file
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 7093a68..40dfba3 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -131,12 +131,10 @@
 
 void Message::encode(framing::Buffer& buffer) const
 {
-    {
-        sys::Mutex::ScopedLock l(lock);   // prevent header modifications while encoding
-        //encode method and header frames
-        EncodeFrame f1(buffer);
-        frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
-    }
+    sys::Mutex::ScopedLock l(lock);
+    //encode method and header frames
+    EncodeFrame f1(buffer);
+    frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
 
     //then encode the payload of each content frame
     framing::EncodeBody f2(buffer);
@@ -145,6 +143,7 @@
 
 void Message::encodeContent(framing::Buffer& buffer) const
 {
+    sys::Mutex::ScopedLock l(lock);
     //encode the payload of each content frame
     EncodeBody f2(buffer);
     frames.map_if(f2, TypeFilter<CONTENT_BODY>());
@@ -157,6 +156,7 @@
 
 uint32_t Message::encodedContentSize() const
 {
+    sys::Mutex::ScopedLock l(lock);
     return  frames.getContentSize();
 }
 
@@ -222,8 +222,9 @@
             store->stage(pmsg);
             staged = true;
         }
-        //ensure required credit is cached before content frames are released
+        //ensure required credit and size are cached before content frames are released
         getRequiredCredit();
+        contentSize();
         //remove any content frames from the frameset
         frames.remove(TypeFilter<CONTENT_BODY>());
         setContentReleased();
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 5f450cd..2225353 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -43,9 +43,18 @@
 const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
 
 
+/** return an iterator to the message at position, or members.end() if not found (members is searched with lower_bound, so it must be ordered by position) */
+MessageGroupManager::GroupState::MessageFifo::iterator
+MessageGroupManager::GroupState::findMsg(const qpid::framing::SequenceNumber &position)
+{
+    MessageState mState(position);
+    MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState);
+    return (found != members.end() && found->position == position) ? found : members.end();   // lower_bound may return end(): never dereference it
+}
+
 void MessageGroupManager::unFree( const GroupState& state )
 {
-    GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+    GroupFifo::iterator pos = freeGroups.find( state.members.front().position );
     assert( pos != freeGroups.end() && pos->second == &state );
     freeGroups.erase( pos );
 }
@@ -60,8 +69,8 @@
 {
     state.owner.clear();
     assert(state.members.size());
-    assert(freeGroups.find(state.members.front()) == freeGroups.end());
-    freeGroups[state.members.front()] = &state;
+    assert(freeGroups.find(state.members.front().position) == freeGroups.end());
+    freeGroups[state.members.front().position] = &state;
 }
 
 MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
@@ -106,7 +115,8 @@
     // @todo KAG optimization - store reference to group state in QueuedMessage
     // issue: const-ness??
     GroupState& state = findGroup(qm);
-    state.members.push_back(qm.position);
+    GroupState::MessageState mState(qm.position);
+    state.members.push_back(mState);
     uint32_t total = state.members.size();
     QPID_LOG( trace, "group queue " << qName <<
               ": added message to group id=" << state.group << " total=" << total );
@@ -123,7 +133,9 @@
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
     GroupState& state = findGroup(qm);
-    assert(state.members.size());   // there are msgs present
+    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+    assert(m != state.members.end());
+    m->acquired = true;
     state.acquired += 1;
     QPID_LOG( trace, "group queue " << qName <<
               ": acquired message in group id=" << state.group << " acquired=" << state.acquired );
@@ -137,6 +149,9 @@
     GroupState& state = findGroup(qm);
     assert( state.acquired != 0 );
     state.acquired -= 1;
+    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+    assert(m != state.members.end());
+    m->acquired = false;
     if (state.acquired == 0 && state.owned()) {
         QPID_LOG( trace, "group queue " << qName <<
                   ": consumer name=" << state.owner << " released group id=" << state.group);
@@ -152,13 +167,17 @@
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
     GroupState& state = findGroup(qm);
-    assert( state.members.size() != 0 );
-    assert( state.acquired != 0 );
-    state.acquired -= 1;
+    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+    assert(m != state.members.end());
+    if (m->acquired) {
+        assert( state.acquired != 0 );
+        state.acquired -= 1;
+    }
 
-    // likely to be at or near begin() if dequeued in order
+    // special case if qm is first (oldest) message in the group:
+    // may need to re-insert it back on the freeGroups list, as the index will change
     bool reFreeNeeded = false;
-    if (state.members.front() == qm.position) {
+    if (m == state.members.begin()) {
         if (!state.owned()) {
             // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
             // if on freelist, it is indexed by first member, which is about to be removed!
@@ -167,15 +186,7 @@
         }
         state.members.pop_front();
     } else {
-        GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
-        GroupState::PositionFifo::iterator end = state.members.end();
-        while (pos != end) {
-            if (*pos == qm.position) {
-                state.members.erase(pos);
-                break;
-            }
-            ++pos;
-        }
+        state.members.erase(m);
     }
 
     uint32_t total = state.members.size();
@@ -220,11 +231,11 @@
         GroupState& group = findGroup(next);
         if (!group.owned()) {
             //TODO: make acquire more efficient when we already have the message in question
-            if (group.members.front() == next.position && messages.acquire(next.position, next)) {    // only take from head!
+            if (group.members.front().position == next.position && messages.acquire(next.position, next)) {    // only take from head!
                 return true;
             }
             QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
-                     << "'s head message still pending. pos=" << group.members.front());
+                     << "'s head message still pending. pos=" << group.members.front().position);
         } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
             return true;
         }
@@ -284,7 +295,7 @@
         info[GROUP_TIMESTAMP] = 0;
         if (g->second.members.size() != 0) {
             QueuedMessage qm;
-            if (messages.find(g->second.members.front(), qm) &&
+            if (messages.find(g->second.members.front().position, qm) &&
                 qm.payload &&
                 qm.payload->hasProperties<framing::DeliveryProperties>()) {
                 info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp();
@@ -353,6 +364,7 @@
     const std::string GROUP_OWNER("owner");
     const std::string GROUP_ACQUIRED_CT("acquired-ct");
     const std::string GROUP_POSITIONS("positions");
+    const std::string GROUP_ACQUIRED_MSGS("acquired-msgs");
     const std::string GROUP_STATE("group-state");
 }
 
@@ -371,10 +383,14 @@
         group.setString(GROUP_OWNER, g->second.owner);
         group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
         framing::Array positions(TYPE_CODE_UINT32);
-        for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
-             p != g->second.members.end(); ++p)
-            positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+        framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+        for (GroupState::MessageFifo::const_iterator p = g->second.members.begin();
+             p != g->second.members.end(); ++p) {
+            positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position )));
+            acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired )));
+        }
         group.setArray(GROUP_POSITIONS, positions);
+        group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
         groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
     }
     state.setArray(GROUP_STATE, groupState);
@@ -425,13 +441,25 @@
                      qName << "\": position encoding error!");
             return;
         }
+        framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+        ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
+        if (!ok || positions.count() != acquiredMsgs.count()) {
+            QPID_LOG(error, "Invalid message group state information for queue \"" <<
+                     qName << "\": acquired flag encoding error!");
+            return;
+        }
 
-        for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
-            state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
+        Array::const_iterator a = acquiredMsgs.begin();
+        for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) {
+            GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>());
+            mState.acquired = (*a++)->getIntegerValue<bool>();
+            state.members.push_back(mState);
+        }
+
         messageGroups[state.group] = state;
         if (!state.owned()) {
             assert(state.members.size());
-            freeGroups[state.members.front()] = &messageGroups[state.group];
+            freeGroups[state.members.front().position] = &messageGroups[state.group];
         }
     }
 
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index f4bffc4..340ebbc 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -45,19 +45,29 @@
 
     struct GroupState {
         // note: update getState()/setState() when changing this object's state implementation
-        typedef std::deque<framing::SequenceNumber> PositionFifo;
+
+        // track which messages are in this group, and if they have been acquired (one MessageState per member message)
+        struct MessageState {
+            qpid::framing::SequenceNumber position;   // position of the message this entry describes
+            bool                    acquired;         // false on construction; MessageGroupManager sets/clears it as the message is acquired/released
+            MessageState() : acquired(false) {}       // position is left default-constructed
+            MessageState(const qpid::framing::SequenceNumber& p) : position(p), acquired(false) {}
+            bool operator<(const MessageState& b) const { return position < b.position; }   // orders by position only; used by std::lower_bound in findMsg()
+        };
+        typedef std::deque<MessageState> MessageFifo;
 
         std::string group;  // group identifier
         std::string owner;  // consumer with outstanding acquired messages
         uint32_t acquired;  // count of outstanding acquired messages
-        PositionFifo members;   // msgs belonging to this group
+        MessageFifo members;   // msgs belonging to this group, in enqueue order
 
         GroupState() : acquired(0) {}
         bool owned() const {return !owner.empty();}
+        MessageFifo::iterator findMsg(const qpid::framing::SequenceNumber &);
     };
 
     typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
-    typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+    typedef std::map<qpid::framing::SequenceNumber, struct GroupState *> GroupFifo;
 
     GroupMap messageGroups; // index: group name
     GroupFifo freeGroups;   // ordered by oldest free msg
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 3bd9233..fdd95ae 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -56,7 +56,9 @@
 #include <boost/intrusive_ptr.hpp>
 
 
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
+
 using namespace qpid::sys;
 using namespace qpid::framing;
 using qpid::management::ManagementAgent;
@@ -1469,7 +1471,7 @@
     Queue::shared_ptr queue;
 
     AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
-        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {}
 
     void fire()
     {
@@ -1824,3 +1826,5 @@
     parent.deleted = true;
     while (count) parent.messageLock.wait();
 }
+
+}}
\ No newline at end of file
diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
index b4f7d00..90e4fa9 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
@@ -24,6 +24,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/ClusterTimerWakeupBody.h"
 #include "qpid/framing/ClusterTimerDropBody.h"
+#include "qpid/sys/ClusterSafe.h"
 
 namespace qpid {
 namespace cluster {
@@ -107,6 +108,7 @@
 // Deliver thread
 void ClusterTimer::deliverWakeup(const std::string& name) {
     QPID_LOG(trace, "Cluster timer wakeup delivered for " << name);
+    qpid::sys::assertClusterSafe();
     Map::iterator i = map.find(name);
     if (i == map.end())
         throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name));
diff --git a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp
index ffa7633..8459938 100644
--- a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp
+++ b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp
@@ -30,6 +30,10 @@
 using std::string;
 using qpid::Exception;
 
+namespace qpid {
+namespace log {
+namespace posix {
+
 namespace {
 
 // SyslogFacilities maps from syslog values to the text equivalents.
@@ -110,10 +114,6 @@
 
 } // namespace
 
-namespace qpid {
-namespace log {
-namespace posix {
-
 std::ostream& operator<<(std::ostream& o, const SyslogFacility& f) {
     return o << SyslogFacilities().name(f.value);
 }
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 8c2cb95..cb07d5d 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -44,14 +44,15 @@
 #include <sstream>
 #include <typeinfo>
 
+namespace qpid {
+namespace management {
+
 using boost::intrusive_ptr;
 using qpid::framing::Uuid;
 using qpid::types::Variant;
 using qpid::amqp_0_10::MapCodec;
 using qpid::amqp_0_10::ListCodec;
-using qpid::sys::Mutex;
 using namespace qpid::framing;
-using namespace qpid::management;
 using namespace qpid::broker;
 using namespace qpid;
 using namespace std;
@@ -2961,9 +2962,6 @@
     return !deleteList.empty();
 }
 
-namespace qpid {
-namespace management {
-
 namespace {
 QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
 }
@@ -2977,4 +2975,4 @@
     return executionContext;
 }
 
-}}
+}}
\ No newline at end of file
diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
index 2a8d971..20231bf 100644
--- a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
+++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
@@ -28,6 +28,9 @@
 #include "qpid/DataDir.h"
 #include "qpid/log/Statement.h"
 
+namespace qpid {
+namespace store {
+
 /*
  * The MessageStore pointer given to the Broker points to static storage.
  * Thus, it cannot be deleted, especially by the broker. To prevent deletion,
@@ -42,9 +45,6 @@
   };
 }
 
-namespace qpid {
-namespace store {
-
 static MessageStorePlugin static_instance_registers_plugin;
 
 
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
index 14d63a4..849a0a4 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
@@ -32,6 +32,10 @@
 #include "MessageLog.h"
 #include "Lsn.h"
 
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
 namespace {
 
 // Structures that hold log records. Each has a type field at the start.
@@ -97,10 +101,6 @@
 
 }   // namespace
 
-namespace qpid {
-namespace store {
-namespace ms_clfs {
-
 void
 MessageLog::initialize()
 {
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
index 04780e8..0ef046d 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
@@ -33,6 +33,10 @@
 #include "Transaction.h"
 #include "Lsn.h"
 
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
 namespace {
 
 // Structures that hold log records. Each has a type field at the start.
@@ -95,10 +99,6 @@
 
 }   // namespace
 
-namespace qpid {
-namespace store {
-namespace ms_clfs {
-
 void
 TransactionLog::initialize()
 {
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index c251599..01ff8b6 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -41,7 +41,9 @@
 #include <boost/bind.hpp>
 #include <boost/lexical_cast.hpp>
 
-using namespace qpid::sys;
+namespace qpid {
+namespace sys {
+namespace posix {
 
 namespace {
 
@@ -71,10 +73,6 @@
 /*
  * Asynch Acceptor
  */
-namespace qpid {
-namespace sys {
-namespace posix {
-
 class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
 public:
     AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
index b22a615..abff8a5 100644
--- a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
@@ -1,6 +1,3 @@
-#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP
-#define QPID_SYS_LINUX_POLLABLECONDITION_CPP
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -120,5 +117,3 @@
 void PollableCondition::clear() { impl->clear(); }
 
 }} // namespace qpid::sys
-
-#endif  /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/
diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
index 73f1561..2a7cf16 100644
--- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
@@ -37,8 +37,9 @@
 
 #include <boost/bind.hpp>
 
-using namespace qpid::sys;
-using namespace qpid::sys::ssl;
+namespace qpid {
+namespace sys {
+namespace ssl {
 
 namespace {
 
@@ -448,3 +449,5 @@
     settings.authid = socket.getClientAuthId();
     return settings;
 }
+
+}}}
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 30378d4..fb8df5d 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -295,6 +295,8 @@
     volatile bool queuedDelete;
     // Socket close requested, but there are operations in progress.
     volatile bool queuedClose;
+    // Most recent asynch read request
+    volatile AsynchReadResult* pendingRead;
 
 private:
     // Dispatch events that have completed.
@@ -374,6 +376,7 @@
     writeInProgress(false),
     queuedDelete(false),
     queuedClose(false),
+    pendingRead(0),
     working(false) {
 }
 
@@ -504,6 +507,7 @@
             }
         }
         // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+        pendingRead = result;
     }
     else {
         notifyBuffersEmpty();
@@ -617,16 +621,17 @@
     int status = result->getStatus();
     size_t bytes = result->getTransferred();
     if (status == 0 && bytes > 0) {
-        bool restartRead = true;     // May not if receiver doesn't want more
         if (readCallback)
             readCallback(*this, result->getBuff());
-        if (restartRead)
-            startReading();
+        startReading();
     }
     else {
         // No data read, so put the buffer back. It may be partially filled,
         // so "unread" it back to the front of the queue.
         unread(result->getBuff());
+        if (queuedClose && status == ERROR_OPERATION_ABORTED) {
+            return; // Expected reap from CancelIoEx
+        }
         notifyEof();
         if (status != 0)
         {
@@ -697,8 +702,11 @@
             {
                 ScopedUnlock<Mutex> ul(completionLock);
                 AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
-                if (r != 0)
+                if (r != 0) {
                     readComplete(r);
+                    // Set pendingRead to 0 if it's still pointing to (newly completed) r
+                    InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r);
+                }
                 else {
                     AsynchWriteResult *w =
                         dynamic_cast<AsynchWriteResult*>(result);
@@ -732,6 +740,15 @@
         else if (queuedDelete)
             delete this;
     }
+    else {
+        if (queuedClose && pendingRead) {
+            // Force outstanding read to completion.  Layer above will
+            // call back.
+            CancelIoEx((HANDLE)toSocketHandle(socket),
+                       ((AsynchReadResult *)pendingRead)->overlapped());
+            pendingRead = 0;
+        }
+    }
 }
 
 } // namespace windows
diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
index 6a1d904..bb637be 100644
--- a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
@@ -1,6 +1,3 @@
-#ifndef QPID_SYS_WINDOWS_POLLABLECONDITION_CPP
-#define QPID_SYS_WINDOWS_POLLABLECONDITION_CPP
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -110,5 +107,3 @@
 }
 
 }} // namespace qpid::sys
-
-#endif  /*!QPID_SYS_WINDOWS_POLLABLECONDITION_CPP*/
diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
index 1fa4768..b085f67 100644
--- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
@@ -32,6 +32,9 @@
 
 #include <winsock2.h>
 
+namespace qpid {
+namespace sys {
+
 // Need to initialize WinSock. Ideally, this would be a singleton or embedded
 // in some one-time initialization function. I tried boost singleton and could
 // not get it to compile (and others located in google had the same problem).
@@ -76,13 +79,6 @@
 
 static WinSockSetup setup;
 
-} /* namespace */
-
-namespace qpid {
-namespace sys {
-
-namespace {
-
 std::string getName(SOCKET fd, bool local)
 {
     ::sockaddr_storage name_s; // big enough for any socket address
diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
index 11a3389..25cc94b 100644
--- a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
@@ -38,6 +38,10 @@
 #include <queue>
 #include <boost/bind.hpp>
 
+namespace qpid {
+namespace sys {
+namespace windows {
+
 namespace {
 
     /*
@@ -66,10 +70,6 @@
     };
 }
 
-namespace qpid {
-namespace sys {
-namespace windows {
-
 SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s,
                          CredHandle hCred,
                          ReadCallback rCb,
diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp
index a0e329c..b5686c6 100644
--- a/qpid/cpp/src/qpidd.cpp
+++ b/qpid/cpp/src/qpidd.cpp
@@ -29,6 +29,9 @@
 #include <memory>
 using namespace std;
 
+namespace qpid {
+namespace broker {
+
 auto_ptr<QpiddOptions> options;
 
 // Broker real entry; various system-invoked entrypoints call here.
@@ -87,3 +90,4 @@
     }
     return 1;
 }
+}}
diff --git a/qpid/cpp/src/qpidd.h b/qpid/cpp/src/qpidd.h
index a3150a2..f7f84d1 100644
--- a/qpid/cpp/src/qpidd.h
+++ b/qpid/cpp/src/qpidd.h
@@ -26,6 +26,9 @@
 
 #include <memory>
 
+namespace qpid {
+namespace broker {
+
 // BootstrapOptions is a minimal subset of options used for a pre-parse
 // of the command line to discover which plugin modules need to be loaded.
 // The pre-parse is necessary because plugin modules may supply their own
@@ -70,4 +73,5 @@
 // Broker real entry; various system-invoked entrypoints call here.
 int run_broker(int argc, char *argv[], bool hidden = false);
 
+}}
 #endif  /*!QPID_H*/
diff --git a/qpid/cpp/src/tests/RefCounted.cpp b/qpid/cpp/src/tests/RefCounted.cpp
index e4c1da5..3ac3895 100644
--- a/qpid/cpp/src/tests/RefCounted.cpp
+++ b/qpid/cpp/src/tests/RefCounted.cpp
@@ -21,15 +21,15 @@
 
 #include "unit_test.h"
 
+namespace qpid {
+namespace tests {
+
 QPID_AUTO_TEST_SUITE(RefCountedTestSuiteTestSuite)
 
 using boost::intrusive_ptr;
 using namespace std;
 using namespace qpid;
 
-namespace qpid {
-namespace tests {
-
 struct CountMe : public RefCounted {
     static int instances;
     CountMe() { ++instances; }
diff --git a/qpid/cpp/src/tests/StringUtils.cpp b/qpid/cpp/src/tests/StringUtils.cpp
index 6a19119..c50287a 100644
--- a/qpid/cpp/src/tests/StringUtils.cpp
+++ b/qpid/cpp/src/tests/StringUtils.cpp
@@ -23,9 +23,11 @@
 
 #include "unit_test.h"
 
+namespace qpid {
+namespace tests {
+
 QPID_AUTO_TEST_SUITE(StringUtilsTestSuite)
 
-using namespace qpid;
 using std::string;
 
 QPID_AUTO_TEST_CASE(testSplit_general)
@@ -75,3 +77,5 @@
 }
 
 QPID_AUTO_TEST_SUITE_END()
+
+}}
diff --git a/qpid/cpp/src/tests/cli_tests.py b/qpid/cpp/src/tests/cli_tests.py
index b9a7dda..7ac5b1d 100755
--- a/qpid/cpp/src/tests/cli_tests.py
+++ b/qpid/cpp/src/tests/cli_tests.py
@@ -330,6 +330,9 @@
         ret = os.system(self.qpid_config_command(" add queue %s --alternate-exchange=%s" % (qName, altName)))
         self.assertEqual(ret, 0)
 
+        ret = os.system(self.qpid_config_command(" queues"))
+        self.assertEqual(ret, 0)
+
         queues = self.broker_access.getAllQueues()
         found = False
         for queue in queues:
diff --git a/qpid/cpp/src/tests/qpid-ping.cpp b/qpid/cpp/src/tests/qpid-ping.cpp
index 0cb4afa..5233149 100644
--- a/qpid/cpp/src/tests/qpid-ping.cpp
+++ b/qpid/cpp/src/tests/qpid-ping.cpp
@@ -32,11 +32,20 @@
 #include <string>
 #include <iostream>
 
-using namespace std;
-using namespace qpid::sys;
-using namespace qpid::framing;
-using namespace qpid::client;
-using namespace qpid;
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::exception;
+using std::string;
+using namespace qpid::client::arg; // For keyword args
+using qpid::client::AsyncSession;
+using qpid::client::Connection;
+using qpid::client::Message;
+using qpid::client::SubscriptionManager;
+using qpid::framing::Uuid;
+
+namespace qpid {
+namespace tests {
 
 struct PingOptions : public qpid::TestOptions {
     int timeout;                // Timeout in seconds.
@@ -48,9 +57,11 @@
     }
 };
 
+}} // namespace qpid::tests
+
 int main(int argc, char** argv) {
     try {
-        PingOptions opts;
+        qpid::tests::PingOptions opts;
         opts.parse(argc, argv);
         opts.con.heartbeat = (opts.timeout+1)/2;
         Connection connection;
@@ -58,8 +69,8 @@
         if (!opts.quiet) cout << "Opened connection." << endl;
         AsyncSession s = connection.newSession();
         string qname(Uuid(true).str());
-        s.queueDeclare(arg::queue=qname,arg::autoDelete=true,arg::exclusive=true);
-        s.messageTransfer(arg::content=Message("hello", qname));
+        s.queueDeclare(queue=qname, autoDelete=true, exclusive=true);
+        s.messageTransfer(content=Message("hello", qname));
         if (!opts.quiet) cout << "Sent message." << endl;
         SubscriptionManager subs(s);
         subs.get(qname);
diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp
index b1213a4..91eef0c 100644
--- a/qpid/cpp/src/tests/qpid-send.cpp
+++ b/qpid/cpp/src/tests/qpid-send.cpp
@@ -36,15 +36,26 @@
 #include <iostream>
 #include <memory>
 
-using namespace std;
-using namespace qpid::messaging;
-using namespace qpid::types;
+using std::string;
+using std::ios_base;
 
-typedef std::vector<std::string> string_vector;
+using qpid::messaging::Address;
+using qpid::messaging::Connection;
+using qpid::messaging::Duration;
+using qpid::messaging::FailoverUpdates;
+using qpid::messaging::Message;
+using qpid::messaging::Receiver;
+using qpid::messaging::Session;
+using qpid::messaging::Sender;
+using qpid::types::Exception;
+using qpid::types::Uuid;
+using qpid::types::Variant;
 
 namespace qpid {
 namespace tests {
 
+typedef std::vector<std::string> string_vector;
+
 struct Options : public qpid::Options
 {
     bool help;
@@ -223,10 +234,6 @@
 const string SN("sn");
 const string TS("ts");
 
-}} // namespace qpid::tests
-
-using namespace qpid::tests;
-
 class ContentGenerator {
   public:
     virtual ~ContentGenerator() {}
@@ -329,6 +336,20 @@
     }
 };
 
+}} // namespace qpid::tests
+
+using qpid::tests::Options;
+using qpid::tests::Reporter;
+using qpid::tests::Throughput;
+using qpid::tests::ContentGenerator;
+using qpid::tests::GroupGenerator;
+using qpid::tests::GetlineContentGenerator;
+using qpid::tests::MapContentGenerator;
+using qpid::tests::FixedContentGenerator;
+using qpid::tests::SN;
+using qpid::tests::TS;
+using qpid::tests::EOS;
+
 int main(int argc, char ** argv)
 {
     Connection connection;
diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp
index 98520b4..e6010a8 100644
--- a/qpid/cpp/src/tests/testagent.cpp
+++ b/qpid/cpp/src/tests/testagent.cpp
@@ -36,9 +36,12 @@
 
 #include <sstream>
 
+namespace qpid {
+namespace tests {
+
 static bool running = true;
 
-using namespace std;
+using std::string;
 using qpid::management::ManagementAgent;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
@@ -191,12 +194,14 @@
     return 0;
 }
 
+}} // namespace qpid::tests
+
 int main(int argc, char** argv)
 {
     try {
-        return main_int(argc, argv);
+        return qpid::tests::main_int(argc, argv);
     } catch(std::exception& e) {
-        cerr << "Top Level Exception: " << e.what() << endl;
+        std::cerr << "Top Level Exception: " << e.what() << std::endl;
         return 1;
     }
 }
diff --git a/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp b/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp
index 024f20b..14f1e46 100644
--- a/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp
+++ b/qpid/cpp/src/tests/windows/DisableWin32ErrorWindows.cpp
@@ -32,7 +32,9 @@
 #include <windows.h>
 #include <iostream>
 
-namespace {
+namespace qpid {
+namespace tests {
+namespace windows {
 
 // Instead of popping up a window for exceptions, just print something out
 LONG _stdcall UnhandledExceptionFilter (PEXCEPTION_POINTERS pExceptionInfo)
@@ -73,4 +75,4 @@
     SetUnhandledExceptionFilter (&UnhandledExceptionFilter);
 }
 
-}  // namespace
+}}}  // namespace qpid::tests::windows
diff --git a/qpid/cpp/src/windows/QpiddBroker.cpp b/qpid/cpp/src/windows/QpiddBroker.cpp
index 42ba97b..e73fcf0 100644
--- a/qpid/cpp/src/windows/QpiddBroker.cpp
+++ b/qpid/cpp/src/windows/QpiddBroker.cpp
@@ -32,7 +32,8 @@
 #include <iostream>
 #include <windows.h>
 
-using namespace qpid::broker;
+namespace qpid {
+namespace broker {
 
 BootstrapOptions::BootstrapOptions(const char* argv0)
   : qpid::Options("Options"),
@@ -451,6 +452,7 @@
     return 0;
 }
 
+}} // namespace qpid::broker
 
 int main(int argc, char* argv[])
 {
@@ -459,13 +461,13 @@
     // the service is stopped.
     SERVICE_TABLE_ENTRY dispatchTable[] =
     {
-        { "", (LPSERVICE_MAIN_FUNCTION)ServiceMain },
+        { "", (LPSERVICE_MAIN_FUNCTION)qpid::broker::ServiceMain },
         { NULL, NULL }
     };
     if (!StartServiceCtrlDispatcher(dispatchTable)) {
         DWORD err = ::GetLastError();
         if (err == ERROR_FAILED_SERVICE_CONTROLLER_CONNECT) // Run as console
-            return run_broker(argc, argv);
+            return qpid::broker::run_broker(argc, argv);
         throw QPID_WINDOWS_ERROR(err);
     }
     return 0;
diff --git a/qpid/cpp/src/windows/SCM.cpp b/qpid/cpp/src/windows/SCM.cpp
index 232bb04..2eeb143 100644
--- a/qpid/cpp/src/windows/SCM.cpp
+++ b/qpid/cpp/src/windows/SCM.cpp
@@ -1,332 +1,332 @@
-/*

- *

- * Licensed to the Apache Software Foundation (ASF) under one

- * or more contributor license agreements.  See the NOTICE file

- * distributed with this work for additional information

- * regarding copyright ownership.  The ASF licenses this file

- * to you under the Apache License, Version 2.0 (the

- * "License"); you may not use this file except in compliance

- * with the License.  You may obtain a copy of the License at

- *

- *   http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing,

- * software distributed under the License is distributed on an

- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

- * KIND, either express or implied.  See the License for the

- * specific language governing permissions and limitations

- * under the License.

- *

- */

-

-#include "qpid/log/Statement.h"

-#include "qpid/sys/windows/check.h"

-#include "SCM.h"

-

-#pragma comment(lib, "advapi32.lib")

-

-namespace {

-

-// Container that will close a SC_HANDLE upon destruction.

-class AutoServiceHandle {

-public:

-    AutoServiceHandle(SC_HANDLE h_ = NULL) : h(h_) {}

-    ~AutoServiceHandle() { if (h != NULL) ::CloseServiceHandle(h); }

-    void release() { h = NULL; }

-    void reset(SC_HANDLE newHandle)

-    {

-        if (h != NULL)

-            ::CloseServiceHandle(h);

-        h = newHandle;

-    }

-    operator SC_HANDLE() const { return h; }

-

-private:

-    SC_HANDLE h;

-};

-

-}

-

-namespace qpid {

-namespace windows {

-

-SCM::SCM() : scmHandle(NULL)

-{

-}

-

-SCM::~SCM()

-{

-    if (NULL != scmHandle)

-        ::CloseServiceHandle(scmHandle);

-}

-

-/**

-  * Install this executable as a service

-  */

-void SCM::install(const string& serviceName,

-                  const string& serviceDesc,

-                  const string& args,

-                  DWORD startType,

-                  const string& account,

-                  const string& password,

-                  const string& depends)

-{

-    // Handle dependent service name list; Windows wants a set of nul-separated

-    // names ending with a double nul.

-    string depends2 = depends;

-    if (!depends2.empty()) {

-        // CDL to null delimiter w/ trailing double null

-        size_t p = 0;

-        while ((p = depends2.find_first_of( ',', p)) != string::npos)

-            depends2.replace(p, 1, 1, '\0');

-        depends2.push_back('\0');

-        depends2.push_back('\0');

-    }

-

-#if 0

-    // I'm nervous about adding a user/password check here. Is this a

-    // potential attack vector, letting users check passwords without

-    // control?   -Steve Huston, Feb 24, 2011

-

-    // Validate account, password

-    HANDLE hToken = NULL;

-    bool logStatus = false;

-    if (!account.empty() && !password.empty() &&

-        !(logStatus = ::LogonUserA(account.c_str(),

-                                   "",

-                                   password.c_str(),

-                                   LOGON32_LOGON_NETWORK,

-                                   LOGON32_PROVIDER_DEFAULT,

-                                   &hToken ) != 0))

-        std::cout << "warning: supplied account & password failed with LogonUser." << std::endl;

-    if (logStatus)

-        ::CloseHandle(hToken);

-#endif

-

-    // Get fully qualified .exe name

-    char myPath[MAX_PATH];

-    DWORD myPathLength = ::GetModuleFileName(NULL, myPath, MAX_PATH);

-    QPID_WINDOWS_CHECK_NOT(myPathLength, 0);

-    string imagePath(myPath, myPathLength);

-    if (!args.empty())

-        imagePath += " " + args;

-

-    // Ensure there's a handle to the SCM database.

-    openSvcManager();

-

-    // Create the service

-    SC_HANDLE svcHandle;

-    svcHandle = ::CreateService(scmHandle,                 // SCM database

-                                serviceName.c_str(),       // name of service

-                                serviceDesc.c_str(),       // name to display

-                                SERVICE_ALL_ACCESS,        // desired access

-                                SERVICE_WIN32_OWN_PROCESS, // service type

-                                startType,                 // start type

-                                SERVICE_ERROR_NORMAL,      // error cntrl type

-                                imagePath.c_str(),         // path to service's binary w/ optional arguments

-                                NULL,                      // no load ordering group

-                                NULL,                      // no tag identifier

-                                depends2.empty() ? NULL : depends2.c_str(),

-                                account.empty() ? NULL : account.c_str(), // account name, or NULL for LocalSystem

-                                password.empty() ? NULL : password.c_str()); // password, or NULL for none

-    QPID_WINDOWS_CHECK_NULL(svcHandle);

-    ::CloseServiceHandle(svcHandle);

-    QPID_LOG(info, "Service installed successfully");

-}

-

-/**

-  *

-  */

-void SCM::uninstall(const string& serviceName)

-{

-    // Ensure there's a handle to the SCM database.

-    openSvcManager();

-    AutoServiceHandle svc(::OpenService(scmHandle,

-                                        serviceName.c_str(),

-                                        DELETE));

-    QPID_WINDOWS_CHECK_NULL((SC_HANDLE)svc);

-    QPID_WINDOWS_CHECK_NOT(::DeleteService(svc), 0);

-    QPID_LOG(info, "Service deleted successfully.");

-}

-

-/**

-  * Attempt to start the service.

-  */

-void SCM::start(const string& serviceName)

-{

-    // Ensure we have a handle to the SCM database.

-    openSvcManager();

-

-    // Get a handle to the service.

-    AutoServiceHandle svc(::OpenService(scmHandle,

-                                        serviceName.c_str(),

-                                        SERVICE_ALL_ACCESS));

-    QPID_WINDOWS_CHECK_NULL(svc);

-

-    // Check the status in case the service is not stopped.

-    DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);

-    if (state == SERVICE_STOP_PENDING)

-        throw qpid::Exception("Timed out waiting for running service to stop.");

-

-    // Attempt to start the service.

-    QPID_WINDOWS_CHECK_NOT(::StartService(svc, 0, NULL), 0);

-

-    QPID_LOG(info, "Service start pending...");

-

-    // Check the status until the service is no longer start pending.

-    state = waitForStateChangeFrom(svc, SERVICE_START_PENDING);

-    // Determine whether the service is running.

-    if (state == SERVICE_RUNNING) {

-        QPID_LOG(info, "Service started successfully");

-    }

-    else {

-        throw qpid::Exception(QPID_MSG("Service not yet running; state now " << state));

-    }

-}

-

-/**

-  *

-  */

-void SCM::stop(const string& serviceName)

-{

-    // Ensure a handle to the SCM database.

-    openSvcManager();

-

-    // Get a handle to the service.

-    AutoServiceHandle svc(::OpenService(scmHandle,

-                                        serviceName.c_str(),

-                                        SERVICE_STOP | SERVICE_QUERY_STATUS |

-                                        SERVICE_ENUMERATE_DEPENDENTS));

-    QPID_WINDOWS_CHECK_NULL(svc);

-

-    // Make sure the service is not already stopped; if it's stop-pending,

-    // wait for it to finalize.

-    DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);

-    if (state == SERVICE_STOPPED) {

-        QPID_LOG(info, "Service is already stopped");

-        return;

-    }

-

-    // If the service is running, dependencies must be stopped first.

-    std::auto_ptr<ENUM_SERVICE_STATUS> deps;

-    DWORD numDeps = getDependentServices(svc, deps);

-    for (DWORD i = 0; i < numDeps; i++)

-        stop(deps.get()[i].lpServiceName);

-

-    // Dependents stopped; send a stop code to the service.

-    SERVICE_STATUS_PROCESS ssp;

-    if (!::ControlService(svc, SERVICE_CONTROL_STOP, (LPSERVICE_STATUS)&ssp))

-        throw qpid::Exception(QPID_MSG("Stopping " << serviceName << ": " <<

-                                       qpid::sys::strError(::GetLastError())));

-

-    // Wait for the service to stop.

-    state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);

-    if (state == SERVICE_STOPPED)

-        QPID_LOG(info, QPID_MSG("Service " << serviceName <<

-                                " stopped successfully."));

-}

-

-/**

-  *

-  */

-void SCM::openSvcManager()

-{

-    if (NULL != scmHandle)

-        return;

-

-    scmHandle = ::OpenSCManager(NULL,    // local computer

-                                NULL,    // ServicesActive database

-                                SC_MANAGER_ALL_ACCESS); // Rights

-    QPID_WINDOWS_CHECK_NULL(scmHandle);

-}

-

-DWORD SCM::waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState)

-{

-    SERVICE_STATUS_PROCESS ssStatus;

-    DWORD bytesNeeded;

-    DWORD waitTime;

-    if (!::QueryServiceStatusEx(svc,                    // handle to service

-                                SC_STATUS_PROCESS_INFO, // information level

-                                (LPBYTE)&ssStatus,      // address of structure

-                                sizeof(ssStatus),       // size of structure

-                                &bytesNeeded))          // size needed if buffer is too small

-        throw QPID_WINDOWS_ERROR(::GetLastError());

-

-    // Save the tick count and initial checkpoint.

-    DWORD startTickCount = ::GetTickCount();

-    DWORD oldCheckPoint = ssStatus.dwCheckPoint;

-

-    // Wait for the service to change out of the noted state.

-    while (ssStatus.dwCurrentState == originalState) {

-        // Do not wait longer than the wait hint. A good interval is

-        // one-tenth of the wait hint but not less than 1 second

-        // and not more than 10 seconds.

-        waitTime = ssStatus.dwWaitHint / 10;

-        if (waitTime < 1000)

-            waitTime = 1000;

-        else if (waitTime > 10000)

-            waitTime = 10000;

-

-        ::Sleep(waitTime);

-

-        // Check the status until the service is no longer stop pending.

-        if (!::QueryServiceStatusEx(svc,

-                                    SC_STATUS_PROCESS_INFO,

-                                    (LPBYTE) &ssStatus,

-                                    sizeof(ssStatus),

-                                    &bytesNeeded))

-            throw QPID_WINDOWS_ERROR(::GetLastError());

-

-        if (ssStatus.dwCheckPoint > oldCheckPoint) {

-            // Continue to wait and check.

-            startTickCount = ::GetTickCount();

-            oldCheckPoint = ssStatus.dwCheckPoint;

-        } else {

-            if ((::GetTickCount() - startTickCount) > ssStatus.dwWaitHint)

-                break;

-        }

-    }

-    return ssStatus.dwCurrentState;

-}

-

-/**

-  * Get the services that depend on @arg svc.  All dependent service info

-  * is returned in an array of ENUM_SERVICE_STATUS structures via @arg deps.

-  *

-  * @retval The number of dependent services.

-  */

-DWORD SCM::getDependentServices(SC_HANDLE svc,

-                                std::auto_ptr<ENUM_SERVICE_STATUS>& deps)

-{

-    DWORD bytesNeeded;

-    DWORD numEntries;

-

-    // Pass a zero-length buffer to get the required buffer size.

-    if (::EnumDependentServices(svc,

-                                SERVICE_ACTIVE, 

-                                0,

-                                0,

-                                &bytesNeeded,

-                                &numEntries)) {

-        // If the Enum call succeeds, then there are no dependent

-        // services, so do nothing.

-        return 0;

-    }

-

-    if (::GetLastError() != ERROR_MORE_DATA)

-        throw QPID_WINDOWS_ERROR((::GetLastError()));

-

-    // Allocate a buffer for the dependencies.

-    deps.reset((LPENUM_SERVICE_STATUS)(new char[bytesNeeded]));

-    // Enumerate the dependencies.

-    if (!::EnumDependentServices(svc,

-                                 SERVICE_ACTIVE,

-                                 deps.get(),

-                                 bytesNeeded,

-                                 &bytesNeeded,

-                                 &numEntries))

-        throw QPID_WINDOWS_ERROR((::GetLastError()));

-    return numEntries;

-}

-

-} }   // namespace qpid::windows

+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/sys/windows/check.h"
+#include "SCM.h"
+
+#pragma comment(lib, "advapi32.lib")
+
+namespace qpid {
+namespace windows {
+
+namespace {
+
+// Scoped owner for a SC_HANDLE: the destructor and reset() call ::CloseServiceHandle() on a non-NULL handle.
+class AutoServiceHandle {
+public:
+    AutoServiceHandle(SC_HANDLE h_ = NULL) : h(h_) {}
+    ~AutoServiceHandle() { if (h != NULL) ::CloseServiceHandle(h); }
+    void release() { h = NULL; }    // forget the handle WITHOUT closing it
+    void reset(SC_HANDLE newHandle) // close the current handle (if any), then adopt newHandle
+    {
+        if (h != NULL)
+            ::CloseServiceHandle(h);
+        h = newHandle;
+    }
+    operator SC_HANDLE() const { return h; }    // lets the wrapper be passed straight to Win32 calls
+
+private:
+    SC_HANDLE h;
+};
+
+}   // anonymous namespace
+
+SCM::SCM() : scmHandle(NULL)    // no SCM connection yet; openSvcManager() creates it on demand
+{
+}
+
+SCM::~SCM()
+{
+    if (NULL != scmHandle)      // NULL means openSvcManager() never obtained a handle
+        ::CloseServiceHandle(scmHandle);
+}
+
+/**
+  * Install this executable as a service; args (if any) are appended to the service's command line.
+  */
+void SCM::install(const string& serviceName,
+                  const string& serviceDesc,
+                  const string& args,
+                  DWORD startType,
+                  const string& account,
+                  const string& password,
+                  const string& depends)
+{
+    // Handle dependent service name list; Windows wants a set of nul-separated
+    // names ending with a double nul.
+    string depends2 = depends;
+    if (!depends2.empty()) {
+        // Replace each comma of the comma-delimited list with a NUL, then append the two terminating NULs.
+        size_t p = 0;
+        while ((p = depends2.find_first_of( ',', p)) != string::npos)
+            depends2.replace(p, 1, 1, '\0');
+        depends2.push_back('\0');
+        depends2.push_back('\0');
+    }
+
+#if 0
+    // I'm nervous about adding a user/password check here. Is this a
+    // potential attack vector, letting users check passwords without
+    // control?   -Steve Huston, Feb 24, 2011
+
+    // Validate account, password
+    HANDLE hToken = NULL;
+    bool logStatus = false;
+    if (!account.empty() && !password.empty() &&
+        !(logStatus = ::LogonUserA(account.c_str(),
+                                   "",
+                                   password.c_str(),
+                                   LOGON32_LOGON_NETWORK,
+                                   LOGON32_PROVIDER_DEFAULT,
+                                   &hToken ) != 0))
+        std::cout << "warning: supplied account & password failed with LogonUser." << std::endl;
+    if (logStatus)
+        ::CloseHandle(hToken);
+#endif
+
+    // Get fully qualified .exe name. NOTE(review): the path is not quoted -- confirm install paths containing spaces work.
+    char myPath[MAX_PATH];
+    DWORD myPathLength = ::GetModuleFileName(NULL, myPath, MAX_PATH);
+    QPID_WINDOWS_CHECK_NOT(myPathLength, 0);
+    string imagePath(myPath, myPathLength);
+    if (!args.empty())
+        imagePath += " " + args;
+
+    // Ensure there's a handle to the SCM database.
+    openSvcManager();
+
+    // Create the service
+    SC_HANDLE svcHandle;
+    svcHandle = ::CreateService(scmHandle,                 // SCM database
+                                serviceName.c_str(),       // name of service
+                                serviceDesc.c_str(),       // name to display
+                                SERVICE_ALL_ACCESS,        // desired access
+                                SERVICE_WIN32_OWN_PROCESS, // service type
+                                startType,                 // start type
+                                SERVICE_ERROR_NORMAL,      // error cntrl type
+                                imagePath.c_str(),         // path to service's binary w/ optional arguments
+                                NULL,                      // no load ordering group
+                                NULL,                      // no tag identifier
+                                depends2.empty() ? NULL : depends2.c_str(), // nul-separated dependency names, or NULL for none
+                                account.empty() ? NULL : account.c_str(), // account name, or NULL for LocalSystem
+                                password.empty() ? NULL : password.c_str()); // password, or NULL for none
+    QPID_WINDOWS_CHECK_NULL(svcHandle);
+    ::CloseServiceHandle(svcHandle);    // only needed the handle to confirm creation
+    QPID_LOG(info, "Service installed successfully");
+}
+
+/**
+  * Delete the named service: opens it with DELETE access and calls ::DeleteService().
+  */
+void SCM::uninstall(const string& serviceName)
+{
+    // Ensure there's a handle to the SCM database.
+    openSvcManager();
+    AutoServiceHandle svc(::OpenService(scmHandle,
+                                        serviceName.c_str(),
+                                        DELETE));
+    QPID_WINDOWS_CHECK_NULL((SC_HANDLE)svc);
+    QPID_WINDOWS_CHECK_NOT(::DeleteService(svc), 0);
+    QPID_LOG(info, "Service deleted successfully.");
+}
+
+/**
+  * Attempt to start the service; throws qpid::Exception if it does not reach SERVICE_RUNNING.
+  */
+void SCM::start(const string& serviceName)
+{
+    // Ensure we have a handle to the SCM database.
+    openSvcManager();
+
+    // Get a handle to the service.
+    AutoServiceHandle svc(::OpenService(scmHandle,
+                                        serviceName.c_str(),
+                                        SERVICE_ALL_ACCESS));
+    QPID_WINDOWS_CHECK_NULL(svc);
+
+    // If an earlier stop is still pending, wait for it to finish; give up if it never leaves STOP_PENDING.
+    DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+    if (state == SERVICE_STOP_PENDING)
+        throw qpid::Exception("Timed out waiting for running service to stop.");
+
+    // Attempt to start the service.
+    QPID_WINDOWS_CHECK_NOT(::StartService(svc, 0, NULL), 0);
+
+    QPID_LOG(info, "Service start pending...");
+
+    // Check the status until the service is no longer start pending.
+    state = waitForStateChangeFrom(svc, SERVICE_START_PENDING);
+    // Determine whether the service is running.
+    if (state == SERVICE_RUNNING) {
+        QPID_LOG(info, "Service started successfully");
+    }
+    else {
+        throw qpid::Exception(QPID_MSG("Service not yet running; state now " << state));
+    }
+}
+
+/**
+  * Stop the named service, first (recursively) stopping the active services that depend on it.
+  */
+void SCM::stop(const string& serviceName)
+{
+    // Ensure a handle to the SCM database.
+    openSvcManager();
+
+    // Get a handle to the service.
+    AutoServiceHandle svc(::OpenService(scmHandle,
+                                        serviceName.c_str(),
+                                        SERVICE_STOP | SERVICE_QUERY_STATUS |
+                                        SERVICE_ENUMERATE_DEPENDENTS));
+    QPID_WINDOWS_CHECK_NULL(svc);
+
+    // Make sure the service is not already stopped; if it's stop-pending,
+    // wait for it to finalize.
+    DWORD state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+    if (state == SERVICE_STOPPED) {
+        QPID_LOG(info, "Service is already stopped");
+        return;
+    }
+
+    // Services that depend on this one must be stopped first; each is stopped via a recursive call.
+    std::auto_ptr<ENUM_SERVICE_STATUS> deps;
+    DWORD numDeps = getDependentServices(svc, deps);
+    for (DWORD i = 0; i < numDeps; i++)
+        stop(deps.get()[i].lpServiceName);
+
+    // Dependents stopped; send a stop code to the service.
+    SERVICE_STATUS_PROCESS ssp;
+    if (!::ControlService(svc, SERVICE_CONTROL_STOP, (LPSERVICE_STATUS)&ssp))
+        throw qpid::Exception(QPID_MSG("Stopping " << serviceName << ": " <<
+                                       qpid::sys::strError(::GetLastError())));
+
+    // Wait for the service to stop. Nothing is logged or thrown if it does not reach SERVICE_STOPPED.
+    state = waitForStateChangeFrom(svc, SERVICE_STOP_PENDING);
+    if (state == SERVICE_STOPPED)
+        QPID_LOG(info, QPID_MSG("Service " << serviceName <<
+                                " stopped successfully."));
+}
+
+/**
+  * Open the SCM database handle on first use; a no-op once scmHandle is set.
+  */
+void SCM::openSvcManager()
+{
+    if (NULL != scmHandle)
+        return;
+
+    scmHandle = ::OpenSCManager(NULL,    // local computer
+                                NULL,    // ServicesActive database
+                                SC_MANAGER_ALL_ACCESS); // Rights
+    QPID_WINDOWS_CHECK_NULL(scmHandle);
+}
+
+DWORD SCM::waitForStateChangeFrom(SC_HANDLE svc, DWORD originalState)
+{
+    SERVICE_STATUS_PROCESS ssStatus;
+    DWORD bytesNeeded;
+    DWORD waitTime;
+    if (!::QueryServiceStatusEx(svc,                    // handle to service
+                                SC_STATUS_PROCESS_INFO, // information level
+                                (LPBYTE)&ssStatus,      // address of structure
+                                sizeof(ssStatus),       // size of structure
+                                &bytesNeeded))          // size needed if buffer is too small
+        throw QPID_WINDOWS_ERROR(::GetLastError());
+
+    // Save the tick count and initial checkpoint.
+    DWORD startTickCount = ::GetTickCount();
+    DWORD oldCheckPoint = ssStatus.dwCheckPoint;
+
+    // Poll until the service leaves originalState; returns the last observed state either way.
+    while (ssStatus.dwCurrentState == originalState) {
+        // Do not wait longer than the wait hint. A good interval is
+        // one-tenth of the wait hint but not less than 1 second
+        // and not more than 10 seconds.
+        waitTime = ssStatus.dwWaitHint / 10;
+        if (waitTime < 1000)
+            waitTime = 1000;
+        else if (waitTime > 10000)
+            waitTime = 10000;
+
+        ::Sleep(waitTime);
+
+        // Re-query the status; the loop condition ends the wait once the state differs from originalState.
+        if (!::QueryServiceStatusEx(svc,
+                                    SC_STATUS_PROCESS_INFO,
+                                    (LPBYTE) &ssStatus,
+                                    sizeof(ssStatus),
+                                    &bytesNeeded))
+            throw QPID_WINDOWS_ERROR(::GetLastError());
+
+        if (ssStatus.dwCheckPoint > oldCheckPoint) {
+            // The checkpoint advanced: restart the timeout clock and keep waiting.
+            startTickCount = ::GetTickCount();
+            oldCheckPoint = ssStatus.dwCheckPoint;
+        } else {
+            if ((::GetTickCount() - startTickCount) > ssStatus.dwWaitHint)
+                break;  // no checkpoint progress within the wait hint: give up; caller sees originalState
+        }
+    }
+    return ssStatus.dwCurrentState;
+}
+
+/**
+  * Get the active services that depend on svc.  All dependent service info
+  * is returned in an array of ENUM_SERVICE_STATUS structures via deps.
+  *
+  * @return The number of dependent services; when it is 0, deps is left untouched.
+  */
+DWORD SCM::getDependentServices(SC_HANDLE svc,
+                                std::auto_ptr<ENUM_SERVICE_STATUS>& deps)
+{
+    DWORD bytesNeeded;
+    DWORD numEntries;
+
+    // Pass a zero-length buffer to get the required buffer size.
+    if (::EnumDependentServices(svc,
+                                SERVICE_ACTIVE, 
+                                0,
+                                0,
+                                &bytesNeeded,
+                                &numEntries)) {
+        // If the Enum call succeeds, then there are no dependent
+        // services, so do nothing.
+        return 0;
+    }
+
+    if (::GetLastError() != ERROR_MORE_DATA)
+        throw QPID_WINDOWS_ERROR((::GetLastError()));
+
+    // Allocate a byte buffer for the dependencies. NOTE(review): made with new char[] but std::auto_ptr frees with scalar delete.
+    deps.reset((LPENUM_SERVICE_STATUS)(new char[bytesNeeded]));
+    // Enumerate the dependencies.
+    if (!::EnumDependentServices(svc,
+                                 SERVICE_ACTIVE,
+                                 deps.get(),
+                                 bytesNeeded,
+                                 &bytesNeeded,
+                                 &numEntries))
+        throw QPID_WINDOWS_ERROR((::GetLastError()));
+    return numEntries;
+}
+
+} }   // namespace qpid::windows
diff --git a/qpid/java/build.deps b/qpid/java/build.deps
index fe0ca63..ec9eacb 100644
--- a/qpid/java/build.deps
+++ b/qpid/java/build.deps
@@ -35,6 +35,7 @@
 geronimo-openejb=lib/geronimo-ejb_3.0_spec-1.0.1.jar
 
 junit=lib/junit-3.8.1.jar
+mockito-all=lib/mockito-all-1.9.0.jar
 
 log4j=lib/log4j-1.2.12.jar
 
@@ -58,7 +59,7 @@
 
 broker-plugins.libs=${felix.libs} ${log4j} ${commons-configuration.libs}
 
-junit-toolkit.libs=${log4j} ${junit} ${slf4j-api}
+junit-toolkit.libs=${log4j} ${junit} ${slf4j-api} ${mockito-all}
 test.libs=${slf4j-log4j} ${junit-toolkit.libs}
 
 ibm-icu=lib/com.ibm.icu_3.8.1.v20080530.jar
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 56ee56d..a1a06c5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -327,6 +327,8 @@
             }
         }
 
+        _conn.setClosed();
+
         ExceptionListener listener = _conn.getExceptionListenerNoCheck();
         if (listener == null)
         {
diff --git a/qpid/java/lib/mockito-all-1.9.0.jar b/qpid/java/lib/mockito-all-1.9.0.jar
new file mode 100644
index 0000000..273fd50
--- /dev/null
+++ b/qpid/java/lib/mockito-all-1.9.0.jar
Binary files differ
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
index 6b83929..5b3bca7 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
@@ -25,6 +25,7 @@
 
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.management.jmx.ManagedConnectionMBeanTest;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.transport.ConnectionException;
@@ -62,10 +63,13 @@
     {
         final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQConnectionClosedException.class;
 
+        assertConnectionOpen();
+
         stopBroker();
 
         JMSException exception = _recordingExceptionListener.awaitException(10000);
         assertConnectionCloseWasReported(exception, expectedLinkedException);
+        assertConnectionClosed();
 
         ensureCanCloseWithoutException();
     }
@@ -79,10 +83,13 @@
             return;
         }
 
+        assertConnectionOpen();
+
         killBroker();
 
         JMSException exception = _recordingExceptionListener.awaitException(10000);
         assertConnectionCloseWasReported(exception, expectedLinkedException);
+        assertConnectionClosed();
 
         ensureCanCloseWithoutException();
     }
@@ -107,6 +114,16 @@
         assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass());
     }
 
+    private void assertConnectionClosed()
+    {
+        assertTrue("Connection should be marked as closed", ((AMQConnection)_connection).isClosed());
+    }
+
+    private void assertConnectionOpen()
+    {
+        assertFalse("Connection should not be marked as closed", ((AMQConnection)_connection).isClosed());
+    }
+
     private final class RecordingExceptionListener implements ExceptionListener
     {
         private final CountDownLatch _exceptionReceivedLatch = new CountDownLatch(1);
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
index 18fb0a6..ace7611 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -1122,6 +1122,70 @@
         snd.close()
 
 
+    def test_ttl_expire(self):
+        """ Verify that group messages whose TTL has expired are skipped: only the
+        non-expired groups (A and C) are delivered, each in send order. """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
+
+        groups = ["A","B","C","A","B","C"]  # send order; only group B gets a TTL below
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index  # position in send order, asserted on receipt
+            index += 1
+            if m.properties['THE-GROUP'] == 'B':
+                m.ttl = 1;  # short TTL (presumably seconds -- TODO confirm) so B expires during the sleep
+            snd.send(m)
+
+        sleep(2)  # wait longer than the TTL so that every B message has expired
+
+        # create two consumers, C1 and C2, each on its own session
+        s1 = self.setup_session()
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
+        s2 = self.setup_session()
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
+
+        # C1 should acquire A-0, then C2 should acquire C-2; the group B
+        # messages have expired and must never be fetched.
+
+        m1 = c1.fetch(0);
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
+
+        m2 = c2.fetch(0);
+        assert m2.properties['THE-GROUP'] == 'C'
+        assert m2.content['index'] == 2
+
+        m1 = c1.fetch(0);
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 3
+
+        m2 = c2.fetch(0);
+        assert m2.properties['THE-GROUP'] == 'C'
+        assert m2.content['index'] == 5
+
+        # nothing should be left for either consumer: each fetch(0) must raise Empty
+        try:
+            mx = c1.fetch(0)
+            assert False     # reached only if a message was unexpectedly fetched
+        except Empty:
+            pass
+        try:
+            mx = c2.fetch(0)
+            assert False     # reached only if a message was unexpectedly fetched
+        except Empty:
+            pass
+
+        c1.session.acknowledge()
+        c2.session.acknowledge()
+        c1.close()
+        c2.close()
+        snd.close()
+
+
 class StickyConsumerMsgGroupTests(Base):
     """
     Tests for the behavior of sticky-consumer message groups.  These tests
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index 896ae89..1308df7 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -481,7 +481,7 @@
                 if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY],
                 if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION],
                 if q.altExchange:
-                    print "--alternate-exchange=%s" % q._altExchange_.name,
+                    print "--alternate-exchange=%s" % q.altExchange,
                 if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE],
                 if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
                 if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],