| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <qpid/dispatch/python_embedded.h> |
| #include <stdio.h> |
| #include <string.h> |
| #include <stdbool.h> |
| #include <stdlib.h> |
| #include <qpid/dispatch.h> |
| #include "dispatch_private.h" |
| #include "entity_cache.h" |
| #include "router_private.h" |
| #include "delivery.h" |
| #include "policy.h" |
| #include <qpid/dispatch/router_core.h> |
| #include <qpid/dispatch/proton_utils.h> |
| #include <proton/sasl.h> |
| #include <inttypes.h> |
| |
| const char *QD_ROUTER_NODE_TYPE = "router.node"; |
| const char *QD_ROUTER_ADDRESS_TYPE = "router.address"; |
| const char *QD_ROUTER_LINK_TYPE = "router.link"; |
| |
| static char *router_role = "inter-router"; |
| static char *container_role = "route-container"; |
| static char *edge_role = "edge"; |
| static char *direct_prefix; |
| static char *node_id; |
| |
| static void deferred_AMQP_rx_handler(void *context, bool discard); |
| |
| //============================================================================== |
| // Functions to handle the linkage between proton deliveries and qdr deliveries |
| //============================================================================== |
| |
| // |
| // qd_link.list_of_references(pn_delivery_t) |
| // pn_delivery.context => reference-entry |
| // qdr_delivery.context => pn_delivery |
| // |
| |
| |
| static void qdr_node_connect_deliveries(qd_link_t *link, qdr_delivery_t *qdlv, pn_delivery_t *pdlv) |
| { |
| qd_link_ref_t *ref = new_qd_link_ref_t(); |
| qd_link_ref_list_t *list = qd_link_get_ref_list(link); |
| ZERO(ref); |
| ref->ref = qdlv; |
| DEQ_INSERT_TAIL(*list, ref); |
| |
| pn_delivery_set_context(pdlv, ref); |
| qdr_delivery_set_context(qdlv, pdlv); |
| qdr_delivery_incref(qdlv, "referenced by a pn_delivery"); |
| |
| } |
| |
| |
| static void qdr_node_disconnect_deliveries(qdr_core_t *core, qd_link_t *link, qdr_delivery_t *qdlv, pn_delivery_t *pdlv) |
| { |
| if (!link) |
| return; |
| |
| qd_link_ref_t *ref = (qd_link_ref_t*) pn_delivery_get_context(pdlv); |
| qd_link_ref_list_t *list = qd_link_get_ref_list(link); |
| |
| if (ref) { |
| DEQ_REMOVE(*list, ref); |
| free_qd_link_ref_t(ref); |
| |
| pn_delivery_set_context(pdlv, 0); |
| qdr_delivery_set_context(qdlv, 0); |
| qdr_delivery_decref(core, qdlv, "removed reference from pn_delivery"); |
| } |
| } |
| |
| |
| static pn_delivery_t *qdr_node_delivery_pn_from_qdr(qdr_delivery_t *dlv) |
| { |
| return dlv ? (pn_delivery_t*) qdr_delivery_get_context(dlv) : 0; |
| } |
| |
| |
| static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv) |
| { |
| qd_link_ref_t *ref = (qd_link_ref_t*) pn_delivery_get_context(dlv); |
| return ref ? (qdr_delivery_t*) ref->ref : 0; |
| } |
| |
| |
| void qd_link_abandoned_deliveries_handler(void *context, qd_link_t *link) |
| { |
| qd_router_t *router = (qd_router_t*) context; |
| qd_link_ref_list_t *list = qd_link_get_ref_list(link); |
| qd_link_ref_t *ref = DEQ_HEAD(*list); |
| |
| while (ref) { |
| DEQ_REMOVE_HEAD(*list); |
| qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref; |
| ref->ref = 0; |
| qdr_delivery_set_context(dlv, 0); |
| qdr_delivery_decref(router->router_core, dlv, "qd_link_abandoned_deliveries_handler"); |
| free_qd_link_ref_t(ref); |
| ref = DEQ_HEAD(*list); |
| } |
| } |
| |
| |
| |
| /** |
| * Determine the role of a connection |
| */ |
| static void qd_router_connection_get_config(const qd_connection_t *conn, |
| qdr_connection_role_t *role, |
| int *cost, |
| const char **name, |
| bool *multi_tenant, |
| bool *strip_annotations_in, |
| bool *strip_annotations_out, |
| int *link_capacity) |
| { |
| if (conn) { |
| const qd_server_config_t *cf = qd_connection_config(conn); |
| |
| *strip_annotations_in = cf ? cf->strip_inbound_annotations : false; |
| *strip_annotations_out = cf ? cf->strip_outbound_annotations : false; |
| *link_capacity = cf ? cf->link_capacity : 1; |
| |
| if (cf && (strcmp(cf->role, router_role) == 0)) { |
| *strip_annotations_in = false; |
| *strip_annotations_out = false; |
| *role = QDR_ROLE_INTER_ROUTER; |
| *cost = cf->inter_router_cost; |
| } else if (cf && (strcmp(cf->role, edge_role) == 0)) { |
| *strip_annotations_in = false; |
| *strip_annotations_out = false; |
| *role = QDR_ROLE_EDGE_CONNECTION; |
| *cost = cf->inter_router_cost; |
| } else if (cf && (strcmp(cf->role, container_role) == 0)) // backward compat |
| *role = QDR_ROLE_ROUTE_CONTAINER; |
| else |
| *role = QDR_ROLE_NORMAL; |
| |
| *name = cf ? cf->name : 0; |
| if (*name) { |
| if (strncmp("listener/", *name, 9) == 0 || |
| strncmp("connector/", *name, 10) == 0) |
| *name = 0; |
| } |
| |
| *multi_tenant = cf ? cf->multi_tenant : false; |
| } |
| } |
| |
| |
| static int AMQP_writable_conn_handler(void *type_context, qd_connection_t *conn, void *context) |
| { |
| qdr_connection_t *qconn = (qdr_connection_t*) qd_connection_get_context(conn); |
| |
| if (qconn) |
| return qdr_connection_process(qconn); |
| return 0; |
| } |
| |
| |
/**
 * Apply this router's message annotations to an inbound message.
 *
 * Reads the incoming trace/ingress/to-override/phase annotations and rewrites
 * them for forwarding:
 *   - QD_MA_TRACE:   appends this router's ID (non-edge mode only)
 *   - QD_MA_TO:      preserved as-is if present
 *   - QD_MA_PHASE:   preserved as-is if present
 *   - QD_MA_INGRESS: preserved if present, else set to this router's ID
 *                    (non-edge mode only)
 *
 * @param router          this router (mode determines edge behavior)
 * @param msg             the message whose annotations are rewritten
 * @param link_exclusions [out] bitmask of links already traversed (from the
 *                        trace), or 0 if no trace list was present
 * @param distance        [out] hop count traveled so far (trace list length)
 * @param ingress_index   [in/out] updated by qd_tracemask_create when a trace
 *                        is present
 * @return iterator over the pre-existing ingress field, or NULL if this
 *         router supplied the ingress annotation itself (or edge mode)
 */
