blob: 27e213c80d91b14ee5573e6e979b2c1c31a26aba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/ctools.h>
#include "agent_config_address.h"
#include <inttypes.h>
#include <stdio.h>
//
// Column numbers of the config-address entity.  Each value is the index of the
// matching attribute name in qdr_config_address_columns[] and is the value
// switched on by qdr_config_address_insert_column_CT().
//
#define QDR_CONFIG_ADDRESS_NAME 0
#define QDR_CONFIG_ADDRESS_IDENTITY 1
#define QDR_CONFIG_ADDRESS_TYPE 2
#define QDR_CONFIG_ADDRESS_PREFIX 3
#define QDR_CONFIG_ADDRESS_DISTRIBUTION 4
#define QDR_CONFIG_ADDRESS_WAYPOINT 5
#define QDR_CONFIG_ADDRESS_IN_PHASE 6
#define QDR_CONFIG_ADDRESS_OUT_PHASE 7
#define QDR_CONFIG_ADDRESS_PATTERN 8
#define QDR_CONFIG_ADDRESS_PRIORITY 9
#define QDR_CONFIG_ADDRESS_FALLBACK 10
//
// Attribute names of the config-address entity, indexed by the
// QDR_CONFIG_ADDRESS_* column numbers.  The final entry is a null terminator.
//
const char *qdr_config_address_columns[] =
{"name",
"identity",
"type",
"prefix",
"distribution",
"waypoint",
"ingressPhase",
"egressPhase",
"pattern",
"priority",
"fallback",
0};
// Entity type name: emitted as the value of the "type" column and used in the
// error messages logged by the handlers in this file.
const char *CONFIG_ADDRESS_TYPE = "org.apache.qpid.dispatch.router.config.address";
// Prefix character annotated onto a name iterator before the name is looked up
// in the core's address hash during CREATE.
const char CONFIG_ADDRESS_PREFIX = 'C';
//
// Append the given string to the composed field, or a null when the string
// pointer is null.
//
static void qdr_config_address_insert_string_or_null_CT(qd_composed_field_t *body, const char *value)
{
    if (value) {
        qd_compose_insert_string(body, value);
    } else {
        qd_compose_insert_null(body);
    }
}

