blob: 0099393f60b979e01e88bafbf71c8eceeb25e2f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/log.h>
#include "server_private.h"
#include "timer_private.h"
#include "alloc_private.h"
#include "auth.h"
#include "work_queue.h"
#include <stdio.h>
#include <time.h>
#include <signal.h>
// Module name passed as the first argument to every dx_log() call in this file.
static char *module="SERVER";
/**
 * Bookkeeping record for one server worker thread.
 */
typedef struct dx_thread_t {
// Index given to thread() at creation; passed to the start handler and stored
// in a connection's owner_thread while this thread is processing it.
int thread_id;
// Set to 1 on entry to thread_run; cleared by thread_cancel to end the main loop.
volatile int running;
// Set by thread_cancel; checked once by thread_run right after it sets 'running'.
volatile int canceled;
// Set to 1 by thread_start; thread_join only joins when this is non-zero.
int using_thread;
// Handle returned by sys_thread(); assigned only in thread_start.
sys_thread_t *thread;
} dx_thread_t;
/**
 * State of the singleton server: the proton driver, the worker threads, the
 * application callbacks and the bookkeeping shared between the threads.
 */
typedef struct dx_server_t {
// Number of entries in 'threads'.
int thread_count;
// Proton driver created by pn_driver() in dx_server_initialize.
pn_driver_t *driver;
// Optional callback invoked once at the start of every worker thread.
dx_thread_start_cb_t start_handler;
// Callback invoked by process_connector for open/close/process events.
dx_conn_handler_cb_t conn_handler;
// Optional callback invoked by handle_signals_LH when a signal is pending.
dx_signal_handler_cb_t signal_handler;
// Callback invoked by process_connector for CONN_STATE_USER connectors.
dx_user_fd_handler_cb_t ufd_handler;
// Context passed to start_handler.
void *start_context;
// NOTE(review): not read or written anywhere in the visible part of this file.
void *conn_context;
// Context passed to signal_handler.
void *signal_context;
// Condition variable (used with 'lock') on which idle, paused and pausing threads wait.
sys_cond_t *cond;
// Mutex held by the functions suffixed _LH and around the shared fields below.
sys_mutex_t *lock;
// Array of thread_count thread records; entry 0 is run on the caller of dx_server_run.
dx_thread_t **threads;
// Connectors that have been scheduled for processing by a worker thread.
work_queue_t *work_queue;
// Timers handed over by dx_server_timer_pending_LH, waiting for thread_run to fire them.
dx_timer_list_t pending_timers;
// True while one thread has elected itself to block in the proton driver wait.
bool a_thread_is_waiting;
// Number of threads currently holding a connector for processing.
int threads_active;
// Number of dx_server_pause calls that have not yet been matched by dx_server_resume.
int pause_requests;
// Number of threads currently blocked in block_if_paused_LH.
int threads_paused;
// Next ticket handed to a caller of dx_server_pause.
int pause_next_sequence;
// Ticket of the pauser allowed to proceed; advanced by dx_server_resume.
int pause_now_serving;
// Signal number recorded by signal_handler; 0 when nothing is pending.
int pending_signal;
} dx_server_t;
// Allocator definitions for the object types created in this file through the
// new_<type>() / free_<type>() calls.
// NOTE(review): ALLOC_DEFINE is declared outside this file (presumably in
// alloc_private.h) - confirm that it is what provides those functions.
ALLOC_DEFINE(dx_listener_t);
ALLOC_DEFINE(dx_connector_t);
ALLOC_DEFINE(dx_connection_t);
ALLOC_DEFINE(dx_user_fd_t);
/**
 * Singleton server instance wrapping the Proton driver.
 *
 * Allocated by dx_server_initialize and released (and reset to 0) by
 * dx_server_finalize.  Every function in this file operates on this object.
 */
static dx_server_t *dx_server = 0;
/**
 * Process signal handler installed by dx_server_signal().
 *
 * Records the signal number in the server so that the next worker thread to
 * reach handle_signals_LH forwards it to the application's signal handler,
 * then wakes the worker threads.
 *
 * Both wake-up paths are needed, exactly as in dx_server_stop and
 * dx_server_pause: idle threads wait on the condition variable, but one
 * thread may be blocked inside the proton driver wait.  That thread simply
 * retries its wait when it is interrupted (PN_INTR), so without the driver
 * wakeup a server whose only idle thread sits in the driver would not notice
 * the pending signal until some unrelated I/O or timer event occurred.
 *
 * NOTE(review): sys_cond_signal_all presumably wraps a condition-variable
 * broadcast, which is not on the POSIX list of async-signal-safe functions -
 * confirm whether it can be dropped in favour of the driver wakeup alone.
 *
 * @param signum The signal number being delivered.
 */
