blob: 44f13cf48523402f3148aafdb071f0ac2f8e2f13 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "delivery.h"
#include "dispatch_private.h"
#include "policy.h"
#include "router_private.h"
#include "qpid/dispatch.h"
#include "qpid/dispatch/protocol_adaptor.h"
#include "qpid/dispatch/proton_utils.h"
#include <proton/sasl.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// Exported type-name strings for the router node, router addresses and router links.
const char *QD_ROUTER_NODE_TYPE = "router.node";
const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
const char *QD_ROUTER_LINK_TYPE = "router.link";
// File-scope protocol adaptor handle. It is not assigned in the code visible here.
static qdr_protocol_adaptor_t *amqp_direct_adaptor = 0;
// Role names that qd_router_connection_get_config() compares against the
// role string of a connection's configuration.
static char *router_role = "inter-router";
static char *container_role = "route-container";
static char *edge_role = "edge";
// NOTE(review): direct_prefix is neither assigned nor read in the visible code -
// confirm how the rest of the file uses it.
static char *direct_prefix;
// This router's identifier. router_annotate_message() appends it to the trace
// annotation and uses it as the ingress annotation when a message carries none.
// It is assigned outside the code visible here.
static char *node_id;
// Forward declarations of static functions defined elsewhere in this file.
static void deferred_AMQP_rx_handler(void *context, bool discard);
static bool parse_failover_property_list(qd_router_t *router, qd_connection_t *conn, pn_data_t *props);
// Description text set on the delivery and connection conditions when an
// oversize message is rejected (see AMQP_rx_handler).
const char *QD_AMQP_COND_OVERSIZE_DESCRIPTION = "Message size exceeded";
//==============================================================================
// Functions to handle the linkage between proton deliveries and qdr deliveries
//==============================================================================
//
// qd_link.list_of_references(pn_delivery_t)
// pn_delivery.context => reference-entry
// qdr_delivery.context => pn_delivery
//
/**
 * Bind a core delivery and a proton delivery to each other.
 *
 * A new reference entry pointing at qdlv is appended to the link's reference
 * list and stored as the proton delivery's context; the proton delivery is
 * stored as the core delivery's context.  The core delivery's reference count
 * is incremented on behalf of the proton delivery.
 */
static void qdr_node_connect_deliveries(qd_link_t *link, qdr_delivery_t *qdlv, pn_delivery_t *pdlv)
{
    qd_link_ref_t      *entry = new_qd_link_ref_t();
    qd_link_ref_list_t *refs  = qd_link_get_ref_list(link);

    // Build the reference entry and queue it on the link
    ZERO(entry);
    entry->ref = qdlv;
    DEQ_INSERT_TAIL(*refs, entry);

    // pn_delivery.context => reference-entry, qdr_delivery.context => pn_delivery
    pn_delivery_set_context(pdlv, entry);
    qdr_delivery_set_context(qdlv, pdlv);

    qdr_delivery_incref(qdlv, "referenced by a pn_delivery");
}
/**
 * Undo qdr_node_connect_deliveries().
 *
 * Does nothing when there is no link or when the proton delivery carries no
 * reference entry.  Otherwise the entry is unlinked from the link's reference
 * list and freed, both delivery contexts are cleared, and the reference the
 * proton delivery held on the core delivery is dropped.
 */
static void qdr_node_disconnect_deliveries(qdr_core_t *core, qd_link_t *link, qdr_delivery_t *qdlv, pn_delivery_t *pdlv)
{
    if (link == 0)
        return;

    qd_link_ref_t      *entry = (qd_link_ref_t*) pn_delivery_get_context(pdlv);
    qd_link_ref_list_t *refs  = qd_link_get_ref_list(link);

    if (entry == 0)
        return;

    DEQ_REMOVE(*refs, entry);
    free_qd_link_ref_t(entry);

    pn_delivery_set_context(pdlv, 0);
    qdr_delivery_set_context(qdlv, 0);
    qdr_delivery_decref(core, qdlv, "removed reference from pn_delivery");
}
/**
 * Return the proton delivery stored as the context of a core delivery,
 * or 0 when no core delivery is supplied.
 */
static pn_delivery_t *qdr_node_delivery_pn_from_qdr(qdr_delivery_t *dlv)
{
    if (!dlv)
        return 0;

    return (pn_delivery_t*) qdr_delivery_get_context(dlv);
}
/**
 * Return the core delivery referenced by a proton delivery's context entry,
 * or 0 when the proton delivery has no reference entry attached.
 */
static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv)
{
    qd_link_ref_t *entry = (qd_link_ref_t*) pn_delivery_get_context(dlv);

    if (!entry)
        return 0;

    return (qdr_delivery_t*) entry->ref;
}
/**
 * Release every core delivery still referenced from a link's reference list.
 *
 * The list is drained from the head: each entry is unlinked, the core
 * delivery's context is cleared, the reference held on it is dropped and the
 * entry is freed.
 *
 * @param context the qd_router_t that owns the router core
 * @param link    the link whose reference list is drained
 */
void qd_link_abandoned_deliveries_handler(void *context, qd_link_t *link)
{
    qd_router_t        *router = (qd_router_t*) context;
    qd_link_ref_list_t *refs   = qd_link_get_ref_list(link);

    for (qd_link_ref_t *entry = DEQ_HEAD(*refs); entry; entry = DEQ_HEAD(*refs)) {
        DEQ_REMOVE_HEAD(*refs);

        qdr_delivery_t *core_dlv = (qdr_delivery_t*) entry->ref;
        entry->ref = 0;

        qdr_delivery_set_context(core_dlv, 0);
        qdr_delivery_decref(router->router_core, core_dlv, "qd_link_abandoned_deliveries_handler");
        free_qd_link_ref_t(entry);
    }
}
/**
 * Read the delivery-state set by the remote endpoint.
 *
 * @param pnd the proton delivery to inspect
 * @return a newly allocated delivery state holding the data associated with
 *         the remote outcome, or 0 when the outcome carries no data that needs
 *         to be kept (no outcome, accepted, released, a modified outcome with
 *         only default values, or a custom outcome without data).
 */
static qd_delivery_state_t *qd_delivery_read_remote_state(pn_delivery_t *pnd)
{
    const uint64_t remote_outcome = pn_delivery_remote_state(pnd);

    // not set, or no associated state (that we care about)
    if (remote_outcome == 0 || remote_outcome == PN_ACCEPTED || remote_outcome == PN_RELEASED)
        return 0;

    if (remote_outcome == PN_RECEIVED) {
        pn_disposition_t    *remote = pn_delivery_remote(pnd);
        qd_delivery_state_t *state  = qd_delivery_state();
        state->section_number = pn_disposition_get_section_number(remote);
        state->section_offset = pn_disposition_get_section_offset(remote);
        return state;
    }

    if (remote_outcome == PN_REJECTED) {
        // See AMQP 1.0 section 3.4.4 Rejected
        pn_condition_t      *remote_cond = pn_disposition_condition(pn_delivery_remote(pnd));
        qd_delivery_state_t *state       = qd_delivery_state();
        state->error = qdr_error_from_pn(remote_cond);
        return state;
    }

    if (remote_outcome == PN_MODIFIED) {
        // See AMQP 1.0 section 3.4.5 Modified
        pn_disposition_t *remote           = pn_delivery_remote(pnd);
        const bool        is_failed        = pn_disposition_is_failed(remote);
        const bool        is_undeliverable = pn_disposition_is_undeliverable(remote);
        pn_data_t        *remote_anno      = pn_disposition_annotations(remote);
        const bool        has_anno         = (remote_anno && pn_data_size(remote_anno) > 0);

        // avoid expensive alloc if only default values found
        if (!is_failed && !is_undeliverable && !has_anno)
            return 0;

        qd_delivery_state_t *state = qd_delivery_state();
        state->delivery_failed    = is_failed;
        state->undeliverable_here = is_undeliverable;
        if (has_anno) {
            state->annotations = pn_data(0);
            pn_data_copy(state->annotations, remote_anno);
        }
        return state;
    }

    // Check for custom state data.  Custom outcomes and AMQP 1.0
    // Transaction defined outcomes will all be numerically >
    // PN_MODIFIED.  See Part 4: Transactions and section 1.5 Descriptor
    // Values in the AMQP 1.0 spec.
    if (remote_outcome > PN_MODIFIED) {
        pn_data_t *custom = pn_disposition_data(pn_delivery_remote(pnd));
        if (custom && pn_data_size(custom) > 0) {
            qd_delivery_state_t *state = qd_delivery_state();
            state->extension = pn_data(0);
            pn_data_copy(state->extension, custom);
            return state;
        }
    }

    return 0;
}
/**
 * Set the delivery-state to be sent to the remote endpoint.
 *
 * Copies the data held in dstate that belongs to the given outcome into the
 * local disposition of the proton delivery.  Nothing is done when either the
 * delivery or the state is missing.
 *
 * @param pnd     the proton delivery whose local disposition is written
 * @param outcome the outcome that selects which part of dstate applies
 * @param dstate  the delivery state to copy from
 */
static void qd_delivery_write_local_state(pn_delivery_t *pnd, uint64_t outcome, const qd_delivery_state_t *dstate)
{
    if (!pnd || !dstate)
        return;

    // no associated state (that we care about)
    if (outcome == PN_ACCEPTED || outcome == PN_RELEASED)
        return;

    if (outcome == PN_RECEIVED) {
        pn_disposition_t *local = pn_delivery_local(pnd);
        pn_disposition_set_section_number(local, dstate->section_number);
        pn_disposition_set_section_offset(local, dstate->section_offset);
        return;
    }

    if (outcome == PN_REJECTED) {
        if (!dstate->error)
            return;

        pn_condition_t *local_cond = pn_disposition_condition(pn_delivery_local(pnd));
        char           *err_name   = qdr_error_name(dstate->error);
        char           *err_desc   = qdr_error_description(dstate->error);

        pn_condition_set_name(local_cond, (const char*) err_name);
        pn_condition_set_description(local_cond, (const char*) err_desc);
        if (qdr_error_info(dstate->error)) {
            pn_data_copy(pn_condition_info(local_cond), qdr_error_info(dstate->error));
        }

        // proton makes copies of name and description, so it is ok to free them here.
        free(err_name);
        free(err_desc);
        return;
    }

    if (outcome == PN_MODIFIED) {
        pn_disposition_t *local = pn_delivery_local(pnd);
        pn_disposition_set_failed(local, dstate->delivery_failed);
        pn_disposition_set_undeliverable(local, dstate->undeliverable_here);
        if (dstate->annotations)
            pn_data_copy(pn_disposition_annotations(local), dstate->annotations);
        return;
    }

    // any other outcome: forward the extension data, if present
    if (dstate->extension)
        pn_data_copy(pn_disposition_data(pn_delivery_local(pnd)), dstate->extension);
}
/**
 * Determine the role and related settings of a connection from its
 * configuration.
 *
 * Nothing is written when conn is 0.  When the connection has no
 * configuration the outputs take default values (normal role, no name,
 * annotations not stripped, link capacity 1, not multi-tenant).  *cost is
 * only written for inter-router and edge connections.
 *
 * @param conn                  connection to inspect
 * @param role                  out: connection role
 * @param cost                  out: inter-router cost (inter-router and edge roles only)
 * @param name                  out: configured name, or 0 if absent or if it starts
 *                              with "listener/" or "connector/"
 * @param multi_tenant          out: multi-tenant flag
 * @param strip_annotations_in  out: strip inbound annotations (always false for
 *                              inter-router and edge connections)
 * @param strip_annotations_out out: strip outbound annotations (always false for
 *                              inter-router and edge connections)
 * @param link_capacity         out: configured link capacity
 */
