blob: 5c6fe3781b9b4d0694562048d091621e23824498 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "dispatch_private.h"
/** defines a default set of forwarding behaviors based on the semantics of an
* address.
*/
static void forward_to_direct_subscribers_LH(qd_address_t *addr,
qd_router_delivery_t *delivery,
qd_message_t *msg,
int *fanout)
{
qd_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
while (dest_link_ref) {
qd_routed_event_t *re = new_qd_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = qd_message_copy(msg);
re->settle = 0;
re->disposition = 0;
DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
(*fanout)++;
if (*fanout == 1) {
re->delivery = delivery;
qd_router_delivery_fifo_enter_LH(delivery);
}
addr->deliveries_egress++;
qd_link_activate(dest_link_ref->link->link);
//
// If the fanout is single, exit the loop here. We only want to send one message copy.
//
if (QD_FANOUT(addr->semantics) == QD_FANOUT_SINGLE)
break;
dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
// If dest_link_ref is not null here, we exited after sending one message copy.
// If the number of local links is greater than one, rotate the head link to the
// tail so we balance the message deliveries.
//
if (dest_link_ref && DEQ_SIZE(addr->rlinks) > 1) {
assert(DEQ_HEAD(addr->rlinks) == dest_link_ref);
DEQ_REMOVE_HEAD(addr->rlinks);
DEQ_INSERT_TAIL(addr->rlinks, dest_link_ref);
}
}
static void forward_to_remote_subscribers_LH(qd_router_t *router,
qd_address_t *addr,
qd_router_delivery_t *delivery,
qd_message_t *msg,
int *fanout,
qd_field_iterator_t *ingress_iter)
{
//
// Get the mask bit associated with the ingress router for the message.
// This will be compared against the "valid_origin" masks for each
// candidate destination router.
//
int origin = -1;
if (ingress_iter && !(addr->semantics & QD_BYPASS_VALID_ORIGINS)) {
qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
qd_address_t *origin_addr;
qd_hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
qd_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
origin = rref->router->mask_bit;
}
} else
origin = 0;
//
// Forward to the next-hops for remote destinations.
//
if (origin >= 0) {
qd_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
qd_router_link_t *dest_link;
qd_bitmask_t *link_set = qd_bitmask(0);
//
// Loop over the target nodes for this address. Build a set of outgoing links
// for which there are valid targets. We do this to avoid sending more than one
// message down a given link. It's possible that there are multiple destinations
// for this address that are all reachable over the same link. In this case, we
// will send only one copy of the message over the link and allow a downstream
// router to fan the message out.
//
while (dest_node_ref) {
if (dest_node_ref->router->next_hop)
dest_link = dest_node_ref->router->next_hop->peer_link;
else
dest_link = dest_node_ref->router->peer_link;
if (dest_link && qd_bitmask_value(dest_node_ref->router->valid_origins, origin))
qd_bitmask_set_bit(link_set, dest_link->mask_bit);
dest_node_ref = DEQ_NEXT(dest_node_ref);
}
//
// Send a copy of the message outbound on each identified link.
//
int link_bit;
while (qd_bitmask_first_set(link_set, &link_bit)) {
qd_bitmask_clear_bit(link_set, link_bit);
dest_link = router->out_links_by_mask_bit[link_bit];
if (dest_link) {
qd_routed_event_t *re = new_qd_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = qd_message_copy(msg);
re->settle = 0;
re->disposition = 0;
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
(*fanout)++;
if (*fanout == 1) {
re->delivery = delivery;
qd_router_delivery_fifo_enter_LH(delivery);
}
addr->deliveries_transit++;
qd_link_activate(dest_link->link);
}
}
qd_bitmask_free(link_set);
}
}
/** Multicast forwarder:
*/
static bool forwarder_multicast_LH(qd_router_forwarder_t *forwarder,
qd_router_t *router,
qd_message_t *msg,
qd_router_delivery_t *delivery,
qd_address_t *addr,
qd_field_iterator_t *ingress_iter,
bool is_direct)
{
int fanout = 0;
//
// Forward to all of the local links receiving this address.
//
forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
//
// If the address form is direct to this router node, don't relay it on to
// any other part of the network.
//
if (!is_direct)
forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
return fanout != 0;
}
/** Forward using the 'closest' bias:
*/
static bool forwarder_anycast_closest_LH(qd_router_forwarder_t *forwarder,
qd_router_t *router,
qd_message_t *msg,
qd_router_delivery_t *delivery,
qd_address_t *addr,
qd_field_iterator_t *ingress_iter,
bool is_direct)
{
int fanout = 0;
//
// First, try to find a directly connected consumer for the address. If
// there is none, then look for the closest remote consumer.
//
forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
if (fanout == 0 && !is_direct)
forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
return fanout != 0;
}
/** Forwarding using a 'balanced' bias:
*/
static bool forwarder_anycast_balanced_LH(qd_router_forwarder_t *forwarder,
qd_router_t *router,
qd_message_t *msg,
qd_router_delivery_t *delivery,
qd_address_t *addr,
qd_field_iterator_t *ingress_iter,
bool is_direct)
{
int fanout = 0;
//
// Alternate between looking first for a local consumer and looking first
// for a remote consumer.
//
addr->toggle = !addr->toggle;
if (addr->toggle) {
forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
if (fanout == 0 && !is_direct)
forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
} else {
if (!is_direct)
forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
if (fanout == 0)
forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
}
return fanout != 0;
}
/* release method for default forwarders:
*/
static void forwarder_release(qd_router_forwarder_t *forwarder)
{
// no-op - they're static singletons!
}
/* The default forwarders:
*/
static qd_router_forwarder_t multicast_forwarder = {
forwarder_multicast_LH, /* forward method */
forwarder_release,
};
static qd_router_forwarder_t anycast_closest_forwarder = {
forwarder_anycast_closest_LH, /* forward method */
forwarder_release,
};
static qd_router_forwarder_t anycast_balanced_forwarder = {
forwarder_anycast_balanced_LH, /* forward method */
forwarder_release,
};
/** Get the proper default forwarder for an address of the given semantics:
*/
qd_router_forwarder_t *qd_router_get_forwarder(qd_address_semantics_t semantics)
{
switch (QD_FANOUT(semantics)) {
case QD_FANOUT_MULTIPLE:
return &multicast_forwarder;
case QD_FANOUT_SINGLE:
switch (QD_BIAS(semantics)) {
case QD_BIAS_CLOSEST:
return &anycast_closest_forwarder;
case QD_BIAS_SPREAD:
return &anycast_balanced_forwarder;
}
}
assert(false); // invalid semantics? need new forwarder?
return 0;
}