static qd_iterator_t *router_annotate_message(qd_router_t   *router,
                                              qd_message_t  *msg,
                                              qd_bitmask_t **link_exclusions,
                                              uint32_t      *distance,
                                              int           *ingress_index)
{
    qd_iterator_t *ingress_iter = 0;
    bool           edge_mode    = router->router_mode == QD_ROUTER_MODE_EDGE;

    *link_exclusions = 0;
    *distance        = 0;

    qd_parsed_field_t *trace   = qd_message_get_trace(msg);
    qd_parsed_field_t *ingress = qd_message_get_ingress(msg);
    qd_parsed_field_t *to      = qd_message_get_to_override(msg);
    qd_parsed_field_t *phase   = qd_message_get_phase(msg);

    //
    // QD_MA_TRACE:
    // If there is a trace field, append this router's ID to the trace.
    // If the router ID is already in the trace the msg has looped.
    // This code does not check for the loop condition.
    //
    // Edge routers do not add their IDs to the trace list.
    //
    if (!edge_mode) {
        qd_composed_field_t *trace_field = qd_compose_subfield(0);
        qd_compose_start_list(trace_field);
        if (trace) {
            if (qd_parse_is_list(trace)) {
                //
                // Return the distance in hops that this delivery has traveled.
                //
                *distance = qd_parse_sub_count(trace);

                //
                // Create a link-exclusion map for the items in the trace.  This map will
                // contain a one-bit for each link that leads to a neighbor router that
                // the message has already passed through.
                //
                *link_exclusions = qd_tracemask_create(router->tracemask, trace, ingress_index);

                //
                // Append this router's ID to the trace.
                //
                uint32_t idx = 0;
                qd_parsed_field_t *trace_item = qd_parse_sub_value(trace, idx);
                while (trace_item) {
                    qd_iterator_t *iter = qd_parse_raw(trace_item);
                    qd_iterator_reset_view(iter, ITER_VIEW_ALL);
                    qd_compose_insert_string_iterator(trace_field, iter);
                    idx++;
                    trace_item = qd_parse_sub_value(trace, idx);
                }
            }
        }

        qd_compose_insert_string(trace_field, node_id);
        qd_compose_end_list(trace_field);
        qd_message_set_trace_annotation(msg, trace_field);
    }

    //
    // QD_MA_TO:
    // Preserve the existing value.
    //
    if (to) {
        qd_composed_field_t *to_field = qd_compose_subfield(0);
        qd_compose_insert_string_iterator(to_field, qd_parse_raw(to));
        qd_message_set_to_override_annotation(msg, to_field);
    }

    //
    // QD_MA_PHASE:
    // Preserve the existing value.
    //
    if (phase) {
        qd_message_set_phase_annotation(msg, qd_message_get_phase_val(msg));
    }

    //
    // QD_MA_INGRESS:
    // If there is no ingress field, annotate the ingress as
    // this router else keep the original field.
    //
    // Edge routers do not annotate the ingress field.
    //
    if (!edge_mode) {
        qd_composed_field_t *ingress_field = qd_compose_subfield(0);
        if (ingress && qd_parse_is_scalar(ingress)) {
            ingress_iter = qd_parse_raw(ingress);
            qd_compose_insert_string_iterator(ingress_field, ingress_iter);
        } else
            qd_compose_insert_string(ingress_field, node_id);
        qd_message_set_ingress_annotation(msg, ingress_field);
    }

    //
    // Return the iterator to the ingress field _if_ it was present.
    // If we added the ingress, return NULL.
    //
    return ingress_iter;
}
| |
| static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_message_t *msg) |
| { |
| if (!conn || !pn_link || !msg) return; |
| const qd_server_config_t *cf = qd_connection_config(conn); |
| if (!cf) return; |
| char buf[qd_message_repr_len()]; |
| const char *msg_str = qd_message_oversize(msg) ? "oversize message" : |
| qd_message_aborted(msg) ? "aborted message" : |
| qd_message_repr(msg, buf, sizeof(buf), cf->log_bits); |
| if (msg_str) { |
| const char *src = pn_terminus_get_address(pn_link_source(pn_link)); |
| const char *tgt = pn_terminus_get_address(pn_link_target(pn_link)); |
| qd_log(qd_message_log_source(), QD_LOG_TRACE, |
| "[C%"PRIu64"]: %s %s on link '%s' (%s -> %s)", |
| qd_connection_connection_id(conn), |
| pn_link_is_sender(pn_link) ? "Sent" : "Received", |
| msg_str, |
| pn_link_name(pn_link), |
| src ? src : "", |
| tgt ? tgt : ""); |
| } |
| } |
| |
| /** |
| * Inbound Delivery Handler |
| * |
| * @return true if we've advanced to the next delivery on this link and it is |
| * ready for rx processing. This will cause the container to immediately |
| * re-call this function with the next delivery. |
| */ |
/**
 * Inbound Delivery Handler
 *
 * Pulls available data for the link's current delivery into a qd_message_t
 * and, once enough of the message has arrived, hands the delivery to the
 * router core for forwarding.  Also handles discard, oversize, policy
 * (user-id proxy / target), link-routing, and loop-radius cases.
 *
 * @return true if we've advanced to the next delivery on this link and it is
 * ready for rx processing. This will cause the container to immediately
 * re-call this function with the next delivery.
 */
