QPID-7306: Fix race conditions during Queue destruction.

Stack traces indicate a Queue was being destroyed concurrently while still in
use by its ManagedObject.

ManagedObject holds a plain pointer to the Manageable object (e.g. Queue) it
belongs to.  The Manageable calls ManagedObject::resourceDestroy() when it is
deleted, but without any locking.

Added a locked wrapper class ManageablePtr so destroy is atomic with respect to
other calls via ManageablePtr, calls after pointer is reset to 0 in destroy()
are skipped.

Call resourceDestroy() in Queue::~Queue if it was not called already. This is
probably redundant given given the fixes above but can't hurt.

Queue::destroyed() was also being called without locking and could be called
concurrrently, e.g. if auto-delete happens concurrently with delete via QMF or
by a 0-10 client. Moved the destroyed() call into QueueRegistry::destroy(),
using QueueRegistry lock to guarantee it is called exactly once.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1749782 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index 7bf161d..19da8ef 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -1371,10 +1371,10 @@
                                                    arg.dir.lower () + "_" +\
                                                    arg.name, "inBuf") + ";\n")
 
-      stream.write ("        bool allow = coreObject->AuthorizeMethod(METHOD_" +\
+      stream.write ("        bool allow = manageable.AuthorizeMethod(METHOD_" +\
                     method.getName().upper() + ", ioArgs, userId);\n")
       stream.write ("        if (allow)\n")
-      stream.write ("            status = coreObject->ManagementMethod (METHOD_" +\
+      stream.write ("            status = manageable.ManagementMethod (METHOD_" +\
                     method.getName().upper() + ", ioArgs, text);\n")
       stream.write ("        else\n")
       stream.write ("            status = Manageable::STATUS_FORBIDDEN;\n")
@@ -1413,10 +1413,10 @@
                                  False,
                                  arg.default)
 
-      stream.write ("        bool allow = coreObject->AuthorizeMethod(METHOD_" +\
+      stream.write ("        bool allow = manageable.AuthorizeMethod(METHOD_" +\
                     method.getName().upper() + ", ioArgs, userId);\n")
       stream.write ("        if (allow)\n")
-      stream.write ("            status = coreObject->ManagementMethod (METHOD_" +\
+      stream.write ("            status = manageable.ManagementMethod (METHOD_" +\
                     method.getName().upper() + ", ioArgs, text);\n")
       stream.write ("        else\n")
       stream.write ("            status = Manageable::STATUS_FORBIDDEN;\n")
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 055eba4..7767469 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -1496,7 +1496,6 @@
                                  queue->isRedirectSource() ? peerQ : queue,
                                  false);
         queues.destroy(name, connectionId, userId);
-        queue->destroyed();
     } else {
         throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
     }
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index fea5946..625c6cc 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -237,8 +237,10 @@
 
 Queue::~Queue()
 {
-    if (mgmtObject != 0)
+    if (mgmtObject != 0) {
         mgmtObject->debugStats("destroying");
+        mgmtObject->resourceDestroy();
+    }
 }
 
 bool Queue::isLocal(const Message& msg)
@@ -1346,7 +1348,6 @@
                 broker->getAcl()->recordDestroyQueue(name);
 
             QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")");
-            destroyed();
         } else {
             //queue was accessed since the delayed auto-delete was scheduled, so try again
             QPID_LOG_CAT(debug, model, "Auto-delete interrupted for queue: " << name);
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 150ad1c..cd4a511 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -299,7 +299,6 @@
      */
     QPID_BROKER_EXTERN void create();
 
-    void destroyed();
     QPID_BROKER_EXTERN void bound(const std::string& exchange,
                                   const std::string& key,
                                   const qpid::framing::FieldTable& args);
@@ -534,6 +533,9 @@
     static bool reroute(boost::shared_ptr<Exchange> e, const Message& m);
     static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime);
 
+  private:
+    void destroyed();           // Only called by QueueRegistry::destroy()
+  friend class QueueRegistry;
   friend class QueueFactory;
   friend class QueueRegistry;
 };
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 2101d51..c3cd348 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -101,6 +101,13 @@
             eraseLH(i, q, name, connectionId, userId);
         }
     }
+    // Destroy management object, store record etc. The Queue will not
+    // actually be deleted till all shared_ptr to it are gone.
+    //
+    // Outside the lock (avoid deadlock) but guaranteed to be called exactly once,
+    // since q will only be set on the first call to destroy above.
+    if (q)
+        q->destroyed();
 }
 
 void QueueRegistry::eraseLH(QueueMap::iterator i, Queue::shared_ptr q, const string& name, const string& connectionId, const string& userId)
@@ -129,11 +136,17 @@
             q = i->second;
             if (q->version == version) {
                 eraseLH(i, q, name, connectionId, userId);
-                return true;
             }
         }
-        return false;
     }
