blob: 5bf1bbb6a86fdf7d4eb8f607c3f9916b603b90b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/ctools.h>
#include <inttypes.h>
#include <stdio.h>
#include "router_core_private.h"
#include "forwarder.h"
#include "exchange_bindings.h"
#include "delivery.h"
// next_hop_t
// Describes the destination of a forwarded message
// May be shared by different bindings
//
typedef struct next_hop_t next_hop_t;
struct next_hop_t
{
// per-exchange list of all next hops
DEQ_LINKS_N(exchange_list, next_hop_t);
// when hooked to the transmit list
DEQ_LINKS_N(transmit_list, next_hop_t);
int ref_count; // binding references
int phase;
bool on_xmit_list;
qdr_exchange_t *exchange;
unsigned char *next_hop;
qdr_address_t *qdr_addr;
};
ALLOC_DECLARE(next_hop_t);
ALLOC_DEFINE(next_hop_t);
DEQ_DECLARE(next_hop_t, next_hop_list_t);
// qdr_binding_t
// Represents a subject key --> next hop mapping
// A binding is uniquely identified by the tuple (pattern, nextHop, phase). No
// two bindings with the same tuple value can exist on an exchange.
// The binding is implemented using two classes: qdr_binding_t and
// next_hop_t. The qdr_binding_t holds the pattern and points to the
// next_hop_t. This allows different patterns to share the same nextHop.
// Since there is only one next_hop_t instance for each (nextHop, phase) value,
// we guarantee only 1 copy of a message is forwarded to a given nextHop+phase
// even if multiple distinct patterns are matched. Ex: a message with a
// value of "a.b" will match two distict binding keys "+.b" and "a.+". If
// both these patterns share the same next_hop_t only 1 copy of the message
// will be forwarded.
typedef struct qdr_binding qdr_binding_t;
struct qdr_binding
{
// per-exchange list of all bindings
DEQ_LINKS_N(exchange_list, qdr_binding_t);
// parse tree node's list of bindings sharing the same pattern
DEQ_LINKS_N(tree_list, qdr_binding_t);
unsigned char *name;
uint64_t identity;
qdr_exchange_t *exchange;
unsigned char *key;
next_hop_t *next_hop;
uint64_t msgs_matched;
};
ALLOC_DECLARE(qdr_binding_t);
ALLOC_DEFINE(qdr_binding_t);
DEQ_DECLARE(qdr_binding_t, qdr_binding_list_t);
struct qdr_exchange {
DEQ_LINKS(qdr_exchange_t); // for core->exchanges
qdr_core_t *core;
uint64_t identity;
unsigned char *name;
unsigned char *address;
int phase;
qd_parse_tree_t *parse_tree;
qdr_address_t *qdr_addr;
next_hop_t *alternate;
qdr_binding_list_t bindings;
next_hop_list_t next_hops;
qdr_forwarder_t *old_forwarder;
uint64_t msgs_received;
uint64_t msgs_dropped;
uint64_t msgs_routed;
uint64_t msgs_alternate;
};
ALLOC_DECLARE(qdr_exchange_t);
ALLOC_DEFINE(qdr_exchange_t);
static void qdr_exchange_free(qdr_exchange_t *ex);
static qdr_exchange_t *qdr_exchange(qdr_core_t *core,
qd_iterator_t *name,
qd_iterator_t *address,
int phase,
qd_iterator_t *alternate,
int alt_phase,
qd_parse_tree_type_t method);
static void write_config_exchange_map(qdr_exchange_t *ex,
qd_composed_field_t *body);
static qdr_exchange_t *find_exchange(qdr_core_t *core,
qd_iterator_t *identity,
qd_iterator_t *name);
static qdr_binding_t *find_binding(qdr_core_t *core,
qd_iterator_t *identity,
qd_iterator_t *name);
static void write_config_exchange_list(qdr_exchange_t *ex,
qdr_query_t *query);
static qdr_binding_t *qdr_binding(qdr_exchange_t *ex,
qd_iterator_t *name,
qd_iterator_t *key,
qd_iterator_t *next_hop,
int phase);
static void write_config_binding_map(qdr_binding_t *binding,
qd_composed_field_t *body);
static qdr_binding_t *find_binding(qdr_core_t *core,
qd_iterator_t *identity,
qd_iterator_t *name);
static void qdr_binding_free(qdr_binding_t *b);
static void write_config_binding_list(qdr_binding_t *binding,
qdr_query_t *query);
static qdr_binding_t *get_binding_at_index(qdr_core_t *core,
int index);
static next_hop_t *next_hop(qdr_exchange_t *ex,
qd_iterator_t *address,
int phase);
static void next_hop_release(next_hop_t *next_hop);
static next_hop_t *find_next_hop(qdr_exchange_t *ex,
qd_iterator_t *address,
int phase);
static bool gather_next_hops(void *handle,
const char *pattern,
void *payload);
static int send_message(qdr_core_t *core,
next_hop_t *next_hop,
qd_message_t *msg,
qdr_delivery_t *in_delivery,
bool exclude_inprocess,
bool control);
//
// The Exchange Forwarder
//
int qdr_forward_exchange_CT(qdr_core_t *core,
qdr_address_t *addr,
qd_message_t *msg,
qdr_delivery_t *in_delivery,
bool exclude_inprocess,
bool control)
{
int forwarded = 0;
const bool presettled = !!in_delivery ? in_delivery->settled : true;
qdr_exchange_t *ex = addr->exchange;
assert(ex);
ex->msgs_received += 1;
// honor the disposition for the exchange address (this may not be right??)
if (ex->old_forwarder)
forwarded = ex->old_forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
// @TODO(kgiusti): de-duplicate this code (cut & paste from multicast
// forwarder)
//
// If the delivery is not presettled, set the settled flag for forwarding so all
// outgoing deliveries will be presettled.
//
// NOTE: This is the only multicast mode currently supported. Others will likely be
// implemented in the future.
//
if (!presettled)
in_delivery->settled = true;
qd_iterator_t *subject = qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) == QD_MESSAGE_DEPTH_OK
? qd_message_field_iterator(msg, QD_FIELD_SUBJECT)
: NULL;
next_hop_list_t transmit_list;
DEQ_INIT(transmit_list);
if (subject) {
// find all matching bindings and build a list of their next hops
qd_parse_tree_search(ex->parse_tree, subject, gather_next_hops, &transmit_list);
qd_iterator_free(subject);
}
// if there are valid next hops then we're routing this message based on an
// entirely new destination address. We need to reset the origin and the
// excluded link flags in the delivery. We also need to reset the trace
// annotations and ingress field in the message. This is done because it is
// possible that the next hop is reached via the same link/router this
// message arrived from.
// @TODO(kgiusti) - loop detection
if (DEQ_SIZE(transmit_list) > 0 || ex->alternate) {
if (in_delivery) {
in_delivery->origin = 0;
qd_bitmask_free(in_delivery->link_exclusion);
in_delivery->link_exclusion = 0;
}
const char *node_id = qd_router_id(core->qd);
qd_composed_field_t *trace_field = qd_compose_subfield(0);
qd_compose_start_list(trace_field);
qd_compose_insert_string(trace_field, node_id);
qd_compose_end_list(trace_field);
qd_message_set_trace_annotation(msg, trace_field);
qd_composed_field_t *ingress_field = qd_compose_subfield(0);
qd_compose_insert_string(ingress_field, node_id);
qd_message_set_ingress_annotation(msg, ingress_field);
}
next_hop_t *next_hop = DEQ_HEAD(transmit_list);
while (next_hop) {
DEQ_REMOVE_N(transmit_list, transmit_list, next_hop);
next_hop->on_xmit_list = false;
assert(next_hop->qdr_addr);
// @TODO(kgiusti) - non-recursive handling of next hop if it is an exchange
forwarded += send_message(ex->core, next_hop, msg, in_delivery, exclude_inprocess, control);
next_hop = DEQ_HEAD(transmit_list);
}
if (forwarded == 0 && ex->alternate) {
forwarded = send_message(ex->core, ex->alternate, msg, in_delivery, exclude_inprocess, control);
if (forwarded) {
ex->msgs_alternate += 1;
}
}
// @TODO(kgiusti): de-duplicate the settlement code (cut & paste from
// multicast forwarder)
if (forwarded == 0) {
ex->msgs_dropped += 1;
if (!presettled) {
//
// The delivery was not originally presettled and it was not
// forwarded to any destinations, return it to its original
// unsettled state.
//
in_delivery->settled = false;
}
} else {
ex->msgs_routed += 1;
if (in_delivery && !presettled) {
//
// The delivery was not presettled and it was forwarded to at least
// one destination. Accept and settle the delivery only if the
// entire delivery has been received.
//
const bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery));
if (receive_complete) {
in_delivery->disposition = PN_ACCEPTED;
qdr_delivery_push_CT(core, in_delivery);
}
}
}
return forwarded;
}
// callback from parse tree search:
// handle = transmit_list containing all matching next_hops
// pattern = pattern that matches the search (ignored)
// payload = list of bindings configured for the pattern
static bool gather_next_hops(void *handle, const char *pattern, void *payload)
{
next_hop_list_t *transmit_list = (next_hop_list_t *)handle;
qdr_binding_list_t *bindings = (qdr_binding_list_t *)payload;
qdr_binding_t *binding = DEQ_HEAD(*bindings);
while (binding) {
binding->msgs_matched += 1;
// note - since multiple bindings may reference the next hop, it is
// possible a next hop has already been added to the transmit list.
// do not re-add. This is not thread safe but that is fine since all
// forwarding is done on the core thread.
if (!binding->next_hop->on_xmit_list) {
DEQ_INSERT_TAIL_N(transmit_list, *transmit_list, binding->next_hop);
binding->next_hop->on_xmit_list = true;
}
binding = DEQ_NEXT_N(tree_list, binding);
}
return true; // keep searching
}
// Forward a copy of the message to the to_addr address
static int send_message(qdr_core_t *core,
next_hop_t *next_hop,
qd_message_t *msg,
qdr_delivery_t *in_delivery,
bool exclude_inprocess,
bool control)
{
int count = 0;
qd_message_t *copy = qd_message_copy(msg);
qd_log(core->log, QD_LOG_TRACE, "Exchange '%s' forwarding message to '%s'",
next_hop->exchange->name, next_hop->next_hop);
// set "to override" and "phase" message annotations based on the next hop
qd_composed_field_t *to_field = qd_compose_subfield(0);
qd_compose_insert_string(to_field, (char *)next_hop->next_hop);
qd_message_set_to_override_annotation(copy, to_field); // frees to_field
qd_message_set_phase_annotation(copy, next_hop->phase);
count = qdr_forward_message_CT(core, next_hop->qdr_addr, copy, in_delivery, exclude_inprocess, control);
qd_message_free(copy);
return count;
}
long qdr_exchange_binding_count(const qdr_exchange_t *ex)
{
return (long) DEQ_SIZE(ex->bindings);
}
qdr_address_t *qdr_exchange_alternate_addr(const qdr_exchange_t *ex)
{
return (ex->alternate) ? ex->alternate->qdr_addr : NULL;
}
/////////////////////////////
// Exchange Management API //
/////////////////////////////
#define QDR_CONFIG_EXCHANGE_NAME 0
#define QDR_CONFIG_EXCHANGE_IDENTITY 1
#define QDR_CONFIG_EXCHANGE_ADDRESS 2
#define QDR_CONFIG_EXCHANGE_PHASE 3
#define QDR_CONFIG_EXCHANGE_ALTERNATE 4
#define QDR_CONFIG_EXCHANGE_ALT_PHASE 5
#define QDR_CONFIG_EXCHANGE_MATCH_METHOD 6
#define QDR_CONFIG_EXCHANGE_BINDING_COUNT 7
#define QDR_CONFIG_EXCHANGE_RECEIVED 8
#define QDR_CONFIG_EXCHANGE_DROPPED 9
#define QDR_CONFIG_EXCHANGE_FORWARDED 10
#define QDR_CONFIG_EXCHANGE_DIVERTED 11
const char *qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_COLUMN_COUNT + 1] =
{"name",
"identity",
"address",
"phase",
"alternateAddress",
"alternatePhase",
"matchMethod",
"bindingCount",
"receivedCount",
"droppedCount",
"forwardedCount",
"divertedCount",
0};
// from management_agent.c
extern const unsigned char *config_exchange_entity_type;
#define QDR_CONFIG_BINDING_NAME 0
#define QDR_CONFIG_BINDING_IDENTITY 1
#define QDR_CONFIG_BINDING_EXCHANGE 2
#define QDR_CONFIG_BINDING_KEY 3
#define QDR_CONFIG_BINDING_NEXTHOP 4
#define QDR_CONFIG_BINDING_NHOP_PHASE 5
#define QDR_CONFIG_BINDING_MATCHED 6
const char *qdr_config_binding_columns[QDR_CONFIG_BINDING_COLUMN_COUNT + 1] =
{"name",
"identity",
"exchangeName",
"bindingKey",
"nextHopAddress",
"nextHopPhase",
"matchedCount",
0};
// from management_agent.c
extern const unsigned char *config_binding_entity_type;
// called on core shutdown to release all exchanges
//
void qdr_exchange_free_all(qdr_core_t *core)
{
qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
while (ex) {
qdr_exchange_t *next = DEQ_NEXT(ex);
qdr_exchange_free(ex);
ex = next;
}
}
// Exchange CREATE
//
//
void qdra_config_exchange_create_CT(qdr_core_t *core,
qd_iterator_t *name,
qdr_query_t *query,
qd_parsed_field_t *in_body)
{
qdr_exchange_t *ex = NULL;
query->status = QD_AMQP_BAD_REQUEST;
if (!qd_parse_is_map(in_body)) {
query->status.description = "Body of request must be a map";
goto exit;
}
if (!name) {
query->status.description = "exchange requires a unique name";
goto exit;
}
qd_parsed_field_t *address_field = qd_parse_value_by_key(in_body,
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ADDRESS]);
if (!address_field) {
query->status.description = "exchange address is mandatory";
goto exit;
}
qd_iterator_t *address = qd_parse_raw(address_field);
// check for duplicates
{
qdr_exchange_t *eptr = 0;
for (eptr = DEQ_HEAD(core->exchanges); eptr; eptr = DEQ_NEXT(eptr)) {
if (qd_iterator_equal(address, eptr->address)) {
query->status.description = "duplicate exchange address";
goto exit;
} else if (qd_iterator_equal(name, eptr->name)) {
query->status.description = "duplicate exchange name";
goto exit;
}
}
}
qd_parsed_field_t *method_field = qd_parse_value_by_key(in_body,
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_MATCH_METHOD]);
qd_parse_tree_type_t method = QD_PARSE_TREE_AMQP_0_10;
if (method_field) {
if (qd_iterator_equal(qd_parse_raw(method_field), (const unsigned char *)"mqtt")) {
method = QD_PARSE_TREE_MQTT;
} else if (!qd_iterator_equal(qd_parse_raw(method_field), (const unsigned char *)"amqp")) {
query->status.description = "Exchange matchMethod must be either 'amqp' or 'mqtt'";
goto exit;
}
}
long phase = 0;
qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body,
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_PHASE]);
if (phase_field) {
phase = qd_parse_as_long(phase_field);
if (phase < 0 || phase > 9) {
query->status.description = "phase must be in the range 0-9";
goto exit;
}
}
qd_iterator_t *alternate = NULL;
long alt_phase = 0;
qd_parsed_field_t *alternate_field = qd_parse_value_by_key(in_body,
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALTERNATE]);
if (alternate_field) {
alternate = qd_parse_raw(alternate_field);
qd_parsed_field_t *alt_phase_field = qd_parse_value_by_key(in_body,
qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALT_PHASE]);
if (alt_phase_field) {
alt_phase = qd_parse_as_long(alt_phase_field);
if (alt_phase < 0 || alt_phase > 9) {
query->status.description = "phase must be in the range 0-9";
goto exit;
}
}
}
ex = qdr_exchange(core, name, address, phase, alternate, alt_phase, method);
if (ex) {
// @TODO(kgiusti) - for now, until the behavior is nailed down:
static int warn_user;
if (!warn_user) {
warn_user = 1;
qd_log(core->agent_log, QD_LOG_WARNING,
"The Exchange/Binding feature is currently EXPERIMENTAL."
" Its functionality may change in future releases"
" of the Qpid Dispatch Router. Backward compatibility is"
" not guaranteed.");
}
query->status = QD_AMQP_CREATED;
if (query->body) {
write_config_exchange_map(ex, query->body);
}
} else {
query->status.description = "failed to allocate exchange";
}
exit:
if (query->status.status == QD_AMQP_CREATED.status) {
qd_log(core->agent_log, QD_LOG_DEBUG,
"Exchange %s CREATED (id=%"PRIu64")", ex->name, ex->identity);
} else {
qd_log(core->agent_log, QD_LOG_ERROR,
"Error performing CREATE of %s: %s", config_exchange_entity_type, query->status.description);
// return a NULL body:
if (query->body) qd_compose_insert_null(query->body);
}
if (query->body) {
qdr_agent_enqueue_response_CT(core, query);
} else {
// no body == create from internal config parser
qdr_query_free(query);
}
}
// Exchange DELETE:
//
void qdra_config_exchange_delete_CT(qdr_core_t *core,
qdr_query_t *query,
qd_iterator_t *name,
qd_iterator_t *identity)
{
qdr_exchange_t *ex = 0;
if (!name && !identity) {
query->status = QD_AMQP_BAD_REQUEST;
query->status.description = "No name or identity provided";
qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of %s: %s",
config_exchange_entity_type, query->status.description);
} else {
ex = find_exchange(core, identity, name);
if (ex) {
qd_log(core->agent_log, QD_LOG_DEBUG,
"Exchange %s DELETED (id=%"PRIu64")", ex->name, ex->identity);
qdr_exchange_free(ex);
query->status = QD_AMQP_NO_CONTENT;
} else
query->status = QD_AMQP_NOT_FOUND;
}
qdr_agent_enqueue_response_CT(core, query);
}
// Exchange GET
//
void qdra_config_exchange_get_CT(qdr_core_t *core,
qd_iterator_t *name,
qd_iterator_t *identity,
qdr_query_t *query,
const char *columns[])
{
qdr_exchange_t *ex = 0;
if (!name && !identity) {
query->status = QD_AMQP_BAD_REQUEST;
query->status.description = "No name or identity provided";
qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s",
config_exchange_entity_type, query->status.description);
}
else {
ex = find_exchange(core, identity, name);
if (!ex) {
query->status = QD_AMQP_NOT_FOUND;
}
else {
if (query->body) write_config_exchange_map(ex, query->body);
query->status = QD_AMQP_OK;
}
}
qdr_agent_enqueue_response_CT(core, query);
}
// Exchange GET first:
//
void qdra_config_exchange_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
{
//
// Queries that get this far will always succeed.
//
query->status = QD_AMQP_OK;
//
// If the offset goes beyond the set of objects, end the query now.
//
if (offset >= DEQ_SIZE(core->exchanges)) {
query->more = false;
qdr_agent_enqueue_response_CT(core, query);
return;
}
//
// Run to the object at the offset.
//
qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
for (int i = 0; i < offset; i++)
ex = DEQ_NEXT(ex);
assert(ex);
//
// Write the columns of the object into the response body.
//
if (query->body) write_config_exchange_list(ex, query);
//
// Advance to the next address
//
query->next_offset = offset + 1;
query->more = !!DEQ_NEXT(ex);
//
// Enqueue the response.
//
qdr_agent_enqueue_response_CT(core, query);
}
// Exchange GET-NEXT
//
void qdra_config_exchange_get_next_CT(qdr_core_t *core, qdr_query_t *query)
{
qdr_exchange_t *ex = 0;
if (query->next_offset < DEQ_SIZE(core->exchanges)) {
ex = DEQ_HEAD(core->exchanges);
for (int i = 0; i < query->next_offset && ex; i++)
ex = DEQ_NEXT(ex);
}
if (ex) {
//
// Write the columns of the addr entity into the response body.
//
if (query->body) write_config_exchange_list(ex, query);
//
// Advance to the next object
//
query->next_offset++;
query->more = !!DEQ_NEXT(ex);
} else
query->more = false;
//
// Enqueue the response.
//
qdr_agent_enqueue_response_CT(core, query);
}
// Binding CREATE
void qdra_config_binding_create_CT(qdr_core_t *core,
qd_iterator_t *name,
qdr_query_t *query,
qd_parsed_field_t *in_body)
{
qdr_binding_t *binding = NULL;
qdr_exchange_t *ex = NULL;
qd_iterator_t *key = NULL;
query->status = QD_AMQP_BAD_REQUEST;
if (!qd_parse_is_map(in_body)) {
query->status.description = "Body of request must be a map";
goto exit;
}
qd_parsed_field_t *exchange_field = qd_parse_value_by_key(in_body,
qdr_config_binding_columns[QDR_CONFIG_BINDING_EXCHANGE]);
if (!exchange_field) {
query->status.description = "Binding configuration requires an exchange";
goto exit;
}
// lookup the exchange by its name:
ex = find_exchange(core, NULL, qd_parse_raw(exchange_field));
if (!ex) {
query->status.description = "Named exchange does not exist";
goto exit;
}
qd_parsed_field_t *next_hop_field = qd_parse_value_by_key(in_body,
qdr_config_binding_columns[QDR_CONFIG_BINDING_NEXTHOP]);
if (!next_hop_field) {
query->status.description = "No next hop specified";
goto exit;
}
qd_iterator_t *nhop = qd_parse_raw(next_hop_field);
qd_parsed_field_t *key_field = qd_parse_value_by_key(in_body,
qdr_config_binding_columns[QDR_CONFIG_BINDING_KEY]);
// if no pattern given, assume match all "#":
key = key_field ? qd_iterator_dup(qd_parse_raw(key_field)) : qd_iterator_string("#", ITER_VIEW_ALL);
if (!qd_parse_tree_validate_pattern(ex->parse_tree, key)) {
query->status.description = "The binding key pattern is invalid";
goto exit;
}
qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body,
qdr_config_binding_columns[QDR_CONFIG_BINDING_NHOP_PHASE]);
long phase = (phase_field ? qd_parse_as_long(phase_field) : 0);
if (phase < 0 || phase > 9) {
query->status.description = "phase must be in the range 0-9";
goto exit;
}
// check for duplicates: the name and the tuple (key, next hop, phase) must
// be unique per exchange
for (qdr_binding_t *b = DEQ_HEAD(ex->bindings); b; b = DEQ_NEXT_N(exchange_list, b)) {
if (name && b->name && qd_iterator_equal(name, b->name)) {
query->status.description = "Duplicate next hop name";
goto exit;
} else if (qd_iterator_equal(key, b->key) &&
qd_iterator_equal(nhop, b->next_hop->next_hop) &&
phase == b->next_hop->phase) {
query->status.description = "Next hop for key already exists";
goto exit;
}
}
binding = qdr_binding(ex, name, key, nhop, phase);
if (binding) {
query->status = QD_AMQP_CREATED;
if (query->body) {
write_config_binding_map(binding, query->body);
}
} else {
query->status.description = "Failed to allocate next hop";
}
exit:
if (query->status.status == QD_AMQP_CREATED.status) {
qd_log(core->agent_log, QD_LOG_DEBUG,
"Exchange %s Binding %s -> %s CREATED (id=%"PRIu64")", ex->name,
binding->key, binding->next_hop->next_hop, binding->identity);
} else {
qd_log(core->agent_log, QD_LOG_ERROR,
"Error performing CREATE of %s: %s",
config_binding_entity_type,
query->status.description);
// return a NULL body:
if (query->body) qd_compose_insert_null(query->body);
}
if (query->body) {
qdr_agent_enqueue_response_CT(core, query);
} else {
// no body == create from internal config parser
qdr_query_free(query);
}
if (key) qd_iterator_free(key);
}
// Binding DELETE
//
void qdra_config_binding_delete_CT(qdr_core_t *core,
qdr_query_t *query,
qd_iterator_t *name,
qd_iterator_t *identity)
{
if (!identity && !name) {
query->status = QD_AMQP_BAD_REQUEST;
query->status.description = "No binding name or identity provided";
qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of %s: %s",
config_binding_entity_type, query->status.description);
} else {
qdr_binding_t *binding = find_binding(core, identity, name);
if (!binding) {
query->status = QD_AMQP_NOT_FOUND;
} else {
qd_log(core->agent_log, QD_LOG_DEBUG,
"Binding %s -> %s on exchange %s DELETED (id=%"PRIu64")",
binding->key,
binding->next_hop->next_hop,
binding->exchange->name,
binding->identity);
qdr_binding_free(binding);
query->status = QD_AMQP_NO_CONTENT;
}
}
qdr_agent_enqueue_response_CT(core, query);
}
// Binding GET
//
void qdra_config_binding_get_CT(qdr_core_t *core,
qd_iterator_t *name,
qd_iterator_t *identity,
qdr_query_t *query,
const char *columns[])
{
if (!identity && !name) {
query->status = QD_AMQP_BAD_REQUEST;
query->status.description = "No binding name or identity provided";
qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s",
config_binding_entity_type, query->status.description);
} else {
qdr_binding_t *binding = find_binding(core, identity, name);
if (binding == 0) {
query->status = QD_AMQP_NOT_FOUND;
}
else {
if (query->body) write_config_binding_map(binding, query->body);
query->status = QD_AMQP_OK;
}
}
qdr_agent_enqueue_response_CT(core, query);
}
// Binding GET first
//
void qdra_config_binding_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
{
query->status = QD_AMQP_OK;
qdr_binding_t *binding = get_binding_at_index(core, offset);
if (!binding) {
query->more = false;
qdr_agent_enqueue_response_CT(core, query);
return;
}
if (query->body) write_config_binding_list(binding, query);
query->next_offset = offset + 1;
query->more = !!(DEQ_NEXT_N(exchange_list, binding) || DEQ_NEXT(binding->exchange));
qdr_agent_enqueue_response_CT(core, query);
}
// Binding GET-NEXT
//
void qdra_config_binding_get_next_CT(qdr_core_t *core, qdr_query_t *query)
{
qdr_binding_t *binding = get_binding_at_index(core, query->next_offset);
if (binding) {
if (query->body) write_config_binding_list(binding, query);
query->next_offset++;
query->more = !!(DEQ_NEXT_N(exchange_list, binding) || DEQ_NEXT(binding->exchange));
} else
query->more = false;
qdr_agent_enqueue_response_CT(core, query);
}
// Exchange constructor/destructor
static qdr_exchange_t *qdr_exchange(qdr_core_t *core,
qd_iterator_t *name,
qd_iterator_t *address,
int phase,
qd_iterator_t *alternate,
int alt_phase,
qd_parse_tree_type_t method)
{
assert(address);
qdr_exchange_t *ex = new_qdr_exchange_t();
if (ex) {
ZERO(ex);
DEQ_ITEM_INIT(ex);
ex->core = core;
ex->identity = qdr_identifier(core);
ex->name = qd_iterator_copy(name);
ex->address = qd_iterator_copy(address);
ex->phase = phase;
ex->parse_tree = qd_parse_tree_new(method);
DEQ_INIT(ex->bindings);
DEQ_INIT(ex->next_hops);
qd_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
qd_iterator_annotate_phase(address, (char) phase + '0');
qd_hash_retrieve(core->addr_hash, address, (void **)&ex->qdr_addr);
if (!ex->qdr_addr) {
qdr_address_config_t *addr_config;
qd_address_treatment_t treatment = qdr_treatment_for_address_hash_CT(core, address, &addr_config);
ex->qdr_addr = qdr_address_CT(core, treatment, addr_config);
qd_hash_insert(core->addr_hash, address, ex->qdr_addr, &ex->qdr_addr->hash_handle);
DEQ_INSERT_TAIL(core->addrs, ex->qdr_addr);
}
// we're going to override the forwarder
qdr_forwarder_t *old = ex->qdr_addr->forwarder;
qdr_forwarder_t *new = qdr_new_forwarder(qdr_forward_exchange_CT,
old ? old->forward_attach : 0,
old ? old->bypass_valid_origins: false);
ex->old_forwarder = old;
ex->qdr_addr->forwarder = new;
ex->qdr_addr->ref_count += 1;
ex->qdr_addr->exchange = ex;
DEQ_INSERT_TAIL(core->exchanges, ex);
if (alternate) {
ex->alternate = next_hop(ex, alternate, alt_phase);
}
//
// TODO - handle case where there was already a local dest.
//
qdr_addr_start_inlinks_CT(ex->core, ex->qdr_addr);
qdrc_event_addr_raise(ex->core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, ex->qdr_addr);
}
return ex;
}
static void qdr_exchange_free(qdr_exchange_t *ex)
{
if (ex->core->running && DEQ_SIZE(ex->qdr_addr->rlinks) == 0) {
qdrc_event_addr_raise(ex->core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, ex->qdr_addr);
}
DEQ_REMOVE(ex->core->exchanges, ex);
while (DEQ_SIZE(ex->bindings) > 0) {
// freeing the binding removes it from the binding list
// and the parse tree
qdr_binding_free(DEQ_HEAD(ex->bindings));
}
if (ex->alternate) {
next_hop_release(ex->alternate);
}
assert(DEQ_IS_EMPTY(ex->next_hops));
free(ex->qdr_addr->forwarder);
ex->qdr_addr->forwarder = ex->old_forwarder;
assert(ex->qdr_addr->ref_count > 0);
ex->qdr_addr->ref_count -= 1;
qdr_check_addr_CT(ex->core, ex->qdr_addr);
free(ex->name);
free(ex->address);
qd_parse_tree_free(ex->parse_tree);
free_qdr_exchange_t(ex);
}
// Binding constructor/destructor
static qdr_binding_t *qdr_binding(qdr_exchange_t *ex,
qd_iterator_t *name,
qd_iterator_t *key,
qd_iterator_t *nhop,
int phase)
{
qdr_binding_t *b = new_qdr_binding_t();
if (b) {
ZERO(b);
DEQ_ITEM_INIT_N(exchange_list, b);
DEQ_ITEM_INIT_N(tree_list, b);
b->name = qd_iterator_copy(name);
b->identity = qdr_identifier(ex->core);
b->exchange = ex;
b->key = qd_iterator_copy(key);
b->next_hop = next_hop(ex, nhop, phase);
qdr_binding_list_t *bindings = NULL;
if (!qd_parse_tree_get_pattern(ex->parse_tree, key, (void **)&bindings)) {
// new pattern
bindings = malloc(sizeof(*bindings));
DEQ_INIT(*bindings);
qd_parse_tree_add_pattern(ex->parse_tree, key, bindings);
}
assert(bindings);
DEQ_INSERT_TAIL_N(tree_list, *bindings, b);
DEQ_INSERT_TAIL_N(exchange_list, ex->bindings, b);
}
return b;
}
static void qdr_binding_free(qdr_binding_t *b)
{
qdr_binding_list_t *bindings = NULL;
qdr_exchange_t *ex = b->exchange;
qd_iterator_t *k_iter = qd_iterator_string((char *)b->key,
ITER_VIEW_ALL);
if (qd_parse_tree_get_pattern(ex->parse_tree, k_iter, (void **)&bindings)) {
assert(bindings && !DEQ_IS_EMPTY(*bindings));
DEQ_REMOVE_N(tree_list, *bindings, b);
if (DEQ_IS_EMPTY(*bindings)) {
qd_parse_tree_remove_pattern(ex->parse_tree, k_iter);
free(bindings);
}
}
qd_iterator_free(k_iter);
DEQ_REMOVE_N(exchange_list, b->exchange->bindings, b);
free(b->name);
free(b->key);
next_hop_release(b->next_hop);
free_qdr_binding_t(b);
}
// Next Hop constructor/destructor
static next_hop_t *next_hop(qdr_exchange_t *ex,
qd_iterator_t *address,
int phase)
{
next_hop_t *nh = find_next_hop(ex, address, phase);
if (!nh) {
nh = new_next_hop_t();
if (!nh) return NULL;
ZERO(nh);
DEQ_ITEM_INIT_N(exchange_list, nh);
DEQ_ITEM_INIT_N(transmit_list, nh);
nh->exchange = ex;
nh->next_hop = qd_iterator_copy(address);
nh->phase = phase;
qd_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
qd_iterator_annotate_phase(address, (char) phase + '0');
qd_hash_retrieve(ex->core->addr_hash, address, (void **)&nh->qdr_addr);
if (!nh->qdr_addr) {
qdr_core_t *core = ex->core;
qdr_address_config_t *addr_config;
qd_address_treatment_t treatment = qdr_treatment_for_address_hash_CT(core, address, &addr_config);
nh->qdr_addr = qdr_address_CT(core, treatment, addr_config);
qd_hash_insert(core->addr_hash, address, nh->qdr_addr, &nh->qdr_addr->hash_handle);
DEQ_INSERT_TAIL(core->addrs, nh->qdr_addr);
}
nh->qdr_addr->ref_count += 1;
DEQ_INSERT_TAIL_N(exchange_list, ex->next_hops, nh);
}
nh->ref_count += 1; // for caller's reference
return nh;
}
static void next_hop_release(next_hop_t *nh)
{
assert(nh->ref_count > 0);
if (--nh->ref_count == 0) {
assert(nh->qdr_addr->ref_count > 0);
if (--nh->qdr_addr->ref_count == 0) {
qdr_check_addr_CT(nh->exchange->core, nh->qdr_addr);
}
DEQ_REMOVE_N(exchange_list, nh->exchange->next_hops, nh);
assert(!nh->on_xmit_list);
free(nh->next_hop);
free_next_hop_t(nh);
}
}
// lookup
static qdr_exchange_t *find_exchange(qdr_core_t *core, qd_iterator_t *identity, qd_iterator_t *name)
{
qdr_exchange_t *ex = 0;
for (ex = DEQ_HEAD(core->exchanges); ex; ex = DEQ_NEXT(ex)) {
if (identity) { // ignore name - identity takes precedence
// Convert the identity for comparison
char id[100];
snprintf(id, 100, "%"PRId64, ex->identity);
if (qd_iterator_equal(identity, (const unsigned char*) id))
break;
} else if (name && qd_iterator_equal(name, ex->name))
break;
}
return ex;
}
static qdr_binding_t *find_binding(qdr_core_t *core, qd_iterator_t *identity, qd_iterator_t *name)
{
for (qdr_exchange_t *ex = DEQ_HEAD(core->exchanges); ex; ex = DEQ_NEXT(ex)) {
for (qdr_binding_t *binding = DEQ_HEAD(ex->bindings); binding; binding = DEQ_NEXT_N(exchange_list, binding)) {
if (identity) { // ignore name - identity takes precedence
// Convert the identity for comparison
char id[100];
snprintf(id, 100, "%"PRId64, binding->identity);
if (qd_iterator_equal(identity, (const unsigned char*) id))
return binding;
} else if (name && qd_iterator_equal(name, binding->name))
return binding;
}
}
return NULL;
}
static next_hop_t *find_next_hop(qdr_exchange_t *ex,
qd_iterator_t *address,
int phase)
{
next_hop_t *nh = DEQ_HEAD(ex->next_hops);
DEQ_FIND_N(exchange_list, nh, (phase == nh->phase) && qd_iterator_equal(address, nh->next_hop));
return nh;
}
// Management helper routines
static void exchange_insert_column(qdr_exchange_t *ex, int col, qd_composed_field_t *body)
{
switch(col) {
case QDR_CONFIG_EXCHANGE_NAME:
qd_compose_insert_string(body, (const char *)ex->name);
break;
case QDR_CONFIG_EXCHANGE_IDENTITY: {
char id_str[100];
snprintf(id_str, 100, "%"PRId64, ex->identity);
qd_compose_insert_string(body, id_str);
break;
}
case QDR_CONFIG_EXCHANGE_ADDRESS:
qd_compose_insert_string(body, (const char *)ex->address);
break;
case QDR_CONFIG_EXCHANGE_PHASE:
qd_compose_insert_int(body, ex->phase);
break;
case QDR_CONFIG_EXCHANGE_ALTERNATE:
if (ex->alternate && ex->alternate->next_hop)
qd_compose_insert_string(body, (const char *)ex->alternate->next_hop);
else
qd_compose_insert_null(body);
break;
case QDR_CONFIG_EXCHANGE_ALT_PHASE:
if (ex->alternate)
qd_compose_insert_int(body, ex->alternate->phase);
else
qd_compose_insert_null(body);
break;
case QDR_CONFIG_EXCHANGE_MATCH_METHOD:
switch (qd_parse_tree_type(ex->parse_tree)) {
case QD_PARSE_TREE_AMQP_0_10:
qd_compose_insert_string(body, "amqp");
break;
case QD_PARSE_TREE_MQTT:
qd_compose_insert_string(body, "mqtt");
break;
default:
break;
}
break;
case QDR_CONFIG_EXCHANGE_BINDING_COUNT:
qd_compose_insert_uint(body, DEQ_SIZE(ex->bindings));
break;
case QDR_CONFIG_EXCHANGE_RECEIVED:
qd_compose_insert_ulong(body, ex->msgs_received);
break;
case QDR_CONFIG_EXCHANGE_DROPPED:
qd_compose_insert_ulong(body, ex->msgs_dropped);
break;
case QDR_CONFIG_EXCHANGE_FORWARDED:
qd_compose_insert_ulong(body, ex->msgs_routed);
break;
case QDR_CONFIG_EXCHANGE_DIVERTED:
qd_compose_insert_ulong(body, ex->msgs_alternate);
break;
}
}
static void binding_insert_column(qdr_binding_t *b, int col, qd_composed_field_t *body)
{
switch(col) {
case QDR_CONFIG_BINDING_NAME:
if (b->name)
qd_compose_insert_string(body, (char *)b->name);
else
qd_compose_insert_null(body);
break;
case QDR_CONFIG_BINDING_IDENTITY: {
char id_str[100];
snprintf(id_str, 100, "%"PRIu64, b->identity);
qd_compose_insert_string(body, id_str);
break;
}
case QDR_CONFIG_BINDING_EXCHANGE:
qd_compose_insert_string(body, (char *)b->exchange->name);
break;
case QDR_CONFIG_BINDING_KEY:
qd_compose_insert_string(body, (char *)b->key);
break;
case QDR_CONFIG_BINDING_NEXTHOP:
assert(b->next_hop && b->next_hop->next_hop);
qd_compose_insert_string(body, (char *)b->next_hop->next_hop);
break;
case QDR_CONFIG_BINDING_NHOP_PHASE:
assert(b->next_hop);
qd_compose_insert_int(body, b->next_hop->phase);
break;
case QDR_CONFIG_BINDING_MATCHED:
qd_compose_insert_ulong(body, b->msgs_matched);
break;
}
}
static void write_config_exchange_map(qdr_exchange_t *ex,
qd_composed_field_t *body)
{
qd_compose_start_map(body);
for(int i = 0; i < QDR_CONFIG_EXCHANGE_COLUMN_COUNT; i++) {
qd_compose_insert_string(body, qdr_config_exchange_columns[i]);
exchange_insert_column(ex, i, body);
}
qd_compose_end_map(body);
}
static void write_config_exchange_list(qdr_exchange_t *ex,
qdr_query_t *query)
{
qd_compose_start_list(query->body);
int i = 0;
while (query->columns[i] >= 0) {
exchange_insert_column(ex, query->columns[i], query->body);
i++;
}
qd_compose_end_list(query->body);
}
static void write_config_binding_map(qdr_binding_t *binding,
qd_composed_field_t *body)
{
qd_compose_start_map(body);
for(int i = 0; i < QDR_CONFIG_BINDING_COLUMN_COUNT; i++) {
qd_compose_insert_string(body, qdr_config_binding_columns[i]);
binding_insert_column(binding, i, body);
}
qd_compose_end_map(body);
}
static void write_config_binding_list(qdr_binding_t *binding,
qdr_query_t *query)
{
qd_compose_start_list(query->body);
int i = 0;
while (query->columns[i] >= 0) {
binding_insert_column(binding, query->columns[i], query->body);
i++;
}
qd_compose_end_list(query->body);
}
static qdr_binding_t *get_binding_at_index(qdr_core_t *core, int index)
{
qdr_binding_t *binding = 0;
// skip to the proper exchange:
qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
while (ex && index >= DEQ_SIZE(ex->bindings)) {
index -= DEQ_SIZE(ex->bindings);
ex = DEQ_NEXT(ex);
}
if (ex) {
// then to the target binding
assert(index < DEQ_SIZE(ex->bindings));
binding = DEQ_HEAD(ex->bindings);
while (index--) {
binding = DEQ_NEXT_N(exchange_list, binding);
}
}
return binding;
}