blob: b46fb723157d0aa64a21216dad2d3a7493208d59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/log.h>
#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/server.h>
#include "entity.h"
#include "entity_cache.h"
#include "dispatch_private.h"
#include "server_private.h"
#include "timer_private.h"
#include "alloc.h"
#include "config.h"
#include <stdio.h>
#include <time.h>
#include <string.h>
#include <errno.h>
// Per-thread pointer to the server whose thread_run() loop is executing on this
// thread.  It is assigned at the top of thread_run() and compared in
// qd_server_stop() to decide whether the caller may join the worker threads.
static __thread qd_server_t *thread_server = 0;
// Allocator definitions for the object types created in this file.
// NOTE(review): presumably these generate the new_<type>()/free_<type>()
// functions used below — confirm against alloc.h.
ALLOC_DEFINE(qd_work_item_t);
ALLOC_DEFINE(qd_listener_t);
ALLOC_DEFINE(qd_connector_t);
ALLOC_DEFINE(qd_deferred_call_t);
ALLOC_DEFINE(qd_connection_t);
ALLOC_DEFINE(qd_user_fd_t);
// Entity type name under which connections are added to / removed from the
// entity cache.
const char *QD_CONNECTION_TYPE = "connection";
// Printable names for the connection states.
// NOTE(review): the order presumably has to match the connection-state enum
// declared elsewhere — confirm before adding or reordering entries.
static const char *conn_state_names[] = {
"connecting",
"opening",
"operational",
"failed",
"user"
};
// NOTE(review): presumably generates conn_state_name(), which is used by
// qd_entity_refresh_connection() — confirm against the ENUM_DEFINE macro.
ENUM_DEFINE(conn_state, conn_state_names);
/**
 * Allocate and initialise the bookkeeping record for one worker thread.
 *
 * @param qd_server  server the thread will serve
 * @param id         identifier stored in the record as thread_id
 * @return the new record with all flags cleared, or 0 if allocation failed
 */
static qd_thread_t *thread(qd_server_t *qd_server, int id)
{
    qd_thread_t *record = NEW(qd_thread_t);

    if (record) {
        record->qd_server    = qd_server;
        record->thread_id    = id;
        record->running      = 0;
        record->canceled     = 0;
        record->using_thread = 0;
    }
    return record;
}
// Forward declaration of a connection-entity update hook.
// NOTE(review): no function with this name is defined in the visible part of
// this file; the entity callback defined below is qd_entity_refresh_connection().
// Confirm whether this prototype is stale.
qd_error_t qd_entity_update_connection(qd_entity_t* entity, void *impl);
/**
 * Set the "host" attribute of a connection entity.
 *
 * For a connection that has a connector, the value is "<host>:<port>" taken
 * from the connector's configuration; otherwise it is the name reported by the
 * underlying driver connector.
 *
 * @return the result of qd_entity_set_string()
 */
static qd_error_t connection_entity_update_host(qd_entity_t* entity, qd_connection_t *conn)
{
    if (!conn->connector)
        return qd_entity_set_string(entity, "host", qdpn_connector_name(conn->pn_cxtr));

    const qd_server_config_t *cfg = conn->connector->config;

    // Room for host, ':', port and the terminating NUL.
    char host_port[strlen(cfg->host) + strlen(cfg->port) + 2];
    snprintf(host_port, sizeof(host_port), "%s:%s", cfg->host, cfg->port);
    return qd_entity_set_string(entity, "host", host_port);
}
/**
 * Refresh the management entity that represents a connection.
 *
 * Publishes state, remote container, host, SASL mechanism, role, direction,
 * user, authentication/encryption flags and, when SSL is in use, the SSL
 * protocol, cipher and SSF.
 *
 * @param entity  entity to update
 * @param impl    the qd_connection_t being described
 * @return QD_ERROR_NONE when all the non-SSL attributes were set, otherwise
 *         the current error code from qd_error_code().  Failures while setting
 *         the three SSL detail attributes are not reported.
 */
qd_error_t qd_entity_refresh_connection(qd_entity_t* entity, void *impl)
{
    qd_connection_t *conn = (qd_connection_t*)impl;
    const qd_server_config_t *config =
        conn->connector ? conn->connector->config : conn->listener->config;
    pn_transport_t *tport = 0;
    pn_sasl_t      *sasl  = 0;
    pn_ssl_t       *ssl   = 0;
    const char     *mech  = 0;
    const char     *user  = 0;

    // The transport (and everything hanging off it) only exists once the
    // proton connection has been created.
    if (conn->pn_conn) {
        tport = pn_connection_transport(conn->pn_conn);
        ssl   = conn->ssl;
    }
    if (tport) {
        sasl = pn_sasl(tport);
        user = pn_transport_get_user(tport);
    }
    if (sasl)
        mech = pn_sasl_get_mech(sasl);

    if (qd_entity_set_string(entity, "state", conn_state_name(conn->state)) == 0 &&
        qd_entity_set_string(entity, "container",
                             conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0) == 0 &&
        connection_entity_update_host(entity, conn) == 0 &&
        qd_entity_set_string(entity, "sasl", mech) == 0 &&
        qd_entity_set_string(entity, "role", config->role) == 0 &&
        qd_entity_set_string(entity, "dir", conn->connector ? "out" : "in") == 0 &&
        qd_entity_set_string(entity, "user", user) == 0 &&
        qd_entity_set_bool(entity, "isAuthenticated", tport && pn_transport_is_authenticated(tport)) == 0 &&
        qd_entity_set_bool(entity, "isEncrypted", tport && pn_transport_is_encrypted(tport)) == 0 &&
        qd_entity_set_bool(entity, "ssl", ssl != 0) == 0) {
        if (ssl) {
#define SSL_ATTR_SIZE 50
            // Start from empty strings: the lookups below report failure through
            // their return value, which is not checked here, so the buffers must
            // hold a valid C string even if nothing is written to them.
            char proto[SSL_ATTR_SIZE]  = "";
            char cipher[SSL_ATTR_SIZE] = "";
            pn_ssl_get_protocol_name(ssl, proto, SSL_ATTR_SIZE);
            pn_ssl_get_cipher_name(ssl, cipher, SSL_ATTR_SIZE);
            qd_entity_set_string(entity, "sslProto", proto);
            qd_entity_set_string(entity, "sslCipher", cipher);
            qd_entity_set_long(entity, "sslSsf", pn_ssl_get_ssf(ssl));
        }
        return QD_ERROR_NONE;
    }
    return qd_error_code();
}
/**
 * Configure server-side SSL on the transport of a newly accepted connection.
 *
 * A temporary SSL domain is built from the listener configuration (identifying
 * certificate, optional acceptance of unsecured clients, trusted CA database,
 * optional peer authentication) and used to initialise the transport's SSL
 * object, which is stored in ctx->ssl.
 *
 * The local reference to the domain is released on every exit path, success
 * included, in the same way cxtr_try_open() releases its domain after
 * pn_ssl_init().
 *
 * @param ctx     connection whose ssl member is set
 * @param config  listener configuration supplying the SSL settings
 * @param tport   transport of the accepted connection
 * @return QD_ERROR_NONE on success, otherwise the error recorded by qd_error()
 */
