blob: 07fcc62987574bd9db122fdc8c4e93a9d7e6e966 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/python_embedded.h>
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
#include <stdlib.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
#include "entity_cache.h"
#include "router_private.h"
#include "waypoint_private.h"
const char *QD_ROUTER_NODE_TYPE = "router.node";
const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
const char *QD_ROUTER_LINK_TYPE = "router.link";
static char *router_role = "inter-router";
static char *on_demand_role = "on-demand";
static char *local_prefix = "_local/";
static char *topo_prefix = "_topo/";
static char *direct_prefix;
static char *node_id;
/*
* Address Types and Processing:
*
* Address Hash Key onReceive
* ===================================================================
* _local/<local> L<local> handler
* _topo/<area>/<router>/<local> A<area> forward
* _topo/<my-area>/<router>/<local> R<router> forward
* _topo/<my-area>/<my-router>/<local> L<local> handler
* _topo/<area>/all/<local> A<area> forward
* _topo/<my-area>/all/<local> L<local> forward handler
* _topo/all/all/<local> L<local> forward handler
* <mobile> M<mobile> forward handler
*/
ALLOC_DEFINE(qd_routed_event_t);
ALLOC_DEFINE(qd_router_link_t);
ALLOC_DEFINE(qd_router_node_t);
ALLOC_DEFINE(qd_router_ref_t);
ALLOC_DEFINE(qd_router_link_ref_t);
ALLOC_DEFINE(qd_router_lrp_ref_t);
ALLOC_DEFINE(qd_address_t);
ALLOC_DEFINE(qd_router_conn_t);
qd_address_t* qd_address(qd_address_semantics_t semantics)
{
qd_address_t* addr = new_qd_address_t();
memset(addr, 0, sizeof(qd_address_t));
DEQ_ITEM_INIT(addr);
DEQ_INIT(addr->lrps);
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
addr->semantics = semantics;
addr->forwarder = qd_router_get_forwarder(semantics);
return addr;
}
const char* qd_address_logstr(qd_address_t* address) {
return (char*)qd_hash_key_by_handle(address->hash_handle);
}
void qd_router_add_link_ref_LH(qd_router_link_ref_list_t *ref_list, qd_router_link_t *link)
{
qd_router_link_ref_t *ref = new_qd_router_link_ref_t();
DEQ_ITEM_INIT(ref);
ref->link = link;
link->ref = ref;
DEQ_INSERT_TAIL(*ref_list, ref);
}
void qd_router_del_link_ref_LH(qd_router_link_ref_list_t *ref_list, qd_router_link_t *link)
{
if (link->ref) {
DEQ_REMOVE(*ref_list, link->ref);
free_qd_router_link_ref_t(link->ref);
link->ref = 0;
}
}
void qd_router_add_node_ref_LH(qd_router_ref_list_t *ref_list, qd_router_node_t *rnode)
{
qd_router_ref_t *ref = new_qd_router_ref_t();
DEQ_ITEM_INIT(ref);
ref->router = rnode;
rnode->ref_count++;
DEQ_INSERT_TAIL(*ref_list, ref);
}
void qd_router_del_node_ref_LH(qd_router_ref_list_t *ref_list, qd_router_node_t *rnode)
{
qd_router_ref_t *ref = DEQ_HEAD(*ref_list);
while (ref) {
if (ref->router == rnode) {
DEQ_REMOVE(*ref_list, ref);
free_qd_router_ref_t(ref);
rnode->ref_count--;
break;
}
ref = DEQ_NEXT(ref);
}
}
void qd_router_add_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t *lrp)
{
qd_router_lrp_ref_t *ref = new_qd_router_lrp_ref_t();
DEQ_ITEM_INIT(ref);
ref->lrp = lrp;
DEQ_INSERT_TAIL(*ref_list, ref);
}
void qd_router_del_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t *lrp)
{
qd_router_lrp_ref_t *ref = DEQ_HEAD(*ref_list);
while (ref) {
if (ref->lrp == lrp) {
DEQ_REMOVE(*ref_list, ref);
free_qd_router_lrp_ref_t(ref);
break;
}
ref = DEQ_NEXT(ref);
}
}
/**
* Check an address to see if it no longer has any associated destinations.
* Depending on its policy, the address may be eligible for being closed out
* (i.e. Logging its terminal statistics and freeing its resources).
*/
void qd_router_check_addr(qd_router_t *router, qd_address_t *addr, int was_local)
{
if (addr == 0)
return;
unsigned char *key = 0;
int to_delete = 0;
int no_more_locals = 0;
sys_mutex_lock(router->lock);
//
// If the address has no in-process consumer or destinations, it should be
// deleted.
//
if (addr->on_message == 0 &&
DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
!addr->waypoint && !addr->block_deletion)
to_delete = 1;
//
// If we have just removed a local linkage and it was the last local linkage,
// we need to notify the router module that there is no longer a local
// presence of this address.
//
if (was_local && DEQ_SIZE(addr->rlinks) == 0)
no_more_locals = 1;
if (to_delete) {
//
// Delete the address but grab the hash key so we can use it outside the
// critical section.
//
qd_hash_remove_by_handle2(router->addr_hash, addr->hash_handle, &key);
DEQ_REMOVE(router->addrs, addr);
qd_entity_cache_remove(QD_ROUTER_ADDRESS_TYPE, addr);
qd_hash_handle_free(addr->hash_handle);
free_qd_address_t(addr);
}
//
// If we're not deleting but there are no more locals, get a copy of the hash key.
//
if (!to_delete && no_more_locals) {
const unsigned char *key_const = qd_hash_key_by_handle(addr->hash_handle);
key = (unsigned char*) malloc(strlen((const char*) key_const) + 1);
strcpy((char*) key, (const char*) key_const);
}
sys_mutex_unlock(router->lock);
//
// If the address is mobile-class and it was just removed from a local link,
// tell the router module that it is no longer attached locally.
//
if (no_more_locals && key && key[0] == 'M')
qd_router_mobile_removed(router, (const char*) key);
//
// Free the key that was not freed by the hash table.
//
if (key)
free(key);
}
/**
* Determine whether a connection is configured in the inter-router role.
*/
static int qd_router_connection_is_inter_router(const qd_connection_t *conn)
{
if (!conn)
return 0;
const qd_server_config_t *cf = qd_connection_config(conn);
if (cf && strcmp(cf->role, router_role) == 0)
return 1;
return 0;
}
/**
* Determine whether a connection is configured in the on-demand role.
*/
static int qd_router_connection_is_on_demand(const qd_connection_t *conn)
{
if (!conn)
return 0;
const qd_server_config_t *cf = qd_connection_config(conn);
if (cf && strcmp(cf->role, on_demand_role) == 0)
return 1;
return 0;
}
/**
* Determine whether a terminus has router capability
*/
static int qd_router_terminus_is_router(pn_terminus_t *term)
{
pn_data_t *cap = pn_terminus_capabilities(term);
pn_data_rewind(cap);
pn_data_next(cap);
if (cap && pn_data_type(cap) == PN_SYMBOL) {
pn_bytes_t sym = pn_data_get_symbol(cap);
if (sym.size == strlen(QD_CAPABILITY_ROUTER) &&
strcmp(sym.start, QD_CAPABILITY_ROUTER) == 0)
return 1;
}
return 0;
}
/**
* If the terminus has a dynamic-node-property for a node address,
* return an iterator for the content of that property.
*/
static const char *qd_router_terminus_dnp_address(pn_terminus_t *term)
{
pn_data_t *props = pn_terminus_properties(term);
if (!props)
return 0;
pn_data_rewind(props);
if (pn_data_next(props) && pn_data_enter(props) && pn_data_next(props)) {
pn_bytes_t sym = pn_data_get_symbol(props);
if (sym.start && strcmp(QD_DYNAMIC_NODE_PROPERTY_ADDRESS, sym.start) == 0) {
if (pn_data_next(props)) {
pn_bytes_t val = pn_data_get_string(props);
if (val.start && *val.start != '\0')
return val.start;
}
}
}
return 0;
}
/**
* Generate a temporary routable address for a destination connected to this
* router node.
*/
static void qd_router_generate_temp_addr(qd_router_t *router, char *buffer, size_t length)
{
static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
char discriminator[11];
long int rnd1 = random();
long int rnd2 = random();
int idx;
int cursor = 0;
for (idx = 0; idx < 5; idx++) {
discriminator[cursor++] = table[(rnd1 >> (idx * 6)) & 63];
discriminator[cursor++] = table[(rnd2 >> (idx * 6)) & 63];
}
discriminator[cursor] = '\0';
snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator);
}
/**
* Assign a link-mask-bit to a new link. Do this in such a way that all links on the same
* connection share the same mask-bit value.
*/
static int qd_router_find_mask_bit_LH(qd_router_t *router, qd_link_t *link)
{
qd_router_conn_t *shared = (qd_router_conn_t*) qd_link_get_conn_context(link);
if (shared) {
shared->ref_count++;
return shared->mask_bit;
}
int mask_bit;
if (qd_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
qd_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
} else {
qd_log(router->log_source, QD_LOG_CRITICAL, "Exceeded maximum inter-router link count");
return -1;
}
shared = new_qd_router_conn_t();
shared->ref_count = 1;
shared->mask_bit = mask_bit;
qd_link_set_conn_context(link, shared);
return mask_bit;
}
/**
*
*/
static qd_address_t *router_lookup_terminus_LH(qd_router_t *router, const char *taddr)
{
//
// For now: Find the first instance of a '.' in the address and search for the text
// up to and including this instance.
//
if (taddr == 0 || *taddr == '\0')
return 0;
const char *cursor = taddr;
while (*cursor && *cursor != '.')
cursor++;
if (*cursor == '.')
cursor++;
int len = (int) (cursor - taddr);
qd_field_iterator_t *iter = qd_address_iterator_binary(taddr, len, ITER_VIEW_ADDRESS_HASH);
qd_address_iterator_override_prefix(iter, 'C');
qd_address_t *addr;
qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
qd_field_iterator_free(iter);
return addr;
}
void qd_router_link_free_LH(qd_router_link_t *rlink)
{
qd_link_t *link = rlink->link;
if (link) {
qd_link_set_context(link, 0);
qd_link_free_LH(link);
rlink->link = 0;
}
if (rlink->target)
free(rlink->target);
assert(rlink->ref == 0);
qd_routed_event_t *re;
re = DEQ_HEAD(rlink->event_fifo);
while (re) {
DEQ_REMOVE_HEAD(rlink->event_fifo);
if (re->delivery && qd_router_delivery_fifo_exit_LH(re->delivery)) {
qd_router_delivery_unlink_LH(re->delivery);
}
free_qd_routed_event_t(re);
re = DEQ_HEAD(rlink->event_fifo);
}
re = DEQ_HEAD(rlink->msg_fifo);
while (re) {
DEQ_REMOVE_HEAD(rlink->msg_fifo);
if (re->delivery)
qd_router_delivery_fifo_exit_LH(re->delivery);
if (re->message)
qd_message_free(re->message);
free_qd_routed_event_t(re);
re = DEQ_HEAD(rlink->msg_fifo);
}
qd_router_delivery_t *delivery = DEQ_HEAD(rlink->deliveries);
while (delivery) {
// this unlinks the delivery from the rlink:
qd_router_delivery_free_LH(delivery, PN_RELEASED);
delivery = DEQ_HEAD(rlink->deliveries);
}
free_qd_router_link_t(rlink);
}
/**
* Outgoing Link Writable Handler
*/
static int router_writable_link_handler(void* context, qd_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
qd_router_delivery_t *delivery;
qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
pn_link_t *pn_link = qd_link_pn(link);
uint64_t tag;
int link_credit = pn_link_credit(pn_link);
qd_routed_event_list_t to_send;
qd_routed_event_list_t events;
qd_routed_event_t *re;
size_t offer;
int event_count = 0;
bool drain_mode;
bool drain_changed = qd_link_drain_changed(link, &drain_mode);
if (!rlink)
return 0;
DEQ_INIT(to_send);
DEQ_INIT(events);
sys_mutex_lock(router->lock);
//
// Pull the non-delivery events into a local list so they can be processed without
// the lock being held.
//
re = DEQ_HEAD(rlink->event_fifo);
while (re) {
DEQ_REMOVE_HEAD(rlink->event_fifo);
DEQ_INSERT_TAIL(events, re);
re = DEQ_HEAD(rlink->event_fifo);
}
//
// Under lock, move available deliveries from the msg_fifo to the local to_send
// list. Don't move more than we have credit to send.
//
if (link_credit > 0) {
tag = router->dtag;
re = DEQ_HEAD(rlink->msg_fifo);
while (re) {
DEQ_REMOVE_HEAD(rlink->msg_fifo);
DEQ_INSERT_TAIL(to_send, re);
if (DEQ_SIZE(to_send) == link_credit)
break;
re = DEQ_HEAD(rlink->msg_fifo);
}
router->dtag += DEQ_SIZE(to_send);
}
offer = DEQ_SIZE(rlink->msg_fifo);
sys_mutex_unlock(router->lock);
//
// Deliver all the to_send messages downrange
//
re = DEQ_HEAD(to_send);
while (re) {
DEQ_REMOVE_HEAD(to_send);
//
// Get a delivery for the send. This will be the current delivery on the link.
//
tag++;
delivery = qd_router_link_new_delivery(rlink, pn_dtag((char*) &tag, 8));
//
// Send the message
//
qd_message_send(re->message, link);
//
// Check the delivery associated with the queued message. If it is not
// settled, link it to the outgoing delivery for disposition/settlement
// tracking. If it is (pre-)settled, put it on the incoming link's event
// queue to be locally settled. This is done to hold session credit during
// the time the message is in the outgoing message fifo.
//
sys_mutex_lock(router->lock);
if (re->delivery) {
if (qd_router_delivery_fifo_exit_LH(re->delivery)) {
if (qd_router_delivery_settled(re->delivery)) {
qd_router_link_t *peer_rlink = qd_router_delivery_link(re->delivery);
qd_routed_event_t *return_re = new_qd_routed_event_t();
DEQ_ITEM_INIT(return_re);
return_re->delivery = re->delivery;
return_re->message = 0;
return_re->settle = true;
return_re->disposition = 0;
qd_router_delivery_fifo_enter_LH(re->delivery);
DEQ_INSERT_TAIL(peer_rlink->event_fifo, return_re);
qd_link_activate(peer_rlink->link);
} else
qd_router_delivery_link_peers_LH(re->delivery, delivery);
}
} else
qd_router_delivery_free_LH(delivery, 0); // settle and free
sys_mutex_unlock(router->lock);
pn_link_advance(pn_link);
event_count++;
qd_message_free(re->message);
free_qd_routed_event_t(re);
re = DEQ_HEAD(to_send);
}
//
// Process the non-delivery events.
//
re = DEQ_HEAD(events);
while (re) {
DEQ_REMOVE_HEAD(events);
if (re->delivery) {
if (re->disposition) {
pn_delivery_update(qd_router_delivery_pn(re->delivery), re->disposition);
event_count++;
}
sys_mutex_lock(router->lock);
bool ok = qd_router_delivery_fifo_exit_LH(re->delivery);
if (ok && re->settle) {
qd_router_delivery_unlink_LH(re->delivery);
qd_router_delivery_free_LH(re->delivery, re->disposition);
event_count++;
}
sys_mutex_unlock(router->lock);
}
free_qd_routed_event_t(re);
re = DEQ_HEAD(events);
}
//
// Set the offer to the number of messages remaining to be sent.
//
if (offer > 0)
pn_link_offered(pn_link, offer);
else {
pn_link_drained(pn_link);
//
// If this link is in drain mode and it wasn't last time we came through here, we need to
// count this operation as a work event. This will allow the container to process the
// connector and send out the flow(drain=true) response to the receiver.
//
if (drain_changed && drain_mode)
event_count++;
}
return event_count;
}
static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
qd_parsed_field_t *in_ma,
qd_message_t *msg,
int *drop,
const char *to_override)
{
qd_field_iterator_t *ingress_iter = 0;
qd_parsed_field_t *trace = 0;
qd_parsed_field_t *ingress = 0;
qd_parsed_field_t *to = 0;
if (in_ma) {
uint32_t count = qd_parse_sub_count(in_ma);
bool done = false;
for (uint32_t idx = 0; idx < count && !done; idx++) {
qd_parsed_field_t *sub = qd_parse_sub_key(in_ma, idx);
if (!sub) continue;
qd_field_iterator_t *iter = qd_parse_raw(sub);
if (!iter) continue;
if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_TRACE)) {
trace = qd_parse_sub_value(in_ma, idx);
} else if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_INGRESS)) {
ingress = qd_parse_sub_value(in_ma, idx);
} else if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_TO)) {
to = qd_parse_sub_value(in_ma, idx);
}
done = trace && ingress && to;
}
}
//
// QD_MA_TRACE:
// If there is a trace field, append this router's ID to the trace.
// If the router ID is already in the trace the msg has looped.
//
qd_composed_field_t *trace_field = qd_compose_subfield(0);
qd_compose_start_list(trace_field);
if (trace) {
if (qd_parse_is_list(trace)) {
uint32_t idx = 0;
qd_parsed_field_t *trace_item = qd_parse_sub_value(trace, idx);
while (trace_item) {
qd_field_iterator_t *iter = qd_parse_raw(trace_item);
if (qd_field_iterator_equal(iter, (unsigned char*) node_id)) {
*drop = 1;
return 0; // no further processing necessary
}
qd_field_iterator_reset(iter);
qd_compose_insert_string_iterator(trace_field, iter);
idx++;
trace_item = qd_parse_sub_value(trace, idx);
}
}
}
qd_compose_insert_string(trace_field, node_id);
qd_compose_end_list(trace_field);
qd_message_set_trace_annotation(msg, trace_field);
//
// QD_MA_TO:
// The supplied to override takes precedense over any existing
// value.
//
if (to_override) { // takes precedence over existing value
qd_composed_field_t *to_field = qd_compose_subfield(0);
qd_compose_insert_string(to_field, to_override);
qd_message_set_to_override_annotation(msg, to_field);
} else if (to) {
qd_composed_field_t *to_field = qd_compose_subfield(0);
qd_compose_insert_string_iterator(to_field, qd_parse_raw(to));
qd_message_set_to_override_annotation(msg, to_field);
}
//
// QD_MA_INGRESS:
// If there is no ingress field, annotate the ingress as
// this router else keep the original field.
//
qd_composed_field_t *ingress_field = qd_compose_subfield(0);
if (ingress && qd_parse_is_scalar(ingress)) {
ingress_iter = qd_parse_raw(ingress);
qd_compose_insert_string_iterator(ingress_field, ingress_iter);
} else
qd_compose_insert_string(ingress_field, node_id);
qd_message_set_ingress_annotation(msg, ingress_field);
//
// Return the iterator to the ingress field _if_ it was present.
// If we added the ingress, return NULL.
//
return ingress_iter;
}
/**
* Handle the link-routing case, where links are pre-paired and there is no per-message
* routing needed.
*
* Note that this function does not issue a replacement credit for the received message.
* In link-routes, the flow commands must be propagated end-to-end. In other words, the
* ultimate receiving endpoint will issue the replacement credits as it sees fit.
*
* Note also that this function does not perform any message validation. For link-routing,
* there is no need to look into the transferred message.
*/
static void router_link_route_delivery_LH(qd_router_link_t *peer_link, qd_router_delivery_t *delivery, qd_message_t *msg)
{
qd_routed_event_t *re = new_qd_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = msg;
re->settle = false;
re->disposition = 0;
DEQ_INSERT_TAIL(peer_link->msg_fifo, re);
//
// Link the incoming delivery into the event for deferred processing
//
re->delivery = delivery;
qd_router_delivery_fifo_enter_LH(delivery);
qd_link_activate(peer_link->link);
}
/**
* Inbound Delivery Handler
*/
static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
{
qd_router_t *router = (qd_router_t*) context;
pn_link_t *pn_link = qd_link_pn(link);
qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
qd_message_t *msg;
int valid_message = 0;
//
// Receive the message into a local representation. If the returned message
// pointer is NULL, we have not yet received a complete message.
//
// Note: In the link-routing case, consider cutting the message through. There's
// no reason to wait for the whole message to be received before starting to
// send it.
//
msg = qd_message_receive(pnd);
if (!msg)
return;
//
// Consume the delivery.
//
pn_link_advance(pn_link);
if (!rlink) {
qd_message_free(msg);
return;
}
//
// Handle the Link-Routing case.
//
sys_mutex_lock(router->lock);
qd_router_link_t *clink = rlink->connected_link;
if (clink) {
router_link_route_delivery_LH(clink, qd_router_delivery(rlink, pnd), msg);
sys_mutex_unlock(router->lock);
return;
}
//
// Handle the Message-Routing case. Start by issuing a replacement credit.
//
pn_link_flow(pn_link, 1);
//
// Validate the message through the Properties section so we can access the TO field.
//
qd_message_t *in_process_copy = 0;
qd_router_message_cb_t on_message = 0;
void *on_message_context = 0;
valid_message = qd_message_check(msg, QD_DEPTH_PROPERTIES);
if (valid_message) {
qd_parsed_field_t *in_ma = 0;
qd_field_iterator_t *iter = 0;
bool free_iter = true;
char *to_override = 0;
bool forwarded = false;
qd_router_delivery_t *delivery = qd_router_delivery(rlink, pnd);
//
// Only respect the delivery annotations if the message came from another router.
//
if (rlink->link_type != QD_LINK_WAYPOINT)
in_ma = qd_message_message_annotations(msg);
//
// If the message has delivery annotations, get the to-override field from the annotations.
//
if (in_ma) {
qd_parsed_field_t *ma_to = qd_parse_value_by_key(in_ma, QD_MA_TO);
if (ma_to) {
iter = qd_parse_raw(ma_to);
free_iter = false;
}
}
//
// If this is a waypoint link, set the address (and to_override) to the phased
// address for the link.
//
if (!iter && rlink->waypoint) {
iter = qd_address_iterator_string(rlink->waypoint->address, ITER_VIEW_ADDRESS_HASH);
qd_address_iterator_set_phase(iter, rlink->waypoint->out_phase);
}
//
// Still no destination address? Use the TO field from the message properties.
//
if (!iter)
iter = qd_message_field_iterator(msg, QD_FIELD_TO);
//
// Handle the case where the TO field is absent and the incoming link has a target
// address. Use the target address in the lookup in lieu of a TO address.
// Note also that the message must then be annotated with a TO-OVERRIDE field in
// the delivery annotations.
//
// ref: https://issues.apache.org/jira/browse/DISPATCH-1
//
if (!iter && rlink->target) {
iter = qd_address_iterator_string(rlink->target, ITER_VIEW_ALL);
to_override = rlink->target;
}
if (iter) {
//
// Note: This function is going to need to be refactored so we can put an
// asynchronous address lookup here. In the event there is a translation
// of the address (via namespace), it will have to be done here after
// obtaining the iterator and before doing the hash lookup.
//
// Note that this lookup is only done for global/mobile class addresses.
//
bool is_local;
bool is_direct;
qd_address_t *addr = qd_router_address_lookup_LH(router, iter, &is_local, &is_direct);
if (free_iter)
qd_field_iterator_free(iter);
if (addr) {
//
// If the incoming link is an endpoint link, count this as an ingress delivery.
//
if (rlink->link_type == QD_LINK_ENDPOINT)
addr->deliveries_ingress++;
//
// TO field is valid and contains a known destination. Handle the various
// cases for forwarding.
//
// Interpret and update the delivery annotations of the message. As a convenience,
// this function returns the iterator to the ingress field (if it exists). It also
// returns a 'drop' indication if it detects that the message will loop.
//
int drop = 0;
qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &drop, to_override);
if (!drop) {
//
// Forward a copy of the message to the in-process endpoint for
// this address if there is one. The actual invocation of the
// handler will occur later after we've released the lock.
//
if (addr->on_message) {
in_process_copy = qd_message_copy(msg);
on_message = addr->on_message;
on_message_context = addr->on_message_context;
addr->deliveries_to_container++;
}
//
// If the address form is local (i.e. is prefixed by _local), don't forward
// outside of the router process.
//
if (!is_local && router->router_mode != QD_ROUTER_MODE_ENDPOINT) {
qd_router_forwarder_t *f = addr->forwarder;
forwarded = f->forward(f, router, msg, delivery, addr, ingress_iter, is_direct);
}
}
}
}
if (!forwarded) {
if (on_message)
// our local in-process handler will accept it:
qd_router_delivery_free_LH(delivery, PN_ACCEPTED);
else {
// no one has accepted it, so inform sender
qd_router_delivery_set_undeliverable_LH(delivery);
qd_router_delivery_free_LH(delivery, PN_MODIFIED);
}
}
} else {
//
// Message is invalid. Reject the message.
//
pn_delivery_update(pnd, PN_REJECTED);
pn_delivery_settle(pnd);
}
sys_mutex_unlock(router->lock);
qd_message_free(msg);
//
// Invoke the in-process handler now that the lock is released.
//
if (on_message) {
on_message(on_message_context, in_process_copy, rlink->mask_bit);
qd_message_free(in_process_copy);
}
}
/**
* Delivery Disposition Handler
*/
static void router_disposition_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
{
qd_router_t *router = (qd_router_t*) context;
qd_router_delivery_t *delivery = (qd_router_delivery_t *)pn_delivery_get_context(pnd);
if (!delivery) return;
bool changed = qd_router_delivery_disp_changed(delivery);
uint64_t disp = qd_router_delivery_disp(delivery);
bool settled = qd_router_delivery_settled(delivery);
sys_mutex_lock(router->lock);
qd_router_delivery_t *peer = qd_router_delivery_peer(delivery);
if (peer) {
//
// The case where this delivery has a peer.
//
if (changed || settled) {
qd_router_link_t *peer_link = qd_router_delivery_link(peer);
qd_routed_event_t *re = new_qd_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = peer;
re->message = 0;
re->settle = settled;
re->disposition = changed ? disp : 0;
qd_router_delivery_fifo_enter_LH(peer);
DEQ_INSERT_TAIL(peer_link->event_fifo, re);
if (settled) {
qd_router_delivery_unlink_LH(delivery);
qd_router_delivery_free_LH(delivery, 0);
}
qd_link_activate(peer_link->link);
}
} else if (settled)
qd_router_delivery_free_LH(delivery, 0);
sys_mutex_unlock(router->lock);
}
typedef struct link_attach_t {
qd_router_t *router;
qd_router_link_t *peer_link;
qd_link_t *peer_qd_link;
char *link_name;
qd_direction_t dir;
qd_connection_t *conn;
int credit;
} link_attach_t;
ALLOC_DECLARE(link_attach_t);
ALLOC_DEFINE(link_attach_t);
#define COND_NAME_LEN 127
#define COND_DESCRIPTION_LEN 511
typedef struct link_detach_t {
qd_router_t *router;
qd_router_link_t *rlink;
char condition_name[COND_NAME_LEN + 1];
char condition_description[COND_DESCRIPTION_LEN + 1];
pn_data_t *condition_info;
} link_detach_t;
ALLOC_DECLARE(link_detach_t);
ALLOC_DEFINE(link_detach_t);
typedef struct link_event_t {
qd_router_t *router;
qd_router_link_t *rlink;
int credit;
bool drain;
} link_event_t;
ALLOC_DECLARE(link_event_t);
ALLOC_DEFINE(link_event_t);
typedef enum {
LINK_ATTACH_FORWARDED = 1, ///< The attach was forwarded
LINK_ATTACH_NO_MATCH = 2, ///< No link-route address was found
LINK_ATTACH_NO_PATH = 3 ///< Link-route exists but there's no reachable destination
} link_attach_result_t;
static void qd_router_attach_routed_link(void *context, bool discard)
{
link_attach_t *la = (link_attach_t*) context;
if (!discard) {
qd_link_t *link = qd_link(la->router->node, la->conn, la->dir, la->link_name);
qd_router_link_t *rlink = new_qd_router_link_t();
DEQ_ITEM_INIT(rlink);
rlink->link_type = QD_LINK_ENDPOINT;
rlink->link_direction = la->dir;
rlink->owning_addr = 0;
rlink->waypoint = 0;
rlink->link = link;
rlink->ref = 0;
rlink->target = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
DEQ_INIT(rlink->deliveries);
qd_link_set_context(link, rlink);
sys_mutex_lock(la->router->lock);
rlink->connected_link = la->peer_link;
la->peer_link->connected_link = rlink;
qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(la->router->links, rlink);
sys_mutex_unlock(la->router->lock);
pn_terminus_copy(qd_link_source(link), qd_link_remote_source(la->peer_qd_link));
pn_terminus_copy(qd_link_target(link), qd_link_remote_target(la->peer_qd_link));
pn_link_open(qd_link_pn(link));
if (la->credit > 0)
pn_link_flow(qd_link_pn(link), la->credit);
}
if (la->link_name)
free(la->link_name);
free_link_attach_t(la);
}
static void qd_router_detach_routed_link(void *context, bool discard)
{
link_detach_t *ld = (link_detach_t*) context;
if (!discard) {
qd_link_t *link = ld->rlink->link;
if (ld->condition_name[0]) {
pn_condition_t *cond = pn_link_condition(qd_link_pn(link));
pn_condition_set_name(cond, ld->condition_name);
pn_condition_set_description(cond, ld->condition_description);
if (ld->condition_info)
pn_data_copy(pn_condition_info(cond), ld->condition_info);
}
qd_link_close(link);
sys_mutex_lock(ld->router->lock);
qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, ld->rlink);
DEQ_REMOVE(ld->router->links, ld->rlink);
qd_router_link_free_LH(ld->rlink);
sys_mutex_unlock(ld->router->lock);
}
if (ld->condition_info)
pn_data_free(ld->condition_info);
free_link_detach_t(ld);
}
static void qd_router_open_routed_link(void *context, bool discard)
{
link_event_t *le = (link_event_t*) context;
if (!discard) {
qd_link_t *link = le->rlink->link;
if (le->rlink->connected_link) {
qd_link_t *peer = le->rlink->connected_link->link;
pn_terminus_copy(qd_link_source(link), qd_link_remote_source(peer));
pn_terminus_copy(qd_link_target(link), qd_link_remote_target(peer));
pn_link_open(qd_link_pn(link));
}
}
free_link_event_t(le);
}
static void qd_router_flow(void *context, bool discard)
{
link_event_t *le = (link_event_t*) context;
if (!discard) {
qd_link_t *link = le->rlink->link;
pn_link_t *pn_link = qd_link_pn(link);
int delta = le->credit - pn_link_credit(pn_link);
if (delta > 0) {
pn_link_flow(pn_link, delta);
qd_link_activate(link);
}
}
free_link_event_t(le);
}
link_attach_result_t qd_router_link_route_LH(qd_router_t *router,
qd_router_link_t *rlink,
const char *term_addr,
qd_direction_t dir)
{
//
// Lookup the target address to see if we can link-route this attach.
//
qd_address_t *addr = router_lookup_terminus_LH(router, term_addr);
if (addr) {
//
// This is a link-attach routable target. Propagate the attach downrange.
// Check first for a locally connected container.
//
qd_link_t *link = rlink->link;
pn_link_t *pn_link = qd_link_pn(link);
qd_router_lrp_ref_t *lrpref = DEQ_HEAD(addr->lrps);
if (lrpref) {
qd_connection_t *conn = lrpref->lrp->container->conn;
if (conn) {
link_attach_t *la = new_link_attach_t();
la->router = router;
la->peer_link = rlink;
la->peer_qd_link = link;
la->link_name = strdup(pn_link_name(pn_link));
la->dir = dir;
la->conn = conn;
la->credit = pn_link_credit(pn_link);
qd_connection_invoke_deferred(conn, qd_router_attach_routed_link, la);
}
} else if (DEQ_SIZE(addr->rnodes) > 0) {
//
// There are no locally connected containers for this link but there is at
// least one on a remote router. Forward the attach toward the remote destination.
//
qd_router_node_t *remote_router = DEQ_HEAD(addr->rnodes)->router;
qd_router_link_t *out_link = 0;
if (remote_router)
out_link = remote_router->peer_link;
if (!out_link && remote_router && remote_router->next_hop)
out_link = remote_router->next_hop->peer_link;
if (out_link) {
qd_connection_t *out_conn = qd_link_connection(out_link->link);
if (out_conn) {
link_attach_t *la = new_link_attach_t();
la->router = router;
la->peer_link = rlink;
la->peer_qd_link = link;
la->link_name = strdup(pn_link_name(pn_link));
la->dir = dir;
la->conn = out_conn;
la->credit = pn_link_credit(pn_link);
qd_connection_invoke_deferred(out_conn, qd_router_attach_routed_link, la);
} else
return LINK_ATTACH_NO_PATH;
} else
return LINK_ATTACH_NO_PATH;
} else
return LINK_ATTACH_NO_PATH;
} else
return LINK_ATTACH_NO_MATCH;
return LINK_ATTACH_FORWARDED;
}
/**
* New Incoming Link Handler
*/
static int router_incoming_link_handler(void* context, qd_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
pn_link_t *pn_link = qd_link_pn(link);
int is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
qd_log(router->log_source, QD_LOG_WARNING,
"Incoming link claims router capability but is not on an inter-router connection");
pn_link_close(pn_link);
return 0;
}
qd_router_link_t *rlink = new_qd_router_link_t();
DEQ_ITEM_INIT(rlink);
rlink->link_type = is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT;
rlink->link_direction = QD_INCOMING;
rlink->owning_addr = 0;
rlink->waypoint = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->ref = 0;
rlink->target = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
DEQ_INIT(rlink->deliveries);
if (!is_router && r_tgt) {
rlink->target = (char*) malloc(strlen(r_tgt) + 1);
strcpy(rlink->target, r_tgt);
}
qd_link_set_context(link, rlink);
sys_mutex_lock(router->lock);
rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(router->links, rlink);
//
// Attempt to link-route this attach
//
link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
if (!is_router)
la_result = qd_router_link_route_LH(router, rlink, r_tgt, QD_OUTGOING);
sys_mutex_unlock(router->lock);
pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
switch (la_result) {
case LINK_ATTACH_NO_MATCH:
//
// We didn't link-route this attach. It terminates here.
// Open it in the reverse direction.
//
pn_link_flow(pn_link, 1000);
pn_link_open(pn_link);
break;
case LINK_ATTACH_NO_PATH: {
//
// The link should be routable but there is no path to the
// destination. Close the link.
//
pn_condition_t *cond = pn_link_condition(pn_link);
pn_condition_set_name(cond, "qd:no-route-to-dest");
pn_condition_set_description(cond, "No route to the destination node");
pn_link_close(pn_link);
break;
}
case LINK_ATTACH_FORWARDED:
//
// We routed the attach outbound. Don't open the link back until
// the downstream link is opened.
//
break;
}
return 0;
}
/**
* New Outgoing Link Handler
*/
static int router_outgoing_link_handler(void* context, qd_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
pn_link_t *pn_link = qd_link_pn(link);
const char *r_src = pn_terminus_get_address(qd_link_remote_source(link));
int is_dynamic = pn_terminus_is_dynamic(qd_link_remote_source(link));
int is_router = qd_router_terminus_is_router(qd_link_remote_target(link));
int propagate = 0;
qd_field_iterator_t *iter = 0;
char phase = '0';
qd_address_semantics_t semantics;
qd_address_t *addr = 0;
link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
qd_log(router->log_source, QD_LOG_WARNING,
"Outgoing link claims router capability but is not on an inter-router connection");
pn_link_close(pn_link);
return 0;
}
//
// If this link is not a router link and it has no source address, we can't
// accept it.
//
if (r_src == 0 && !is_router && !is_dynamic) {
pn_link_close(pn_link);
return 0;
}
//
// If this is an endpoint link with a source address, make sure the address is
// appropriate for endpoint links. If it is not mobile address, it cannot be
// bound to an endpoint link.
//
if (r_src && !is_router && !is_dynamic) {
iter = qd_address_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
unsigned char prefix = qd_field_iterator_octet(iter);
qd_field_iterator_reset(iter);
if (prefix != 'M') {
qd_field_iterator_free(iter);
pn_link_close(pn_link);
qd_log(router->log_source, QD_LOG_WARNING,
"Rejected an outgoing endpoint link with a router address: %s", r_src);
return 0;
}
}
//
// Create a router_link record for this link. Some of the fields will be
// modified in the different cases below.
//
qd_router_link_t *rlink = new_qd_router_link_t();
DEQ_ITEM_INIT(rlink);
rlink->link_type = is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT;
rlink->link_direction = QD_OUTGOING;
rlink->owning_addr = 0;
rlink->waypoint = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->ref = 0;
rlink->target = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
DEQ_INIT(rlink->deliveries);
qd_link_set_context(link, rlink);
pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
//
// Determine the semantics for the address prior to taking out the lock.
//
if (is_dynamic || !iter)
semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_BACKPRESSURE;
else {
semantics = router_semantics_for_addr(router, iter, '\0', &phase);
qd_address_iterator_set_phase(iter, phase);
qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
}
sys_mutex_lock(router->lock);
rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
if (is_router) {
//
// If this is a router link, put it in the hello_address link-list.
//
qd_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
rlink->owning_addr = router->hello_addr;
router->out_links_by_mask_bit[rlink->mask_bit] = rlink;
} else {
//
// If this is an endpoint link, check the source. If it is dynamic, we will
// assign it an ephemeral and routable address. If it has a non-dynamic
// address, that address needs to be set up in the address list.
//
char temp_addr[1000]; // TODO: Use pn_string or aprintf.
const char *link_route_address = qd_router_terminus_dnp_address(qd_link_remote_source(link));
if (link_route_address == 0)
link_route_address = r_src;
la_result = qd_router_link_route_LH(router, rlink, link_route_address, QD_INCOMING);
if (la_result == LINK_ATTACH_NO_MATCH) {
if (is_dynamic) {
qd_router_generate_temp_addr(router, temp_addr, 1000);
iter = qd_address_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
pn_terminus_set_address(qd_link_source(link), temp_addr);
qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address=%s", temp_addr);
} else
qd_log(router->log_source, QD_LOG_INFO, "Registered local address=%s phase=%c", r_src, phase);
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = qd_address(semantics);
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
}
rlink->owning_addr = addr;
qd_router_add_link_ref_LH(&addr->rlinks, rlink);
//
// If this is not a dynamic address and it is the first local subscription
// to the address, supply the address to the router module for propagation
// to other nodes.
//
propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
}
}
qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(router->links, rlink);
//
// If an interesting change has occurred with this address and it has an associated waypoint,
// notify the waypoint module so it can react appropriately.
//
if (propagate && addr->waypoint)
qd_waypoint_address_updated_LH(router->qd, addr);
sys_mutex_unlock(router->lock);
if (propagate)
qd_router_mobile_added(router, iter);
if (iter)
qd_field_iterator_free(iter);
switch (la_result) {
case LINK_ATTACH_NO_MATCH:
//
// We didn't link-route this attach. It terminates here.
// Open it in the reverse direction.
//
pn_link_open(pn_link);
break;
case LINK_ATTACH_NO_PATH: {
//
// The link should be routable but there is no path to the
// destination. Close the link.
//
pn_condition_t *cond = pn_link_condition(qd_link_pn(link));
pn_condition_set_name(cond, "qd:no-route-to-dest");
pn_condition_set_description(cond, "No route to the destination node");
pn_link_close(pn_link);
break;
}
case LINK_ATTACH_FORWARDED:
//
// We routed the attach outbound. Don't open the link back until
// the downstream link is opened.
//
break;
}
return 0;
}
/**
* Handler for remote opening of links that we initiated.
*/
static int router_link_attach_handler(void* context, qd_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
if (!rlink)
return 0;
sys_mutex_lock(router->lock);
qd_router_link_t *peer_rlink = rlink->connected_link;
if (peer_rlink) {
qd_connection_t *out_conn = qd_link_connection(peer_rlink->link);
if (out_conn) {
link_event_t *le = new_link_event_t();
memset(le, 0, sizeof(link_event_t));
le->router = router;
le->rlink = peer_rlink;
qd_connection_invoke_deferred(out_conn, qd_router_open_routed_link, le);
}
}
sys_mutex_unlock(router->lock);
return 0;
}
/**
* Handler for flow events on links
*/
static int router_link_flow_handler(void* context, qd_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
pn_link_t *pn_link = qd_link_pn(link);
if (!rlink)
return 0;
sys_mutex_lock(router->lock);
qd_router_link_t *peer_rlink = rlink->connected_link;
if (peer_rlink) {
qd_connection_t *out_conn = qd_link_connection(peer_rlink->link);
if (out_conn) {
if (rlink->link_direction == QD_OUTGOING) {
//
// Outgoing link handling
//
int credit = pn_link_remote_credit(pn_link) - DEQ_SIZE(rlink->msg_fifo);
if (credit > 0) {
link_event_t *le = new_link_event_t();
memset(le, 0, sizeof(link_event_t));
le->router = router;
le->rlink = peer_rlink;
le->credit = credit;
le->drain = false;
qd_connection_invoke_deferred(out_conn, qd_router_flow, le);
}
} else {
//
// Incoming link handling
//
}
}
}
sys_mutex_unlock(router->lock);
return 0;
}
/**
* Link Detached Handler
*/
static int router_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt)
{
qd_router_t *router = (qd_router_t*) context;
qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
qd_router_conn_t *shared = (qd_router_conn_t*) qd_link_get_conn_context(link);
qd_address_t *oaddr = 0;
int lost_link_mask_bit = -1;
if (!rlink)
return 0;
sys_mutex_lock(router->lock);
if (rlink->connected_link) {
qd_connection_t *out_conn = qd_link_connection(rlink->connected_link->link);
if (out_conn) {
link_detach_t *ld = new_link_detach_t();
memset(ld, 0, sizeof(link_detach_t));
ld->router = router;
ld->rlink = rlink->connected_link;
pn_condition_t *cond = pn_link_remote_condition(qd_link_pn(link));
if (pn_condition_is_set(cond)) {
if (pn_condition_get_name(cond)) {
strncpy(ld->condition_name, pn_condition_get_name(cond), COND_NAME_LEN);
ld->condition_name[COND_NAME_LEN] = '\0';
}
if (pn_condition_get_description(cond)) {
strncpy(ld->condition_description, pn_condition_get_description(cond), COND_DESCRIPTION_LEN);
ld->condition_description[COND_DESCRIPTION_LEN] = '\0';
}
if (pn_condition_info(cond)) {
ld->condition_info = pn_data(0);
pn_data_copy(ld->condition_info, pn_condition_info(cond));
}
} else if (dt == QD_LOST) {
strcpy(ld->condition_name, "qd:routed-link-lost");
strcpy(ld->condition_description, "Connectivity to the peer container was lost");
}
rlink->connected_link->connected_link = 0;
qd_connection_invoke_deferred(out_conn, qd_router_detach_routed_link, ld);
}
}
//
// If this link is part of an inter-router connection, drop the
// reference count. If this is the last link on the connection,
// free the mask-bit and the shared connection record.
//
if (shared) {
shared->ref_count--;
if (shared->ref_count == 0) {
lost_link_mask_bit = rlink->mask_bit;
qd_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit);
qd_link_set_conn_context(link, 0);
free_qd_router_conn_t(shared);
}
}
//
// If the link is outgoing, we must disassociate it from its address.
//
if (rlink->link_direction == QD_OUTGOING && rlink->owning_addr) {
qd_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink);
oaddr = rlink->owning_addr;
}
//
// If this is an outgoing inter-router link, we must remove the by-mask-bit
// index reference to this link.
//
if (rlink->link_type == QD_LINK_ROUTER && rlink->link_direction == QD_OUTGOING) {
if (router->out_links_by_mask_bit[rlink->mask_bit] == rlink)
router->out_links_by_mask_bit[rlink->mask_bit] = 0;
else
qd_log(router->log_source, QD_LOG_CRITICAL,
"Outgoing router link closing but not in index: bit=%d", rlink->mask_bit);
}
//
// Remove the link from the master list-of-links and deallocate
//
DEQ_REMOVE(router->links, rlink);
qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, rlink);
qd_router_link_free_LH(rlink);
sys_mutex_unlock(router->lock);
//
// Check to see if the owning address should be deleted
//
qd_router_check_addr(router, oaddr, 1);
//
// If we lost the link to a neighbor router, notify the route engine so it doesn't
// have to wait for the HELLO timeout to expire.
//
if (lost_link_mask_bit >= 0)
qd_router_link_lost(router, lost_link_mask_bit);
return 0;
}
static void router_inbound_open_handler(void *type_context, qd_connection_t *conn, void *context)
{
}
static void router_outbound_open_handler(void *type_context, qd_connection_t *conn, void *context)
{
qd_router_t *router = (qd_router_t*) type_context;
//
// If the connection is on-demand, visit all waypoints that are waiting for their
// connection to arrive.
//
if (qd_router_connection_is_on_demand(conn)) {
qd_waypoint_connection_opened(router->qd, (qd_config_connector_t*) context, conn);
return;
}
//
// If the connection isn't inter-router, ignore it.
//
if (!qd_router_connection_is_inter_router(conn))
return;
qd_link_t *sender;
qd_link_t *receiver;
qd_router_link_t *rlink;
int mask_bit = 0;
size_t clen = strlen(QD_CAPABILITY_ROUTER);
//
// Allocate a mask bit to designate the pair of links connected to the neighbor router
//
sys_mutex_lock(router->lock);
if (qd_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
qd_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
} else {
sys_mutex_unlock(router->lock);
qd_log(router->log_source, QD_LOG_CRITICAL, "Exceeded maximum inter-router link count");
return;
}
//
// Create an incoming link with router source capability
//
receiver = qd_link(router->node, conn, QD_INCOMING, QD_INTERNODE_LINK_NAME_1);
// TODO - We don't want to have to cast away the constness of the literal string here!
// See PROTON-429
pn_data_put_symbol(pn_terminus_capabilities(qd_link_target(receiver)),
pn_bytes(clen, (char*) QD_CAPABILITY_ROUTER));
rlink = new_qd_router_link_t();
DEQ_ITEM_INIT(rlink);
rlink->mask_bit = mask_bit;
rlink->link_type = QD_LINK_ROUTER;
rlink->link_direction = QD_INCOMING;
rlink->owning_addr = 0;
rlink->waypoint = 0;
rlink->link = receiver;
rlink->connected_link = 0;
rlink->ref = 0;
rlink->target = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
DEQ_INIT(rlink->deliveries);
qd_link_set_context(receiver, rlink);
qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(router->links, rlink);
//
// Create an outgoing link with router target capability
//
sender = qd_link(router->node, conn, QD_OUTGOING, QD_INTERNODE_LINK_NAME_2);
// TODO - We don't want to have to cast away the constness of the literal string here!
// See PROTON-429
pn_data_put_symbol(pn_terminus_capabilities(qd_link_source(sender)),
pn_bytes(clen, (char *) QD_CAPABILITY_ROUTER));
rlink = new_qd_router_link_t();
DEQ_ITEM_INIT(rlink);
rlink->mask_bit = mask_bit;
rlink->link_type = QD_LINK_ROUTER;
rlink->link_direction = QD_OUTGOING;
rlink->owning_addr = router->hello_addr;
rlink->waypoint = 0;
rlink->link = sender;
rlink->connected_link = 0;
rlink->ref = 0;
rlink->target = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
DEQ_INIT(rlink->deliveries);
//
// Add the new outgoing link to the hello_address's list of links.
//
qd_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
//
// Index this link from the by-maskbit index so we can later find it quickly
// when provided with the mask bit.
//
router->out_links_by_mask_bit[mask_bit] = rlink;
qd_link_set_context(sender, rlink);
qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
pn_link_open(qd_link_pn(receiver));
pn_link_open(qd_link_pn(sender));
pn_link_flow(qd_link_pn(receiver), 1000);
}
static void qd_router_timer_handler(void *context)
{
qd_router_t *router = (qd_router_t*) context;
//
// Periodic processing.
//
qd_pyrouter_tick(router);
qd_timer_schedule(router->timer, 1000);
}
static qd_node_type_t router_node = {"router", 0, 0,
router_rx_handler,
router_disposition_handler,
router_incoming_link_handler,
router_outgoing_link_handler,
router_writable_link_handler,
router_link_detach_handler,
router_link_attach_handler,
router_link_flow_handler,
0, // node_created_handler
0, // node_destroyed_handler
router_inbound_open_handler,
router_outbound_open_handler };
static int type_registered = 0;
qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id)
{
if (!type_registered) {
type_registered = 1;
qd_container_register_node_type(qd, &router_node);
}
size_t dplen = 9 + strlen(area) + strlen(id);
direct_prefix = (char*) malloc(dplen);
strcpy(direct_prefix, "_topo/");
strcat(direct_prefix, area);
strcat(direct_prefix, "/");
strcat(direct_prefix, id);
strcat(direct_prefix, "/");
node_id = (char*) malloc(dplen);
strcpy(node_id, area);
strcat(node_id, "/");
strcat(node_id, id);
qd_router_t *router = NEW(qd_router_t);
ZERO(router);
router_node.type_context = router;
qd->router = router;
router->qd = qd;
router->log_source = qd_log_source("ROUTER");
router->router_mode = mode;
router->router_area = area;
router->router_id = id;
router->node = qd_container_set_default_node_type(qd, &router_node, (void*) router, QD_DIST_BOTH);
DEQ_INIT(router->addrs);
router->addr_hash = qd_hash(10, 32, 0);
DEQ_INIT(router->links);
DEQ_INIT(router->routers);
DEQ_INIT(router->lrp_containers);
router->out_links_by_mask_bit = NEW_PTR_ARRAY(qd_router_link_t, qd_bitmask_width());
router->routers_by_mask_bit = NEW_PTR_ARRAY(qd_router_node_t, qd_bitmask_width());
for (int idx = 0; idx < qd_bitmask_width(); idx++) {
router->out_links_by_mask_bit[idx] = 0;
router->routers_by_mask_bit[idx] = 0;
}
router->neighbor_free_mask = qd_bitmask(1);
router->lock = sys_mutex();
router->timer = qd_timer(qd, qd_router_timer_handler, (void*) router);
router->dtag = 1;
DEQ_INIT(router->config_addrs);
DEQ_INIT(router->waypoints);
//
// Create addresses for all of the routers in the topology. It will be registered
// locally later in the initialization sequence.
//
if (router->router_mode == QD_ROUTER_MODE_INTERIOR) {
router->router_addr = qd_router_register_address(qd, "qdrouter", 0, 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
router->routerma_addr = qd_router_register_address(qd, "qdrouter.ma", 0, 0, QD_SEMANTICS_DEFAULT, false, 0);
router->hello_addr = qd_router_register_address(qd, "qdhello", 0, 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
}
//
// Inform the field iterator module of this router's id and area. The field iterator
// uses this to offload some of the address-processing load from the router.
//
qd_field_iterator_set_address(area, id);
//
// Seed the random number generator
//
unsigned int seed = (unsigned int) time(0);
srandom(seed);
switch (router->router_mode) {
case QD_ROUTER_MODE_STANDALONE: qd_log(router->log_source, QD_LOG_INFO, "Router started in Standalone mode"); break;
case QD_ROUTER_MODE_INTERIOR: qd_log(router->log_source, QD_LOG_INFO, "Router started in Interior mode, area=%s id=%s", area, id); break;
case QD_ROUTER_MODE_EDGE: qd_log(router->log_source, QD_LOG_INFO, "Router started in Edge mode"); break;
case QD_ROUTER_MODE_ENDPOINT: qd_log(router->log_source, QD_LOG_INFO, "Router started in Endpoint mode"); break;
}
return router;
}
void qd_router_setup_late(qd_dispatch_t *qd)
{
qd_router_python_setup(qd->router);
qd_timer_schedule(qd->router->timer, 1000);
}
void qd_router_free(qd_router_t *router)
{
if (!router) return;
qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH);
for (qd_address_t *addr = DEQ_HEAD(router->addrs); addr; addr = DEQ_HEAD(router->addrs)) {
for (qd_router_link_ref_t *rlink = DEQ_HEAD(addr->rlinks); rlink; rlink = DEQ_HEAD(addr->rlinks)) {
DEQ_REMOVE_HEAD(addr->rlinks);
free_qd_router_link_ref_t(rlink);
}
for (qd_router_ref_t *rnode = DEQ_HEAD(addr->rnodes); rnode; rnode = DEQ_HEAD(addr->rnodes)) {
DEQ_REMOVE_HEAD(addr->rnodes);
free_qd_router_ref_t(rnode);
}
qd_hash_handle_free(addr->hash_handle);
DEQ_REMOVE_HEAD(router->addrs);
qd_entity_cache_remove(QD_ROUTER_ADDRESS_TYPE, addr);
free_qd_address_t(addr);
}
qd_timer_free(router->timer);
sys_mutex_free(router->lock);
qd_bitmask_free(router->neighbor_free_mask);
free(router->out_links_by_mask_bit);
free(router->routers_by_mask_bit);
qd_hash_free(router->addr_hash);
qd_router_configure_free(router);
qd_router_python_free(router);
free(router);
free(node_id);
free(direct_prefix);
}
const char *qd_router_id(const qd_dispatch_t *qd)
{
return node_id;
}
qd_address_t *qd_router_register_address(qd_dispatch_t *qd,
const char *address,
qd_router_message_cb_t on_message,
void *context,
qd_address_semantics_t semantics,
bool global,
qd_router_forwarder_t *forwarder)
{
char addr_string[1000];
qd_router_t *router = qd->router;
qd_address_t *addr = 0;
qd_field_iterator_t *iter = 0;
snprintf(addr_string, sizeof(addr_string), "%s%s", global ? "M0" : "L", address);
iter = qd_address_iterator_string(addr_string, ITER_VIEW_NO_HOST);
sys_mutex_lock(router->lock);
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = qd_address(semantics);
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(router->addrs, addr);
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
}
qd_field_iterator_free(iter);
addr->on_message = on_message;
addr->on_message_context = context;
if (forwarder) {
if (addr->forwarder) addr->forwarder->release(addr->forwarder);
addr->forwarder = forwarder;
}
sys_mutex_unlock(router->lock);
if (on_message)
qd_log(router->log_source, QD_LOG_INFO, "In-Process Address Registered: %s", address);
assert(addr);
return addr;
}
void qd_router_unregister_address(qd_address_t *ad)
{
// if (ad->forwarder) ad->forwarder->release(ad->forwarder);
//free_qd_address_t(ad);
}
void qd_address_set_redirect(qd_address_t *address, qd_address_t *redirect)
{
address->redirect = redirect;
}
void qd_address_set_static_cc(qd_address_t *address, qd_address_t *cc)
{
address->static_cc = cc;
}
void qd_address_set_dynamic_cc(qd_address_t *address, qd_address_t *cc)
{
address->dynamic_cc = cc;
}
qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
qd_field_iterator_t *addr_iter,
bool *is_local, bool *is_direct)
{
qd_address_t *addr = 0;
qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
qd_hash_retrieve(router->addr_hash, addr_iter, (void*) &addr);
qd_address_iterator_reset_view(addr_iter, ITER_VIEW_NO_HOST);
*is_local = (bool) qd_field_iterator_prefix(addr_iter, local_prefix);
*is_direct = (bool) qd_field_iterator_prefix(addr_iter, direct_prefix);
return addr;
}
void qd_router_send(qd_dispatch_t *qd,
qd_field_iterator_t *address,
qd_message_t *msg)
{
qd_router_t *router = qd->router;
qd_address_t *addr;
qd_address_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
sys_mutex_lock(router->lock);
qd_hash_retrieve(router->addr_hash, address, (void*) &addr);
if (addr) {
//
// Forward to all of the local links receiving this address.
//
addr->deliveries_from_container++;
qd_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
while (dest_link_ref) {
qd_routed_event_t *re = new_qd_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = qd_message_copy(msg);
re->settle = 0;
re->disposition = 0;
DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
qd_link_activate(dest_link_ref->link->link);
addr->deliveries_egress++;
dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
// Forward to the next-hops for remote destinations.
//
qd_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
qd_router_link_t *dest_link;
qd_bitmask_t *link_set = qd_bitmask(0);
while (dest_node_ref) {
if (dest_node_ref->router->next_hop)
dest_link = dest_node_ref->router->next_hop->peer_link;
else
dest_link = dest_node_ref->router->peer_link;
if (dest_link)
qd_bitmask_set_bit(link_set, dest_link->mask_bit);
dest_node_ref = DEQ_NEXT(dest_node_ref);
}
int link_bit;
while (qd_bitmask_first_set(link_set, &link_bit)) {
qd_bitmask_clear_bit(link_set, link_bit);
dest_link = router->out_links_by_mask_bit[link_bit];
if (dest_link) {
qd_routed_event_t *re = new_qd_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = qd_message_copy(msg);
re->settle = 0;
re->disposition = 0;
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
qd_link_activate(dest_link->link);
addr->deliveries_transit++;
}
}
qd_bitmask_free(link_set);
}
sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
}
void qd_router_send2(qd_dispatch_t *qd,
const char *address,
qd_message_t *msg)
{
qd_field_iterator_t *iter = qd_address_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
qd_router_send(qd, iter, msg);
qd_field_iterator_free(iter);
}