static void signal_handler(int signum)
{
    dx_server->pending_signal = signum;
    sys_cond_signal_all(dx_server->cond);
    pn_driver_wakeup(dx_server->driver);
}
/**
 * Allocate a thread record in its initial (not running, not canceled) state.
 *
 * No operating-system thread is created here; that happens in thread_start.
 *
 * @param id Identifier stored in the record's thread_id field.
 * @return The new record, or 0 if the allocation failed.
 */
static dx_thread_t *thread(int id)
{
    dx_thread_t *record = NEW(dx_thread_t);

    if (record) {
        record->thread_id    = id;
        record->running      = 0;
        record->canceled     = 0;
        record->using_thread = 0;
    }

    return record;
}
/**
 * Accept one connection from every listener the driver reports as pending.
 *
 * For each accepted connector a connection context is created in the
 * SASL-server state, with no owner and not enqueued, and is attached to the
 * connector.  The context inherits the application context of the listener
 * it arrived on.
 *
 * @param driver The proton driver whose pending listeners are drained.
 */
static void thread_process_listeners(pn_driver_t *driver)
{
    pn_listener_t *listener;

    while ((listener = pn_driver_listener(driver)) != 0) {
        dx_log(module, LOG_TRACE, "Accepting Connection");

        pn_connector_t *cxtr = pn_listener_accept(listener);
        if (!cxtr) {
            // Nothing was accepted, so there is nothing to attach a context to.
            dx_log(module, LOG_ERROR, "Failed to accept an incoming connection");
            continue;
        }

        dx_connection_t *ctx = new_dx_connection_t();
        if (!ctx) {
            // A connector without a context cannot be serviced by the worker
            // threads; drop it rather than leave it in the driver.
            dx_log(module, LOG_ERROR, "Out of memory accepting an incoming connection");
            pn_connector_close(cxtr);
            pn_connector_free(cxtr);
            continue;
        }

        ctx->state        = CONN_STATE_SASL_SERVER;
        ctx->owner_thread = CONTEXT_NO_OWNER;
        ctx->enqueued     = 0;
        ctx->pn_cxtr      = cxtr;
        ctx->pn_conn      = 0;
        ctx->listener     = (dx_listener_t*) pn_listener_context(listener);
        ctx->connector    = 0;
        ctx->context      = ctx->listener->context;
        // Cleared here as in cxtr_try_open and dx_user_fd, so that
        // dx_connection_get_context never returns an indeterminate pointer.
        ctx->user_context = 0;
        ctx->ufd          = 0;

        pn_connector_set_context(cxtr, ctx);
    }
}
/**
 * Deliver a pending signal, if any, to the application's signal handler.
 *
 * Must be called with the server lock held.  The pending signal is cleared
 * first; the lock is then released for the duration of the callback and
 * re-acquired before returning.  If no handler is registered the signal is
 * simply discarded.
 */
static void handle_signals_LH(void)
{
    const int pending = dx_server->pending_signal;

    if (pending == 0)
        return;

    dx_server->pending_signal = 0;

    if (!dx_server->signal_handler)
        return;

    sys_mutex_unlock(dx_server->lock);
    dx_server->signal_handler(dx_server->signal_context, pending);
    sys_mutex_lock(dx_server->lock);
}
/**
 * Park the calling thread while any pause request is outstanding.
 *
 * Must be called with the server lock held.  The thread counts itself as
 * paused and signals the condition variable so that a waiting pauser can
 * re-evaluate its condition, then waits until pause_requests drops to zero.
 */
static void block_if_paused_LH(void)
{
    if (dx_server->pause_requests <= 0)
        return;

    dx_server->threads_paused++;
    sys_cond_signal_all(dx_server->cond);

    for (;;) {
        if (dx_server->pause_requests <= 0)
            break;
        sys_cond_wait(dx_server->cond, dx_server->lock);
    }

    dx_server->threads_paused--;
}
/**
 * Drive one connector through its state machine.
 *
 * User-fd connectors (CONN_STATE_USER) are handed straight to the user-fd
 * handler.  For all other connectors the proton engine is stepped and the
 * handler matching the connection state is invoked, repeatedly, for as long
 * as the handler reports that more work may be pending.
 *
 * The SASL handlers are invoked at most once per call: the first pass runs
 * the handler and requests one more pass so the engine can be stepped again;
 * if the connection is still in a SASL state on that pass the loop ends and
 * the connector waits for its next driver event.  States without a handler
 * (for example CONN_STATE_FAILED) always end the loop.
 *
 * @param cxtr The connector to process; its context must be a dx_connection_t.
 */