static void qd_router_connection_get_config(const qd_connection_t *conn,
                                            qdr_connection_role_t *role,
                                            int *cost,
                                            const char **name,
                                            bool *multi_tenant,
                                            bool *strip_annotations_in,
                                            bool *strip_annotations_out,
                                            int *link_capacity)
{
    if (!conn)
        return;

    const qd_server_config_t *cf = qd_connection_config(conn);

    if (!cf) {
        // No configuration available: report defaults
        *strip_annotations_in  = false;
        *strip_annotations_out = false;
        *link_capacity         = 1;
        *role                  = QDR_ROLE_NORMAL;
        *name                  = 0;
        *multi_tenant          = false;
        return;
    }

    *link_capacity = cf->link_capacity;

    const bool is_inter_router = strcmp(cf->role, router_role) == 0;
    const bool is_edge         = !is_inter_router && strcmp(cf->role, edge_role) == 0;

    if (is_inter_router || is_edge) {
        // Router-to-router connections never strip annotations
        *strip_annotations_in  = false;
        *strip_annotations_out = false;
        *role = is_inter_router ? QDR_ROLE_INTER_ROUTER : QDR_ROLE_EDGE_CONNECTION;
        *cost = cf->inter_router_cost;
    } else {
        *strip_annotations_in  = cf->strip_inbound_annotations;
        *strip_annotations_out = cf->strip_outbound_annotations;
        if (strcmp(cf->role, container_role) == 0) {
            // backward compat
            *role = QDR_ROLE_ROUTE_CONTAINER;
        } else {
            *role = QDR_ROLE_NORMAL;
        }
    }

    // Names beginning with "listener/" or "connector/" are not reported
    const char *reported_name = cf->name;
    if (reported_name
        && (strncmp("listener/", reported_name, 9) == 0 || strncmp("connector/", reported_name, 10) == 0))
        reported_name = 0;
    *name = reported_name;

    *multi_tenant = cf->multi_tenant;
}
/**
 * Connection-writable handler.
 *
 * Runs qdr_connection_process() on the core connection stored as the
 * connection's context.
 *
 * @return the result of qdr_connection_process(), or 0 when the connection
 *         has no core connection attached.
 */
static int AMQP_writable_conn_handler(void *type_context, qd_connection_t *conn, void *context)
{
    qdr_connection_t *core_conn = (qdr_connection_t*) qd_connection_get_context(conn);

    if (!core_conn)
        return 0;

    return qdr_connection_process(core_conn);
}
/**
 * Rewrite the router annotations of an incoming message.
 *
 * @param router          this router
 * @param msg             message whose annotations are read and re-set
 * @param link_exclusions out: set to 0, or to the mask created from the trace
 *                        list when one is present (non-edge routers only)
 * @param distance        out: set to 0, or to the number of entries in the
 *                        trace list when one is present (non-edge routers only)
 * @param ingress_index   out: passed to qd_tracemask_create() when a trace
 *                        list is present; otherwise left untouched
 * @return the iterator over the ingress annotation if the message already
 *         carried one (non-edge routers only), otherwise 0.
 */
static qd_iterator_t *router_annotate_message(qd_router_t   *router,
                                              qd_message_t  *msg,
                                              qd_bitmask_t **link_exclusions,
                                              uint32_t      *distance,
                                              int           *ingress_index)
{
    const bool is_edge = router->router_mode == QD_ROUTER_MODE_EDGE;

    *link_exclusions = 0;
    *distance        = 0;

    qd_parsed_field_t *ma_trace   = qd_message_get_trace(msg);
    qd_parsed_field_t *ma_ingress = qd_message_get_ingress(msg);
    qd_parsed_field_t *ma_to      = qd_message_get_to_override(msg);
    qd_parsed_field_t *ma_phase   = qd_message_get_phase(msg);

    //
    // QD_MA_TRACE:
    // Rebuild the trace list with this router's ID appended.  A router ID
    // that is already present means the message has looped, but that
    // condition is not checked here.
    //
    // Edge routers do not add their IDs to the trace list.
    //
    if (!is_edge) {
        qd_composed_field_t *new_trace = qd_compose_subfield(0);
        qd_compose_start_list(new_trace);

        if (ma_trace && qd_parse_is_list(ma_trace)) {
            // The number of hops this delivery has already traveled.
            *distance = qd_parse_sub_count(ma_trace);

            // One bit for each link that leads to a neighbor router the
            // message has already passed through.
            *link_exclusions = qd_tracemask_create(router->tracemask, ma_trace, ingress_index);

            // Carry the existing trace entries over unchanged.
            for (uint32_t pos = 0; ; pos++) {
                qd_parsed_field_t *hop = qd_parse_sub_value(ma_trace, pos);
                if (!hop)
                    break;
                qd_iterator_t *hop_iter = qd_parse_raw(hop);
                qd_iterator_reset_view(hop_iter, ITER_VIEW_ALL);
                qd_compose_insert_string_iterator(new_trace, hop_iter);
            }
        }

        qd_compose_insert_string(new_trace, node_id);
        qd_compose_end_list(new_trace);
        qd_message_set_trace_annotation(msg, new_trace);
    }

    //
    // QD_MA_TO:
    // Preserve the existing value.
    //
    if (ma_to) {
        qd_composed_field_t *new_to = qd_compose_subfield(0);
        qd_compose_insert_string_iterator(new_to, qd_parse_raw(ma_to));
        qd_message_set_to_override_annotation(msg, new_to);
    }

    //
    // QD_MA_PHASE:
    // Preserve the existing value.
    //
    if (ma_phase) {
        qd_message_set_phase_annotation(msg, qd_message_get_phase_val(msg));
    }

    //
    // QD_MA_INGRESS:
    // Keep a scalar ingress annotation if one is present, otherwise annotate
    // this router as the ingress.
    //
    // Edge routers do not annotate the ingress field.
    //
    qd_iterator_t *ingress_iter = 0;
    if (!is_edge) {
        qd_composed_field_t *new_ingress = qd_compose_subfield(0);
        if (ma_ingress && qd_parse_is_scalar(ma_ingress)) {
            ingress_iter = qd_parse_raw(ma_ingress);
            qd_compose_insert_string_iterator(new_ingress, ingress_iter);
        } else {
            qd_compose_insert_string(new_ingress, node_id);
        }
        qd_message_set_ingress_annotation(msg, new_ingress);
    }

    //
    // Non-null only when the ingress annotation was already present.
    //
    return ingress_iter;
}
/**
 * Write a trace-level log line describing a message sent or received on a link.
 *
 * Nothing is logged when any argument is missing, when the connection has no
 * configuration, or when no textual representation of the message is
 * available.  Oversize and aborted messages are logged with a fixed text
 * instead of their representation.
 */
static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_message_t *msg)
{
    if (conn == 0 || pn_link == 0 || msg == 0)
        return;

    const qd_server_config_t *config = qd_connection_config(conn);
    if (config == 0)
        return;

    char        repr[qd_message_repr_len()];
    const char *text;

    if (qd_message_oversize(msg)) {
        text = "oversize message";
    } else if (qd_message_aborted(msg)) {
        text = "aborted message";
    } else {
        text = qd_message_repr(msg, repr, sizeof(repr), config->log_bits);
    }

    if (text == 0)
        return;

    const char *source_addr = pn_terminus_get_address(pn_link_source(pn_link));
    const char *target_addr = pn_terminus_get_address(pn_link_target(pn_link));

    qd_log(qd_message_log_source(), QD_LOG_TRACE,
           "[C%"PRIu64"]: %s %s on link '%s' (%s -> %s)",
           qd_connection_connection_id(conn),
           pn_link_is_sender(pn_link) ? "Sent" : "Received",
           text,
           pn_link_name(pn_link),
           source_addr ? source_addr : "",
           target_addr ? target_addr : "");
}
/**
 * Inbound Delivery Handler
 *
 * Receives data for the current proton delivery on an incoming link and, once
 * enough of the message is available, hands it to the router core as a new
 * delivery (link-routed, anonymous or targeted).  Frames that arrive for a
 * delivery already known to the core simply continue that delivery.  Messages
 * that cannot be forwarded (empty, oversize, invalid, policy violations,
 * unroutable, or having traveled too far) are rejected or released here.
 *
 * @param context the qd_router_t this handler was registered with
 * @param link    the incoming link that has a delivery to process
 * @return true if we've advanced to the next delivery on this link and it is
 *         ready for rx processing.  This will cause the container to immediately
 *         re-call this function with the next delivery.
 */