static bool AMQP_rx_handler(void* context, qd_link_t *link)
{
    qd_router_t *router = (qd_router_t*) context;
    pn_link_t *pn_link = qd_link_pn(link);

    assert(pn_link);

    if (!pn_link)
        return false;

    // ensure the current delivery is readable
    pn_delivery_t *pnd = pn_link_current(pn_link);
    if (!pnd)
        return false;

    qd_connection_t *conn = qd_link_connection(link);

    // DISPATCH-1628 DISPATCH-975 exit if router already closed this connection
    if (conn->closed_locally) {
        return false;
    }

    // Non-NULL when this delivery was already handed to the core on an
    // earlier call (i.e. a multi-frame transfer in progress).
    qdr_delivery_t *delivery = qdr_node_delivery_qdr_from_pn(pnd);
    bool next_delivery = false;

    //
    // Receive the message into a local representation.
    //
    qd_message_t *msg = qd_message_receive(pnd);
    bool receive_complete = qd_message_receive_complete(msg);

    if (!qd_message_oversize(msg)) {
        // message not rejected as oversize
        if (receive_complete) {
            log_link_message(conn, pn_link, msg);

            //
            // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance().
            //
            pn_link_advance(pn_link);
            next_delivery = pn_link_current(pn_link) != 0;
        }

        if (qd_message_is_discard(msg)) {
            //
            // Message has been marked for discard, no further processing necessary
            //
            if (receive_complete) {
                // If this discarded delivery has already been settled by proton,
                // set the presettled flag on the delivery to true if it is not already true.
                // Since the entire message has already been received, we directly call the
                // function to set the pre-settled flag since we cannot go thru the core-thread
                // to do this since the delivery has been discarded.
                // Discarded streaming deliveries are not put thru the core thread via the continue action.
                if (pn_delivery_settled(pnd))
                    qdr_delivery_set_presettled(delivery);

                uint64_t local_disp = qdr_delivery_disposition(delivery);
                //
                // Call pn_delivery_update only if the local disposition is different than the pn_delivery's local disposition.
                // This will make sure we call pn_delivery_update only when necessary.
                //
                if (local_disp != 0 && local_disp != pn_delivery_local_state(pnd)) {
                    //
                    // DISPATCH-1626 - This enables pn_delivery_update() and pn_delivery_settle() to be called back to back in the same function call.
                    // CORE_delivery_update() will handle most of the other cases where we need to call pn_delivery_update() followed by pn_delivery_settle().
                    //
                    pn_delivery_update(pnd, local_disp);
                }

                // note: expected that the code that set discard has handled
                // setting disposition and updating flow!
                pn_delivery_settle(pnd);
                if (delivery) {
                    // if delivery already exists then the core thread discarded this
                    // delivery, it will eventually free the qdr_delivery_t and its
                    // associated message - do not free it here.
                    qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
                } else {
                    qd_message_free(msg);
                }
            }
            return next_delivery;
        }
    } else {
        // message is oversize
        if (receive_complete) {
            // reject and settle the incoming delivery
            pn_delivery_update(pnd, PN_REJECTED);
            pn_delivery_settle(pnd);
            // close the link
            pn_link_close(pn_link);
            // close the connection with amqp:connection:forced
            pn_connection_t * pn_conn = qd_connection_pn(conn);
            pn_condition_t * cond = pn_connection_condition(pn_conn);
            (void) pn_condition_set_name(       cond, QD_AMQP_COND_CONNECTION_FORCED);
            (void) pn_condition_set_description(cond, "Message size exceeded");
            pn_connection_close(pn_conn);
            if (!delivery) {
                // this message has not been forwarded yet, so it will not be
                // cleaned up when the link is freed.
                qd_message_free(msg);
            }
            // stop activity on this connection
            conn->closed_locally = true;
        }
        return false;
        // oversize messages are not processed any further
    }

    //
    // If the delivery already exists we've already passed it to the core (2nd
    // frame for a multi-frame transfer).  Simply continue.
    //

    if (delivery) {
        qdr_deliver_continue(router->router_core, delivery, pn_delivery_settled(pnd));
        return next_delivery;
    }

    //
    // No pre-existing delivery means we're starting a new delivery or
    // continuing a delivery that has not accumulated enough of the message
    // for forwarding.
    //

    qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
    if (!rlink) {
        // receive link was closed or deleted - can't be forwarded
        // so no use setting disposition or adding flow
        qd_message_set_discard(msg, true);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    //
    // Handle the link-routed case
    //
    if (qdr_link_is_routed(rlink)) {
        pn_delivery_tag_t dtag = pn_delivery_tag(pnd);

        if (dtag.size > QDR_DELIVERY_TAG_MAX) {
            // NOTE(review): dtag.size appears to be an unsigned size; the
            // "%zd" conversion is for signed ssize_t — "%zu" looks more
            // appropriate.  Confirm against pn_delivery_tag_t's declaration.
            qd_log(router->log_source, QD_LOG_DEBUG, "link route delivery failure: msg tag size exceeded %zd (max=%d)",
                   dtag.size, QDR_DELIVERY_TAG_MAX);
            qd_message_set_discard(msg, true);
            pn_link_flow(pn_link, 1);
            pn_delivery_update(pnd, PN_REJECTED);
            if (receive_complete) {
                pn_delivery_settle(pnd);
                qd_message_free(msg);
            }
            return next_delivery;
        }

        delivery = qdr_link_deliver_to_routed_link(rlink,
                                                   msg,
                                                   pn_delivery_settled(pnd),
                                                   (uint8_t*) dtag.start,
                                                   dtag.size,
                                                   pn_disposition_type(pn_delivery_remote(pnd)),
                                                   pn_disposition_data(pn_delivery_remote(pnd)));
        qdr_node_connect_deliveries(link, delivery, pnd);
        qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link");
        return next_delivery;
    }

    //
    // Determine if the incoming link is anonymous.  If the link is addressed,
    // there are some optimizations we can take advantage of.
    //
    bool anonymous_link = qdr_link_is_anonymous(rlink);

    //
    // Determine if the user of this connection is allowed to proxy the
    // user_id of messages. A message user_id is proxied when the
    // property value differs from the authenticated user name of the connection.
    // If the user is not allowed to proxy the user_id then the message user_id
    // must be blank or it must be equal to the connection user name.
    //
    bool check_user = false;
    qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
    int tenant_space_len;
    const char *tenant_space = qdr_connection_get_tenant_space(qdr_conn, &tenant_space_len);
    if (conn->policy_settings)
        check_user = !conn->policy_settings->allowUserIdProxy;

    //
    // Validate the content of the delivery as an AMQP message. This is done partially, only
    // to validate that we can find the fields we need to route the message.
    //
    // If the link is anonymous, we must validate through the message properties to find the
    // 'to' field. If the link is not anonymous, we don't need the 'to' field as we will be
    // using the address from the link target.
    //
    qd_message_depth_t validation_depth = (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS;
    qd_message_depth_status_t depth_valid = qd_message_check_depth(msg, validation_depth);

    if (depth_valid != QD_MESSAGE_DEPTH_OK) {
        if (depth_valid == QD_MESSAGE_DEPTH_INVALID) {
            qd_message_set_discard(msg, true);
            pn_link_flow(pn_link, 1);
            pn_delivery_update(pnd, PN_REJECTED);
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        } else {
            // otherwise wait until more data arrives and re-try the validation
            assert(depth_valid == QD_MESSAGE_DEPTH_INCOMPLETE);
        }
        return next_delivery;
    }

    if (check_user) {
        // This connection must not allow proxied user_id
        qd_iterator_t *userid_iter = qd_message_field_iterator(msg, QD_FIELD_USER_ID);
        if (userid_iter) {
            // The user_id property has been specified
            if (qd_iterator_remaining(userid_iter) > 0) {
                // user_id property in message is not blank
                if (!qd_iterator_equal(userid_iter, (const unsigned char *)conn->user_id)) {
                    // This message is rejected: attempted user proxy is disallowed
                    qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due to user_id proxy violation. User:%s", conn->user_id);
                    qd_message_set_discard(msg, true);
                    pn_link_flow(pn_link, 1);
                    pn_delivery_update(pnd, PN_REJECTED);
                    if (receive_complete) {
                        pn_delivery_settle(pnd);
                        qd_message_free(msg);
                    }
                    qd_iterator_free(userid_iter);
                    return next_delivery;
                }
            }
            qd_iterator_free(userid_iter);
        }
    }

    // Parse the message annotations, then rewrite them for forwarding.
    qd_message_message_annotations(msg);
    qd_bitmask_t *link_exclusions;
    uint32_t      distance;
    int           ingress_index = 0; // Default to _this_ router

    qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance, &ingress_index);

    //
    // If this delivery has traveled further than the known radius of the network topology (plus 1),
    // release and settle the delivery.  This can happen in the case of "flood" multicast where the
    // deliveries follow all available paths.  This will only discard messages that will reach their
    // destinations via shorter paths.
    //
    if (distance > (router->topology_radius + 1)) {
        qd_bitmask_free(link_exclusions);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        pn_delivery_update(pnd, PN_RELEASED);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    if (anonymous_link) {
        qd_iterator_t *addr_iter = 0;
        int phase = 0;

        //
        // If the message has delivery annotations, get the to-override field from the annotations.
        //
        qd_parsed_field_t *ma_to = qd_message_get_to_override(msg);
        if (ma_to) {
            addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
            phase = qd_message_get_phase_annotation(msg);
        }

        //
        // Still no destination address?  Use the TO field from the message properties.
        //
        if (!addr_iter) {
            addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);

            //
            // If the address came from the TO field and we need to apply a tenant-space,
            // set the to-override with the annotated address.
            //
            if (addr_iter && tenant_space) {
                qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_WITH_SPACE);
                qd_iterator_annotate_space(addr_iter, tenant_space, tenant_space_len);
                qd_composed_field_t *to_override = qd_compose_subfield(0);
                qd_compose_insert_string_iterator(to_override, addr_iter);
                qd_message_set_to_override_annotation(msg, to_override);
            }
        }

        if (addr_iter) {
            if (!conn->policy_settings || qd_policy_approve_message_target(addr_iter, conn)) {
                qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
                if (phase > 0)
                    qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);
                delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
                                               link_exclusions, ingress_index);
            } else {
                //reject
                qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due to policy violation on target. User:%s", conn->user_id);
                qd_message_set_discard(msg, true);
                pn_link_flow(pn_link, 1);
                pn_delivery_update(pnd, PN_REJECTED);
                if (receive_complete) {
                    pn_delivery_settle(pnd);
                    qd_message_free(msg);
                }
                qd_iterator_free(addr_iter);
                qd_bitmask_free(link_exclusions);
                return next_delivery;
            }
        }
    } else {
        //
        // This is a targeted link, not anonymous.
        //

        //
        // Look in a series of locations for the terminus address, starting
        // with the qdr_link (in case this is an auto-link with separate
        // internal and external addresses).
        //
        const char *term_addr = qdr_link_internal_address(rlink);
        if (!term_addr) {
            term_addr = pn_terminus_get_address(qd_link_remote_target(link));
            if (!term_addr)
                term_addr = pn_terminus_get_address(qd_link_source(link));
        }

        if (term_addr) {
            qd_composed_field_t *to_override = qd_compose_subfield(0);
            if (tenant_space) {
                qd_iterator_t *aiter = qd_iterator_string(term_addr, ITER_VIEW_ADDRESS_WITH_SPACE);
                qd_iterator_annotate_space(aiter, tenant_space, tenant_space_len);
                qd_compose_insert_string_iterator(to_override, aiter);
                qd_iterator_free(aiter);
            } else
                qd_compose_insert_string(to_override, term_addr);
            qd_message_set_to_override_annotation(msg, to_override);
            int phase = qdr_link_phase(rlink);
            if (phase != 0)
                qd_message_set_phase_annotation(msg, phase);
        }
        delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index);
    }

    //
    // End of new delivery processing
    //

    if (delivery) {
        qdr_node_connect_deliveries(link, delivery, pnd);
        qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver");
    } else {
        //
        // If there is no delivery, the message is now and will always be unroutable because there is no address.
        //
        qd_bitmask_free(link_exclusions);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        pn_delivery_update(pnd, PN_REJECTED);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
    }

    return next_delivery;
}
| |
| |
| /** |
| * Deferred callback for inbound delivery handler |
| */ |
| static void deferred_AMQP_rx_handler(void *context, bool discard) |
| { |
| qd_link_t_sp *safe_qdl = (qd_link_t_sp*) context; |
| |
| if (!discard) { |
| qd_link_t *qdl = safe_deref_qd_link_t(*safe_qdl); |
| if (!!qdl) { |
| qd_router_t *qdr = (qd_router_t*) qd_link_get_node_context(qdl); |
| assert(qdr != 0); |
| while (true) { |
| if (!AMQP_rx_handler(qdr, qdl)) |
| break; |
| } |
| } |
| } |
| |
| free(safe_qdl); |
| } |
| |
| |
| /** |
| * Delivery Disposition Handler |
| */ |
| static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery_t *pnd) |
| { |
| qd_router_t *router = (qd_router_t*) context; |
| qdr_delivery_t *delivery = qdr_node_delivery_qdr_from_pn(pnd); |
| |
| // |
| // It's important to not do any processing without a qdr_delivery. When pre-settled |
| // multi-frame deliveries arrive, it's possible for the settlement to register before |
| // the whole message arrives. Such premature settlement indications must be ignored. |
| // |
| if (!delivery || !qdr_delivery_receive_complete(delivery)) |
| return; |
| |
| pn_disposition_t *disp = pn_delivery_remote(pnd); |
| pn_condition_t *cond = pn_disposition_condition(disp); |
| qdr_error_t *error = qdr_error_from_pn(cond); |
| |
| // |
| // Update the disposition of the delivery |
| // |
| qdr_delivery_remote_state_updated(router->router_core, delivery, |
| pn_delivery_remote_state(pnd), |
| pn_delivery_settled(pnd), |
| error, |
| pn_disposition_data(disp), |
| false); |
| |
| // |
| // If settled, close out the delivery |
| // |
| if (pn_delivery_settled(pnd)) { |
| qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd); |
| pn_delivery_settle(pnd); |
| } |
| } |
| |
| |
| /** |
| * New Incoming Link Handler |
| */ |
| static int AMQP_incoming_link_handler(void* context, qd_link_t *link) |
| { |
| qd_connection_t *conn = qd_link_connection(link); |
| |
| // The connection that this link belongs to is gone. Perhaps an AMQP close came in. |
| // This link handler should not continue since there is no connection. |
| if (conn == 0) |
| return 0; |
| |
| qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn); |
| |
| char *terminus_addr = (char*)pn_terminus_get_address(pn_link_remote_target((pn_link_t *)qd_link_pn(link))); |
| |
| qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING, |
| qdr_terminus(qd_link_remote_source(link)), |
| qdr_terminus(qd_link_remote_target(link)), |
| pn_link_name(qd_link_pn(link)), |
| terminus_addr); |
| qdr_link_set_context(qdr_link, link); |
| qd_link_set_context(link, qdr_link); |
| |
| return 0; |
| } |
| |
| |
| /** |
| * New Outgoing Link Handler |
| */ |
| static int AMQP_outgoing_link_handler(void* context, qd_link_t *link) |
| { |
| qd_connection_t *conn = qd_link_connection(link); |
| |
| // The connection that this link belongs to is gone. Perhaps an AMQP close came in. |
| // This link handler should not continue since there is no connection. |
| if (conn == 0) |
| return 0; |
| |
| qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn); |
| char *terminus_addr = (char*)pn_terminus_get_address(pn_link_remote_source((pn_link_t *)qd_link_pn(link))); |
| qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING, |
| qdr_terminus(qd_link_remote_source(link)), |
| qdr_terminus(qd_link_remote_target(link)), |
| pn_link_name(qd_link_pn(link)), |
| terminus_addr); |
| qdr_link_set_context(qdr_link, link); |
| qd_link_set_context(link, qdr_link); |
| |
| return 0; |
| } |
| |
| |
| /** |
| * Handler for remote opening of links that we initiated. |
| */ |
| static int AMQP_link_attach_handler(void* context, qd_link_t *link) |
| { |
| qdr_link_t *qlink = (qdr_link_t*) qd_link_get_context(link); |
| qdr_link_second_attach(qlink, |
| qdr_terminus(qd_link_remote_source(link)), |
| qdr_terminus(qd_link_remote_target(link))); |
| |
| return 0; |
| } |
| |
| |
| /** |
| * Handler for flow events on links. Flow updates include session window |
| * state, which needs to be checked for unblocking Q3. |
| */ |
| static int AMQP_link_flow_handler(void* context, qd_link_t *link) |
| { |
| qd_router_t *router = (qd_router_t*) context; |
| pn_link_t *pnlink = qd_link_pn(link); |
| qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); |
| |
| if (rlink) { |
| qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink)); |
| } |
| |
| // check if Q3 can be unblocked |
| pn_session_t *pn_ssn = pn_link_session(pnlink); |
| if (pn_ssn) { |
| qd_session_t *qd_ssn = qd_session_from_pn(pn_ssn); |
| if (qd_ssn && qd_session_is_q3_blocked(qd_ssn)) { |
| // Q3 blocked - have we drained enough outgoing bytes? |
| const size_t q3_lower = BUFFER_SIZE * QD_QLIMIT_Q3_LOWER; |
| if (pn_session_outgoing_bytes(pn_ssn) < q3_lower) { |
| // yes. We must now unblock all links that have been blocked by Q3 |
| qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn); |
| qd_link_t *blink = DEQ_HEAD(*blinks); |
| while (blink) { |
| qd_link_q3_unblock(blink); // removes from blinks list! |
| if (blink != link) { // already flowed this link |
| rlink = (qdr_link_t *) qd_link_get_context(blink); |
| if (rlink) { |
| pnlink = qd_link_pn(blink); |
| // signalling flow to the core causes the link to be re-activated |
| qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink)); |
| } |
| } |
| blink = DEQ_HEAD(*blinks); |
| } |
| } |
| } |
| } |
| return 0; |
| } |
| |
| |
| /** |
| * Link Detached Handler |
| */ |
| static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt) |
| { |
| if (!link) |
| return 0; |
| |
| pn_link_t *pn_link = qd_link_pn(link); |
| if (!pn_link) |
| return 0; |
| |
| // DISPATCH-1085: If link is in the middle of receiving a message it is |
| // possible that the message is actually complete but the remaining message |
| // data is still in proton's buffers. (e.g. a large message is sent then |
| // the sender immediately detaches) Force a call to the rx_handler for the |
| // link in order to pull the buffered data into the message. |
| if (pn_link_is_receiver(pn_link)) { |
| pn_delivery_t *pnd = pn_link_current(pn_link); |
| if (pnd) { |
| qd_message_t *msg = qd_get_message_context(pnd); |
| if (msg) { |
| if (!qd_message_receive_complete(msg)) { |
| qd_link_set_q2_limit_unbounded(link, true); |
| qd_message_Q2_holdoff_disable(msg); |
| qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp); |
| set_safe_ptr_qd_link_t(link, safe_ptr); |
| deferred_AMQP_rx_handler(safe_ptr, false); |
| } |
| } |
| } |
| } |
| |
| qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); |
| pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0; |
| |
| if (rlink) { |
| // |
| // This is the last event for this link that we will send into the core. Remove the |
| // core linkage. Note that the core->qd linkage is still in place. |
| // |
| qd_link_set_context(link, 0); |
| |
| // |
| // If the link was lost (due to connection drop), or the linkage from the core |
| // object is already gone, finish disconnecting the linkage and free the qd_link |
| // because the core will silently free its own resources. |
| // |
| if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) { |
| qdr_link_set_context(rlink, 0); |
| qd_link_free(link); |
| } |
| |
| qdr_error_t *error = qdr_error_from_pn(cond); |
| qdr_link_detach(rlink, dt, error); |
| } |
| |
| return 0; |
| } |
| |
| static void bind_connection_context(qdr_connection_t *qdrc, void* token) |
| { |
| qd_connection_t *conn = (qd_connection_t*) token; |
| qd_connection_set_context(conn, qdrc); |
| qdr_connection_set_context(qdrc, conn); |
| } |
| |
| |
// Scrub the connector's failover list down to (at most) two entries: the
// original configured connection info (always the list head) and the entry
// that produced the current successful connection (position conn_index).
// Everything else is stale data learned from a previous connection's
// failover-server-list and is freed here.  conn_index is adjusted downward
// for every entry removed ahead of it so it keeps pointing at the same item.
static void save_original_and_current_conn_info(qd_connection_t *conn)
{
    // The failover list is present but it is empty. We will wipe the old backup information from the failover list.
    // The only items we want to keep in this list is the original connection information (from the config file)
    // and the current connection information.

    if (conn->connector && DEQ_SIZE(conn->connector->conn_info_list) > 1) {
        // Here we are simply removing all other failover information except the original connection information and the one we used to make a successful connection.
        int i = 1;   // 1-based position of 'item' in the list, compared against conn_index
        qd_failover_item_t *item = DEQ_HEAD(conn->connector->conn_info_list);
        qd_failover_item_t *next_item = 0;

        bool match_found = false;      // becomes true once the conn_index entry is seen
        int dec_conn_index=0;          // how far conn_index must shift left after removals

        while(item) {

            //The first item on this list is always the original connector, so we want to keep that item in place
            // We have to delete items in the list that were left over from the previous failover list from the previous connection
            // because the new connection might have its own failover list.
            if (i != conn->connector->conn_index) {
                if (item != DEQ_HEAD(conn->connector->conn_info_list)) {
                    // Stale entry: free all owned strings, then the item itself.
                    next_item = DEQ_NEXT(item);
                    free(item->scheme);
                    free(item->host);
                    free(item->port);
                    free(item->hostname);
                    free(item->host_port);

                    DEQ_REMOVE(conn->connector->conn_info_list, item);

                    free(item);
                    item = next_item;

                    // We are removing an item from the list before the conn_index match was found. We need to
                    // decrement the conn_index
                    if (!match_found)
                        dec_conn_index += 1;
                }
                else {
                    // Head entry (original config) is always retained.
                    item = DEQ_NEXT(item);
                }
            }
            else {
                // This is the entry for the connection currently in use - keep it.
                match_found = true;
                item = DEQ_NEXT(item);
            }
            i += 1;
        }

        conn->connector->conn_index -= dec_conn_index;
    }
}
| |
| static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool inbound) |
| { |
| qdr_connection_role_t role = 0; |
| int cost = 1; |
| int remote_cost = 1; |
| int link_capacity = 1; |
| const char *name = 0; |
| bool multi_tenant = false; |
| const char *vhost = 0; |
| uint64_t connection_id = qd_connection_connection_id(conn); |
| pn_connection_t *pn_conn = qd_connection_pn(conn); |
| pn_transport_t *tport = 0; |
| pn_sasl_t *sasl = 0; |
| pn_ssl_t *ssl = 0; |
| const char *mech = 0; |
| const char *user = 0; |
| const char *container = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0; |
| conn->strip_annotations_in = false; |
| conn->strip_annotations_out = false; |
| if (conn->pn_conn) { |
| tport = pn_connection_transport(conn->pn_conn); |
| ssl = conn->ssl; |
| } |
| if (tport) { |
| sasl = pn_sasl(tport); |
| if(conn->user_id) |
| user = conn->user_id; |
| else |
| user = pn_transport_get_user(tport); |
| } |
| |
| if (sasl) |
| mech = pn_sasl_get_mech(sasl); |
| |
| const char *host = 0; |
| char host_local[255]; |
| const qd_server_config_t *config; |
| if (qd_connection_connector(conn)) { |
| config = qd_connector_config(qd_connection_connector(conn)); |
| snprintf(host_local, 254, "%s", config->host_port); |
| host = &host_local[0]; |
| } |
| else |
| host = qd_connection_name(conn); |
| |
| |
| qd_router_connection_get_config(conn, &role, &cost, &name, &multi_tenant, |
| &conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity); |
| |
| pn_data_t *props = pn_conn ? pn_connection_remote_properties(pn_conn) : 0; |
| |
| if (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_EDGE_CONNECTION) { |
| // |
| // Check the remote properties for an inter-router cost value. |
| // |
| if (props) { |
| pn_data_rewind(props); |
| pn_data_next(props); |
| if (props && pn_data_type(props) == PN_MAP) { |
| pn_data_enter(props); |
| while (pn_data_next(props)) { |
| if (pn_data_type(props) == PN_SYMBOL) { |
| pn_bytes_t sym = pn_data_get_symbol(props); |
| if (sym.size == strlen(QD_CONNECTION_PROPERTY_COST_KEY) && |
| strcmp(sym.start, QD_CONNECTION_PROPERTY_COST_KEY) == 0) { |
| pn_data_next(props); |
| if (pn_data_type(props) == PN_INT) |
| remote_cost = pn_data_get_int(props); |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| // |
| // Use the larger of the local and remote costs for this connection |
| // |
| if (remote_cost > cost) |
| cost = remote_cost; |
| } |
| |
| bool found_failover = false; |
| |
| if (props) { |
| pn_data_rewind(props); |
| pn_data_next(props); |
| if (props && pn_data_type(props) == PN_MAP) { |
| pn_data_enter(props); |
| |
| // |
| // We are attempting to find a connection property called failover-server-list which is a list of failover host names and ports.. |
| // failover-server-list looks something like this |
| // :"failover-server-list"=[{:"network-host"="some-host", :port="35000"}, {:"network-host"="localhost", :port="25000"}] |
| // There are three cases here - |
| // 1. The failover-server-list is present but the content of the list is empty in which case we scrub the failover list except we keep the original connector information and current connection information. |
| // 2. If the failover list contains one or more maps that contain failover connection information, that information will be appended to the list which already contains the original connection information |
| // and the current connection information. Any other failover information left over from the previous connection is deleted |
| // 3. If the failover-server-list is not present at all in the connection properties, the failover list we maintain in untoched. |
| // |
| while (pn_data_next(props)) { |
| if (pn_data_type(props) == PN_SYMBOL) { |
| pn_bytes_t sym = pn_data_get_symbol(props); |
| if (sym.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY) && |
| strcmp(sym.start, QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY) == 0) { |
| found_failover = true; |
| } |
| } |
| else if (pn_data_type(props) == PN_LIST && found_failover) { |
| size_t list_num_items = pn_data_get_list(props); |
| |
| if (list_num_items > 0) { |
| |
| save_original_and_current_conn_info(conn); |
| |
| pn_data_enter(props); // enter list |
| |
| for (int i=0; i < list_num_items; i++) { |
| pn_data_next(props);// this is the first element of the list, a map. |
| if (props && pn_data_type(props) == PN_MAP) { |
| |
| size_t map_num_items = pn_data_get_map(props); |
| pn_data_enter(props); |
| |
| qd_failover_item_t *item = NEW(qd_failover_item_t); |
| ZERO(item); |
| |
| // We have found a map with the connection information. Step thru the map contents and create qd_failover_item_t |
| |
| for (int j=0; j < map_num_items/2; j++) { |
| |
| pn_data_next(props); |
| if (pn_data_type(props) == PN_SYMBOL) { |
| pn_bytes_t sym = pn_data_get_symbol(props); |
| if (sym.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY) && |
| strcmp(sym.start, QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY) == 0) { |
| pn_data_next(props); |
| if (pn_data_type(props) == PN_STRING) { |
| item->host = strdup(pn_data_get_string(props).start); |
| } |
| } |
| else if (sym.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_PORT_KEY) && |
| strcmp(sym.start, QD_CONNECTION_PROPERTY_FAILOVER_PORT_KEY) == 0) { |
| pn_data_next(props); |
| item->port = qdpn_data_as_string(props); |
| |
| } |
| else if (sym.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY) && |
| strcmp(sym.start, QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY) == 0) { |
| pn_data_next(props); |
| if (pn_data_type(props) == PN_STRING) { |
| item->scheme = strdup(pn_data_get_string(props).start); |
| } |
| |
| } |
| else if (sym.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY) && |
| strcmp(sym.start, QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY) == 0) { |
| pn_data_next(props); |
| if (pn_data_type(props) == PN_STRING) { |
| item->hostname = strdup(pn_data_get_string(props).start); |
| } |
| } |
| } |
| } |
| |
| int host_length = strlen(item->host); |
| // |
| // We will not even bother inserting the item if there is no host available. |
| // |
| if (host_length != 0) { |
| if (item->scheme == 0) |
| item->scheme = strdup("amqp"); |
| if (item->port == 0) |
| item->port = strdup("5672"); |
| |
| int hplen = strlen(item->host) + strlen(item->port) + 2; |
| item->host_port = malloc(hplen); |
| snprintf(item->host_port, hplen, "%s:%s", item->host, item->port); |
| |
| // |
| // Iterate through failover list items and sets insert_tail to true |
| // when list has just original connector's host and port or when new |
| // reported host and port is not yet part of the current list. |
| // |
| bool insert_tail = false; |
| if ( DEQ_SIZE(conn->connector->conn_info_list) == 1 ) { |
| insert_tail = true; |
| } else { |
| qd_failover_item_t *conn_item = DEQ_HEAD(conn->connector->conn_info_list); |
| insert_tail = true; |
| while ( conn_item ) { |
| if ( !strcmp(conn_item->host_port, item->host_port) ) { |
| insert_tail = false; |
| break; |
| } |
| conn_item = DEQ_NEXT(conn_item); |
| } |
| } |
| |
| // Only inserts if not yet part of failover list |
| if ( insert_tail ) { |
| DEQ_INSERT_TAIL(conn->connector->conn_info_list, item); |
| qd_log(router->log_source, QD_LOG_DEBUG, "Added %s as backup host", item->host_port); |
| } |
| else { |
| free(item->scheme); |
| free(item->host); |
| free(item->port); |
| free(item->hostname); |
| free(item->host_port); |
| free(item); |
| } |
| |
| } |
| else { |
| free(item->scheme); |
| free(item->host); |
| free(item->port); |
| free(item->hostname); |
| free(item->host_port); |
| free(item); |
| } |
| } |
| pn_data_exit(props); |
| } |
| } // list_num_items > 0 |
| else { |
| save_original_and_current_conn_info(conn); |
| |
| } |
| } |
| } |
| } |
| } |
| |
| if (multi_tenant) |
| vhost = pn_connection_remote_hostname(pn_conn); |
| |
| char proto[50]; |
| memset(proto, 0, 50); |
| char cipher[50]; |
| memset(cipher, 0, 50); |
| |
| int ssl_ssf = 0; |
| bool is_ssl = false; |
| |
| if (ssl) { |
| pn_ssl_get_protocol_name(ssl, proto, 50); |
| pn_ssl_get_cipher_name(ssl, cipher, 50); |
| ssl_ssf = pn_ssl_get_ssf(ssl); |
| is_ssl = true; |
| } |
| |
| |
| bool encrypted = tport && pn_transport_is_encrypted(tport); |
| bool authenticated = tport && pn_transport_is_authenticated(tport); |
| |
| qdr_connection_info_t *connection_info = qdr_connection_info(encrypted, |
| authenticated, |
| conn->opened, |
| (char*) mech, |
| conn->connector ? QD_OUTGOING : QD_INCOMING, |
| host, |
| proto, |
| cipher, |
| (char*) user, |
| container, |
| props, |
| ssl_ssf, |
| is_ssl); |
| |
| qdr_connection_opened(router->router_core, inbound, role, cost, connection_id, name, |
| pn_connection_remote_container(pn_conn), |
| conn->strip_annotations_in, |
| conn->strip_annotations_out, |
| conn->policy_settings ? conn->policy_settings->allowDynamicLinkRoutes : true, |
| conn->policy_settings ? conn->policy_settings->allowAdminStatusUpdate : true, |
| link_capacity, |
| vhost, |
| connection_info, |
| bind_connection_context, conn); |
| |
| char props_str[1000]; |
| size_t props_len = 1000; |
| |
| pn_data_format(props, props_str, &props_len); |
| |
| if (conn->connector) { |
| char conn_msg[300]; |
| qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s" |
| " auth=%s user=%s container_id=%s", |
| connection_id, inbound ? "in" : "out", host, vhost ? vhost : "", encrypted ? proto : "no", |
| authenticated ? mech : "no", (char*) user, container); |
| strcpy(conn->connector->conn_msg, conn_msg); |
| } |
| |
| |
| qd_log(router->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s" |
| " auth=%s user=%s container_id=%s props=%s", |
| connection_id, inbound ? "in" : "out", host, vhost ? vhost : "", encrypted ? proto : "no", |
| authenticated ? mech : "no", (char*) user, container, props_str); |
| } |
| |
| static int AMQP_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context) |
| { |
| qd_router_t *router = (qd_router_t*) type_context; |
| AMQP_opened_handler(router, conn, true); |
| return 0; |
| } |
| |
| |
| static int AMQP_outbound_opened_handler(void *type_context, qd_connection_t *conn, void *context) |
| { |
| qd_router_t *router = (qd_router_t*) type_context; |
| AMQP_opened_handler(router, conn, false); |
| return 0; |
| } |
| |
| |
| static int AMQP_closed_handler(void *type_context, qd_connection_t *conn, void *context) |
| { |
| qdr_connection_t *qdrc = (qdr_connection_t*) qd_connection_get_context(conn); |
| |
| if (qdrc) { |
| qdr_connection_set_context(qdrc, NULL); |
| qdr_connection_closed(qdrc); |
| qd_connection_set_context(conn, 0); |
| } |
| |
| return 0; |
| } |
| |
| |
// Once-per-second housekeeping timer: drive the Python router agent, tick the
// core thread, then re-arm the timer for another second.
static void qd_router_timer_handler(void *context)
{
    qd_router_t *router = (qd_router_t*) context;

    //
    // Periodic processing.
    //
    qd_pyrouter_tick(router);

    // This sends a tick into the core and this happens every second.
    qdr_process_tick(router->router_core);
    qd_timer_schedule(router->timer, 1000);
}
| |
| |
// Node-type descriptor for the router: maps container events (message rx,
// disposition, link attach/detach/flow, connection open/close) onto the
// AMQP_* handlers defined earlier in this file.
static qd_node_type_t router_node = {"router", 0, 0,
                                     AMQP_rx_handler,
                                     AMQP_disposition_handler,
                                     AMQP_incoming_link_handler,
                                     AMQP_outgoing_link_handler,
                                     AMQP_writable_conn_handler,
                                     AMQP_link_detach_handler,
                                     AMQP_link_attach_handler,
                                     qd_link_abandoned_deliveries_handler,
                                     AMQP_link_flow_handler,
                                     0,   // node_created_handler
                                     0,   // node_destroyed_handler
                                     AMQP_inbound_opened_handler,
                                     AMQP_outbound_opened_handler,
                                     AMQP_closed_handler};