static qd_error_t listener_setup_ssl(qd_connection_t *ctx, const qd_server_config_t *config, pn_transport_t *tport)
{
    qd_error_t  result  = QD_ERROR_NONE;
    const char *trusted = 0;

    pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_SERVER);
    if (!domain) return qd_error(QD_ERROR_RUNTIME, "No SSL support");

    // setup my identifying cert:
    if (pn_ssl_domain_set_credentials(domain,
                                      config->ssl_certificate_file,
                                      config->ssl_private_key_file,
                                      config->ssl_password)) {
        result = qd_error(QD_ERROR_RUNTIME, "Cannot set SSL credentials");
        goto done;
    }

    if (!config->ssl_required) {
        if (pn_ssl_domain_allow_unsecured_client(domain)) {
            result = qd_error(QD_ERROR_RUNTIME, "Cannot allow unsecured client");
            goto done;
        }
    }

    // for peer authentication:
    if (config->ssl_trusted_certificate_db) {
        if (pn_ssl_domain_set_trusted_ca_db(domain, config->ssl_trusted_certificate_db)) {
            result = qd_error(QD_ERROR_RUNTIME, "Cannot set trusted SSL CA" );
            goto done;
        }
    }

    // The explicit list of trusted certificates, when configured, takes
    // precedence over the CA database as the set advertised to the peer.
    trusted = config->ssl_trusted_certificate_db;
    if (config->ssl_trusted_certificates)
        trusted = config->ssl_trusted_certificates;

    // do we force the peer to send a cert?
    if (config->ssl_require_peer_authentication) {
        if (!trusted || pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER, trusted)) {
            result = qd_error(QD_ERROR_RUNTIME, "Cannot set peer authentication");
            goto done;
        }
    }

    ctx->ssl = pn_ssl(tport);
    if (!ctx->ssl || pn_ssl_init(ctx->ssl, domain, 0))
        result = qd_error(QD_ERROR_RUNTIME, "Cannot initialize SSL");

done:
    // Always drop our reference; previously the success path leaked it.
    pn_ssl_domain_free(domain);
    return result;
}
/**
 * Format a description of an incoming connection into buf for logging.
 *
 * The text names the connector and the host:port of the listener that
 * accepted it.  The listener context of the connector must be set.
 *
 * @param buf   destination buffer
 * @param size  capacity of buf in bytes
 * @param cxtr  accepted driver connector
 * @return buf
 */
static const char *log_incoming(char *buf, size_t size, qdpn_connector_t *cxtr)
{
    qd_listener_t *accepted_by = qdpn_listener_context(qdpn_connector_listener(cxtr));
    assert(accepted_by);

    snprintf(buf, size, "incoming connection from %s to %s:%s",
             qdpn_connector_name(cxtr),
             accepted_by->config->host,
             accepted_by->config->port);
    return buf;
}
/**
 * Fill the connection's properties with a map of two entries:
 * symbol "product" -> string "qpid-dispatch-router" and
 * symbol "version" -> string QPID_DISPATCH_VERSION.
 */
static void add_connection_properties(pn_connection_t *conn)
{
    static char *key_product   = "product";
    static char *value_product = "qpid-dispatch-router";
    static char *key_version   = "version";

    pn_data_put_map(pn_connection_properties(conn));
    pn_data_enter(pn_connection_properties(conn));

    // product
    pn_data_put_symbol(pn_connection_properties(conn),
                       pn_bytes(strlen(key_product), key_product));
    pn_data_put_string(pn_connection_properties(conn),
                       pn_bytes(strlen(value_product), value_product));

    // version
    pn_data_put_symbol(pn_connection_properties(conn),
                       pn_bytes(strlen(key_version), key_version));
    pn_data_put_string(pn_connection_properties(conn),
                       pn_bytes(strlen(QPID_DISPATCH_VERSION), QPID_DISPATCH_VERSION));

    pn_data_exit(pn_connection_properties(conn));
}
/**
 * Accept every connection the driver currently reports on its listeners and
 * set each one up as a qd_connection_t in CONN_STATE_OPENING.
 *
 * For each accepted connector this creates the connection context, the proton
 * connection and collector, registers the connection in the server's list and
 * in the entity cache, and configures the transport, SSL (when enabled on the
 * listener) and SASL.  If SSL setup fails the error is logged and the
 * connector is closed.
 *
 * Must be called with qd_server->lock held.
 */
static void thread_process_listeners_LH(qd_server_t *qd_server)
{
    qdpn_driver_t    *driver = qd_server->driver;
    qdpn_listener_t  *listener;
    qdpn_connector_t *cxtr;
    qd_connection_t  *ctx;

    for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) {
        cxtr = qdpn_listener_accept(listener);
        if (!cxtr)
            continue;

        char logbuf[qd_log_max_len()];

        qd_log(qd_server->log_source, QD_LOG_DEBUG, "Accepting %s",
               log_incoming(logbuf, sizeof(logbuf), cxtr));

        ctx = new_qd_connection_t();
        DEQ_ITEM_INIT(ctx);
        // Record the owning server, as the other places that create a
        // qd_connection_t do; this member used to be left uninitialised here.
        ctx->server        = qd_server;
        ctx->state         = CONN_STATE_OPENING;
        ctx->owner_thread  = CONTEXT_UNSPECIFIED_OWNER;
        ctx->enqueued      = 0;
        ctx->pn_cxtr       = cxtr;
        ctx->collector     = 0;
        ctx->ssl           = 0;
        ctx->listener      = qdpn_listener_context(listener);
        ctx->connector     = 0;
        ctx->context       = ctx->listener->context;
        ctx->user_context  = 0;
        ctx->link_context  = 0;
        ctx->ufd           = 0;
        DEQ_INIT(ctx->deferred_calls);
        ctx->deferred_call_lock = sys_mutex();

        size_t clen = strlen(QD_CAPABILITY_ANONYMOUS_RELAY);
        pn_connection_t *conn = pn_connection();
        ctx->collector = pn_collector();
        pn_connection_collect(conn, ctx->collector);
        pn_connection_set_container(conn, qd_server->container_name);
        pn_data_put_symbol(pn_connection_offered_capabilities(conn), pn_bytes(clen, (char*) QD_CAPABILITY_ANONYMOUS_RELAY));
        add_connection_properties(conn);
        qdpn_connector_set_connection(cxtr, conn);
        pn_connection_set_context(conn, ctx);
        ctx->pn_conn = conn;
        // From here on the worker loop may pick this connection up.
        ctx->owner_thread = CONTEXT_NO_OWNER;
        qdpn_connector_set_context(cxtr, ctx);

        // qd_server->lock is already locked
        DEQ_INSERT_TAIL(qd_server->connections, ctx);
        qd_entity_cache_add(QD_CONNECTION_TYPE, ctx);

        //
        // Get a pointer to the transport so we can insert security components into it
        //
        pn_transport_t           *tport  = qdpn_connector_transport(cxtr);
        const qd_server_config_t *config = ctx->listener->config;

        //
        // Configure the transport.
        //
        pn_transport_set_server(tport);
        pn_transport_set_max_frame(tport, config->max_frame_size);

        // Set up SSL if configured
        if (config->ssl_enabled) {
            qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring SSL on %s",
                   log_incoming(logbuf, sizeof(logbuf), cxtr));
            if (listener_setup_ssl(ctx, config, tport) != QD_ERROR_NONE) {
                qd_log(qd_server->log_source, QD_LOG_ERROR, "%s on %s",
                       qd_error_message(), log_incoming(logbuf, sizeof(logbuf), cxtr));
                qdpn_connector_close(cxtr);
                continue;
            }
        }

        //
        // Set up SASL
        //
        pn_sasl_t *sasl = pn_sasl(tport);
        if (qd_server->sasl_config_path)
            pn_sasl_config_path(sasl, qd_server->sasl_config_path);
        pn_sasl_config_name(sasl, qd_server->sasl_config_name);
        if (config->sasl_mechanisms)
            pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms);
        pn_transport_require_auth(tport, config->requireAuthentication);
        pn_transport_require_encryption(tport, config->requireEncryption);
        pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication);
    }
}
/**
 * Deliver a pending signal, if any, to the registered signal handler.
 *
 * The pending signal is cleared first.  The server lock is released for the
 * duration of the handler call and re-acquired afterwards, so the caller must
 * hold qd_server->lock on entry.
 */