//
// Append the value of one column of a config-address to a composed field.
//
// addr   - the address configuration whose attribute is written
// col    - one of the QDR_CONFIG_ADDRESS_* column numbers; any other value
//          writes no value at all
// body   - the composed field being built
// as_map - when true, the column name is written first (as a map key)
//
static void qdr_config_address_insert_column_CT(qdr_address_config_t *addr, int col, qd_composed_field_t *body, bool as_map)
{
    if (as_map) {
        qd_compose_insert_string(body, qdr_config_address_columns[col]);
    }

    switch (col) {
    case QDR_CONFIG_ADDRESS_NAME:
        qdr_config_address_insert_string_or_null_CT(body, addr->name);
        break;

    case QDR_CONFIG_ADDRESS_IDENTITY: {
        // The identity is reported as its decimal text form.
        char identity_text[100];
        snprintf(identity_text, sizeof(identity_text), "%"PRId64, addr->identity);
        qd_compose_insert_string(body, identity_text);
        break;
    }

    case QDR_CONFIG_ADDRESS_TYPE:
        qd_compose_insert_string(body, CONFIG_ADDRESS_TYPE);
        break;

    case QDR_CONFIG_ADDRESS_PREFIX:
        if (!addr->is_prefix || !addr->pattern) {
            qd_compose_insert_null(body);
        } else {
            // A configured prefix is stored internally as a pattern with a
            // trailing "/#".  Drop those two characters so the prefix is
            // reported in the form it was configured.
            const size_t pattern_len = strlen(addr->pattern);
            assert(pattern_len > 1);
            qd_compose_insert_string_n(body, addr->pattern, pattern_len - 2);
        }
        break;

    case QDR_CONFIG_ADDRESS_DISTRIBUTION: {
        const char *distribution = 0;

        switch (addr->treatment) {
        case QD_TREATMENT_MULTICAST_FLOOD:
        case QD_TREATMENT_MULTICAST_ONCE:
            distribution = "multicast";
            break;
        case QD_TREATMENT_ANYCAST_CLOSEST:
            distribution = "closest";
            break;
        case QD_TREATMENT_ANYCAST_BALANCED:
            distribution = "balanced";
            break;
        default:
            // Any other treatment is reported as null.
            break;
        }

        qdr_config_address_insert_string_or_null_CT(body, distribution);
        break;
    }

    case QDR_CONFIG_ADDRESS_WAYPOINT:
        // Reported as a waypoint only for the phase pair in=0, out=1.
        qd_compose_insert_bool(body, addr->in_phase == 0 && addr->out_phase == 1);
        break;

    case QDR_CONFIG_ADDRESS_IN_PHASE:
        qd_compose_insert_int(body, addr->in_phase);
        break;

    case QDR_CONFIG_ADDRESS_OUT_PHASE:
        qd_compose_insert_int(body, addr->out_phase);
        break;

    case QDR_CONFIG_ADDRESS_PATTERN:
        // The pattern column is only populated for non-prefix addresses.
        if (addr->is_prefix) {
            qd_compose_insert_null(body);
        } else {
            qdr_config_address_insert_string_or_null_CT(body, addr->pattern);
        }
        break;

    case QDR_CONFIG_ADDRESS_PRIORITY:
        qd_compose_insert_int(body, addr->priority);
        break;

    case QDR_CONFIG_ADDRESS_FALLBACK:
        qd_compose_insert_bool(body, addr->fallback);
        break;
    }
}
//
// Write one address configuration into the query's response body as a list
// holding the value of every column requested by the query.  The requested
// column list is terminated by a negative entry.
//
static void qdr_agent_write_config_address_CT(qdr_query_t *query, qdr_address_config_t *addr)
{
    qd_composed_field_t *row = query->body;

    qd_compose_start_list(row);
    for (int idx = 0; query->columns[idx] >= 0; idx++) {
        qdr_config_address_insert_column_CT(addr, query->columns[idx], row, false);
    }
    qd_compose_end_list(row);
}
//
// Update the query's paging state after 'addr' has been written.
//
// With a non-null 'addr', query->more tells whether another entry follows it
// in the list and query->next_offset is incremented.  With a null 'addr' the
// query is simply marked as having no more entries.
//
static void qdr_manage_advance_config_address_CT(qdr_query_t *query, qdr_address_config_t *addr)
{
    if (!addr) {
        query->more = false;
        return;
    }

    qdr_address_config_t *successor = DEQ_NEXT(addr);
    query->more = successor != 0;
    query->next_offset++;
}
//
// Begin a query over the configured addresses: write the entry found at
// 'offset' (if there is one) into the response and enqueue the response.
//
// core   - router core holding the addr_config list
// query  - query being answered; its status, 'more' flag and next_offset are
//          updated here
// offset - position in the list of the first entry to return
//
void qdra_config_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
{
    // A query that reaches this point always reports success.
    query->status = QD_AMQP_OK;

    // An offset beyond the end of the list finishes the query straight away.
    if (offset >= DEQ_SIZE(core->addr_config)) {
        query->more = false;
        qdr_agent_enqueue_response_CT(core, query);
        return;
    }

    // Step from the head of the list to the entry at the requested offset.
    qdr_address_config_t *entry = DEQ_HEAD(core->addr_config);
    int remaining = offset;
    while (remaining > 0 && entry) {
        entry = DEQ_NEXT(entry);
        remaining--;
    }
    assert(entry);

    if (!entry) {
        query->more = false;
    } else {
        // Emit the requested columns, then record where the next call resumes.
        qdr_agent_write_config_address_CT(query, entry);
        query->next_offset = offset;
        qdr_manage_advance_config_address_CT(query, entry);
    }

    qdr_agent_enqueue_response_CT(core, query);
}
//
// Continue a query over the configured addresses: write the entry found at
// query->next_offset (if there is one) into the response and enqueue the
// response.  When no entry exists at that offset the query is marked as
// finished.
//
void qdra_config_address_get_next_CT(qdr_core_t *core, qdr_query_t *query)
{
    qdr_address_config_t *entry = 0;

    // Only walk the list when the saved offset is still inside it.
    if (query->next_offset < DEQ_SIZE(core->addr_config)) {
        entry = DEQ_HEAD(core->addr_config);
        for (int hops = 0; hops < query->next_offset && entry; hops++) {
            entry = DEQ_NEXT(entry);
        }
    }

    if (!entry) {
        query->more = false;
    } else {
        // Emit the requested columns, then move the paging state forward.
        qdr_agent_write_config_address_CT(query, entry);
        qdr_manage_advance_config_address_CT(query, entry);
    }

    qdr_agent_enqueue_response_CT(core, query);
}
//
// Translate the value of a 'distribution' attribute into an address treatment.
// A missing field, or a value that matches none of the known names, yields
// QD_TREATMENT_ANYCAST_BALANCED.
//
static qd_address_treatment_t qdra_address_treatment_CT(qd_parsed_field_t *field)
{
    static const struct {
        const char             *text;
        qd_address_treatment_t  treatment;
    } known[] = {
        {"multicast",   QD_TREATMENT_MULTICAST_ONCE},
        {"closest",     QD_TREATMENT_ANYCAST_CLOSEST},
        {"balanced",    QD_TREATMENT_ANYCAST_BALANCED},
        {"unavailable", QD_TREATMENT_UNAVAILABLE},
    };

    if (!field) {
        return QD_TREATMENT_ANYCAST_BALANCED;
    }

    qd_iterator_t *raw = qd_parse_raw(field);
    for (size_t i = 0; i < sizeof(known) / sizeof(known[0]); i++) {
        if (qd_iterator_equal(raw, (const unsigned char*) known[i].text)) {
            return known[i].treatment;
        }
    }

    return QD_TREATMENT_ANYCAST_BALANCED;
}
//
// Search the core's addr_config list for the entry whose identity, rendered as
// decimal text, equals the content of the 'identity' iterator.
//
// Returns the matching entry, or null when 'identity' is null or nothing
// matches.
//
static qdr_address_config_t *qdr_address_config_find_by_identity_CT(qdr_core_t *core, qd_iterator_t *identity)
{
    if (!identity) {
        return 0;
    }

    for (qdr_address_config_t *candidate = DEQ_HEAD(core->addr_config);
         candidate;
         candidate = DEQ_NEXT(candidate)) {
        // Render this entry's numeric identity as text for the comparison.
        char id_text[100];
        snprintf(id_text, sizeof(id_text), "%"PRId64, candidate->identity);
        if (qd_iterator_equal(identity, (const unsigned char*) id_text)) {
            return candidate;
        }
    }

    return 0;
}
//
// Search the core's addr_config list for the entry whose name equals the
// content of the 'name' iterator.  Entries without a name are skipped.
//
// Returns the matching entry, or null when 'name' is null or nothing matches.
//
static qdr_address_config_t *qdr_address_config_find_by_name_CT(qdr_core_t *core, qd_iterator_t *name)
{
    if (!name) {
        return 0;
    }

    for (qdr_address_config_t *candidate = DEQ_HEAD(core->addr_config);
         candidate;
         candidate = DEQ_NEXT(candidate)) {
        if (!candidate->name) {
            continue;
        }
        if (qd_iterator_equal(name, (const unsigned char*) candidate->name)) {
            return candidate;
        }
    }

    return 0;
}
//
// Handle a DELETE request for a config-address entity.
//
// The entity is located by identity when one is given, otherwise by name.
// The query status is set to:
//   QD_AMQP_BAD_REQUEST - neither a name nor an identity was supplied
//   QD_AMQP_NOT_FOUND   - no matching entity exists
//   QD_AMQP_NO_CONTENT  - the entity was removed
// The response is enqueued in every case.
//
void qdra_config_address_delete_CT(qdr_core_t    *core,
                                   qdr_query_t   *query,
                                   qd_iterator_t *name,
                                   qd_iterator_t *identity)
{
    if (!name && !identity) {
        query->status = QD_AMQP_BAD_REQUEST;
        query->status.description = "No name or identity provided";
        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of %s: %s", CONFIG_ADDRESS_TYPE, query->status.description);
        qdr_agent_enqueue_response_CT(core, query);
        return;
    }

    // An identity, when present, takes precedence over the name.
    qdr_address_config_t *target = identity
        ? qdr_address_config_find_by_identity_CT(core, identity)
        : qdr_address_config_find_by_name_CT(core, name);

    if (!target) {
        query->status = QD_AMQP_NOT_FOUND;
    } else {
        qdr_core_remove_address_config(core, target);
        query->status = QD_AMQP_NO_CONTENT;
    }

    qdr_agent_enqueue_response_CT(core, query);
}
//
// Record a rejected CREATE on the query: set the status to QD_AMQP_BAD_REQUEST
// with the given description and log the error.
//
static void qdra_config_address_create_reject_CT(qdr_core_t *core, qdr_query_t *query, const char *description)
{
    query->status = QD_AMQP_BAD_REQUEST;
    query->status.description = description;
    qd_log(core->agent_log, QD_LOG_ERROR, "Error performing CREATE of %s: %s", CONFIG_ADDRESS_TYPE, query->status.description);
}