static void process_connector(pn_connector_t *cxtr)
{
    dx_connection_t *ctx = pn_connector_context(cxtr);
    int events      = 0;
    int auth_passes = 0;

    if (ctx->state == CONN_STATE_USER) {
        dx_server->ufd_handler(ctx->ufd->context, ctx->ufd);
        return;
    }

    do {
        //
        // Step the engine for pre-handler processing
        //
        pn_connector_process(cxtr);

        //
        // Call the handler that is appropriate for the connector's state.
        //
        switch (ctx->state) {
        case CONN_STATE_CONNECTING:
            if (!pn_connector_closed(cxtr)) {
                ctx->state = CONN_STATE_SASL_CLIENT;
                assert(ctx->connector);
                ctx->connector->state = CXTR_STATE_OPEN;
                events = 1;
            } else {
                ctx->state = CONN_STATE_FAILED;
                events = 0;
            }
            break;

        case CONN_STATE_SASL_CLIENT:
            if (auth_passes == 0) {
                auth_client_handler(cxtr);
                // Count the pass so the handler is not re-run in a tight loop
                // while the exchange is waiting on the peer.
                auth_passes++;
                events = 1;
            } else {
                events = 0;
            }
            break;

        case CONN_STATE_SASL_SERVER:
            if (auth_passes == 0) {
                auth_server_handler(cxtr);
                auth_passes++;
                events = 1;
            } else {
                events = 0;
            }
            break;

        case CONN_STATE_OPENING: {
            ctx->state = CONN_STATE_OPERATIONAL;

            pn_connection_t *conn = pn_connection();
            pn_connection_set_container(conn, "dispatch");  // TODO - make unique
            pn_connector_set_connection(cxtr, conn);
            pn_connection_set_context(conn, ctx);
            ctx->pn_conn = conn;

            dx_conn_event_t ce = DX_CONN_EVENT_PROCESS;  // Initialize to keep the compiler happy

            if (ctx->listener) {
                ce = DX_CONN_EVENT_LISTENER_OPEN;
            } else if (ctx->connector) {
                ce = DX_CONN_EVENT_CONNECTOR_OPEN;
                // The connection succeeded, so a later reconnect starts without delay.
                ctx->connector->delay = 0;
            } else
                assert(0);

            dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
            events = 1;
            break;
        }

        case CONN_STATE_OPERATIONAL:
            if (pn_connector_closed(cxtr)) {
                dx_server->conn_handler(ctx->context,
                                        DX_CONN_EVENT_CLOSE,
                                        (dx_connection_t*) pn_connector_context(cxtr));
                events = 0;
            }
            else
                events = dx_server->conn_handler(ctx->context,
                                                 DX_CONN_EVENT_PROCESS,
                                                 (dx_connection_t*) pn_connector_context(cxtr));
            break;

        default:
            // No handler for this state: stop, otherwise a stale events value
            // from the previous pass would keep the loop spinning forever.
            events = 0;
            break;
        }
    } while (events > 0);
}
//
// TEMPORARY FUNCTION PROTOTYPES
//
// The three phases of the proton driver's wait sequence, declared locally.
// thread_run calls phases 1 and 3 with the server lock held and phase 2,
// which blocks, with the lock released.  Phase 2 takes the timeout obtained
// from the timer module; thread_run treats a result of -1 as an error and a
// result of 0 as a reason to visit the timer module.
//
void pn_driver_wait_1(pn_driver_t *d);
int pn_driver_wait_2(pn_driver_t *d, int timeout);
void pn_driver_wait_3(pn_driver_t *d);
//
// END TEMPORARY
//
/**
 * Main loop executed by every server worker thread.
 *
 * Each pass, under the server lock, the thread handles a pending signal,
 * blocks if the server is paused, fires one pending timer, or takes a
 * connector from the work queue.  If there is no work, the thread either
 * waits on the condition variable or, if no other thread is doing so, elects
 * itself to block in the proton driver and then distributes the resulting
 * listener and connector work.  A connector taken from the work queue is
 * processed outside the lock and freed afterwards if it has closed.
 *
 * The loop ends when thread->running is cleared (see thread_cancel).
 *
 * @param arg The dx_thread_t record for this thread.
 * @return Always 0.
 */
