blob: 10794ace7a081c38d2b3744cf0a1edc1ca474445 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/python_embedded.h>
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
#include <stdlib.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
#include "entity_cache.h"
#include "router_private.h"
const char *QD_ROUTER_NODE_TYPE = "router.node";
const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
const char *QD_ROUTER_LINK_TYPE = "router.link";
const char *CORE_AGENT_ADDRESS = "$management";
static char *router_role = "inter-router";
static char *on_demand_role = "on-demand";
static char *container_role = "route-container";
static char *direct_prefix;
static char *node_id;
/**
* Determine the role of a connection
*/
static void qd_router_connection_get_config(const qd_connection_t *conn,
qdr_connection_role_t *role,
int *cost,
const char **name,
bool *strip_annotations_in,
bool *strip_annotations_out,
int *link_capacity)
{
if (conn) {
const qd_server_config_t *cf = qd_connection_config(conn);
*strip_annotations_in = cf ? cf->strip_inbound_annotations : false;
*strip_annotations_out = cf ? cf->strip_outbound_annotations : false;
*link_capacity = cf ? cf->link_capacity : 1;
if (cf && strcmp(cf->role, router_role) == 0) {
*strip_annotations_in = false;
*strip_annotations_out = false;
*role = QDR_ROLE_INTER_ROUTER;
*cost = cf->inter_router_cost;
} else if (cf && (strcmp(cf->role, container_role) == 0 ||
strcmp(cf->role, on_demand_role) == 0)) // backward compat
*role = QDR_ROLE_ROUTE_CONTAINER;
else
*role = QDR_ROLE_NORMAL;
*name = cf ? cf->name : 0;
if (*name) {
if (strncmp("listener/", *name, 9) == 0 ||
strncmp("connector/", *name, 10) == 0)
*name = 0;
}
}
}
static int AMQP_writable_conn_handler(void *type_context, qd_connection_t *conn, void *context)
{
qdr_connection_t *qconn = (qdr_connection_t*) qd_connection_get_context(conn);
if (qconn)
return qdr_connection_process(qconn);
return 0;
}
static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
qd_parsed_field_t *in_ma,
qd_message_t *msg,
qd_bitmask_t **link_exclusions,
bool strip_inbound_annotations)
{
qd_field_iterator_t *ingress_iter = 0;
qd_parsed_field_t *trace = 0;
qd_parsed_field_t *ingress = 0;
qd_parsed_field_t *to = 0;
qd_parsed_field_t *phase = 0;
*link_exclusions = 0;
if (in_ma && !strip_inbound_annotations) {
uint32_t count = qd_parse_sub_count(in_ma);
bool done = false;
for (uint32_t idx = 0; idx < count && !done; idx++) {
qd_parsed_field_t *sub = qd_parse_sub_key(in_ma, idx);
if (!sub)
continue;
qd_field_iterator_t *iter = qd_parse_raw(sub);
if (!iter)
continue;
if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_TRACE)) {
trace = qd_parse_sub_value(in_ma, idx);
} else if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_INGRESS)) {
ingress = qd_parse_sub_value(in_ma, idx);
} else if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_TO)) {
to = qd_parse_sub_value(in_ma, idx);
} else if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_PHASE)) {
phase = qd_parse_sub_value(in_ma, idx);
}
done = trace && ingress && to && phase;
}
}
//
// QD_MA_TRACE:
// If there is a trace field, append this router's ID to the trace.
// If the router ID is already in the trace the msg has looped.
//
qd_composed_field_t *trace_field = qd_compose_subfield(0);
qd_compose_start_list(trace_field);
if (trace) {
if (qd_parse_is_list(trace)) {
//
// Create a link-exclusion map for the items in the trace. This map will
// contain a one-bit for each link that leads to a neighbor router that
// the message has already passed through.
//
*link_exclusions = qd_tracemask_create(router->tracemask, trace);
//
// Append this router's ID to the trace.
//
uint32_t idx = 0;
qd_parsed_field_t *trace_item = qd_parse_sub_value(trace, idx);
while (trace_item) {
qd_field_iterator_t *iter = qd_parse_raw(trace_item);
qd_address_iterator_reset_view(iter, ITER_VIEW_ALL);
qd_compose_insert_string_iterator(trace_field, iter);
idx++;
trace_item = qd_parse_sub_value(trace, idx);
}
}
}
qd_compose_insert_string(trace_field, node_id);
qd_compose_end_list(trace_field);
qd_message_set_trace_annotation(msg, trace_field);
//
// QD_MA_TO:
// Preserve the existing value.
//
if (to) {
qd_composed_field_t *to_field = qd_compose_subfield(0);
qd_compose_insert_string_iterator(to_field, qd_parse_raw(to));
qd_message_set_to_override_annotation(msg, to_field);
}
//
// QD_MA_PHASE:
// Preserve the existing value.
//
if (phase) {
int phase_val = qd_parse_as_int(phase);
qd_message_set_phase_annotation(msg, phase_val);
}
//
// QD_MA_INGRESS:
// If there is no ingress field, annotate the ingress as
// this router else keep the original field.
//
qd_composed_field_t *ingress_field = qd_compose_subfield(0);
if (ingress && qd_parse_is_scalar(ingress)) {
ingress_iter = qd_parse_raw(ingress);
qd_compose_insert_string_iterator(ingress_field, ingress_iter);
} else
qd_compose_insert_string(ingress_field, node_id);
qd_message_set_ingress_annotation(msg, ingress_field);
//
// Return the iterator to the ingress field _if_ it was present.
// If we added the ingress, return NULL.
//
return ingress_iter;
}
/**
* Inbound Delivery Handler
*/
static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
{
qd_router_t *router = (qd_router_t*) context;
pn_link_t *pn_link = qd_link_pn(link);
qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
qdr_delivery_t *delivery = 0;
qd_message_t *msg;
//
// Receive the message into a local representation. If the returned message
// pointer is NULL, we have not yet received a complete message.
//
// Note: In the link-routing case, consider cutting the message through. There's
// no reason to wait for the whole message to be received before starting to
// send it.
//
msg = qd_message_receive(pnd);
if (!msg)
return;
//
// Consume the delivery.
//
pn_link_advance(pn_link);
//
// If there's no router link, free the message and finish. It's likely that the link
// is closing.
//
if (!rlink) {
qd_message_free(msg);
return;
}
//
// Handle the link-routed case
//
if (qdr_link_is_routed(rlink)) {
pn_delivery_tag_t dtag = pn_delivery_tag(pnd);
delivery = qdr_link_deliver_to_routed_link(rlink, msg, pn_delivery_settled(pnd), (uint8_t*) dtag.start, dtag.size);
if (delivery) {
if (pn_delivery_settled(pnd))
pn_delivery_settle(pnd);
else {
pn_delivery_set_context(pnd, delivery);
qdr_delivery_set_context(delivery, pnd);
qdr_delivery_incref(delivery);
}
}
return;
}
//
// Determine if the incoming link is anonymous. If the link is addressed,
// there are some optimizations we can take advantage of.
//
bool anonymous_link = qdr_link_is_anonymous(rlink);
//
// Validate the content of the delivery as an AMQP message. This is done partially, only
// to validate that we can find the fields we need to route the message.
//
// If the link is anonymous, we must validate through the message properties to find the
// 'to' field. If the link is not anonymous, we don't need the 'to' field as we will be
// using the address from the link target.
//
qd_message_depth_t validation_depth = anonymous_link ? QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS;
bool valid_message = qd_message_check(msg, validation_depth);
if (valid_message) {
qd_parsed_field_t *in_ma = qd_message_message_annotations(msg);
qd_bitmask_t *link_exclusions;
bool strip = qdr_link_strip_annotations_in(rlink);
qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &link_exclusions, strip);
if (anonymous_link) {
qd_field_iterator_t *addr_iter = 0;
int phase = 0;
//
// If the message has delivery annotations, get the to-override field from the annotations.
//
if (in_ma) {
qd_parsed_field_t *ma_to = qd_parse_value_by_key(in_ma, QD_MA_TO);
if (ma_to) {
addr_iter = qd_field_iterator_dup(qd_parse_raw(ma_to));
phase = qd_message_get_phase_annotation(msg);
}
}
//
// Still no destination address? Use the TO field from the message properties.
//
if (!addr_iter)
addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);
if (addr_iter) {
qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
if (phase > 0)
qd_address_iterator_set_phase(addr_iter, '0' + (char) phase);
delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
link_exclusions);
}
} else {
const char *term_addr = pn_terminus_get_address(qd_link_remote_target(link));
if (!term_addr)
term_addr = pn_terminus_get_address(qd_link_source(link));
if (term_addr) {
qd_composed_field_t *to_override = qd_compose_subfield(0);
qd_compose_insert_string(to_override, term_addr);
qd_message_set_to_override_annotation(msg, to_override);
int phase = qdr_link_phase(rlink);
if (phase != 0)
qd_message_set_phase_annotation(msg, phase);
}
delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions);
}
if (delivery) {
if (pn_delivery_settled(pnd))
pn_delivery_settle(pnd);
else {
pn_delivery_set_context(pnd, delivery);
qdr_delivery_set_context(delivery, pnd);
qdr_delivery_incref(delivery);
}
} else {
//
// The message is now and will always be unroutable because there is no address.
//
pn_delivery_update(pnd, PN_REJECTED);
pn_delivery_settle(pnd);
}
//
// Rules for delivering messages:
//
// For addressed (non-anonymous) links:
// to-override must be set (done in the core?)
// uses qdr_link_deliver to hand over to the core
//
// For anonymous links:
// If there's a to-override in the annotations, use that address
// Or, use the 'to' field in the message properties
//
} else {
//
// Message is invalid. Reject the message and don't involve the router core.
//
pn_delivery_update(pnd, PN_REJECTED);
pn_delivery_settle(pnd);
}
}
/**
* Delivery Disposition Handler
*/
static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
{
qd_router_t *router = (qd_router_t*) context;
qdr_delivery_t *delivery = (qdr_delivery_t*) pn_delivery_get_context(pnd);
bool give_reference = false;
//
// It's important to not do any processing without a qdr_delivery. When pre-settled
// multi-frame deliveries arrive, it's possible for the settlement to register before
// the whole message arrives. Such premature settlement indications must be ignored.
//
if (!delivery)
return;
//
// If the delivery is settled, remove the linkage between the PN and QDR deliveries.
//
if (pn_delivery_settled(pnd)) {
pn_delivery_set_context(pnd, 0);
qdr_delivery_set_context(delivery, 0);
//
// Don't decref the delivery here. Rather, we will _give_ the reference to the core.
//
give_reference = true;
}
//
// Update the disposition of the delivery
//
qdr_delivery_update_disposition(router->router_core, delivery,
pn_delivery_remote_state(pnd), pn_delivery_settled(pnd),
give_reference);
//
// If settled, close out the delivery
//
if (pn_delivery_settled(pnd))
pn_delivery_settle(pnd);
}
/**
* New Incoming Link Handler
*/
static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
{
qd_connection_t *conn = qd_link_connection(link);
qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING,
qdr_terminus(qd_link_remote_source(link)),
qdr_terminus(qd_link_remote_target(link)),
pn_link_name(qd_link_pn(link)));
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
return 0;
}
/**
* New Outgoing Link Handler
*/
static int AMQP_outgoing_link_handler(void* context, qd_link_t *link)
{
qd_connection_t *conn = qd_link_connection(link);
qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING,
qdr_terminus(qd_link_remote_source(link)),
qdr_terminus(qd_link_remote_target(link)),
pn_link_name(qd_link_pn(link)));
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
return 0;
}
/**
* Handler for remote opening of links that we initiated.
*/
static int AMQP_link_attach_handler(void* context, qd_link_t *link)
{
qdr_link_t *qlink = (qdr_link_t*) qd_link_get_context(link);
qdr_link_second_attach(qlink,
qdr_terminus(qd_link_remote_source(link)),
qdr_terminus(qd_link_remote_target(link)));
return 0;
}
/**
* Handler for flow events on links
*/
static int AMQP_link_flow_handler(void* context, qd_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
pn_link_t *pnlink = qd_link_pn(link);
if (!rlink)
return 0;
qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink));
return 0;
}
/**
* Link Detached Handler
*/
static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt)
{
qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
if (rlink) {
qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach(rlink, dt, error);
//
// This is the last event for this link that we will send into the core. Remove the
// core linkage. Note that the core->qd linkage is still in place.
//
qd_link_set_context(link, 0);
//
// If the link was lost (due to connection drop), or the linkage from the core
// object is already gone, finish disconnecting the linkage and free the qd_link
// because the core will silently free its own resources.
//
if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) {
qdr_link_set_context(rlink, 0);
qd_link_free(link);
}
}
return 0;
}
static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool inbound)
{
qdr_connection_role_t role = 0;
int cost = 1;
int remote_cost = 1;
bool strip_annotations_in = false;
bool strip_annotations_out = false;
int link_capacity = 1;
const char *name = 0;
uint64_t connection_id = qd_connection_connection_id(conn);
pn_connection_t *pn_conn = qd_connection_pn(conn);
qd_router_connection_get_config(conn, &role, &cost, &name,
&strip_annotations_in, &strip_annotations_out, &link_capacity);
if (role == QDR_ROLE_INTER_ROUTER) {
//
// Check the remote properties for an inter-router cost value.
//
pn_data_t *props = pn_conn ? pn_connection_remote_properties(pn_conn) : 0;
if (props) {
pn_data_rewind(props);
pn_data_next(props);
if (props && pn_data_type(props) == PN_MAP) {
pn_data_enter(props);
while (pn_data_next(props)) {
if (pn_data_type(props) == PN_SYMBOL) {
pn_bytes_t sym = pn_data_get_symbol(props);
if (sym.size == strlen(QD_CONNECTION_PROPERTY_COST_KEY) &&
strcmp(sym.start, QD_CONNECTION_PROPERTY_COST_KEY) == 0) {
pn_data_next(props);
if (pn_data_type(props) == PN_INT)
remote_cost = pn_data_get_int(props);
break;
}
}
}
}
}
//
// Use the larger of the local and remote costs for this connection
//
if (remote_cost > cost)
cost = remote_cost;
}
qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, inbound, role, cost, connection_id, name,
pn_connection_remote_container(pn_conn),
strip_annotations_in, strip_annotations_out, link_capacity);
qd_connection_set_context(conn, qdrc);
qdr_connection_set_context(qdrc, conn);
}
static int AMQP_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
qd_router_t *router = (qd_router_t*) type_context;
AMQP_opened_handler(router, conn, true);
return 0;
}
static int AMQP_outbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
qd_router_t *router = (qd_router_t*) type_context;
AMQP_opened_handler(router, conn, false);
return 0;
}
static int AMQP_closed_handler(void *type_context, qd_connection_t *conn, void *context)
{
qdr_connection_t *qdrc = (qdr_connection_t*) qd_connection_get_context(conn);
if (qdrc) {
qdr_connection_closed(qdrc);
qd_connection_set_context(conn, 0);
}
return 0;
}
static void qd_router_timer_handler(void *context)
{
qd_router_t *router = (qd_router_t*) context;
//
// Periodic processing.
//
qd_pyrouter_tick(router);
qd_timer_schedule(router->timer, 1000);
}
static qd_node_type_t router_node = {"router", 0, 0,
AMQP_rx_handler,
AMQP_disposition_handler,
AMQP_incoming_link_handler,
AMQP_outgoing_link_handler,
AMQP_writable_conn_handler,
AMQP_link_detach_handler,
AMQP_link_attach_handler,
AMQP_link_flow_handler,
0, // node_created_handler
0, // node_destroyed_handler
AMQP_inbound_opened_handler,
AMQP_outbound_opened_handler,
AMQP_closed_handler};
static int type_registered = 0;
qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id)
{
if (!type_registered) {
type_registered = 1;
qd_container_register_node_type(qd, &router_node);
}
size_t dplen = 9 + strlen(area) + strlen(id);
node_id = (char*) malloc(dplen);
strcpy(node_id, area);
strcat(node_id, "/");
strcat(node_id, id);
qd_router_t *router = NEW(qd_router_t);
ZERO(router);
router_node.type_context = router;
qd->router = router;
router->qd = qd;
router->router_core = 0;
router->log_source = qd_log_source("ROUTER");
router->router_mode = mode;
router->router_area = area;
router->router_id = id;
router->node = qd_container_set_default_node_type(qd, &router_node, (void*) router, QD_DIST_BOTH);
router->lock = sys_mutex();
router->timer = qd_timer(qd, qd_router_timer_handler, (void*) router);
//
// Inform the field iterator module of this router's id and area. The field iterator
// uses this to offload some of the address-processing load from the router.
//
qd_field_iterator_set_address(area, id);
//
// Seed the random number generator
//
unsigned int seed = (unsigned int) time(0);
srandom(seed);
switch (router->router_mode) {
case QD_ROUTER_MODE_STANDALONE: qd_log(router->log_source, QD_LOG_INFO, "Router started in Standalone mode"); break;
case QD_ROUTER_MODE_INTERIOR: qd_log(router->log_source, QD_LOG_INFO, "Router started in Interior mode, area=%s id=%s", area, id); break;
case QD_ROUTER_MODE_EDGE: qd_log(router->log_source, QD_LOG_INFO, "Router started in Edge mode"); break;
case QD_ROUTER_MODE_ENDPOINT: qd_log(router->log_source, QD_LOG_INFO, "Router started in Endpoint mode"); break;
}
return router;
}
static void CORE_connection_activate(void *context, qdr_connection_t *conn)
{
//
// IMPORTANT: This is the only core callback that is invoked on the core
// thread itself. It is imperative that this function do nothing
// apart from setting the activation in the server for the connection.
//
qd_server_activate((qd_connection_t*) qdr_connection_get_context(conn));
}
static void CORE_link_first_attach(void *context,
qdr_connection_t *conn,
qdr_link_t *link,
qdr_terminus_t *source,
qdr_terminus_t *target)
{
qd_router_t *router = (qd_router_t*) context;
qd_connection_t *qconn = (qd_connection_t*) qdr_connection_get_context(conn);
//
// Create a new link to be attached
//
qd_link_t *qlink = qd_link(router->node, qconn, qdr_link_direction(link), qdr_link_name(link));
//
// Copy the source and target termini to the link
//
qdr_terminus_copy(source, qd_link_source(qlink));
qdr_terminus_copy(target, qd_link_target(qlink));
//
// Associate the qd_link and the qdr_link to each other
//
qdr_link_set_context(link, qlink);
qd_link_set_context(qlink, link);
//
// Open (attach) the link
//
pn_link_open(qd_link_pn(qlink));
}
static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
{
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
if (!qlink)
return;
qdr_terminus_copy(source, qd_link_source(qlink));
qdr_terminus_copy(target, qd_link_target(qlink));
//
// Open (attach) the link
//
pn_link_open(qd_link_pn(qlink));
}
static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first)
{
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
if (!qlink)
return;
pn_link_t *pn_link = qd_link_pn(qlink);
if (!pn_link)
return;
if (error) {
pn_condition_t *cond = pn_link_condition(pn_link);
qdr_error_copy(error, cond);
}
qd_link_close(qlink);
//
// If this is the second detach, free the qd_link
//
if (!first)
qd_link_free(qlink);
}
static void CORE_link_flow(void *context, qdr_link_t *link, int credit)
{
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
pn_link_t *plink = qd_link_pn(qlink);
pn_link_flow(plink, credit);
}
static void CORE_link_offer(void *context, qdr_link_t *link, int delivery_count)
{
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
pn_link_t *plink = qd_link_pn(qlink);
if (plink)
pn_link_offered(plink, delivery_count);
}
static void CORE_link_drained(void *context, qdr_link_t *link)
{
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
pn_link_t *plink = qd_link_pn(qlink);
if (plink)
pn_link_drained(plink);
}
static void CORE_link_push(void *context, qdr_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
pn_link_t *plink = qd_link_pn(qlink);
int link_credit = pn_link_credit(plink);
qdr_link_process_deliveries(router->router_core, link, link_credit);
}
static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled)
{
qd_router_t *router = (qd_router_t*) context;
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
pn_link_t *plink = qd_link_pn(qlink);
const char *tag;
int tag_length;
qdr_delivery_tag(dlv, &tag, &tag_length);
pn_delivery(plink, pn_dtag(tag, tag_length));
pn_delivery_t *pdlv = pn_link_current(plink);
//
// If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver.
//
bool remote_snd_settled = qd_link_remote_snd_settle_mode(qlink) == PN_SND_SETTLED;
if (!settled && !remote_snd_settled) {
pn_delivery_set_context(pdlv, dlv);
qdr_delivery_set_context(dlv, pdlv);
qdr_delivery_incref(dlv);
}
qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link));
if (!settled && remote_snd_settled)
// Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, false);
if (settled || remote_snd_settled)
pn_delivery_settle(pdlv);
pn_link_advance(plink);
}
static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
{
pn_delivery_t *pnd = (pn_delivery_t*) qdr_delivery_get_context(dlv);
if (!pnd)
return;
//
// If the disposition has changed, update the proton delivery.
//
if (disp != pn_delivery_remote_state(pnd))
pn_delivery_update(pnd, disp);
//
// If the delivery is settled, remove the linkage and settle the proton delivery.
//
if (settled) {
qdr_delivery_set_context(dlv, 0);
pn_delivery_set_context(pnd, 0);
pn_delivery_settle(pnd);
qdr_delivery_decref(dlv);
}
}
void qd_router_setup_late(qd_dispatch_t *qd)
{
qd->router->tracemask = qd_tracemask();
qd->router->router_core = qdr_core(qd, qd->router->router_mode, qd->router->router_area, qd->router->router_id);
qdr_connection_handlers(qd->router->router_core, (void*) qd->router,
CORE_connection_activate,
CORE_link_first_attach,
CORE_link_second_attach,
CORE_link_detach,
CORE_link_flow,
CORE_link_offer,
CORE_link_drained,
CORE_link_push,
CORE_link_deliver,
CORE_delivery_update);
qd_router_python_setup(qd->router);
qd_timer_schedule(qd->router->timer, 1000);
}
void qd_router_free(qd_router_t *router)
{
if (!router) return;
qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH);
qdr_core_free(router->router_core);
qd_tracemask_free(router->tracemask);
qd_timer_free(router->timer);
sys_mutex_free(router->lock);
qd_router_configure_free(router);
qd_router_python_free(router);
free(router);
free(node_id);
free(direct_prefix);
}
const char *qd_router_id(const qd_dispatch_t *qd)
{
return node_id;
}
qdr_core_t *qd_router_core(qd_dispatch_t *qd)
{
return qd->router->router_core;
}