static void handle_signals_LH(qd_server_t *qd_server)
{
    int pending = qd_server->pending_signal;

    if (!pending)
        return;

    qd_server->pending_signal = 0;
    if (!qd_server->signal_handler)
        return;

    sys_mutex_unlock(qd_server->lock);
    qd_server->signal_handler(qd_server->signal_context, pending);
    sys_mutex_lock(qd_server->lock);
}
/**
 * Park the calling thread while at least one pause request is outstanding.
 *
 * The thread counts itself in threads_paused, wakes all waiters on the server
 * condition, and waits on that condition until pause_requests is no longer
 * positive.  Called with qd_server->lock held; the lock is the one passed to
 * sys_cond_wait().
 */
static void block_if_paused_LH(qd_server_t *qd_server)
{
    if (!(qd_server->pause_requests > 0))
        return;

    qd_server->threads_paused++;
    sys_cond_signal_all(qd_server->cond);

    while (qd_server->pause_requests > 0) {
        sys_cond_wait(qd_server->cond, qd_server->lock);
    }

    qd_server->threads_paused--;
}
/**
 * Run, in order, every deferred call queued on a connection.
 *
 * The queue is drained into a private list while holding the connection's
 * deferred_call_lock; the calls are then invoked, and their records freed,
 * after that lock has been released.
 *
 * @param conn     connection whose deferred calls are processed
 * @param discard  passed through as the second argument of every call
 */
