| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "router_core_private.h" |
| #include "route_control.h" |
| #include "exchange_bindings.h" |
| #include "core_events.h" |
| #include "delivery.h" |
| #include <stdio.h> |
| #include <strings.h> |
| |
| ALLOC_DEFINE(qdr_address_t); |
| ALLOC_DEFINE(qdr_address_config_t); |
| ALLOC_DEFINE(qdr_node_t); |
| ALLOC_DEFINE(qdr_delivery_ref_t); |
| ALLOC_DEFINE(qdr_link_t); |
| ALLOC_DEFINE(qdr_router_ref_t); |
| ALLOC_DEFINE(qdr_link_ref_t); |
| ALLOC_DEFINE(qdr_delivery_cleanup_t); |
| ALLOC_DEFINE(qdr_general_work_t); |
| ALLOC_DEFINE(qdr_link_work_t); |
| ALLOC_DEFINE(qdr_connection_ref_t); |
| ALLOC_DEFINE(qdr_connection_info_t); |
| ALLOC_DEFINE(qdr_subscription_ref_t); |
| |
| static void qdr_general_handler(void *context); |
| |
| qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id) |
| { |
| qdr_core_t *core = NEW(qdr_core_t); |
| ZERO(core); |
| |
| core->qd = qd; |
| core->router_mode = mode; |
| core->router_area = area; |
| core->router_id = id; |
| |
| DEQ_INIT(core->exchanges); |
| |
| // |
| // Set up the logging sources for the router core |
| // |
| core->log = qd->router->log_source; |
| core->agent_log = qd_log_source("AGENT"); |
| |
| // |
| // Set up the threading support |
| // |
| core->action_cond = sys_cond(); |
| core->action_lock = sys_mutex(); |
| core->running = true; |
| DEQ_INIT(core->action_list); |
| DEQ_INIT(core->action_list_background); |
| |
| core->work_lock = sys_mutex(); |
| DEQ_INIT(core->work_list); |
| core->work_timer = qd_timer(core->qd, qdr_general_handler, core); |
| |
| // |
| // Set up the unique identifier generator |
| // |
| core->next_identifier = 1; |
| core->id_lock = sys_mutex(); |
| |
| // |
| // Launch the core thread |
| // |
| core->thread = sys_thread(router_core_thread, core); |
| |
| // |
| // Perform outside-of-thread setup for the management agent |
| // |
| core->agent_subscription_mobile = qdr_core_subscribe(core, "$management", 'M', '0', |
| QD_TREATMENT_ANYCAST_CLOSEST, false, |
| qdr_management_agent_on_message, core); |
| core->agent_subscription_local = qdr_core_subscribe(core, "$management", 'L', '0', |
| QD_TREATMENT_ANYCAST_CLOSEST, false, |
| qdr_management_agent_on_message, core); |
| |
| return core; |
| } |
| |
| |
| void qdr_core_free(qdr_core_t *core) |
| { |
| // |
| // Stop and join the thread |
| // |
| core->running = false; |
| sys_cond_signal(core->action_cond); |
| sys_thread_join(core->thread); |
| |
| // Drain the general work lists |
| qdr_general_handler(core); |
| |
| // |
| // Free the core resources |
| // |
| sys_thread_free(core->thread); |
| sys_cond_free(core->action_cond); |
| sys_mutex_free(core->action_lock); |
| sys_mutex_free(core->work_lock); |
| sys_mutex_free(core->id_lock); |
| qd_timer_free(core->work_timer); |
| |
| //we can't call qdr_core_unsubscribe on the subscriptions because the action processing thread has |
| //already been shut down. But, all the action would have done at this point is free the subscriptions |
| //so we just do that directly. |
| free(core->agent_subscription_mobile); |
| free(core->agent_subscription_local); |
| |
| for (int i = 0; i <= QD_TREATMENT_LINK_BALANCED; ++i) { |
| if (core->forwarders[i]) { |
| free(core->forwarders[i]); |
| } |
| } |
| |
| qdr_link_route_t *link_route = 0; |
| while ( (link_route = DEQ_HEAD(core->link_routes))) { |
| DEQ_REMOVE_HEAD(core->link_routes); |
| qdr_core_delete_link_route(core, link_route); |
| } |
| |
| qdr_auto_link_t *auto_link = 0; |
| while ( (auto_link = DEQ_HEAD(core->auto_links))) { |
| DEQ_REMOVE_HEAD(core->auto_links); |
| qdr_core_delete_auto_link(core, auto_link); |
| } |
| |
| qdr_exchange_free_all(core); |
| |
| qdr_address_t *addr = 0; |
| while ( (addr = DEQ_HEAD(core->addrs)) ) { |
| qdr_core_remove_address(core, addr); |
| } |
| qdr_address_config_t *addr_config = 0; |
| while ( (addr_config = DEQ_HEAD(core->addr_config))) { |
| qdr_core_remove_address_config(core, addr_config); |
| } |
| qd_hash_free(core->addr_hash); |
| qd_parse_tree_free(core->addr_parse_tree); |
| qd_parse_tree_free(core->link_route_tree[QD_INCOMING]); |
| qd_parse_tree_free(core->link_route_tree[QD_OUTGOING]); |
| |
| qdr_node_t *rnode = 0; |
| while ( (rnode = DEQ_HEAD(core->routers)) ) { |
| qdr_router_node_free(core, rnode); |
| } |
| |
| qdr_link_t *link = DEQ_HEAD(core->open_links); |
| while (link) { |
| DEQ_REMOVE_HEAD(core->open_links); |
| if (link->core_endpoint) |
| qdrc_endpoint_do_cleanup_CT(core, link->core_endpoint); |
| qdr_del_link_ref(&link->conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION); |
| qdr_del_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK); |
| free(link->name); |
| free(link->disambiguated_name); |
| free(link->terminus_addr); |
| free(link->ingress_histogram); |
| free(link->insert_prefix); |
| free(link->strip_prefix); |
| link->name = 0; |
| free_qdr_link_t(link); |
| link = DEQ_HEAD(core->open_links); |
| } |
| |
| qdr_connection_t *conn = DEQ_HEAD(core->open_connections); |
| while (conn) { |
| DEQ_REMOVE_HEAD(core->open_connections); |
| |
| if (conn->conn_id) { |
| qdr_del_connection_ref(&conn->conn_id->connection_refs, conn); |
| qdr_route_check_id_for_deletion_CT(core, conn->conn_id); |
| } |
| |
| qdr_connection_work_t *work = DEQ_HEAD(conn->work_list); |
| while (work) { |
| DEQ_REMOVE_HEAD(conn->work_list); |
| qdr_connection_work_free_CT(work); |
| work = DEQ_HEAD(conn->work_list); |
| } |
| |
| qdr_connection_free(conn); |
| conn = DEQ_HEAD(core->open_connections); |
| } |
| |
| // at this point all the conn identifiers have been freed |
| qd_hash_free(core->conn_id_hash); |
| |
| qdr_modules_finalize(core); |
| |
| if (core->query_lock) sys_mutex_free(core->query_lock); |
| if (core->routers_by_mask_bit) free(core->routers_by_mask_bit); |
| if (core->control_links_by_mask_bit) free(core->control_links_by_mask_bit); |
| if (core->data_links_by_mask_bit) free(core->data_links_by_mask_bit); |
| if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask); |
| |
| free(core); |
| } |
| |
| void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode) |
| { |
| qd_bitmask_free(rnode->valid_origins); |
| DEQ_REMOVE(core->routers, rnode); |
| core->routers_by_mask_bit[rnode->mask_bit] = 0; |
| core->cost_epoch++; |
| free(rnode->wire_address_ma); |
| free_qdr_node_t(rnode); |
| } |
| |
| ALLOC_DECLARE(qdr_field_t); |
| ALLOC_DEFINE(qdr_field_t); |
| |
| qdr_field_t *qdr_field(const char *text) |
| { |
| size_t length = text ? strlen(text) : 0; |
| size_t ilength = length; |
| |
| if (length == 0) |
| return 0; |
| |
| qdr_field_t *field = new_qdr_field_t(); |
| qd_buffer_t *buf; |
| |
| ZERO(field); |
| while (length > 0) { |
| buf = qd_buffer(); |
| size_t cap = qd_buffer_capacity(buf); |
| size_t copy = length > cap ? cap : length; |
| memcpy(qd_buffer_cursor(buf), text, copy); |
| qd_buffer_insert(buf, copy); |
| length -= copy; |
| text += copy; |
| DEQ_INSERT_TAIL(field->buffers, buf); |
| } |
| |
| field->iterator = qd_iterator_buffer(DEQ_HEAD(field->buffers), 0, ilength, ITER_VIEW_ALL); |
| |
| return field; |
| } |
| |
| |
| qdr_field_t *qdr_field_from_iter(qd_iterator_t *iter) |
| { |
| if (!iter) |
| return 0; |
| |
| qdr_field_t *field = new_qdr_field_t(); |
| qd_buffer_t *buf; |
| int remaining; |
| int length; |
| |
| ZERO(field); |
| qd_iterator_reset(iter); |
| remaining = qd_iterator_remaining(iter); |
| length = remaining; |
| while (remaining) { |
| buf = qd_buffer(); |
| size_t cap = qd_buffer_capacity(buf); |
| int copied = qd_iterator_ncopy(iter, qd_buffer_cursor(buf), cap); |
| qd_buffer_insert(buf, copied); |
| DEQ_INSERT_TAIL(field->buffers, buf); |
| remaining = qd_iterator_remaining(iter); |
| } |
| |
| field->iterator = qd_iterator_buffer(DEQ_HEAD(field->buffers), 0, length, ITER_VIEW_ALL); |
| |
| return field; |
| } |
| |
| qd_iterator_t *qdr_field_iterator(qdr_field_t *field) |
| { |
| if (!field) |
| return 0; |
| |
| return field->iterator; |
| } |
| |
| |
| void qdr_field_free(qdr_field_t *field) |
| { |
| if (field) { |
| qd_iterator_free(field->iterator); |
| qd_buffer_list_free_buffers(&field->buffers); |
| free_qdr_field_t(field); |
| } |
| } |
| |
| |
| char *qdr_field_copy(qdr_field_t *field) |
| { |
| if (!field || !field->iterator) |
| return 0; |
| |
| return (char*) qd_iterator_copy(field->iterator); |
| } |
| |
| |
| qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label) |
| { |
| qdr_action_t *action = new_qdr_action_t(); |
| ZERO(action); |
| action->action_handler = action_handler; |
| action->label = label; |
| return action; |
| } |
| |
| |
| void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action) |
| { |
| sys_mutex_lock(core->action_lock); |
| DEQ_INSERT_TAIL(core->action_list, action); |
| sys_cond_signal(core->action_cond); |
| sys_mutex_unlock(core->action_lock); |
| } |
| |
| |
| void qdr_action_background_enqueue(qdr_core_t *core, qdr_action_t *action) |
| { |
| sys_mutex_lock(core->action_lock); |
| DEQ_INSERT_TAIL(core->action_list_background, action); |
| sys_cond_signal(core->action_cond); |
| sys_mutex_unlock(core->action_lock); |
| } |
| |
| |
| qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment, qdr_address_config_t *config) |
| { |
| if (treatment == QD_TREATMENT_UNAVAILABLE) |
| return 0; |
| |
| qdr_address_t *addr = new_qdr_address_t(); |
| ZERO(addr); |
| addr->config = config; |
| addr->treatment = treatment; |
| addr->forwarder = qdr_forwarder_CT(core, treatment); |
| addr->rnodes = qd_bitmask(0); |
| addr->add_prefix = 0; |
| addr->del_prefix = 0; |
| addr->priority = -1; |
| |
| if (config) |
| config->ref_count++; |
| |
| return addr; |
| } |
| |
| |
| qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *address, qd_address_treatment_t treatment) |
| { |
| char addr_string[1000]; |
| qdr_address_t *addr = 0; |
| qd_iterator_t *iter = 0; |
| |
| snprintf(addr_string, sizeof(addr_string), "%c%s", aclass, address); |
| iter = qd_iterator_string(addr_string, ITER_VIEW_ALL); |
| |
| qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); |
| if (!addr) { |
| addr = qdr_address_CT(core, treatment, 0); |
| if (addr) { |
| qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); |
| DEQ_INSERT_TAIL(core->addrs, addr); |
| addr->ref_count++; |
| addr->local = (aclass == 'L'); |
| } |
| } |
| qd_iterator_free(iter); |
| return addr; |
| } |
| |
| |
| qdr_address_t *qdr_add_mobile_address_CT(qdr_core_t *core, const char *prefix, const char *address, qd_address_treatment_t treatment, bool edge) |
| { |
| char addr_string_stack[1000]; |
| char *addr_string = addr_string_stack; |
| bool allocated = false; |
| qdr_address_t *addr = 0; |
| qd_iterator_t *iter = 0; |
| |
| size_t len = strlen(prefix) + strlen(address) + 3; |
| if (len > sizeof(addr_string_stack)) { |
| allocated = true; |
| addr_string = (char*) malloc(len); |
| } |
| |
| snprintf(addr_string, len, "%s%s%s", edge ? "H" : "M0", prefix, address); |
| iter = qd_iterator_string(addr_string, ITER_VIEW_ALL); |
| |
| qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); |
| if (!addr) { |
| addr = qdr_address_CT(core, treatment, 0); |
| if (addr) { |
| qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); |
| DEQ_INSERT_TAIL(core->addrs, addr); |
| } |
| } |
| |
| qd_iterator_free(iter); |
| if (allocated) |
| free(addr_string); |
| return addr; |
| } |
| |
| |
| bool qdr_address_is_mobile_CT(qdr_address_t *addr) |
| { |
| if (!addr) |
| return false; |
| |
| const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle); |
| |
| if (addr_str && addr_str[0] == QD_ITER_HASH_PREFIX_MOBILE) |
| return true; |
| |
| return false; |
| } |
| |
| bool qdr_is_addr_treatment_multicast(qdr_address_t *addr) |
| { |
| if (addr) { |
| if (addr->treatment == QD_TREATMENT_MULTICAST_FLOOD || addr->treatment == QD_TREATMENT_MULTICAST_ONCE) |
| return true; |
| } |
| return false; |
| } |
| |
| void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr) |
| { |
| if (lr->conn_id) { |
| DEQ_REMOVE_N(REF, lr->conn_id->link_route_refs, lr); |
| qdr_route_check_id_for_deletion_CT(core, lr->conn_id); |
| } |
| |
| if (lr->addr) { |
| if (--lr->addr->ref_count == 0) { |
| qdr_check_addr_CT(core, lr->addr); |
| } |
| } |
| |
| free(lr->add_prefix); |
| free(lr->del_prefix); |
| free(lr->name); |
| free(lr->pattern); |
| free_qdr_link_route_t(lr); |
| } |
| |
| void qdr_core_delete_auto_link(qdr_core_t *core, qdr_auto_link_t *al) |
| { |
| if (al->conn_id) { |
| DEQ_REMOVE_N(REF, al->conn_id->auto_link_refs, al); |
| qdr_route_check_id_for_deletion_CT(core, al->conn_id); |
| } |
| |
| qdr_address_t *addr = al->addr; |
| if (addr && --addr->ref_count == 0) |
| qdr_check_addr_CT(core, addr); |
| |
| free(al->name); |
| free(al->external_addr); |
| qdr_core_timer_free_CT(core, al->retry_timer); |
| free_qdr_auto_link_t(al); |
| } |
| |
| static void free_address_config(qdr_address_config_t *addr) |
| { |
| free(addr->name); |
| free(addr->pattern); |
| free_qdr_address_config_t(addr); |
| } |
| |
| void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr) |
| { |
| qdr_address_config_t *config = addr->config; |
| if (config && --config->ref_count == 0) |
| free_address_config(config); |
| |
| // Remove the address from the list, hash index, and parse tree |
| DEQ_REMOVE(core->addrs, addr); |
| if (addr->hash_handle) { |
| const char *a_str = (const char *)qd_hash_key_by_handle(addr->hash_handle); |
| if (QDR_IS_LINK_ROUTE(a_str[0])) { |
| qd_iterator_t *iter = qd_iterator_string(a_str, ITER_VIEW_ALL); |
| qdr_link_route_unmap_pattern_CT(core, iter); |
| qd_iterator_free(iter); |
| } |
| qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle); |
| qd_hash_handle_free(addr->hash_handle); |
| } |
| |
| // Free resources associated with this address |
| |
| DEQ_APPEND(addr->rlinks, addr->inlinks); |
| qdr_link_ref_t *lref = DEQ_HEAD(addr->rlinks); |
| while (lref) { |
| qdr_link_t *link = lref->link; |
| assert(link->owning_addr == addr); |
| link->owning_addr = 0; |
| qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); |
| lref = DEQ_HEAD(addr->rlinks); |
| } |
| |
| qd_bitmask_free(addr->rnodes); |
| if (addr->treatment == QD_TREATMENT_ANYCAST_CLOSEST) { |
| qd_bitmask_free(addr->closest_remotes); |
| } |
| else if (addr->treatment == QD_TREATMENT_ANYCAST_BALANCED) { |
| free(addr->outstanding_deliveries); |
| } |
| |
| qdr_connection_ref_t *cr = DEQ_HEAD(addr->conns); |
| while (cr) { |
| qdr_del_connection_ref(&addr->conns, cr->conn); |
| cr = DEQ_HEAD(addr->conns); |
| } |
| |
| // |
| // If there are any fallback-related linkages, disconnect them. |
| // |
| if (!!addr->fallback) |
| addr->fallback->fallback_for = 0; |
| if (!!addr->fallback_for) |
| addr->fallback_for->fallback = 0; |
| |
| free(addr->add_prefix); |
| free(addr->del_prefix); |
| free_qdr_address_t(addr); |
| } |
| |
| |
| void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link) |
| { |
| const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); |
| link->owning_addr = addr; |
| if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE)) |
| link->phase = (int) (key[1] - '0'); |
| |
| if (link->link_direction == QD_OUTGOING) { |
| qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); |
| if (DEQ_SIZE(addr->rlinks) == 1) { |
| qdr_addr_start_inlinks_CT(core, addr); |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr); |
| } else if (DEQ_SIZE(addr->rlinks) == 2 && qd_bitmask_cardinality(addr->rnodes) == 0) |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_TWO_DEST, addr); |
| } else { // link->link_direction == QD_INCOMING |
| qdr_add_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); |
| if (DEQ_SIZE(addr->inlinks) == 1) { |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_SOURCE, addr); |
| if (!!addr->fallback && !link->fallback) |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_SOURCE, addr->fallback); |
| } else if (DEQ_SIZE(addr->inlinks) == 2) { |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_TWO_SOURCE, addr); |
| if (!!addr->fallback && !link->fallback) |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_TWO_SOURCE, addr->fallback); |
| } |
| } |
| } |
| |
| |
| void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link) |
| { |
| link->owning_addr = 0; |
| |
| if (link->link_direction == QD_OUTGOING) { |
| qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); |
| if (DEQ_SIZE(addr->rlinks) == 0) { |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr); |
| } else if (DEQ_SIZE(addr->rlinks) == 1 && qd_bitmask_cardinality(addr->rnodes) == 0) |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); |
| } else { |
| bool removed = qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); |
| if (removed) { |
| if (DEQ_SIZE(addr->inlinks) == 0) { |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_SOURCE, addr); |
| if (!!addr->fallback && !link->fallback) |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_SOURCE, addr->fallback); |
| } else if (DEQ_SIZE(addr->inlinks) == 1) { |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_SOURCE, addr); |
| if (!!addr->fallback && !link->fallback) |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_SOURCE, addr->fallback); |
| } |
| } |
| } |
| } |
| |
| |
| void qdr_core_bind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_connection_t *conn) |
| { |
| qdr_add_connection_ref(&addr->conns, conn); |
| if (DEQ_SIZE(addr->conns) == 1) { |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr); |
| } |
| } |
| |
| |
| void qdr_core_unbind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_connection_t *conn) |
| { |
| qdr_del_connection_ref(&addr->conns, conn); |
| if (DEQ_IS_EMPTY(addr->conns)) { |
| qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr); |
| } |
| } |
| |
| |
| /** |
| * Search for, and possibly create, the fallback address based on the |
| * fallback flag in the address's configuration. This will be used in |
| * the forwarding paths to handle undeliverable messages with fallback destinations. |
| */ |
| void qdr_setup_fallback_address_CT(qdr_core_t *core, qdr_address_t *addr) |
| { |
| #define QDR_SETUP_FALLBACK_BUFFER_SIZE 256 |
| char buffer[QDR_SETUP_FALLBACK_BUFFER_SIZE]; |
| char *alt_text = buffer; |
| bool buffer_on_heap = false; |
| |
| char *address_text = (char*) qd_hash_key_by_handle(addr->hash_handle); |
| size_t alt_length = strlen(address_text) + 1; |
| |
| // |
| // If this is a fallback address for a primary address that hasn't been seen |
| // yet, simply exit without doing anything. |
| // |
| if (address_text[1] == QD_ITER_HASH_PHASE_FALLBACK) |
| return; |
| |
| if (alt_length > QDR_SETUP_FALLBACK_BUFFER_SIZE) { |
| alt_text = (char*) malloc(alt_length); |
| buffer_on_heap = true; |
| } |
| |
| strcpy(alt_text, address_text); |
| alt_text[1] = QD_ITER_HASH_PHASE_FALLBACK; |
| |
| qd_iterator_t *alt_iter = qd_iterator_string(alt_text, ITER_VIEW_ALL); |
| qdr_address_t *alt_addr = 0; |
| |
| qd_hash_retrieve(core->addr_hash, alt_iter, (void**) &alt_addr); |
| if (!alt_addr) { |
| alt_addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED, 0); |
| qd_hash_insert(core->addr_hash, alt_iter, alt_addr, &alt_addr->hash_handle); |
| DEQ_INSERT_TAIL(core->addrs, alt_addr); |
| } |
| |
| assert(alt_addr != addr); |
| assert(alt_addr->fallback_for == 0); |
| addr->fallback = alt_addr; |
| alt_addr->fallback_for = addr; |
| |
| qd_iterator_free(alt_iter); |
| if (buffer_on_heap) |
| free(alt_text); |
| } |
| |
| |
| void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr) |
| { |
| qd_iterator_t *pattern = qd_iterator_string(addr->pattern, ITER_VIEW_ALL); |
| |
| // Remove the address from the list and the parse tree |
| DEQ_REMOVE(core->addr_config, addr); |
| qd_parse_tree_remove_pattern(core->addr_parse_tree, pattern); |
| addr->ref_count--; |
| |
| if (addr->ref_count == 0) |
| free_address_config(addr); |
| qd_iterator_free(pattern); |
| } |
| |
| |
| void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls) |
| { |
| if (link->ref[cls] != 0) |
| return; |
| |
| qdr_link_ref_t *ref = new_qdr_link_ref_t(); |
| DEQ_ITEM_INIT(ref); |
| ref->link = link; |
| link->ref[cls] = ref; |
| DEQ_INSERT_TAIL(*ref_list, ref); |
| } |
| |
| |
| bool qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls) |
| { |
| if (link->ref[cls]) { |
| DEQ_REMOVE(*ref_list, link->ref[cls]); |
| free_qdr_link_ref_t(link->ref[cls]); |
| link->ref[cls] = 0; |
| return true; |
| } |
| return false; |
| } |
| |
| |
| void move_link_ref(qdr_link_t *link, int from_cls, int to_cls) |
| { |
| assert(link->ref[to_cls] == 0); |
| link->ref[to_cls] = link->ref[from_cls]; |
| link->ref[from_cls] = 0; |
| } |
| |
| |
| void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn) |
| { |
| qdr_connection_ref_t *ref = new_qdr_connection_ref_t(); |
| DEQ_ITEM_INIT(ref); |
| ref->conn = conn; |
| DEQ_INSERT_TAIL(*ref_list, ref); |
| } |
| |
| |
| void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn) |
| { |
| qdr_connection_ref_t *ref = DEQ_HEAD(*ref_list); |
| while (ref) { |
| if (ref->conn == conn) { |
| DEQ_REMOVE(*ref_list, ref); |
| free_qdr_connection_ref_t(ref); |
| break; |
| } |
| ref = DEQ_NEXT(ref); |
| } |
| } |
| |
| |
| void qdr_add_delivery_ref_CT(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv) |
| { |
| qdr_delivery_ref_t *ref = new_qdr_delivery_ref_t(); |
| DEQ_ITEM_INIT(ref); |
| ref->dlv = dlv; |
| DEQ_INSERT_TAIL(*list, ref); |
| } |
| |
| |
| void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref) |
| { |
| DEQ_REMOVE(*list, ref); |
| free_qdr_delivery_ref_t(ref); |
| } |
| |
| |
| void qdr_add_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_t *sub) |
| { |
| qdr_subscription_ref_t *ref = new_qdr_subscription_ref_t(); |
| DEQ_ITEM_INIT(ref); |
| ref->sub = sub; |
| DEQ_INSERT_TAIL(*list, ref); |
| } |
| |
| |
| void qdr_del_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_ref_t *ref) |
| { |
| DEQ_REMOVE(*list, ref); |
| free_qdr_subscription_ref_t(ref); |
| } |
| |
| |
| static void qdr_general_handler(void *context) |
| { |
| qdr_core_t *core = (qdr_core_t*) context; |
| qdr_general_work_list_t work_list; |
| qdr_general_work_t *work; |
| |
| sys_mutex_lock(core->work_lock); |
| DEQ_MOVE(core->work_list, work_list); |
| sys_mutex_unlock(core->work_lock); |
| |
| work = DEQ_HEAD(work_list); |
| while (work) { |
| DEQ_REMOVE_HEAD(work_list); |
| work->handler(core, work); |
| free_qdr_general_work_t(work); |
| work = DEQ_HEAD(work_list); |
| } |
| } |
| |
| |
| qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler) |
| { |
| qdr_general_work_t *work = new_qdr_general_work_t(); |
| ZERO(work); |
| work->handler = handler; |
| return work; |
| } |
| |
| |
| void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work) |
| { |
| bool notify; |
| |
| sys_mutex_lock(core->work_lock); |
| DEQ_ITEM_INIT(work); |
| DEQ_INSERT_TAIL(core->work_list, work); |
| notify = DEQ_SIZE(core->work_list) == 1; |
| sys_mutex_unlock(core->work_lock); |
| |
| if (notify) |
| qd_timer_schedule(core->work_timer, 0); |
| } |
| |
| |
| uint64_t qdr_identifier(qdr_core_t* core) |
| { |
| sys_mutex_lock(core->id_lock); |
| uint64_t id = core->next_identifier++; |
| sys_mutex_unlock(core->id_lock); |
| return id; |
| } |
| |
| void qdr_reset_sheaf(qdr_core_t *core, uint8_t n) |
| { |
| qdr_priority_sheaf_t *sheaf = core->data_links_by_mask_bit + n; |
| sheaf->count = 0; |
| memset(sheaf->links, 0, QDR_N_PRIORITIES * sizeof(void *)); |
| } |
| |
| |
| void qdr_connection_work_free_CT(qdr_connection_work_t *work) |
| { |
| qdr_terminus_free(work->source); |
| qdr_terminus_free(work->target); |
| free_qdr_connection_work_t(work); |
| } |
| |
| static void qdr_post_global_stats_response(qdr_core_t *core, qdr_general_work_t *work) |
| { |
| work->stats_handler(work->context); |
| } |
| |
| static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard) |
| { |
| qdr_global_stats_t *stats = action->args.stats_request.stats; |
| if (stats) { |
| stats->addrs = DEQ_SIZE(core->addrs); |
| stats->links = DEQ_SIZE(core->open_links); |
| stats->routers = DEQ_SIZE(core->routers); |
| stats->connections = DEQ_SIZE(core->open_connections); |
| stats->link_routes = DEQ_SIZE(core->link_routes); |
| stats->auto_links = DEQ_SIZE(core->auto_links); |
| stats->presettled_deliveries = core->presettled_deliveries; |
| stats->dropped_presettled_deliveries = core->dropped_presettled_deliveries; |
| stats->accepted_deliveries = core->accepted_deliveries; |
| stats->rejected_deliveries = core->rejected_deliveries; |
| stats->released_deliveries = core->released_deliveries; |
| stats->modified_deliveries = core->modified_deliveries; |
| stats->deliveries_ingress = core->deliveries_ingress; |
| stats->deliveries_egress = core->deliveries_egress; |
| stats->deliveries_transit = core->deliveries_transit; |
| stats->deliveries_ingress_route_container = core->deliveries_ingress_route_container; |
| stats->deliveries_egress_route_container = core->deliveries_egress_route_container; |
| stats->deliveries_delayed_1sec = core->deliveries_delayed_1sec; |
| stats->deliveries_delayed_10sec = core->deliveries_delayed_10sec; |
| stats->deliveries_stuck = core->deliveries_stuck; |
| stats->links_blocked = core->links_blocked; |
| stats->deliveries_redirected_to_fallback = core->deliveries_redirected; |
| } |
| qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response); |
| work->stats_handler = action->args.stats_request.handler; |
| work->context = action->args.stats_request.context; |
| qdr_post_general_work_CT(core, work); |
| } |
| |
| void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_global_stats_handler_t callback, void *context) |
| { |
| qdr_action_t *action = qdr_action(qdr_global_stats_request_CT, "global_stats_request"); |
| action->args.stats_request.stats = stats; |
| action->args.stats_request.handler = callback; |
| action->args.stats_request.context = context; |
| qdr_action_enqueue(core, action); |
| } |
| |