blob: af6e5c237247fae8a15d7988e29b70426fec3099 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "addr_proxy.h"
#include "core_events.h"
#include "core_link_endpoint.h"
#include "router_core_private.h"
#include "qpid/dispatch/amqp.h"
#include "qpid/dispatch/message.h"
#include "qpid/dispatch/iterator.h"
#include "qpid/dispatch/parse.h"
#include <stdio.h>
#include <inttypes.h>
//
// This is the Address Proxy component of the Edge Router module.
//
// Address Proxy has the following responsibilities:
//
// Related to dynamic (topological) addresses:
//
// 1) When an edge connection becomes active, the "_edge" address is properly linked to an
// outgoing anonymous link on the active edge connection.
//
// 2) When an edge connection becomes active, an incoming link is established over the edge
// connection that is used to transfer deliveries to topological (dynamic) addresses
// on the edge router.
//
// Related to mobile addresses:
//
// 3) Ensure that if there is an active edge connection, that connection should have one incoming
// link for every mobile address for which there is at least one local consumer.
//
// 4) Ensure that if there is an active edge connection, that connection should have one outgoing
// link for every mobile address for which there is at least one local producer.
//
// 5) Maintain an incoming link for edge-address-tracking attached to the edge-address-tracker
// in the connected interior router.
//
// 6) Handle address tracking updates indicating which producer-addresses have destinations
// reachable via the edge connection.
//
#define INITIAL_CREDIT 32
struct qcm_edge_addr_proxy_t {
qdr_core_t *core;
qdrc_event_subscription_t *event_sub;
bool edge_conn_established;
qdr_address_t *edge_conn_addr;
qdr_connection_t *edge_conn;
qdrc_endpoint_t *tracking_endpoint;
qdrc_endpoint_desc_t endpoint_descriptor;
};
static qdr_terminus_t *qdr_terminus_edge_downlink(const char *addr)
{
qdr_terminus_t *term = qdr_terminus(0);
qdr_terminus_add_capability(term, QD_CAPABILITY_EDGE_DOWNLINK);
if (addr)
qdr_terminus_set_address(term, addr);
return term;
}
static qdr_terminus_t *qdr_terminus_normal(const char *addr)
{
qdr_terminus_t *term = qdr_terminus(0);
if (addr)
qdr_terminus_set_address(term, addr);
return term;
}
static void set_fallback_capability(qdr_terminus_t *term)
{
qdr_terminus_add_capability(term, QD_CAPABILITY_FALLBACK);
}
static void set_waypoint_capability(qdr_terminus_t *term, char phase_char, qd_direction_t dir, int in_phase, int out_phase)
{
int phase = (int) (phase_char - '0');
bool fallback = phase_char == QD_ITER_HASH_PHASE_FALLBACK;
char cap[16];
char suffix[3];
if (fallback) {
strncpy(cap, QD_CAPABILITY_FALLBACK, 15);
qdr_terminus_add_capability(term, cap);
return;
}
//
// For links that are outgoing on the in_phase or incoming on the out_phase, don't set the
// waypoint capability. These links will behave like normal client links.
//
if ((dir == QD_OUTGOING && phase == in_phase) ||
(dir == QD_INCOMING && phase == out_phase))
return;
//
// If the phase is outside the range of in_phase..out_phase, don't do anything. This is a
// misconfiguration.
//
if (phase < in_phase || phase > out_phase)
return;
//
// In all remaining cases, the new links are acting as waypoints.
//
int ordinal = phase + (dir == QD_OUTGOING ? 0 : 1);
strncpy(cap, QD_CAPABILITY_WAYPOINT_DEFAULT, 15);
suffix[0] = '.';
suffix[1] = '0' + ordinal;
suffix[2] = '\0';
strcat(cap, suffix);
qdr_terminus_add_capability(term, cap);
}
static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t *addr)
{
if (addr->edge_inlink == 0) {
qdr_terminus_t *term = qdr_terminus_normal(key + 2);
const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle);
if (key[1] == QD_ITER_HASH_PHASE_FALLBACK) {
set_fallback_capability(term);
} else if (addr->config && addr->config->out_phase > 0) {
//
// If this address is configured as multi-phase, we may need to
// add waypoint capabilities to the terminus.
//
if (key[0] == QD_ITER_HASH_PREFIX_MOBILE)
set_waypoint_capability(term, key[1], QD_INCOMING, addr->config->in_phase, addr->config->out_phase);
}
qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_INCOMING,
term, qdr_terminus_normal(0));
qdr_core_bind_address_link_CT(ap->core, addr, link);
addr->edge_inlink = link;
}
}
static void del_inlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr)
{
qdr_link_t *link = addr->edge_inlink;
if (link) {
addr->edge_inlink = 0;
qdr_core_unbind_address_link_CT(ap->core, addr, link);
qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true);
}
}
static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t *addr)
{
if (addr->edge_outlink == 0 && DEQ_SIZE(addr->subscriptions) == 0) {
//
// Note that this link must not be bound to the address at this time. That will
// happen later when the interior tells us that there are upstream destinations
// for the address (see on_transfer below).
//
qdr_terminus_t *term = qdr_terminus_normal(key + 2);
const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle);
if (key[1] == QD_ITER_HASH_PHASE_FALLBACK) {
set_fallback_capability(term);
} else if (addr->config && addr->config->out_phase > 0) {
//
// If this address is configured as multi-phase, we may need to
// add waypoint capabilities to the terminus.
//
const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle);
if (key[0] == QD_ITER_HASH_PREFIX_MOBILE)
set_waypoint_capability(term, key[1], QD_OUTGOING, addr->config->in_phase, addr->config->out_phase);
}
qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_OUTGOING,
qdr_terminus_normal(0), term);
addr->edge_outlink = link;
}
}
static void del_outlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr)
{
qdr_link_t *link = addr->edge_outlink;
if (link) {
addr->edge_outlink = 0;
qdr_core_unbind_address_link_CT(ap->core, addr, link);
qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true);
}
}
static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *conn)
{
qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context;
switch (event) {
case QDRC_EVENT_CONN_EDGE_ESTABLISHED : {
//
// Flag the edge connection as being established.
//
ap->edge_conn_established = true;
ap->edge_conn = conn;
//
// Attach an anonymous sending link to the interior router.
//
qdr_link_t *out_link = qdr_create_link_CT(ap->core, conn,
QD_LINK_ENDPOINT, QD_OUTGOING,
qdr_terminus(0), qdr_terminus(0));
//
// Associate the anonymous sender with the edge connection address. This will cause
// all deliveries destined off-edge to be sent to the interior via the edge connection.
//
qdr_core_bind_address_link_CT(ap->core, ap->edge_conn_addr, out_link);
//
// Attach a receiving link for edge summary. This will cause all deliveries
// destined for this router to be delivered via the edge connection.
//
(void) qdr_create_link_CT(ap->core, conn,
QD_LINK_ENDPOINT, QD_INCOMING,
qdr_terminus_edge_downlink(ap->core->router_id),
qdr_terminus_edge_downlink(0));
//
// Attach a receiving link for edge address tracking updates.
//
ap->tracking_endpoint =
qdrc_endpoint_create_link_CT(ap->core, conn, QD_INCOMING,
qdr_terminus_normal(QD_TERMINUS_EDGE_ADDRESS_TRACKING),
qdr_terminus(0), &ap->endpoint_descriptor, ap);
//
// Process eligible local destinations
//
qdr_address_t *addr = DEQ_HEAD(ap->core->addrs);
while (addr) {
const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
if (*key == QD_ITER_HASH_PREFIX_MOBILE) {
//
// Nullify the edge link references in case there are any left over from an earlier
// instance of an edge connection.
//
addr->edge_inlink = 0;
addr->edge_outlink = 0;
//
// If the address has more than zero attached destinations, create an
// incoming link from the interior to signal the presence of local consumers.
//
if (DEQ_SIZE(addr->rlinks) > 0) {
if (DEQ_SIZE(addr->rlinks) == 1) {
//
// If there's only one link and it's on the edge connection, ignore the address.
//
qdr_link_ref_t *ref = DEQ_HEAD(addr->rlinks);
if (ref->link->conn != ap->edge_conn)
add_inlink(ap, key, addr);
} else
add_inlink(ap, key, addr);
}
//
// If the address has more than zero attached sources, create an outgoing link
// to the interior to signal the presence of local producers.
//
bool add = false;
if (DEQ_SIZE(addr->inlinks) > 0) {
if (DEQ_SIZE(addr->inlinks) == 1) {
//
// If there's only one link and it's on the edge connection, ignore the address.
//
qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks);
if (ref->link->conn != ap->edge_conn)
add = true;
} else
add = true;
if (add) {
add_outlink(ap, key, addr);
//
// If the address has a fallback address, add an outlink for that as well
//
if (!!addr->fallback)
add_outlink(ap, key, addr->fallback);
}
}
}
addr = DEQ_NEXT(addr);
}
break;
}
case QDRC_EVENT_CONN_EDGE_LOST :
ap->edge_conn_established = false;
ap->edge_conn = 0;
break;
default:
assert(false);
break;
}
}
static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
{
qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context;
qdr_link_ref_t *link_ref;
//
// If we don't have an established edge connection, there is no further work to be done.
//
if (!ap->edge_conn_established)
return;
//
// If the address is not in the Mobile class, no further processing is needed.
//
const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
if (*key != QD_ITER_HASH_PREFIX_MOBILE)
return;
switch (event) {
case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST :
//
// Add an edge connection for this address only if the local destination is
// not the link to the interior.
//
link_ref = DEQ_HEAD(addr->rlinks);
if (link_ref->link->conn != ap->edge_conn)
add_inlink(ap, key, addr);
break;
case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST :
del_inlink(ap, addr);
break;
case QDRC_EVENT_ADDR_ONE_LOCAL_DEST :
//
// If the remaining local destination is the link to the interior,
// remove the inlink for this address.
//
link_ref = DEQ_HEAD(addr->rlinks);
if (link_ref->link->conn == ap->edge_conn)
del_inlink(ap, addr);
break;
case QDRC_EVENT_ADDR_TWO_DEST :
add_inlink(ap, key, addr);
break;
case QDRC_EVENT_ADDR_BECAME_SOURCE :
link_ref = DEQ_HEAD(addr->inlinks);
if (!link_ref || link_ref->link->conn != ap->edge_conn)
add_outlink(ap, key, addr);
break;
case QDRC_EVENT_ADDR_NO_LONGER_SOURCE :
del_outlink(ap, addr);
break;
case QDRC_EVENT_ADDR_TWO_SOURCE :
add_outlink(ap, key, addr);
break;
case QDRC_EVENT_ADDR_ONE_SOURCE :
link_ref = DEQ_HEAD(addr->inlinks);
if (!link_ref || link_ref->link->conn == ap->edge_conn)
del_outlink(ap, addr);
break;
default:
assert(false);
break;
}
}
static void on_second_attach(void *link_context,
qdr_terminus_t *remote_source,
qdr_terminus_t *remote_target)
{
qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) link_context;
qdrc_endpoint_flow_CT(ap->core, ap->tracking_endpoint, INITIAL_CREDIT, false);
qdr_terminus_free(remote_source);
qdr_terminus_free(remote_target);
}
static void on_transfer(void *link_context,
qdr_delivery_t *dlv,
qd_message_t *msg)
{
qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) link_context;
uint64_t dispo = PN_ACCEPTED;
//
// Validate the message
//
if (qd_message_check_depth(msg, QD_DEPTH_BODY) == QD_MESSAGE_DEPTH_OK) {
//
// Get the message body. It must be a list with two elements. The first is an address
// and the second is a boolean indicating whether that address has upstream destinations.
//
qd_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
qd_parsed_field_t *body = qd_parse(iter);
if (!!body && qd_parse_is_list(body) && qd_parse_sub_count(body) == 2) {
qd_parsed_field_t *addr_field = qd_parse_sub_value(body, 0);
qd_parsed_field_t *dest_field = qd_parse_sub_value(body, 1);
if (qd_parse_is_scalar(addr_field) && qd_parse_is_scalar(dest_field)) {
qd_iterator_t *addr_iter = qd_parse_raw(addr_field);
bool dest = qd_parse_as_bool(dest_field);
qdr_address_t *addr;
qd_iterator_reset_view(addr_iter, ITER_VIEW_ALL);
qd_hash_retrieve(ap->core->addr_hash, addr_iter, (void**) &addr);
if (addr) {
qdr_link_t *link = addr->edge_outlink;
if (link) {
if (dest)
qdr_core_bind_address_link_CT(ap->core, addr, link);
else
qdr_core_unbind_address_link_CT(ap->core, addr, link);
}
}
}
}
qd_parse_free(body);
qd_iterator_free(iter);
} else {
qd_log(ap->core->log, QD_LOG_ERROR,
"Edge Address Proxy: received an invalid message body, rejecting");
dispo = PN_REJECTED;
}
qdrc_endpoint_settle_CT(ap->core, dlv, dispo);
//
// Replenish the credit for this delivery
//
qdrc_endpoint_flow_CT(ap->core, ap->tracking_endpoint, 1, false);
}
qdr_address_t *qcm_edge_conn_addr(void *link_context)
{
qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) link_context;
if (!ap)
return 0;
return ap->edge_conn_addr;
}
static void on_cleanup(void *link_context)
{
qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) link_context;
ap->tracking_endpoint = 0;
}
qcm_edge_addr_proxy_t *qcm_edge_addr_proxy(qdr_core_t *core)
{
qcm_edge_addr_proxy_t *ap = NEW(qcm_edge_addr_proxy_t);
ZERO(ap);
ap->core = core;
ap->endpoint_descriptor.label = "Edge Address Proxy";
ap->endpoint_descriptor.on_second_attach = on_second_attach;
ap->endpoint_descriptor.on_transfer = on_transfer;
ap->endpoint_descriptor.on_cleanup = on_cleanup;
//
// Establish the edge connection address to represent destinations reachable via the edge connection
//
ap->edge_conn_addr = qdr_add_local_address_CT(core, 'L', "_edge", QD_TREATMENT_ANYCAST_CLOSEST);
//
// Subscribe to the core events we'll need to drive this component
//
ap->event_sub = qdrc_event_subscribe_CT(core,
QDRC_EVENT_CONN_EDGE_ESTABLISHED
| QDRC_EVENT_CONN_EDGE_LOST
| QDRC_EVENT_ADDR_BECAME_LOCAL_DEST
| QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST
| QDRC_EVENT_ADDR_ONE_LOCAL_DEST
| QDRC_EVENT_ADDR_TWO_DEST
| QDRC_EVENT_ADDR_BECAME_SOURCE
| QDRC_EVENT_ADDR_NO_LONGER_SOURCE
| QDRC_EVENT_ADDR_TWO_SOURCE
| QDRC_EVENT_ADDR_ONE_SOURCE,
on_conn_event,
0,
on_addr_event,
0,
ap);
core->edge_conn_addr = qcm_edge_conn_addr;
core->edge_context = ap;
return ap;
}
void qcm_edge_addr_proxy_final(qcm_edge_addr_proxy_t *ap)
{
qdrc_event_unsubscribe_CT(ap->core, ap->event_sub);
free(ap);
}