| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <stdio.h> |
| #include <string.h> |
| #include "dispatch_private.h" |
| #include <qpid/dispatch/container.h> |
| #include <qpid/dispatch/server.h> |
| #include <qpid/dispatch/message.h> |
| #include <proton/engine.h> |
| #include <proton/message.h> |
| #include <qpid/dispatch/ctools.h> |
| #include <qpid/dispatch/hash.h> |
| #include <qpid/dispatch/threading.h> |
| #include <qpid/dispatch/iterator.h> |
| #include <qpid/dispatch/log.h> |
| #include <qpid/dispatch/agent.h> |
| #include "conditionals.h" |
| |
| static char *module="CONTAINER"; |
| |
| struct qd_node_t { |
| qd_container_t *container; |
| const qd_node_type_t *ntype; |
| char *name; |
| void *context; |
| qd_dist_mode_t supported_dist; |
| qd_lifetime_policy_t life_policy; |
| }; |
| |
| ALLOC_DECLARE(qd_node_t); |
| ALLOC_DEFINE(qd_node_t); |
| ALLOC_DEFINE(qd_link_item_t); |
| |
| |
| struct qd_link_t { |
| pn_link_t *pn_link; |
| void *context; |
| qd_node_t *node; |
| bool drain_mode; |
| }; |
| |
| ALLOC_DECLARE(qd_link_t); |
| ALLOC_DEFINE(qd_link_t); |
| |
| |
| struct qd_delivery_t { |
| pn_delivery_t *pn_delivery; |
| qd_delivery_t *peer; |
| void *context; |
| uint64_t disposition; |
| qd_link_t *link; |
| int in_fifo; |
| bool pending_delete; |
| }; |
| |
| ALLOC_DECLARE(qd_delivery_t); |
| ALLOC_DEFINE(qd_delivery_t); |
| |
| |
| typedef struct qdc_node_type_t { |
| DEQ_LINKS(struct qdc_node_type_t); |
| const qd_node_type_t *ntype; |
| } qdc_node_type_t; |
| DEQ_DECLARE(qdc_node_type_t, qdc_node_type_list_t); |
| |
| static int QD_CONTAINER_CLASS_CONTAINER = 1; |
| static int QD_CONTAINER_CLASS_NODE_TYPE = 2; |
| static int QD_CONTAINER_CLASS_NODE = 3; |
| |
| typedef struct container_class_t { |
| qd_container_t *container; |
| int class_id; |
| } container_class_t; |
| |
| struct qd_container_t { |
| qd_dispatch_t *qd; |
| qd_server_t *server; |
| qd_hash_t *node_type_map; |
| qd_hash_t *node_map; |
| sys_mutex_t *lock; |
| qd_node_t *default_node; |
| qdc_node_type_list_t node_type_list; |
| qd_agent_class_t *class_container; |
| qd_agent_class_t *class_node_type; |
| qd_agent_class_t *class_node; |
| }; |
| |
| static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) |
| { |
| sys_mutex_lock(container->lock); |
| qd_node_t *node = 0; |
| const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link)); |
| qd_field_iterator_t *iter; |
| // TODO - Extract the name from the structured source |
| |
| if (source) { |
| iter = qd_field_iterator_string(source, ITER_VIEW_NODE_ID); |
| qd_hash_retrieve(container->node_map, iter, (void*) &node); |
| qd_field_iterator_free(iter); |
| } |
| sys_mutex_unlock(container->lock); |
| |
| if (node == 0) { |
| if (container->default_node) |
| node = container->default_node; |
| else { |
| // Reject the link |
| // TODO - When the API allows, add an error message for "no available node" |
| pn_link_close(pn_link); |
| return; |
| } |
| } |
| |
| qd_link_t *link = new_qd_link_t(); |
| if (!link) { |
| pn_link_close(pn_link); |
| return; |
| } |
| |
| link->pn_link = pn_link; |
| link->context = 0; |
| link->node = node; |
| link->drain_mode = pn_link_get_drain(pn_link); |
| |
| pn_link_set_context(pn_link, link); |
| node->ntype->outgoing_handler(node->context, link); |
| } |
| |
| |
| static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link) |
| { |
| sys_mutex_lock(container->lock); |
| qd_node_t *node = 0; |
| const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link)); |
| qd_field_iterator_t *iter; |
| // TODO - Extract the name from the structured target |
| |
| if (target) { |
| iter = qd_field_iterator_string(target, ITER_VIEW_NODE_ID); |
| qd_hash_retrieve(container->node_map, iter, (void*) &node); |
| qd_field_iterator_free(iter); |
| } |
| sys_mutex_unlock(container->lock); |
| |
| if (node == 0) { |
| if (container->default_node) |
| node = container->default_node; |
| else { |
| // Reject the link |
| // TODO - When the API allows, add an error message for "no available node" |
| pn_link_close(pn_link); |
| return; |
| } |
| } |
| |
| qd_link_t *link = new_qd_link_t(); |
| if (!link) { |
| pn_link_close(pn_link); |
| return; |
| } |
| |
| link->pn_link = pn_link; |
| link->context = 0; |
| link->node = node; |
| link->drain_mode = pn_link_get_drain(pn_link); |
| |
| pn_link_set_context(pn_link, link); |
| node->ntype->incoming_handler(node->context, link); |
| } |
| |
| |
| static int do_writable(pn_link_t *pn_link) |
| { |
| qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link); |
| if (!link) |
| return 0; |
| |
| qd_node_t *node = link->node; |
| if (!node) |
| return 0; |
| |
| return node->ntype->writable_handler(node->context, link); |
| } |
| |
| |
| static void do_receive(pn_delivery_t *pnd) |
| { |
| pn_link_t *pn_link = pn_delivery_link(pnd); |
| qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link); |
| qd_delivery_t *delivery = (qd_delivery_t*) pn_delivery_get_context(pnd); |
| |
| if (link) { |
| qd_node_t *node = link->node; |
| if (node) { |
| if (!delivery) { |
| delivery = new_qd_delivery_t(); |
| delivery->pn_delivery = pnd; |
| delivery->peer = 0; |
| delivery->context = 0; |
| delivery->disposition = 0; |
| delivery->link = link; |
| delivery->in_fifo = 0; |
| delivery->pending_delete = false; |
| pn_delivery_set_context(pnd, delivery); |
| } |
| |
| node->ntype->rx_handler(node->context, link, delivery); |
| return; |
| } |
| } |
| |
| // |
| // Reject the delivery if we couldn't find a node to handle it |
| // |
| pn_link_advance(pn_link); |
| pn_link_flow(pn_link, 1); |
| pn_delivery_update(pnd, PN_REJECTED); |
| pn_delivery_settle(pnd); |
| } |
| |
| |
| static void do_updated(pn_delivery_t *pnd) |
| { |
| pn_link_t *pn_link = pn_delivery_link(pnd); |
| qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link); |
| qd_delivery_t *delivery = (qd_delivery_t*) pn_delivery_get_context(pnd); |
| |
| if (link && delivery) { |
| qd_node_t *node = link->node; |
| if (node) |
| node->ntype->disp_handler(node->context, link, delivery); |
| } |
| } |
| |
| |
| static int close_handler(void* unused, pn_connection_t *conn) |
| { |
| // |
| // Close all links, passing False as the 'closed' argument. These links are not |
| // being properly 'detached'. They are being orphaned. |
| // |
| pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE); |
| while (pn_link) { |
| qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link); |
| qd_node_t *node = link->node; |
| if (node && link) |
| node->ntype->link_detach_handler(node->context, link, 0); |
| pn_link_close(pn_link); |
| free_qd_link_t(link); |
| pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE); |
| } |
| |
| // teardown all sessions |
| pn_session_t *ssn = pn_session_head(conn, 0); |
| while (ssn) { |
| pn_session_close(ssn); |
| ssn = pn_session_next(ssn, 0); |
| } |
| |
| // teardown the connection |
| pn_connection_close(conn); |
| return 0; |
| } |
| |
| |
| static int process_handler(qd_container_t *container, void* unused, pn_connection_t *conn) |
| { |
| pn_session_t *ssn; |
| pn_link_t *pn_link; |
| pn_delivery_t *delivery; |
| int event_count = 0; |
| |
| // Step 1: setup the engine's connection, and any sessions and links |
| // that may be pending. |
| |
| // initialize the connection if it's new |
| if (pn_connection_state(conn) & PN_LOCAL_UNINIT) { |
| pn_connection_open(conn); |
| event_count++; |
| } |
| |
| // open all pending sessions |
| ssn = pn_session_head(conn, PN_LOCAL_UNINIT); |
| while (ssn) { |
| pn_session_open(ssn); |
| ssn = pn_session_next(ssn, PN_LOCAL_UNINIT); |
| event_count++; |
| } |
| |
| // configure and open any pending links |
| pn_link = pn_link_head(conn, PN_LOCAL_UNINIT); |
| while (pn_link) { |
| if (pn_link_is_sender(pn_link)) |
| setup_outgoing_link(container, pn_link); |
| else |
| setup_incoming_link(container, pn_link); |
| pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT); |
| event_count++; |
| } |
| |
| |
| // Step 2: Now drain all the pending deliveries from the connection's |
| // work queue and process them |
| |
| delivery = pn_work_head(conn); |
| while (delivery) { |
| if (pn_delivery_readable(delivery)) |
| do_receive(delivery); |
| |
| if (pn_delivery_updated(delivery)) { |
| do_updated(delivery); |
| pn_delivery_clear(delivery); |
| } |
| delivery = pn_work_next(delivery); |
| event_count++; |
| } |
| |
| // |
| // Step 2.5: Call the attached node's writable handler for all active links |
| // on the connection. Note that in Dispatch, links are considered |
| // bidirectional. Incoming and outgoing only pertains to deliveries and |
| // deliveries are a subset of the traffic that flows both directions on links. |
| // |
| pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); |
| while (pn_link) { |
| assert(pn_session_connection(pn_link_session(pn_link)) == conn); |
| event_count += do_writable(pn_link); |
| pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); |
| } |
| |
| // Step 3: Clean up any links or sessions that have been closed by the |
| // remote. If the connection has been closed remotely, clean that up |
| // also. |
| |
| // teardown any terminating links |
| pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); |
| while (pn_link) { |
| qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link); |
| qd_node_t *node = link->node; |
| if (node) |
| node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message |
| pn_link_close(pn_link); |
| pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); |
| event_count++; |
| } |
| |
| // teardown any terminating sessions |
| ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); |
| while (ssn) { |
| pn_session_close(ssn); |
| ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); |
| event_count++; |
| } |
| |
| // teardown the connection if it's terminating |
| if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) { |
| pn_connection_close(conn); |
| event_count++; |
| } |
| |
| return event_count; |
| } |
| |
| |
| static void open_handler(qd_container_t *container, qd_connection_t *conn, qd_direction_t dir) |
| { |
| const qd_node_type_t *nt; |
| |
| // |
| // Note the locking structure in this function. Generally this would be unsafe, but since |
| // this particular list is only ever appended to and never has items inserted or deleted, |
| // this usage is safe in this case. |
| // |
| sys_mutex_lock(container->lock); |
| qdc_node_type_t *nt_item = DEQ_HEAD(container->node_type_list); |
| sys_mutex_unlock(container->lock); |
| |
| pn_connection_open(qd_connection_pn(conn)); |
| |
| while (nt_item) { |
| nt = nt_item->ntype; |
| if (dir == QD_INCOMING) { |
| if (nt->inbound_conn_open_handler) |
| nt->inbound_conn_open_handler(nt->type_context, conn); |
| } else { |
| if (nt->outbound_conn_open_handler) |
| nt->outbound_conn_open_handler(nt->type_context, conn); |
| } |
| |
| sys_mutex_lock(container->lock); |
| nt_item = DEQ_NEXT(nt_item); |
| sys_mutex_unlock(container->lock); |
| } |
| } |
| |
| |
| static int handler(void *handler_context, void *conn_context, qd_conn_event_t event, qd_connection_t *qd_conn) |
| { |
| qd_container_t *container = (qd_container_t*) handler_context; |
| pn_connection_t *conn = qd_connection_pn(qd_conn); |
| |
| switch (event) { |
| case QD_CONN_EVENT_LISTENER_OPEN: open_handler(container, qd_conn, QD_INCOMING); break; |
| case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, QD_OUTGOING); break; |
| case QD_CONN_EVENT_CLOSE: return close_handler(conn_context, conn); |
| case QD_CONN_EVENT_PROCESS: return process_handler(container, conn_context, conn); |
| } |
| |
| return 0; |
| } |
| |
| |
| static void container_schema_handler(void *context, void *correlator) |
| { |
| } |
| |
| |
| static void container_query_handler(void* context, const char *id, void *correlator) |
| { |
| container_class_t *cls = (container_class_t*) context; |
| |
| if (cls->class_id == QD_CONTAINER_CLASS_CONTAINER) { |
| qd_agent_value_uint(correlator, "node_type_count", qd_hash_size(cls->container->node_type_map)); |
| qd_agent_value_uint(correlator, "node_count", qd_hash_size(cls->container->node_map)); |
| if (cls->container->default_node) |
| qd_agent_value_string(correlator, "default_node_type", cls->container->default_node->ntype->type_name); |
| else |
| qd_agent_value_null(correlator, "default_node_type"); |
| qd_agent_value_complete(correlator, false); |
| |
| } else if (cls->class_id == QD_CONTAINER_CLASS_NODE_TYPE) { |
| |
| } else if (cls->class_id == QD_CONTAINER_CLASS_NODE) { |
| |
| } |
| } |
| |
| |
| qd_agent_class_t *setup_class(qd_container_t *container, const char *fqname, int id) |
| { |
| container_class_t *cls = NEW(container_class_t); |
| cls->container = container; |
| cls->class_id = id; |
| |
| return qd_agent_register_class(container->qd, fqname, cls, |
| container_schema_handler, |
| container_query_handler); |
| } |
| |
| |
| qd_container_t *qd_container(qd_dispatch_t *qd) |
| { |
| qd_container_t *container = NEW(qd_container_t); |
| |
| container->qd = qd; |
| container->server = qd->server; |
| container->node_type_map = qd_hash(6, 4, 1); // 64 buckets, item batches of 4 |
| container->node_map = qd_hash(10, 32, 0); // 1K buckets, item batches of 32 |
| container->lock = sys_mutex(); |
| container->default_node = 0; |
| DEQ_INIT(container->node_type_list); |
| |
| qd_log(module, LOG_TRACE, "Container Initializing"); |
| qd_server_set_conn_handler(qd, handler, container); |
| |
| return container; |
| } |
| |
| |
| void qd_container_setup_agent(qd_dispatch_t *qd) |
| { |
| qd->container->class_container = |
| setup_class(qd->container, "org.apache.qpid.dispatch.container", QD_CONTAINER_CLASS_CONTAINER); |
| qd->container->class_node_type = |
| setup_class(qd->container, "org.apache.qpid.dispatch.container.node_type", QD_CONTAINER_CLASS_NODE_TYPE); |
| qd->container->class_node = |
| setup_class(qd->container, "org.apache.qpid.dispatch.container.node", QD_CONTAINER_CLASS_NODE); |
| } |
| |
| |
| void qd_container_free(qd_container_t *container) |
| { |
| // TODO - Free the nodes |
| // TODO - Free the node types |
| sys_mutex_free(container->lock); |
| free(container); |
| } |
| |
| |
| int qd_container_register_node_type(qd_dispatch_t *qd, const qd_node_type_t *nt) |
| { |
| qd_container_t *container = qd->container; |
| |
| int result; |
| qd_field_iterator_t *iter = qd_field_iterator_string(nt->type_name, ITER_VIEW_ALL); |
| qdc_node_type_t *nt_item = NEW(qdc_node_type_t); |
| DEQ_ITEM_INIT(nt_item); |
| nt_item->ntype = nt; |
| |
| sys_mutex_lock(container->lock); |
| result = qd_hash_insert_const(container->node_type_map, iter, nt, 0); |
| DEQ_INSERT_TAIL(container->node_type_list, nt_item); |
| sys_mutex_unlock(container->lock); |
| |
| qd_field_iterator_free(iter); |
| if (result < 0) |
| return result; |
| qd_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name); |
| |
| return 0; |
| } |
| |
| |
| qd_node_t *qd_container_set_default_node_type(qd_dispatch_t *qd, |
| const qd_node_type_t *nt, |
| void *context, |
| qd_dist_mode_t supported_dist) |
| { |
| qd_container_t *container = qd->container; |
| |
| if (container->default_node) |
| qd_container_destroy_node(container->default_node); |
| |
| if (nt) { |
| container->default_node = qd_container_create_node(qd, nt, 0, context, supported_dist, QD_LIFE_PERMANENT); |
| qd_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name); |
| } else { |
| container->default_node = 0; |
| qd_log(module, LOG_TRACE, "Default node removed"); |
| } |
| |
| return container->default_node; |
| } |
| |
| |
| qd_node_t *qd_container_create_node(qd_dispatch_t *qd, |
| const qd_node_type_t *nt, |
| const char *name, |
| void *context, |
| qd_dist_mode_t supported_dist, |
| qd_lifetime_policy_t life_policy) |
| { |
| qd_container_t *container = qd->container; |
| int result; |
| qd_node_t *node = new_qd_node_t(); |
| if (!node) |
| return 0; |
| |
| node->container = container; |
| node->ntype = nt; |
| node->name = 0; |
| node->context = context; |
| node->supported_dist = supported_dist; |
| node->life_policy = life_policy; |
| |
| if (name) { |
| qd_field_iterator_t *iter = qd_field_iterator_string(name, ITER_VIEW_ALL); |
| sys_mutex_lock(container->lock); |
| result = qd_hash_insert(container->node_map, iter, node, 0); |
| sys_mutex_unlock(container->lock); |
| qd_field_iterator_free(iter); |
| if (result < 0) { |
| free_qd_node_t(node); |
| return 0; |
| } |
| |
| node->name = (char*) malloc(strlen(name) + 1); |
| strcpy(node->name, name); |
| } |
| |
| if (name) |
| qd_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name); |
| |
| return node; |
| } |
| |
| |
| void qd_container_destroy_node(qd_node_t *node) |
| { |
| qd_container_t *container = node->container; |
| |
| if (node->name) { |
| qd_field_iterator_t *iter = qd_field_iterator_string(node->name, ITER_VIEW_ALL); |
| sys_mutex_lock(container->lock); |
| qd_hash_remove(container->node_map, iter); |
| sys_mutex_unlock(container->lock); |
| qd_field_iterator_free(iter); |
| free(node->name); |
| } |
| |
| free_qd_node_t(node); |
| } |
| |
| |
| void qd_container_node_set_context(qd_node_t *node, void *node_context) |
| { |
| node->context = node_context; |
| } |
| |
| |
| qd_dist_mode_t qd_container_node_get_dist_modes(const qd_node_t *node) |
| { |
| return node->supported_dist; |
| } |
| |
| |
| qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node) |
| { |
| return node->life_policy; |
| } |
| |
| |
| qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name) |
| { |
| pn_session_t *sess = pn_session(qd_connection_pn(conn)); |
| qd_link_t *link = new_qd_link_t(); |
| |
| if (dir == QD_OUTGOING) |
| link->pn_link = pn_sender(sess, name); |
| else |
| link->pn_link = pn_receiver(sess, name); |
| |
| link->context = node->context; |
| link->node = node; |
| link->drain_mode = pn_link_get_drain(link->pn_link); |
| |
| pn_link_set_context(link->pn_link, link); |
| |
| pn_session_open(sess); |
| |
| return link; |
| } |
| |
| |
| void qd_link_set_context(qd_link_t *link, void *context) |
| { |
| link->context = context; |
| } |
| |
| |
| void *qd_link_get_context(qd_link_t *link) |
| { |
| return link->context; |
| } |
| |
| |
| void qd_link_set_conn_context(qd_link_t *link, void *context) |
| { |
| pn_session_t *pn_sess = pn_link_session(link->pn_link); |
| if (!pn_sess) |
| return; |
| pn_connection_t *pn_conn = pn_session_connection(pn_sess); |
| if (!pn_conn) |
| return; |
| qd_connection_t *conn = (qd_connection_t*) pn_connection_get_context(pn_conn); |
| if (!conn) |
| return; |
| qd_connection_set_link_context(conn, context); |
| } |
| |
| |
| void *qd_link_get_conn_context(qd_link_t *link) |
| { |
| pn_session_t *pn_sess = pn_link_session(link->pn_link); |
| if (!pn_sess) |
| return 0; |
| pn_connection_t *pn_conn = pn_session_connection(pn_sess); |
| if (!pn_conn) |
| return 0; |
| qd_connection_t *conn = (qd_connection_t*) pn_connection_get_context(pn_conn); |
| if (!conn) |
| return 0; |
| return qd_connection_get_link_context(conn); |
| } |
| |
| |
| pn_link_t *qd_link_pn(qd_link_t *link) |
| { |
| return link->pn_link; |
| } |
| |
| |
| qd_connection_t *qd_link_connection(qd_link_t *link) |
| { |
| if (!link || !link->pn_link) |
| return 0; |
| |
| pn_session_t *sess = pn_link_session(link->pn_link); |
| if (!sess) |
| return 0; |
| |
| pn_connection_t *conn = pn_session_connection(sess); |
| if (!conn) |
| return 0; |
| |
| qd_connection_t *ctx = pn_connection_get_context(conn); |
| if (!ctx) |
| return 0; |
| |
| return ctx; |
| } |
| |
| |
| pn_terminus_t *qd_link_source(qd_link_t *link) |
| { |
| return pn_link_source(link->pn_link); |
| } |
| |
| |
| pn_terminus_t *qd_link_target(qd_link_t *link) |
| { |
| return pn_link_target(link->pn_link); |
| } |
| |
| |
| pn_terminus_t *qd_link_remote_source(qd_link_t *link) |
| { |
| return pn_link_remote_source(link->pn_link); |
| } |
| |
| |
| pn_terminus_t *qd_link_remote_target(qd_link_t *link) |
| { |
| return pn_link_remote_target(link->pn_link); |
| } |
| |
| |
| void qd_link_activate(qd_link_t *link) |
| { |
| if (!link || !link->pn_link || pn_link_state(link->pn_link) != (PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE)) |
| return; |
| |
| pn_session_t *sess = pn_link_session(link->pn_link); |
| if (!sess || pn_session_state(sess) != (PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE)) |
| return; |
| |
| pn_connection_t *conn = pn_session_connection(sess); |
| if (!conn || pn_connection_state(conn) != (PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE)) |
| return; |
| |
| qd_connection_t *ctx = pn_connection_get_context(conn); |
| if (!ctx) |
| return; |
| |
| qd_server_activate(ctx); |
| } |
| |
| |
| void qd_link_close(qd_link_t *link) |
| { |
| pn_link_close(link->pn_link); |
| } |
| |
| |
| bool qd_link_drain_changed(qd_link_t *link, bool *mode) |
| { |
| bool pn_mode = pn_link_get_drain(link->pn_link); |
| bool changed = pn_mode != link->drain_mode; |
| |
| *mode = pn_mode; |
| if (changed) |
| link->drain_mode = pn_mode; |
| return changed; |
| } |
| |
| |
| qd_delivery_t *qd_delivery(qd_link_t *link, pn_delivery_tag_t tag) |
| { |
| pn_link_t *pnl = qd_link_pn(link); |
| |
| // |
| // If there is a current delivery on this outgoing link, something |
| // is wrong with the delivey algorithm. We assume that the current |
| // delivery ('pnd' below) is the one created by pn_delivery. If it is |
| // not, then my understanding of how proton works is incorrect. |
| // |
| assert(!pn_link_current(pnl)); |
| |
| pn_delivery(pnl, tag); |
| pn_delivery_t *pnd = pn_link_current(pnl); |
| |
| if (!pnd) |
| return 0; |
| |
| qd_delivery_t *delivery = new_qd_delivery_t(); |
| delivery->pn_delivery = pnd; |
| delivery->peer = 0; |
| delivery->context = 0; |
| delivery->disposition = 0; |
| delivery->link = link; |
| delivery->in_fifo = 0; |
| delivery->pending_delete = false; |
| pn_delivery_set_context(pnd, delivery); |
| |
| return delivery; |
| } |
| |
| |
| void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition) |
| { |
| if (delivery->pn_delivery) { |
| if (final_disposition > 0) |
| pn_delivery_update(delivery->pn_delivery, final_disposition); |
| pn_delivery_set_context(delivery->pn_delivery, 0); |
| pn_delivery_settle(delivery->pn_delivery); |
| delivery->pn_delivery = 0; |
| } |
| |
| assert(!delivery->peer); |
| |
| if (delivery->in_fifo) |
| delivery->pending_delete = true; |
| else { |
| free_qd_delivery_t(delivery); |
| } |
| } |
| |
| |
| void qd_delivery_link_peers_LH(qd_delivery_t *right, qd_delivery_t *left) |
| { |
| right->peer = left; |
| left->peer = right; |
| } |
| |
| |
| void qd_delivery_unlink_LH(qd_delivery_t *delivery) |
| { |
| if (delivery->peer) { |
| delivery->peer->peer = 0; |
| delivery->peer = 0; |
| } |
| } |
| |
| |
| void qd_delivery_fifo_enter_LH(qd_delivery_t *delivery) |
| { |
| delivery->in_fifo++; |
| } |
| |
| |
| bool qd_delivery_fifo_exit_LH(qd_delivery_t *delivery) |
| { |
| delivery->in_fifo--; |
| if (delivery->in_fifo == 0 && delivery->pending_delete) { |
| free_qd_delivery_t(delivery); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| |
| void qd_delivery_set_context(qd_delivery_t *delivery, void *context) |
| { |
| delivery->context = context; |
| } |
| |
| |
| void *qd_delivery_context(qd_delivery_t *delivery) |
| { |
| return delivery->context; |
| } |
| |
| |
| qd_delivery_t *qd_delivery_peer(qd_delivery_t *delivery) |
| { |
| return delivery->peer; |
| } |
| |
| |
| pn_delivery_t *qd_delivery_pn(qd_delivery_t *delivery) |
| { |
| return delivery->pn_delivery; |
| } |
| |
| |
| void qd_delivery_settle(qd_delivery_t *delivery) |
| { |
| if (delivery->pn_delivery) { |
| pn_delivery_set_context(delivery->pn_delivery, 0); |
| pn_delivery_settle(delivery->pn_delivery); |
| delivery->pn_delivery = 0; |
| } |
| } |
| |
| |
| bool qd_delivery_settled(qd_delivery_t *delivery) |
| { |
| return pn_delivery_settled(delivery->pn_delivery); |
| } |
| |
| |
| bool qd_delivery_disp_changed(qd_delivery_t *delivery) |
| { |
| return delivery->disposition != pn_delivery_remote_state(delivery->pn_delivery); |
| } |
| |
| |
| uint64_t qd_delivery_disp(qd_delivery_t *delivery) |
| { |
| delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery); |
| return delivery->disposition; |
| } |
| |
| |
| qd_link_t *qd_delivery_link(qd_delivery_t *delivery) |
| { |
| return delivery->link; |
| } |
| |