| #ifndef qd_router_core_private |
| #define qd_router_core_private 1 |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "dispatch_private.h" |
| #include "message_private.h" |
| #include <qpid/dispatch/router_core.h> |
| #include <qpid/dispatch/threading.h> |
| #include <qpid/dispatch/atomic.h> |
| #include <qpid/dispatch/log.h> |
| #include <memory.h> |
| |
| typedef struct qdr_address_t qdr_address_t; |
| typedef struct qdr_address_config_t qdr_address_config_t; |
| typedef struct qdr_node_t qdr_node_t; |
| typedef struct qdr_router_ref_t qdr_router_ref_t; |
| typedef struct qdr_link_ref_t qdr_link_ref_t; |
| typedef struct qdr_forwarder_t qdr_forwarder_t; |
| typedef struct qdr_link_route_t qdr_link_route_t; |
| typedef struct qdr_auto_link_t qdr_auto_link_t; |
| typedef struct qdr_conn_identifier_t qdr_conn_identifier_t; |
| typedef struct qdr_connection_ref_t qdr_connection_ref_t; |
| typedef struct qdr_exchange qdr_exchange_t; |
| typedef struct qdr_edge_t qdr_edge_t; |
| |
| ALLOC_DECLARE(qdr_address_t); |
| ALLOC_DECLARE(qdr_address_config_t); |
| ALLOC_DECLARE(qdr_node_t); |
| ALLOC_DECLARE(qdr_router_ref_t); |
| ALLOC_DECLARE(qdr_link_ref_t); |
| ALLOC_DECLARE(qdr_link_route_t); |
| ALLOC_DECLARE(qdr_auto_link_t); |
| ALLOC_DECLARE(qdr_conn_identifier_t); |
| ALLOC_DECLARE(qdr_connection_ref_t); |
| |
| ALLOC_DECLARE(qdr_connection_t); |
| ALLOC_DECLARE(qdr_link_t); |
| |
| |
| #include "core_link_endpoint.h" |
| #include "core_events.h" |
| #include "core_attach_address_lookup.h" |
| |
| qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment); |
| int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery, |
| bool exclude_inprocess, bool control); |
| bool qdr_forward_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *in_link, qdr_terminus_t *source, |
| qdr_terminus_t *target); |
| void qdr_forward_link_direct_CT(qdr_core_t *core, |
| qdr_connection_t *conn, |
| qdr_link_t *in_link, |
| qdr_terminus_t *source, |
| qdr_terminus_t *target, |
| char *strip, |
| char *insert); |
| |
| typedef enum { |
| QDR_CONDITION_NO_ROUTE_TO_DESTINATION, |
| QDR_CONDITION_ROUTED_LINK_LOST, |
| QDR_CONDITION_FORBIDDEN, |
| QDR_CONDITION_WRONG_ROLE, |
| QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, |
| QDR_CONDITION_INVALID_LINK_EXPIRATION, |
| QDR_CONDITION_NONE |
| } qdr_condition_t; |
| |
| /** |
| * qdr_field_t - This type is used to pass variable-length fields (strings, etc.) into |
| * and out of the router-core thread. |
| */ |
| typedef struct { |
| qd_buffer_list_t buffers; |
| qd_iterator_t *iterator; |
| } qdr_field_t; |
| |
| qdr_field_t *qdr_field(const char *string); |
| qdr_field_t *qdr_field_from_iter(qd_iterator_t *iter); |
| qd_iterator_t *qdr_field_iterator(qdr_field_t *field); |
| void qdr_field_free(qdr_field_t *field); |
| char *qdr_field_copy(qdr_field_t *field); |
| |
| /** |
| * qdr_action_t - This type represents one work item to be performed by the router-core thread. |
| */ |
| typedef struct qdr_action_t qdr_action_t; |
| typedef void (*qdr_action_handler_t) (qdr_core_t *core, qdr_action_t *action, bool discard); |
| |
| struct qdr_action_t { |
| DEQ_LINKS(qdr_action_t); |
| qdr_action_handler_t action_handler; |
| const char *label; |
| union { |
| // |
| // Arguments for router control-plane actions |
| // |
| struct { |
| int link_maskbit; |
| int router_maskbit; |
| int nh_router_maskbit; |
| int cost; |
| int treatment_hint; |
| qd_bitmask_t *router_set; |
| qdr_field_t *address; |
| } route_table; |
| |
| // |
| // Arguments for connection-level actions |
| // |
| struct { |
| qdr_connection_t_sp conn; |
| qdr_field_t *connection_label; |
| qdr_field_t *container_id; |
| qdr_link_t_sp link; |
| qdr_delivery_t *delivery; |
| qd_message_t *msg; |
| qd_direction_t dir; |
| qdr_terminus_t *source; |
| qdr_terminus_t *target; |
| qdr_error_t *error; |
| qd_detach_type_t dt; |
| int credit; |
| bool more; // true if there are more frames arriving, false otherwise |
| bool drain; |
| uint8_t tag[32]; |
| int tag_length; |
| } connection; |
| |
| // |
| // Arguments for delivery state updates |
| // |
| struct { |
| qdr_delivery_t *delivery; |
| uint64_t disposition; |
| bool settled; |
| qdr_error_t *error; |
| } delivery; |
| |
| // |
| // Arguments for in-process messaging |
| // |
| struct { |
| qdr_field_t *address; |
| char address_class; |
| char address_phase; |
| qd_address_treatment_t treatment; |
| qdr_subscription_t *subscription; |
| qd_message_t *message; |
| bool exclude_inprocess; |
| bool control; |
| } io; |
| |
| // |
| // Arguments for management-agent actions |
| // |
| struct { |
| qdr_query_t *query; |
| int offset; |
| qdr_field_t *identity; |
| qdr_field_t *name; |
| qd_parsed_field_t *in_body; |
| qd_buffer_list_t body_buffers; |
| } agent; |
| |
| // |
| // Arguments for stats request actions |
| // |
| struct { |
| qdr_global_stats_t *stats; |
| qdr_global_stats_handler_t handler; |
| void *context; |
| } stats_request; |
| |
| // |
| // Arguments for general use |
| // |
| struct { |
| void *context_1; |
| void *context_2; |
| void *context_3; |
| void *context_4; |
| } general; |
| |
| } args; |
| }; |
| |
| ALLOC_DECLARE(qdr_action_t); |
| DEQ_DECLARE(qdr_action_t, qdr_action_list_t); |
| |
| // |
| // |
| // |
| typedef struct qdr_delivery_cleanup_t qdr_delivery_cleanup_t; |
| |
| struct qdr_delivery_cleanup_t { |
| DEQ_LINKS(qdr_delivery_cleanup_t); |
| qd_message_t *msg; |
| qd_iterator_t *iter; |
| }; |
| |
| ALLOC_DECLARE(qdr_delivery_cleanup_t); |
| DEQ_DECLARE(qdr_delivery_cleanup_t, qdr_delivery_cleanup_list_t); |
| |
| // |
| // General Work |
| // |
| // The following types are used to post work to the IO threads for |
| // non-connection-specific action. These actions are serialized through |
| // a zero-delay timer and are processed by one thread at a time. General |
| // actions occur in-order and are not run concurrently. |
| // |
| typedef struct qdr_general_work_t qdr_general_work_t; |
| typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t *work); |
| |
| struct qdr_general_work_t { |
| DEQ_LINKS(qdr_general_work_t); |
| qdr_general_work_handler_t handler; |
| int maskbit; |
| int inter_router_cost; |
| qd_message_t *msg; |
| qdr_receive_t on_message; |
| void *on_message_context; |
| uint64_t in_conn_id; |
| uint64_t mobile_seq; |
| qdr_delivery_cleanup_list_t delivery_cleanup_list; |
| qdr_global_stats_handler_t stats_handler; |
| void *context; |
| }; |
| |
| ALLOC_DECLARE(qdr_general_work_t); |
| DEQ_DECLARE(qdr_general_work_t, qdr_general_work_list_t); |
| |
| qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler); |
| |
| |
| // |
| // Connection Work |
| // |
| // The following types are used to post work to the IO threads for |
| // connection-specific action. The actions for a particular connection |
| // are run in-order and are not concurrent. Actions for different connections |
| // will run concurrently. |
| // |
| typedef enum { |
| QDR_CONNECTION_WORK_FIRST_ATTACH, |
| QDR_CONNECTION_WORK_SECOND_ATTACH |
| } qdr_connection_work_type_t; |
| |
| typedef struct qdr_connection_work_t { |
| DEQ_LINKS(struct qdr_connection_work_t); |
| qdr_connection_work_type_t work_type; |
| qdr_link_t *link; |
| qdr_terminus_t *source; |
| qdr_terminus_t *target; |
| } qdr_connection_work_t; |
| |
| ALLOC_DECLARE(qdr_connection_work_t); |
| DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t); |
| void qdr_connection_work_free_CT(qdr_connection_work_t *work); |
| |
| // |
| // Link Work |
| // |
| // The following type is used to post link-specific work to the IO threads. |
| // This ensures that work related to a particular link (deliveries, disposition |
| // updates, flow updates, and detaches) are processed in-order. |
| // |
| // DELIVERY - Push up to _value_ deliveries from the undelivered list to the |
| // link (outgoing links only). Don't push more than there is |
| // available credit for. If the full number of deliveries (_value_) |
| // cannot be pushed, don't consume this work item from the list. |
| // This link will be blocked until further credit is received. |
| // FLOW - Push a flow update using _drain_action_ and _value_ for the |
| // number of incremental credits. |
| // FIRST_DETACH - Issue a first detach on this link, using _error_ if there is an |
| // error condition. |
| // SECOND_DETACH - Issue a second detach on this link. |
| // |
| typedef enum { |
| QDR_LINK_WORK_DELIVERY, |
| QDR_LINK_WORK_FLOW, |
| QDR_LINK_WORK_FIRST_DETACH, |
| QDR_LINK_WORK_SECOND_DETACH |
| } qdr_link_work_type_t; |
| |
| typedef enum { |
| QDR_LINK_WORK_DRAIN_ACTION_NONE = 0, |
| QDR_LINK_WORK_DRAIN_ACTION_SET, |
| QDR_LINK_WORK_DRAIN_ACTION_CLEAR, |
| QDR_LINK_WORK_DRAIN_ACTION_DRAINED |
| } qdr_link_work_drain_action_t; |
| |
| typedef struct qdr_link_work_t { |
| DEQ_LINKS(struct qdr_link_work_t); |
| qdr_link_work_type_t work_type; |
| qdr_error_t *error; |
| int value; |
| qdr_link_work_drain_action_t drain_action; |
| bool close_link; |
| bool processing; |
| } qdr_link_work_t; |
| |
| ALLOC_DECLARE(qdr_link_work_t); |
| DEQ_DECLARE(qdr_link_work_t, qdr_link_work_list_t); |
| |
| |
| #define QDR_AGENT_MAX_COLUMNS 64 |
| #define QDR_AGENT_COLUMN_NULL (QDR_AGENT_MAX_COLUMNS + 1) |
| |
| struct qdr_query_t { |
| DEQ_LINKS(qdr_query_t); |
| qdr_core_t *core; |
| qd_router_entity_type_t entity_type; |
| void *context; |
| int columns[QDR_AGENT_MAX_COLUMNS]; |
| qd_composed_field_t *body; |
| qdr_field_t *next_key; |
| int next_offset; |
| bool more; |
| qd_amqp_error_t status; |
| uint64_t in_conn; // or perhaps a pointer??? |
| }; |
| |
| DEQ_DECLARE(qdr_query_t, qdr_query_list_t); |
| |
| struct qdr_node_t { |
| DEQ_LINKS(qdr_node_t); |
| qdr_address_t *owning_addr; |
| int mask_bit; |
| qdr_node_t *next_hop; ///< Next hop node _if_ this is not a neighbor node |
| int link_mask_bit; ///< Mask bit of inter-router connection if this is a neighbor node |
| uint32_t ref_count; |
| qd_bitmask_t *valid_origins; |
| int cost; |
| uint64_t mobile_seq; |
| char *wire_address_ma; ///< The address of this router's mobile-sync agent in non-hashed form |
| }; |
| |
| DEQ_DECLARE(qdr_node_t, qdr_node_list_t); |
| void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode); |
| |
| #define PEER_CONTROL_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->control_links_by_mask_bit[n->link_mask_bit] : 0) |
| // PEER_DATA_LINK has gotten more complex with prioritized links, and is now a function, peer_data_link(). |
| |
| |
| |
| struct qdr_router_ref_t { |
| DEQ_LINKS(qdr_router_ref_t); |
| qdr_node_t *router; |
| }; |
| |
| DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t); |
| |
| typedef struct qdr_delivery_ref_t { |
| DEQ_LINKS(struct qdr_delivery_ref_t); |
| qdr_delivery_t *dlv; |
| } qdr_delivery_ref_t; |
| |
| ALLOC_DECLARE(qdr_delivery_ref_t); |
| DEQ_DECLARE(qdr_delivery_ref_t, qdr_delivery_ref_list_t); |
| |
| struct qdr_subscription_t { |
| DEQ_LINKS(qdr_subscription_t); |
| qdr_core_t *core; |
| qdr_address_t *addr; |
| qdr_receive_t on_message; |
| void *on_message_context; |
| bool in_core; |
| }; |
| |
| DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t); |
| |
| typedef struct qdr_subscription_ref_t { |
| DEQ_LINKS(struct qdr_subscription_ref_t); |
| qdr_subscription_t *sub; |
| } qdr_subscription_ref_t; |
| |
| ALLOC_DECLARE(qdr_subscription_ref_t); |
| DEQ_DECLARE(qdr_subscription_ref_t, qdr_subscription_ref_list_t); |
| |
| void qdr_add_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_t *sub); |
| void qdr_del_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_ref_t *ref); |
| |
| DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t); |
| |
| void qdr_add_delivery_ref_CT(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv); |
| void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref); |
| |
| #define QDR_LINK_LIST_CLASS_ADDRESS 0 |
| #define QDR_LINK_LIST_CLASS_WORK 1 |
| #define QDR_LINK_LIST_CLASS_CONNECTION 2 |
| #define QDR_LINK_LIST_CLASS_LOCAL 3 |
| #define QDR_LINK_LIST_CLASSES 4 |
| |
| typedef enum { |
| QDR_LINK_OPER_UP, |
| QDR_LINK_OPER_DOWN, |
| QDR_LINK_OPER_QUIESCING, |
| QDR_LINK_OPER_IDLE |
| } qdr_link_oper_status_t; |
| |
| #define QDR_LINK_RATE_DEPTH 5 |
| |
| struct qdr_link_t { |
| DEQ_LINKS(qdr_link_t); |
| qdr_core_t *core; |
| uint64_t identity; |
| void *user_context; |
| void *edge_context; ///< Opaque context to be used for edge-related purposes |
| qdr_connection_t *conn; ///< [ref] Connection that owns this link |
| qd_link_type_t link_type; |
| qd_direction_t link_direction; |
| qdr_link_work_list_t work_list; |
| char *name; |
| char *disambiguated_name; |
| char *terminus_addr; |
| int attach_count; ///< 1 or 2 depending on the state of the lifecycle |
| int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle |
| qdr_address_t *owning_addr; ///< [ref] Address record that owns this link |
| int phase; |
| qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link |
| qdrc_endpoint_t *core_endpoint; ///< [ref] Set if this link terminates on an in-core endpoint |
| qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to containing reference objects |
| qdr_auto_link_t *auto_link; ///< [ref] Auto_link that owns this link |
| qdr_delivery_list_t undelivered; ///< Deliveries to be forwarded or sent |
| qdr_delivery_list_t unsettled; ///< Unsettled deliveries |
| qdr_delivery_list_t settled; ///< Settled deliveries |
| qdr_delivery_ref_list_t updated_deliveries; ///< References to deliveries (in the unsettled list) with updates. |
| qdr_link_oper_status_t oper_status; |
| int capacity; |
| int credit_to_core; ///< Number of the available credits incrementally given to the core |
| int credit_pending; ///< Number of credits to be issued once consumers are available |
| int credit_stored; ///< Number of credits given to the link before it was ready to process them. |
| int credit_reported; ///< Number of credits to expose to management |
| uint32_t zero_credit_time; ///< Number of core ticks when credit last went to zero |
| bool reported_as_blocked; ///< The fact that this link has been blocked with zero credit has been logged |
| bool admin_enabled; |
| bool strip_annotations_in; |
| bool strip_annotations_out; |
| bool drain_mode; |
| bool stalled_outbound; ///< Indicates that this link is stalled on outbound buffer backpressure |
| bool detach_received; ///< True on core receipt of inbound attach |
| bool detach_send_done; ///< True once the detach has been sent by the I/O thread |
| bool edge; ///< True if this link is in an edge-connection |
| bool processing; ///< True if an IO thread is currently handling this link |
| bool ready_to_free; ///< True if the core thread wanted to clean up the link but it was processing |
| bool fallback; ///< True if this link is attached to a fallback destination for an address |
| char *strip_prefix; |
| char *insert_prefix; |
| bool terminus_survives_disconnect; |
| |
| uint64_t total_deliveries; |
| uint64_t presettled_deliveries; |
| uint64_t dropped_presettled_deliveries; |
| uint64_t accepted_deliveries; |
| uint64_t rejected_deliveries; |
| uint64_t released_deliveries; |
| uint64_t modified_deliveries; |
| uint64_t deliveries_delayed_1sec; |
| uint64_t deliveries_delayed_10sec; |
| uint64_t deliveries_stuck; |
| uint64_t settled_deliveries[QDR_LINK_RATE_DEPTH]; |
| uint64_t *ingress_histogram; |
| uint8_t priority; |
| uint8_t rate_cursor; |
| uint32_t core_ticks; |
| }; |
| |
| DEQ_DECLARE(qdr_link_t, qdr_link_list_t); |
| |
| struct qdr_link_ref_t { |
| DEQ_LINKS(qdr_link_ref_t); |
| qdr_link_t *link; |
| }; |
| |
| DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t); |
| |
| void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls); |
| bool qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls); |
| void move_link_ref(qdr_link_t *link, int from_cls, int to_cls); |
| |
| |
| struct qdr_connection_ref_t { |
| DEQ_LINKS(qdr_connection_ref_t); |
| qdr_connection_t *conn; |
| }; |
| |
| DEQ_DECLARE(qdr_connection_ref_t, qdr_connection_ref_list_t); |
| |
| void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn); |
| void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn); |
| |
| struct qdr_address_t { |
| DEQ_LINKS(qdr_address_t); |
| qdr_address_config_t *config; |
| qdr_subscription_list_t subscriptions; ///< In-process message subscribers |
| qdr_connection_ref_list_t conns; ///< Local Connections for route-destinations |
| qdr_link_ref_list_t rlinks; ///< Locally-Connected Consumers |
| qdr_link_ref_list_t inlinks; ///< Locally-Connected Producers |
| qd_bitmask_t *rnodes; ///< Bitmask of remote routers with connected consumers |
| qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry |
| qdrc_endpoint_desc_t *core_endpoint; ///< [ref] Set if this address is bound to an in-core endpoint |
| void *core_endpoint_context; |
| qdr_link_t *edge_inlink; ///< [ref] In-link from connected Interior router (on edge router) |
| qdr_link_t *edge_outlink; ///< [ref] Out-link to connected Interior router (on edge router) |
| qd_address_treatment_t treatment; |
| qdr_forwarder_t *forwarder; |
| int ref_count; ///< Number of entities referencing this address |
| bool local; |
| bool router_control_only; ///< If set, address is only for deliveries arriving on a control link |
| uint32_t tracked_deliveries; |
| uint64_t cost_epoch; |
| |
| // |
| // State for mobile-address synchronization |
| // |
| DEQ_LINKS_N(SYNC_ADD, qdr_address_t); |
| DEQ_LINKS_N(SYNC_DEL, qdr_address_t); |
| uint32_t sync_mask; |
| |
| // |
| // State for tracking fallback destinations for undeliverable deliveries |
| // |
| qdr_address_t *fallback; ///< Pointer to this address's fallback destination |
| qdr_address_t *fallback_for; ///< Pointer to the address that this is a fallback for |
| |
| // |
| // State for "closest" treatment |
| // |
| qd_bitmask_t *closest_remotes; |
| int next_remote; |
| |
| // |
| // State for "balanced" treatment |
| // |
| int *outstanding_deliveries; |
| |
| // |
| // State for "exchange" treatment |
| // |
| qdr_exchange_t *exchange; // weak ref |
| |
| // |
| // State for "link balanced" treatment |
| // |
| char *add_prefix; |
| char *del_prefix; |
| |
| /**@name Statistics */ |
| ///@{ |
| uint64_t deliveries_ingress; |
| uint64_t deliveries_egress; |
| uint64_t deliveries_transit; |
| uint64_t deliveries_to_container; |
| uint64_t deliveries_from_container; |
| uint64_t deliveries_egress_route_container; |
| uint64_t deliveries_ingress_route_container; |
| uint64_t deliveries_redirected; |
| |
| ///@} |
| |
| int priority; |
| }; |
| |
| DEQ_DECLARE(qdr_address_t, qdr_address_list_t); |
| |
| qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment, qdr_address_config_t *config); |
| qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_treatment_t treatment); |
| qdr_address_t *qdr_add_mobile_address_CT(qdr_core_t *core, const char* prefix, const char *addr, qd_address_treatment_t treatment, bool edge); |
| void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr); |
| void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link); |
| void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link); |
| void qdr_core_bind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_connection_t *conn); |
| void qdr_core_unbind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_connection_t *conn); |
| void qdr_setup_fallback_address_CT(qdr_core_t *core, qdr_address_t *addr); |
| |
| struct qdr_address_config_t { |
| DEQ_LINKS(qdr_address_config_t); |
| char *name; |
| uint64_t identity; |
| uint32_t ref_count; |
| char *pattern; |
| bool fallback; |
| bool is_prefix; |
| qd_address_treatment_t treatment; |
| int in_phase; |
| int out_phase; |
| int priority; |
| }; |
| |
| DEQ_DECLARE(qdr_address_config_t, qdr_address_config_list_t); |
| void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr); |
| bool qdr_is_addr_treatment_multicast(qdr_address_t *addr); |
| |
| // |
| // Connection Information |
| // |
| // This record is used to give the core thread access to the details |
| // of a connection's configuration. |
| // |
| |
| struct qdr_connection_info_t { |
| char *container; |
| char *sasl_mechanisms; |
| char *host; |
| bool is_encrypted; |
| char *ssl_proto; |
| char *ssl_cipher; |
| char *user; |
| bool is_authenticated; |
| bool opened; |
| qd_direction_t dir; |
| qdr_connection_role_t role; |
| pn_data_t *connection_properties; |
| bool ssl; |
| int ssl_ssf; //ssl strength factor |
| }; |
| |
| ALLOC_DECLARE(qdr_connection_info_t); |
| |
| DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t); |
| |
| |
| typedef enum { |
| QDR_CONN_OPER_UP, |
| } qdr_conn_oper_status_t; |
| |
| |
| typedef enum { |
| QDR_CONN_ADMIN_ENABLED, |
| QDR_CONN_ADMIN_DELETED |
| } qdr_conn_admin_status_t; |
| |
| |
| struct qdr_connection_t { |
| DEQ_LINKS(qdr_connection_t); |
| DEQ_LINKS_N(ACTIVATE, qdr_connection_t); |
| uint64_t identity; |
| qdr_core_t *core; |
| bool incoming; |
| bool in_activate_list; |
| qdr_connection_role_t role; |
| int inter_router_cost; |
| qdr_conn_identifier_t *conn_id; |
| qdr_conn_identifier_t *alt_conn_id; |
| bool strip_annotations_in; |
| bool strip_annotations_out; |
| bool policy_allow_dynamic_link_routes; |
| bool policy_allow_admin_status_update; |
| int link_capacity; |
| int mask_bit; |
| qdr_connection_work_list_t work_list; |
| sys_mutex_t *work_lock; |
| qdr_link_ref_list_t links; |
| qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES]; |
| char *tenant_space; |
| int tenant_space_len; |
| qdr_connection_info_t *connection_info; |
| void *user_context; /* Updated from IO thread, use work_lock */ |
| qdr_link_route_list_t conn_link_routes; // connection scoped link routes |
| qdr_conn_oper_status_t oper_status; |
| qdr_conn_admin_status_t admin_status; |
| qdr_error_t *error; |
| bool closed; // This bit is used in the case where a client is trying to force close this connection. |
| uint32_t conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running. |
| uint32_t last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection. |
| }; |
| |
| DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t); |
| |
| #define QDR_IS_LINK_ROUTE_PREFIX(p) ((p) == QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_IN || (p) == QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_OUT) |
| #define QDR_IS_LINK_ROUTE(p) ((p) == QD_ITER_HASH_PREFIX_LINKROUTE_PATTERN_IN || (p) == QD_ITER_HASH_PREFIX_LINKROUTE_PATTERN_OUT || QDR_IS_LINK_ROUTE_PREFIX(p)) |
| #define QDR_LINK_ROUTE_DIR(p) (((p) == QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_IN || (p) == QD_ITER_HASH_PREFIX_LINKROUTE_PATTERN_IN) ? QD_INCOMING : QD_OUTGOING) |
| #define QDR_LINK_ROUTE_HASH(dir, is_prefix) \ |
| (((dir) == QD_INCOMING) \ |
| ? ((is_prefix) ? QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_IN : QD_ITER_HASH_PREFIX_LINKROUTE_PATTERN_IN) \ |
| : ((is_prefix) ? QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_OUT : QD_ITER_HASH_PREFIX_LINKROUTE_PATTERN_OUT)) |
| |
| struct qdr_link_route_t { |
| DEQ_LINKS(qdr_link_route_t); |
| DEQ_LINKS_N(REF, qdr_link_route_t); |
| uint64_t identity; |
| char *name; |
| qdr_address_t *addr; |
| qd_direction_t dir; |
| qdr_conn_identifier_t *conn_id; |
| qd_address_treatment_t treatment; |
| bool active; |
| bool is_prefix; |
| char *pattern; |
| char *add_prefix; |
| char *del_prefix; |
| qdr_connection_t *parent_conn; |
| }; |
| |
| void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr); |
| void qdr_core_delete_auto_link (qdr_core_t *core, qdr_auto_link_t *al); |
| |
| // Core timer related field/data structures |
| typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context); |
| typedef qdr_address_t * (*qdr_edge_conn_addr_t) (void *context); |
| |
| typedef struct qdr_core_timer_t { |
| DEQ_LINKS(struct qdr_core_timer_t); |
| qdr_timer_cb_t handler; |
| void *context; |
| uint32_t delta_time_seconds; |
| bool scheduled; |
| } qdr_core_timer_t; |
| |
| ALLOC_DECLARE(qdr_core_timer_t); |
| DEQ_DECLARE(qdr_core_timer_t, qdr_core_timer_list_t); |
| |
| |
| typedef enum { |
| QDR_AUTO_LINK_STATE_INACTIVE, |
| QDR_AUTO_LINK_STATE_ATTACHING, |
| QDR_AUTO_LINK_STATE_FAILED, |
| QDR_AUTO_LINK_STATE_ACTIVE, |
| QDR_AUTO_LINK_STATE_QUIESCING, |
| QDR_AUTO_LINK_STATE_IDLE |
| } qdr_auto_link_state_t; |
| |
| struct qdr_auto_link_t { |
| DEQ_LINKS(qdr_auto_link_t); |
| DEQ_LINKS_N(REF, qdr_auto_link_t); |
| uint64_t identity; |
| char *name; |
| qdr_address_t *addr; |
| char *external_addr; |
| const char *internal_addr; |
| int phase; |
| int retry_attempts; |
| qd_direction_t dir; |
| qdr_conn_identifier_t *conn_id; |
| qdr_link_t *link; |
| qdr_auto_link_state_t state; |
| qdr_core_timer_t *retry_timer; // If the auto link attach fails or gets disconnected, this timer retries the attach. |
| char *last_error; |
| bool fallback; // True iff this auto-link attaches to a fallback destination for an address. |
| }; |
| |
| DEQ_DECLARE(qdr_auto_link_t, qdr_auto_link_list_t); |
| |
| |
| struct qdr_conn_identifier_t { |
| qd_hash_handle_t *connection_hash_handle; |
| qd_hash_handle_t *container_hash_handle; |
| qdr_connection_ref_list_t connection_refs; |
| qdr_link_route_list_t link_route_refs; |
| qdr_auto_link_list_t auto_link_refs; |
| }; |
| |
| DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t); |
| |
| typedef struct qdr_priority_sheaf_t { |
| qdr_link_t *links[QDR_N_PRIORITIES]; |
| int count; |
| } qdr_priority_sheaf_t; |
| |
| struct qdr_core_t { |
| qd_dispatch_t *qd; |
| qd_log_source_t *log; |
| qd_log_source_t *agent_log; |
| sys_thread_t *thread; |
| bool running; |
| qdr_action_list_t action_list; |
| qdr_action_list_t action_list_background; /// Actions processed only when the action_list is empty |
| sys_cond_t *action_cond; |
| sys_mutex_t *action_lock; |
| |
| sys_mutex_t *work_lock; |
| qdr_core_timer_list_t scheduled_timers; |
| qdr_general_work_list_t work_list; |
| qd_timer_t *work_timer; |
| uint32_t uptime_ticks; |
| |
| qdr_connection_list_t open_connections; |
| qdr_connection_t *active_edge_connection; |
| qdr_connection_list_t connections_to_activate; |
| qdr_link_list_t open_links; |
| |
| qdrc_attach_addr_lookup_t addr_lookup_handler; |
| void *addr_lookup_context; |
| |
| // |
| // Agent section |
| // |
| qdr_query_list_t outgoing_query_list; |
| sys_mutex_t *query_lock; |
| qd_timer_t *agent_timer; |
| qdr_manage_response_t agent_response_handler; |
| qdr_subscription_t *agent_subscription_mobile; |
| qdr_subscription_t *agent_subscription_local; |
| |
| // |
| // Route table section |
| // |
| void *rt_context; |
| qdr_set_mobile_seq_t rt_set_mobile_seq; |
| qdr_set_my_mobile_seq_t rt_set_my_mobile_seq; |
| qdr_link_lost_t rt_link_lost; |
| |
| // |
| // Connection section |
| // |
| void *user_context; |
| qdr_link_first_attach_t first_attach_handler; |
| qdr_link_second_attach_t second_attach_handler; |
| qdr_link_detach_t detach_handler; |
| qdr_link_flow_t flow_handler; |
| qdr_link_offer_t offer_handler; |
| qdr_link_drained_t drained_handler; |
| qdr_link_drain_t drain_handler; |
| qdr_link_push_t push_handler; |
| qdr_link_deliver_t deliver_handler; |
| qdr_link_get_credit_t get_credit_handler; |
| qdr_delivery_update_t delivery_update_handler; |
| qdr_connection_close_t conn_close_handler; |
| |
| // |
| // Events section |
| // |
| qdrc_event_subscription_list_t conn_event_subscriptions; |
| qdrc_event_subscription_list_t link_event_subscriptions; |
| qdrc_event_subscription_list_t addr_event_subscriptions; |
| qdrc_event_subscription_list_t router_event_subscriptions; |
| |
| qd_router_mode_t router_mode; |
| const char *router_area; |
| const char *router_id; |
| |
| qdr_address_config_list_t addr_config; |
| qdr_auto_link_list_t auto_links; |
| qdr_link_route_list_t link_routes; |
| qd_hash_t *conn_id_hash; |
| qdr_address_list_t addrs; |
| qd_hash_t *addr_hash; |
| qd_parse_tree_t *addr_parse_tree; |
| qd_parse_tree_t *link_route_tree[2]; // QD_INCOMING, QD_OUTGOING |
| qdr_address_t *hello_addr; |
| qdr_address_t *router_addr_L; |
| qdr_address_t *routerma_addr_L; |
| qdr_address_t *router_addr_T; |
| qdr_address_t *routerma_addr_T; |
| |
| qdr_node_list_t routers; ///< List of routers, in order of cost, from lowest to highest |
| qd_bitmask_t *neighbor_free_mask; |
| qdr_node_t **routers_by_mask_bit; |
| qdr_link_t **control_links_by_mask_bit; |
| qdr_priority_sheaf_t *data_links_by_mask_bit; |
| uint64_t cost_epoch; |
| |
| uint64_t next_tag; |
| |
| uint64_t next_identifier; |
| sys_mutex_t *id_lock; |
| |
| qdr_exchange_list_t exchanges; |
| qdr_forwarder_t *forwarders[QD_TREATMENT_LINK_BALANCED + 1]; |
| |
| qdr_delivery_cleanup_list_t delivery_cleanup_list; ///< List of delivery cleanup items to be processed in an IO thread |
| |
| // Overall delivery counters |
| uint64_t presettled_deliveries; |
| uint64_t dropped_presettled_deliveries; |
| uint64_t accepted_deliveries; |
| uint64_t rejected_deliveries; |
| uint64_t released_deliveries; |
| uint64_t modified_deliveries; |
| uint64_t deliveries_ingress; |
| uint64_t deliveries_egress; |
| uint64_t deliveries_transit; |
| uint64_t deliveries_egress_route_container; |
| uint64_t deliveries_ingress_route_container; |
| uint64_t deliveries_delayed_1sec; |
| uint64_t deliveries_delayed_10sec; |
| uint64_t deliveries_stuck; |
| uint64_t deliveries_redirected; |
| uint32_t links_blocked; |
| |
| qdr_edge_conn_addr_t edge_conn_addr; |
| void *edge_context; |
| }; |
| |
| struct qdr_terminus_t { |
| qdr_field_t *address; |
| pn_durability_t durability; |
| pn_expiry_policy_t expiry_policy; |
| pn_seconds_t timeout; |
| bool dynamic; |
| bool coordinator; |
| pn_distribution_mode_t distribution_mode; |
| pn_data_t *properties; |
| pn_data_t *filter; |
| pn_data_t *outcomes; |
| pn_data_t *capabilities; |
| }; |
| |
| ALLOC_DECLARE(qdr_terminus_t); |
| |
| void *router_core_thread(void *arg); |
| uint64_t qdr_identifier(qdr_core_t* core); |
| void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost, uint64_t in_conn_id); |
| void qdr_route_table_setup_CT(qdr_core_t *core); |
| void qdr_agent_setup_CT(qdr_core_t *core); |
| void qdr_forwarder_setup_CT(qdr_core_t *core); |
| qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label); |
| void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); |
| void qdr_action_background_enqueue(qdr_core_t *core, qdr_action_t *action); |
| void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain); |
| void qdr_drain_inbound_undelivered_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr); |
| void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr); |
| |
| /** |
| * Returns true if the passed in address is a mobile address, false otherwise |
| * If the first character of the address_key (obtained using its hash_handle) is M, the address is mobile. |
| */ |
| bool qdr_address_is_mobile_CT(qdr_address_t *addr); |
| |
| void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg); |
| void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control); |
| void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); |
| |
| void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq); |
| void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq); |
| void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit); |
| |
| void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work); |
| void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr); |
| bool qdr_is_addr_treatment_multicast(qdr_address_t *addr); |
| qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg); |
| void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv); |
| void qdr_connection_free(qdr_connection_t *conn); |
| void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn); |
| qdr_address_config_t *qdr_config_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter); |
| qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter, qdr_address_config_t **addr_config); |
| qd_address_treatment_t qdr_treatment_for_address_hash_with_default_CT(qdr_core_t *core, qd_iterator_t *iter, qd_address_treatment_t default_treatment, qdr_address_config_t **addr_config); |
| qdr_edge_t *qdr_edge(qdr_core_t *); |
| void qdr_edge_free(qdr_edge_t *); |
| void qdr_edge_connection_opened(qdr_edge_t *edge, qdr_connection_t *conn); |
| void qdr_edge_connection_closed(qdr_edge_t *edge); |
| |
| void qdr_connection_enqueue_work_CT(qdr_core_t *core, |
| qdr_connection_t *conn, |
| qdr_connection_work_t *work); |
| void qdr_link_enqueue_work_CT(qdr_core_t *core, |
| qdr_link_t *conn, |
| qdr_link_work_t *work); |
| |
| qdr_link_t *qdr_create_link_CT(qdr_core_t *core, |
| qdr_connection_t *conn, |
| qd_link_type_t link_type, |
| qd_direction_t dir, |
| qdr_terminus_t *source, |
| qdr_terminus_t *target); |
| |
| void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close); |
| void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); |
| |
| qdr_query_t *qdr_query(qdr_core_t *core, |
| void *context, |
| qd_router_entity_type_t type, |
| qd_composed_field_t *body, |
| uint64_t conn_id); |
| void qdr_modules_finalize(qdr_core_t *core); |
| |
| /** |
| * Create a new timer which will only be used inside the code thread. |
| * |
| * @param core Pointer to the core object returned by qd_core() |
| * @callback Callback function to be invoked when timer fires. |
| * @timer_context Context to be used when firing callback |
| */ |
| qdr_core_timer_t *qdr_core_timer_CT(qdr_core_t *core, qdr_timer_cb_t callback, void *timer_context); |
| |
| |
| /** |
| * Schedules a core timer with a delay. The timer will fire after "delay" seconds |
| * @param core Pointer to the core object returned by qd_core() |
| * @param timer Timer object that needs to be scheduled. |
| * @param delay The number of seconds to wait before firing the timer |
| */ |
| void qdr_core_timer_schedule_CT(qdr_core_t *core, qdr_core_timer_t *timer, uint32_t delay); |
| |
| /** |
| * Cancels an already scheduled timeer. This does not free the timer. It is the responsibility of the person who |
| * created the timer to free it. |
| * @param core Pointer to the core object returned by qd_core() |
| * @param timer Timer object that needs to be scheduled. |
| * |
| */ |
| void qdr_core_timer_cancel_CT(qdr_core_t *core, qdr_core_timer_t *timer); |
| |
| /** |
| * Cancels the timer if it is scheduled and and free it. |
| * @param core Pointer to the core object returned by qd_core() |
| * @param timer Timer object that needs to be scheduled. |
| */ |
| void qdr_core_timer_free_CT(qdr_core_t *core, qdr_core_timer_t *timer); |
| |
| /** |
| * Clears the sheaf of priority links in a connection. |
| * Call this when a connection is being closed, when the mask-bit |
| * for that sheaf is being returned to the core for re-use. |
| * @param core Pointer to the core object returned by qd_core() |
| * @param n uint8_t index for the sheaf to be reset prior to re-use. |
| */ |
| void qdr_reset_sheaf(qdr_core_t *core, uint8_t n); |
| |
| /** |
| * Run in an IO thread. |
| * |
| * Records Proton's view of the link's available credit and tracks it for management and |
| * logging. |
| */ |
| void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link); |
| |
| #endif |