| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <stdio.h> |
| #include <string.h> |
| #include <qpid/dispatch/container.h> |
| #include <qpid/dispatch/message.h> |
| #include <proton/engine.h> |
| #include <proton/message.h> |
| #include <qpid/dispatch/ctools.h> |
| #include <qpid/dispatch/hash.h> |
| #include <qpid/dispatch/threading.h> |
| #include <qpid/dispatch/iterator.h> |
| #include <qpid/dispatch/log.h> |
| |
| static char *module="CONTAINER"; |
| |
| struct dx_node_t { |
| const dx_node_type_t *ntype; |
| char *name; |
| void *context; |
| dx_dist_mode_t supported_dist; |
| dx_lifetime_policy_t life_policy; |
| }; |
| |
| ALLOC_DECLARE(dx_node_t); |
| ALLOC_DEFINE(dx_node_t); |
| ALLOC_DEFINE(dx_link_item_t); |
| |
| struct dx_link_t { |
| pn_link_t *pn_link; |
| void *context; |
| dx_node_t *node; |
| }; |
| |
| ALLOC_DECLARE(dx_link_t); |
| ALLOC_DEFINE(dx_link_t); |
| |
| typedef struct nxc_node_type_t { |
| DEQ_LINKS(struct nxc_node_type_t); |
| const dx_node_type_t *ntype; |
| } nxc_node_type_t; |
| DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t); |
| |
| |
| static hash_t *node_type_map; |
| static hash_t *node_map; |
| static sys_mutex_t *lock; |
| static dx_node_t *default_node; |
| static nxc_node_type_list_t node_type_list; |
| |
| static void setup_outgoing_link(pn_link_t *pn_link) |
| { |
| sys_mutex_lock(lock); |
| dx_node_t *node; |
| int result; |
| const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link)); |
| dx_field_iterator_t *iter; |
| // TODO - Extract the name from the structured source |
| |
| if (source) { |
| iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID); |
| result = hash_retrieve(node_map, iter, (void*) &node); |
| dx_field_iterator_free(iter); |
| } else |
| result = -1; |
| sys_mutex_unlock(lock); |
| |
| if (result < 0) { |
| if (default_node) |
| node = default_node; |
| else { |
| // Reject the link |
| // TODO - When the API allows, add an error message for "no available node" |
| pn_link_close(pn_link); |
| return; |
| } |
| } |
| |
| dx_link_t *link = new_dx_link_t(); |
| if (!link) { |
| pn_link_close(pn_link); |
| return; |
| } |
| |
| link->pn_link = pn_link; |
| link->context = 0; |
| link->node = node; |
| |
| pn_link_set_context(pn_link, link); |
| node->ntype->outgoing_handler(node->context, link); |
| } |
| |
| |
| static void setup_incoming_link(pn_link_t *pn_link) |
| { |
| sys_mutex_lock(lock); |
| dx_node_t *node; |
| int result; |
| const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link)); |
| dx_field_iterator_t *iter; |
| // TODO - Extract the name from the structured target |
| |
| if (target) { |
| iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID); |
| result = hash_retrieve(node_map, iter, (void*) &node); |
| dx_field_iterator_free(iter); |
| } else |
| result = -1; |
| sys_mutex_unlock(lock); |
| |
| if (result < 0) { |
| if (default_node) |
| node = default_node; |
| else { |
| // Reject the link |
| // TODO - When the API allows, add an error message for "no available node" |
| pn_link_close(pn_link); |
| return; |
| } |
| } |
| |
| dx_link_t *link = new_dx_link_t(); |
| if (!link) { |
| pn_link_close(pn_link); |
| return; |
| } |
| |
| link->pn_link = pn_link; |
| link->context = 0; |
| link->node = node; |
| |
| pn_link_set_context(pn_link, link); |
| node->ntype->incoming_handler(node->context, link); |
| } |
| |
| |
| static int do_writable(pn_link_t *pn_link) |
| { |
| dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); |
| if (!link) |
| return 0; |
| |
| dx_node_t *node = link->node; |
| if (!node) |
| return 0; |
| |
| return node->ntype->writable_handler(node->context, link); |
| } |
| |
| |
| static void process_receive(pn_delivery_t *delivery) |
| { |
| pn_link_t *pn_link = pn_delivery_link(delivery); |
| dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); |
| |
| if (link) { |
| dx_node_t *node = link->node; |
| if (node) { |
| node->ntype->rx_handler(node->context, link, delivery); |
| return; |
| } |
| } |
| |
| // |
| // Reject the delivery if we couldn't find a node to handle it |
| // |
| pn_link_advance(pn_link); |
| pn_link_flow(pn_link, 1); |
| pn_delivery_update(delivery, PN_REJECTED); |
| pn_delivery_settle(delivery); |
| } |
| |
| |
| static void do_send(pn_delivery_t *delivery) |
| { |
| pn_link_t *pn_link = pn_delivery_link(delivery); |
| dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); |
| |
| if (link) { |
| dx_node_t *node = link->node; |
| if (node) { |
| node->ntype->tx_handler(node->context, link, delivery); |
| return; |
| } |
| } |
| |
| // TODO - Cancel the delivery |
| } |
| |
| |
| static void do_updated(pn_delivery_t *delivery) |
| { |
| pn_link_t *pn_link = pn_delivery_link(delivery); |
| dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); |
| |
| if (link) { |
| dx_node_t *node = link->node; |
| if (node) |
| node->ntype->disp_handler(node->context, link, delivery); |
| } |
| } |
| |
| |
| static int close_handler(void* unused, pn_connection_t *conn) |
| { |
| // |
| // Close all links, passing False as the 'closed' argument. These links are not |
| // being properly 'detached'. They are being orphaned. |
| // |
| pn_link_t *pn_link = pn_link_head(conn, 0); |
| while (pn_link) { |
| dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); |
| dx_node_t *node = link->node; |
| if (node) |
| node->ntype->link_detach_handler(node->context, link, 0); |
| pn_link_close(pn_link); |
| free_dx_link_t(link); |
| pn_link = pn_link_next(pn_link, 0); |
| } |
| |
| // teardown all sessions |
| pn_session_t *ssn = pn_session_head(conn, 0); |
| while (ssn) { |
| pn_session_close(ssn); |
| ssn = pn_session_next(ssn, 0); |
| } |
| |
| // teardown the connection |
| pn_connection_close(conn); |
| return 0; |
| } |
| |
| |
| static int process_handler(void* unused, pn_connection_t *conn) |
| { |
| pn_session_t *ssn; |
| pn_link_t *pn_link; |
| pn_delivery_t *delivery; |
| int event_count = 0; |
| |
| // Step 1: setup the engine's connection, and any sessions and links |
| // that may be pending. |
| |
| // initialize the connection if it's new |
| if (pn_connection_state(conn) & PN_LOCAL_UNINIT) { |
| pn_connection_open(conn); |
| event_count++; |
| } |
| |
| // open all pending sessions |
| ssn = pn_session_head(conn, PN_LOCAL_UNINIT); |
| while (ssn) { |
| pn_session_open(ssn); |
| ssn = pn_session_next(ssn, PN_LOCAL_UNINIT); |
| event_count++; |
| } |
| |
| // configure and open any pending links |
| pn_link = pn_link_head(conn, PN_LOCAL_UNINIT); |
| while (pn_link) { |
| if (pn_link_is_sender(pn_link)) |
| setup_outgoing_link(pn_link); |
| else |
| setup_incoming_link(pn_link); |
| pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT); |
| event_count++; |
| } |
| |
| |
| // Step 2: Now drain all the pending deliveries from the connection's |
| // work queue and process them |
| |
| delivery = pn_work_head(conn); |
| while (delivery) { |
| if (pn_delivery_readable(delivery)) |
| process_receive(delivery); |
| else if (pn_delivery_writable(delivery)) |
| do_send(delivery); |
| |
| if (pn_delivery_updated(delivery)) |
| do_updated(delivery); |
| |
| delivery = pn_work_next(delivery); |
| event_count++; |
| } |
| |
| // |
| // Step 2.5: Traverse all of the links on the connection looking for |
| // outgoing links with non-zero credit. Call the attached node's |
| // writable handler for such links. |
| // |
| pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); |
| while (pn_link) { |
| assert(pn_session_connection(pn_link_session(pn_link)) == conn); |
| if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0) |
| event_count += do_writable(pn_link); |
| pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); |
| } |
| |
| // Step 3: Clean up any links or sessions that have been closed by the |
| // remote. If the connection has been closed remotely, clean that up |
| // also. |
| |
| // teardown any terminating links |
| pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); |
| while (pn_link) { |
| dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); |
| dx_node_t *node = link->node; |
| if (node) |
| node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message |
| pn_link_close(pn_link); |
| pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); |
| event_count++; |
| } |
| |
| // teardown any terminating sessions |
| ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); |
| while (ssn) { |
| pn_session_close(ssn); |
| ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); |
| event_count++; |
| } |
| |
| // teardown the connection if it's terminating |
| if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) { |
| pn_connection_close(conn); |
| event_count++; |
| } |
| |
| return event_count; |
| } |
| |
| |
| static void open_handler(dx_connection_t *conn, dx_direction_t dir) |
| { |
| const dx_node_type_t *nt; |
| |
| // |
| // Note the locking structure in this function. Generally this would be unsafe, but since |
| // this particular list is only ever appended to and never has items inserted or deleted, |
| // this usage is safe in this case. |
| // |
| sys_mutex_lock(lock); |
| nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list); |
| sys_mutex_unlock(lock); |
| |
| pn_connection_open(dx_connection_pn(conn)); |
| |
| while (nt_item) { |
| nt = nt_item->ntype; |
| if (dir == DX_INCOMING) { |
| if (nt->inbound_conn_open_handler) |
| nt->inbound_conn_open_handler(nt->type_context, conn); |
| } else { |
| if (nt->outbound_conn_open_handler) |
| nt->outbound_conn_open_handler(nt->type_context, conn); |
| } |
| |
| sys_mutex_lock(lock); |
| nt_item = DEQ_NEXT(nt_item); |
| sys_mutex_unlock(lock); |
| } |
| } |
| |
| |
| static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn) |
| { |
| pn_connection_t *conn = dx_connection_pn(dx_conn); |
| |
| switch (event) { |
| case DX_CONN_EVENT_LISTENER_OPEN: open_handler(dx_conn, DX_INCOMING); break; |
| case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING); break; |
| case DX_CONN_EVENT_CLOSE: return close_handler(context, conn); |
| case DX_CONN_EVENT_PROCESS: return process_handler(context, conn); |
| } |
| |
| return 0; |
| } |
| |
| |
| void dx_container_initialize(void) |
| { |
| dx_log(module, LOG_TRACE, "Container Initializing"); |
| |
| node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4 |
| node_map = hash(10, 32, 0); // 1K buckets, item batches of 32 |
| lock = sys_mutex(); |
| default_node = 0; |
| DEQ_INIT(node_type_list); |
| |
| dx_server_set_conn_handler(handler); |
| } |
| |
| |
| void dx_container_finalize(void) |
| { |
| } |
| |
| |
| int dx_container_register_node_type(const dx_node_type_t *nt) |
| { |
| int result; |
| dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL); |
| nxc_node_type_t *nt_item = NEW(nxc_node_type_t); |
| DEQ_ITEM_INIT(nt_item); |
| nt_item->ntype = nt; |
| |
| sys_mutex_lock(lock); |
| result = hash_insert_const(node_type_map, iter, nt); |
| DEQ_INSERT_TAIL(node_type_list, nt_item); |
| sys_mutex_unlock(lock); |
| |
| dx_field_iterator_free(iter); |
| if (result < 0) |
| return result; |
| dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name); |
| |
| return 0; |
| } |
| |
| |
| void dx_container_set_default_node_type(const dx_node_type_t *nt, |
| void *context, |
| dx_dist_mode_t supported_dist) |
| { |
| if (default_node) |
| dx_container_destroy_node(default_node); |
| |
| if (nt) { |
| default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT); |
| dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name); |
| } else { |
| default_node = 0; |
| dx_log(module, LOG_TRACE, "Default node removed"); |
| } |
| } |
| |
| |
| dx_node_t *dx_container_create_node(const dx_node_type_t *nt, |
| const char *name, |
| void *context, |
| dx_dist_mode_t supported_dist, |
| dx_lifetime_policy_t life_policy) |
| { |
| int result; |
| dx_node_t *node = new_dx_node_t(); |
| if (!node) |
| return 0; |
| |
| node->ntype = nt; |
| node->name = 0; |
| node->context = context; |
| node->supported_dist = supported_dist; |
| node->life_policy = life_policy; |
| |
| if (name) { |
| dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL); |
| sys_mutex_lock(lock); |
| result = hash_insert(node_map, iter, node); |
| sys_mutex_unlock(lock); |
| dx_field_iterator_free(iter); |
| if (result < 0) { |
| free_dx_node_t(node); |
| return 0; |
| } |
| |
| node->name = (char*) malloc(strlen(name) + 1); |
| strcpy(node->name, name); |
| } |
| |
| if (name) |
| dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name); |
| |
| return node; |
| } |
| |
| |
| void dx_container_destroy_node(dx_node_t *node) |
| { |
| if (node->name) { |
| dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL); |
| sys_mutex_lock(lock); |
| hash_remove(node_map, iter); |
| sys_mutex_unlock(lock); |
| dx_field_iterator_free(iter); |
| free(node->name); |
| } |
| |
| free_dx_node_t(node); |
| } |
| |
| |
| void dx_container_node_set_context(dx_node_t *node, void *node_context) |
| { |
| node->context = node_context; |
| } |
| |
| |
| dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node) |
| { |
| return node->supported_dist; |
| } |
| |
| |
| dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node) |
| { |
| return node->life_policy; |
| } |
| |
| |
| dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name) |
| { |
| pn_session_t *sess = pn_session(dx_connection_pn(conn)); |
| dx_link_t *link = new_dx_link_t(); |
| |
| if (dir == DX_OUTGOING) |
| link->pn_link = pn_sender(sess, name); |
| else |
| link->pn_link = pn_receiver(sess, name); |
| link->context = node->context; |
| link->node = node; |
| |
| pn_link_set_context(link->pn_link, link); |
| |
| pn_session_open(sess); |
| |
| return link; |
| } |
| |
| |
| void dx_link_set_context(dx_link_t *link, void *context) |
| { |
| link->context = context; |
| } |
| |
| |
| void *dx_link_get_context(dx_link_t *link) |
| { |
| return link->context; |
| } |
| |
| |
| pn_link_t *dx_link_pn(dx_link_t *link) |
| { |
| return link->pn_link; |
| } |
| |
| |
| pn_terminus_t *dx_link_source(dx_link_t *link) |
| { |
| return pn_link_source(link->pn_link); |
| } |
| |
| |
| pn_terminus_t *dx_link_target(dx_link_t *link) |
| { |
| return pn_link_target(link->pn_link); |
| } |
| |
| |
| pn_terminus_t *dx_link_remote_source(dx_link_t *link) |
| { |
| return pn_link_remote_source(link->pn_link); |
| } |
| |
| |
| pn_terminus_t *dx_link_remote_target(dx_link_t *link) |
| { |
| return pn_link_remote_target(link->pn_link); |
| } |
| |
| |
| void dx_link_activate(dx_link_t *link) |
| { |
| if (!link || !link->pn_link) |
| return; |
| |
| pn_session_t *sess = pn_link_session(link->pn_link); |
| if (!sess) |
| return; |
| |
| pn_connection_t *conn = pn_session_connection(sess); |
| if (!conn) |
| return; |
| |
| dx_connection_t *ctx = pn_connection_get_context(conn); |
| if (!ctx) |
| return; |
| |
| dx_server_activate(ctx); |
| } |
| |
| |
| void dx_link_close(dx_link_t *link) |
| { |
| pn_link_close(link->pn_link); |
| } |
| |
| |