DISPATCH-1509: free core agent timer on shutdown
Re-factor the management agent code by making the qdr_agent_t
structure private and adding setup and teardown APIs.
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index f355573..5c77cff 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -39,6 +39,18 @@
ALLOC_DECLARE(qdr_query_t);
ALLOC_DEFINE(qdr_query_t);
+
+struct qdr_agent_t {
+ qdr_query_list_t outgoing_query_list;
+ sys_mutex_t *query_lock;
+ qd_timer_t *timer;
+ qdr_manage_response_t response_handler;
+ qdr_subscription_t *subscription_mobile;
+ qdr_subscription_t *subscription_local;
+ qd_log_source_t *log_source;
+};
+
+
//==================================================================================
// Internal Functions
//==================================================================================
@@ -46,20 +58,21 @@
static void qdr_agent_response_handler(void *context)
{
qdr_core_t *core = (qdr_core_t*) context;
+ qdr_agent_t *agent = core->mgmt_agent;
qdr_query_t *query;
bool done = false;
while (!done) {
- sys_mutex_lock(core->query_lock);
- query = DEQ_HEAD(core->outgoing_query_list);
+ sys_mutex_lock(agent->query_lock);
+ query = DEQ_HEAD(agent->outgoing_query_list);
if (query)
- DEQ_REMOVE_HEAD(core->outgoing_query_list);
- done = DEQ_SIZE(core->outgoing_query_list) == 0;
- sys_mutex_unlock(core->query_lock);
+ DEQ_REMOVE_HEAD(agent->outgoing_query_list);
+ done = DEQ_SIZE(agent->outgoing_query_list) == 0;
+ sys_mutex_unlock(agent->query_lock);
if (query) {
bool more = query->more;
- core->agent_response_handler(query->context, &query->status, more);
+ agent->response_handler(query->context, &query->status, more);
if (!more)
qdr_query_free(query);
}
@@ -69,13 +82,15 @@
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query)
{
- sys_mutex_lock(core->query_lock);
- DEQ_INSERT_TAIL(core->outgoing_query_list, query);
- bool notify = DEQ_SIZE(core->outgoing_query_list) == 1;
- sys_mutex_unlock(core->query_lock);
+ qdr_agent_t *agent = core->mgmt_agent;
+
+ sys_mutex_lock(agent->query_lock);
+ DEQ_INSERT_TAIL(agent->outgoing_query_list, query);
+ bool notify = DEQ_SIZE(agent->outgoing_query_list) == 1;
+ sys_mutex_unlock(agent->query_lock);
if (notify)
- qd_timer_schedule(core->agent_timer, 0);
+ qd_timer_schedule(agent->timer, 0);
}
@@ -107,6 +122,54 @@
// Interface Functions
//==================================================================================
+
+// called prior to core thread start
+qdr_agent_t *qdr_agent(qdr_core_t *core)
+{
+ qdr_agent_t *agent = NEW(qdr_agent_t);
+ ZERO(agent);
+
+ DEQ_INIT(agent->outgoing_query_list);
+ agent->query_lock = sys_mutex();
+ agent->timer = qd_timer(core->qd, qdr_agent_response_handler, core);
+ agent->log_source = qd_log_source("AGENT");
+ return agent;
+}
+
+
+// called after core thread has shutdown
+void qdr_agent_free(qdr_agent_t *agent)
+{
+ if (agent) {
+ qd_timer_free(agent->timer);
+ if (agent->query_lock)
+ sys_mutex_free(agent->query_lock);
+
+ //we can't call qdr_core_unsubscribe on the subscriptions because the action processing thread has
+ //already been shut down. But, all the action would have done at this point is free the subscriptions
+ //so we just do that directly.
+ free(agent->subscription_mobile);
+ free(agent->subscription_local);
+
+ free(agent);
+ }
+}
+
+
+// create management subscriptions
+// (called after core thread starts)
+void qdr_agent_setup_subscriptions(qdr_agent_t *agent, qdr_core_t *core)
+{
+
+ agent->subscription_mobile = qdr_core_subscribe(core, "$management", 'M', '0',
+ QD_TREATMENT_ANYCAST_CLOSEST, false,
+ qdr_management_agent_on_message, core);
+ agent->subscription_local = qdr_core_subscribe(core, "$management", 'L', '0',
+ QD_TREATMENT_ANYCAST_CLOSEST, false,
+ qdr_management_agent_on_message, core);
+}
+
+
void qdr_manage_create(qdr_core_t *core,
void *context,
qd_router_entity_type_t type,
@@ -323,10 +386,10 @@
}
-
void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler)
{
- core->agent_response_handler = response_handler;
+ assert(core->mgmt_agent); // expect: management agent must already be present
+ core->mgmt_agent->response_handler = response_handler;
}
@@ -334,14 +397,6 @@
// In-Thread Functions
//==================================================================================
-void qdr_agent_setup_CT(qdr_core_t *core)
-{
- DEQ_INIT(core->outgoing_query_list);
- core->query_lock = sys_mutex();
- core->agent_timer = qd_timer(core->qd, qdr_agent_response_handler, core);
-}
-
-
static void qdr_agent_forbidden(qdr_core_t *core, qdr_query_t *query, bool op_query)
{
query->status = QD_AMQP_FORBIDDEN;
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index bdbfb6e..49c70ff 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -56,8 +56,7 @@
//
// Set up the logging sources for the router core
//
- core->log = qd->router->log_source;
- core->agent_log = qd_log_source("AGENT");
+ core->log = qd->router->log_source;
//
// Set up the threading support
@@ -79,19 +78,19 @@
core->id_lock = sys_mutex();
//
+ // Initialize the management agent
+ //
+ core->mgmt_agent = qdr_agent(core);
+
+ //
// Launch the core thread
//
core->thread = sys_thread(router_core_thread, core);
//
- // Perform outside-of-thread setup for the management agent
+ // Setup the agents subscriptions to $management
//
- core->agent_subscription_mobile = qdr_core_subscribe(core, "$management", 'M', '0',
- QD_TREATMENT_ANYCAST_CLOSEST, false,
- qdr_management_agent_on_message, core);
- core->agent_subscription_local = qdr_core_subscribe(core, "$management", 'L', '0',
- QD_TREATMENT_ANYCAST_CLOSEST, false,
- qdr_management_agent_on_message, core);
+ qdr_agent_setup_subscriptions(core->mgmt_agent, core);
return core;
}
@@ -119,12 +118,6 @@
sys_mutex_free(core->id_lock);
qd_timer_free(core->work_timer);
- //we can't call qdr_core_unsubscribe on the subscriptions because the action processing thread has
- //already been shut down. But, all the action would have done at this point is free the subscriptions
- //so we just do that directly.
- free(core->agent_subscription_mobile);
- free(core->agent_subscription_local);
-
for (int i = 0; i <= QD_TREATMENT_LINK_BALANCED; ++i) {
if (core->forwarders[i]) {
free(core->forwarders[i]);
@@ -206,7 +199,8 @@
qdr_modules_finalize(core);
- if (core->query_lock) sys_mutex_free(core->query_lock);
+ qdr_agent_free(core->mgmt_agent);
+
if (core->routers_by_mask_bit) free(core->routers_by_mask_bit);
if (core->control_links_by_mask_bit) free(core->control_links_by_mask_bit);
if (core->data_links_by_mask_bit) free(core->data_links_by_mask_bit);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 3c68b59..299722f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -39,6 +39,7 @@
typedef struct qdr_connection_ref_t qdr_connection_ref_t;
typedef struct qdr_exchange qdr_exchange_t;
typedef struct qdr_edge_t qdr_edge_t;
+typedef struct qdr_agent_t qdr_agent_t;
ALLOC_DECLARE(qdr_address_t);
ALLOC_DECLARE(qdr_address_config_t);
@@ -801,15 +802,7 @@
qdrc_attach_addr_lookup_t addr_lookup_handler;
void *addr_lookup_context;
- //
- // Agent section
- //
- qdr_query_list_t outgoing_query_list;
- sys_mutex_t *query_lock;
- qd_timer_t *agent_timer;
- qdr_manage_response_t agent_response_handler;
- qdr_subscription_t *agent_subscription_mobile;
- qdr_subscription_t *agent_subscription_local;
+ qdr_agent_t *mgmt_agent;
//
// Route table section
@@ -921,7 +914,9 @@
uint64_t qdr_identifier(qdr_core_t* core);
void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost, uint64_t in_conn_id);
void qdr_route_table_setup_CT(qdr_core_t *core);
-void qdr_agent_setup_CT(qdr_core_t *core);
+qdr_agent_t *qdr_agent(qdr_core_t *core);
+void qdr_agent_setup_subscriptions(qdr_agent_t *agent, qdr_core_t *core);
+void qdr_agent_free(qdr_agent_t *agent);
void qdr_forwarder_setup_CT(qdr_core_t *core);
qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label);
void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index 2ce0f09..1e137ae 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -157,7 +157,6 @@
qdr_forwarder_setup_CT(core);
qdr_route_table_setup_CT(core);
- qdr_agent_setup_CT(core);
qdr_modules_init(core);