QPID-7306: Memory management error in Link/Bridge
qpid::broker Link and Bridge use Connection::requestIOProcessing() to register
callbacks in the connection thread. They were binding a plain "this" pointer to
the callback, but the classes are managed by boost::shared_ptr so if all the
shared_ptr were released, the callback could happen on a dangling pointer.
This fix uses boost::weak_ptr in the callbacks, so if all shared_ptr instances
are released, we don't use the dead pointer.
Link::destroy cannot be skipped, so use a shared_ptr for that.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1749780 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 06d3a0d..d6cd3e2 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -381,8 +381,11 @@
else
bindArgs.setString(qpidFedOrigin, origin);
- conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
- queueName, args.i_src, key, bindArgs));
+ conn->requestIOProcessing(
+ weakCallback<Bridge>(
+ boost::bind(&Bridge::ioThreadPropagateBinding, _1,
+ queueName, args.i_src, key, bindArgs),
+ this));
}
}
@@ -393,9 +396,13 @@
bindArgs.setString(qpidFedOp, fedOpReorigin);
bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
- conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
- queueName, args.i_src, args.i_key, bindArgs));
+ conn->requestIOProcessing(
+ weakCallback<Bridge>(
+ boost::bind(&Bridge::ioThreadPropagateBinding, _1,
+ queueName, args.i_src, args.i_key, bindArgs),
+ this));
}
+
bool Bridge::resetProxy()
{
SessionHandler& sessionHandler = conn->getChannel(channel);
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 9b85917..037d1e1 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -250,7 +250,8 @@
currentInterval = 1;
visitCount = 0;
connection = c;
- c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ c->requestIOProcessing (
+ weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
}
}
if (isClosing)
@@ -416,7 +417,8 @@
Mutex::ScopedLock mutex(lock);
created.push_back (bridge);
if (connection)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ connection->requestIOProcessing (
+ weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
}
@@ -443,7 +445,8 @@
needIOProcessing = !cancellations.empty();
}
if (needIOProcessing && connection)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ connection->requestIOProcessing (
+ weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
}
void Link::ioThreadProcessing()
@@ -507,7 +510,8 @@
case STATE_OPERATIONAL:
if ((!active.empty() || !created.empty() || !cancellations.empty()) &&
connection && connection->isOpen())
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ connection->requestIOProcessing (
+ weakCallback<Link>(boost::bind(&Link::ioThreadProcessing, _1), this));
break;
default: // no-op for all other states
@@ -691,7 +695,7 @@
setStateLH(STATE_CLOSING);
if (connection) {
//connection can only be closed on the connections own IO processing thread
- connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ connection->requestIOProcessing(boost::bind(&Link::destroy, shared_from_this()));
} else if (old_state == STATE_CONNECTING) {
// cannot destroy Link now since a connection request is outstanding.
// destroy the link after we get a response (see Link::established,
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index ceee018..c606320 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -23,6 +23,7 @@
*/
#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/PersistableConfig.h"
@@ -50,7 +51,9 @@
class Connection;
}
-class Link : public PersistableConfig, public management::Manageable {
+class Link : public PersistableConfig, public management::Manageable,
+ public boost::enable_shared_from_this<Link>
+{
private:
mutable sys::Mutex lock;
const std::string name;
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
index 1bf6b2c..8fff520 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
@@ -226,7 +226,25 @@
qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
};
+}
-}}}
+// See weakCallback below.
+template <class T> void callIfValid(boost::function1<void, T*> f, boost::weak_ptr<T> wp) {
+ boost::shared_ptr<T> sp = wp.lock();
+ if (sp) f(sp.get());
+}
+
+// Memory safety helper for requestIOProcessing with boost::shared_ptr.
+//
+// Makes a function that calls f(p) only if p is still valid. The returned
+// function is bound to a weak_ptr, and only calls f(p) if the weak pointer is
+// still valid. Note this does not prevent the object being deleted before the
+// IO callback, instead it skips the callback if the object is already deleted.
+template <class T>
+boost::function0<void> weakCallback(boost::function1<void, T*> f, T* p) {
+ return boost::bind(&callIfValid<T>, f, p->shared_from_this());
+}
+
+}}
#endif /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/