// blob: 44dfffe117228130dba1d1eac45206ed8cf6f243 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "proactor_container_impl.hpp"
#include "proactor_work_queue_impl.hpp"
#include "connect_config.hpp"
#include "proton/error_condition.hpp"
#include "proton/listen_handler.hpp"
#include "proton/listener.hpp"
#include "proton/reconnect_options.hpp"
#include "proton/ssl.hpp"
#include "proton/transport.hpp"
#include "proton/url.hpp"
#include "proton/connection.h"
#include "proton/listener.h"
#include "proton/proactor.h"
#include "proton/transport.h"
#include "contexts.hpp"
#include "messaging_adapter.hpp"
#include "reconnect_options_impl.hpp"
#include "proton_bits.hpp"
#include <assert.h>
#include <string.h>
#include <algorithm>
#include <vector>
#include <thread>
#include <random>
// XXXX: Debug
//#include <iostream>
namespace proton {
// Base for the container's work queue implementations.
//
// Holds the pending jobs, the owning container implementation and the two
// state flags (finished_/running_) that are manipulated under lock_.
class container::impl::common_work_queue : public work_queue::impl {
  public:
    using jobs = std::vector<work>;

    common_work_queue(container::impl& c)
        : container_(c), finished_(false), running_(false) {}

    // Run every job queued so far (defined out of line).
    void run_all_jobs();

    // Mark the queue as finished; the flag is written while holding lock_.
    void finished() {
        GUARD(lock_);
        finished_ = true;
    }

    // Queue work to be added after a delay (defined out of line).
    void schedule(duration, work);

    MUTEX(lock_)
    container::impl& container_;
    jobs jobs_;
    bool finished_;
    bool running_;
};
// Ask the container to add work `f` to this queue after delay `d`.
//
// Does nothing if the queue has already been marked finished.
void container::impl::common_work_queue::schedule(duration d, work f) {
    // Note this is an unbounded work queue.
    // A resource-safe implementation should be bounded.
    {
        // finished_ is written under lock_ by finished(), so read it under the
        // same lock. The lock is released before calling into the container
        // so that no container lock is ever taken while lock_ is held.
        GUARD(lock_);
        if (finished_) return;
    }
    container_.schedule(d, make_work(&work_queue::impl::add_void, static_cast<work_queue::impl*>(this), f));
}
void container::impl::common_work_queue::run_all_jobs() {
jobs j;
// Lock this operation for mt
{
GUARD(lock_);
// Ensure that we never run work from this queue concurrently
if (running_) return;
running_ = true;
// But allow adding to the queue concurrently to running
std::swap(j, jobs_);
}
// Run queued work, but ignore any exceptions
for (jobs::iterator f = j.begin(); f != j.end(); ++f) try {
(*f)();
} catch (...) {};
{
GUARD(lock_);
running_ = false;
}
return;
}
// Work queue bound to a single pn_connection_t; add() wakes that connection.
class container::impl::connection_work_queue : public common_work_queue {
  public:
    connection_work_queue(container::impl& ct, pn_connection_t* c)
        : common_work_queue(ct), connection_(c) {}

    bool add(work f);

    pn_connection_t* connection_;
};
// Append work `f` and wake the connection.
//
// Returns false (and queues nothing) once the queue is finished.
// Note this is an unbounded work queue.
// A resource-safe implementation should be bounded.
bool container::impl::connection_work_queue::add(work f) {
    GUARD(lock_);
    const bool accepting = !finished_;
    if (accepting) {
        jobs_.push_back(f);
        pn_connection_wake(connection_);
    }
    return accepting;
}
// Work queue owned by the container itself (not tied to a connection).
// On destruction it removes itself from the container's set of work queues.
class container::impl::container_work_queue : public common_work_queue {
  public:
    container_work_queue(container::impl& c)
        : common_work_queue(c) {}

    ~container_work_queue() {
        container_.remove_work_queue(this);
    }

