blob: 9f8b17f4b74c437c663fdf62a5d75e41320c94f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "router_core_private.h"
#include "module.h"
#include <qpid/dispatch/protocol_adaptor.h>
#include "dispatch_private.h"
/**
* Creates a thread that is dedicated to managing and using the routing table.
* The purpose of moving this function into one thread is to remove the widespread
* lock contention that happens with synchrounous multi-threaded routing.
*
* This module owns, manages, and uses the router-link list and the address hash table
*/
ALLOC_DEFINE(qdr_action_t);
typedef struct qdrc_module_t {
DEQ_LINKS(struct qdrc_module_t);
const char *name;
qdrc_module_enable_t enable;
qdrc_module_init_t on_init;
qdrc_module_final_t on_final;
void *context;
bool enabled;
} qdrc_module_t;
DEQ_DECLARE(qdrc_module_t, qdrc_module_list_t);
static qdrc_module_list_t registered_modules = {0,0};
void qdr_register_core_module(const char *name, qdrc_module_enable_t enable, qdrc_module_init_t on_init, qdrc_module_final_t on_final)
{
qdrc_module_t *module = NEW(qdrc_module_t);
ZERO(module);
module->name = name;
module->enable = enable;
module->on_init = on_init;
module->on_final = on_final;
DEQ_INSERT_TAIL(registered_modules, module);
}
typedef struct qdrc_adaptor_t {
DEQ_LINKS(struct qdrc_adaptor_t);
const char *name;
qdr_adaptor_init_t on_init;
qdr_adaptor_final_t on_final;
void *context;
} qdrc_adaptor_t;
DEQ_DECLARE(qdrc_adaptor_t, qdrc_adaptor_list_t);
static qdrc_adaptor_list_t registered_adaptors = {0,0};
void qdr_register_adaptor(const char *name, qdr_adaptor_init_t on_init, qdr_adaptor_final_t on_final)
{
qdrc_adaptor_t *adaptor = NEW(qdrc_adaptor_t);
ZERO(adaptor);
adaptor->name = name;
adaptor->on_init = on_init;
adaptor->on_final = on_final;
DEQ_INSERT_TAIL(registered_adaptors, adaptor);
}
static void qdr_activate_connections_CT(qdr_core_t *core)
{
qdr_connection_t *conn = DEQ_HEAD(core->connections_to_activate);
while (conn) {
DEQ_REMOVE_HEAD_N(ACTIVATE, core->connections_to_activate);
conn->in_activate_list = false;
conn->protocol_adaptor->activate_handler(conn->protocol_adaptor->user_context, conn);
conn = DEQ_HEAD(core->connections_to_activate);
}
}
static void qdr_do_message_to_addr_free(qdr_core_t *core, qdr_general_work_t *work)
{
qdr_delivery_cleanup_t *cleanup = DEQ_HEAD(work->delivery_cleanup_list);
while (cleanup) {
DEQ_REMOVE_HEAD(work->delivery_cleanup_list);
if (cleanup->msg)
qd_message_free(cleanup->msg);
if (cleanup->iter)
qd_iterator_free(cleanup->iter);
free_qdr_delivery_cleanup_t(cleanup);
cleanup = DEQ_HEAD(work->delivery_cleanup_list);
}
}
void qdr_modules_init(qdr_core_t *core)
{
//
// Initialize registered modules
//
qdrc_module_t *module = DEQ_HEAD(registered_modules);
while (module) {
module->enabled = module->enable(core);
if (module->enabled) {
module->on_init(core, &module->context);
qd_log(core->log, QD_LOG_INFO, "Core module enabled: %s", module->name);
} else
qd_log(core->log, QD_LOG_INFO, "Core module present but disabled: %s", module->name);
module = DEQ_NEXT(module);
}
}
void qdr_modules_finalize(qdr_core_t *core)
{
//
// Finalize registered modules
//
qdrc_module_t *module = DEQ_TAIL(registered_modules);
while (module) {
if (module->enabled) {
qd_log(core->log, QD_LOG_INFO, "Finalizing core module: %s", module->name);
module->on_final(module->context);
}
module = DEQ_PREV(module);
}
}
void qdr_adaptors_init(qdr_core_t *core)
{
//
// Initialize registered adaptors
//
qdrc_adaptor_t *adaptor = DEQ_HEAD(registered_adaptors);
while (adaptor) {
adaptor->on_init(core, &adaptor->context);
adaptor = DEQ_NEXT(adaptor);
}
}
void qdr_adaptors_finalize(qdr_core_t *core)
{
//
// Finalize registered adaptors
//
qdrc_adaptor_t *adaptor = DEQ_TAIL(registered_adaptors);
while (adaptor) {
adaptor->on_final(adaptor->context);
adaptor = DEQ_PREV(adaptor);
}
// release the default AMQP adaptor (it is not a module)
assert(DEQ_SIZE(core->protocol_adaptors) == 1);
qdr_protocol_adaptor_free(core, DEQ_HEAD(core->protocol_adaptors));
}
/*
* router_core_process_background_action_LH
*
* Process up to one available background action.
* Return true iff an action was processed.
*/
static bool router_core_process_background_action_LH(qdr_core_t *core)
{
qdr_action_t *action = DEQ_HEAD(core->action_list_background);
if (!!action) {
DEQ_REMOVE_HEAD(core->action_list_background);
sys_mutex_unlock(core->action_lock);
if (action->label)
qd_log(core->log, QD_LOG_TRACE, "Core background action '%s'%s", action->label, core->running ? "" : " (discard)");
action->action_handler(core, action, !core->running);
sys_mutex_lock(core->action_lock);
free_qdr_action_t(action);
return true;
}
return false;
}
void *router_core_thread(void *arg)
{
qdr_core_t *core = (qdr_core_t*) arg;
qdr_action_list_t action_list;
qdr_action_t *action;
qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", core->router_area, core->router_id);
while (core->running) {
//
// Use the lock only to protect the condition variable and the action lists
//
sys_mutex_lock(core->action_lock);
//
// Block on the condition variable when there is no action to do
//
while (core->running && DEQ_IS_EMPTY(core->action_list)) {
if (!router_core_process_background_action_LH(core))
sys_cond_wait(core->action_cond, core->action_lock);
}
//
// Move the entire action list to a private list so we can process it without
// holding the lock
//
DEQ_MOVE(core->action_list, action_list);
sys_mutex_unlock(core->action_lock);
//
// Process and free all of the action items in the list
//
action = DEQ_HEAD(action_list);
while (action) {
DEQ_REMOVE_HEAD(action_list);
if (action->label)
qd_log(core->log, QD_LOG_TRACE, "Core action '%s'%s", action->label, core->running ? "" : " (discard)");
action->action_handler(core, action, !core->running);
free_qdr_action_t(action);
action = DEQ_HEAD(action_list);
}
//
// Activate all connections that were flagged for activation during the above processing
//
qdr_activate_connections_CT(core);
//
// Schedule the cleanup of deliveries freed during this core-thread pass
//
if (DEQ_SIZE(core->delivery_cleanup_list) > 0) {
qdr_general_work_t *work = qdr_general_work(qdr_do_message_to_addr_free);
DEQ_MOVE(core->delivery_cleanup_list, work->delivery_cleanup_list);
qdr_post_general_work_CT(core, work);
}
}
qd_log(core->log, QD_LOG_INFO, "Router Core thread exited");
return 0;
}