static bool AMQP_rx_handler(void* context, qd_link_t *link)
{
    qd_router_t *router  = (qd_router_t*) context;
    pn_link_t   *pn_link = qd_link_pn(link);

    assert(pn_link);
    if (!pn_link)
        return false;

    // ensure the current delivery is readable
    pn_delivery_t *pnd = pn_link_current(pn_link);
    if (!pnd)
        return false;

    qd_connection_t *conn = qd_link_connection(link);

    // DISPATCH-1628 DISPATCH-975 exit if router already closed this connection
    if (conn->closed_locally) {
        return false;
    }

    qdr_delivery_t *delivery      = qdr_node_delivery_qdr_from_pn(pnd);
    bool            next_delivery = false;

    //
    // Receive the message into a local representation.
    //
    qd_message_t *msg              = qd_message_receive(pnd);
    bool          receive_complete = qd_message_receive_complete(msg);

    //
    // The very first time AMQP_rx_handler is called on a PN_DELIVERY event, it calls qd_message_receive().
    // When qd_message_receive() returns, we check here if there are any data in the content buffers.  If
    // there is no content in the buffers, there is no reason to route the delivery.  We will wait for some
    // data in the buffers before we start to route the delivery.
    // Notice that the if statement checks for the existence of a delivery (qdr_delivery_t).  Existence of a
    // delivery means that the delivery has been routed when there was data in the buffers (when a delivery
    // has been routed successfully, the delivery (qdr_delivery_t) will be non null).
    //
    // The following if statement deals with the following cases:
    // 1. We receive one empty transfer frame with more=true followed by another empty transfer frame with
    //    (more=false and abort=true) or with just more=false.
    //    In this case there is no data at all in the message content buffers, we will reject the message
    //    when receive_complete=true.  We will never route this delivery, so the core thread will not be
    //    involved.
    // 2. We receive 2 or more empty transfer frames with more=true followed by another empty transfer frame
    //    with (more=false and abort=true) or with just more=false.
    //    This case is similar to #1.  We have no content in any of the buffers, we will reject this message
    //    after receive_complete=true.  We will never route this delivery, so the core thread will not be
    //    involved.
    // 3. Exactly one empty transfer frame with more=false and abort=false.
    //    In this case, again there is still no content in any of the buffers, we will reject this message.
    //    Again, we will not route this message, so the core thread is not involved.
    //
    if (!delivery && !qd_message_has_data_in_content_or_pending_buffers(msg)) {
        if (receive_complete) {
            // There is no qdr_delivery_t (delivery) yet which means this message has not been routed yet
            // (the first run of this function is not complete yet) and the message is fully received
            // (receive_complete=true) but there is no content in the message buffers.
            // This is only possible if there were one or more empty transfer frames.
            // Since there is nothing in the message, we will reject it (AMQP message must have a non empty
            // message body)
            pn_link_flow(pn_link, 1);
            if (pn_delivery_aborted(pnd))
                qd_message_set_discard(msg, true);
            pn_delivery_update(pnd, PN_REJECTED);
            pn_delivery_settle(pnd);
            // qd_message_free will free all the associated content buffers and also the content->pending buffer
            qd_message_free(msg);
            qd_log(router->log_source, QD_LOG_TRACE, "Message rejected due to empty message");
        }
        return false;
    }

    if (!qd_message_oversize(msg)) {
        // message not rejected as oversize
        if (receive_complete) {
            log_link_message(conn, pn_link, msg);

            //
            // The entire message has been received and we are ready to consume the delivery by calling
            // pn_link_advance().
            //
            pn_link_advance(pn_link);
            next_delivery = pn_link_current(pn_link) != 0;
        }

        if (qd_message_is_discard(msg)) {
            //
            // Message has been marked for discard, no further processing necessary
            //
            if (receive_complete) {
                //
                // The delivery is NULL here when the message was marked for discard before it was ever
                // handed to the core (the rejection paths below return without creating a delivery while
                // more frames are still expected).  Only consult the core delivery when there is one.
                //
                if (delivery) {
                    // If this discarded delivery has already been settled by proton,
                    // set the presettled flag on the delivery to true if it is not already true.
                    // Since the entire message has already been received, we directly call the
                    // function to set the pre-settled flag since we cannot go thru the core-thread
                    // to do this since the delivery has been discarded.
                    // Discarded streaming deliveries are not put thru the core thread via the continue action.
                    if (pn_delivery_settled(pnd))
                        qdr_delivery_set_presettled(delivery);

                    uint64_t local_disp = qdr_delivery_disposition(delivery);

                    //
                    // Call pn_delivery_update only if the local disposition is different than the
                    // pn_delivery's local disposition.  This will make sure we call pn_delivery_update only
                    // when necessary.
                    //
                    if (local_disp != 0 && local_disp != pn_delivery_local_state(pnd)) {
                        //
                        // DISPATCH-1626 - This enables pn_delivery_update() and pn_delivery_settle() to be
                        // called back to back in the same function call.  CORE_delivery_update() will
                        // handle most of the other cases where we need to call pn_delivery_update()
                        // followed by pn_delivery_settle().
                        //
                        pn_delivery_update(pnd, local_disp);
                    }
                }

                // note: expected that the code that set discard has handled
                // setting disposition and updating flow!
                pn_delivery_settle(pnd);
                if (delivery) {
                    // if delivery already exists then the core thread discarded this
                    // delivery, it will eventually free the qdr_delivery_t and its
                    // associated message - do not free it here.
                    qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
                } else {
                    qd_message_free(msg);
                }
            }
            return next_delivery;
        }
    } else {
        // message is oversize
        if (receive_complete) {
            // set condition, reject, and settle the incoming delivery
            pn_condition_t *lcond = pn_disposition_condition(pn_delivery_local(pnd));
            (void) pn_condition_set_name(       lcond, QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED);
            (void) pn_condition_set_description(lcond, QD_AMQP_COND_OVERSIZE_DESCRIPTION);
            pn_delivery_update(pnd, PN_REJECTED);
            pn_delivery_settle(pnd);

            // close the link
            pn_link_close(pn_link);

            // set condition and close the connection
            pn_connection_t * pn_conn = qd_connection_pn(conn);
            pn_condition_t  * cond    = pn_connection_condition(pn_conn);
            (void) pn_condition_set_name(       cond, QD_AMQP_COND_CONNECTION_FORCED);
            (void) pn_condition_set_description(cond, QD_AMQP_COND_OVERSIZE_DESCRIPTION);
            pn_connection_close(pn_conn);

            if (!delivery) {
                // this message has not been forwarded yet, so it will not be
                // cleaned up when the link is freed.
                qd_message_free(msg);
            }

            // stop all message reception on this connection
            conn->closed_locally = true;
        }
        return false;
        // oversize messages are not processed any further
    }

    //
    // If the delivery already exists we've already passed it to the core (2nd
    // frame for a multi-frame transfer). Simply continue.
    //
    if (delivery) {
        qdr_delivery_continue(router->router_core, delivery, pn_delivery_settled(pnd));
        return next_delivery;
    }

    //
    // No pre-existing delivery means we're starting a new delivery or
    // continuing a delivery that has not accumulated enough of the message
    // for forwarding.
    //
    qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
    if (!rlink) {
        // receive link was closed or deleted - can't be forwarded
        // so no use setting disposition or adding flow
        qd_message_set_discard(msg, true);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    //
    // Handle the link-routed case
    //
    if (qdr_link_is_routed(rlink)) {
        pn_delivery_tag_t dtag = pn_delivery_tag(pnd);

        if (dtag.size > QDR_DELIVERY_TAG_MAX) {
            // dtag.size is unsigned (size_t), so it is printed with %zu
            qd_log(router->log_source, QD_LOG_DEBUG, "link route delivery failure: msg tag size exceeded %zu (max=%d)",
                   dtag.size, QDR_DELIVERY_TAG_MAX);
            qd_message_set_discard(msg, true);
            pn_link_flow(pn_link, 1);
            pn_delivery_update(pnd, PN_REJECTED);
            if (receive_complete) {
                pn_delivery_settle(pnd);
                qd_message_free(msg);
            }
            return next_delivery;
        }

        delivery = qdr_link_deliver_to_routed_link(rlink,
                                                   msg,
                                                   pn_delivery_settled(pnd),
                                                   (uint8_t*) dtag.start,
                                                   dtag.size,
                                                   pn_delivery_remote_state(pnd),
                                                   qd_delivery_read_remote_state(pnd));
        qd_link_set_incoming_msg(link, (qd_message_t*) 0);  // msg no longer exclusive to qd_link
        qdr_node_connect_deliveries(link, delivery, pnd);
        qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link");
        return next_delivery;
    }

    //
    // Determine if the incoming link is anonymous.  If the link is addressed,
    // there are some optimizations we can take advantage of.
    //
    bool anonymous_link = qdr_link_is_anonymous(rlink);

    //
    // Determine if the user of this connection is allowed to proxy the
    // user_id of messages. A message user_id is proxied when the
    // property value differs from the authenticated user name of the connection.
    // If the user is not allowed to proxy the user_id then the message user_id
    // must be blank or it must be equal to the connection user name.
    //
    bool              check_user = false;
    qdr_connection_t *qdr_conn   = (qdr_connection_t*) qd_connection_get_context(conn);
    int               tenant_space_len;
    const char       *tenant_space = qdr_connection_get_tenant_space(qdr_conn, &tenant_space_len);
    if (conn->policy_settings)
        check_user = !conn->policy_settings->spec.allowUserIdProxy;

    //
    // Validate the content of the delivery as an AMQP message.  This is done partially, only
    // to validate that we can find the fields we need to route the message.
    //
    // If the link is anonymous, we must validate through the message properties to find the
    // 'to' field.  If the link is not anonymous, we don't need the 'to' field as we will be
    // using the address from the link target.
    //
    qd_message_depth_t        validation_depth = (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS;
    qd_message_depth_status_t depth_valid      = qd_message_check_depth(msg, validation_depth);

    if (depth_valid != QD_MESSAGE_DEPTH_OK) {
        if (depth_valid == QD_MESSAGE_DEPTH_INVALID) {
            qd_message_set_discard(msg, true);
            pn_link_flow(pn_link, 1);
            pn_delivery_update(pnd, PN_REJECTED);
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        } else {
            // otherwise wait until more data arrives and re-try the validation
            assert(depth_valid == QD_MESSAGE_DEPTH_INCOMPLETE);
        }
        return next_delivery;
    }

    if (check_user) {
        // This connection must not allow proxied user_id
        qd_iterator_t *userid_iter = qd_message_field_iterator(msg, QD_FIELD_USER_ID);
        if (userid_iter) {
            // The user_id property has been specified
            if (qd_iterator_remaining(userid_iter) > 0) {
                // user_id property in message is not blank
                if (!qd_iterator_equal(userid_iter, (const unsigned char *)conn->user_id)) {
                    // This message is rejected: attempted user proxy is disallowed
                    qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due to user_id proxy violation. User:%s", conn->user_id);
                    qd_message_set_discard(msg, true);
                    pn_link_flow(pn_link, 1);
                    pn_delivery_update(pnd, PN_REJECTED);
                    if (receive_complete) {
                        pn_delivery_settle(pnd);
                        qd_message_free(msg);
                    }
                    qd_iterator_free(userid_iter);
                    return next_delivery;
                }
            }
            qd_iterator_free(userid_iter);
        }
    }

    qd_message_message_annotations(msg);

    //
    // Head of line blocking avoidance (DISPATCH-1545)
    //
    // Before we can forward a message we need to determine whether or not this
    // message is "streaming" - a large message that has the potential to block
    // other messages sharing the trunk link.  At this point we cannot for sure
    // know the actual length of the incoming message, so we employ the
    // following heuristic to determine if the message is "streaming":
    //
    // - If the message is receive-complete it is NOT a streaming message.
    // - If it is NOT receive-complete:
    //   Continue buffering incoming data until:
    //   - receive has completed => NOT a streaming message
    //   - not rx-complete AND Q2 threshold hit => a streaming message
    //
    // Once Q2 is hit we MUST forward the message regardless of rx-complete
    // since Q2 will block forever unless the incoming data is drained via
    // forwarding.
    //
    if (!receive_complete) {
        if (qd_message_is_streaming(msg) || qd_message_is_Q2_blocked(msg)) {
            qd_log(router->log_source, QD_LOG_DEBUG,
                   "[C%"PRIu64"][L%"PRIu64"] Incoming message classified as streaming. User:%s",
                   conn->connection_id,
                   qd_link_link_id(link),
                   conn->user_id);
        } else {
            // Continue buffering this message
            return false;
        }
    }

    uint32_t       distance        = 0;
    int            ingress_index   = 0; // Default to _this_ router
    qd_bitmask_t  *link_exclusions = 0;
    qd_iterator_t *ingress_iter    = router_annotate_message(router, msg, &link_exclusions, &distance, &ingress_index);

    //
    // If this delivery has traveled further than the known radius of the network topology (plus 1),
    // release and settle the delivery.  This can happen in the case of "flood" multicast where the
    // deliveries follow all available paths.  This will only discard messages that will reach their
    // destinations via shorter paths.
    //
    if (distance > (router->topology_radius + 1)) {
        qd_bitmask_free(link_exclusions);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        pn_delivery_update(pnd, PN_RELEASED);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    if (anonymous_link) {
        qd_iterator_t *addr_iter = 0;
        int            phase     = 0;

        //
        // If the message has delivery annotations, get the to-override field from the annotations.
        //
        qd_parsed_field_t *ma_to = qd_message_get_to_override(msg);
        if (ma_to) {
            addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
            phase = qd_message_get_phase_annotation(msg);
        }

        //
        // Still no destination address?  Use the TO field from the message properties.
        //
        if (!addr_iter) {
            addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);

            //
            // If the address came from the TO field and we need to apply a tenant-space,
            // set the to-override with the annotated address.
            //
            if (addr_iter && tenant_space) {
                qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_WITH_SPACE);
                qd_iterator_annotate_space(addr_iter, tenant_space, tenant_space_len);
                qd_composed_field_t *to_override = qd_compose_subfield(0);
                qd_compose_insert_string_iterator(to_override, addr_iter);
                qd_message_set_to_override_annotation(msg, to_override);
            }
        }

        if (addr_iter) {
            if (!conn->policy_settings || qd_policy_approve_message_target(addr_iter, conn)) {
                qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
                if (phase > 0)
                    qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);
                delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
                                               link_exclusions, ingress_index,
                                               pn_delivery_remote_state(pnd),
                                               qd_delivery_read_remote_state(pnd));
            } else {
                //reject
                qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due to policy violation on target. User:%s", conn->user_id);
                qd_message_set_discard(msg, true);
                pn_link_flow(pn_link, 1);
                pn_delivery_update(pnd, PN_REJECTED);
                if (receive_complete) {
                    pn_delivery_settle(pnd);
                    qd_message_free(msg);
                }
                qd_iterator_free(addr_iter);
                qd_bitmask_free(link_exclusions);
                return next_delivery;
            }
        }
    } else {
        //
        // This is a targeted link, not anonymous.
        //

        //
        // Look in a series of locations for the terminus address, starting
        // with the qdr_link (in case this is an auto-link with separate
        // internal and external addresses).
        //
        const char *term_addr = qdr_link_internal_address(rlink);
        if (!term_addr) {
            term_addr = pn_terminus_get_address(qd_link_remote_target(link));
            if (!term_addr)
                term_addr = pn_terminus_get_address(qd_link_source(link));
        }

        if (term_addr) {
            qd_composed_field_t *to_override = qd_compose_subfield(0);
            if (tenant_space) {
                qd_iterator_t *aiter = qd_iterator_string(term_addr, ITER_VIEW_ADDRESS_WITH_SPACE);
                qd_iterator_annotate_space(aiter, tenant_space, tenant_space_len);
                qd_compose_insert_string_iterator(to_override, aiter);
                qd_iterator_free(aiter);
            } else
                qd_compose_insert_string(to_override, term_addr);
            qd_message_set_to_override_annotation(msg, to_override);
            int phase = qdr_link_phase(rlink);
            if (phase != 0)
                qd_message_set_phase_annotation(msg, phase);
        }
        delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index,
                                    pn_delivery_remote_state(pnd),
                                    qd_delivery_read_remote_state(pnd));
    }

    //
    // End of new delivery processing
    //
    if (delivery) {
        qd_link_set_incoming_msg(link, (qd_message_t*) 0);  // msg no longer exclusive to qd_link
        qdr_node_connect_deliveries(link, delivery, pnd);
        qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver");
    } else {
        //
        // If there is no delivery, the message is now and will always be unroutable because there is no address.
        //
        qd_bitmask_free(link_exclusions);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        pn_delivery_update(pnd, PN_REJECTED);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
    }

    return next_delivery;
}
/**
 * Deferred callback for inbound delivery handler
 *
 * Unless the call is being discarded, resolves the safe link pointer carried
 * in context and, if the link still exists, runs AMQP_rx_handler() on it for
 * as long as the handler returns true.  The context is always freed.
 *
 * @param context heap-allocated qd_link_t_sp; freed by this function
 * @param discard when true no processing is done, only cleanup
 */