//
// Handle a CREATE request for a config-address entity.
//
// core    - router core; its addr_config list, address hash and address parse
//           tree are updated on success
// name    - optional name for the new entity (may be null)
// query   - receives the resulting status and, when it has a body, the composed
//           response
// in_body - parsed request body; must be a map
//
// The request is rejected with QD_AMQP_BAD_REQUEST when:
//   - the name is already present in the address hash
//   - the body is not a map
//   - neither or both of 'prefix' and 'pattern' are given
//   - fallback is combined with waypoint or a phase greater than zero
//   - the prefix/pattern fails validation
//   - a phase value is outside 0..9
//   - the priority is outside -1..QDR_MAX_PRIORITY
//   - the address object cannot be allocated
//   - the pattern cannot be added to the parse tree
// On success the status is QD_AMQP_CREATED.
//
// When the query has a body the response is enqueued; otherwise the query is
// freed here.
//
void qdra_config_address_create_CT(qdr_core_t         *core,
                                   qd_iterator_t      *name,
                                   qdr_query_t        *query,
                                   qd_parsed_field_t  *in_body)
{
    // Heap string produced by the validator.  Ownership moves to the new
    // address on success; whatever is still held here is freed on exit.
    char *pattern = NULL;

    // Single-pass loop: every exit path is a 'break' to the common epilogue.
    while (true) {
        //
        // Ensure there isn't a duplicate name
        //
        qdr_address_config_t *addr = 0;
        if (name) {
            qd_iterator_view_t iter_view = qd_iterator_get_view(name);
            qd_iterator_annotate_prefix(name, CONFIG_ADDRESS_PREFIX);
            qd_iterator_reset_view(name, ITER_VIEW_ADDRESS_HASH);
            qd_hash_retrieve(core->addr_lr_al_hash, name, (void**) &addr);
            qd_iterator_reset_view(name, iter_view);
        }

        if (!!addr) {
            qdra_config_address_create_reject_CT(core, query, "Name conflicts with an existing entity");
            break;
        }

        // Ensure that the body is a map
        if (!qd_parse_is_map(in_body)) {
            qdra_config_address_create_reject_CT(core, query, "Body of request must be a map");
            break;
        }

        //
        // Extract the fields from the request
        //
        qd_parsed_field_t *prefix_field    = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PREFIX]);
        qd_parsed_field_t *pattern_field   = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PATTERN]);
        qd_parsed_field_t *distrib_field   = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_DISTRIBUTION]);
        qd_parsed_field_t *waypoint_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_WAYPOINT]);
        qd_parsed_field_t *in_phase_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_IN_PHASE]);
        qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_OUT_PHASE]);
        qd_parsed_field_t *priority_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PRIORITY]);
        qd_parsed_field_t *fallback_field  = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_FALLBACK]);

        // Absent numeric attributes are represented by -1.
        bool waypoint  = waypoint_field  ? qd_parse_as_bool(waypoint_field)  : false;
        long in_phase  = in_phase_field  ? qd_parse_as_long(in_phase_field)  : -1;
        long out_phase = out_phase_field ? qd_parse_as_long(out_phase_field) : -1;
        long priority  = priority_field  ? qd_parse_as_long(priority_field)  : -1;
        bool fallback  = fallback_field  ? qd_parse_as_bool(fallback_field)  : false;

        //
        // Either a prefix or a pattern field is mandatory.  Prefix and pattern
        // are mutually exclusive.  Fail if either both or none are given.
        //
        const char *msg = NULL;
        if (!prefix_field && !pattern_field) {
            msg = "Either a 'prefix' or 'pattern' attribute must be provided";
        } else if (prefix_field && pattern_field) {
            msg = "Cannot specify both a 'prefix' and a 'pattern' attribute";
        }

        // This check takes precedence over the prefix/pattern message above.
        if (fallback && (waypoint || in_phase > 0 || out_phase > 0)) {
            msg = "Fallback cannot be specified with waypoint or non-zero ingress and egress phases";
        }

        if (msg) {
            qdra_config_address_create_reject_CT(core, query, msg);
            break;
        }

        // validate the pattern/prefix, add "/#" if prefix
        pattern = qdra_config_address_validate_pattern_CT((prefix_field) ? prefix_field : pattern_field,
                                                          !!prefix_field,
                                                          &msg);
        if (!pattern) {
            qdra_config_address_create_reject_CT(core, query, msg);
            break;
        }

        //
        // Handle the address-phasing logic.  If the phases are provided, use them.  Otherwise
        // use the waypoint flag to set the most common defaults.
        //
        if (in_phase == -1 && out_phase == -1) {
            in_phase  = 0;
            out_phase = waypoint ? 1 : 0;
        }

        //
        // Validate the phase values
        //
        if (in_phase < 0 || in_phase > 9 || out_phase < 0 || out_phase > 9) {
            qdra_config_address_create_reject_CT(core, query, "Phase values must be between 0 and 9");
            break;
        }

        //
        // Validate the priority value.  -1 is the value used above when no
        // priority attribute is supplied, so it is accepted as "unspecified";
        // any other negative value is rejected along with values above the
        // maximum.
        //
        if (priority < -1 || priority > QDR_MAX_PRIORITY) {
            qdra_config_address_create_reject_CT(core, query, "Priority value, if present, must be between 0 and QDR_MAX_PRIORITY");
            break;
        }

        //
        // The request is valid.  Attempt to insert the address pattern into
        // the parse tree, fail if there is already an entry for that pattern
        //
        addr = new_qdr_address_config_t();
        if (!addr) {
            qdra_config_address_create_reject_CT(core, query, "Out of memory");
            break;
        }
        ZERO(addr);

        //
        // Insert the uninitialized address to check if it already exists in
        // the parse tree.  On success initialize it.  This is thread safe
        // since the current thread (core) is the only thread allowed to use
        // the parse tree
        //
        qd_error_t rc = qd_parse_tree_add_pattern_str(core->addr_parse_tree, pattern, addr);
        if (rc) {
            free_qdr_address_config_t(addr);
            qdra_config_address_create_reject_CT(core, query, qd_error_name(rc));
            break;
        }

        addr->ref_count = 1; // Represents the reference from the addr_config list
        addr->name      = name ? (char*) qd_iterator_copy(name) : 0;
        addr->identity  = qdr_identifier(core);
        addr->treatment = qdra_address_treatment_CT(distrib_field);
        addr->in_phase  = in_phase;
        addr->out_phase = out_phase;
        addr->is_prefix = !!prefix_field;
        addr->pattern   = pattern;
        addr->priority  = priority;
        addr->fallback  = fallback;

        // The address now owns the pattern string; clear the local so the
        // free() in the epilogue does not release it.
        pattern = 0;

        DEQ_INSERT_TAIL(core->addr_config, addr);
        if (name) {
            qd_iterator_view_t iter_view = qd_iterator_get_view(name);
            qd_iterator_reset_view(name, ITER_VIEW_ADDRESS_HASH);
            qd_hash_insert(core->addr_lr_al_hash, name, addr, &addr->hash_handle);
            qd_iterator_reset_view(name, iter_view);
        }

        //
        // Compose the result map for the response.
        //
        if (query->body) {
            qd_compose_start_map(query->body);
            for (int col = 0; col < QDR_CONFIG_ADDRESS_COLUMN_COUNT; col++)
                qdr_config_address_insert_column_CT(addr, col, query->body, true);
            qd_compose_end_map(query->body);
        }

        query->status = QD_AMQP_CREATED;
        break;
    }

    //
    // Enqueue the response if there is a body.  If there is no body, this is a management
    // operation created internally by the configuration file parser.
    //
    if (query->body) {
        //
        // If there was an error in processing the create, insert a NULL value into the body.
        //
        if (query->status.status / 100 > 2)
            qd_compose_insert_null(query->body);
        qdr_agent_enqueue_response_CT(core, query);
    } else
        qdr_query_free(query);

    free(pattern);
}
//
// Write every column of one address configuration into 'body' as a map whose
// keys are taken from the supplied column-name array and whose values come
// from qdr_config_address_insert_column_CT().
//
static void qdr_manage_write_config_address_map_CT(qdr_core_t           *core,
                                                   qdr_address_config_t *addr,
                                                   qd_composed_field_t  *body,
                                                   const char           *qdr_config_address_columns[])
{
    qd_compose_start_map(body);

    int col = 0;
    while (col < QDR_CONFIG_ADDRESS_COLUMN_COUNT) {
        // Key first, then the matching value.
        qd_compose_insert_string(body, qdr_config_address_columns[col]);
        qdr_config_address_insert_column_CT(addr, col, body, false);
        col++;
    }

    qd_compose_end_map(body);
}
//
// Handle a READ request for a single config-address entity.
//
// The entity is located by identity when one is given (the name is then
// ignored), otherwise by name.  The query status is set to:
//   QD_AMQP_BAD_REQUEST - neither a name nor an identity was supplied
//   QD_AMQP_NOT_FOUND   - no matching entity exists
//   QD_AMQP_OK          - the entity was written to the response body as a map
// The response is enqueued in every case.
//
void qdra_config_address_get_CT(qdr_core_t    *core,
                                qd_iterator_t *name,
                                qd_iterator_t *identity,
                                qdr_query_t   *query,
                                const char    *qdr_config_address_columns[])
{
    if (name || identity) {
        qdr_address_config_t *found = identity
            ? qdr_address_config_find_by_identity_CT(core, identity)
            : qdr_address_config_find_by_name_CT(core, name);

        if (found) {
            // Emit all columns of the entity as key/value pairs.
            qdr_manage_write_config_address_map_CT(core, found, query->body, qdr_config_address_columns);
            query->status = QD_AMQP_OK;
        } else {
            query->status = QD_AMQP_NOT_FOUND;
        }
    } else {
        query->status = QD_AMQP_BAD_REQUEST;
        query->status.description = "No name or identity provided";
        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s", CONFIG_ADDRESS_TYPE, query->status.description);
    }

    qdr_agent_enqueue_response_CT(core, query);
}
//
// Validate an address prefix/pattern held in a parsed field and convert it to
// a heap-allocated string.
//
// pattern_field - parsed field holding the prefix or pattern
// is_prefix     - true when the field is a prefix; "/#" is then appended to the
//                 result so it can be used as a pattern
// error         - set to null on entry and to a static message on failure
//
// Leading and trailing separator characters ('.' and '/') are stripped.
//
// Returns the resulting string, which the caller must free(), or null on
// failure, in which case *error always points at a message.
//
char *qdra_config_address_validate_pattern_CT(qd_parsed_field_t *pattern_field,
                                              bool is_prefix,
                                              const char **error)
{
    char *buf = NULL;
    char *pattern = NULL;
    uint8_t tag = qd_parse_tag(pattern_field);
    qd_iterator_t *p_iter = qd_parse_raw(pattern_field);
    int len = qd_iterator_length(p_iter);

    *error = NULL;

    if ((tag != QD_AMQP_STR8_UTF8 && tag != QD_AMQP_STR32_UTF8)
        || len == 0)
    {
        *error = ((is_prefix)
                  ? "Prefix must be a non-empty string type"
                  : "Pattern must be a non-empty string type");
        goto exit;
    }

    buf = (char *)qd_iterator_copy(p_iter);
    if (!buf) {
        // Without a copy there is nothing to scan; report it rather than
        // dereferencing a null pointer below.
        *error = "Unable to allocate memory for address pattern";
        goto exit;
    }

    char *begin = buf;

    // strip leading token separators
    // note: see parse_tree.c for acceptable separator characters
    while (*begin && strchr("./", *begin))
        begin++;

    // strip trailing separators, tracking the length instead of re-measuring
    // the string on every pass
    size_t begin_len = strlen(begin);
    while (begin_len > 0 && strchr("./", begin[begin_len - 1]))
        begin[--begin_len] = 0;

    if (begin_len == 0) {
        *error = ((is_prefix)
                  ? "Prefix invalid - no tokens"
                  : "Pattern invalid - no tokens");
        goto exit;
    }

    if (is_prefix) {
        // convert a prefix match into a valid pattern by appending "/#"
        pattern = malloc(begin_len + 3);
        if (pattern) {
            memcpy(pattern, begin, begin_len);
            memcpy(pattern + begin_len, "/#", 3);   // includes the terminator
        }
    } else {
        pattern = strdup(begin);
    }

    // A null result must always come with a message: the caller logs *error.
    if (!pattern)
        *error = "Unable to allocate memory for address pattern";

exit:
    free(buf);
    return pattern;
}