static void *thread_run(void *arg)
{
dx_thread_t *thread = (dx_thread_t*) arg;
pn_connector_t *work;
pn_connection_t *conn;
dx_connection_t *ctx;
int error;
int poll_result;
// Counts driver wake-ups since the timer module was last visited.
int timer_holdoff = 0;
if (!thread)
return 0;
thread->running = 1;
//
// A cancel that was issued before this point is honored here, since the
// assignment above would otherwise have overwritten it.
//
if (thread->canceled)
return 0;
//
// Invoke the start handler if the application supplied one.
// This handler can be used to set NUMA or processor affinity for the thread.
//
if (dx_server->start_handler)
dx_server->start_handler(dx_server->start_context, thread->thread_id);
//
// Main Loop
//
while (thread->running) {
sys_mutex_lock(dx_server->lock);
//
// Check for pending signals to process
//
handle_signals_LH();
if (!thread->running) {
sys_mutex_unlock(dx_server->lock);
break;
}
//
// Check to see if the server is pausing. If so, block here.
//
block_if_paused_LH();
if (!thread->running) {
sys_mutex_unlock(dx_server->lock);
break;
}
//
// Service pending timers.
//
dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
if (timer) {
DEQ_REMOVE_HEAD(dx_server->pending_timers);
//
// Mark the timer as idle in case it reschedules itself.
//
dx_timer_idle_LH(timer);
//
// Release the lock and invoke the timer's handler.
//
sys_mutex_unlock(dx_server->lock);
timer->handler(timer->context);
pn_driver_wakeup(dx_server->driver);
continue;
}
//
// Check the work queue for connectors scheduled for processing.
//
work = work_queue_get(dx_server->work_queue);
if (!work) {
//
// There is no pending work to do
//
if (dx_server->a_thread_is_waiting) {
//
// Another thread is waiting on the proton driver, this thread must
// wait on the condition variable until signaled.
//
sys_cond_wait(dx_server->cond, dx_server->lock);
} else {
//
// This thread elects itself to wait on the proton driver. Set the
// thread-is-waiting flag so other idle threads will not interfere.
//
dx_server->a_thread_is_waiting = true;
//
// Ask the timer module when its next timer is scheduled to fire. We'll
// use this value in driver_wait as the timeout. If there are no scheduled
// timers, the returned value will be -1.
//
long duration = dx_timer_next_duration_LH();
//
// Invoke the proton driver's wait sequence. This is a bit of a hack for now
// and will be improved in the future. The wait process is divided into three parts,
// the first and third of which need to be non-reentrant, and the second of which
// must be reentrant (and blocks).
//
pn_driver_wait_1(dx_server->driver);
sys_mutex_unlock(dx_server->lock);
//
// Retry the blocking wait for as long as it fails with PN_INTR.
//
do {
error = 0;
poll_result = pn_driver_wait_2(dx_server->driver, duration);
if (poll_result == -1)
error = pn_driver_errno(dx_server->driver);
} while (error == PN_INTR);
//
// Any other driver error is fatal: it is logged and the process exits.
//
if (error) {
dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver)));
exit(-1);
}
sys_mutex_lock(dx_server->lock);
pn_driver_wait_3(dx_server->driver);
//
// NOTE(review): a_thread_is_waiting is still true when the loop is left here.
//
if (!thread->running) {
sys_mutex_unlock(dx_server->lock);
break;
}
//
// Visit the timer module.
// This is done when the wait returned 0, or on every 100th wake-up otherwise.
//
if (poll_result == 0 || ++timer_holdoff == 100) {
struct timespec tv;
clock_gettime(CLOCK_REALTIME, &tv);
long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000;
dx_timer_visit_LH(milliseconds);
timer_holdoff = 0;
}
//
// Process listeners (incoming connections).
//
thread_process_listeners(dx_server->driver);
//
// Traverse the list of connectors-needing-service from the proton driver.
// If the connector is not already in the work queue and it is not currently
// being processed by another thread, put it in the work queue and signal the
// condition variable.
//
work = pn_driver_connector(dx_server->driver);
while (work) {
ctx = pn_connector_context(work);
if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
ctx->enqueued = 1;
work_queue_put(dx_server->work_queue, work);
sys_cond_signal(dx_server->cond);
}
work = pn_driver_connector(dx_server->driver);
}
//
// Release our exclusive claim on pn_driver_wait.
// 'work' is 0 at this point, so this pass processes no connector.
//
dx_server->a_thread_is_waiting = false;
}
}
//
// If we were given a connector to work on from the work queue, mark it as
// owned by this thread and as no longer enqueued.
//
if (work) {
ctx = pn_connector_context(work);
if (ctx->owner_thread == CONTEXT_NO_OWNER) {
ctx->owner_thread = thread->thread_id;
ctx->enqueued = 0;
dx_server->threads_active++;
} else {
//
// This connector is being processed by another thread, re-queue it.
//
work_queue_put(dx_server->work_queue, work);
work = 0;
}
}
sys_mutex_unlock(dx_server->lock);
//
// Process the connector that we now have exclusive access to.
//
if (work) {
process_connector(work);
//
// Check to see if the connector was closed during processing
//
if (pn_connector_closed(work)) {
//
// Connector is closed. Free the context and the connector.
// If the connection belongs to an outgoing connector, detach it and
// schedule the connector's timer so that the connection is retried.
//
conn = pn_connector_connection(work);
if (ctx->connector) {
ctx->connector->ctx = 0;
ctx->connector->state = CXTR_STATE_CONNECTING;
dx_timer_schedule(ctx->connector->timer, ctx->connector->delay);
}
sys_mutex_lock(dx_server->lock);
free_dx_connection_t(ctx);
pn_connector_free(work);
if (conn)
pn_connection_free(conn);
dx_server->threads_active--;
sys_mutex_unlock(dx_server->lock);
} else {
//
// The connector lives on. Mark it as no longer owned by this thread.
//
sys_mutex_lock(dx_server->lock);
ctx->owner_thread = CONTEXT_NO_OWNER;
dx_server->threads_active--;
sys_mutex_unlock(dx_server->lock);
}
//
// Wake up the proton driver to force it to reconsider its set of FDs
// in light of the processing that just occurred.
//
pn_driver_wakeup(dx_server->driver);
}
}
return 0;
}
/**
 * Spawn an operating-system thread that executes thread_run for this record.
 *
 * The record is flagged as using a thread so that thread_join knows there is
 * something to join.  A null record is ignored.
 *
 * @param thread The thread record to start.
 */