static void deferred_AMQP_rx_handler(void *context, bool discard)
{
    qd_link_t_sp *safe_qdl = (qd_link_t_sp*) context;
    qd_link_t    *qdl      = discard ? 0 : safe_deref_qd_link_t(*safe_qdl);

    if (qdl) {
        assert(qd_link_direction(qdl) == QD_INCOMING);
        qd_router_t *qdr = (qd_router_t*) qd_link_get_node_context(qdl);
        assert(qdr != 0);

        // keep going while the handler signals another delivery is ready
        while (AMQP_rx_handler(qdr, qdl))
            ;
    }

    free(safe_qdl);
}
/**
 * Delivery Disposition Handler
 *
 * Forwards a remote disposition and/or settlement of a proton delivery to the
 * router core.  When the delivery is settled, the proton and core deliveries
 * are disconnected and the proton delivery is settled locally.
 *
 * @param context the qd_router_t this handler was registered with
 * @param link    the link the delivery belongs to
 * @param pnd     the proton delivery whose remote state changed
 */
static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
{
    qd_router_t    *router   = (qd_router_t*) context;
    qdr_delivery_t *core_dlv = qdr_node_delivery_qdr_from_pn(pnd);

    //
    // It's important to not do any processing without a qdr_delivery.
    //
    if (!core_dlv)
        return;

    const uint64_t remote_outcome = pn_delivery_remote_state(pnd);

    //
    // When pre-settled multi-frame deliveries arrive, it's possible for the
    // settlement to register before the whole message arrives.  Such premature
    // settlement indications must be ignored.
    //
    const bool is_settled = pn_delivery_settled(pnd) && qdr_delivery_receive_complete(core_dlv);

    if (!remote_outcome && !is_settled)
        return;

    //
    // Update the disposition of the delivery
    //
    qdr_delivery_remote_state_updated(router->router_core, core_dlv,
                                      remote_outcome,
                                      is_settled,
                                      qd_delivery_read_remote_state(pnd),
                                      false);

    //
    // If settled, close out the delivery
    //
    if (is_settled) {
        qdr_node_disconnect_deliveries(router->router_core, link, core_dlv, pnd);
        pn_delivery_settle(pnd);
    }
}
/**
 * New Incoming Link Handler
 *
 * Announces a remotely initiated incoming link to the router core via
 * qdr_link_first_attach(), using the remote target address as the terminus
 * address, then records the assigned link id and cross-links the qd_link and
 * qdr_link contexts.
 *
 * @return always 0
 */
static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
{
    qd_connection_t *conn = qd_link_connection(link);

    // The connection that this link belongs to is gone. Perhaps an AMQP close came in.
    // This link handler should not continue since there is no connection.
    if (!conn)
        return 0;

    qdr_connection_t *core_conn   = (qdr_connection_t*) qd_connection_get_context(conn);
    char             *target_addr = (char*) pn_terminus_get_address(pn_link_remote_target((pn_link_t *) qd_link_pn(link)));
    uint64_t          new_link_id;

    qdr_link_t *core_link = qdr_link_first_attach(core_conn, QD_INCOMING,
                                                  qdr_terminus(qd_link_remote_source(link)),
                                                  qdr_terminus(qd_link_remote_target(link)),
                                                  pn_link_name(qd_link_pn(link)),
                                                  target_addr,
                                                  false,
                                                  0,
                                                  &new_link_id);

    qd_link_set_link_id(link, new_link_id);
    qdr_link_set_context(core_link, link);
    qd_link_set_context(link, core_link);

    return 0;
}
/**
 * New Outgoing Link Handler
 *
 * Announces a remotely initiated outgoing link to the router core via
 * qdr_link_first_attach(), using the remote source address as the terminus
 * address, then records the assigned link id and cross-links the qd_link and
 * qdr_link contexts.
 *
 * @return always 0
 */
static int AMQP_outgoing_link_handler(void* context, qd_link_t *link)
{
    qd_connection_t *conn = qd_link_connection(link);

    // The connection that this link belongs to is gone. Perhaps an AMQP close came in.
    // This link handler should not continue since there is no connection.
    if (!conn)
        return 0;

    qdr_connection_t *core_conn   = (qdr_connection_t*) qd_connection_get_context(conn);
    char             *source_addr = (char*) pn_terminus_get_address(pn_link_remote_source((pn_link_t *) qd_link_pn(link)));
    uint64_t          new_link_id;

    qdr_link_t *core_link = qdr_link_first_attach(core_conn, QD_OUTGOING,
                                                  qdr_terminus(qd_link_remote_source(link)),
                                                  qdr_terminus(qd_link_remote_target(link)),
                                                  pn_link_name(qd_link_pn(link)),
                                                  source_addr,
                                                  false,
                                                  0,
                                                  &new_link_id);

    qd_link_set_link_id(link, new_link_id);
    qdr_link_set_context(core_link, link);
    qd_link_set_context(link, core_link);

    return 0;
}
/**
 * Handler for remote opening of links that we initiated.
 *
 * Hands the peer's source and target termini to the core link bound to this
 * qd_link so the core can complete the attach.  Always returns 0.
 */
static int AMQP_link_attach_handler(void* context, qd_link_t *link)
{
    qdr_link_t *core_link = (qdr_link_t*) qd_link_get_context(link);

    qdr_link_second_attach(core_link,
                           qdr_terminus(qd_link_remote_source(link)),
                           qdr_terminus(qd_link_remote_target(link)));
    return 0;
}
/**
 * Handler for flow events on links.  Flow updates include session window
 * state, which needs to be checked for unblocking Q3.
 *
 * The link's own credit/drain state is forwarded to the core first.  Then, if
 * the link's session is Q3-blocked and its outgoing byte count has dropped
 * below the lower threshold, every link blocked on that session is unblocked
 * and re-flowed.  Always returns 0.
 */