    bool add(work f);
};
// Append work `f` and request an immediate proactor timeout.
//
// Returns false (and queues nothing) once the queue is finished.
// Note this is an unbounded work queue.
// A resource-safe implementation should be bounded.
bool container::impl::container_work_queue::add(work f) {
    GUARD(lock_);
    const bool accepting = !finished_;
    if (accepting) {
        jobs_.push_back(f);
        pn_proactor_set_timeout(container_.proactor_, 0);
    }
    return accepting;
}
// Create a new container-level work queue registered with container `c`.
class work_queue::impl* container::impl::make_work_queue(container& c) {
    container::impl& owner = *c.impl_;
    return owner.add_work_queue();
}
// Construct the implementation for container `c` with container id `id` and
// optional default handler `mh`; creates a new proactor.
container::impl::impl(container& c, const std::string& id, messaging_handler* mh)
    : threads_(0),
      container_(c),
      proactor_(pn_proactor()),
      handler_(mh),
      id_(id),
      reconnecting_(0),
      auto_stop_(true),
      stopping_(false)
{
}
// Free the proactor created in the constructor.
container::impl::~impl()
{
    pn_proactor_free(proactor_);
}
// Allocate a container work queue and record it in work_queues_.
// The queue is created before work_queues_lock_ is taken.
container::impl::container_work_queue* container::impl::add_work_queue() {
    container_work_queue* queue = new container_work_queue(*this);
    GUARD(work_queues_lock_);
    work_queues_.insert(queue);
    return queue;
}
// Forget work queue `l`; called from the container_work_queue destructor.
void container::impl::remove_work_queue(container::impl::container_work_queue* l)
{
    GUARD(work_queues_lock_);
    work_queues_.erase(l);
}
namespace {

// Seed connection options from the parts of a URL: the host becomes the
// virtual host, non-empty user/password are copied, and the amqps scheme
// enables default TLS client settings.
void default_url_options(connection_options& opts, const url& url) {
    opts.virtual_host(url.host());

    const bool has_user = !url.user().empty();
    if (has_user) {
        opts.user(url.user());
    }

    const bool has_password = !url.password().empty();
    if (has_password) {
        opts.password(url.password());
    }

    // If scheme is amqps then use default tls settings
    if (url.scheme()==url.AMQPS)
        opts.ssl_client_options(ssl_client_options());
}

}
// Create and locally open a new pn_connection_t for `url`.
//
// Options are layered in this order: container id, defaults derived from the
// URL, the container's client connection options, then `user_opts`.
// Throws proton::error if the container is stopping.
// Returns the new connection carrying the single reference from pn_connection().
pn_connection_t* container::impl::make_connection_lh(
    const url& url,
    const connection_options& user_opts)
{
    if (stopping_) {
        throw proton::error("container is stopping");
    }

    connection_options merged;
    merged.container_id(id_);
    default_url_options(merged, url);
    merged.update(client_connection_options_);
    merged.update(user_opts);
    messaging_handler* handler = merged.handler();

    pn_connection_t *pnc = pn_connection();
    connection_context& ctx(connection_context::get(pnc));
    ctx.container = &container_;
    ctx.handler = handler;
    ctx.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, pnc);
    ctx.reconnect_url_ = url;
    // Keep a copy of the merged options on the connection context
    ctx.connection_options_.reset(new connection_options(merged));