// Guards one-time registration of router_node with the container (see qd_router).
static int type_registered = 0;
| |
| qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id) |
| { |
| if (!type_registered) { |
| type_registered = 1; |
| qd_container_register_node_type(qd, &router_node); |
| } |
| |
| size_t dplen = 9 + strlen(area) + strlen(id); |
| node_id = (char*) malloc(dplen); |
| strcpy(node_id, area); |
| strcat(node_id, "/"); |
| strcat(node_id, id); |
| |
| qd_router_t *router = NEW(qd_router_t); |
| ZERO(router); |
| |
| router_node.type_context = router; |
| |
| qd->router = router; |
| router->qd = qd; |
| router->router_core = 0; |
| router->log_source = qd_log_source("ROUTER"); |
| router->router_mode = mode; |
| router->router_area = area; |
| router->router_id = id; |
| router->node = qd_container_set_default_node_type(qd, &router_node, (void*) router, QD_DIST_BOTH); |
| |
| router->lock = sys_mutex(); |
| router->timer = qd_timer(qd, qd_router_timer_handler, (void*) router); |
| |
| // |
| // Inform the field iterator module of this router's mode, id, and area. The field iterator |
| // uses this to offload some of the address-processing load from the router. |
| // |
| qd_iterator_set_address(mode == QD_ROUTER_MODE_EDGE, area, id); |
| |
| switch (router->router_mode) { |
| case QD_ROUTER_MODE_STANDALONE: qd_log(router->log_source, QD_LOG_INFO, "Router started in Standalone mode"); break; |
| case QD_ROUTER_MODE_INTERIOR: qd_log(router->log_source, QD_LOG_INFO, "Router started in Interior mode, area=%s id=%s", area, id); break; |
| case QD_ROUTER_MODE_EDGE: qd_log(router->log_source, QD_LOG_INFO, "Router started in Edge mode"); break; |
| case QD_ROUTER_MODE_ENDPOINT: qd_log(router->log_source, QD_LOG_INFO, "Router started in Endpoint mode"); break; |
| } |
| |
| qd_log(router->log_source, QD_LOG_INFO, "Version: %s", QPID_DISPATCH_VERSION); |
| |
| return router; |
| } |
| |
| |
| static void CORE_connection_activate(void *context, qdr_connection_t *conn) |
| { |
| // |
| // IMPORTANT: This is the only core callback that is invoked on the core |
| // thread itself. It must not take locks that could deadlock the core. |
| // |
| qd_server_activate((qd_connection_t*) qdr_connection_get_context(conn)); |
| } |
| |
| |
// Core callback: the core wants to initiate a link on this connection.
// Creates the server-side qd_link, copies the core's termini onto it, wires
// the qd_link<->qdr_link contexts, and sends the ATTACH.  The order of these
// steps matters: the contexts must be bound before pn_link_open so that
// events arriving for the new link can be routed back to the core.
static void CORE_link_first_attach(void             *context,
                                   qdr_connection_t *conn,
                                   qdr_link_t       *link,
                                   qdr_terminus_t   *source,
                                   qdr_terminus_t   *target,
                                   qd_session_class_t ssn_class)
{
    qd_router_t     *router = (qd_router_t*) context;
    qd_connection_t *qconn  = (qd_connection_t*) qdr_connection_get_context(conn);
    if (!qconn) return;        /* Connection is already closed */

    //
    // Create a new link to be attached
    //
    qd_link_t *qlink = qd_link(router->node, qconn, qdr_link_direction(link), qdr_link_name(link), ssn_class);

    //
    // Copy the source and target termini to the link
    //
    qdr_terminus_copy(source, qd_link_source(qlink));
    qdr_terminus_copy(target, qd_link_target(qlink));

    //
    // Associate the qd_link and the qdr_link to each other
    //
    qdr_link_set_context(link, qlink);
    qd_link_set_context(qlink, link);

    //
    // Open (attach) the link
    //
    pn_link_open(qd_link_pn(qlink));

    //
    // All links on the inter router or edge connection have unbounded q2 limit.
    // Blocking control messages can lead to various failures
    //
    if (qdr_connection_role(conn) == QDR_ROLE_EDGE_CONNECTION || qdr_connection_role(conn) == QDR_ROLE_INTER_ROUTER) {
        qd_link_set_q2_limit_unbounded(qlink, true);
    }

    //
    // Mark the link as stalled and waiting for initial credit.
    //
    if (qdr_link_direction(link) == QD_OUTGOING)
        qdr_link_stalled_outbound(link);
}
| |
| |
// Core callback: the core is answering a remotely-initiated attach.  Copies
// the core's (possibly modified) termini back onto the proton link and sends
// the responding ATTACH.  No-op if the qd_link or its proton link is gone.
static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
{
    qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
    if (!qlink)
        return;

    pn_link_t *pn_link = qd_link_pn(qlink);
    if (!pn_link)
        return;

    qdr_terminus_copy(source, qd_link_source(qlink));
    qdr_terminus_copy(target, qd_link_target(qlink));

    //
    // Open (attach) the link
    //
    pn_link_open(pn_link);

    qd_connection_t *conn = qd_link_connection(qlink);
    qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
    //
    // All links on the inter router or edge connection have unbounded q2 limit
    //
    if (qdr_connection_role(qdr_conn) == QDR_ROLE_EDGE_CONNECTION || qdr_connection_role(qdr_conn) == QDR_ROLE_INTER_ROUTER) {
        qd_link_set_q2_limit_unbounded(qlink, true);
    }


    //
    // Mark the link as stalled and waiting for initial credit.
    //
    if (qdr_link_direction(link) == QD_OUTGOING)
        qdr_link_stalled_outbound(link);
}
| |
| |
| static void CORE_conn_trace(void *context, qdr_connection_t *qdr_conn, bool trace) |
| { |
| qd_connection_t *qconn = (qd_connection_t*) qdr_connection_get_context(qdr_conn); |
| |
| if (!qconn) |
| return; |
| |
| pn_transport_t *tport = pn_connection_transport(qconn->pn_conn); |
| |
| if (!tport) |
| return; |
| |
| if (trace) { |
| pn_transport_trace(tport, PN_TRACE_FRM); |
| pn_transport_set_tracer(tport, connection_transport_tracer); |
| |
| } |
| else { |
| pn_transport_trace(tport, PN_TRACE_OFF); |
| } |
| } |
| |
| static void CORE_close_connection(void *context, qdr_connection_t *qdr_conn, qdr_error_t *error) |
| { |
| if (qdr_conn) { |
| qd_connection_t *qd_conn = qdr_connection_get_context(qdr_conn); |
| if (qd_conn) { |
| pn_connection_t *pn_conn = qd_connection_pn(qd_conn); |
| if (pn_conn) { |
| // |
| // Go down to the transport and close the head and tail. This will |
| // drop the socket to the peer without providing any error indication. |
| // Due to issues in Proton that cause different behaviors in different |
| // bindings depending on whether there is a connection:forced error, |
| // this has been deemed the best way to force the peer to reconnect. |
| // |
| pn_transport_t *tport = pn_connection_transport(pn_conn); |
| pn_transport_close_head(tport); |
| pn_transport_close_tail(tport); |
| } |
| } |
| } |
| } |
| |
// Core callback: detach (or close) a link on behalf of the core.  Copies any
// core error onto the proton condition, nullifies half-open termini for spec
// compliance, sends the DETACH, and severs the core->proton linkage.  On the
// second detach the qd_link itself is freed.
static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
{
    qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
    if (!qlink)
        return;

    pn_link_t *pn_link = qd_link_pn(qlink);
    if (!pn_link)
        return;

    if (error) {
        pn_condition_t *cond = pn_link_condition(pn_link);
        qdr_error_copy(error, cond);
    }

    //
    // If the link is only half open, then this DETACH constitutes the rejection of
    // an incoming ATTACH.  We must nullify the source and target in order to be
    // compliant with the AMQP specification.  This is because Proton will generate
    // the missing ATTACH before the DETACH and will include spurious terminus data
    // if we don't nullify it here.
    //
    if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
        if (pn_link_is_receiver(pn_link)) {
            pn_terminus_set_type(pn_link_target(pn_link), PN_UNSPECIFIED);
            pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
        } else {
            pn_terminus_set_type(pn_link_source(pn_link), PN_UNSPECIFIED);
            pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
        }
    }

    // close sends DETACH with closed=true; detach leaves the link reusable.
    if (close)
        qd_link_close(qlink);
    else
        qd_link_detach(qlink);

    //
    // This is the last event for this link that we are going to send into Proton.
    // Remove the core->proton linkage.  Note that the proton->core linkage may still
    // be intact and needed.
    //
    qdr_link_set_context(link, 0);

    //
    // If this is the second detach, free the qd_link
    //
    if (!first) {
        qd_link_free(qlink);
    }
}
| |
| |
| static void CORE_link_flow(void *context, qdr_link_t *link, int credit) |
| { |
| qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); |
| if (!qlink) |
| return; |
| |
| pn_link_t *plink = qd_link_pn(qlink); |
| |
| if (plink) |
| pn_link_flow(plink, credit); |
| } |
| |
| |
| static void CORE_link_offer(void *context, qdr_link_t *link, int delivery_count) |
| { |
| qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); |
| if (!qlink) |
| return; |
| |
| pn_link_t *plink = qd_link_pn(qlink); |
| |
| if (plink) |
| pn_link_offered(plink, delivery_count); |
| } |
| |
| |
| static void CORE_link_drained(void *context, qdr_link_t *link) |
| { |
| qd_router_t *router = (qd_router_t*) context; |
| qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); |
| if (!qlink) |
| return; |
| |
| pn_link_t *plink = qd_link_pn(qlink); |
| |
| if (plink) { |
| pn_link_drained(plink); |
| qdr_link_set_drained(router->router_core, link); |
| } |
| } |
| |
| |
| static void CORE_link_drain(void *context, qdr_link_t *link, bool mode) |
| { |
| qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); |
| if (!qlink) |
| return; |
| |
| pn_link_t *plink = qd_link_pn(qlink); |
| |
| if (plink) { |
| if (pn_link_is_receiver(plink)) |
| pn_link_set_drain(plink, mode); |
| } |
| } |
| |
| |
| static int CORE_link_push(void *context, qdr_link_t *link, int limit) |
| { |
| qd_router_t *router = (qd_router_t*) context; |
| qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); |
| if (!qlink) |
| return 0; |
| |
| pn_link_t *plink = qd_link_pn(qlink); |
| |
| if (plink) { |
| int link_credit = pn_link_credit(plink); |
| if (link_credit > limit) |
| link_credit = limit; |
| |
| if (link_credit > 0) |
| // We will not bother calling qdr_link_process_deliveries if we have no credit. |
| return qdr_link_process_deliveries(router->router_core, link, link_credit); |
| } |
| return 0; |
| } |
| |
| static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled) |
| { |
| qd_router_t *router = (qd_router_t*) context; |
| qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); |
| qd_connection_t *qconn = qd_link_connection(qlink); |
| |
| uint64_t update = 0; |
| |
| if (!qlink) |
| return 0; |
| |
| pn_link_t *plink = qd_link_pn(qlink); |
| if (!plink) |
| return 0; |
| |
| // |
| // If the remote send settle mode is set to 'settled' then settle the delivery on behalf of the receiver. |
| // |
| bool remote_snd_settled = qd_link_remote_snd_settle_mode(qlink) == PN_SND_SETTLED; |
| pn_delivery_t *pdlv = 0; |
| |
| if (!qdr_delivery_tag_sent(dlv)) { |
| const char *tag; |
| int tag_length; |
| |
| qdr_delivery_tag(dlv, &tag, &tag_length); |
| |
| // Create a new proton delivery on link 'plink' |
| pn_delivery(plink, pn_dtag(tag, tag_length)); |
| |
| pdlv = pn_link_current(plink); |
| |
| // handle any delivery-state on the transfer e.g. transactional-state |
| qdr_delivery_write_extension_state(dlv, pdlv, true); |
| |
| // |
| // If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver. |
| // |
| if (qdr_delivery_get_context(dlv) == 0) |
| qdr_node_connect_deliveries(qlink, dlv, pdlv); |
| |
| qdr_delivery_set_tag_sent(dlv, true); |
| } else { |
| pdlv = qdr_node_delivery_pn_from_qdr(dlv); |
| } |
| |
| if (!pdlv) |
| return 0; |
| |
| bool restart_rx = false; |
| bool q3_stalled = false; |
| |
| qd_message_t *msg_out = qdr_delivery_message(dlv); |
| |
| qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &restart_rx, &q3_stalled); |
| |
| if (q3_stalled) { |
| qd_link_q3_block(qlink); |
| qdr_link_stalled_outbound(link); |
| } |
| |
| if (restart_rx) { |
| qd_link_restart_rx(qd_message_get_receiving_link(msg_out)); |
| } |
| |
| bool send_complete = qdr_delivery_send_complete(dlv); |
| |
| if (send_complete) { |
| if (qd_message_aborted(msg_out)) { |
| // Aborted messages must be settled locally |
| // Settling does not produce any disposition to message sender. |
| if (pdlv) { |
| pn_link_advance(plink); |
| qdr_node_disconnect_deliveries(router->router_core, qlink, dlv, pdlv); |
| pn_delivery_settle(pdlv); |
| } |
| } else { |
| if (!settled && remote_snd_settled) { |
| // The caller must tell the core that the delivery has been |
| // accepted and settled, since we are settling on behalf of the |
| // receiver |
| update = PN_ACCEPTED; // schedule the settle |
| } |
| |
| pn_link_advance(plink); |
| |
| if (settled || remote_snd_settled) { |
| if (pdlv) { |
| qdr_node_disconnect_deliveries(router->router_core, qlink, dlv, pdlv); |
| pn_delivery_settle(pdlv); |
| } |
| } |
| } |
| log_link_message(qconn, plink, msg_out); |
| } |
| return update; |
| } |
| |
| |
| static int CORE_link_get_credit(void *context, qdr_link_t *link) |
| { |
| qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); |
| pn_link_t *plink = !!qlink ? qd_link_pn(qlink) : 0; |
| |
| if (!plink) |
| return 0; |
| |
| return pn_link_remote_credit(plink); |
| } |
| |
| |
// Core callback: apply a disposition change and/or settlement from the core
// to the corresponding proton delivery.  Settlement of a still-arriving
// message is deferred until the message is fully received; in that case the
// disposition is stashed on the qdr_delivery and the message is marked for
// discard so the rx path can finish it.
static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
{
    qd_router_t   *router = (qd_router_t*) context;
    pn_delivery_t *pnd    = qdr_node_delivery_pn_from_qdr(dlv);

    if (!pnd)
        return;

    // If the delivery's link is somehow gone (maybe because of a connection drop, we don't proceed.
    if (!pn_delivery_link(pnd))
        return;

    qdr_error_t *error = qdr_delivery_error(dlv);

    if (error) {
        // Transfer the core's error onto the local disposition's condition.
        pn_condition_t *condition = pn_disposition_condition(pn_delivery_local(pnd));
        char *name = qdr_error_name(error);
        char *description = qdr_error_description(error);
        pn_condition_set_name(condition, (const char*) name);
        pn_condition_set_description(condition, (const char*) description);
        if (qdr_error_info(error))
            pn_data_copy(pn_condition_info(condition), qdr_error_info(error));
        //proton makes copies of name and description, so it is ok to free them here.
        free(name);
        free(description);
    }

    // Resolve the qd_link and connection; bail out if any link in the chain
    // is already gone (connection drop teardown in progress).
    qdr_link_t      *qlink   = qdr_delivery_link(dlv);
    qd_link_t       *link    = 0;
    qd_connection_t *qd_conn = 0;

    if (qlink) {
        link = (qd_link_t*) qdr_link_get_context(qlink);
        if (link) {
            qd_conn = qd_link_connection(link);
            if (qd_conn == 0)
                return;
        }
        else
            return;
    }
    else
        return;

    //
    // If the disposition has changed and the proton delivery has not already been settled, update the proton delivery.
    //
    if (disp != pn_delivery_remote_state(pnd) && !pn_delivery_settled(pnd)) {
        qd_message_t *msg = qdr_delivery_message(dlv);

        // MODIFIED requires the failed flag per the AMQP spec.
        if (disp == PN_MODIFIED)
            pn_disposition_set_failed(pn_delivery_local(pnd), true);
        qdr_delivery_write_extension_state(dlv, pnd, false);

        //
        // If the delivery is still arriving, don't push out the disposition change yet.
        //
        // NOTE(review): this assert presumes the core has already recorded
        // 'disp' on the qdr_delivery before invoking this callback - confirm.
        assert(qdr_delivery_disposition(dlv) == disp) ;
        if (qd_message_receive_complete(msg)) {
            if (disp != pn_delivery_local_state(pnd)) {
                pn_delivery_update(pnd, disp);
            }
        }
    }

    if (settled) {
        qd_message_t *msg = qdr_delivery_message(dlv);
        if (qd_message_receive_complete(msg)) {
            //
            // If the delivery is settled and the message has fully arrived, disconnect
            // the linkages and settle it in Proton now.
            //
            qdr_node_disconnect_deliveries(router->router_core, link, dlv, pnd);
            pn_delivery_settle(pnd);
        } else {
            if (disp == PN_RELEASED || disp == PN_MODIFIED || qdr_delivery_presettled(dlv)) {
                //
                // If the delivery is settled and it is still arriving, defer the settlement
                // until the content has fully arrived. For now set the disposition on the qdr_delivery
                // We will use this disposition later on to set the disposition on the proton delivery.
                //
                qdr_delivery_set_disposition(dlv, disp);
                //
                // We have set the message to be discarded. We will use this information
                // in AMQP_rx_handler to only update the disposition on the proton delivery if the message is discarded.
                //
                qd_message_set_discard(msg, true);
                //
                // If the disposition is RELEASED or MODIFIED, set the message to discard
                // and if it is blocked by Q2 holdoff, get the link rolling again.
                //
                qd_message_Q2_holdoff_disable(msg);
                qd_link_restart_rx(link);
            }
        }
    }
}
| |
| |
// Late-stage router initialization: create the router core, register the
// CORE_* callbacks with it, bring up the Python agent, and start the
// once-per-second housekeeping timer.  Must run after qd_router().
void qd_router_setup_late(qd_dispatch_t *qd)
{
    qd->router->tracemask   = qd_tracemask();
    qd->router->router_core = qdr_core(qd, qd->router->router_mode, qd->router->router_area, qd->router->router_id);

    qdr_connection_handlers(qd->router->router_core, (void*) qd->router,
                            CORE_connection_activate,
                            CORE_link_first_attach,
                            CORE_link_second_attach,
                            CORE_link_detach,
                            CORE_link_flow,
                            CORE_link_offer,
                            CORE_link_drained,
                            CORE_link_drain,
                            CORE_link_push,
                            CORE_link_deliver,
                            CORE_link_get_credit,
                            CORE_delivery_update,
                            CORE_close_connection,
                            CORE_conn_trace);

    qd_router_python_setup(qd->router);
    qd_timer_schedule(qd->router->timer, 1000);
}
| |
| void qd_router_free(qd_router_t *router) |
| { |
| if (!router) return; |
| |
| qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH); |
| |
| qdr_core_free(router->router_core); |
| qd_tracemask_free(router->tracemask); |
| qd_timer_free(router->timer); |
| sys_mutex_free(router->lock); |
| qd_router_configure_free(router); |
| qd_router_python_free(router); |
| |
| free(router); |
| free(node_id); |
| free(direct_prefix); |
| } |
| |
| |
// Return this router's node id ("<area>/<id>", built in qd_router()).
// The returned string is owned by the module; valid until qd_router_free().
const char *qd_router_id(const qd_dispatch_t *qd)
{
    return node_id;
}
| |
| |
// Accessor for the router core (created in qd_router_setup_late).
qdr_core_t *qd_router_core(qd_dispatch_t *qd)
{
    return qd->router->router_core;
}
| |
| |
| // called when Q2 holdoff is deactivated so we can receive more message buffers |
| // |
| void qd_link_restart_rx(qd_link_t *in_link) |
| { |
| if (!in_link) |
| return; |
| |
| assert(qd_link_direction(in_link) == QD_INCOMING); |
| |
| qd_connection_t *in_conn = qd_link_connection(in_link); |
| if (in_conn) { |
| qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp); |
| set_safe_ptr_qd_link_t(in_link, safe_ptr); |
| qd_connection_invoke_deferred(in_conn, deferred_AMQP_rx_handler, safe_ptr); |
| } |
| } |
| |
| |
| // Issue a warning POLICY log message with connection and link identities |
| // prepended to the policy denial text string. |
| void qd_connection_log_policy_denial(qd_link_t *link, const char *text) |
| { |
| qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); |
| uint64_t l_id = 0; |
| uint64_t c_id = 0; |
| if (rlink) { |
| l_id = rlink->identity; |
| if (rlink->conn) { |
| c_id = rlink->conn->identity; |
| } |
| } |
| qd_log(qd_policy_log_source(), QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] %s", |
| c_id, l_id, text); |
| } |