static int AMQP_link_flow_handler(void* context, qd_link_t *link)
{
    qd_router_t *router    = (qd_router_t*) context;
    pn_link_t   *pnlink    = qd_link_pn(link);
    qdr_link_t  *this_link = (qdr_link_t*) qd_link_get_context(link);

    if (this_link)
        qdr_link_flow(router->router_core, this_link, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink));

    // check if Q3 can be unblocked
    pn_session_t *pn_ssn = pn_link_session(pnlink);
    if (!pn_ssn)
        return 0;

    qd_session_t *qd_ssn = qd_session_from_pn(pn_ssn);
    if (!qd_ssn || !qd_session_is_q3_blocked(qd_ssn))
        return 0;

    // Q3 blocked - have we drained enough outgoing bytes?
    const size_t q3_lower = BUFFER_SIZE * QD_QLIMIT_Q3_LOWER;
    if (pn_session_outgoing_bytes(pn_ssn) >= q3_lower)
        return 0;

    // yes.  We must now unblock all links that have been blocked by Q3.
    // Each pass takes the current head because unblocking removes the link
    // from the blocked list.
    qd_link_list_t *blocked = qd_session_q3_blocked_links(qd_ssn);
    for (qd_link_t *blink = DEQ_HEAD(*blocked); blink; blink = DEQ_HEAD(*blocked)) {
        qd_link_q3_unblock(blink);  // removes from blocked list!

        if (blink == link)          // already flowed this link
            continue;

        qdr_link_t *blocked_core = (qdr_link_t *) qd_link_get_context(blink);
        if (!blocked_core)
            continue;

        // signalling flow to the core causes the link to be re-activated
        pn_link_t *blocked_pn = qd_link_pn(blink);
        qdr_link_flow(router->router_core, blocked_core, pn_link_remote_credit(blocked_pn), pn_link_get_drain(blocked_pn));
    }

    return 0;
}
/**
 * Link Detached Handler
 *
 * Flushes any message data still buffered in proton for a partially received
 * delivery, then reports the detach (with the peer's error condition, if any)
 * to the router core.  Always returns 0.
 *
 * @param context  unused
 * @param link     the detached link; may be null
 * @param dt       the kind of detach being reported
 */
static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt)
{
    pn_link_t *pn_link = link ? qd_link_pn(link) : 0;
    if (!pn_link)
        return 0;

    // DISPATCH-1085: If link is in the middle of receiving a message it is
    // possible that the message is actually complete but the remaining message
    // data is still in proton's buffers.  (e.g. a large message is sent then
    // the sender immediately detaches)  Force a call to the rx_handler for the
    // link in order to pull the buffered data into the message.
    if (pn_link_is_receiver(pn_link)) {
        pn_delivery_t *current = pn_link_current(pn_link);
        qd_message_t  *msg     = current ? qd_get_message_context(current) : 0;

        if (msg && !qd_message_receive_complete(msg)) {
            qd_link_set_q2_limit_unbounded(link, true);
            qd_message_Q2_holdoff_disable(msg);

            qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp);
            set_safe_ptr_qd_link_t(link, safe_ptr);
            deferred_AMQP_rx_handler(safe_ptr, false);
        }
    }

    qdr_link_t     *rlink = (qdr_link_t*) qd_link_get_context(link);
    pn_condition_t *cond  = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;

    // No core link bound to this qd_link: nothing to report.
    if (!rlink)
        return 0;

    //
    // This is the last event for this link that we will send into the core.  Remove the
    // core linkage.  Note that the core->qd linkage is still in place.
    //
    qd_link_set_context(link, 0);

    //
    // If the link was lost (due to connection drop), or the linkage from the core
    // object is already gone, finish disconnecting the linkage and free the qd_link
    // because the core will silently free its own resources.
    //
    const bool release_link = (dt == QD_LOST) || (qdr_link_get_context(rlink) == 0);
    if (release_link) {
        qdr_link_set_context(rlink, 0);
        qd_link_free(link);
    }

    qdr_link_detach(rlink, dt, qdr_error_from_pn(cond));
    return 0;
}
/**
 * Callback handed to qdr_connection_opened(): cross-links a core connection
 * with the qd_connection_t that was supplied as the opaque token.
 */
static void bind_connection_context(qdr_connection_t *qdrc, void* token)
{
    qd_connection_t *qd_conn = (qd_connection_t*) token;

    // qd -> core linkage first, then core -> qd.
    qd_connection_set_context(qd_conn, qdrc);
    qdr_connection_set_context(qdrc, qd_conn);
}
/**
 * Prune the connector's failover list down to the original connection
 * information (always the first entry, from the config file) and the entry
 * used for the current successful connection (position conn_index, counted
 * from 1).  conn_index is adjusted for entries removed ahead of it.
 *
 * Does nothing when the connection has no connector or the list holds at most
 * one entry.
 */
static void save_original_and_current_conn_info(qd_connection_t *conn)
{
    if (!conn->connector || DEQ_SIZE(conn->connector->conn_info_list) <= 1)
        return;

    qd_failover_item_t *cursor                 = DEQ_HEAD(conn->connector->conn_info_list);
    int                 position               = 1;      // 1-based position in the ORIGINAL list
    int                 removed_before_current = 0;
    bool                current_seen           = false;

    while (cursor) {
        // Capture the successor up front; the cursor may be freed below.
        qd_failover_item_t *successor = DEQ_NEXT(cursor);

        if (position == conn->connector->conn_index) {
            // The entry we are currently connected through: keep it.
            current_seen = true;
        }
        else if (cursor != DEQ_HEAD(conn->connector->conn_info_list)) {
            // Left over from the previous connection's failover list: drop it,
            // because the new connection might have its own failover list.
            free(cursor->scheme);
            free(cursor->host);
            free(cursor->port);
            free(cursor->hostname);
            free(cursor->host_port);
            DEQ_REMOVE(conn->connector->conn_info_list, cursor);
            free(cursor);

            // An entry removed ahead of the current one shifts the current
            // entry's position down by one.
            if (!current_seen)
                removed_before_current += 1;
        }
        // else: the head is always the original connector info and stays in place.

        cursor    = successor;
        position += 1;
    }

    conn->connector->conn_index -= removed_before_current;
}
/**
 * Common handling for a newly opened AMQP connection (inbound or outbound).
 *
 * Gathers the transport/SASL/SSL details, the configured role, cost and link
 * capacity, the peer's offered capabilities and connection properties (cost,
 * failover list, router version), and then announces the connection to the
 * router core with qdr_connection_opened().  For connector-based connections
 * a human readable summary is also stored in the connector's conn_msg.
 *
 * @param router   the router instance
 * @param conn     the connection that has just been opened
 * @param inbound  true if the connection was initiated by the peer
 */