    make_wrapper(pnc).open(*ctx.connection_options_);
    return pnc; // 1 refcount from pn_connection()
}
// Takes ownership of pnc
//
// NOTE: After the call to start_connection() pnc is active in a proactor thread,
// and may even have been freed already. It is undefined to use pnc (or any
// object belonging to it) except in appropriate handlers.
//
// SUBTLE NOTE: There must not be any proton::object wrappers in scope when
// start_connection() is called. The wrapper destructor will call pn_decref()
// after start_connection() which is undefined!
//
void container::impl::start_connection(const url& url, pn_connection_t *pnc) {
pn_transport_t* pnt = pn_transport();
connection_context& cc = connection_context::get(pnc);
connection_options& co = *cc.connection_options_;
co.apply_unbound_client(pnt);
char caddr[PN_MAX_ADDR];
pn_proactor_addr(caddr, sizeof(caddr), url.host().c_str(), url.port().c_str());
pn_proactor_connect2(proactor_, pnc, pnt, caddr); // Takes ownership of pnc, pnt
}
// Perform one scheduled reconnect attempt for `pnc`.
//
// Decrements the count of pending reconnects. If the container is stopping
// and this was the last pending reconnect, the connection is freed and the
// proactor disconnected instead of reconnecting.
void container::impl::reconnect(pn_connection_t* pnc) {
    --reconnecting_;

    if (stopping_ && reconnecting_==0) {
        pn_connection_free(pnc);
        //TODO: We've lost the error - we should really propagate it here
        pn_proactor_disconnect(proactor_, NULL);
        return;
    }

    connection_context& ctx = connection_context::get(pnc);
    reconnect_context& retry = *ctx.reconnect_context_.get();
    connection_options& saved = *ctx.connection_options_;
    saved.apply_reconnect_urls(pnc);

    // Pick the url for this attempt:
    // current_url_ == -1 selects the url given to connect, otherwise a failover url
    const proton::url target(retry.current_url_==-1 ? ctx.reconnect_url_ : ctx.failover_urls_[retry.current_url_]);
    // XXXX Debug:
    //std::cout << "Retries: " << retry.retries_ << " Delay: " << retry.delay_ << " Trying: " << target << "@" << retry.current_url_ << std::endl;

    // Advance to the next url; wrapping round completes one retry
    ++retry.current_url_;
    if (retry.current_url_==int(ctx.failover_urls_.size())) {
        retry.current_url_ = -1;
        ++retry.retries_;
    }

    // Recompute the handler from defaults for the chosen url plus the saved options
    connection_options opts;
    opts.container_id(id_);
    default_url_options(opts, target);
    opts.update(saved);
    ctx.handler = opts.handler();

    make_wrapper(pnc).open(saved);
    start_connection(target, pnc);
}
namespace {
// Return a uniformly distributed random duration in [min, max] (milliseconds).
//
// If max is not greater than min the range is empty or degenerate and min is
// returned unchanged; std::uniform_int_distribution requires a <= b, so an
// inverted range must not reach it.
duration random_between(duration min, duration max)
{
    const duration::numeric_type lo = min.milliseconds();
    const duration::numeric_type hi = max.milliseconds();
    if (hi <= lo) return min;

    // Seed each thread's engine from the system entropy source. A
    // default-constructed engine yields the same sequence in every thread and
    // every process, which defeats the purpose of randomising the delay.
    static thread_local std::default_random_engine gen{std::random_device{}()};
    std::uniform_int_distribution<duration::numeric_type> dist{lo, hi};
    return duration(dist(gen));
}
// Work out how long to wait before the next reconnect attempt and update
// rc.delay_ accordingly.
//
// The delay is zero on the very first attempt and while there are still
// failover urls left to try in the current round.
duration next_delay(reconnect_context& rc) {
    const bool never_retried = rc.retries_==0;
    const bool round_in_progress = rc.current_url_!=-1;
    if (never_retried || round_in_progress) {
        return duration(0);
    }

    const reconnect_options_base& roi = rc.reconnect_options_;
    if (rc.retries_!=1) {
        // Grow the previous delay, capped at the configured maximum
        rc.delay_ = std::min(roi.max_delay, rc.delay_ * roi.delay_multiplier);
    } else {
        // First delayed retry starts from the configured base delay
        rc.delay_ = roi.delay;
    }
    return random_between(roi.delay, rc.delay_);
}
// Fetch the reconnect context stored on the connection's context
// (null when none has been set).
inline reconnect_context* get_reconnect_context(pn_connection_t* pnc) {
    connection_context& ctx = connection_context::get(pnc);
    return ctx.reconnect_context_.get();
}
// Reset the reconnect state of `pnc` (if it has any) back to its initial
// values: no delay, no retries, and the initial url as the next one to try.
void reset_reconnect(pn_connection_t* pnc) {
    reconnect_context* rc = get_reconnect_context(pnc);
    if (rc) {
        rc->delay_ = 0;
        rc->retries_ = 0;
        // -1 selects the url originally passed to connect
        rc->current_url_ = -1;
    }
}
}
// Decide whether a reconnect should be attempted for `pnc`.
//
// Returns false when the connection has no reconnect context, when
// reconnecting has been stopped for it, when the transport failed with
// "amqp:unauthorized-access", or when the configured maximum number of
// attempts has been reached (in which case the transport condition is
// overwritten with a "proton:io" error). Returns true otherwise, including
// while the container is stopping.
bool container::impl::can_reconnect(pn_connection_t* pnc) {
    reconnect_context* rc = get_reconnect_context(pnc);
    // If reconnect not enabled just fail
    if (!rc) return false;
    // Don't reconnect if we are locally closed, the application will
    // not expect a connection it closed to re-open.
    if (rc->stop_reconnect_) return false;
    // If container stopping don't try to reconnect
    // - we pretend to have set up a reconnect attempt so
    // that the proactor disconnect will finish and we will exit
    // the run loop without error.
    {
        GUARD(lock_);
        if (stopping_) return true;
    }
    const reconnect_options_base& roi = rc->reconnect_options_;
    pn_transport_t* t = pn_connection_transport(pnc);
    pn_condition_t* condition = pn_transport_condition(t);
    // If we failed to authenticate then don't reconnect any more and just fail.
    // The condition name is NULL when no condition is set, so check it before
    // handing it to strcmp().
    const char* name = pn_condition_get_name(condition);
    if ( name && !strcmp(name, "amqp:unauthorized-access") ) return false;
    // If too many reconnect attempts just fail
    if ( roi.max_attempts != 0 && rc->retries_ >= roi.max_attempts) {
        pn_condition_format(condition, "proton:io", "Too many reconnect attempts (%d)", rc->retries_);
        return false;
    }
    return true;
}
// Release `pnc` from the proactor and schedule a reconnect attempt for it.
//
// Does nothing if the connection has no reconnect context.
void container::impl::setup_reconnect(pn_connection_t* pnc) {
    connection_context& cc = connection_context::get(pnc);
    reconnect_context* rc = cc.reconnect_context_.get();
    if (!rc) return;
    rc->reconnected_ = true;
    // Recover connection from proactor
    pn_proactor_release_connection(pnc);
    // Figure out delay till next reconnect
    duration delay = next_delay(*rc);
    // Count the pending reconnect BEFORE publishing the work: with a zero
    // delay another thread may run reconnect() (which decrements the counter)
    // as soon as schedule() returns, and stop() relies on the counter to know
    // that a reconnect is still outstanding.
    ++reconnecting_;
    // Schedule reconnect - can do this on container work queue as no one can have the connection
    // now anyway
    schedule(delay, make_work(&container::impl::reconnect, this, pnc));
}
// Open a connection to `addr` using `user_opts` layered over the container
// defaults. The connection is created under lock_ and started after the lock
// has been released.
returned<connection> container::impl::connect(
    const std::string& addr,
    const proton::connection_options& user_opts)
{
    proton::url target(addr);
    pn_connection_t* new_conn = 0;
    {
        GUARD(lock_);
        new_conn = make_connection_lh(target, user_opts);
    }
    // See comment on start_connection
    start_connection(target, new_conn);
    return make_returned<proton::connection>(new_conn);
}
// Connect using the address and options produced by apply_config().
returned<connection> container::impl::connect() {
    connection_options config_opts;
    const std::string config_addr = apply_config(config_opts);
    return connect(config_addr, config_opts);
}
// Open a new connection to `urlstr` and a sender on its default session whose
// target is the url path. Sender options are the container defaults updated
// with `o1`; `o2` supplies the connection options.
returned<sender> container::impl::open_sender(const std::string &urlstr, const proton::sender_options &o1, const connection_options &o2)
{
    proton::url target(urlstr);
    pn_connection_t* new_conn = 0;
    pn_link_t* link = 0;
    {
        GUARD(lock_);
        proton::sender_options link_opts(sender_options_);
        link_opts.update(o1);
        new_conn = make_connection_lh(target, o2);
        // The wrapper must go out of scope before start_connection() is called
        connection wrapped(make_wrapper(new_conn));
        link = unwrap(wrapped.default_session().open_sender(target.path(), link_opts));
    }
    // See comment on start_connection
    start_connection(target, new_conn);
    return make_returned<sender>(link); // Unsafe returned pointer
}
// Open a new connection to `urlstr` and a receiver on its default session
// whose source is the url path. Receiver options are the container defaults
// updated with `o1`; `o2` supplies the connection options.
returned<receiver> container::impl::open_receiver(const std::string &urlstr, const proton::receiver_options &o1, const connection_options &o2) {
    proton::url target(urlstr);
    pn_connection_t* new_conn = 0;
    pn_link_t* link = 0;
    {
        GUARD(lock_);
        proton::receiver_options link_opts(receiver_options_);
        link_opts.update(o1);
        new_conn = make_connection_lh(target, o2);
        // The wrapper must go out of scope before start_connection() is called
        connection wrapped(make_wrapper(new_conn));
        link = unwrap(wrapped.default_session().open_receiver(target.path(), link_opts));
    }
    // See comment on start_connection
    start_connection(target, new_conn);
    return make_returned<receiver>(link);
}
// Create a listener on `addr` and register it with the proactor.
//
// Throws proton::error if the container is stopping. The listener's context
// is set to the owning container and the listen backlog is 16.
pn_listener_t* container::impl::listen_common_lh(const std::string& addr) {
    if (stopping_) {
        throw proton::error("container is stopping");
    }

    // Don't want un-helpful defaults like "localhost"
    proton::url parsed(addr, false);
    const std::string host(parsed.host());
    const std::string port(parsed.port());

    // First call measures the address string, second call fills the buffer
    const int len = pn_proactor_addr(0, 0, host.c_str(), port.c_str());
    std::vector<char> address(len+1);
    pn_proactor_addr(address.data(), len+1, host.c_str(), port.c_str());

    pn_listener_t* new_listener = pn_listener();
    pn_listener_set_context(new_listener, &container_);
    pn_proactor_listen(proactor_, new_listener, address.data(), 16);
    return new_listener;
}
// Listen on `addr` with no per-listener options or handler.
proton::listener container::impl::listen(const std::string& addr) {
    GUARD(lock_);
    return proton::listener(listen_common_lh(addr));
}
// Listen on `addr`, storing a copy of `opts` on the listener context.
proton::listener container::impl::listen(const std::string& addr, const proton::connection_options& opts) {
    GUARD(lock_);
    pn_listener_t* new_listener = listen_common_lh(addr);
    listener_context::get(new_listener).connection_options_.reset(new connection_options(opts));
    return proton::listener(new_listener);
}
// Listen on `addr`, recording `lh` (by address) as the listener's handler.
proton::listener container::impl::listen(const std::string& addr, proton::listen_handler& lh) {
    GUARD(lock_);
    pn_listener_t* new_listener = listen_common_lh(addr);
    listener_context::get(new_listener).listen_handler_ = &lh;
    return proton::listener(new_listener);
}
// Queue `f` to run `delay` from now and re-arm the proactor timeout for
// whichever deferred task is now at the head of the heap.
void container::impl::schedule(duration delay, work f) {
    GUARD(deferred_lock_);
    timestamp now = timestamp::now();

    // Record the deadline and push the task onto the deferred heap
    scheduled entry = {now+delay, f};
    deferred_.push_back(entry);
    std::push_heap(deferred_.begin(), deferred_.end());

    // Time remaining until the head task is due (zero if already due)
    const scheduled& head = deferred_.front();
    pn_millis_t timeout_ms = 0;
    if (now < head.time) {
        timeout_ms = (head.time-now).milliseconds();
    }
    pn_proactor_set_timeout(proactor_, timeout_ms);
}
// Replace the container's client connection options (under lock_).
void container::impl::client_connection_options(const connection_options &opts)
{
    GUARD(lock_);
    client_connection_options_ = opts;
}
// Replace the container's server connection options (under lock_).
void container::impl::server_connection_options(const connection_options &opts)
{
    GUARD(lock_);
    server_connection_options_ = opts;
}
// Replace the container's default sender options (under lock_).
void container::impl::sender_options(const proton::sender_options &opts)
{
    GUARD(lock_);
    sender_options_ = opts;
}
// Replace the container's default receiver options (under lock_).
void container::impl::receiver_options(const proton::receiver_options &opts)
{
    GUARD(lock_);
    receiver_options_ = opts;
}
void container::impl::run_timer_jobs() {
timestamp now = timestamp::now();
std::vector<scheduled> tasks;
// We first extract all the runnable tasks and then run them - this is to avoid having tasks
// injected as we are running them (which could potentially never end)
{
GUARD(deferred_lock_);
// Figure out how many tasks we need to execute and pop them to the back of the
// queue (in reverse order)
unsigned i = 0;
for (;;) {
// Have we seen all the queued tasks?
if ( deferred_.size()-i==0 ) break;
// Is the next task in the future?
timestamp next_time = deferred_.front().time;
if ( next_time>now ) {
pn_proactor_set_timeout(proactor_, (next_time-now).milliseconds());
break;
}
std::pop_heap(deferred_.begin(), deferred_.end()-i);
++i;
}
// Nothing to do
if ( i==0 ) return;
// Now we know how many tasks to run
if ( deferred_.size()==i ) {
// If we sorted the entire heap, then we're executing every task
// so don't need to copy and can just swap
tasks.swap(deferred_);
} else {
// Otherwise just copy the ones we sorted
tasks = std::vector<scheduled>(deferred_.end()-i, deferred_.end());
// Remove tasks to be executed
deferred_.resize(deferred_.size()-i);
}
}
// We've now taken the tasks to run from the deferred tasks
// so we can run them unlocked
// NB. We copied the due tasks in reverse order so execute from end
for (int i = tasks.size()-1; i>=0; --i) tasks[i].task();
}
// Process one proactor event on the calling thread.
//
// Any pending work queued on the event's connection is run first. Events that
// must not reach a messaging_handler are handled here; everything else is
// forwarded to the handler selected by get_handler().
//
// Returns ContinueLoop to carry on with the current batch, EndBatch to stop
// processing the current batch, or EndLoop when the calling thread should
// leave its run loop.
container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) {
    // If we have any pending connection work, do it now
    pn_connection_t* c = pn_event_connection(event);
    if (c) {
        work_queue::impl* queue = connection_context::get(c).work_queue_.impl_.get();
        queue->run_all_jobs();
    }

    // Process events that shouldn't be sent to messaging_handler
    switch (pn_event_type(event)) {

    case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
        // If we're stopping interrupt all other threads still running
        if (auto_stop_) pn_proactor_interrupt(proactor_);
        return ContinueLoop;

    // We only interrupt to stop threads
    case PN_PROACTOR_INTERRUPT: {
        // Interrupt any other threads still running
        GUARD(lock_);
        if (threads_>1) pn_proactor_interrupt(proactor_);
        return EndLoop;
    }

    case PN_PROACTOR_TIMEOUT: {
        // Can get an immediate timeout, if we have a container event loop inject
        run_timer_jobs();

        // Run every container event loop job
        // This is not at all efficient and single threads all these jobs, but it does correctly
        // serialise them
        work_queues queues;
        {
            // Snapshot the set so the jobs run without work_queues_lock_ held
            GUARD(work_queues_lock_);
            queues = work_queues_;
        }
        for (work_queues::iterator queue = queues.begin(); queue!=queues.end(); ++queue) {
            (*queue)->run_all_jobs();
        }
        return EndBatch;
    }

    case PN_LISTENER_OPEN: {
        pn_listener_t* l = pn_event_listener(event);
        proton::listen_handler* handler;
        {
            GUARD(lock_);
            listener_context &lc(listener_context::get(l));
            handler = lc.listen_handler_;
        }
        if (handler) {
            listener lstnr(l);
            handler->on_open(lstnr);
        }
        return ContinueLoop;
    }

    case PN_LISTENER_ACCEPT: {
        pn_listener_t* l = pn_event_listener(event);
        pn_connection_t* c = pn_connection();
        pn_connection_set_container(c, id_.c_str());
        connection_options opts;
        listen_handler* handler;
        listener_context* lc;
        const connection_options* options;
        {
            GUARD(lock_);
            // server_connection_options_ is written under lock_ by
            // server_connection_options(), so copy it under the same lock.
            opts = server_connection_options_;
            lc = &listener_context::get(l);
            handler = lc->listen_handler_;
            options = lc->connection_options_.get();
        }
        // A listen handler takes precedence over options stored on the listener
        if (handler) {
            listener lstr(l);
            opts.update(handler->on_accept(lstr));
        }
        else if (options) opts.update(*options);
        // Handler applied separately
        connection_context& cc = connection_context::get(c);
        cc.container = &container_;
        cc.listener_context_ = lc;
        cc.handler = opts.handler();
        cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, c);
        pn_transport_t* pnt = pn_transport();
        pn_transport_set_server(pnt);
        opts.apply_unbound_server(pnt);
        pn_listener_accept2(l, c, pnt);
        return ContinueLoop;
    }

    case PN_LISTENER_CLOSE: {
        pn_listener_t* l = pn_event_listener(event);
        proton::listen_handler* handler;
        {
            GUARD(lock_);
            listener_context &lc(listener_context::get(l));
            handler = lc.listen_handler_;
        }
        listener lstnr(l);
        if (handler) {
            // Report a listener error (if any) before the close notification
            pn_condition_t* c = pn_listener_condition(l);
            if (pn_condition_is_set(c)) {
                handler->on_error(lstnr, make_wrapper(c).what());
            }
            handler->on_close(lstnr);
        }
        return ContinueLoop;
    }

    // Connection driver will bind a new transport to the connection at this point
    case PN_CONNECTION_INIT:
        return ContinueLoop;

    case PN_CONNECTION_REMOTE_OPEN: {
        // This is the only event that we get indicating that the connection succeeded so
        // it's the only place to reset the reconnection logic.
        //
        // Just note we have a connection then process normally
        pn_connection_t* c = pn_event_connection(event);
        reset_reconnect(c);
        break;
    }

    case PN_CONNECTION_REMOTE_CLOSE: {
        pn_connection_t *c = pn_event_connection(event);
        pn_condition_t *cc = pn_connection_remote_condition(c);

        // If reconnect is on, amqp:connection:forced should be treated specially:
        // Hide the connection error/close events from the application;
        // Then we close the connection noting the forced close;
        // Then set up for reconnect handling.
        if (get_reconnect_context(c) &&
            pn_condition_is_set(cc) &&
            !strcmp(pn_condition_get_name(cc), "amqp:connection:forced"))
        {
            pn_transport_t* t = pn_event_transport(event);
            pn_condition_t* tc = pn_transport_condition(t);
            pn_condition_copy(tc, cc);
            pn_transport_close_tail(t);
            pn_connection_close(c);
            return ContinueLoop;
        }
        break;
    }

    case PN_TRANSPORT_CLOSED: {
        // If reconnect is turned on then handle closed on error here with reconnect attempt
        pn_connection_t* c = pn_event_connection(event);
        pn_transport_t* t = pn_event_transport(event);
        if (pn_condition_is_set(pn_transport_condition(t)) && can_reconnect(c)) {
            messaging_handler *mh = get_handler(event);
            if (mh) { // Notify handler of pending reconnect
                transport trans = make_wrapper(t);
                try {
                    mh->on_transport_error(trans);
                } catch (const proton::error& e) {
                    // If this is the same error we are re-connecting for,
                    // ignore it. It was probably thrown by the default
                    // messaging_handler::on_error(), and if not the user has
                    // already seen it.
                    //
                    // If this isn't the same error, then something unexpected
                    // has happened, so re-throw.
                    if (std::string(e.what()) != trans.error().what())
                        throw;
                }
            }
            // on_transport_error() may have closed the connection, check again.
            reconnect_context* rc = get_reconnect_context(c);
            if (rc && !(rc->stop_reconnect_)) {
                setup_reconnect(c);
                return ContinueLoop;
            }
        }
        // Otherwise, this connection will be freed by the proactor.
        // Mark its work_queue finished so it won't try to use the freed connection.
        connection_context::get(c).work_queue_.impl_.get()->finished();
        break;
    }

    default:
        break;
    }

    // Everything that reaches this point is forwarded to a messaging_handler
    messaging_handler *mh = get_handler(event);
    if (mh) messaging_adapter::dispatch(*mh, event);
    return ContinueLoop;
}
// Pick the messaging_handler for an event: the most specific one available,
// searching link, then session, then connection, and finally falling back to
// the container's own handler.
messaging_handler* container::impl::get_handler(pn_event_t *event) {
    // Link (sender/receiver) handler has first claim
    pn_link_t *link = pn_event_link(event);
    messaging_handler *found = link ? get_handler(link) : 0;

    // Session handler only if no link handler
    pn_session_t *session = pn_event_session(event);
    if (!found && session) {
        found = get_handler(session);
    }

    // Connection handler only if none of the above
    pn_connection_t *connection = pn_event_connection(event);
    if (!found && connection) {
        found = get_handler(connection);
    }

    // Nothing more specific: use the container handler
    if (found) return found;
    return handler_;
}
void container::impl::thread() {
bool finished;
{
GUARD(lock_);
++threads_;
finished = stopping_;
}
while (!finished) {
pn_event_batch_t *events = pn_proactor_wait(proactor_);
pn_event_t *e;
error_condition error;
try {
while ((e = pn_event_batch_next(events))) {
dispatch_result r = dispatch(e);
finished = r==EndLoop;
if (r!=ContinueLoop) break;
}
} catch (const std::exception& e) {
// If we caught an exception then shutdown the (other threads of the) container
error = error_condition("exception", e.what());
} catch (...) {
error = error_condition("exception", "container shut-down by unknown exception");
}
pn_proactor_done(proactor_, events);
if (!error.empty()) {
finished = true;
{
GUARD(lock_);
disconnect_error_ = error;
}
stop(error);
}
}
{
GUARD(lock_);
--threads_;
}
}
// Notify the container handler (if there is one) that the container started.
void container::impl::start_event() {
    if (!handler_) return;
    handler_->on_container_start(container_);
}
// Notify the container handler (if there is one) that the container stopped.
void container::impl::stop_event() {
    if (!handler_) return;
    handler_->on_container_stop(container_);
}
void container::impl::run(int threads) {
// Have to "manually" generate container events
CALL_ONCE(start_once_, &impl::start_event, this);
// Run handler threads
threads = std::max(threads, 1); // Ensure at least 1 thread
typedef std::vector<std::thread*> vt; // pointer vector to work around failures in older compilers
vt ts(threads-1);
for (vt::iterator i = ts.begin(); i != ts.end(); ++i) {
*i = new std::thread(&impl::thread, this);
}
thread(); // Use this thread too.
// Wait for the other threads to stop
for (vt::iterator i = ts.begin(); i != ts.end(); ++i) {
(*i)->join();
delete *i;
}
bool last = false;
{
GUARD(lock_);
last = threads_==0;
}
if (last) CALL_ONCE(stop_once_, &impl::stop_event, this);
// Throw an exception if we disconnected the proactor because of an exception
{
GUARD(lock_);
if (!disconnect_error_.empty()) throw proton::error(disconnect_error_.description());
};
}
// Set the auto_stop_ flag (under lock_).
void container::impl::auto_stop(bool set)
{
    GUARD(lock_);
    auto_stop_ = set;
}
// Stop the container, disconnecting the proactor with error `err`.
//
// Has no further effect if the container is already stopping. If reconnect
// attempts are still pending the disconnect is left to the reconnect itself.
void container::impl::stop(const proton::error_condition& err) {
    {
        GUARD(lock_);
        // Already stopping
        if (stopping_) return;
        auto_stop_ = true;
        stopping_ = true;
        // Have to wait until actual reconnect to stop or we leak the connection
        if (reconnecting_>0) return;
    }

    // Build a temporary C condition carrying the error for the disconnect call
    pn_condition_t* cond = pn_condition();
    set_error_condition(err, cond);
    pn_proactor_disconnect(proactor_, cond);
    pn_condition_free(cond);
}
}