static void invoke_deferred_calls(qd_connection_t *conn, bool discard)
{
    qd_deferred_call_list_t  pending;
    qd_deferred_call_t      *item;

    DEQ_INIT(pending);

    // Move everything onto the private list under the lock.
    sys_mutex_lock(conn->deferred_call_lock);
    for (item = DEQ_HEAD(conn->deferred_calls); item; item = DEQ_HEAD(conn->deferred_calls)) {
        DEQ_REMOVE_HEAD(conn->deferred_calls);
        DEQ_INSERT_TAIL(pending, item);
    }
    sys_mutex_unlock(conn->deferred_call_lock);

    // Invoke outside of the critical section.
    for (item = DEQ_HEAD(pending); item; item = DEQ_HEAD(pending)) {
        DEQ_REMOVE_HEAD(pending);
        item->call(item->context, discard);
        free_qd_deferred_call_t(item);
    }
}
//
// Drive one connector through the connection state machine.
//
// For a user-fd connection (CONN_STATE_USER) the registered ufd_handler is
// invoked once and 1 is returned.  Otherwise the connector is stepped and the
// handler for its current state is run, repeating for as long as a pass
// reports events (events > 0).
//
// Returns non-zero if more than one pass was made, i.e. at least one pass
// reported events.
//
static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
{
qd_connection_t *ctx = qdpn_connector_context(cxtr);
int events = 0;
int passes = 0;
// User file descriptors bypass the AMQP state machine entirely.
if (ctx->state == CONN_STATE_USER) {
qd_server->ufd_handler(ctx->ufd->context, ctx->ufd);
return 1;
}
do {
passes++;
//
// Step the engine for pre-handler processing
//
qdpn_connector_process(cxtr);
//
// Call the handler that is appropriate for the connector's state.
//
switch (ctx->state) {
case CONN_STATE_CONNECTING: {
// The outgoing socket was closed before a proton connection was created.
if (qdpn_connector_closed(cxtr)) {
ctx->state = CONN_STATE_FAILED;
events = 0;
break;
}
// Create the proton connection and collector, advertise the
// anonymous-relay capability and the connection properties, then open.
size_t clen = strlen(QD_CAPABILITY_ANONYMOUS_RELAY);
pn_connection_t *conn = pn_connection();
ctx->collector = pn_collector();
pn_connection_collect(conn, ctx->collector);
pn_connection_set_container(conn, qd_server->container_name);
pn_data_put_symbol(pn_connection_offered_capabilities(conn), pn_bytes(clen, (char*) QD_CAPABILITY_ANONYMOUS_RELAY));
add_connection_properties(conn);
qdpn_connector_set_connection(cxtr, conn);
pn_connection_set_context(conn, ctx);
pn_connection_open(conn);
ctx->pn_conn = conn;
ctx->state = CONN_STATE_OPENING;
assert(ctx->connector);
ctx->connector->state = CXTR_STATE_OPEN;
// Report an event so the loop runs another pass in the OPENING state.
events = 1;
break;
}
case CONN_STATE_OPENING: {
qd_connection_t *qd_conn = (qd_connection_t*) qdpn_connector_context(cxtr);
pn_collector_t *collector = qd_connection_collector(qd_conn);
pn_event_t *event;
events = 0;
// Consume collected events until the remote open is seen or the
// collector is empty.
event = pn_collector_peek(collector);
while (event) {
if (pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
ctx->state = CONN_STATE_OPERATIONAL;
qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN;
if (ctx->connector) {
ce = QD_CONN_EVENT_CONNECTOR_OPEN;
// Successful open: clear the connector's retry delay.
ctx->connector->delay = 0;
} else
assert(ctx->listener);
qd_server->conn_handler(qd_server->conn_handler_context,
ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr));
events = 1;
break; // Break without popping this event. It will be re-processed in OPERATIONAL state.
} else if (pn_event_type(event) == PN_TRANSPORT_ERROR) {
ctx->state = CONN_STATE_FAILED;
if (ctx->connector) {
const qd_server_config_t *config = ctx->connector->config;
qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
}
}
pn_collector_pop(collector);
event = pn_collector_peek(collector);
}
break;
}
case CONN_STATE_OPERATIONAL:
if (qdpn_connector_closed(cxtr)) {
// Tell the application the connection has closed; no further passes.
qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
QD_CONN_EVENT_CLOSE,
(qd_connection_t*) qdpn_connector_context(cxtr));
events = 0;
}
else {
// Run calls deferred to this connection before handling its events.
invoke_deferred_calls(ctx, false);
qd_connection_t *qd_conn = (qd_connection_t*) qdpn_connector_context(cxtr);
pn_collector_t *collector = qd_connection_collector(qd_conn);
pn_event_t *event;
events = 0;
// Hand every collected proton event to the event handler, summing
// the values the handler returns.
event = pn_collector_peek(collector);
while (event) {
events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
pn_collector_pop(collector);
event = pn_collector_peek(collector);
}
// Finally give the application a chance to write.
events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
}
break;
default:
// Other states (e.g. CONN_STATE_FAILED) do no work; events keeps its
// current value.
break;
}
} while (events > 0);
return passes > 1;
}
//
// TEMPORARY FUNCTION PROTOTYPES
//
// The three phases of the driver's wait sequence, declared here because they
// are called directly from thread_run().  Phases 1 and 3 are called with the
// server lock held; phase 2 is called with the lock released and receives the
// timeout.
//
void qdpn_driver_wait_1(qdpn_driver_t *d);
int qdpn_driver_wait_2(qdpn_driver_t *d, int timeout);
void qdpn_driver_wait_3(qdpn_driver_t *d);
//
// END TEMPORARY
//
//
// Main loop of a server worker thread.
//
// arg is the qd_thread_t record for this thread.  Each iteration, under the
// server lock, the thread handles a pending signal, blocks if the server is
// paused, runs one pending timer, or takes a connector from the work queue.
// When there is no work, one thread at a time waits on the proton driver while
// the others wait on the server condition variable.  The loop ends when
// thread->running is cleared.  Always returns 0.
//
static void *thread_run(void *arg)
{
qd_thread_t *thread = (qd_thread_t*) arg;
qd_work_item_t *work;
qdpn_connector_t *cxtr;
pn_connection_t *conn;
qd_connection_t *ctx;
int error;
int poll_result;
if (!thread)
return 0;
qd_server_t *qd_server = thread->qd_server;
// Remember which server this thread belongs to (see qd_server_stop()).
thread_server = qd_server;
thread->running = 1;
// A cancel issued before the thread got going wins over the line above.
if (thread->canceled)
return 0;
//
// Invoke the start handler if the application supplied one.
// This handler can be used to set NUMA or processor affinity for the thread.
//
if (qd_server->start_handler)
qd_server->start_handler(qd_server->start_context, thread->thread_id);
//
// Main Loop
//
while (thread->running) {
sys_mutex_lock(qd_server->lock);
//
// Check for pending signals to process
//
handle_signals_LH(qd_server);
if (!thread->running) {
sys_mutex_unlock(qd_server->lock);
break;
}
//
// Check to see if the server is pausing. If so, block here.
//
block_if_paused_LH(qd_server);
if (!thread->running) {
sys_mutex_unlock(qd_server->lock);
break;
}
//
// Service pending timers.
//
qd_timer_t *timer = DEQ_HEAD(qd_server->pending_timers);
if (timer) {
DEQ_REMOVE_HEAD(qd_server->pending_timers);
//
// Mark the timer as idle in case it reschedules itself.
//
qd_timer_idle_LH(timer);
//
// Release the lock and invoke the timer's handler.
//
sys_mutex_unlock(qd_server->lock);
timer->handler(timer->context);
qdpn_driver_wakeup(qd_server->driver);
continue;
}
//
// Check the work queue for connectors scheduled for processing.
//
work = DEQ_HEAD(qd_server->work_queue);
if (!work) {
//
// There is no pending work to do
//
if (qd_server->a_thread_is_waiting) {
//
// Another thread is waiting on the proton driver, this thread must
// wait on the condition variable until signaled.
//
sys_cond_wait(qd_server->cond, qd_server->lock);
} else {
//
// This thread elects itself to wait on the proton driver. Set the
// thread-is-waiting flag so other idle threads will not interfere.
//
qd_server->a_thread_is_waiting = true;
//
// Ask the timer module when its next timer is scheduled to fire. We'll
// use this value in driver_wait as the timeout. If there are no scheduled
// timers, the returned value will be -1.
//
qd_timestamp_t duration = qd_timer_next_duration_LH();
//
// Invoke the proton driver's wait sequence. This is a bit of a hack for now
// and will be improved in the future. The wait process is divided into three parts,
// the first and third of which need to be non-reentrant, and the second of which
// must be reentrant (and blocks).
//
qdpn_driver_wait_1(qd_server->driver);
sys_mutex_unlock(qd_server->lock);
// Retry the blocking wait for as long as it is interrupted (EINTR).
do {
error = 0;
poll_result = qdpn_driver_wait_2(qd_server->driver, duration);
if (poll_result == -1)
error = errno;
} while (error == EINTR);
// Any other wait failure terminates the whole process.
if (error) {
exit(-1);
}
sys_mutex_lock(qd_server->lock);
qdpn_driver_wait_3(qd_server->driver);
if (!thread->running) {
sys_mutex_unlock(qd_server->lock);
break;
}
//
// Visit the timer module.
//
// The current CLOCK_REALTIME time is passed in milliseconds.
struct timespec tv;
clock_gettime(CLOCK_REALTIME, &tv);
qd_timestamp_t milliseconds = ((qd_timestamp_t)tv.tv_sec) * 1000 + tv.tv_nsec / 1000000;
qd_timer_visit_LH(milliseconds);
//
// Process listeners (incoming connections).
//
thread_process_listeners_LH(qd_server);
//
// Traverse the list of connectors-needing-service from the proton driver.
// If the connector is not already in the work queue and it is not currently
// being processed by another thread, put it in the work queue and signal the
// condition variable.
//
cxtr = qdpn_driver_connector(qd_server->driver);
while (cxtr) {
ctx = qdpn_connector_context(cxtr);
if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
ctx->enqueued = 1;
qd_work_item_t *workitem = new_qd_work_item_t();
DEQ_ITEM_INIT(workitem);
workitem->cxtr = cxtr;
DEQ_INSERT_TAIL(qd_server->work_queue, workitem);
sys_cond_signal(qd_server->cond);
}
cxtr = qdpn_driver_connector(qd_server->driver);
}
//
// Release our exclusive claim on qdpn_driver_wait.
//
qd_server->a_thread_is_waiting = false;
}
}
//
// If we were given a connector to work on from the work queue, mark it as
// owned by this thread and as no longer enqueued.
//
cxtr = 0;
if (work) {
DEQ_REMOVE_HEAD(qd_server->work_queue);
ctx = qdpn_connector_context(work->cxtr);
if (ctx->owner_thread == CONTEXT_NO_OWNER) {
ctx->owner_thread = thread->thread_id;
ctx->enqueued = 0;
qd_server->threads_active++;
cxtr = work->cxtr;
free_qd_work_item_t(work);
} else {
//
// This connector is being processed by another thread, re-queue it.
//
DEQ_INSERT_TAIL(qd_server->work_queue, work);
}
}
sys_mutex_unlock(qd_server->lock);
//
// Process the connector that we now have exclusive access to.
//
if (cxtr) {
// NOTE(review): this initial value is always overwritten by the
// process_connector() result below.
int work_done = 1;
if (qdpn_connector_failed(cxtr))
qdpn_connector_close(cxtr);
//
// Even if the connector has failed there are still events that
// must be processed so that associated links will be cleaned up.
//
work_done = process_connector(qd_server, cxtr);
//
// Check to see if the connector was closed during processing
//
if (qdpn_connector_closed(cxtr)) {
qd_entity_cache_remove(QD_CONNECTION_TYPE, ctx);
//
// Connector is closed. Free the context and the connector.
//
conn = qdpn_connector_connection(cxtr);
//
// If this is a dispatch connector, schedule the re-connect timer
//
if (ctx->connector) {
ctx->connector->ctx = 0;
ctx->connector->state = CXTR_STATE_CONNECTING;
qd_timer_schedule(ctx->connector->timer, ctx->connector->delay);
}
// Tear the connection down under the server lock.
sys_mutex_lock(qd_server->lock);
DEQ_REMOVE(qd_server->connections, ctx);
qdpn_connector_free(cxtr);
if (conn)
pn_connection_free(conn);
if (ctx->collector)
pn_collector_free(ctx->collector);
invoke_deferred_calls(ctx, true); // Discard any pending deferred calls
sys_mutex_free(ctx->deferred_call_lock);
free_qd_connection_t(ctx);
qd_server->threads_active--;
sys_mutex_unlock(qd_server->lock);
} else {
//
// The connector lives on. Mark it as no longer owned by this thread.
//
sys_mutex_lock(qd_server->lock);
ctx->owner_thread = CONTEXT_NO_OWNER;
qd_server->threads_active--;
sys_mutex_unlock(qd_server->lock);
}
//
// Wake up the proton driver to force it to reconsider its set of FDs
// in light of the processing that just occurred.
//
if (work_done)
qdpn_driver_wakeup(qd_server->driver);
}
}
return 0;
}
/**
 * Launch thread_run() for the given record on a new system thread.
 * A NULL record is ignored.
 */
