QPID-7302: Restart delayed auto-delete timer if the queue is declared

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1748523 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index d90bd11..fea5946 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1297,9 +1297,10 @@
 struct AutoDeleteTask : qpid::sys::TimerTask
 {
     Queue::shared_ptr queue;
+    long expectedVersion;
 
     AutoDeleteTask(Queue::shared_ptr q, AbsTime fireTime)
-        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q) {}
+        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q), expectedVersion(q->version) {}
 
     void fire()
     {
@@ -1307,7 +1308,7 @@
         //created, but then became unused again before the task fired;
         //in this case ignore this request as there will have already
         //been a later task added
-        queue->tryAutoDelete();
+        queue->tryAutoDelete(expectedVersion);
     }
 };
 
@@ -1320,29 +1321,37 @@
             broker->getTimer().add(autoDeleteTask);
             QPID_LOG(debug, "Timed auto-delete for " << getName() << " initiated");
         } else {
-            tryAutoDelete();
+            tryAutoDelete(version);
         }
     }
 }
 
-void Queue::tryAutoDelete()
+void Queue::tryAutoDelete(long expectedVersion)
 {
     bool proceed(false);
     {
         Mutex::ScopedLock locker(messageLock);
         if (!deleted && checkAutoDelete(locker)) {
             proceed = true;
-            deleted = true;
         }
     }
 
     if (proceed) {
-        broker->getQueues().destroy(name);
-        if (broker->getAcl())
-            broker->getAcl()->recordDestroyQueue(name);
+        if (broker->getQueues().destroyIfUntouched(name, expectedVersion)) {
+            {
+                Mutex::ScopedLock locker(messageLock);
+                deleted = true;
+            }
+            if (broker->getAcl())
+                broker->getAcl()->recordDestroyQueue(name);
 
-        QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")");
-        destroyed();
+            QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")");
+            destroyed();
+        } else {
+            //queue was accessed since the delayed auto-delete was scheduled, so try again
+            QPID_LOG_CAT(debug, model, "Auto-delete interrupted for queue: " << name);
+            scheduleAutoDelete();
+        }
     } else {
         QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << name);
     }
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 875b996..150ad1c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -37,6 +37,7 @@
 
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/AtomicCount.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/management/Manageable.h"
@@ -219,6 +220,7 @@
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
     boost::shared_ptr<MessageDistributor> allocator;
     boost::scoped_ptr<Selector> selector;
+    qpid::sys::AtomicCount version;
 
     // Redirect source and target refer to each other. Only one is source.
     Queue::shared_ptr redirectPeer;
@@ -271,7 +273,7 @@
                     uint32_t maxTests=0);
 
     virtual bool checkDepth(const QueueDepth& increment, const Message&);
-    void tryAutoDelete();
+    void tryAutoDelete(long expectedVersion);
   public:
 
     typedef std::vector<shared_ptr> vector;
@@ -533,6 +535,7 @@
     static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime);
 
   friend class QueueFactory;
+  friend class QueueRegistry;
 };
 }
 }
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 1283a42..2101d51 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -74,6 +74,7 @@
             result = std::pair<Queue::shared_ptr, bool>(queue, true);
         } else {
             result = std::pair<Queue::shared_ptr, bool>(i->second, false);
+            ++(i->second->version);
         }
         if (getBroker() && getBroker()->getManagementAgent()) {
             getBroker()->getManagementAgent()->raiseEvent(
@@ -97,17 +98,41 @@
         QueueMap::iterator i = queues.find(name);
         if (i != queues.end()) {
             q = i->second;
-            queues.erase(i);
-            if (getBroker()) {
-                // NOTE: queueDestroy and raiseEvent must be called with the
-                // lock held in order to ensure events are generated
-                // in the correct order.
-                getBroker()->getBrokerObservers().queueDestroy(q);
-                if (getBroker()->getManagementAgent())
-                    getBroker()->getManagementAgent()->raiseEvent(
-                        _qmf::EventQueueDelete(connectionId, userId, name));
+            eraseLH(i, q, name, connectionId, userId);
+        }
+    }
+}
+
+void QueueRegistry::eraseLH(QueueMap::iterator i, Queue::shared_ptr q, const string& name, const string& connectionId, const string& userId)
+{
+    queues.erase(i);
+    if (getBroker()) {
+        // NOTE: queueDestroy and raiseEvent must be called with the
+        // lock held in order to ensure events are generated
+        // in the correct order.
+        getBroker()->getBrokerObservers().queueDestroy(q);
+        if (getBroker()->getManagementAgent())
+            getBroker()->getManagementAgent()->raiseEvent(
+                _qmf::EventQueueDelete(connectionId, userId, name));
+    }
+}
+
+
+bool QueueRegistry::destroyIfUntouched(const string& name, long version,
+                                       const string& connectionId, const string& userId)
+{
+    Queue::shared_ptr q;
+    {
+        qpid::sys::RWlock::ScopedWlock locker(lock);
+        QueueMap::iterator i = queues.find(name);
+        if (i != queues.end()) {
+            q = i->second;
+            if (q->version == version) {
+                eraseLH(i, q, name, connectionId, userId);
+                return true;
             }
         }
+        return false;
     }
 }
 
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index af4e8e5..0ff96b6 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -80,15 +80,9 @@
         const std::string& connectionId=std::string(),
         const std::string& userId=std::string());
 
-    template <class Test> bool destroyIf(const std::string& name, Test test)
-    {
-        if (test()) {
-            destroy(name);
-            return true;
-        } else {
-            return false;
-        }
-    }
+    QPID_BROKER_EXTERN bool destroyIfUntouched(const std::string& name, long version,
+                                               const std::string& connectionId=std::string(),
+                                               const std::string& userId=std::string());
 
     /**
      * Find the named queue. Return 0 if not found.
@@ -126,6 +120,8 @@
     typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
     QueueMap queues;
     mutable qpid::sys::RWlock lock;
+
+    void eraseLH(QueueMap::iterator, boost::shared_ptr<Queue>, const std::string& name, const std::string& connectionId, const std::string& userId);
 };