QPID-7306: Memory management error in Link/Bridge

qpid::broker Link and Bridge use Connection::requestIOProcessing() to register
callbacks in the connection thread. They were binding a plain "this" pointer to
the callback, but the classes are managed by boost::shared_ptr so if all the
shared_ptr were released, the callback could happen on a dangling pointer.

This fix uses boost::weak_ptr in the callbacks, so if all shared_ptr instances
are released, we don't use the dead pointer.

Link::destroy cannot be skipped, so use a shared_ptr for that.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1749780 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 06d3a0d..d6cd3e2 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -381,8 +381,11 @@
          else
              bindArgs.setString(qpidFedOrigin, origin);
 
-         conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
-                                               queueName, args.i_src, key, bindArgs));
+         conn->requestIOProcessing(
+             weakCallback<Bridge>(
+                 boost::bind(&Bridge::ioThreadPropagateBinding, _1,
+                             queueName, args.i_src, key, bindArgs),
+                 this));
     }
 }
 
@@ -393,9 +396,13 @@
     bindArgs.setString(qpidFedOp, fedOpReorigin);
     bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
 
-    conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
-                                          queueName, args.i_src, args.i_key, bindArgs));
+    conn->requestIOProcessing(
+        weakCallback<Bridge>(
+            boost::bind(&Bridge::ioThreadPropagateBinding, _1,
+                        queueName, args.i_src, args.i_key, bindArgs),
+            this));
 }
+
 bool Bridge::resetProxy()
 {
     SessionHandler& sessionHandler = conn->getChannel(channel);
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 9b85917..037d1e1 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -250,7 +250,8 @@
             currentInterval = 1;
             visitCount      = 0;
             connection = c;
-            c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+            c->requestIOProcessing (
+                weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
         }
     }
     if (isClosing)
@@ -416,7 +417,8 @@
     Mutex::ScopedLock mutex(lock);
     created.push_back (bridge);
     if (connection)
-        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+            connection->requestIOProcessing (
+                weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
 
 }
 
@@ -443,7 +445,8 @@
         needIOProcessing = !cancellations.empty();
     }
     if (needIOProcessing && connection)
-        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+        connection->requestIOProcessing (
+            weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
 }
 
 void Link::ioThreadProcessing()
@@ -507,7 +510,8 @@
     case STATE_OPERATIONAL:
         if ((!active.empty() || !created.empty() || !cancellations.empty()) &&
             connection && connection->isOpen())
-            connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+        connection->requestIOProcessing (
+            weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
         break;
 
     default:    // no-op for all other states
@@ -691,7 +695,7 @@
             setStateLH(STATE_CLOSING);
             if (connection) {
                 //connection can only be closed on the connections own IO processing thread
-                connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+                connection->requestIOProcessing(boost::bind(&Link::destroy, shared_from_this()));
             } else if (old_state == STATE_CONNECTING) {
                 // cannot destroy Link now since a connection request is outstanding.
                 // destroy the link after we get a response (see Link::established,
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index ceee018..c606320 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -23,6 +23,7 @@
  */
 
 #include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
 #include "qpid/Url.h"
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/PersistableConfig.h"
@@ -50,7 +51,9 @@
 class Connection;
 }
 
-class Link : public PersistableConfig, public management::Manageable {
+class Link : public PersistableConfig, public management::Manageable,
+             public boost::enable_shared_from_this<Link>
+{
   private:
     mutable sys::Mutex  lock;
     const std::string   name;
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
index 1bf6b2c..8fff520 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
@@ -226,7 +226,25 @@
 
     qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
 };
+}
 
-}}}
+// See weakCallback below.
+template <class T> void callIfValid(boost::function1<void, T*> f, boost::weak_ptr<T> wp) {
+    boost::shared_ptr<T> sp = wp.lock();
+    if (sp) f(sp.get());
+}
+
+// Memory safety helper for requestIOProcessing with boost::shared_ptr.
+//
+// Makes a function that calls f(p) only if p is still valid. The returned
+// function is bound to a weak_ptr, and only calls f(p) if the weak pointer is
+// still valid. Note this does not prevent the object being deleted before the
+// IO callback, instead it skips the callback if the object is already deleted.
+template <class T>
+boost::function0<void> weakCallback(boost::function1<void, T*> f, T* p) {
+    return boost::bind(&callIfValid<T>, f, p->shared_from_this());
+}
+
+}}
 
 #endif  /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/