static void thread_start(dx_thread_t *thread)
{
    if (thread) {
        thread->using_thread = 1;
        thread->thread       = sys_thread(thread_run, (void*) thread);
    }
}
/**
 * Ask a worker thread to leave its main loop.
 *
 * Only the flags are changed; the caller is responsible for waking the
 * thread if it is blocked.  A null record is ignored.
 *
 * @param thread The thread record to cancel.
 */
static void thread_cancel(dx_thread_t *thread)
{
    if (thread) {
        thread->running  = 0;
        thread->canceled = 1;
    }
}
/**
 * Wait for a worker thread started by thread_start to finish.
 *
 * Records that are null or were never started with thread_start are ignored.
 *
 * @param thread The thread record to join.
 */
static void thread_join(dx_thread_t *thread)
{
    if (thread && thread->using_thread)
        sys_thread_join(thread->thread);
}
/**
 * Release a thread record created by thread().
 *
 * @param thread The record to free; a null pointer is accepted and ignored.
 */
static void thread_free(dx_thread_t *thread)
{
    // free() is a no-op on a null pointer, so no explicit guard is needed.
    free(thread);
}
static void cxtr_try_open(void *context)
{
dx_connector_t *ct = (dx_connector_t*) context;
if (ct->state != CXTR_STATE_CONNECTING)
return;
dx_connection_t *ctx = new_dx_connection_t();
ctx->state = CONN_STATE_CONNECTING;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_conn = 0;
ctx->listener = 0;
ctx->connector = ct;
ctx->context = ct->context;
ctx->user_context = 0;
ctx->ufd = 0;
//
// pn_connector is not thread safe
//
sys_mutex_lock(dx_server->lock);
ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx);
sys_mutex_unlock(dx_server->lock);
ct->ctx = ctx;
ct->delay = 5000;
dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
}
/**
 * Create and initialize the singleton server.
 *
 * Sets up the allocator, the proton driver, the lock and condition variable,
 * the timer module, the thread records and the work queue.  All handlers and
 * their contexts start out unset.  Calling this while a server already
 * exists has no effect.
 *
 * @param thread_count Number of worker threads, including the thread that
 *                     calls dx_server_run.  Values below 1 are treated as 1,
 *                     because dx_server_run always runs thread 0.
 */
