blob: d43fc2f9fee26385316747ba7da883efe8c4d05e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/python_embedded.h>
#include <qpid/dispatch.h>
#include <qpid/dispatch/log.h>
#include "dispatch_private.h"
#include "router_private.h"
#include "lrp_private.h"
#include "entity.h"
#include "entity_cache.h"
#include "schema_enum.h"
qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity) {
qd_error_clear();
int phase = qd_entity_opt_long(entity, "phase", 0); QD_ERROR_RET();
qd_schema_fixedAddress_fanout_t fanout = qd_entity_get_long(entity, "fanout"); QD_ERROR_RET();
qd_schema_fixedAddress_bias_t bias = qd_entity_get_long(entity, "bias"); QD_ERROR_RET();
char *prefix = qd_entity_get_string(entity, "prefix"); QD_ERROR_RET();
if (phase < 0 || phase > 9) {
qd_error_t err = qd_error(QD_ERROR_CONFIG,
"Invalid phase %d for prefix '%s' must be between 0 and 9. Ignoring", phase, prefix);
free(prefix);
return err;
}
//
// Search for a matching prefix in the list.
//
qd_config_address_t *addr = DEQ_HEAD(router->config_addrs);
while (addr) {
if (strcmp(addr->prefix, prefix) == 0)
break;
addr = DEQ_NEXT(addr);
}
if (addr == 0) {
//
// Create a new prefix
//
addr = NEW(qd_config_address_t);
DEQ_ITEM_INIT(addr);
addr->prefix = (char*) malloc(strlen(prefix) + 1);
addr->last_phase = (char) phase + '0';
DEQ_INIT(addr->phases);
DEQ_INSERT_TAIL(router->config_addrs, addr);
if (prefix[0] == '/')
strcpy(addr->prefix, &prefix[1]);
else
strcpy(addr->prefix, prefix);
}
//
// Add the phase to the prefix
//
qd_config_phase_t *addr_phase = NEW(qd_config_phase_t);
DEQ_ITEM_INIT(addr_phase);
addr_phase->phase = (char) phase + '0';
qd_address_semantics_t semantics = 0;
switch(fanout) {
case QD_SCHEMA_FIXEDADDRESS_FANOUT_MULTIPLE: semantics |= QD_FANOUT_MULTIPLE; break;
case QD_SCHEMA_FIXEDADDRESS_FANOUT_SINGLE: semantics |= QD_FANOUT_SINGLE; break;
default:
free(prefix);
free(addr_phase);
return qd_error(QD_ERROR_CONFIG, "Invalid fanout value %d", fanout);
}
if ((semantics & QD_FANOUTMASK) == QD_FANOUT_SINGLE) {
switch(bias) {
case QD_SCHEMA_FIXEDADDRESS_BIAS_CLOSEST: semantics |= QD_BIAS_CLOSEST; break;
case QD_SCHEMA_FIXEDADDRESS_BIAS_SPREAD: semantics |= QD_BIAS_SPREAD; break;
default:
free(prefix);
free(addr_phase);
return qd_error(QD_ERROR_CONFIG, "Invalid bias value %d", fanout);
}
qd_log(router->log_source, QD_LOG_INFO,
"Configured Address: prefix=%s phase=%d fanout=%s bias=%s",
prefix, phase,
qd_schema_fixedAddress_fanout_names[fanout],
qd_schema_fixedAddress_bias_names[bias]);
} else {
semantics |= QD_BIAS_NONE;
qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s phase=%d fanout=%s",
prefix, phase, qd_schema_fixedAddress_fanout_names[fanout]);
}
addr_phase->semantics = semantics;
addr->last_phase = addr_phase->phase;
DEQ_INSERT_TAIL(addr->phases, addr_phase);
free(prefix);
return qd_error_code();
}
qd_error_t qd_router_configure_waypoint(qd_router_t *router, qd_entity_t *entity)
{
char *address = qd_entity_get_string(entity, "address"); QD_ERROR_RET();
char *connector = qd_entity_get_string(entity, "connector"); QD_ERROR_RET();
int in_phase = qd_entity_opt_long(entity, "inPhase", 0); QD_ERROR_RET();
int out_phase = qd_entity_opt_long(entity, "outPhase", 0); QD_ERROR_RET();
if (in_phase > 9 || out_phase > 9) {
qd_error_t err = qd_error(QD_ERROR_CONFIG,
"Phases for waypoint '%s' must be between 0 and 9.", address);
free(address);
free(connector);
return err;
}
qd_waypoint_t *waypoint = NEW(qd_waypoint_t);
memset(waypoint, 0, sizeof(qd_waypoint_t));
DEQ_ITEM_INIT(waypoint);
waypoint->address = address;
waypoint->in_phase = in_phase >= 0 ? (char) in_phase + '0' : '\0';
waypoint->out_phase = out_phase >= 0 ? (char) out_phase + '0' : '\0';
waypoint->connector_name = connector;
DEQ_INSERT_TAIL(router->waypoints, waypoint);
qd_log(router->log_source, QD_LOG_INFO,
"Configured Waypoint: address=%s in_phase=%d out_phase=%d connector=%s",
address, in_phase, out_phase, connector);
return qd_error_code();
}
qd_error_t qd_router_configure_lrp(qd_router_t *router, qd_entity_t *entity)
{
char *prefix = qd_entity_get_string(entity, "prefix"); QD_ERROR_RET();
char *connector = qd_entity_get_string(entity, "connector"); QD_ERROR_RET();
sys_mutex_lock(router->lock);
if (connector && connector[0]) {
//
// Look for an existing lrp_container for the same connector name
//
qd_lrp_container_t *lrpc = DEQ_HEAD(router->lrp_containers);
while (lrpc) {
if (strcmp(qd_config_connector_name(lrpc->cc), connector) == 0)
break;
lrpc = DEQ_NEXT(lrpc);
}
//
// If no lrp_container was found, create one and add it to the list
//
if (lrpc == 0) {
qd_config_connector_t *cc = qd_connection_manager_find_on_demand(router->qd, connector);
if (cc) {
lrpc = NEW(qd_lrp_container_t);
DEQ_ITEM_INIT(lrpc);
lrpc->qd = router->qd;
lrpc->cc = cc;
lrpc->timer = qd_timer(router->qd, qd_lrpc_timer_handler, lrpc);
lrpc->conn = 0;
DEQ_INIT(lrpc->lrps);
DEQ_INSERT_TAIL(router->lrp_containers, lrpc);
qd_timer_schedule(lrpc->timer, 0);
}
}
if (lrpc == 0) {
sys_mutex_unlock(router->lock);
free(prefix);
free(connector);
return qd_error(QD_ERROR_CONFIG, "Link-route-pattern configured with unknown connector: %s", connector);
}
qd_lrp_t *lrp = qd_lrp_LH(prefix, lrpc);
if (!lrp) {
sys_mutex_unlock(router->lock);
qd_error_t err = qd_error(QD_ERROR_CONFIG,
"Failed to create link-route-pattern: prefix=%s connector=%s",
prefix, connector);
free(prefix);
free(connector);
return err;
}
qd_log(router->log_source, QD_LOG_INFO,
"Configured Link-route-pattern: prefix=%s connector=%s", prefix, connector);
} else
qd_log(router->log_source, QD_LOG_INFO,
"Configured Remote Link-route-pattern: prefix=%s", prefix);
//
// Create an address iterator for the prefix address with the namespace
// prefix for link-attach routed addresses.
//
char unused;
qd_address_t *addr;
qd_field_iterator_t *iter = qd_address_iterator_string(prefix, ITER_VIEW_ADDRESS_HASH);
qd_address_iterator_override_prefix(iter, 'C');
//
// Find the address in the router's hash table. If not found, create one
// and hash it into the table.
//
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = qd_address(router_semantics_for_addr(router, iter, '\0', &unused));
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
}
//
// Since this is a configured address, block its deletion.
//
addr->block_deletion = true;
sys_mutex_unlock(router->lock);
qd_field_iterator_free(iter);
free(prefix);
free(connector);
return qd_error_code();
}
void qd_router_configure_free(qd_router_t *router)
{
if (!router) return;
for (qd_config_address_t *ca = DEQ_HEAD(router->config_addrs); ca; ca = DEQ_HEAD(router->config_addrs)) {
for (qd_config_phase_t *ap = DEQ_HEAD(ca->phases); ap; ap = DEQ_HEAD(ca->phases)) {
DEQ_REMOVE_HEAD(ca->phases);
free(ap);
}
free(ca->prefix);
DEQ_REMOVE_HEAD(router->config_addrs);
free(ca);
}
for (qd_waypoint_t *wp = DEQ_HEAD(router->waypoints); wp; wp = DEQ_HEAD(router->waypoints)) {
DEQ_REMOVE_HEAD(router->waypoints);
free(wp->address);
free(wp->connector_name);
free(wp);
}
for (qd_lrp_container_t *lrpc = DEQ_HEAD(router->lrp_containers); lrpc; lrpc = DEQ_HEAD(router->lrp_containers)) {
for (qd_lrp_t *lrp = DEQ_HEAD(lrpc->lrps); lrp; lrp = DEQ_HEAD(lrpc->lrps))
qd_lrp_free(lrp);
qd_timer_free(lrpc->timer);
DEQ_REMOVE_HEAD(router->lrp_containers);
free(lrpc);
}
}
qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
char in_phase, char *out_phase)
{
const qd_iterator_view_t old_view = qd_address_iterator_get_view(iter);
qd_address_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
qd_config_address_t *addr = DEQ_HEAD(router->config_addrs);
qd_config_phase_t *phase = 0;
while (addr) {
if (qd_field_iterator_prefix(iter, addr->prefix))
break;
qd_field_iterator_reset(iter);
addr = DEQ_NEXT(addr);
}
if (addr) {
*out_phase = in_phase == '\0' ? addr->last_phase : in_phase;
phase = DEQ_HEAD(addr->phases);
while (phase) {
if (phase->phase == *out_phase)
break;
phase = DEQ_NEXT(phase);
}
}
qd_address_iterator_reset_view(iter, old_view);
return phase ? phase->semantics : QD_SEMANTICS_DEFAULT;
}