static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool inbound)
{
    qdr_connection_role_t  role            = 0;
    int                    cost            = 1;
    int                    link_capacity   = 1;
    const char            *name            = 0;
    bool                   multi_tenant    = false;
    bool                   streaming_links = false;
    const char            *vhost           = 0;
    char                   rversion[128];
    uint64_t               connection_id   = qd_connection_connection_id(conn);
    pn_connection_t       *pn_conn         = qd_connection_pn(conn);
    pn_transport_t        *tport           = 0;
    pn_sasl_t             *sasl            = 0;
    pn_ssl_t              *ssl             = 0;
    const char            *mech            = 0;
    const char            *user            = 0;
    const char            *container       = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;

    rversion[0] = 0;

    conn->strip_annotations_in  = false;
    conn->strip_annotations_out = false;

    if (conn->pn_conn) {
        tport = pn_connection_transport(conn->pn_conn);
        ssl   = conn->ssl;
    }

    if (tport) {
        sasl = pn_sasl(tport);
        // An explicitly assigned user id takes precedence over the transport's user.
        if (conn->user_id)
            user = conn->user_id;
        else
            user = pn_transport_get_user(tport);
    }

    if (sasl)
        mech = pn_sasl_get_mech(sasl);

    // Outgoing connections report the connector's configured host:port,
    // others report the connection name.
    const char *host = 0;
    char host_local[255];
    const qd_server_config_t *config;
    if (qd_connection_connector(conn)) {
        config = qd_connector_config(qd_connection_connector(conn));
        snprintf(host_local, sizeof(host_local), "%s", config->host_port);
        host = &host_local[0];
    }
    else
        host = qd_connection_name(conn);

    qd_router_connection_get_config(conn, &role, &cost, &name, &multi_tenant,
                                    &conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);

    // check offered capabilities for streaming link support
    //
    pn_data_t *ocaps = pn_conn ? pn_connection_remote_offered_capabilities(pn_conn) : 0;
    if (ocaps) {
        size_t sl_len = strlen(QD_CAPABILITY_STREAMING_LINKS);
        pn_data_rewind(ocaps);
        if (pn_data_next(ocaps)) {
            // The capabilities may be a single symbol or an array of symbols.
            if (pn_data_type(ocaps) == PN_ARRAY) {
                pn_data_enter(ocaps);
                pn_data_next(ocaps);
            }
            do {
                if (pn_data_type(ocaps) == PN_SYMBOL) {
                    pn_bytes_t s = pn_data_get_symbol(ocaps);
                    streaming_links = (s.size == sl_len
                                       && strncmp(s.start, QD_CAPABILITY_STREAMING_LINKS, sl_len) == 0);
                }
            } while (pn_data_next(ocaps) && !streaming_links);
        }
    }

    // if connection properties are present parse out any important data
    //
    pn_data_t *props = pn_conn ? pn_connection_remote_properties(pn_conn) : 0;
    if (props) {
        const bool is_router = (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_EDGE_CONNECTION);
        pn_data_rewind(props);
        if (pn_data_next(props) && pn_data_type(props) == PN_MAP) {
            const size_t num_pairs = pn_data_get_map(props) / 2;
            int props_found = 0;  // once all props found exit loop

            pn_data_enter(props);
            for (size_t i = 0; i < num_pairs && props_found < 3; ++i) {
                if (!pn_data_next(props)) break;
                if (pn_data_type(props) != PN_SYMBOL) break;  // invalid properties map

                pn_bytes_t key = pn_data_get_symbol(props);

                if (key.size == strlen(QD_CONNECTION_PROPERTY_COST_KEY) &&
                    strncmp(key.start, QD_CONNECTION_PROPERTY_COST_KEY, key.size) == 0) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    // A router peer may raise (never lower) the configured cost.
                    if (is_router) {
                        if (pn_data_type(props) == PN_INT) {
                            const int remote_cost = (int) pn_data_get_int(props);
                            if (remote_cost > cost)
                                cost = remote_cost;
                        }
                    }

                } else if (key.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY) &&
                           strncmp(key.start, QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY, key.size) == 0) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    parse_failover_property_list(router, conn, props);

                } else if (key.size == strlen(QD_CONNECTION_PROPERTY_VERSION_KEY)
                           && strncmp(key.start, QD_CONNECTION_PROPERTY_VERSION_KEY, key.size) == 0) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    // Only a string value carries a version; anything else
                    // leaves rversion empty.
                    if (is_router && pn_data_type(props) == PN_STRING) {
                        pn_bytes_t vdata = pn_data_get_string(props);
                        size_t     vlen  = MIN(sizeof(rversion) - 1, vdata.size);
                        if (vlen > 0)
                            memcpy(rversion, vdata.start, vlen);
                        rversion[vlen] = 0;
                    }

                } else {
                    // skip this key
                    if (!pn_data_next(props)) break;
                }
            }
        }
    }

    if (multi_tenant)
        vhost = (conn->policy_settings && conn->policy_settings->vhost_name) ?
            conn->policy_settings->vhost_name :
            (pn_conn ? pn_connection_remote_hostname(pn_conn) : 0);

    char proto[50]  = {0};
    char cipher[50] = {0};

    int  ssl_ssf = 0;
    bool is_ssl  = false;

    if (ssl) {
        pn_ssl_get_protocol_name(ssl, proto, sizeof(proto));
        pn_ssl_get_cipher_name(ssl, cipher, sizeof(cipher));
        ssl_ssf = pn_ssl_get_ssf(ssl);
        is_ssl  = true;
    }

    bool encrypted     = tport && pn_transport_is_encrypted(tport);
    bool authenticated = tport && pn_transport_is_authenticated(tport);

    qdr_connection_info_t *connection_info = qdr_connection_info(encrypted,
                                                                 authenticated,
                                                                 conn->opened,
                                                                 (char*) mech,
                                                                 conn->connector ? QD_OUTGOING : QD_INCOMING,
                                                                 host,
                                                                 proto,
                                                                 cipher,
                                                                 (char*) user,
                                                                 container,
                                                                 props,
                                                                 ssl_ssf,
                                                                 is_ssl,
                                                                 rversion,
                                                                 streaming_links);

    qdr_connection_opened(router->router_core,
                          amqp_direct_adaptor,
                          inbound,
                          role,
                          cost,
                          connection_id,
                          name,
                          pn_conn ? pn_connection_remote_container(pn_conn) : 0,
                          conn->strip_annotations_in,
                          conn->strip_annotations_out,
                          link_capacity,
                          vhost,
                          !!conn->policy_settings ? &conn->policy_settings->spec : 0,
                          connection_info,
                          bind_connection_context,
                          conn);

    if (conn->connector) {
        // Passing a null pointer to %s is undefined behaviour; substitute an
        // explicit placeholder for any value that is not available.
        static const char *const unavailable = "(null)";
        char conn_msg[300];

        qd_format_string(conn_msg, sizeof(conn_msg), "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s"
                         " auth=%s user=%s container_id=%s",
                         connection_id, inbound ? "in" : "out",
                         host ? host : unavailable,
                         vhost ? vhost : "",
                         encrypted ? proto : "no",
                         authenticated ? (mech ? mech : unavailable) : "no",
                         user ? user : unavailable,
                         container ? container : unavailable);

        // conn_msg on the connector is shared state: update it under the connector lock.
        sys_mutex_lock(conn->connector->lock);
        strcpy(conn->connector->conn_msg, conn_msg);
        sys_mutex_unlock(conn->connector->lock);
    }
}
// We are attempting to find a connection property called failover-server-list which is a list of failover host names and ports..
// failover-server-list looks something like this
// :"failover-server-list"=[{:"network-host"="some-host", :port="35000"}, {:"network-host"="localhost", :port="25000"}]
// There are three cases here -
// 1. The failover-server-list is present but the content of the list is empty in which case we scrub the failover list except we keep the original connector information and current connection information.
// 2. If the failover list contains one or more maps that contain failover connection information, that information will be appended to the list which already contains the original connection information
// and the current connection information. Any other failover information left over from the previous connection is deleted
// 3. If the failover-server-list is not present at all in the connection properties, the failover list we maintain is untouched.
//
// props should be pointing at the value that corresponds to the QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY
// returns true if failover list properly parsed.
//
static bool parse_failover_property_list(qd_router_t *router, qd_connection_t *conn, pn_data_t *props)
{
bool found_failover = false;
if (pn_data_type(props) != PN_LIST)
return false;
size_t list_num_items = pn_data_get_list(props);
if (list_num_items > 0) {
save_original_and_current_conn_info(conn);
pn_data_enter(props); // enter list
for (int i=0; i < list_num_items; i++) {
pn_data_next(props);// this is the first element of the list, a map.
if (props && pn_data_type(props) == PN_MAP) {
size_t map_num_items = pn_data_get_map(props);
pn_data_enter(props);
qd_failover_item_t *item = NEW(qd_failover_item_t);
ZERO(item);
// We have found a map with the connection information. Step thru the map contents and create qd_failover_item_t
for (int j=0; j < map_num_items/2; j++) {
pn_data_next(props);
if (pn_data_type(props) == PN_SYMBOL) {
pn_bytes_t sym = pn_data_get_symbol(props);
if (sym.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY) &&
strcmp(sym.start, QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY) == 0) {
pn_data_next(props);
if (pn_data_type(props) == PN_STRING) {
item->host = strdup(pn_data_get_string(props).start);
}
}
else if (sym.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_PORT_KEY) &&
strcmp(sym.start, QD_CONNECTION_PROPERTY_FAILOVER_PORT_KEY) == 0) {
pn_data_next(props);
item->port = qdpn_data_as_string(props);
}
else if (sym.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY) &&
strcmp(sym.start, QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY) == 0) {
pn_data_next(props);
if (pn_data_type(props) == PN_STRING) {
item->scheme = strdup(pn_data_get_string(props).start);
}
}
else if (sym.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY) &&
strcmp(sym.start, QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY) == 0) {
pn_data_next(props);
if (pn_data_type(props) == PN_STRING) {
item->hostname = strdup(pn_data_get_string(props).start);
}
}
}
}
int host_length = strlen(item->host);
//
// We will not even bother inserting the item if there is no host available.
//
if (host_length != 0) {
if (item->scheme == 0)
item->scheme = strdup("amqp");
if (item->port == 0)
item->port = strdup("5672");
int hplen = strlen(item->host) + strlen(item->port) + 2;
item->host_port = malloc(hplen);
snprintf(item->host_port, hplen, "%s:%s", item->host, item->port);
//
// Iterate through failover list items and sets insert_tail to true
// when list has just original connector's host and port or when new
// reported host and port is not yet part of the current list.
//
bool insert_tail = false;
if ( DEQ_SIZE(conn->connector->conn_info_list) == 1 ) {
insert_tail = true;
} else {
qd_failover_item_t *conn_item = DEQ_HEAD(conn->connector->conn_info_list);
insert_tail = true;
while ( conn_item ) {
if ( !strcmp(conn_item->host_port, item->host_port) ) {
insert_tail = false;
break;
}
conn_item = DEQ_NEXT(conn_item);
}
}
// Only inserts if not yet part of failover list
if ( insert_tail ) {
DEQ_INSERT_TAIL(conn->connector->conn_info_list, item);
qd_log(router->log_source, QD_LOG_DEBUG, "Added %s as backup host", item->host_port);
found_failover = true;
}
else {
free(item->scheme);
free(item->host);
free(item->port);
free(item->hostname);
free(item->host_port);
free(item);
}
}
else {
free(item->scheme);
free(item->host);
free(item->port);
free(item->hostname);
free(item->host_port);
free(item);
}
}
pn_data_exit(props);
}
} // list_num_items > 0
else {
save_original_and_current_conn_info(conn);
}
return found_failover;
}
/**
 * Node-type callback for a connection opened by a remote peer.  Delegates to
 * AMQP_opened_handler() with inbound=true.  Always returns 0.
 */
static int AMQP_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
    AMQP_opened_handler((qd_router_t*) type_context, conn, true);
    return 0;
}
/**
 * Node-type callback for a connection opened by this router.  Delegates to
 * AMQP_opened_handler() with inbound=false.  Always returns 0.
 */
static int AMQP_outbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
    AMQP_opened_handler((qd_router_t*) type_context, conn, false);
    return 0;
}
/**
 * Node-type callback for a closed connection.  Detaches the core connection
 * from the qd_connection_t and tells the core the connection is closed.
 * Always returns 0.
 */
static int AMQP_closed_handler(void *type_context, qd_connection_t *conn, void *context)
{
    qd_router_t      *router    = (qd_router_t*) type_context;
    qdr_connection_t *core_conn = (qdr_connection_t*) qd_connection_get_context(conn);

    // Nothing bound to this connection: nothing to close out.
    if (!core_conn)
        return 0;

    // The core->qd linkage is cleared while holding the activation lock, the
    // same lock CORE_connection_activate holds when it reads that linkage.
    sys_mutex_lock(qd_server_get_activation_lock(router->qd->server));
    qdr_connection_set_context(core_conn, 0);
    sys_mutex_unlock(qd_server_get_activation_lock(router->qd->server));

    qdr_connection_closed(core_conn);
    qd_connection_set_context(conn, 0);

    return 0;
}
/**
 * Periodic timer callback: runs the python router tick and the core tick,
 * then re-arms the router timer.
 */
static void qd_router_timer_handler(void *context)
{
    qd_router_t *self = (qd_router_t*) context;

    // Periodic processing.
    qd_pyrouter_tick(self);

    // This sends a tick into the core and this happens every second.
    qdr_process_tick(self->router_core);

    // Re-arm for the next period.
    qd_timer_schedule(self->timer, 1000);
}
//
// Node type descriptor for the router.  It is registered with the container
// and installed as the default node type in qd_router(), which also stores the
// router instance in its type_context field.
//
// The initializer is positional: the order of the handlers below must match
// the field order of qd_node_type_t (declared elsewhere).  The two zero
// entries leave the node created/destroyed handlers unset.
//
static qd_node_type_t router_node = {"router", 0, 0,
AMQP_rx_handler,
AMQP_disposition_handler,
AMQP_incoming_link_handler,
AMQP_outgoing_link_handler,
AMQP_writable_conn_handler,
AMQP_link_detach_handler,
AMQP_link_attach_handler,
qd_link_abandoned_deliveries_handler,
AMQP_link_flow_handler,
0, // node_created_handler
0, // node_destroyed_handler
AMQP_inbound_opened_handler,
AMQP_outbound_opened_handler,
AMQP_closed_handler};
// Set once the router node type has been registered with the container.
static int type_registered = 0;

/**
 * Create the router object for a dispatch instance.
 *
 * Registers the router node type (first call only), builds the global node id
 * as "<area>/<id>", allocates and initialises the qd_router_t, installs it as
 * the default node type and logs the operating mode and version.  The router
 * core itself is not created here (router_core is left null).
 *
 * @param qd    the dispatch instance that will own the router
 * @param mode  the router operating mode
 * @param area  the router area name
 * @param id    the router id
 * @return the newly allocated router
 */
qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id)
{
    if (!type_registered) {
        type_registered = 1;
        qd_container_register_node_type(qd, &router_node);
    }

    // node_id = area "/" id
    size_t dplen = 9 + strlen(area) + strlen(id);
    node_id = (char*) qd_malloc(dplen);
    snprintf(node_id, dplen, "%s/%s", area, id);

    qd_router_t *router = NEW(qd_router_t);
    ZERO(router);

    router_node.type_context = router;
    qd->router               = router;

    router->qd          = qd;
    router->router_core = 0;
    router->log_source  = qd_log_source("ROUTER");
    router->router_mode = mode;
    router->router_area = area;
    router->router_id   = id;
    router->node        = qd_container_set_default_node_type(qd, &router_node, (void*) router, QD_DIST_BOTH);
    router->lock        = sys_mutex();
    router->timer       = qd_timer(qd, qd_router_timer_handler, (void*) router);

    sys_atomic_init(&global_delivery_id, 1);

    //
    // Inform the field iterator module of this router's mode, id, and area.  The field iterator
    // uses this to offload some of the address-processing load from the router.
    //
    qd_iterator_set_address(mode == QD_ROUTER_MODE_EDGE, area, id);

    switch (router->router_mode) {
    case QD_ROUTER_MODE_STANDALONE:
        qd_log(router->log_source, QD_LOG_INFO, "Router started in Standalone mode");
        break;
    case QD_ROUTER_MODE_INTERIOR:
        qd_log(router->log_source, QD_LOG_INFO, "Router started in Interior mode, area=%s id=%s", area, id);
        break;
    case QD_ROUTER_MODE_EDGE:
        qd_log(router->log_source, QD_LOG_INFO, "Router started in Edge mode");
        break;
    case QD_ROUTER_MODE_ENDPOINT:
        qd_log(router->log_source, QD_LOG_INFO, "Router started in Endpoint mode");
        break;
    }

    qd_log(router->log_source, QD_LOG_INFO, "Version: %s", QPID_DISPATCH_VERSION);

    return router;
}
/**
 * Core callback: request that the server activate the qd_connection_t bound
 * to the given core connection.
 */
static void CORE_connection_activate(void *context, qdr_connection_t *conn)
{
    qd_router_t *router = (qd_router_t*) context;

    //
    // IMPORTANT:  This is the only core callback that is invoked on the core
    //             thread itself. It must not take locks that could deadlock the core.
    //
    sys_mutex_lock(qd_server_get_activation_lock(router->qd->server));

    // The core->qd linkage is read while the activation lock is held.
    qd_connection_t *target = (qd_connection_t*) qdr_connection_get_context(conn);
    qd_server_activate(target);

    sys_mutex_unlock(qd_server_get_activation_lock(router->qd->server));
}
/**
 * Core callback: the core is initiating a link on a connection.  Creates the
 * qd_link_t, copies the termini onto it, binds it to the core link and opens
 * it in proton.
 */
static void CORE_link_first_attach(void               *context,
                                   qdr_connection_t   *conn,
                                   qdr_link_t         *link,
                                   qdr_terminus_t     *source,
                                   qdr_terminus_t     *target,
                                   qd_session_class_t  ssn_class)
{
    qd_connection_t *owner = (qd_connection_t*) qdr_connection_get_context(conn);
    if (!owner)
        return;  /* Connection is already closed */

    qd_router_t *router = (qd_router_t*) context;

    // Create a new link to be attached
    qd_link_t *new_link = qd_link(router->node, owner, qdr_link_direction(link), qdr_link_name(link), ssn_class);

    // Copy the source and target termini to the link
    qdr_terminus_copy(source, qd_link_source(new_link));
    qdr_terminus_copy(target, qd_link_target(new_link));

    // Associate the qd_link and the qdr_link to each other
    qdr_link_set_context(link, new_link);
    qd_link_set_context(new_link, link);

    // Open (attach) the link
    pn_link_open(qd_link_pn(new_link));

    // Mark the link as stalled and waiting for initial credit.
    if (qdr_link_direction(link) == QD_OUTGOING)
        qdr_link_stalled_outbound(link);
}
/**
 * Core callback: the core is answering a remotely initiated attach.  Copies
 * the termini onto the qd_link_t and opens the link in proton.  Does nothing
 * if the core link is no longer bound to a qd_link_t or proton link.
 */
static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
{
    qd_link_t *bound   = (qd_link_t*) qdr_link_get_context(link);
    pn_link_t *pn_link = bound ? qd_link_pn(bound) : 0;

    if (!pn_link)
        return;

    qdr_terminus_copy(source, qd_link_source(bound));
    qdr_terminus_copy(target, qd_link_target(bound));

    // Open (attach) the link
    pn_link_open(pn_link);

    // Mark the link as stalled and waiting for initial credit.
    if (qdr_link_direction(link) == QD_OUTGOING)
        qdr_link_stalled_outbound(link);
}
/**
 * Core callback: switch frame tracing on or off for the transport of the
 * connection bound to the given core connection.
 */
static void CORE_conn_trace(void *context, qdr_connection_t *qdr_conn, bool trace)
{
    qd_connection_t *bound = (qd_connection_t*) qdr_connection_get_context(qdr_conn);
    if (!bound)
        return;

    pn_transport_t *tport = pn_connection_transport(bound->pn_conn);
    if (!tport)
        return;

    if (!trace) {
        pn_transport_trace(tport, PN_TRACE_OFF);
        return;
    }

    // Enable frame tracing and route the output through the connection tracer.
    pn_transport_trace(tport, PN_TRACE_FRM);
    pn_transport_set_tracer(tport, connection_transport_tracer);
}
/**
 * Core callback: forcibly drop the connection bound to the given core
 * connection.  Does nothing when any link in the core -> qd -> proton chain
 * is missing.
 */
static void CORE_close_connection(void *context, qdr_connection_t *qdr_conn, qdr_error_t *error)
{
    if (!qdr_conn)
        return;

    qd_connection_t *qd_conn = qdr_connection_get_context(qdr_conn);
    if (!qd_conn)
        return;

    pn_connection_t *pn_conn = qd_connection_pn(qd_conn);
    if (!pn_conn)
        return;

    //
    // Go down to the transport and close the head and tail.  This will
    // drop the socket to the peer without providing any error indication.
    // Due to issues in Proton that cause different behaviors in different
    // bindings depending on whether there is a connection:forced error,
    // this has been deemed the best way to force the peer to reconnect.
    //
    pn_transport_t *tport = pn_connection_transport(pn_conn);
    pn_transport_close_head(tport);
    pn_transport_close_tail(tport);
}
/**
 * Core callback: detach or close a link on behalf of the core.
 *
 * @param context  unused
 * @param link     the core link being detached
 * @param error    optional error to place in the local link condition
 * @param first    false when this is the second detach, in which case the
 *                 qd_link_t is freed
 * @param close    true to close the link, false to detach it
 */
static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
{
    qd_link_t *bound   = (qd_link_t*) qdr_link_get_context(link);
    pn_link_t *pn_link = bound ? qd_link_pn(bound) : 0;

    if (!pn_link)
        return;

    if (error)
        qdr_error_copy(error, pn_link_condition(pn_link));

    //
    // If the link is only half open, then this DETACH constitutes the rejection of
    // an incoming ATTACH.  We must nullify the source and target in order to be
    // compliant with the AMQP specification.  This is because Proton will generate
    // the missing ATTACH before the DETACH and will include spurious terminus data
    // if we don't nullify it here.
    //
    const bool half_open = (pn_link_state(pn_link) & PN_LOCAL_UNINIT) != 0;
    if (half_open) {
        if (pn_link_is_receiver(pn_link)) {
            pn_terminus_set_type(pn_link_target(pn_link), PN_UNSPECIFIED);
            pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
        } else {
            pn_terminus_set_type(pn_link_source(pn_link), PN_UNSPECIFIED);
            pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
        }
    }

    if (close)
        qd_link_close(bound);
    else
        qd_link_detach(bound);

    //
    // This is the last event for this link that we are going to send into Proton.
    // Remove the core->proton linkage.  Note that the proton->core linkage may still
    // be intact and needed.
    //
    qdr_link_set_context(link, 0);

    //
    // If this is the second detach, free the qd_link
    //
    if (!first)
        qd_link_free(bound);
}
/**
 * Core callback: issue credit on the proton link bound to the core link, if
 * one is still bound.
 */
static void CORE_link_flow(void *context, qdr_link_t *link, int credit)
{
    qd_link_t *bound = (qd_link_t*) qdr_link_get_context(link);
    pn_link_t *plink = bound ? qd_link_pn(bound) : 0;

    if (plink)
        pn_link_flow(plink, credit);
}
/**
 * Core callback: pass the core's delivery count to pn_link_offered() on the
 * proton link bound to the core link, if one is still bound.
 */
static void CORE_link_offer(void *context, qdr_link_t *link, int delivery_count)
{
    qd_link_t *bound = (qd_link_t*) qdr_link_get_context(link);
    pn_link_t *plink = bound ? qd_link_pn(bound) : 0;

    if (plink)
        pn_link_offered(plink, delivery_count);
}
/**
 * Core callback: mark the proton link as drained and report completion of the
 * drain back to the core.  Does nothing if no proton link is bound.
 */
static void CORE_link_drained(void *context, qdr_link_t *link)
{
    qd_router_t *router = (qd_router_t*) context;
    qd_link_t   *bound  = (qd_link_t*) qdr_link_get_context(link);
    pn_link_t   *plink  = bound ? qd_link_pn(bound) : 0;

    if (!plink)
        return;

    pn_link_drained(plink);
    qdr_link_set_drained(router->router_core, link);
}
/**
 * Core callback: set the drain mode on the proton link bound to the core
 * link.  Only receiver links are affected.
 */
static void CORE_link_drain(void *context, qdr_link_t *link, bool mode)
{
    qd_link_t *bound = (qd_link_t*) qdr_link_get_context(link);
    pn_link_t *plink = bound ? qd_link_pn(bound) : 0;

    if (plink && pn_link_is_receiver(plink))
        pn_link_set_drain(plink, mode);
}
/**
 * Core callback: process outgoing deliveries on a link, bounded by both the
 * link's available credit and the caller's limit.
 *
 * @return the result of qdr_link_process_deliveries(), or 0 when the link is
 *         not bound to a proton link or there is no usable credit
 */
static int CORE_link_push(void *context, qdr_link_t *link, int limit)
{
    qd_router_t *router = (qd_router_t*) context;
    qd_link_t   *bound  = (qd_link_t*) qdr_link_get_context(link);
    if (!bound)
        return 0;

    pn_link_t *plink = qd_link_pn(bound);
    if (!plink)
        return 0;

    const int credit = pn_link_credit(plink);
    const int usable = credit > limit ? limit : credit;

    // We will not bother calling qdr_link_process_deliveries if we have no credit.
    if (usable <= 0)
        return 0;

    return qdr_link_process_deliveries(router->router_core, link, usable);
}
/**
 * Core callback: send (more of) a delivery's message out on a link.
 *
 * On the first call for a delivery a proton delivery is created using the
 * delivery's tag and linked to the core delivery.  Message content is then
 * written to the link; once the send is complete the link is advanced and the
 * delivery is settled locally where appropriate.
 *
 * @param context  the qd_router_t
 * @param link     the core link to send on
 * @param dlv      the core delivery being sent
 * @param settled  true if the delivery is settled by the sender
 * @return PN_ACCEPTED when the delivery was unsettled but has been settled
 *         here on behalf of a receiver whose remote snd-settle-mode is
 *         'settled' (the caller must pass that on to the core); 0 otherwise
 */