void dx_server_initialize(int thread_count)
{
    int i;

    if (dx_server)
        return;  // TODO - Fail in a more dramatic way

    if (thread_count < 1)
        thread_count = 1;

    dx_alloc_initialize();
    dx_server = NEW(dx_server_t);

    if (!dx_server)
        return;  // TODO - Fail in a more dramatic way

    dx_server->thread_count   = thread_count;
    dx_server->driver         = pn_driver();
    dx_server->start_handler  = 0;
    dx_server->conn_handler   = 0;
    dx_server->signal_handler = 0;
    dx_server->ufd_handler    = 0;
    dx_server->start_context  = 0;
    dx_server->conn_context   = 0;
    dx_server->signal_context = 0;
    dx_server->lock           = sys_mutex();
    dx_server->cond           = sys_cond();

    dx_timer_initialize(dx_server->lock);

    dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count);
    for (i = 0; i < thread_count; i++)
        dx_server->threads[i] = thread(i);

    dx_server->work_queue = work_queue();
    DEQ_INIT(dx_server->pending_timers);
    dx_server->a_thread_is_waiting = false;
    dx_server->threads_active      = 0;
    dx_server->pause_requests      = 0;
    dx_server->threads_paused      = 0;
    dx_server->pause_next_sequence = 0;
    dx_server->pause_now_serving   = 0;
    dx_server->pending_signal      = 0;
}
/**
 * Tear down the singleton server and release its resources.
 *
 * Frees the thread records and the array holding them, the work queue, the
 * proton driver, the lock and the condition variable, then the server object
 * itself.  Does nothing if no server exists.  Worker threads must already
 * have stopped.
 */
void dx_server_finalize(void)
{
    int i;

    if (!dx_server)
        return;

    for (i = 0; i < dx_server->thread_count; i++)
        thread_free(dx_server->threads[i]);

    // The pointer array itself was allocated separately from the records.
    // NOTE(review): assumes NEW_PTR_ARRAY allocates from the same heap as NEW,
    // whose results this file already releases with free() - confirm in ctools.h.
    free(dx_server->threads);

    work_queue_free(dx_server->work_queue);

    pn_driver_free(dx_server->driver);
    sys_mutex_free(dx_server->lock);
    sys_cond_free(dx_server->cond);
    free(dx_server);
    dx_server = 0;
}
/**
 * Register the callback that receives connection events.
 *
 * dx_server_run asserts that a handler has been registered.
 *
 * @param handler The connection event callback.
 */
void dx_server_set_conn_handler(dx_conn_handler_cb_t handler)
{
    dx_server_t *server = dx_server;

    server->conn_handler = handler;
}
/**
 * Register the callback that receives signals captured by the server.
 *
 * @param handler The signal callback.
 * @param context Opaque value passed back to the callback.
 */
void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context)
{
    dx_server_t *server = dx_server;

    server->signal_handler = handler;
    server->signal_context = context;
}
/**
 * Register the callback invoked by each worker thread as it starts.
 *
 * @param handler The thread-start callback.
 * @param context Opaque value passed back to the callback.
 */
void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context)
{
    dx_server_t *server = dx_server;

    server->start_handler = handler;
    server->start_context = context;
}
/**
 * Register the callback invoked when a user file descriptor needs service.
 *
 * @param ufd_handler The user-fd callback.
 */
void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler)
{
    dx_server_t *server = dx_server;

    server->ufd_handler = ufd_handler;
}
/**
 * Run the server until it is stopped.
 *
 * Threads 1..N-1 are spawned, thread 0 runs on the calling thread, and the
 * function returns once thread 0 has left its loop and the spawned threads
 * have been joined.  Does nothing if the server has not been initialized.
 */
void dx_server_run(void)
{
    int idx;

    if (!dx_server)
        return;

    assert(dx_server->conn_handler);  // Server can't run without a connection handler.

    idx = 1;
    while (idx < dx_server->thread_count) {
        thread_start(dx_server->threads[idx]);
        idx++;
    }

    dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count);

    // The calling thread doubles as worker thread 0.
    thread_run((void*) dx_server->threads[0]);

    idx = 1;
    while (idx < dx_server->thread_count) {
        thread_join(dx_server->threads[idx]);
        idx++;
    }

    dx_log(module, LOG_INFO, "Shut Down");
}
/**
 * Request that all worker threads leave their main loops.
 *
 * Under the server lock, every thread is cancelled and then both the
 * condition variable and the proton driver are woken so that blocked
 * threads notice the request.
 */
void dx_server_stop(void)
{
    int remaining;
    int idx = 0;

    sys_mutex_lock(dx_server->lock);

    for (remaining = dx_server->thread_count; idx < remaining; remaining = dx_server->thread_count) {
        thread_cancel(dx_server->threads[idx]);
        idx++;
    }

    sys_cond_signal_all(dx_server->cond);
    pn_driver_wakeup(dx_server->driver);

    sys_mutex_unlock(dx_server->lock);
}
/**
 * Have the server capture a signal.
 *
 * Installs the file's internal signal handler for the given signal; the
 * signal is later delivered to the callback registered with
 * dx_server_set_signal_handler.
 *
 * @param signum The signal number to capture.
 */