static void thread_start(qd_thread_t *thread)
{
    if (thread) {
        thread->using_thread = 1;
        thread->thread       = sys_thread(thread_run, (void*) thread);
    }
}
/**
 * Ask a worker to stop: clear its running flag and mark it canceled.
 * A NULL record is ignored.
 */
static void thread_cancel(qd_thread_t *thread)
{
    if (thread) {
        thread->running  = 0;
        thread->canceled = 1;
    }
}
/**
 * Join and release the system thread behind a record, if one was started
 * through thread_start().  A NULL record is ignored.
 */
static void thread_join(qd_thread_t *thread)
{
    if (!thread || !thread->using_thread)
        return;

    sys_thread_join(thread->thread);
    sys_thread_free(thread->thread);
}
/**
 * Release a thread record.  Passing NULL is harmless.
 */
static void thread_free(qd_thread_t *thread)
{
    if (thread)
        free(thread);
}
//
// Timer callback that attempts to open the outgoing connection of a connector.
//
// context is the qd_connector_t (the timer is created with this function in
// qd_server_connect()).  Nothing is done unless the connector is in
// CXTR_STATE_CONNECTING.  On success a qd_connection_t in
// CONN_STATE_CONNECTING is created, registered with the server and configured
// (transport, optional SSL, SASL).  If the driver connector cannot be created
// the attempt is abandoned and the timer is rescheduled.
//
static void cxtr_try_open(void *context)
{
qd_connector_t *ct = (qd_connector_t*) context;
if (ct->state != CXTR_STATE_CONNECTING)
return;
qd_connection_t *ctx = new_qd_connection_t();
DEQ_ITEM_INIT(ctx);
ctx->server = ct->server;
ctx->state = CONN_STATE_CONNECTING;
// Not yet eligible for processing; set to CONTEXT_NO_OWNER at the end.
ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER;
ctx->enqueued = 0;
ctx->pn_conn = 0;
ctx->collector = 0;
ctx->ssl = 0;
ctx->listener = 0;
ctx->connector = ct;
ctx->context = ct->context;
ctx->user_context = 0;
ctx->link_context = 0;
ctx->ufd = 0;
DEQ_INIT(ctx->deferred_calls);
ctx->deferred_call_lock = sys_mutex();
qd_log(ct->server->log_source, QD_LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
//
// qdpn_connector is not thread safe
//
sys_mutex_lock(ct->server->lock);
ctx->pn_cxtr = qdpn_connector(ct->server->driver, ct->config->host, ct->config->port, (void*) ctx);
if (ctx->pn_cxtr) {
DEQ_INSERT_TAIL(ct->server->connections, ctx);
qd_entity_cache_add(QD_CONNECTION_TYPE, ctx);
}
sys_mutex_unlock(ct->server->lock);
// Could not create the driver connector: undo the allocation and retry later.
// NOTE(review): the delay is presumably in milliseconds — confirm against
// qd_timer_schedule().
if (ctx->pn_cxtr == 0) {
sys_mutex_free(ctx->deferred_call_lock);
free_qd_connection_t(ctx);
ct->delay = 10000;
qd_timer_schedule(ct->timer, ct->delay);
return;
}
ct->ctx = ctx;
// Delay used for the re-connect timer if this connection later closes; it is
// reset to 0 once the remote open is seen in process_connector().
ct->delay = 5000;
//
// Set up the transport, SASL, and SSL for the connection.
//
pn_transport_t *tport = qdpn_connector_transport(ctx->pn_cxtr);
const qd_server_config_t *config = ct->config;
//
// Configure the transport
//
pn_transport_set_max_frame(tport, config->max_frame_size);
//
// Set up SSL if appropriate
//
if (config->ssl_enabled) {
pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
if (!domain) {
qd_error(QD_ERROR_RUNTIME, "SSL domain failed for connection to %s:%s",
ct->config->host, ct->config->port);
/* TODO aconway 2014-07-15: Close the connection, clean up. */
// NOTE(review): this return leaves ctx in the server's connection list
// with owner_thread still CONTEXT_UNSPECIFIED_OWNER, and the worker loop
// only enqueues connectors whose owner is CONTEXT_NO_OWNER.
return;
}
/* TODO aconway 2014-07-15: error handling on all SSL calls. */
// set our trusted database for checking the peer's cert:
if (config->ssl_trusted_certificate_db) {
if (pn_ssl_domain_set_trusted_ca_db(domain, config->ssl_trusted_certificate_db)) {
qd_log(ct->server->log_source, QD_LOG_ERROR,
"SSL CA configuration failed for %s:%s",
ct->config->host, ct->config->port);
}
}
// should we force the peer to provide a cert?
if (config->ssl_require_peer_authentication) {
// Prefer the explicit certificate list over the CA database.
const char *trusted = (config->ssl_trusted_certificates)
? config->ssl_trusted_certificates
: config->ssl_trusted_certificate_db;
if (pn_ssl_domain_set_peer_authentication(domain,
PN_SSL_VERIFY_PEER,
trusted)) {
qd_log(ct->server->log_source, QD_LOG_ERROR,
"SSL peer auth configuration failed for %s:%s",
ct->config->host, ct->config->port);
}
}
// configure our certificate if the peer requests one:
if (config->ssl_certificate_file) {
if (pn_ssl_domain_set_credentials(domain,
config->ssl_certificate_file,
config->ssl_private_key_file,
config->ssl_password)) {
qd_log(ct->server->log_source, QD_LOG_ERROR,
"SSL local configuration failed for %s:%s",
ct->config->host, ct->config->port);
}
}
// The configuration failures above are only logged; initialisation proceeds
// regardless and the local domain reference is then released.
ctx->ssl = pn_ssl(tport);
pn_ssl_init(ctx->ssl, domain, 0);
pn_ssl_domain_free(domain);
}
//
// Set up SASL
//
sys_mutex_lock(ct->server->lock);
pn_sasl_t *sasl = pn_sasl(tport);
if (config->sasl_mechanisms)
pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms);
pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication);
sys_mutex_unlock(ct->server->lock);
// Fully configured: allow the worker threads to pick the connection up.
ctx->owner_thread = CONTEXT_NO_OWNER;
}
/**
 * Create a server object.
 *
 * Allocates the server, its proton driver, lock, condition variable and one
 * thread record per requested thread, initialises the timer module with the
 * server lock, and clears every handler, context, queue and counter.
 *
 * @param qd                owning dispatch instance
 * @param thread_count      number of worker thread records to create
 * @param container_name    container name set on every proton connection
 *                          (the pointer is stored, not copied)
 * @param sasl_config_path  SASL configuration path (stored, may be 0)
 * @param sasl_config_name  SASL configuration name (stored)
 * @return the new server, or 0 if the server object could not be allocated
 */
qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *container_name,
                       const char *sasl_config_path, const char *sasl_config_name)
{
    int i;

    qd_server_t *qd_server = NEW(qd_server_t);
    if (qd_server == 0)
        return 0;

    DEQ_INIT(qd_server->connections);
    qd_server->qd               = qd;
    qd_server->log_source       = qd_log_source("SERVER");
    qd_server->thread_count     = thread_count;
    qd_server->container_name   = container_name;
    qd_server->sasl_config_path = sasl_config_path;
    qd_server->sasl_config_name = sasl_config_name;
    qd_server->driver           = qdpn_driver();
    qd_server->start_handler    = 0;
    qd_server->conn_handler     = 0;
    qd_server->pn_event_handler = 0;
    qd_server->signal_handler   = 0;
    qd_server->ufd_handler      = 0;
    qd_server->start_context    = 0;
    qd_server->signal_context   = 0;
    // Previously left uninitialised until qd_server_set_conn_handler() ran.
    qd_server->conn_handler_context = 0;
    qd_server->lock             = sys_mutex();
    qd_server->cond             = sys_cond();

    // The timer module shares the server lock.
    qd_timer_initialize(qd_server->lock);

    qd_server->threads = NEW_PTR_ARRAY(qd_thread_t, thread_count);
    for (i = 0; i < thread_count; i++)
        qd_server->threads[i] = thread(qd_server, i);

    DEQ_INIT(qd_server->work_queue);
    DEQ_INIT(qd_server->pending_timers);
    qd_server->a_thread_is_waiting = false;
    qd_server->threads_active      = 0;
    qd_server->pause_requests      = 0;
    qd_server->threads_paused      = 0;
    qd_server->pause_next_sequence = 0;
    qd_server->pause_now_serving   = 0;
    qd_server->pending_signal      = 0;

    qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name);

    return qd_server;
}
/**
 * Destroy a server created by qd_server(): thread records, timer module,
 * proton driver, lock, condition variable, and the server itself.
 * A NULL server is ignored.
 */
void qd_server_free(qd_server_t *qd_server)
{
    if (!qd_server)
        return;

    int idx = 0;
    while (idx < qd_server->thread_count) {
        thread_free(qd_server->threads[idx]);
        idx++;
    }

    qd_timer_finalize();
    qdpn_driver_free(qd_server->driver);
    sys_mutex_free(qd_server->lock);
    sys_cond_free(qd_server->cond);
    free(qd_server->threads);
    free(qd_server);
}
/**
 * Register the connection-event handler, the proton-event handler and the
 * context pointer that is passed to both.
 */
void qd_server_set_conn_handler(qd_dispatch_t *qd,
                                qd_conn_handler_cb_t handler,
                                qd_pn_event_handler_cb_t pn_event_handler,
                                void *handler_context)
{
    qd_server_t *server = qd->server;

    server->conn_handler         = handler;
    server->pn_event_handler     = pn_event_handler;
    server->conn_handler_context = handler_context;
}
/**
 * Register the handler invoked by a worker thread for a pending signal,
 * together with the context pointer passed to it.
 */
void qd_server_set_signal_handler(qd_dispatch_t *qd, qd_signal_handler_cb_t handler, void *context)
{
    qd_server_t *server = qd->server;

    server->signal_handler = handler;
    server->signal_context = context;
}
/**
 * Register the handler each worker thread calls once before entering its
 * main loop, together with the context pointer passed to it.
 */
void qd_server_set_start_handler(qd_dispatch_t *qd, qd_thread_start_cb_t handler, void *context)
{
    qd_server_t *server = qd->server;

    server->start_handler = handler;
    server->start_context = context;
}
/**
 * Register the handler invoked when a user file descriptor needs service.
 */
void qd_server_set_user_fd_handler(qd_dispatch_t *qd, qd_user_fd_handler_cb_t ufd_handler)
{
    qd_server_t *server = qd->server;

    server->ufd_handler = ufd_handler;
}
/**
 * Log that the server is operational, with its thread count, and note when
 * the build has assertions enabled (NDEBUG not defined).
 */
static void qd_server_announce(qd_server_t* qd_server)
{
    qd_log(qd_server->log_source, QD_LOG_INFO,
           "Operational, %d Threads Running", qd_server->thread_count);
#ifndef NDEBUG
    qd_log(qd_server->log_source, QD_LOG_INFO,
           "Running in DEBUG Mode");
#endif
}
/**
 * Run the server, using the calling thread as worker 0.
 *
 * Workers 1..thread_count-1 are started on their own threads, the calling
 * thread runs the loop of worker 0, and when that loop returns the other
 * workers are joined and all cancel flags are cleared so the server can be
 * run again.  Does nothing if the dispatch instance has no server.  A
 * connection handler must have been registered.
 */