static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled)
{
    qd_router_t *router = (qd_router_t*) context;
    qd_link_t   *qlink  = (qd_link_t*) qdr_link_get_context(link);
    uint64_t     update = 0;

    if (!qlink)
        return 0;

    pn_link_t *plink = qd_link_pn(qlink);
    if (!plink)
        return 0;

    // Looked up only after qlink is known to be valid; used for message logging below.
    qd_connection_t *qconn = qd_link_connection(qlink);

    //
    // If the remote send settle mode is set to 'settled' then settle the delivery on behalf of the receiver.
    //
    bool remote_snd_settled = qd_link_remote_snd_settle_mode(qlink) == PN_SND_SETTLED;
    pn_delivery_t *pdlv = 0;

    if (!qdr_delivery_tag_sent(dlv)) {
        const char *tag;
        int         tag_length;
        uint64_t    disposition = 0;

        qdr_delivery_tag(dlv, &tag, &tag_length);

        // Create a new proton delivery on link 'plink'
        pn_delivery(plink, pn_dtag(tag, tag_length));

        pdlv = pn_link_current(plink);

        // handle any delivery-state on the transfer e.g. transactional-state
        qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &disposition);
        if (disposition) {
            if (dstate)
                qd_delivery_write_local_state(pdlv, disposition, dstate);
            pn_delivery_update(pdlv, disposition);
        }
        qd_delivery_state_free(dstate);

        //
        // Link the core delivery to the new proton delivery unless the core
        // delivery already carries a context.
        //
        if (qdr_delivery_get_context(dlv) == 0)
            qdr_node_connect_deliveries(qlink, dlv, pdlv);

        qdr_delivery_set_tag_sent(dlv, true);
    } else {
        pdlv = qdr_node_delivery_pn_from_qdr(dlv);
    }

    if (!pdlv)
        return 0;

    bool q3_stalled = false;

    qd_message_t *msg_out = qdr_delivery_message(dlv);

    qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &q3_stalled);

    if (q3_stalled) {
        qd_link_q3_block(qlink);
        qdr_link_stalled_outbound(link);
    }

    bool send_complete = qdr_delivery_send_complete(dlv);

    if (send_complete) {
        if (qd_message_aborted(msg_out)) {
            // Aborted messages must be settled locally
            // Settling does not produce any disposition to message sender.
            pn_link_advance(plink);
            qdr_node_disconnect_deliveries(router->router_core, qlink, dlv, pdlv);
            pn_delivery_settle(pdlv);
        } else {
            if (!settled && remote_snd_settled) {
                // The caller must tell the core that the delivery has been
                // accepted and settled, since we are settling on behalf of the
                // receiver
                update = PN_ACCEPTED;  // schedule the settle
            }

            pn_link_advance(plink);

            if (settled || remote_snd_settled) {
                qdr_node_disconnect_deliveries(router->router_core, qlink, dlv, pdlv);
                pn_delivery_settle(pdlv);
            }
        }
        log_link_message(qconn, plink, msg_out);
    }
    return update;
}
/**
 * Core callback: report the remote credit of the proton link bound to the
 * core link, or 0 when no proton link is bound.
 */
static int CORE_link_get_credit(void *context, qdr_link_t *link)
{
    qd_link_t *bound = (qd_link_t*) qdr_link_get_context(link);
    if (!bound)
        return 0;

    pn_link_t *plink = qd_link_pn(bound);
    if (!plink)
        return 0;

    return pn_link_remote_credit(plink);
}
/**
 * Core callback: propagate a disposition and/or settlement from a core
 * delivery to its proton delivery.
 *
 * @param context  the qd_router_t
 * @param dlv      the core delivery that was updated
 * @param disp     the new disposition (0 for none)
 * @param settled  true if the delivery has been settled
 */
static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
{
    qd_router_t   *router = (qd_router_t*) context;
    pn_delivery_t *pnd    = qdr_node_delivery_pn_from_qdr(dlv);

    if (!pnd)
        return;

    // If the delivery's link is somehow gone (maybe because of a connection drop), we don't proceed.
    if (!pn_delivery_link(pnd))
        return;

    // The whole chain core link -> qd link -> connection must still be intact.
    qdr_link_t *core_link = qdr_delivery_link(dlv);
    if (!core_link)
        return;

    qd_link_t *link = (qd_link_t*) qdr_link_get_context(core_link);
    if (!link)
        return;

    qd_connection_t *qd_conn = qd_link_connection(link);
    if (qd_conn == 0)
        return;

    if (disp && !pn_delivery_settled(pnd)) {
        uint64_t ignore = 0;
        qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);
        assert(ignore == disp);  // expected: since both are from the same dlv

        // update if the disposition has changed or there is new state associated with it
        const bool changed = (disp != pn_delivery_local_state(pnd)) || dstate;
        if (changed) {
            // handle propagation of delivery state from qdr_delivery_t to proton:
            qd_delivery_write_local_state(pnd, disp, dstate);
            pn_delivery_update(pnd, disp);
            qd_delivery_state_free(dstate);
            if (disp == PN_MODIFIED)  // @TODO(kgiusti) why do we need this???
                pn_disposition_set_failed(pn_delivery_local(pnd), true);
        }
    }

    if (!settled)
        return;

    qd_message_t *msg = qdr_delivery_message(dlv);

    if (qd_message_receive_complete(msg)) {
        //
        // If the delivery is settled and the message has fully arrived, disconnect
        // the linkages and settle it in Proton now.
        //
        qdr_node_disconnect_deliveries(router->router_core, link, dlv, pnd);
        pn_delivery_settle(pnd);
        return;
    }

    const bool defer_settlement = disp == PN_RELEASED
                               || disp == PN_MODIFIED
                               || disp == PN_REJECTED
                               || qdr_delivery_presettled(dlv);
    if (!defer_settlement)
        return;

    //
    // If the delivery is settled and it is still arriving, defer the settlement
    // until the content has fully arrived. For now set the disposition on the qdr_delivery
    // We will use this disposition later on to set the disposition on the proton delivery.
    //
    qdr_delivery_set_disposition(dlv, disp);

    //
    // We have set the message to be discarded. We will use this information
    // in AMQP_rx_handler to only update the disposition on the proton delivery if the message is discarded.
    //
    qd_message_set_discard(msg, true);

    //
    // The message is now being discarded; if it is blocked by Q2 holdoff,
    // get the link rolling again.
    //
    qd_message_Q2_holdoff_disable(msg);
    qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp);
    set_safe_ptr_qd_link_t(link, safe_ptr);
    qd_connection_invoke_deferred(qd_conn, deferred_AMQP_rx_handler, safe_ptr);
}
/**
 * Second-stage router setup: create the trace mask and the router core,
 * register the "amqp" protocol adaptor with its CORE_* callbacks, set up the
 * python router and start the periodic router timer.
 *
 * @param qd  the dispatch instance whose router (created by qd_router()) is
 *            being completed
 */
void qd_router_setup_late(qd_dispatch_t *qd)
{
qd->router->tracemask = qd_tracemask();
qd->router->router_core = qdr_core(qd, qd->router->router_mode, qd->router->router_area, qd->router->router_id);
// The callbacks are passed positionally; their order must match the
// parameter order of qdr_protocol_adaptor() (declared elsewhere).
amqp_direct_adaptor = qdr_protocol_adaptor(qd->router->router_core,
"amqp",
(void*) qd->router,
CORE_connection_activate,
CORE_link_first_attach,
CORE_link_second_attach,
CORE_link_detach,
CORE_link_flow,
CORE_link_offer,
CORE_link_drained,
CORE_link_drain,
CORE_link_push,
CORE_link_deliver,
CORE_link_get_credit,
CORE_delivery_update,
CORE_close_connection,
CORE_conn_trace);
qd_router_python_setup(qd->router);
// First tick; qd_router_timer_handler re-arms the timer with the same
// interval on every expiry.
qd_timer_schedule(qd->router->timer, 1000);
}
/**
 * Tear down a router created by qd_router(): unregister the default node
 * type, free the core, trace mask, timer, lock, configuration and python
 * state, and release the router object and the file-scope id strings.
 *
 * @param router  the router to free; a null pointer is ignored
 */
void qd_router_free(qd_router_t *router)
{
    if (!router) return;

    //
    // The char* router->router_id and router->router_area are owned by qd->router_id and qd->router_area respectively
    // We will set them to zero here just in case anybody tries to use these fields.
    //
    router->router_id = 0;
    router->router_area = 0;

    qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH);

    qdr_core_free(router->router_core);
    qd_tracemask_free(router->tracemask);
    qd_timer_free(router->timer);
    sys_mutex_free(router->lock);
    qd_router_configure_free(router);
    qd_router_python_free(router);

    free(router);

    // These are file-scope pointers that outlive the router: clear them so
    // that qd_router_id() cannot hand out a dangling pointer and a repeated
    // free is harmless.
    free(node_id);
    node_id = 0;
    free(direct_prefix);
    direct_prefix = 0;
}
/**
 * Return the router's node id, the "<area>/<id>" string built by qd_router().
 * The string is held in file-scope storage that qd_router_free() releases;
 * the qd argument is not used.
 */
const char *qd_router_id(const qd_dispatch_t *qd)
{
return node_id;
}
/**
 * Return the router core of the given dispatch instance.  The core is created
 * in qd_router_setup_late(); qd_router() leaves it null.  qd->router is
 * dereferenced without a null check.
 */
qdr_core_t *qd_router_core(qd_dispatch_t *qd)
{
return qd->router->router_core;
}
// Invoked by an I/O thread when enough buffers have been released to
// deactivate the Q2 block.  Note that this method will likely be running on a
// worker thread that is not the same thread that "owns" the qd_link_t passed
// in, so the receive is restarted by scheduling deferred_AMQP_rx_handler on
// the link's connection instead of being run directly here.
//
void qd_link_q2_restart_receive(qd_alloc_safe_ptr_t context)
{
    qd_link_t *in_link = (qd_link_t*) qd_alloc_deref_safe_ptr(&context);
    if (!in_link)
        return;

    assert(qd_link_direction(in_link) == QD_INCOMING);

    qd_connection_t *in_conn = qd_link_connection(in_link);
    if (!in_conn)
        return;

    // Hand the deferred call its own heap copy of the safe pointer.
    qd_link_t_sp *safe_ptr = NEW(qd_alloc_safe_ptr_t);
    *safe_ptr = context;  // use original to keep old sequence counter
    qd_connection_invoke_deferred(in_conn, deferred_AMQP_rx_handler, safe_ptr);
}
// Issue a warning POLICY log message with connection and link identities
// prepended to the policy denial text string.  An identity that cannot be
// determined (no core link bound, or no core connection on it) is logged as 0.
void qd_connection_log_policy_denial(qd_link_t *link, const char *text)
{
    qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);

    const uint64_t l_id = rlink ? rlink->identity : 0;
    const uint64_t c_id = (rlink && rlink->conn) ? rlink->conn->identity : 0;

    qd_log(qd_policy_log_source(), QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] %s",
           c_id, l_id, text);
}