blob: ce6a2b2a097795d49f40e4a962e91db9222588f0 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/default_container.hpp"
#include "proton/connection_options.hpp"
#include "proton/connection.hpp"
#include "proton/session.hpp"
#include "proton/error.hpp"
#include "proton/sender.hpp"
#include "proton/receiver.hpp"
#include "proton/task.hpp"
#include "proton/ssl.hpp"
#include "proton/sasl.hpp"
#include "proton/transport.hpp"
#include "proton/url.hpp"
#include "proton/uuid.hpp"
#include "acceptor.hpp"
#include "connector.hpp"
#include "container_impl.hpp"
#include "contexts.hpp"
#include "messaging_adapter.hpp"
#include "msg.hpp"
#include "proton_bits.hpp"
#include "proton_event.hpp"
#include <proton/connection.h>
#include <proton/session.h>
#include <proton/handlers.h>
#include <proton/reactor.h>
namespace proton {
class handler_context {
public:
static handler_context& get(pn_handler_t* h) {
return *reinterpret_cast<handler_context*>(pn_handler_mem(h));
}
static void cleanup(pn_handler_t*) {}
/*
* NOTE: this call, at the transition from C to C++ is possibly
* the biggest performance bottleneck. "Average" clients ignore
* 90% of these events. Current strategy is to create the
* messaging_event on the stack. For success, the messaging_event
* should be small and free of indirect malloc/free/new/delete.
*/
static void dispatch(pn_handler_t *c_handler, pn_event_t *c_event, pn_event_type_t)
{
handler_context& hc(handler_context::get(c_handler));
proton_event pevent(c_event, *hc.container_);
pevent.dispatch(*hc.handler_);
return;
}
container *container_;
proton_handler *handler_;
};
// Used to sniff for connector events before the reactor's global handler sees them.
class override_handler : public proton_handler
{
public:
internal::pn_ptr<pn_handler_t> base_handler;
container_impl &container_impl_;
override_handler(pn_handler_t *h, container_impl &c) : base_handler(h), container_impl_(c) {}
virtual void on_unhandled(proton_event &pe) {
proton_event::event_type type = pe.type();
if (type==proton_event::EVENT_NONE) return; // Also not from the reactor
pn_event_t *cevent = pe.pn_event();
pn_connection_t *conn = pn_event_connection(cevent);
if (conn) {
proton_handler *oh = connection_context::get(conn).handler.get();
if (oh && type != proton_event::CONNECTION_INIT) {
// Send event to connector
pe.dispatch(*oh);
}
else if (!oh && type == proton_event::CONNECTION_INIT) {
// Newly accepted connection from lister socket
connection c(make_wrapper(conn));
container_impl_.configure_server_connection(c);
}
}
pn_handler_dispatch(base_handler.get(), cevent, pn_event_type_t(type));
}
};
internal::pn_ptr<pn_handler_t> container_impl::cpp_handler(proton_handler *h) {
pn_handler_t *handler = h ? pn_handler_new(&handler_context::dispatch,
sizeof(class handler_context),
&handler_context::cleanup) : 0;
if (handler) {
handler_context &hc = handler_context::get(handler);
hc.container_ = this;
hc.handler_ = h;
}
return internal::take_ownership(handler);
}
container_impl::container_impl(const std::string& id, messaging_handler *h) :
reactor_(reactor::create()), handler_(h ? h->messaging_adapter_.get() : 0),
id_(id.empty() ? uuid::random().str() : id), id_gen_(),
auto_stop_(true)
{
container_context::set(reactor_, *this);
// Set our own global handler that "subclasses" the existing one
pn_handler_t *global_handler = reactor_.pn_global_handler();
override_handler_.reset(new override_handler(global_handler, *this));
internal::pn_ptr<pn_handler_t> cpp_global_handler(cpp_handler(override_handler_.get()));
reactor_.pn_global_handler(cpp_global_handler.get());
if (handler_) {
reactor_.pn_handler(cpp_handler(handler_).get());
}
// Note: we have just set up the following handlers that see
// events in this order: messaging_adapter, connector override,
// the reactor's default globalhandler (pn_iohandler)
}
namespace {
void close_acceptor(acceptor a) {
listen_handler*& lh = listener_context::get(unwrap(a)).listen_handler_;
if (lh) {
lh->on_close();
lh = 0;
}
a.close();
}
}
container_impl::~container_impl() {
for (acceptors::iterator i = acceptors_.begin(); i != acceptors_.end(); ++i)
close_acceptor(i->second);
}
returned<connection> container_impl::connect(const std::string &urlstr, const connection_options &user_opts) {
connection_options opts = client_connection_options(); // Defaults
opts.update(user_opts);
proton_handler *h = opts.handler();
proton::url url(urlstr);
internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>();
connection conn(reactor_.connection_to_host(url.host(), url.port(), chandler.get()));
internal::pn_unique_ptr<connector> ctor(new connector(conn, url, opts));
connection_context& cc(connection_context::get(conn));
cc.handler.reset(ctor.release());
pn_connection_set_container(unwrap(conn), id_.c_str());
conn.open(opts);
return make_thread_safe(conn);
}
returned<sender> container_impl::open_sender(const std::string &url, const proton::sender_options &o1, const connection_options &o2) {
proton::sender_options lopts(sender_options_);
lopts.update(o1);
connection_options copts(client_connection_options_);
copts.update(o2);
connection conn = connect(url, copts);
return make_thread_safe(conn.default_session().open_sender(proton::url(url).path(), lopts));
}
returned<receiver> container_impl::open_receiver(const std::string &url, const proton::receiver_options &o1, const connection_options &o2) {
proton::receiver_options lopts(receiver_options_);
lopts.update(o1);
connection_options copts(client_connection_options_);
copts.update(o2);
connection conn = connect(url, copts);
return make_thread_safe(
conn.default_session().open_receiver(proton::url(url).path(), lopts));
}
listener container_impl::listen(const std::string& url, listen_handler& lh) {
if (acceptors_.find(url) != acceptors_.end())
throw error("already listening on " + url);
connection_options opts = server_connection_options(); // Defaults
proton_handler *h = opts.handler();
internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>();
proton::url u(url);
pn_acceptor_t *acptr = pn_reactor_acceptor(
reactor_.pn_object(), u.host().c_str(), u.port().c_str(), chandler.get());
if (!acptr) {
std::string err(pn_error_text(pn_io_error(reactor_.pn_io())));
lh.on_error(err);
lh.on_close();
throw error(err);
}
// Do not use pn_acceptor_set_ssl_domain(). Manage the incoming connections ourselves for
// more flexibility (i.e. ability to change the server cert for a long running listener).
listener_context& lc(listener_context::get(acptr));
lc.listen_handler_ = &lh;
lc.ssl = u.scheme() == url::AMQPS;
listener_context::get(acptr).listen_handler_ = &lh;
acceptors_[url] = make_wrapper(acptr);
return listener(*this, url);
}
void container_impl::stop_listening(const std::string& url) {
acceptors::iterator i = acceptors_.find(url);
if (i != acceptors_.end())
close_acceptor(i->second);
}
task container_impl::schedule(int delay, proton_handler *h) {
internal::pn_ptr<pn_handler_t> task_handler;
if (h)
task_handler = cpp_handler(h);
return reactor_.schedule(delay, task_handler.get());
}
void container_impl::client_connection_options(const connection_options &opts) {
client_connection_options_ = opts;
}
void container_impl::server_connection_options(const connection_options &opts) {
server_connection_options_ = opts;
}
void container_impl::sender_options(const proton::sender_options &opts) {
sender_options_ = opts;
}
void container_impl::receiver_options(const proton::receiver_options &opts) {
receiver_options_ = opts;
}
void container_impl::configure_server_connection(connection &c) {
pn_acceptor_t *pnp = pn_connection_acceptor(unwrap(c));
listener_context &lc(listener_context::get(pnp));
pn_connection_set_container(unwrap(c), id_.c_str());
connection_options opts = server_connection_options_;
opts.update(lc.get_options());
opts.apply(c);
// Handler applied separately
proton_handler *h = opts.handler();
if (h) {
internal::pn_ptr<pn_handler_t> chandler = cpp_handler(h);
pn_record_t *record = pn_connection_attachments(unwrap(c));
pn_record_set_handler(record, chandler.get());
}
}
void container_impl::run() {
do {
reactor_.run();
} while (!auto_stop_);
}
void container_impl::stop(const error_condition&) {
reactor_.stop();
}
void container_impl::auto_stop(bool set) {
auto_stop_ = set;
}
default_container::default_container(messaging_handler& h, const std::string& id) : impl_(new container_impl(id, &h)) {}
default_container::default_container(const std::string& id) : impl_(new container_impl(id)) {}
returned<connection> default_container::connect(const std::string& url, const connection_options &o) { return impl_->connect(url, o); }
listener default_container::listen(const std::string& url, listen_handler& l) { return impl_->listen(url, l); }
void default_container::stop_listening(const std::string& url) { impl_->stop_listening(url); }
void default_container::run() { impl_->run(); }
void default_container::auto_stop(bool set) { impl_->auto_stop(set); }
void default_container::stop(const error_condition& err) { impl_->stop(err); }
returned<sender> default_container::open_sender(const std::string &u, const proton::sender_options &o, const connection_options &c) { return impl_->open_sender(u, o, c); }
returned<receiver> default_container::open_receiver(const std::string &u, const proton::receiver_options &o, const connection_options &c) { return impl_->open_receiver(u, o, c); }
std::string default_container::id() const { return impl_->id(); }
void default_container::client_connection_options(const connection_options &o) { impl_->client_connection_options(o); }
connection_options default_container::client_connection_options() const { return impl_->client_connection_options(); }
void default_container::server_connection_options(const connection_options &o) { impl_->server_connection_options(o); }
connection_options default_container::server_connection_options() const { return impl_->server_connection_options(); }
void default_container::sender_options(const class sender_options &o) { impl_->sender_options(o); }
class sender_options default_container::sender_options() const { return impl_->sender_options(); }
void default_container::receiver_options(const class receiver_options & o) { impl_->receiver_options(o); }
class receiver_options default_container::receiver_options() const { return impl_->receiver_options(); }
}