void qd_server_run(qd_dispatch_t *qd)
{
    qd_server_t *qd_server = qd->server;
    int i;

    if (!qd_server)
        return;

    assert(qd_server->conn_handler); // Server can't run without a connection handler.

    for (i = 1; i < qd_server->thread_count; i++)
        thread_start(qd_server->threads[i]);

    qd_server_announce(qd_server);

    thread_run((void*) qd_server->threads[0]);

    for (i = 1; i < qd_server->thread_count; i++)
        thread_join(qd_server->threads[i]);

    // thread() may have returned 0 for a record; every other consumer of the
    // array tolerates that, so do the same here instead of dereferencing NULL.
    for (i = 0; i < qd_server->thread_count; i++) {
        if (qd_server->threads[i])
            qd_server->threads[i]->canceled = 0;
    }

    qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down");
}
/**
 * Start every worker on its own thread and return immediately.
 * Does nothing if the dispatch instance has no server.  A connection handler
 * must have been registered.
 */
void qd_server_start(qd_dispatch_t *qd)
{
    qd_server_t *server = qd->server;

    if (!server)
        return;

    assert(server->conn_handler); // Server can't run without a connection handler.

    int idx = 0;
    while (idx < server->thread_count) {
        thread_start(server->threads[idx]);
        idx++;
    }

    qd_server_announce(server);
}
/**
 * Stop the server's worker threads.
 *
 * Under the server lock every worker is cancelled, all condition waiters are
 * woken and the driver is woken.  Unless the caller is itself one of this
 * server's worker threads, the workers are then joined and shutdown is logged.
 */
void qd_server_stop(qd_dispatch_t *qd)
{
    qd_server_t *server = qd->server;
    int i;

    sys_mutex_lock(server->lock);
    for (i = 0; i < server->thread_count; i++)
        thread_cancel(server->threads[i]);
    sys_cond_signal_all(server->cond);
    qdpn_driver_wakeup(server->driver);
    sys_mutex_unlock(server->lock);

    // A worker thread of this server must not try to join the workers.
    if (thread_server == server)
        return;

    for (i = 0; i < server->thread_count; i++)
        thread_join(server->threads[i]);
    qd_log(server->log_source, QD_LOG_INFO, "Shut Down");
}
/**
 * Record a signal number for a worker thread to deliver, then wake all
 * condition waiters and the driver so that it is noticed.
 */
void qd_server_signal(qd_dispatch_t *qd, int signum)
{
    qd_server_t *server = qd->server;

    server->pending_signal = signum;
    sys_cond_signal_all(server->cond);
    qdpn_driver_wakeup(server->driver);
}
/**
 * Pause the server's worker threads and wait until they have all parked.
 *
 * Returns once (a) the number of paused threads plus the number of pause
 * requests is at least the thread count and (b) it is this caller's turn,
 * determined by a ticket taken from pause_next_sequence and compared against
 * pause_now_serving.  Condition (b) keeps concurrent pausers from running at
 * the same time.
 */
void qd_server_pause(qd_dispatch_t *qd)
{
    qd_server_t *qd_server = qd->server;

    sys_mutex_lock(qd_server->lock);

    // Bump the request count to stop all the threads, and take a ticket.
    qd_server->pause_requests++;
    int ticket = qd_server->pause_next_sequence++;

    // Awaken all threads that are currently blocking.
    sys_cond_signal_all(qd_server->cond);
    qdpn_driver_wakeup(qd_server->driver);

    for (;;) {
        bool threads_still_running =
            qd_server->threads_paused + qd_server->pause_requests < qd_server->thread_count;
        bool not_our_turn = ticket != qd_server->pause_now_serving;

        if (!threads_still_running && !not_our_turn)
            break;
        sys_cond_wait(qd_server->cond, qd_server->lock);
    }

    sys_mutex_unlock(qd_server->lock);
}
/**
 * End a pause: drop one pause request, advance pause_now_serving so the next
 * waiting pauser may proceed, and wake every condition waiter.
 */
void qd_server_resume(qd_dispatch_t *qd)
{
    qd_server_t *server = qd->server;

    sys_mutex_lock(server->lock);
    server->pause_requests--;
    server->pause_now_serving++;
    sys_cond_signal_all(server->cond);
    sys_mutex_unlock(server->lock);
}
/**
 * Mark a connection's driver connector as writable so that it gets serviced.
 * Nothing happens for a NULL connection, a connection without a driver
 * connector, or a connector that is already closed.
 */
void qd_server_activate(qd_connection_t *ctx)
{
    qdpn_connector_t *cxtr = ctx ? ctx->pn_cxtr : 0;

    if (cxtr && !qdpn_connector_closed(cxtr))
        qdpn_connector_activate(cxtr, QDPN_CONNECTOR_WRITABLE);
}
/**
 * Store a caller-owned pointer in the connection's user_context slot.
 */
void qd_connection_set_context(qd_connection_t *conn, void *context)
{
    conn->user_context = context;
}
/**
 * Return the pointer held in the connection's user_context slot.
 */
void *qd_connection_get_context(qd_connection_t *conn)
{
    return conn->user_context;
}
/**
 * Return the connection's context pointer, which is copied from the listener
 * or connector the connection was created for.
 */
void *qd_connection_get_config_context(qd_connection_t *conn)
{
    return conn->context;
}
/**
 * Store a caller-owned pointer in the connection's link_context slot.
 */
void qd_connection_set_link_context(qd_connection_t *conn, void *context)
{
    conn->link_context = context;
}
/**
 * Return the pointer held in the connection's link_context slot.
 */
void *qd_connection_get_link_context(qd_connection_t *conn)
{
    return conn->link_context;
}
/**
 * Return the proton connection behind this connection (0 until it has been
 * created).
 */
pn_connection_t *qd_connection_pn(qd_connection_t *conn)
{
    return conn->pn_conn;
}
/**
 * Return the proton event collector attached to this connection (0 until it
 * has been created).
 */
pn_collector_t *qd_connection_collector(qd_connection_t *conn)
{
    return conn->collector;
}
/**
 * Return the configuration the connection was created from: the listener's
 * when the connection has a listener, otherwise the connector's.
 */
const qd_server_config_t *qd_connection_config(const qd_connection_t *conn)
{
    return conn->listener ? conn->listener->config
                          : conn->connector->config;
}
/**
 * Queue a call to be made later on behalf of the connection.
 *
 * The call is appended to the connection's deferred-call list under
 * deferred_call_lock, and the connection is then activated so that it gets
 * processed.
 *
 * @param conn     connection the call is queued on
 * @param call     function to invoke
 * @param context  first argument passed to call
 */