void dx_server_signal(int signum)
{
    (void) signal(signum, signal_handler);
}
/**
 * Pause all worker threads and return once the caller has exclusive use of
 * the server.
 *
 * The caller registers a pause request, takes a ticket, wakes every blocked
 * thread, and then waits until (a) the paused threads plus the outstanding
 * pause requests account for every thread and (b) its ticket is the one being
 * served.  The ticket ensures concurrent pausers proceed one at a time.
 * Each call must be matched by a call to dx_server_resume.
 */
void dx_server_pause(void)
{
    sys_mutex_lock(dx_server->lock);

    //
    // Register the request (this is what stops the threads) and take a ticket.
    //
    dx_server->pause_requests++;
    int ticket = dx_server->pause_next_sequence++;

    //
    // Wake every thread that is blocked on the condition variable or in the driver.
    //
    sys_cond_signal_all(dx_server->cond);
    pn_driver_wakeup(dx_server->driver);

    for (;;) {
        int accounted = dx_server->threads_paused + dx_server->pause_requests;

        if (accounted >= dx_server->thread_count && ticket == dx_server->pause_now_serving)
            break;

        sys_cond_wait(dx_server->cond, dx_server->lock);
    }

    sys_mutex_unlock(dx_server->lock);
}
void dx_server_resume(void)
{
sys_mutex_lock(dx_server->lock);
dx_server->pause_requests--;
dx_server->pause_now_serving++;
sys_cond_signal_all(dx_server->cond);
sys_mutex_unlock(dx_server->lock);
}
/**
 * Mark a connection's connector as wanting to write.
 *
 * Nothing happens if the connection is null, has no connector, or its
 * connector is already closed.
 *
 * @param ctx The connection to activate.
 */
void dx_server_activate(dx_connection_t *ctx)
{
    pn_connector_t *ctor = ctx ? ctx->pn_cxtr : 0;

    if (ctor && !pn_connector_closed(ctor))
        pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE);
}
/**
 * Attach an application-defined context to a connection.
 *
 * @param conn    The connection; must not be null.
 * @param context Opaque value later returned by dx_connection_get_context.
 */
void dx_connection_set_context(dx_connection_t *conn, void *context)
{
    void **slot = &conn->user_context;

    *slot = context;
}
/**
 * Fetch the application-defined context attached to a connection.
 *
 * @param conn The connection; must not be null.
 * @return The value stored in the connection's user_context field.
 */
void *dx_connection_get_context(dx_connection_t *conn)
{
    void *user_context = conn->user_context;

    return user_context;
}
/**
 * Fetch the proton connection associated with a connection.
 *
 * @param conn The connection; must not be null.
 * @return The connection's pn_conn field, which the constructors in this file
 *         set to 0 and process_connector fills in when the connection opens.
 */
pn_connection_t *dx_connection_pn(dx_connection_t *conn)
{
    pn_connection_t *proton_conn = conn->pn_conn;

    return proton_conn;
}
/**
 * Start listening for incoming connections.
 *
 * @param config  Configuration supplying the host and port to listen on; the
 *                pointer is stored in the listener, not copied.
 * @param context Opaque value inherited by connections accepted on this listener.
 * @return The new listener, or 0 if it could not be allocated or the driver
 *         failed to open the listening socket (the driver error is logged).
 */
dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context)
{
    dx_listener_t *li = new_dx_listener_t();

    if (li) {
        li->config      = config;
        li->context     = context;
        li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li);

        if (li->pn_listener) {
            dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port);
        } else {
            dx_log(module, LOG_ERROR, "Driver Error %d (%s)",
                   pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver));
            free_dx_listener_t(li);
            li = 0;
        }
    }

    return li;
}
/**
 * Release a listener created by dx_server_listen, including its proton listener.
 *
 * @param li The listener to free; must not be null.
 */
void dx_server_listener_free(dx_listener_t* li)
{
    pn_listener_t *proton_listener = li->pn_listener;

    pn_listener_free(proton_listener);
    free_dx_listener_t(li);
}
/**
 * Close the proton listener of a listener without freeing the listener object.
 *
 * @param li The listener to close; must not be null.
 */
void dx_server_listener_close(dx_listener_t* li)
{
    pn_listener_t *proton_listener = li->pn_listener;

    pn_listener_close(proton_listener);
}
/**
 * Create an outgoing connector.
 *
 * The connector starts in the CONNECTING state with no connection attached.
 * The first connection attempt is made from a timer callback scheduled with
 * a delay of 0.
 *
 * @param config  Configuration supplying the host and port to connect to; the
 *                pointer is stored in the connector, not copied.
 * @param context Opaque value inherited by the connector's connections.
 * @return The new connector, or 0 if it could not be allocated.
 */
dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context)
{
    dx_connector_t *ct = new_dx_connector_t();

    if (ct) {
        ct->state   = CXTR_STATE_CONNECTING;
        ct->config  = config;
        ct->context = context;
        ct->ctx     = 0;
        ct->timer   = dx_timer(cxtr_try_open, (void*) ct);
        ct->delay   = 0;

        dx_timer_schedule(ct->timer, ct->delay);
    }

    return ct;
}
/**
 * Release an outgoing connector created by dx_server_connect.
 *
 * If a connection is currently attached, its proton connector is closed and
 * the connection is detached from this connector.  The proton connector is
 * deliberately not freed here: that is done by the connector
 * processing/cleanup once the close has been observed.
 *
 * @param ct The connector to free; must not be null.
 */
void dx_server_connector_free(dx_connector_t* ct)
{
    dx_connection_t *attached = ct->ctx;

    if (attached) {
        pn_connector_close(attached->pn_cxtr);
        attached->connector = 0;
    }

    dx_timer_free(ct->timer);
    free_dx_connector_t(ct);
}
dx_user_fd_t *dx_user_fd(int fd, void *context)
{
dx_user_fd_t *ufd = new_dx_user_fd_t();
if (!ufd)
return 0;
dx_connection_t *ctx = new_dx_connection_t();
ctx->state = CONN_STATE_USER;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_conn = 0;
ctx->listener = 0;
ctx->connector = 0;
ctx->context = 0;
ctx->user_context = 0;
ctx->ufd = ufd;
ufd->context = context;
ufd->fd = fd;
ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx);
pn_driver_wakeup(dx_server->driver);
return ufd;
}
/**
 * Stop monitoring a user file descriptor and release the user-fd object.
 *
 * The proton connector is closed but not freed here.
 *
 * @param ufd The user-fd object to free; must not be null.
 */
void dx_user_fd_free(dx_user_fd_t *ufd)
{
    pn_connector_t *connector = ufd->pn_conn;

    pn_connector_close(connector);
    free_dx_user_fd_t(ufd);
}
/**
 * Ask to be notified when the user file descriptor becomes readable.
 *
 * The driver is woken so that the change takes effect immediately.
 *
 * @param ufd The user-fd object; must not be null.
 */
void dx_user_fd_activate_read(dx_user_fd_t *ufd)
{
    pn_connector_t *connector = ufd->pn_conn;

    pn_connector_activate(connector, PN_CONNECTOR_READABLE);
    pn_driver_wakeup(dx_server->driver);
}
/**
 * Ask to be notified when the user file descriptor becomes writable.
 *
 * The driver is woken so that the change takes effect immediately.
 *
 * @param ufd The user-fd object; must not be null.
 */
void dx_user_fd_activate_write(dx_user_fd_t *ufd)
{
    pn_connector_t *connector = ufd->pn_conn;

    pn_connector_activate(connector, PN_CONNECTOR_WRITABLE);
    pn_driver_wakeup(dx_server->driver);
}
/**
 * Report the driver's READABLE activation status for a user file descriptor.
 *
 * @param ufd The user-fd object; must not be null.
 * @return The result of pn_connector_activated for PN_CONNECTOR_READABLE.
 */
bool dx_user_fd_is_readable(dx_user_fd_t *ufd)
{
    pn_connector_t *connector = ufd->pn_conn;

    return pn_connector_activated(connector, PN_CONNECTOR_READABLE);
}
/**
 * Report the driver's WRITABLE activation status for a user file descriptor.
 *
 * @param ufd The user-fd object; must not be null.
 * @return The result of pn_connector_activated for PN_CONNECTOR_WRITABLE.
 */
bool dx_user_fd_is_writeable(dx_user_fd_t *ufd)
{
    pn_connector_t *connector = ufd->pn_conn;

    return pn_connector_activated(connector, PN_CONNECTOR_WRITABLE);
}
/**
 * Append a timer to the server's list of pending timers, from which
 * thread_run removes timers one at a time and invokes their handlers.
 *
 * The _LH suffix follows this file's convention for functions that are
 * called with the server lock held.
 *
 * @param timer The timer to append.
 */
void dx_server_timer_pending_LH(dx_timer_t *timer)
{
DEQ_INSERT_TAIL(dx_server->pending_timers, timer);
}
/**
 * Remove a timer from the server's list of pending timers.
 *
 * The _LH suffix follows this file's convention for functions that are
 * called with the server lock held.
 * NOTE(review): presumably the timer must currently be on the pending list -
 * confirm against the DEQ_REMOVE definition and the timer module.
 *
 * @param timer The timer to remove.
 */
void dx_server_timer_cancel_LH(dx_timer_t *timer)
{
DEQ_REMOVE(dx_server->pending_timers, timer);
}