| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <qpid/dispatch/ctools.h> |
| #include <qpid/dispatch/amqp.h> |
| #include "module.h" |
| #include "core_link_endpoint.h" |
| #include <stdio.h> |
| |
| typedef struct qdr_addr_tracking_module_context_t qdr_addr_tracking_module_context_t; |
| typedef struct qdr_addr_endpoint_state_t qdr_addr_endpoint_state_t; |
| |
| struct qdr_addr_endpoint_state_t { |
| DEQ_LINKS(qdr_addr_endpoint_state_t); |
| qdrc_endpoint_t *endpoint; |
| qdr_connection_t *conn; // The connection associated with the endpoint. |
| qdr_addr_tracking_module_context_t *mc; |
| int ref_count; |
| bool closed; // Is the endpoint that this state belong to closed? |
| }; |
| |
| DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t); |
| ALLOC_DECLARE(qdr_addr_endpoint_state_t); |
| ALLOC_DEFINE(qdr_addr_endpoint_state_t); |
| |
| struct qdr_addr_tracking_module_context_t { |
| qdr_core_t *core; |
| qdr_addr_endpoint_state_list_t endpoint_state_list; |
| qdrc_event_subscription_t *event_sub; |
| qdrc_endpoint_desc_t addr_tracking_endpoint; |
| }; |
| |
| |
| static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t *addr, bool insert_addr) |
| { |
| qd_message_t *msg = qd_message(); |
| |
| // |
| // Start header |
| // |
| qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0); |
| qd_compose_start_list(fld); |
| qd_compose_insert_bool(fld, 0); // durable |
| qd_compose_end_list(fld); |
| |
| qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); |
| |
| qd_compose_start_list(body); |
| |
| const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle); |
| |
| qd_compose_insert_string(body, addr_str); |
| qd_compose_insert_bool(body, insert_addr); |
| qd_compose_end_list(body); |
| |
| // Finally, compose and retuen the message so it can be sent out. |
| qd_message_compose_3(msg, fld, body); |
| |
| qd_compose_free(body); |
| qd_compose_free(fld); |
| |
| return msg; |
| } |
| |
| static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t endpoint_state_list, qdr_connection_t *conn) |
| { |
| qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list); |
| while(endpoint_state) { |
| if (endpoint_state->conn == conn) { |
| return endpoint_state; |
| } |
| endpoint_state = DEQ_NEXT(endpoint_state); |
| } |
| return 0; |
| } |
| |
| |
| static void qdrc_address_endpoint_first_attach(void *bind_context, |
| qdrc_endpoint_t *endpoint, |
| void **link_context, |
| qdr_terminus_t *remote_source, |
| qdr_terminus_t *remote_target) |
| { |
| qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context; |
| |
| qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t(); |
| |
| ZERO(endpoint_state); |
| endpoint_state->endpoint = endpoint; |
| endpoint_state->mc = bc; |
| endpoint_state->conn = qdrc_endpoint_get_connection_CT(endpoint); |
| |
| |
| DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state); |
| |
| // |
| // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created only if this is a receiver link |
| // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION connection. |
| // |
| if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && qdrc_endpoint_get_connection_CT(endpoint)->role == QDR_ROLE_EDGE_CONNECTION) { |
| *link_context = endpoint_state; |
| qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target); |
| } |
| else { |
| // |
| // We simply detach any links that dont match the above condition. |
| // |
| *link_context = 0; |
| qdrc_endpoint_detach_CT(bc->core, endpoint, 0); |
| qdr_terminus_free(remote_source); |
| qdr_terminus_free(remote_target); |
| } |
| } |
| |
| |
| static void qdrc_address_endpoint_on_first_detach(void *link_context, |
| qdr_error_t *error) |
| { |
| qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)link_context; |
| qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint, 0); |
| qdr_error_free(error); |
| } |
| |
| static void qdrc_address_endpoint_cleanup(void *link_context) |
| { |
| qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)link_context; |
| if (endpoint_state) { |
| qdr_addr_tracking_module_context_t *mc = endpoint_state->mc; |
| assert (endpoint_state->conn); |
| endpoint_state->closed = true; |
| if (endpoint_state->ref_count == 0) { |
| |
| // |
| // The endpoint has been closed and no other links are referencing this endpoint. Time to free it. |
| // Clean out all the states held by the link_context (endpoint_state) |
| // |
| if (mc) { |
| DEQ_REMOVE(mc->endpoint_state_list, endpoint_state); |
| } |
| |
| endpoint_state->conn = 0; |
| endpoint_state->endpoint = 0; |
| free_qdr_addr_endpoint_state_t(endpoint_state); |
| } |
| } |
| } |
| |
| |
| static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn) |
| { |
| if (!addr) |
| return false; |
| |
| bool can_send = false; |
| if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) > 0) { |
| // There is at least one receiver for this address somewhere in the router network |
| can_send = true; |
| } |
| if (!can_send) { |
| if (DEQ_SIZE(addr->rlinks) == 1) { |
| qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks); |
| if (link_ref->link->conn != conn) |
| can_send=true; |
| } |
| } |
| return can_send; |
| } |
| |
| |
| static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoint_t *endpoint, bool insert_addr) |
| { |
| if (!addr) |
| return; |
| |
| if (!endpoint) |
| return; |
| |
| qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr); |
| qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg); |
| |
| qdrc_endpoint_send_CT(core, endpoint, dlv, true); |
| } |
| |
| static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr) |
| { |
| // We only care about mobile addresses. |
| if(!qdr_address_is_mobile_CT(addr)) |
| return; |
| |
| qdr_addr_tracking_module_context_t *addr_tracking = (qdr_addr_tracking_module_context_t*) context; |
| switch (event) { |
| case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : { |
| // |
| // This address transitioned from zero to one local destination. If this address already has more than zero remote destinations, don't do anything |
| // |
| if (qd_bitmask_cardinality(addr->rnodes) == 0) { |
| qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); |
| // |
| // Every inlink that has an edge context must be informed of the appearence of this address. |
| // |
| while (inlink) { |
| if(inlink->link->edge_context != 0) { |
| qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; |
| if (!endpoint_state->closed && qdrc_can_send_address(addr, endpoint_state->conn) ) { |
| qdrc_endpoint_t *endpoint = endpoint_state->endpoint; |
| qdrc_send_message(addr_tracking->core, addr, endpoint, true); |
| } |
| } |
| inlink = DEQ_NEXT(inlink); |
| } |
| } |
| break; |
| } |
| case QDRC_EVENT_ADDR_BECAME_DEST : { |
| // |
| // This address transitioned from zero to one destination. If this address already had local destinations |
| // |
| qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); |
| // |
| // Every inlink that has an edge context must be informed of the appearence of this address. |
| // |
| while (inlink) { |
| if(inlink->link->edge_context != 0) { |
| qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; |
| if (!endpoint_state->closed && qdrc_can_send_address(addr, endpoint_state->conn) ) { |
| qdrc_endpoint_t *endpoint = endpoint_state->endpoint; |
| if (endpoint) |
| qdrc_send_message(addr_tracking->core, addr, endpoint, true); |
| } |
| } |
| inlink = DEQ_NEXT(inlink); |
| } |
| } |
| break; |
| |
| case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : { |
| // The address no longer has any local destinations. |
| // If there are no remote destinations either, we have to tell the edge routers to delete their sender links |
| if (qd_bitmask_cardinality(addr->rnodes) == 0) { |
| qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); |
| // |
| // Every inlink that has an edge context must be informed of the disappearence of this address. |
| // |
| while (inlink) { |
| if(inlink->link->edge_context != 0) { |
| qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; |
| if(!endpoint_state->closed) { |
| qdrc_endpoint_t *endpoint = endpoint_state->endpoint; |
| if (endpoint) |
| qdrc_send_message(addr_tracking->core, addr, endpoint, false); |
| } |
| } |
| inlink = DEQ_NEXT(inlink); |
| } |
| } |
| |
| break; |
| } |
| case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: { |
| // |
| // This address transitioned from N destinations to one local dest |
| // If this address already has non-zero remote destinations, there is no need to tell the edge routers about it |
| // |
| assert(DEQ_SIZE(addr->rlinks) == 1); |
| // |
| // There should be only one rlink in the rlinks list |
| // |
| qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks); |
| qdr_link_t *link = rlink_ref->link; |
| |
| qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); |
| while (inlink) { |
| if (inlink->link->edge_context != 0) { |
| qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; |
| qdrc_endpoint_t *endpoint = endpoint_state->endpoint; |
| if (endpoint_state->conn == link->conn && !endpoint_state->closed) { |
| qdrc_send_message(addr_tracking->core, addr, endpoint, false); |
| break; |
| } |
| } |
| inlink = DEQ_NEXT(inlink); |
| } |
| } |
| break; |
| case QDRC_EVENT_ADDR_TWO_DEST: { |
| // |
| // The address transitioned from one local dest to two destinations, The second destination might be local or remote. |
| // |
| qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks); |
| qdr_link_t *link = rlink_ref->link; |
| |
| qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); |
| while (inlink) { |
| if(inlink->link->edge_context != 0) { |
| qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; |
| qdrc_endpoint_t *endpoint = endpoint_state->endpoint; |
| if (link->conn == endpoint_state->conn && !endpoint_state->closed) { |
| qdrc_send_message(addr_tracking->core, addr, endpoint, true); |
| break; |
| } |
| } |
| inlink = DEQ_NEXT(inlink); |
| } |
| break; |
| } |
| |
| default: |
| break; |
| } |
| } |
| |
| static void on_link_event(void *context, qdrc_event_t event, qdr_link_t *link) |
| { |
| switch (event) { |
| case QDRC_EVENT_LINK_EDGE_DATA_ATTACHED : |
| { |
| qdr_addr_tracking_module_context_t *mc = (qdr_addr_tracking_module_context_t *) context; |
| qdr_address_t *addr = link->owning_addr; |
| if (addr && qdr_address_is_mobile_CT(addr) && DEQ_SIZE(addr->subscriptions) == 0 && link->link_direction == QD_INCOMING) { |
| qdr_addr_endpoint_state_t *endpoint_state = qdrc_get_endpoint_state_for_connection(mc->endpoint_state_list, link->conn); |
| // Fix for DISPATCH-1492. Remove the assert(endpoint_state); and add an if condition check for endpoint_state |
| // We will not prevent regular endpoints from connecting to the edge listener for now. |
| if (endpoint_state) { |
| assert(link->edge_context == 0); |
| link->edge_context = endpoint_state; |
| endpoint_state->ref_count++; |
| if (qdrc_can_send_address(addr, link->conn)) { |
| qdrc_send_message(mc->core, addr, endpoint_state->endpoint, true); |
| } |
| } |
| } |
| break; |
| } |
| case QDRC_EVENT_LINK_EDGE_DATA_DETACHED : |
| { |
| if (link->edge_context) { |
| qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)link->edge_context; |
| endpoint_state->ref_count--; |
| link->edge_context = 0; |
| // |
| // The endpoint has been closed and no other links are referencing this endpoint. Time to free it. |
| // |
| if (endpoint_state->ref_count == 0 && endpoint_state->closed) { |
| qdr_addr_tracking_module_context_t *mc = endpoint_state->mc; |
| if (mc) { |
| DEQ_REMOVE(mc->endpoint_state_list, endpoint_state); |
| } |
| endpoint_state->conn = 0; |
| endpoint_state->endpoint = 0; |
| free_qdr_addr_endpoint_state_t(endpoint_state); |
| } |
| } |
| break; |
| } |
| |
| default: |
| break; |
| } |
| } |
| |
| |
| static bool qdrc_edge_address_tracking_enable_CT(qdr_core_t *core) |
| { |
| return core->router_mode == QD_ROUTER_MODE_INTERIOR; |
| } |
| |
| |
| static void qdrc_edge_address_tracking_init_CT(qdr_core_t *core, void **module_context) |
| { |
| qdr_addr_tracking_module_context_t *context = NEW(qdr_addr_tracking_module_context_t); |
| ZERO(context); |
| context->core = core; |
| *module_context = context; |
| |
| // |
| // Bind to the static address QD_TERMINUS_EDGE_ADDRESS_TRACKING |
| // |
| context->addr_tracking_endpoint.label = "qdrc_edge_address_tracking_module_init_CT"; |
| context->addr_tracking_endpoint.on_first_attach = qdrc_address_endpoint_first_attach; |
| context->addr_tracking_endpoint.on_first_detach = qdrc_address_endpoint_on_first_detach; |
| context->addr_tracking_endpoint.on_cleanup = qdrc_address_endpoint_cleanup; |
| qdrc_endpoint_bind_mobile_address_CT(core, QD_TERMINUS_EDGE_ADDRESS_TRACKING, '0', &context->addr_tracking_endpoint, context); |
| |
| // |
| // Subscribe to address and link events. |
| // |
| context->event_sub = qdrc_event_subscribe_CT(core, |
| QDRC_EVENT_ADDR_BECAME_LOCAL_DEST | QDRC_EVENT_ADDR_ONE_LOCAL_DEST | |
| QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST | QDRC_EVENT_ADDR_BECAME_DEST | QDRC_EVENT_ADDR_TWO_DEST | |
| QDRC_EVENT_LINK_EDGE_DATA_ATTACHED | QDRC_EVENT_LINK_EDGE_DATA_DETACHED, |
| 0, |
| on_link_event, |
| on_addr_event, |
| 0, |
| context); |
| } |
| |
| |
| static void qdrc_edge_address_tracking_final_CT(void *module_context) |
| { |
| qdr_addr_tracking_module_context_t *mc = ( qdr_addr_tracking_module_context_t *)module_context; |
| |
| // If there are any endpoint states still hanging around, clean it up. |
| qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(mc->endpoint_state_list); |
| while (endpoint_state) { |
| DEQ_REMOVE_HEAD(mc->endpoint_state_list); |
| free_qdr_addr_endpoint_state_t(endpoint_state); |
| endpoint_state = DEQ_HEAD(mc->endpoint_state_list); |
| } |
| qdrc_event_unsubscribe_CT(mc->core, mc->event_sub); |
| free(mc); |
| } |
| |
| |
| QDR_CORE_MODULE_DECLARE("edge_addr_tracking", qdrc_edge_address_tracking_enable_CT, qdrc_edge_address_tracking_init_CT, qdrc_edge_address_tracking_final_CT) |