void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, void *context)
{
    qd_deferred_call_t *entry = new_qd_deferred_call_t();

    DEQ_ITEM_INIT(entry);
    entry->call    = call;
    entry->context = context;

    sys_mutex_lock(conn->deferred_call_lock);
    DEQ_INSERT_TAIL(conn->deferred_calls, entry);
    sys_mutex_unlock(conn->deferred_call_lock);

    qd_server_activate(conn);
}
/**
 * Create a listener on config->host / config->port.
 *
 * @param qd       dispatch instance whose server owns the listener
 * @param config   listener configuration (the pointer is stored, not copied)
 * @param context  pointer stored on the listener and copied to every
 *                 connection accepted on it
 * @return the new listener, or 0 if it could not be allocated or the driver
 *         listener could not be created
 */
qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
{
    qd_server_t   *server = qd->server;
    qd_listener_t *li     = new_qd_listener_t();

    if (li) {
        li->server      = server;
        li->config      = config;
        li->context     = context;
        li->pn_listener = qdpn_listener(server->driver, config->host, config->port, (void*) li);

        if (li->pn_listener) {
            qd_log(server->log_source, QD_LOG_TRACE, "Listening on %s:%s", config->host, config->port);
        } else {
            free_qd_listener_t(li);
            li = 0;
        }
    }
    return li;
}
/**
 * Free a listener and its driver listener.  A NULL listener is ignored.
 */
void qd_server_listener_free(qd_listener_t* li)
{
    if (li) {
        qdpn_listener_free(li->pn_listener);
        free_qd_listener_t(li);
    }
}
/**
 * Close the driver listener behind li.  li must not be NULL.
 */
void qd_server_listener_close(qd_listener_t* li)
{
    qdpn_listener_close(li->pn_listener);
}
/**
 * Create an outgoing connector and schedule its first connection attempt.
 *
 * The connector starts in CXTR_STATE_CONNECTING with a timer that calls
 * cxtr_try_open(); the timer is scheduled immediately with a delay of 0.
 *
 * @param qd       dispatch instance whose server owns the connector
 * @param config   connector configuration (the pointer is stored, not copied)
 * @param context  pointer stored on the connector
 * @return the new connector, or 0 if it could not be allocated
 */
qd_connector_t *qd_server_connect(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
{
    qd_server_t    *server = qd->server;
    qd_connector_t *ct     = new_qd_connector_t();

    if (ct) {
        ct->server  = server;
        ct->state   = CXTR_STATE_CONNECTING;
        ct->config  = config;
        ct->context = context;
        ct->ctx     = 0;
        ct->timer   = qd_timer(qd, cxtr_try_open, (void*) ct);
        ct->delay   = 0;

        qd_timer_schedule(ct->timer, ct->delay);
    }
    return ct;
}
/**
 * Free a connector.  A NULL connector is ignored.
 *
 * If the connector currently has a connection, its driver connector is closed
 * and the connection's back-pointer to this connector is cleared.  The driver
 * connector itself is not freed here; that is done by the connector
 * processing/cleanup.
 */
void qd_server_connector_free(qd_connector_t* ct)
{
    if (!ct)
        return;

    qd_connection_t *active = ct->ctx;
    if (active) {
        qdpn_connector_close(active->pn_cxtr);
        active->connector = 0;
    }

    qd_timer_free(ct->timer);
    free_qd_connector_t(ct);
}
/**
 * Register an application file descriptor with the server's driver.
 *
 * A connection context in CONN_STATE_USER is created to carry the descriptor
 * through the worker loop, a driver connector is created for fd with that
 * context, and the driver is woken.
 *
 * @param qd       dispatch instance whose server will watch the descriptor
 * @param fd       file descriptor to watch
 * @param context  pointer stored on the returned object and passed to the
 *                 user-fd handler
 * @return the new user-fd object, or 0 if it could not be allocated
 */
qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context)
{
    qd_server_t  *server = qd->server;
    qd_user_fd_t *ufd    = new_qd_user_fd_t();

    if (!ufd)
        return 0;

    // Connection context that stands in for the descriptor.
    qd_connection_t *carrier = new_qd_connection_t();
    DEQ_ITEM_INIT(carrier);
    carrier->server       = server;
    carrier->state        = CONN_STATE_USER;
    carrier->owner_thread = CONTEXT_NO_OWNER;
    carrier->enqueued     = 0;
    carrier->pn_conn      = 0;
    carrier->collector    = 0;
    carrier->ssl          = 0;
    carrier->listener     = 0;
    carrier->connector    = 0;
    carrier->context      = 0;
    carrier->user_context = 0;
    carrier->link_context = 0;
    carrier->ufd          = ufd;
    DEQ_INIT(carrier->deferred_calls);
    carrier->deferred_call_lock = sys_mutex();

    ufd->context = context;
    ufd->server  = server;
    ufd->fd      = fd;
    ufd->pn_conn = qdpn_connector_fd(server->driver, fd, (void*) carrier);

    qdpn_driver_wakeup(server->driver);

    return ufd;
}
/**
 * Close the driver connector of a user fd and free the user-fd object.
 * A NULL argument is ignored.
 */
void qd_user_fd_free(qd_user_fd_t *ufd)
{
    if (ufd) {
        qdpn_connector_close(ufd->pn_conn);
        free_qd_user_fd_t(ufd);
    }
}
/**
 * Activate the user fd's connector for QDPN_CONNECTOR_READABLE and wake the
 * driver.
 */
void qd_user_fd_activate_read(qd_user_fd_t *ufd)
{
    qdpn_connector_activate(ufd->pn_conn, QDPN_CONNECTOR_READABLE);
    qdpn_driver_wakeup(ufd->server->driver);
}
/**
 * Activate the user fd's connector for QDPN_CONNECTOR_WRITABLE and wake the
 * driver.
 */
void qd_user_fd_activate_write(qd_user_fd_t *ufd)
{
    qdpn_connector_activate(ufd->pn_conn, QDPN_CONNECTOR_WRITABLE);
    qdpn_driver_wakeup(ufd->server->driver);
}
/**
 * Return the driver's answer to whether the user fd's connector is activated
 * for QDPN_CONNECTOR_READABLE.
 */
bool qd_user_fd_is_readable(qd_user_fd_t *ufd)
{
    return qdpn_connector_activated(ufd->pn_conn, QDPN_CONNECTOR_READABLE);
}
/**
 * Return the driver's answer to whether the user fd's connector is activated
 * for QDPN_CONNECTOR_WRITABLE.
 */
bool qd_user_fd_is_writeable(qd_user_fd_t *ufd)
{
    return qdpn_connector_activated(ufd->pn_conn, QDPN_CONNECTOR_WRITABLE);
}
/**
 * Append a timer to the tail of its server's pending-timer list, from which
 * worker threads take timers to run.
 * NOTE(review): the _LH suffix suggests the server lock must be held by the
 * caller — confirm at the call sites.
 */
void qd_server_timer_pending_LH(qd_timer_t *timer)
{
    DEQ_INSERT_TAIL(timer->server->pending_timers, timer);
}
/**
 * Remove a timer from its server's pending-timer list.
 * NOTE(review): the _LH suffix suggests the server lock must be held by the
 * caller — confirm at the call sites.
 */
void qd_server_timer_cancel_LH(qd_timer_t *timer)
{
    DEQ_REMOVE(timer->server->pending_timers, timer);
}