+    // Destroy management object, store record etc. The Queue will not
+    // actually be deleted till all shared_ptr to it are gone.
+    //
+    // Outside the lock (avoid deadlock) but guaranteed to be called exactly once,
+    // since q will only be set on the first call to destroy above.
+    if (q)
+        q->destroyed();
+    return q;
 }
 
 Queue::shared_ptr QueueRegistry::find(const string& name){
diff --git a/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp b/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp
index f8b26a6..55f644e 100644
--- a/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp
@@ -37,7 +37,6 @@
         if (broker->getAcl())
             broker->getAcl()->recordDestroyQueue(name);
         QPID_LOG_CAT(debug, model, "Queue " << name << " deleted itself due to reaching limit: " << current << " (policy is " << settings.maxDepth << ")");
-        destroyed();
     }
     current += increment;
     return true;
diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h
index 70c9a5a..5b263e1 100644
--- a/qpid/cpp/src/qpid/management/Manageable.h
+++ b/qpid/cpp/src/qpid/management/Manageable.h
@@ -20,10 +20,12 @@
 // under the License.
 //
 
-#include "qpid/management/ManagementObject.h"
-#include "qpid/management/Args.h"
-#include <string>
 #include "qpid/CommonImportExport.h"
+#include "qpid/management/Args.h"
+#include "qpid/management/ManagementObject.h"
+#include "qpid/sys/IntegerTypes.h"
+
+#include <string>
 
 namespace qpid {
 namespace management {
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 019963e..f18f575 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -23,6 +23,7 @@
 #include "qpid/management/ManagementObject.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/Buffer.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/sys/Time.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/log/Statement.h"
@@ -245,11 +246,37 @@
 
 }}
 
+// Called with lock held
+Manageable* ManagementObject::ManageablePtr::get() const {
+    if (ptr == 0)
+        throw framing::ResourceDeletedException("managed object deleted");
+    return ptr;
+}
+
+void ManagementObject::ManageablePtr::reset() {
+    Mutex::ScopedLock l(lock);
+    ptr = 0;
+}
+
+uint32_t ManagementObject::ManageablePtr::ManagementMethod(
+    uint32_t methodId, Args& args, std::string& text)
+{
+    Mutex::ScopedLock l(lock);
+    return get()->ManagementMethod(methodId, args, text);
+}
+
+bool ManagementObject::ManageablePtr:: AuthorizeMethod(
+    uint32_t methodId, Args& args, const std::string& userId)
+{
+    Mutex::ScopedLock l(lock);
+    return get()->AuthorizeMethod(methodId, args, userId);
+}
+
 ManagementObject::ManagementObject(Manageable* _core) :
     createTime(qpid::sys::Duration::FromEpoch()),
     destroyTime(0), updateTime(createTime), configChanged(true),
     instChanged(true), deleted(false),
-    coreObject(_core), flags(0), forcePublish(false) {}
+    manageable(_core), flags(0), forcePublish(false) {}
 
 void ManagementObject::setUpdateTime()
 {
@@ -261,6 +288,7 @@
     QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key());
     destroyTime = sys::Duration::FromEpoch();
     deleted     = true;
+    manageable.reset();
 }
 
 int ManagementObject::maxThreads = 1;
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
index 5719c23..a299f1e 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.h
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -23,6 +23,7 @@
  */
 #include "qpid/CommonImportExport.h"
 
+#include "qpid/management/Args.h"
 #include "qpid/management/Mutex.h"
 #include "qpid/types/Variant.h"
 #include <map>
@@ -33,9 +34,9 @@
 namespace qpid {
 namespace management {
 
-class Manageable;
 class ObjectId;
 class ManagementObject;
+class Manageable;
 
 
 class AgentAttachment {
@@ -135,6 +136,21 @@
 class QPID_COMMON_CLASS_EXTERN ManagementObject : public ManagementItem
 {
 protected:
+    // Thread safe wrapper for Manageable* with atomic calls and destroy().
+    class ManageablePtr {
+        Manageable* ptr;
+        mutable Mutex lock;
+        Manageable* get() const;
+        ManageablePtr(const ManageablePtr&); // not copyable
+        ManageablePtr& operator=(const ManageablePtr&); // not copyable
+
+      public:
+        ManageablePtr(Manageable* m) : ptr(m) {}
+
+        uint32_t ManagementMethod(uint32_t methodId, Args& args, std::string& text);
+        bool AuthorizeMethod(uint32_t methodId, Args& args, const std::string& userId);
+        void reset();
+    };
 
     uint64_t         createTime;
     uint64_t         destroyTime;
@@ -143,7 +159,7 @@
     mutable bool     configChanged;
     mutable bool     instChanged;
     bool             deleted;
-    Manageable*      coreObject;
+    ManageablePtr    manageable;
     mutable Mutex    accessLock;
     uint32_t         flags;