PROTON-1241: [C++ binding] Move ownership chain of messaging_handler & messaging_adapter
- messaging_handler now has no data members
- messaging_adapters are now owned by the container_impl
They are kept in a list and are auto deleted when the container_impl is deleted
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 0c88042..9fbdbdc 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -155,7 +155,7 @@
private:
void apply_unbound(connection&) const;
void apply_bound(connection&) const;
- proton_handler* handler() const;
+ messaging_handler* handler() const;
class impl;
internal::pn_unique_ptr<impl> impl_;
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
index 9d678f7..a492191 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
@@ -203,7 +203,7 @@
connection_engine(const connection_engine&);
connection_engine& operator=(const connection_engine&);
- proton::proton_handler* handler_;
+ internal::pn_unique_ptr<proton_handler> handler_;
proton::container* container_;
pn_connection_engine_t c_engine_;
};
diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
index 8520846..2c5423f 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
@@ -23,7 +23,6 @@
*/
#include "./internal/export.hpp"
-#include "./internal/pn_unique_ptr.hpp"
namespace proton {
@@ -169,19 +168,6 @@
/// Fallback error handling.
PN_CPP_EXTERN virtual void on_error(const error_condition &c);
-
- private:
- internal::pn_unique_ptr<messaging_adapter> messaging_adapter_;
-
- /// @cond INTERNAL
- friend class container;
- friend class container_impl;
- friend class io::connection_engine;
- friend class connection_options;
- friend class receiver_options;
- friend class sender_options;
- friend class session_options;
- /// @endcond
};
} // proton
diff --git a/proton-c/bindings/cpp/include/proton/session_options.hpp b/proton-c/bindings/cpp/include/proton/session_options.hpp
index 059afb5..8bef3e8 100644
--- a/proton-c/bindings/cpp/include/proton/session_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/session_options.hpp
@@ -53,7 +53,7 @@
PN_CPP_EXTERN session_options& operator=(const session_options&);
/// Set a messaging_handler for the session.
- PN_CPP_EXTERN session_options& handler(class messaging_handler *);
+ PN_CPP_EXTERN session_options& handler(class messaging_handler &);
/// @cond INTERNAL
// Other useful session configuration TBD.
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp
index d953369..8567ee3 100644
--- a/proton-c/bindings/cpp/src/connection_options.cpp
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -48,7 +48,7 @@
class connection_options::impl {
public:
- option<proton_handler*> handler;
+ option<messaging_handler*> handler;
option<uint32_t> max_frame_size;
option<uint16_t> max_sessions;
option<duration> idle_timeout;
@@ -187,7 +187,7 @@
return *this;
}
-connection_options& connection_options::handler(class messaging_handler &h) { impl_->handler = h.messaging_adapter_.get(); return *this; }
+connection_options& connection_options::handler(class messaging_handler &h) { impl_->handler = &h; return *this; }
connection_options& connection_options::max_frame_size(uint32_t n) { impl_->max_frame_size = n; return *this; }
connection_options& connection_options::max_sessions(uint16_t n) { impl_->max_sessions = n; return *this; }
connection_options& connection_options::idle_timeout(duration t) { impl_->idle_timeout = t; return *this; }
@@ -206,5 +206,5 @@
void connection_options::apply_unbound(connection& c) const { impl_->apply_unbound(c); }
void connection_options::apply_bound(connection& c) const { impl_->apply_bound(c); }
-proton_handler* connection_options::handler() const { return impl_->handler.value; }
+messaging_handler* connection_options::handler() const { return impl_->handler.value; }
} // namespace proton
diff --git a/proton-c/bindings/cpp/src/connector.cpp b/proton-c/bindings/cpp/src/connector.cpp
index 5e6f0fd..91ba730 100644
--- a/proton-c/bindings/cpp/src/connector.cpp
+++ b/proton-c/bindings/cpp/src/connector.cpp
@@ -89,7 +89,7 @@
}
else {
// log "Disconnected, reconnecting in " << delay << " milliseconds"
- static_cast<container_impl&>(connection_.container()).schedule(delay, this);
+ container_impl::schedule(connection_.container(), delay, this);
return;
}
}
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp
index 7b14afa..52ae42b 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -120,8 +120,8 @@
return internal::take_ownership(handler);
}
-container_impl::container_impl(const std::string& id, messaging_handler *h) :
- reactor_(reactor::create()), handler_(h ? h->messaging_adapter_.get() : 0),
+container_impl::container_impl(const std::string& id, messaging_handler *mh) :
+ reactor_(reactor::create()),
id_(id.empty() ? uuid::random().str() : id),
auto_stop_(true)
{
@@ -129,11 +129,13 @@
// Set our own global handler that "subclasses" the existing one
pn_handler_t *global_handler = reactor_.pn_global_handler();
- override_handler_.reset(new override_handler(global_handler, *this));
- internal::pn_ptr<pn_handler_t> cpp_global_handler(cpp_handler(override_handler_.get()));
- reactor_.pn_global_handler(cpp_global_handler.get());
- if (handler_) {
- reactor_.pn_handler(cpp_handler(handler_).get());
+ proton_handler* oh = new override_handler(global_handler, *this);
+ handlers_.push_back(oh);
+ reactor_.pn_global_handler(cpp_handler(oh).get());
+ if (mh) {
+ proton_handler* h = new messaging_adapter(*mh);
+ handlers_.push_back(h);
+ reactor_.pn_handler(cpp_handler(h).get());
}
// Note: we have just set up the following handlers that see
@@ -179,10 +181,15 @@
returned<connection> container_impl::connect(const std::string &urlstr, const connection_options &user_opts) {
connection_options opts = client_connection_options(); // Defaults
opts.update(user_opts);
- proton_handler *h = opts.handler();
+ messaging_handler* mh = opts.handler();
+ internal::pn_ptr<pn_handler_t> chandler;
+ if (mh) {
+ proton_handler* h = new messaging_adapter(*mh);
+ handlers_.push_back(h);
+ chandler = cpp_handler(h);
+ }
proton::url url(urlstr);
- internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>();
connection conn(reactor_.connection_to_host(url.host(), url.port(), chandler.get()));
internal::pn_unique_ptr<connector> ctor(new connector(conn, opts, url));
connection_context& cc(connection_context::get(conn));
@@ -224,8 +231,15 @@
if (acceptors_.find(url) != acceptors_.end())
throw error("already listening on " + url);
connection_options opts = server_connection_options(); // Defaults
- proton_handler *h = opts.handler();
- internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>();
+
+ messaging_handler* mh = opts.handler();
+ internal::pn_ptr<pn_handler_t> chandler;
+ if (mh) {
+ proton_handler* h = new messaging_adapter(*mh);
+ handlers_.push_back(h);
+ chandler = cpp_handler(h);
+ }
+
proton::url u(url);
pn_acceptor_t *acptr = pn_reactor_acceptor(
reactor_.pn_object(), u.host().c_str(), u.port().c_str(), chandler.get());
@@ -251,11 +265,12 @@
close_acceptor(i->second);
}
-task container_impl::schedule(int delay, proton_handler *h) {
+task container_impl::schedule(container& c, int delay, proton_handler *h) {
+ container_impl& ci = static_cast<container_impl&>(c);
internal::pn_ptr<pn_handler_t> task_handler;
if (h)
- task_handler = cpp_handler(h);
- return reactor_.schedule(delay, task_handler.get());
+ task_handler = ci.cpp_handler(h);
+ return ci.reactor_.schedule(delay, task_handler.get());
}
namespace {
@@ -278,7 +293,7 @@
}
void container_impl::schedule(duration delay, void_function0& f) {
- schedule(delay.milliseconds(), new timer_handler_03(f));
+ schedule(*this, delay.milliseconds(), new timer_handler_03(f));
}
#if PN_CPP_HAS_STD_FUNCTION
@@ -291,7 +306,7 @@
}
void container_impl::schedule(duration delay, std::function<void()> f) {
- schedule(delay.milliseconds(), new timer_handler_std(f));
+ schedule(*this, delay.milliseconds(), new timer_handler_std(f));
}
#endif
@@ -320,8 +335,10 @@
// Unbound options don't apply to server connection
opts.apply_bound(c);
// Handler applied separately
- proton_handler *h = opts.handler();
- if (h) {
+ messaging_handler* mh = opts.handler();
+ if (mh) {
+ proton_handler* h = new messaging_adapter(*mh);
+ handlers_.push_back(h);
internal::pn_ptr<pn_handler_t> chandler = cpp_handler(h);
pn_record_t *record = pn_connection_attachments(unwrap(c));
pn_record_set_handler(record, chandler.get());
diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp
index 28745ae..97f6be2 100644
--- a/proton-c/bindings/cpp/src/container_impl.hpp
+++ b/proton-c/bindings/cpp/src/container_impl.hpp
@@ -28,15 +28,17 @@
#include "proton/connection.hpp"
#include "proton/connection_options.hpp"
#include "proton/duration.hpp"
-#include "proton/messaging_handler.hpp"
#include "proton/sender.hpp"
#include "proton/receiver.hpp"
-#include <proton/reactor.h>
+
+#include "messaging_adapter.hpp"
#include "reactor.hpp"
+#include "proton_bits.hpp"
#include "proton_handler.hpp"
+#include <list>
+#include <map>
#include <string>
-#include <sstream>
namespace proton {
@@ -86,17 +88,17 @@
// non-interface functions
void configure_server_connection(connection &c);
- task schedule(int delay, proton_handler *h);
- internal::pn_ptr<pn_handler_t> cpp_handler(proton_handler *h);
- std::string next_link_name();
+ static task schedule(container& c, int delay, proton_handler *h);
+ template <class T> static void set_handler(T s, messaging_handler* h);
private:
+ internal::pn_ptr<pn_handler_t> cpp_handler(proton_handler *h);
+
typedef std::map<std::string, acceptor> acceptors;
reactor reactor_;
- proton_handler *handler_;
- internal::pn_unique_ptr<proton_handler> override_handler_;
- internal::pn_unique_ptr<proton_handler> flow_controller_;
+ // Keep a list of all the handlers used by the container so they last as long as the container
+ std::list<internal::pn_unique_ptr<proton_handler> > handlers_;
std::string id_;
connection_options client_connection_options_;
connection_options server_connection_options_;
@@ -108,6 +110,15 @@
friend class messaging_adapter;
};
+template <class T>
+void container_impl::set_handler(T s, messaging_handler* mh) {
+ pn_record_t *record = internal::get_attachments(unwrap(s));
+ proton_handler* h = new messaging_adapter(*mh);
+ container_impl& ci = static_cast<container_impl&>(s.container());
+ ci.handlers_.push_back(h);
+ pn_record_set_handler(record, ci.cpp_handler(h).get());
+}
+
}
#endif /*!PROTON_CPP_CONTAINERIMPL_H*/
diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp
index cda8acb..453aa42 100644
--- a/proton-c/bindings/cpp/src/handler.cpp
+++ b/proton-c/bindings/cpp/src/handler.cpp
@@ -35,7 +35,7 @@
namespace proton {
-messaging_handler::messaging_handler() : messaging_adapter_(new messaging_adapter(*this)) {}
+messaging_handler::messaging_handler(){}
messaging_handler::~messaging_handler(){}
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index e93dbbb..f15b019 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -42,7 +42,6 @@
namespace io {
connection_engine::connection_engine() :
- handler_(0),
container_(0)
{
int err;
@@ -51,7 +50,6 @@
}
connection_engine::connection_engine(class container& cont, event_loop* loop) :
- handler_(0),
container_(&cont)
{
int err;
@@ -66,7 +64,7 @@
proton::connection c = connection();
opts.apply_unbound(c);
opts.apply_bound(c);
- handler_ = opts.handler();
+ handler_.reset(new messaging_adapter(*opts.handler()));
connection_context::get(connection()).collector = c_engine_.collector;
}
@@ -100,7 +98,7 @@
while ((c_event = pn_connection_engine_dispatch(&c_engine_)) != NULL) {
proton_event cpp_event(c_event, container_);
try {
- cpp_event.dispatch(*handler_);
+ if (!!handler_) cpp_event.dispatch(*handler_);
} catch (const std::exception& e) {
disconnected(error_condition("exception", e.what()));
}
diff --git a/proton-c/bindings/cpp/src/proton_bits.cpp b/proton-c/bindings/cpp/src/proton_bits.cpp
index 11dfd47..18fc589 100644
--- a/proton-c/bindings/cpp/src/proton_bits.cpp
+++ b/proton-c/bindings/cpp/src/proton_bits.cpp
@@ -18,7 +18,6 @@
*/
#include "proton_bits.hpp"
-
#include "proton/error_condition.hpp"
#include <string>
diff --git a/proton-c/bindings/cpp/src/proton_bits.hpp b/proton-c/bindings/cpp/src/proton_bits.hpp
index 8d58938..97d4bee 100644
--- a/proton-c/bindings/cpp/src/proton_bits.hpp
+++ b/proton-c/bindings/cpp/src/proton_bits.hpp
@@ -18,6 +18,8 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include <proton/link.h>
+#include <proton/session.h>
#include <string>
#include <iosfwd>
@@ -41,6 +43,7 @@
struct pn_acceptor_t;
struct pn_terminus_t;
struct pn_reactor_t;
+struct pn_record_t;
namespace proton {
@@ -116,7 +119,7 @@
template <> struct wrapper<pn_terminus_t> { typedef terminus type; };
template <> struct wrapper<pn_reactor_t> { typedef reactor type; };
- // Factory for wrapper types
+// Factory for wrapper types
template <class T>
class factory {
public:
@@ -124,6 +127,12 @@
static typename wrapped<T>::type* unwrap(T t) { return t.pn_object(); }
};
+// Get attachments for various proton-c types
+template <class T>
+inline pn_record_t* get_attachments(T*);
+
+template <> inline pn_record_t* get_attachments(pn_session_t* s) { return pn_session_attachments(s); }
+template <> inline pn_record_t* get_attachments(pn_link_t* l) { return pn_link_attachments(l); }
}
template <class T>
diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp
index 807974e..f1fcf80 100644
--- a/proton-c/bindings/cpp/src/receiver_options.cpp
+++ b/proton-c/bindings/cpp/src/receiver_options.cpp
@@ -43,14 +43,6 @@
};
class receiver_options::impl {
- static void set_handler(receiver l, proton_handler &h) {
- pn_record_t *record = pn_link_attachments(unwrap(l));
- // FIXME aconway 2016-05-04: container_impl specific, fix for engine.
- internal::pn_ptr<pn_handler_t> chandler =
- static_cast<container_impl&>(l.connection().container()).cpp_handler(&h);
- pn_record_set_handler(record, chandler.get());
- }
-
static link_context& get_context(receiver l) {
return link_context::get(unwrap(l));
}
@@ -70,7 +62,7 @@
}
public:
- option<proton_handler*> handler;
+ option<messaging_handler*> handler;
option<proton::delivery_mode> delivery_mode;
option<bool> auto_accept;
option<bool> auto_settle;
@@ -83,7 +75,7 @@
void apply(receiver& r) {
if (r.uninitialized()) {
if (delivery_mode.set) set_delivery_mode(r, delivery_mode.value);
- if (handler.set && handler.value) set_handler(r, *handler.value);
+ if (handler.set && handler.value) container_impl::set_handler(r, handler.value);
if (auto_settle.set) get_context(r).auto_settle = auto_settle.value;
if (auto_accept.set) get_context(r).auto_accept = auto_accept.value;
if (credit_window.set) get_context(r).credit_window = credit_window.value;
@@ -125,7 +117,7 @@
void receiver_options::update(const receiver_options& x) { impl_->update(*x.impl_); }
-receiver_options& receiver_options::handler(class messaging_handler &h) { impl_->handler = h.messaging_adapter_.get(); return *this; }
+receiver_options& receiver_options::handler(class messaging_handler &h) { impl_->handler = &h; return *this; }
receiver_options& receiver_options::delivery_mode(proton::delivery_mode m) {impl_->delivery_mode = m; return *this; }
receiver_options& receiver_options::auto_accept(bool b) {impl_->auto_accept = b; return *this; }
receiver_options& receiver_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; }
diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp
index ce98d64..f5d5525 100644
--- a/proton-c/bindings/cpp/src/sender_options.cpp
+++ b/proton-c/bindings/cpp/src/sender_options.cpp
@@ -41,14 +41,6 @@
};
class sender_options::impl {
- static void set_handler(sender l, proton_handler &h) {
- pn_record_t *record = pn_link_attachments(unwrap(l));
- // FIXME aconway 2016-05-04: container_impl specific, fix for engine.
- internal::pn_ptr<pn_handler_t> chandler =
- static_cast<container_impl&>(l.connection().container()).cpp_handler(&h);
- pn_record_set_handler(record, chandler.get());
- }
-
static link_context& get_context(sender l) {
return link_context::get(unwrap(l));
}
@@ -68,7 +60,7 @@
}
public:
- option<proton_handler*> handler;
+ option<messaging_handler*> handler;
option<proton::delivery_mode> delivery_mode;
option<bool> auto_settle;
option<source_options> source;
@@ -77,7 +69,7 @@
void apply(sender& s) {
if (s.uninitialized()) {
if (delivery_mode.set) set_delivery_mode(s, delivery_mode.value);
- if (handler.set && handler.value) set_handler(s, *handler.value);
+ if (handler.set && handler.value) container_impl::set_handler(s, handler.value);
if (auto_settle.set) get_context(s).auto_settle = auto_settle.value;
if (source.set) {
proton::source local_s(make_wrapper<proton::source>(pn_link_source(unwrap(s))));
@@ -113,7 +105,7 @@
void sender_options::update(const sender_options& x) { impl_->update(*x.impl_); }
-sender_options& sender_options::handler(class messaging_handler &h) { impl_->handler = h.messaging_adapter_.get(); return *this; }
+sender_options& sender_options::handler(class messaging_handler &h) { impl_->handler = &h; return *this; }
sender_options& sender_options::delivery_mode(proton::delivery_mode m) {impl_->delivery_mode = m; return *this; }
sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; }
sender_options& sender_options::source(source_options &s) {impl_->source = s; return *this; }
diff --git a/proton-c/bindings/cpp/src/session_options.cpp b/proton-c/bindings/cpp/src/session_options.cpp
index c8d91f2..8c563d8 100644
--- a/proton-c/bindings/cpp/src/session_options.cpp
+++ b/proton-c/bindings/cpp/src/session_options.cpp
@@ -43,17 +43,11 @@
class session_options::impl {
public:
- option<proton_handler *> handler;
+ option<messaging_handler *> handler;
void apply(session& s) {
if (s.uninitialized()) {
- if (handler.set) {
- pn_record_t *record = pn_session_attachments(unwrap(s));
- // FIXME aconway 2016-05-04: container_impl specific
- internal::pn_ptr<pn_handler_t> chandler =
- static_cast<container_impl&>(s.connection().container()).cpp_handler(handler.value);
- pn_record_set_handler(record, chandler.get());
- }
+ if (handler.set && handler.value) container_impl::set_handler(s, handler.value);
}
}
@@ -70,7 +64,7 @@
return *this;
}
-session_options& session_options::handler(class messaging_handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; }
+session_options& session_options::handler(class messaging_handler &h) { impl_->handler = &h; return *this; }
void session_options::apply(session& s) const { impl_->apply(s); }