| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "engine-internal.h" |
| #include <stdlib.h> |
| #include <string.h> |
| #include "protocol.h" |
| |
| #include <assert.h> |
| #include <stdarg.h> |
| #include <stdio.h> |
| |
| #include "platform.h" |
| #include "platform_fmt.h" |
| #include "transport/transport.h" |
| |
| |
| static void pni_session_bound(pn_session_t *ssn); |
| static void pni_link_bound(pn_link_t *link); |
| |
| |
| // endpoints |
| |
| static pn_connection_t *pni_ep_get_connection(pn_endpoint_t *endpoint) |
| { |
| switch (endpoint->type) { |
| case CONNECTION: |
| return (pn_connection_t *) endpoint; |
| case SESSION: |
| return ((pn_session_t *) endpoint)->connection; |
| case SENDER: |
| case RECEIVER: |
| return ((pn_link_t *) endpoint)->session->connection; |
| } |
| |
| return NULL; |
| } |
| |
| static pn_event_type_t endpoint_event(pn_endpoint_type_t type, bool open) { |
| switch (type) { |
| case CONNECTION: |
| return open ? PN_CONNECTION_LOCAL_OPEN : PN_CONNECTION_LOCAL_CLOSE; |
| case SESSION: |
| return open ? PN_SESSION_LOCAL_OPEN : PN_SESSION_LOCAL_CLOSE; |
| case SENDER: |
| case RECEIVER: |
| return open ? PN_LINK_LOCAL_OPEN : PN_LINK_LOCAL_CLOSE; |
| default: |
| assert(false); |
| return PN_EVENT_NONE; |
| } |
| } |
| |
| static void pn_endpoint_open(pn_endpoint_t *endpoint) |
| { |
| if (!(endpoint->state & PN_LOCAL_ACTIVE)) { |
| PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE); |
| pn_connection_t *conn = pni_ep_get_connection(endpoint); |
| pn_collector_put(conn->collector, PN_OBJECT, endpoint, |
| endpoint_event(endpoint->type, true)); |
| pn_modified(conn, endpoint, true); |
| } |
| } |
| |
| static void pn_endpoint_close(pn_endpoint_t *endpoint) |
| { |
| if (!(endpoint->state & PN_LOCAL_CLOSED)) { |
| PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED); |
| pn_connection_t *conn = pni_ep_get_connection(endpoint); |
| pn_collector_put(conn->collector, PN_OBJECT, endpoint, |
| endpoint_event(endpoint->type, false)); |
| pn_modified(conn, endpoint, true); |
| } |
| } |
| |
| void pn_connection_reset(pn_connection_t *connection) |
| { |
| assert(connection); |
| pn_endpoint_t *endpoint = &connection->endpoint; |
| endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT; |
| } |
| |
| void pn_connection_open(pn_connection_t *connection) |
| { |
| assert(connection); |
| pn_endpoint_open(&connection->endpoint); |
| } |
| |
| void pn_connection_close(pn_connection_t *connection) |
| { |
| assert(connection); |
| pn_endpoint_close(&connection->endpoint); |
| } |
| |
| static void pni_endpoint_tini(pn_endpoint_t *endpoint); |
| |
| void pn_connection_release(pn_connection_t *connection) |
| { |
| assert(!connection->endpoint.freed); |
| // free those endpoints that haven't been freed by the application |
| LL_REMOVE(connection, endpoint, &connection->endpoint); |
| while (connection->endpoint_head) { |
| pn_endpoint_t *ep = connection->endpoint_head; |
| switch (ep->type) { |
| case SESSION: |
| // note: this will free all child links: |
| pn_session_free((pn_session_t *)ep); |
| break; |
| case SENDER: |
| case RECEIVER: |
| pn_link_free((pn_link_t *)ep); |
| break; |
| default: |
| assert(false); |
| } |
| } |
| connection->endpoint.freed = true; |
| if (!connection->transport) { |
| // no transport available to consume transport work items, |
| // so manually clear them: |
| pn_ep_incref(&connection->endpoint); |
| pn_connection_unbound(connection); |
| } |
| pn_ep_decref(&connection->endpoint); |
| } |
| |
| void pn_connection_free(pn_connection_t *connection) { |
| pn_connection_release(connection); |
| pn_decref(connection); |
| } |
| |
| void pn_connection_bound(pn_connection_t *connection) |
| { |
| pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND); |
| pn_ep_incref(&connection->endpoint); |
| |
| size_t nsessions = pn_list_size(connection->sessions); |
| for (size_t i = 0; i < nsessions; i++) { |
| pni_session_bound((pn_session_t *) pn_list_get(connection->sessions, i)); |
| } |
| } |
| |
| // invoked when transport has been removed: |
| void pn_connection_unbound(pn_connection_t *connection) |
| { |
| connection->transport = NULL; |
| if (connection->endpoint.freed) { |
| // connection has been freed prior to unbinding, thus it |
| // cannot be re-assigned to a new transport. Clear the |
| // transport work lists to allow the connection to be freed. |
| while (connection->transport_head) { |
| pn_clear_modified(connection, connection->transport_head); |
| } |
| while (connection->tpwork_head) { |
| pn_clear_tpwork(connection->tpwork_head); |
| } |
| } |
| pn_ep_decref(&connection->endpoint); |
| } |
| |
| pn_record_t *pn_connection_attachments(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->context; |
| } |
| |
| void *pn_connection_get_context(pn_connection_t *conn) |
| { |
| // XXX: we should really assert on conn here, but this causes |
| // messenger tests to fail |
| return conn ? pn_record_get(conn->context, PN_LEGCTX) : NULL; |
| } |
| |
| void pn_connection_set_context(pn_connection_t *conn, void *context) |
| { |
| assert(conn); |
| pn_record_set(conn->context, PN_LEGCTX, context); |
| } |
| |
| pn_transport_t *pn_connection_transport(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->transport; |
| } |
| |
| void pn_condition_init(pn_condition_t *condition) |
| { |
| condition->name = pn_string(NULL); |
| condition->description = pn_string(NULL); |
| condition->info = pn_data(0); |
| } |
| |
| void pn_condition_tini(pn_condition_t *condition) |
| { |
| pn_data_free(condition->info); |
| pn_free(condition->description); |
| pn_free(condition->name); |
| } |
| |
| static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn) |
| { |
| pn_list_add(conn->sessions, ssn); |
| ssn->connection = conn; |
| pn_incref(conn); // keep around until finalized |
| pn_ep_incref(&conn->endpoint); |
| } |
| |
| static void pni_remove_session(pn_connection_t *conn, pn_session_t *ssn) |
| { |
| if (pn_list_remove(conn->sessions, ssn)) { |
| pn_ep_decref(&conn->endpoint); |
| LL_REMOVE(conn, endpoint, &ssn->endpoint); |
| } |
| } |
| |
| pn_connection_t *pn_session_connection(pn_session_t *session) |
| { |
| if (!session) return NULL; |
| return session->connection; |
| } |
| |
| void pn_session_open(pn_session_t *session) |
| { |
| assert(session); |
| pn_endpoint_open(&session->endpoint); |
| } |
| |
| void pn_session_close(pn_session_t *session) |
| { |
| assert(session); |
| pn_endpoint_close(&session->endpoint); |
| } |
| |
| void pn_session_free(pn_session_t *session) |
| { |
| assert(!session->endpoint.freed); |
| while(pn_list_size(session->links)) { |
| pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0); |
| pn_link_free(link); |
| } |
| pni_remove_session(session->connection, session); |
| pn_list_add(session->connection->freed, session); |
| session->endpoint.freed = true; |
| pn_ep_decref(&session->endpoint); |
| |
| // the finalize logic depends on endpoint.freed, so we incref/decref |
| // to give it a chance to rerun |
| pn_incref(session); |
| pn_decref(session); |
| } |
| |
| pn_record_t *pn_session_attachments(pn_session_t *session) |
| { |
| assert(session); |
| return session->context; |
| } |
| |
| void *pn_session_get_context(pn_session_t *session) |
| { |
| return session ? pn_record_get(session->context, PN_LEGCTX) : 0; |
| } |
| |
| void pn_session_set_context(pn_session_t *session, void *context) |
| { |
| assert(context); |
| pn_record_set(session->context, PN_LEGCTX, context); |
| } |
| |
| |
| static void pni_add_link(pn_session_t *ssn, pn_link_t *link) |
| { |
| pn_list_add(ssn->links, link); |
| link->session = ssn; |
| pn_ep_incref(&ssn->endpoint); |
| } |
| |
| static void pni_remove_link(pn_session_t *ssn, pn_link_t *link) |
| { |
| if (pn_list_remove(ssn->links, link)) { |
| pn_ep_decref(&ssn->endpoint); |
| LL_REMOVE(ssn->connection, endpoint, &link->endpoint); |
| } |
| } |
| |
| void pn_link_open(pn_link_t *link) |
| { |
| assert(link); |
| pn_endpoint_open(&link->endpoint); |
| } |
| |
| void pn_link_close(pn_link_t *link) |
| { |
| assert(link); |
| pn_endpoint_close(&link->endpoint); |
| } |
| |
| void pn_link_detach(pn_link_t *link) |
| { |
| assert(link); |
| link->detached = true; |
| pn_collector_put(link->session->connection->collector, PN_OBJECT, link, PN_LINK_LOCAL_DETACH); |
| pn_modified(link->session->connection, &link->endpoint, true); |
| |
| } |
| |
| static void pni_terminus_free(pn_terminus_t *terminus) |
| { |
| pn_free(terminus->address); |
| pn_free(terminus->properties); |
| pn_free(terminus->capabilities); |
| pn_free(terminus->outcomes); |
| pn_free(terminus->filter); |
| } |
| |
| void pn_link_free(pn_link_t *link) |
| { |
| assert(!link->endpoint.freed); |
| pni_remove_link(link->session, link); |
| pn_list_add(link->session->freed, link); |
| pn_delivery_t *delivery = link->unsettled_head; |
| while (delivery) { |
| pn_delivery_t *next = delivery->unsettled_next; |
| pn_delivery_settle(delivery); |
| delivery = next; |
| } |
| link->endpoint.freed = true; |
| pn_ep_decref(&link->endpoint); |
| |
| // the finalize logic depends on endpoint.freed (modified above), so |
| // we incref/decref to give it a chance to rerun |
| pn_incref(link); |
| pn_decref(link); |
| } |
| |
| void *pn_link_get_context(pn_link_t *link) |
| { |
| assert(link); |
| return pn_record_get(link->context, PN_LEGCTX); |
| } |
| |
| void pn_link_set_context(pn_link_t *link, void *context) |
| { |
| assert(link); |
| pn_record_set(link->context, PN_LEGCTX, context); |
| } |
| |
| pn_record_t *pn_link_attachments(pn_link_t *link) |
| { |
| assert(link); |
| return link->context; |
| } |
| |
| void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn) |
| { |
| endpoint->type = (pn_endpoint_type_t) type; |
| endpoint->referenced = true; |
| endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT; |
| endpoint->error = pn_error(); |
| pn_condition_init(&endpoint->condition); |
| pn_condition_init(&endpoint->remote_condition); |
| endpoint->endpoint_next = NULL; |
| endpoint->endpoint_prev = NULL; |
| endpoint->transport_next = NULL; |
| endpoint->transport_prev = NULL; |
| endpoint->modified = false; |
| endpoint->freed = false; |
| endpoint->refcount = 1; |
| //fprintf(stderr, "initting 0x%lx\n", (uintptr_t) endpoint); |
| |
| LL_ADD(conn, endpoint, endpoint); |
| } |
| |
| void pn_ep_incref(pn_endpoint_t *endpoint) |
| { |
| endpoint->refcount++; |
| } |
| |
| static pn_event_type_t pn_final_type(pn_endpoint_type_t type) { |
| switch (type) { |
| case CONNECTION: |
| return PN_CONNECTION_FINAL; |
| case SESSION: |
| return PN_SESSION_FINAL; |
| case SENDER: |
| case RECEIVER: |
| return PN_LINK_FINAL; |
| default: |
| assert(false); |
| return PN_EVENT_NONE; |
| } |
| } |
| |
| static pn_endpoint_t *pn_ep_parent(pn_endpoint_t *endpoint) { |
| switch (endpoint->type) { |
| case CONNECTION: |
| return NULL; |
| case SESSION: |
| return &((pn_session_t *) endpoint)->connection->endpoint; |
| case SENDER: |
| case RECEIVER: |
| return &((pn_link_t *) endpoint)->session->endpoint; |
| default: |
| assert(false); |
| return NULL; |
| } |
| } |
| |
| void pn_ep_decref(pn_endpoint_t *endpoint) |
| { |
| assert(endpoint->refcount > 0); |
| endpoint->refcount--; |
| if (endpoint->refcount == 0) { |
| pn_connection_t *conn = pni_ep_get_connection(endpoint); |
| pn_collector_put(conn->collector, PN_OBJECT, endpoint, pn_final_type(endpoint->type)); |
| } |
| } |
| |
| static void pni_endpoint_tini(pn_endpoint_t *endpoint) |
| { |
| pn_error_free(endpoint->error); |
| pn_condition_tini(&endpoint->remote_condition); |
| pn_condition_tini(&endpoint->condition); |
| } |
| |
| static void pni_free_children(pn_list_t *children, pn_list_t *freed) |
| { |
| while (pn_list_size(children) > 0) { |
| pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(children, 0); |
| assert(!endpoint->referenced); |
| pn_free(endpoint); |
| } |
| |
| while (pn_list_size(freed) > 0) { |
| pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(freed, 0); |
| assert(!endpoint->referenced); |
| pn_free(endpoint); |
| } |
| |
| pn_free(children); |
| pn_free(freed); |
| } |
| |
| static void pn_connection_finalize(void *object) |
| { |
| pn_connection_t *conn = (pn_connection_t *) object; |
| pn_endpoint_t *endpoint = &conn->endpoint; |
| |
| if (conn->transport) { |
| assert(!conn->transport->referenced); |
| pn_free(conn->transport); |
| } |
| |
| // freeing the transport could post events |
| if (pn_refcount(conn) > 0) { |
| return; |
| } |
| |
| pni_free_children(conn->sessions, conn->freed); |
| pn_free(conn->context); |
| pn_decref(conn->collector); |
| |
| pn_free(conn->container); |
| pn_free(conn->hostname); |
| pn_free(conn->auth_user); |
| pn_free(conn->auth_password); |
| pn_free(conn->offered_capabilities); |
| pn_free(conn->desired_capabilities); |
| pn_free(conn->properties); |
| pni_endpoint_tini(endpoint); |
| pn_free(conn->delivery_pool); |
| } |
| |
| #define pn_connection_initialize NULL |
| #define pn_connection_hashcode NULL |
| #define pn_connection_compare NULL |
| #define pn_connection_inspect NULL |
| |
| pn_connection_t *pn_connection(void) |
| { |
| static const pn_class_t clazz = PN_CLASS(pn_connection); |
| pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t)); |
| if (!conn) return NULL; |
| |
| conn->endpoint_head = NULL; |
| conn->endpoint_tail = NULL; |
| pn_endpoint_init(&conn->endpoint, CONNECTION, conn); |
| conn->transport_head = NULL; |
| conn->transport_tail = NULL; |
| conn->sessions = pn_list(PN_WEAKREF, 0); |
| conn->freed = pn_list(PN_WEAKREF, 0); |
| conn->transport = NULL; |
| conn->work_head = NULL; |
| conn->work_tail = NULL; |
| conn->tpwork_head = NULL; |
| conn->tpwork_tail = NULL; |
| conn->container = pn_string(NULL); |
| conn->hostname = pn_string(NULL); |
| conn->auth_user = pn_string(NULL); |
| conn->auth_password = pn_string(NULL); |
| conn->offered_capabilities = pn_data(0); |
| conn->desired_capabilities = pn_data(0); |
| conn->properties = pn_data(0); |
| conn->collector = NULL; |
| conn->context = pn_record(); |
| conn->delivery_pool = pn_list(PN_OBJECT, 0); |
| |
| return conn; |
| } |
| |
| static const pn_event_type_t endpoint_init_event_map[] = { |
| PN_CONNECTION_INIT, /* CONNECTION */ |
| PN_SESSION_INIT, /* SESSION */ |
| PN_LINK_INIT, /* SENDER */ |
| PN_LINK_INIT}; /* RECEIVER */ |
| |
| void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector) |
| { |
| pn_decref(connection->collector); |
| connection->collector = collector; |
| pn_incref(connection->collector); |
| pn_endpoint_t *endpoint = connection->endpoint_head; |
| while (endpoint) { |
| pn_collector_put(connection->collector, PN_OBJECT, endpoint, endpoint_init_event_map[endpoint->type]); |
| endpoint = endpoint->endpoint_next; |
| } |
| } |
| |
| pn_state_t pn_connection_state(pn_connection_t *connection) |
| { |
| return connection ? connection->endpoint.state : 0; |
| } |
| |
| pn_error_t *pn_connection_error(pn_connection_t *connection) |
| { |
| return connection ? connection->endpoint.error : NULL; |
| } |
| |
| const char *pn_connection_get_container(pn_connection_t *connection) |
| { |
| assert(connection); |
| return pn_string_get(connection->container); |
| } |
| |
| void pn_connection_set_container(pn_connection_t *connection, const char *container) |
| { |
| assert(connection); |
| pn_string_set(connection->container, container); |
| } |
| |
| const char *pn_connection_get_hostname(pn_connection_t *connection) |
| { |
| assert(connection); |
| return pn_string_get(connection->hostname); |
| } |
| |
| void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname) |
| { |
| assert(connection); |
| pn_string_set(connection->hostname, hostname); |
| } |
| |
| const char *pn_connection_get_user(pn_connection_t *connection) |
| { |
| assert(connection); |
| return pn_string_get(connection->auth_user); |
| } |
| |
| void pn_connection_set_user(pn_connection_t *connection, const char *user) |
| { |
| assert(connection); |
| pn_string_set(connection->auth_user, user); |
| } |
| |
| void pn_connection_set_password(pn_connection_t *connection, const char *password) |
| { |
| assert(connection); |
| pn_string_set(connection->auth_password, password); |
| } |
| |
| pn_data_t *pn_connection_offered_capabilities(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->offered_capabilities; |
| } |
| |
| pn_data_t *pn_connection_desired_capabilities(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->desired_capabilities; |
| } |
| |
| pn_data_t *pn_connection_properties(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->properties; |
| } |
| |
| pn_data_t *pn_connection_remote_offered_capabilities(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->transport ? connection->transport->remote_offered_capabilities : NULL; |
| } |
| |
| pn_data_t *pn_connection_remote_desired_capabilities(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->transport ? connection->transport->remote_desired_capabilities : NULL; |
| } |
| |
| pn_data_t *pn_connection_remote_properties(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->transport ? connection->transport->remote_properties : NULL; |
| } |
| |
| const char *pn_connection_remote_container(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->transport ? connection->transport->remote_container : NULL; |
| } |
| |
| const char *pn_connection_remote_hostname(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->transport ? connection->transport->remote_hostname : NULL; |
| } |
| |
| pn_delivery_t *pn_work_head(pn_connection_t *connection) |
| { |
| assert(connection); |
| return connection->work_head; |
| } |
| |
| pn_delivery_t *pn_work_next(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| |
| if (delivery->work) |
| return delivery->work_next; |
| else |
| return pn_work_head(delivery->link->session->connection); |
| } |
| |
| static void pni_add_work(pn_connection_t *connection, pn_delivery_t *delivery) |
| { |
| if (!delivery->work) |
| { |
| assert(!delivery->local.settled); // never allow settled deliveries |
| LL_ADD(connection, work, delivery); |
| delivery->work = true; |
| } |
| } |
| |
| static void pni_clear_work(pn_connection_t *connection, pn_delivery_t *delivery) |
| { |
| if (delivery->work) |
| { |
| LL_REMOVE(connection, work, delivery); |
| delivery->work = false; |
| } |
| } |
| |
| void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery) |
| { |
| pn_link_t *link = pn_delivery_link(delivery); |
| pn_delivery_t *current = pn_link_current(link); |
| if (delivery->updated && !delivery->local.settled) { |
| pni_add_work(connection, delivery); |
| } else if (delivery == current) { |
| if (link->endpoint.type == SENDER) { |
| if (pn_link_credit(link) > 0) { |
| pni_add_work(connection, delivery); |
| } else { |
| pni_clear_work(connection, delivery); |
| } |
| } else { |
| pni_add_work(connection, delivery); |
| } |
| } else { |
| pni_clear_work(connection, delivery); |
| } |
| } |
| |
| static void pni_add_tpwork(pn_delivery_t *delivery) |
| { |
| pn_connection_t *connection = delivery->link->session->connection; |
| if (!delivery->tpwork) |
| { |
| LL_ADD(connection, tpwork, delivery); |
| delivery->tpwork = true; |
| } |
| pn_modified(connection, &connection->endpoint, true); |
| } |
| |
| void pn_clear_tpwork(pn_delivery_t *delivery) |
| { |
| pn_connection_t *connection = delivery->link->session->connection; |
| if (delivery->tpwork) |
| { |
| LL_REMOVE(connection, tpwork, delivery); |
| delivery->tpwork = false; |
| if (pn_refcount(delivery) > 0) { |
| pn_incref(delivery); |
| pn_decref(delivery); |
| } |
| } |
| } |
| |
| void pn_dump(pn_connection_t *conn) |
| { |
| pn_endpoint_t *endpoint = conn->transport_head; |
| while (endpoint) |
| { |
| printf("%p", (void *) endpoint); |
| endpoint = endpoint->transport_next; |
| if (endpoint) |
| printf(" -> "); |
| } |
| printf("\n"); |
| } |
| |
| void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit) |
| { |
| if (!endpoint->modified) { |
| LL_ADD(connection, transport, endpoint); |
| endpoint->modified = true; |
| } |
| |
| if (emit && connection->transport) { |
| pn_collector_put(connection->collector, PN_OBJECT, connection->transport, |
| PN_TRANSPORT); |
| } |
| } |
| |
| void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint) |
| { |
| if (endpoint->modified) { |
| LL_REMOVE(connection, transport, endpoint); |
| endpoint->transport_next = NULL; |
| endpoint->transport_prev = NULL; |
| endpoint->modified = false; |
| } |
| } |
| |
| static bool pni_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state) |
| { |
| if (endpoint->type != type) return false; |
| |
| if (!state) return true; |
| |
| int st = endpoint->state; |
| if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0) |
| return st & state; |
| else |
| return st == state; |
| } |
| |
| pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state) |
| { |
| while (endpoint) |
| { |
| if (pni_matches(endpoint, type, state)) |
| return endpoint; |
| endpoint = endpoint->endpoint_next; |
| } |
| return NULL; |
| } |
| |
| pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state) |
| { |
| if (conn) |
| return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state); |
| else |
| return NULL; |
| } |
| |
| pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state) |
| { |
| if (ssn) |
| return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state); |
| else |
| return NULL; |
| } |
| |
| pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state) |
| { |
| if (!conn) return NULL; |
| |
| pn_endpoint_t *endpoint = conn->endpoint_head; |
| |
| while (endpoint) |
| { |
| if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state)) |
| return (pn_link_t *) endpoint; |
| endpoint = endpoint->endpoint_next; |
| } |
| |
| return NULL; |
| } |
| |
| pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state) |
| { |
| if (!link) return NULL; |
| |
| pn_endpoint_t *endpoint = link->endpoint.endpoint_next; |
| |
| while (endpoint) |
| { |
| if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state)) |
| return (pn_link_t *) endpoint; |
| endpoint = endpoint->endpoint_next; |
| } |
| |
| return NULL; |
| } |
| |
| static void pn_session_incref(void *object) |
| { |
| pn_session_t *session = (pn_session_t *) object; |
| if (!session->endpoint.referenced) { |
| session->endpoint.referenced = true; |
| pn_incref(session->connection); |
| } else { |
| pn_object_incref(object); |
| } |
| } |
| |
| static bool pn_ep_bound(pn_endpoint_t *endpoint) |
| { |
| pn_connection_t *conn = pni_ep_get_connection(endpoint); |
| pn_session_t *ssn; |
| pn_link_t *lnk; |
| |
| if (!conn->transport) return false; |
| if (endpoint->modified) return true; |
| |
| switch (endpoint->type) { |
| case CONNECTION: |
| return ((pn_connection_t *)endpoint)->transport; |
| case SESSION: |
| ssn = (pn_session_t *) endpoint; |
| return (((int16_t) ssn->state.local_channel) >= 0 || ((int16_t) ssn->state.remote_channel) >= 0); |
| case SENDER: |
| case RECEIVER: |
| lnk = (pn_link_t *) endpoint; |
| return ((int32_t) lnk->state.local_handle) >= 0 || ((int32_t) lnk->state.remote_handle) >= 0; |
| default: |
| assert(false); |
| return false; |
| } |
| } |
| |
| static bool pni_connection_live(pn_connection_t *conn) { |
| return pn_refcount(conn) > 1; |
| } |
| |
| static bool pni_session_live(pn_session_t *ssn) { |
| return pni_connection_live(ssn->connection) || pn_refcount(ssn) > 1; |
| } |
| |
| static bool pni_link_live(pn_link_t *link) { |
| return pni_session_live(link->session) || pn_refcount(link) > 1; |
| } |
| |
| static bool pni_endpoint_live(pn_endpoint_t *endpoint) { |
| switch (endpoint->type) { |
| case CONNECTION: |
| return pni_connection_live((pn_connection_t *)endpoint); |
| case SESSION: |
| return pni_session_live((pn_session_t *) endpoint); |
| case SENDER: |
| case RECEIVER: |
| return pni_link_live((pn_link_t *) endpoint); |
| default: |
| assert(false); |
| return false; |
| } |
| } |
| |
| static bool pni_preserve_child(pn_endpoint_t *endpoint) |
| { |
| pn_connection_t *conn = pni_ep_get_connection(endpoint); |
| pn_endpoint_t *parent = pn_ep_parent(endpoint); |
| if (pni_endpoint_live(parent) && (!endpoint->freed || (pn_ep_bound(endpoint))) |
| && endpoint->referenced) { |
| pn_object_incref(endpoint); |
| endpoint->referenced = false; |
| pn_decref(parent); |
| return true; |
| } else { |
| LL_REMOVE(conn, transport, endpoint); |
| return false; |
| } |
| } |
| |
| static void pn_session_finalize(void *object) |
| { |
| pn_session_t *session = (pn_session_t *) object; |
| pn_endpoint_t *endpoint = &session->endpoint; |
| |
| if (pni_preserve_child(endpoint)) { |
| return; |
| } |
| |
| pn_free(session->context); |
| pni_free_children(session->links, session->freed); |
| pni_endpoint_tini(endpoint); |
| pn_delivery_map_free(&session->state.incoming); |
| pn_delivery_map_free(&session->state.outgoing); |
| pn_free(session->state.local_handles); |
| pn_free(session->state.remote_handles); |
| pni_remove_session(session->connection, session); |
| pn_list_remove(session->connection->freed, session); |
| |
| if (session->connection->transport) { |
| pn_transport_t *transport = session->connection->transport; |
| pn_hash_del(transport->local_channels, session->state.local_channel); |
| pn_hash_del(transport->remote_channels, session->state.remote_channel); |
| } |
| |
| if (endpoint->referenced) { |
| pn_decref(session->connection); |
| } |
| } |
| |
| #define pn_session_new pn_object_new |
| #define pn_session_refcount pn_object_refcount |
| #define pn_session_decref pn_object_decref |
| #define pn_session_reify pn_object_reify |
| #define pn_session_initialize NULL |
| #define pn_session_hashcode NULL |
| #define pn_session_compare NULL |
| #define pn_session_inspect NULL |
| |
| pn_session_t *pn_session(pn_connection_t *conn) |
| { |
| assert(conn); |
| |
| |
| pn_transport_t * transport = pn_connection_transport(conn); |
| |
| if(transport) { |
| // channel_max is an index, not a count. |
| if(pn_hash_size(transport->local_channels) > (size_t)transport->channel_max) { |
| pn_transport_logf(transport, |
| "pn_session: too many sessions: %d channel_max is %d", |
| pn_hash_size(transport->local_channels), |
| transport->channel_max); |
| return (pn_session_t *) 0; |
| } |
| } |
| |
| #define pn_session_free pn_object_free |
| static const pn_class_t clazz = PN_METACLASS(pn_session); |
| #undef pn_session_free |
| pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t)); |
| if (!ssn) return NULL; |
| pn_endpoint_init(&ssn->endpoint, SESSION, conn); |
| pni_add_session(conn, ssn); |
| ssn->links = pn_list(PN_WEAKREF, 0); |
| ssn->freed = pn_list(PN_WEAKREF, 0); |
| ssn->context = pn_record(); |
| ssn->incoming_capacity = 1024*1024; |
| ssn->incoming_bytes = 0; |
| ssn->outgoing_bytes = 0; |
| ssn->incoming_deliveries = 0; |
| ssn->outgoing_deliveries = 0; |
| ssn->outgoing_window = 2147483647; |
| |
| // begin transport state |
| memset(&ssn->state, 0, sizeof(ssn->state)); |
| ssn->state.local_channel = (uint16_t)-1; |
| ssn->state.remote_channel = (uint16_t)-1; |
| pn_delivery_map_init(&ssn->state.incoming, 0); |
| pn_delivery_map_init(&ssn->state.outgoing, 0); |
| ssn->state.local_handles = pn_hash(PN_WEAKREF, 0, 0.75); |
| ssn->state.remote_handles = pn_hash(PN_WEAKREF, 0, 0.75); |
| // end transport state |
| |
| pn_collector_put(conn->collector, PN_OBJECT, ssn, PN_SESSION_INIT); |
| if (conn->transport) { |
| pni_session_bound(ssn); |
| } |
| pn_decref(ssn); |
| return ssn; |
| } |
| |
| static void pni_session_bound(pn_session_t *ssn) |
| { |
| assert(ssn); |
| size_t nlinks = pn_list_size(ssn->links); |
| for (size_t i = 0; i < nlinks; i++) { |
| pni_link_bound((pn_link_t *) pn_list_get(ssn->links, i)); |
| } |
| } |
| |
| void pn_session_unbound(pn_session_t* ssn) |
| { |
| assert(ssn); |
| ssn->state.local_channel = (uint16_t)-1; |
| ssn->state.remote_channel = (uint16_t)-1; |
| ssn->incoming_bytes = 0; |
| ssn->outgoing_bytes = 0; |
| ssn->incoming_deliveries = 0; |
| ssn->outgoing_deliveries = 0; |
| } |
| |
| size_t pn_session_get_incoming_capacity(pn_session_t *ssn) |
| { |
| assert(ssn); |
| return ssn->incoming_capacity; |
| } |
| |
| void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity) |
| { |
| assert(ssn); |
| // XXX: should this trigger a flow? |
| ssn->incoming_capacity = capacity; |
| } |
| |
| size_t pn_session_get_outgoing_window(pn_session_t *ssn) |
| { |
| assert(ssn); |
| return ssn->outgoing_window; |
| } |
| |
| void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window) |
| { |
| assert(ssn); |
| ssn->outgoing_window = window; |
| } |
| |
| size_t pn_session_outgoing_bytes(pn_session_t *ssn) |
| { |
| assert(ssn); |
| return ssn->outgoing_bytes; |
| } |
| |
| size_t pn_session_incoming_bytes(pn_session_t *ssn) |
| { |
| assert(ssn); |
| return ssn->incoming_bytes; |
| } |
| |
| pn_state_t pn_session_state(pn_session_t *session) |
| { |
| return session->endpoint.state; |
| } |
| |
| pn_error_t *pn_session_error(pn_session_t *session) |
| { |
| return session->endpoint.error; |
| } |
| |
| static void pni_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type) |
| { |
| terminus->type = type; |
| terminus->address = pn_string(NULL); |
| terminus->durability = PN_NONDURABLE; |
| terminus->expiry_policy = PN_EXPIRE_WITH_SESSION; |
| terminus->timeout = 0; |
| terminus->dynamic = false; |
| terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED; |
| terminus->properties = pn_data(0); |
| terminus->capabilities = pn_data(0); |
| terminus->outcomes = pn_data(0); |
| terminus->filter = pn_data(0); |
| } |
| |
| static void pn_link_incref(void *object) |
| { |
| pn_link_t *link = (pn_link_t *) object; |
| if (!link->endpoint.referenced) { |
| link->endpoint.referenced = true; |
| pn_incref(link->session); |
| } else { |
| pn_object_incref(object); |
| } |
| } |
| |
| static void pn_link_finalize(void *object) |
| { |
| pn_link_t *link = (pn_link_t *) object; |
| pn_endpoint_t *endpoint = &link->endpoint; |
| |
| if (pni_preserve_child(endpoint)) { |
| return; |
| } |
| |
| while (link->unsettled_head) { |
| assert(!link->unsettled_head->referenced); |
| pn_free(link->unsettled_head); |
| } |
| |
| pn_free(link->context); |
| pni_terminus_free(&link->source); |
| pni_terminus_free(&link->target); |
| pni_terminus_free(&link->remote_source); |
| pni_terminus_free(&link->remote_target); |
| pn_free(link->name); |
| pni_endpoint_tini(endpoint); |
| pni_remove_link(link->session, link); |
| pn_hash_del(link->session->state.local_handles, link->state.local_handle); |
| pn_hash_del(link->session->state.remote_handles, link->state.remote_handle); |
| pn_list_remove(link->session->freed, link); |
| if (endpoint->referenced) { |
| pn_decref(link->session); |
| } |
| } |
| |
| #define pn_link_refcount pn_object_refcount |
| #define pn_link_decref pn_object_decref |
| #define pn_link_reify pn_object_reify |
| #define pn_link_initialize NULL |
| #define pn_link_hashcode NULL |
| #define pn_link_compare NULL |
| #define pn_link_inspect NULL |
| |
| pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) |
| { |
| #define pn_link_new pn_object_new |
| #define pn_link_free pn_object_free |
| static const pn_class_t clazz = PN_METACLASS(pn_link); |
| #undef pn_link_new |
| #undef pn_link_free |
| pn_link_t *link = (pn_link_t *) pn_class_new(&clazz, sizeof(pn_link_t)); |
| |
| pn_endpoint_init(&link->endpoint, type, session->connection); |
| pni_add_link(session, link); |
| pn_incref(session); // keep session until link finalized |
| link->name = pn_string(name); |
| pni_terminus_init(&link->source, PN_SOURCE); |
| pni_terminus_init(&link->target, PN_TARGET); |
| pni_terminus_init(&link->remote_source, PN_UNSPECIFIED); |
| pni_terminus_init(&link->remote_target, PN_UNSPECIFIED); |
| link->unsettled_head = link->unsettled_tail = link->current = NULL; |
| link->unsettled_count = 0; |
| link->available = 0; |
| link->credit = 0; |
| link->queued = 0; |
| link->drain = false; |
| link->drain_flag_mode = true; |
| link->drained = 0; |
| link->context = pn_record(); |
| link->snd_settle_mode = PN_SND_MIXED; |
| link->rcv_settle_mode = PN_RCV_FIRST; |
| link->remote_snd_settle_mode = PN_SND_MIXED; |
| link->remote_rcv_settle_mode = PN_RCV_FIRST; |
| link->detached = false; |
| |
| // begin transport state |
| link->state.local_handle = -1; |
| link->state.remote_handle = -1; |
| link->state.delivery_count = 0; |
| link->state.link_credit = 0; |
| // end transport state |
| |
| pn_collector_put(session->connection->collector, PN_OBJECT, link, PN_LINK_INIT); |
| if (session->connection->transport) { |
| pni_link_bound(link); |
| } |
| pn_decref(link); |
| return link; |
| } |
| |
| static void pni_link_bound(pn_link_t *link) |
| { |
| } |
| |
| void pn_link_unbound(pn_link_t* link) |
| { |
| assert(link); |
| link->state.local_handle = -1; |
| link->state.remote_handle = -1; |
| link->state.delivery_count = 0; |
| link->state.link_credit = 0; |
| } |
| |
| pn_terminus_t *pn_link_source(pn_link_t *link) |
| { |
| return link ? &link->source : NULL; |
| } |
| |
| pn_terminus_t *pn_link_target(pn_link_t *link) |
| { |
| return link ? &link->target : NULL; |
| } |
| |
| pn_terminus_t *pn_link_remote_source(pn_link_t *link) |
| { |
| return link ? &link->remote_source : NULL; |
| } |
| |
| pn_terminus_t *pn_link_remote_target(pn_link_t *link) |
| { |
| return link ? &link->remote_target : NULL; |
| } |
| |
| int pn_terminus_set_type(pn_terminus_t *terminus, pn_terminus_type_t type) |
| { |
| if (!terminus) return PN_ARG_ERR; |
| terminus->type = type; |
| return 0; |
| } |
| |
| pn_terminus_type_t pn_terminus_get_type(pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->type : (pn_terminus_type_t) 0; |
| } |
| |
| const char *pn_terminus_get_address(pn_terminus_t *terminus) |
| { |
| assert(terminus); |
| return pn_string_get(terminus->address); |
| } |
| |
| int pn_terminus_set_address(pn_terminus_t *terminus, const char *address) |
| { |
| assert(terminus); |
| return pn_string_set(terminus->address, address); |
| } |
| |
| pn_durability_t pn_terminus_get_durability(pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->durability : (pn_durability_t) 0; |
| } |
| |
| int pn_terminus_set_durability(pn_terminus_t *terminus, pn_durability_t durability) |
| { |
| if (!terminus) return PN_ARG_ERR; |
| terminus->durability = durability; |
| return 0; |
| } |
| |
| pn_expiry_policy_t pn_terminus_get_expiry_policy(pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->expiry_policy : (pn_expiry_policy_t) 0; |
| } |
| |
| int pn_terminus_set_expiry_policy(pn_terminus_t *terminus, pn_expiry_policy_t expiry_policy) |
| { |
| if (!terminus) return PN_ARG_ERR; |
| terminus->expiry_policy = expiry_policy; |
| return 0; |
| } |
| |
| pn_seconds_t pn_terminus_get_timeout(pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->timeout : 0; |
| } |
| |
| int pn_terminus_set_timeout(pn_terminus_t *terminus, pn_seconds_t timeout) |
| { |
| if (!terminus) return PN_ARG_ERR; |
| terminus->timeout = timeout; |
| return 0; |
| } |
| |
| bool pn_terminus_is_dynamic(pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->dynamic : false; |
| } |
| |
| int pn_terminus_set_dynamic(pn_terminus_t *terminus, bool dynamic) |
| { |
| if (!terminus) return PN_ARG_ERR; |
| terminus->dynamic = dynamic; |
| return 0; |
| } |
| |
| pn_data_t *pn_terminus_properties(pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->properties : NULL; |
| } |
| |
| pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->capabilities : NULL; |
| } |
| |
| pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->outcomes : NULL; |
| } |
| |
| pn_data_t *pn_terminus_filter(pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->filter : NULL; |
| } |
| |
| pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus) |
| { |
| return terminus ? terminus->distribution_mode : PN_DIST_MODE_UNSPECIFIED; |
| } |
| |
| int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m) |
| { |
| if (!terminus) return PN_ARG_ERR; |
| terminus->distribution_mode = m; |
| return 0; |
| } |
| |
| int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src) |
| { |
| if (!terminus || !src) { |
| return PN_ARG_ERR; |
| } |
| |
| terminus->type = src->type; |
| int err = pn_terminus_set_address(terminus, pn_terminus_get_address(src)); |
| if (err) return err; |
| terminus->durability = src->durability; |
| terminus->expiry_policy = src->expiry_policy; |
| terminus->timeout = src->timeout; |
| terminus->dynamic = src->dynamic; |
| terminus->distribution_mode = src->distribution_mode; |
| err = pn_data_copy(terminus->properties, src->properties); |
| if (err) return err; |
| err = pn_data_copy(terminus->capabilities, src->capabilities); |
| if (err) return err; |
| err = pn_data_copy(terminus->outcomes, src->outcomes); |
| if (err) return err; |
| err = pn_data_copy(terminus->filter, src->filter); |
| if (err) return err; |
| return 0; |
| } |
| |
| pn_link_t *pn_sender(pn_session_t *session, const char *name) |
| { |
| return pn_link_new(SENDER, session, name); |
| } |
| |
| pn_link_t *pn_receiver(pn_session_t *session, const char *name) |
| { |
| return pn_link_new(RECEIVER, session, name); |
| } |
| |
| pn_state_t pn_link_state(pn_link_t *link) |
| { |
| return link->endpoint.state; |
| } |
| |
| pn_error_t *pn_link_error(pn_link_t *link) |
| { |
| return link->endpoint.error; |
| } |
| |
| const char *pn_link_name(pn_link_t *link) |
| { |
| assert(link); |
| return pn_string_get(link->name); |
| } |
| |
| bool pn_link_is_sender(pn_link_t *link) |
| { |
| return link->endpoint.type == SENDER; |
| } |
| |
| bool pn_link_is_receiver(pn_link_t *link) |
| { |
| return link->endpoint.type == RECEIVER; |
| } |
| |
| pn_session_t *pn_link_session(pn_link_t *link) |
| { |
| assert(link); |
| return link->session; |
| } |
| |
| static void pn_disposition_finalize(pn_disposition_t *ds) |
| { |
| pn_free(ds->data); |
| pn_free(ds->annotations); |
| pn_condition_tini(&ds->condition); |
| } |
| |
| static void pn_delivery_incref(void *object) |
| { |
| pn_delivery_t *delivery = (pn_delivery_t *) object; |
| if (delivery->link && !delivery->referenced) { |
| delivery->referenced = true; |
| pn_incref(delivery->link); |
| } else { |
| pn_object_incref(object); |
| } |
| } |
| |
| static bool pni_preserve_delivery(pn_delivery_t *delivery) |
| { |
| pn_connection_t *conn = delivery->link->session->connection; |
| return !delivery->local.settled || (conn->transport && (delivery->state.init || delivery->tpwork)); |
| } |
| |
| static void pn_delivery_finalize(void *object) |
| { |
| pn_delivery_t *delivery = (pn_delivery_t *) object; |
| pn_link_t *link = delivery->link; |
| // assert(!delivery->state.init); |
| |
| bool pooled = false; |
| bool referenced = true; |
| if (link) { |
| if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) { |
| delivery->referenced = false; |
| pn_object_incref(delivery); |
| pn_decref(link); |
| return; |
| } |
| referenced = delivery->referenced; |
| |
| pn_clear_tpwork(delivery); |
| LL_REMOVE(link, unsettled, delivery); |
| pn_delivery_map_del(pn_link_is_sender(link) |
| ? &link->session->state.outgoing |
| : &link->session->state.incoming, |
| delivery); |
| pn_buffer_clear(delivery->tag); |
| pn_buffer_clear(delivery->bytes); |
| pn_record_clear(delivery->context); |
| delivery->settled = true; |
| pn_connection_t *conn = link->session->connection; |
| assert(pn_refcount(delivery) == 0); |
| if (pni_connection_live(conn)) { |
| pn_list_t *pool = link->session->connection->delivery_pool; |
| delivery->link = NULL; |
| pn_list_add(pool, delivery); |
| pooled = true; |
| assert(pn_refcount(delivery) == 1); |
| } |
| } |
| |
| if (!pooled) { |
| pn_free(delivery->context); |
| pn_buffer_free(delivery->tag); |
| pn_buffer_free(delivery->bytes); |
| pn_disposition_finalize(&delivery->local); |
| pn_disposition_finalize(&delivery->remote); |
| } |
| |
| if (referenced) { |
| pn_decref(link); |
| } |
| } |
| |
| static void pn_disposition_init(pn_disposition_t *ds) |
| { |
| ds->data = pn_data(0); |
| ds->annotations = pn_data(0); |
| pn_condition_init(&ds->condition); |
| } |
| |
| static void pn_disposition_clear(pn_disposition_t *ds) |
| { |
| ds->type = 0; |
| ds->section_number = 0; |
| ds->section_offset = 0; |
| ds->failed = false; |
| ds->undeliverable = false; |
| ds->settled = false; |
| pn_data_clear(ds->data); |
| pn_data_clear(ds->annotations); |
| pn_condition_clear(&ds->condition); |
| } |
| |
| #define pn_delivery_new pn_object_new |
| #define pn_delivery_refcount pn_object_refcount |
| #define pn_delivery_decref pn_object_decref |
| #define pn_delivery_free pn_object_free |
| #define pn_delivery_reify pn_object_reify |
| #define pn_delivery_initialize NULL |
| #define pn_delivery_hashcode NULL |
| #define pn_delivery_compare NULL |
| #define pn_delivery_inspect NULL |
| |
| pn_delivery_tag_t pn_dtag(const char *bytes, size_t size) { |
| pn_delivery_tag_t dtag = {size, bytes}; |
| return dtag; |
| } |
| |
| pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) |
| { |
| assert(link); |
| pn_list_t *pool = link->session->connection->delivery_pool; |
| pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool); |
| if (!delivery) { |
| static const pn_class_t clazz = PN_METACLASS(pn_delivery); |
| delivery = (pn_delivery_t *) pn_class_new(&clazz, sizeof(pn_delivery_t)); |
| if (!delivery) return NULL; |
| delivery->tag = pn_buffer(16); |
| delivery->bytes = pn_buffer(64); |
| pn_disposition_init(&delivery->local); |
| pn_disposition_init(&delivery->remote); |
| delivery->context = pn_record(); |
| } else { |
| assert(!delivery->state.init); |
| } |
| delivery->link = link; |
| pn_incref(delivery->link); // keep link until finalized |
| pn_buffer_clear(delivery->tag); |
| pn_buffer_append(delivery->tag, tag.start, tag.size); |
| pn_disposition_clear(&delivery->local); |
| pn_disposition_clear(&delivery->remote); |
| delivery->updated = false; |
| delivery->settled = false; |
| LL_ADD(link, unsettled, delivery); |
| delivery->referenced = true; |
| delivery->work_next = NULL; |
| delivery->work_prev = NULL; |
| delivery->work = false; |
| delivery->tpwork_next = NULL; |
| delivery->tpwork_prev = NULL; |
| delivery->tpwork = false; |
| pn_buffer_clear(delivery->bytes); |
| delivery->done = false; |
| pn_record_clear(delivery->context); |
| |
| // begin delivery state |
| delivery->state.init = false; |
| delivery->state.sent = false; |
| // end delivery state |
| |
| if (!link->current) |
| link->current = delivery; |
| |
| link->unsettled_count++; |
| |
| pn_work_update(link->session->connection, delivery); |
| |
| // XXX: could just remove incref above |
| pn_decref(delivery); |
| |
| return delivery; |
| } |
| |
| bool pn_delivery_buffered(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| if (delivery->settled) return false; |
| if (pn_link_is_sender(delivery->link)) { |
| pn_delivery_state_t *state = &delivery->state; |
| if (state->sent) { |
| return false; |
| } else { |
| return delivery->done || (pn_buffer_size(delivery->bytes) > 0); |
| } |
| } else { |
| return false; |
| } |
| } |
| |
| int pn_link_unsettled(pn_link_t *link) |
| { |
| return link->unsettled_count; |
| } |
| |
| pn_delivery_t *pn_unsettled_head(pn_link_t *link) |
| { |
| pn_delivery_t *d = link->unsettled_head; |
| while (d && d->local.settled) { |
| d = d->unsettled_next; |
| } |
| return d; |
| } |
| |
| pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery) |
| { |
| pn_delivery_t *d = delivery->unsettled_next; |
| while (d && d->local.settled) { |
| d = d->unsettled_next; |
| } |
| return d; |
| } |
| |
| bool pn_delivery_current(pn_delivery_t *delivery) |
| { |
| pn_link_t *link = delivery->link; |
| return pn_link_current(link) == delivery; |
| } |
| |
| void pn_delivery_dump(pn_delivery_t *d) |
| { |
| char tag[1024]; |
| pn_bytes_t bytes = pn_buffer_bytes(d->tag); |
| pn_quote_data(tag, 1024, bytes.start, bytes.size); |
| printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%u, " |
| "remote.settled=%u, updated=%u, current=%u, writable=%u, readable=%u, " |
| "work=%u}", |
| tag, d->local.type, d->remote.type, d->local.settled, |
| d->remote.settled, d->updated, pn_delivery_current(d), |
| pn_delivery_writable(d), pn_delivery_readable(d), d->work); |
| } |
| |
| void *pn_delivery_get_context(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| return pn_record_get(delivery->context, PN_LEGCTX); |
| } |
| |
| void pn_delivery_set_context(pn_delivery_t *delivery, void *context) |
| { |
| assert(delivery); |
| pn_record_set(delivery->context, PN_LEGCTX, context); |
| } |
| |
| pn_record_t *pn_delivery_attachments(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| return delivery->context; |
| } |
| |
| uint64_t pn_disposition_type(pn_disposition_t *disposition) |
| { |
| assert(disposition); |
| return disposition->type; |
| } |
| |
| pn_data_t *pn_disposition_data(pn_disposition_t *disposition) |
| { |
| assert(disposition); |
| return disposition->data; |
| } |
| |
| uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition) |
| { |
| assert(disposition); |
| return disposition->section_number; |
| } |
| |
| void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number) |
| { |
| assert(disposition); |
| disposition->section_number = section_number; |
| } |
| |
| uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition) |
| { |
| assert(disposition); |
| return disposition->section_offset; |
| } |
| |
| void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset) |
| { |
| assert(disposition); |
| disposition->section_offset = section_offset; |
| } |
| |
| bool pn_disposition_is_failed(pn_disposition_t *disposition) |
| { |
| assert(disposition); |
| return disposition->failed; |
| } |
| |
| void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed) |
| { |
| assert(disposition); |
| disposition->failed = failed; |
| } |
| |
| bool pn_disposition_is_undeliverable(pn_disposition_t *disposition) |
| { |
| assert(disposition); |
| return disposition->undeliverable; |
| } |
| |
| void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable) |
| { |
| assert(disposition); |
| disposition->undeliverable = undeliverable; |
| } |
| |
| pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition) |
| { |
| assert(disposition); |
| return disposition->annotations; |
| } |
| |
| pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition) |
| { |
| assert(disposition); |
| return &disposition->condition; |
| } |
| |
| pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery) |
| { |
| if (delivery) { |
| pn_bytes_t tag = pn_buffer_bytes(delivery->tag); |
| return pn_dtag(tag.start, tag.size); |
| } else { |
| return pn_dtag(0, 0); |
| } |
| } |
| |
| pn_delivery_t *pn_link_current(pn_link_t *link) |
| { |
| if (!link) return NULL; |
| return link->current; |
| } |
| |
| static void pni_advance_sender(pn_link_t *link) |
| { |
| link->current->done = true; |
| link->queued++; |
| link->credit--; |
| link->session->outgoing_deliveries++; |
| pni_add_tpwork(link->current); |
| link->current = link->current->unsettled_next; |
| } |
| |
| static void pni_advance_receiver(pn_link_t *link) |
| { |
| link->credit--; |
| link->queued--; |
| link->session->incoming_deliveries--; |
| |
| pn_delivery_t *current = link->current; |
| link->session->incoming_bytes -= pn_buffer_size(current->bytes); |
| pn_buffer_clear(current->bytes); |
| |
| if (!link->session->state.incoming_window) { |
| pni_add_tpwork(current); |
| } |
| |
| link->current = link->current->unsettled_next; |
| } |
| |
| bool pn_link_advance(pn_link_t *link) |
| { |
| if (link && link->current) { |
| pn_delivery_t *prev = link->current; |
| if (link->endpoint.type == SENDER) { |
| pni_advance_sender(link); |
| } else { |
| pni_advance_receiver(link); |
| } |
| pn_delivery_t *next = link->current; |
| pn_work_update(link->session->connection, prev); |
| if (next) pn_work_update(link->session->connection, next); |
| return prev != next; |
| } else { |
| return false; |
| } |
| } |
| |
| int pn_link_credit(pn_link_t *link) |
| { |
| return link ? link->credit : 0; |
| } |
| |
| int pn_link_available(pn_link_t *link) |
| { |
| return link ? link->available : 0; |
| } |
| |
| int pn_link_queued(pn_link_t *link) |
| { |
| return link ? link->queued : 0; |
| } |
| |
| int pn_link_remote_credit(pn_link_t *link) |
| { |
| assert(link); |
| return link->credit - link->queued; |
| } |
| |
| bool pn_link_get_drain(pn_link_t *link) |
| { |
| assert(link); |
| return link->drain; |
| } |
| |
| pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link) |
| { |
| return link ? (pn_snd_settle_mode_t)link->snd_settle_mode |
| : PN_SND_MIXED; |
| } |
| |
| pn_rcv_settle_mode_t pn_link_rcv_settle_mode(pn_link_t *link) |
| { |
| return link ? (pn_rcv_settle_mode_t)link->rcv_settle_mode |
| : PN_RCV_FIRST; |
| } |
| |
| pn_snd_settle_mode_t pn_link_remote_snd_settle_mode(pn_link_t *link) |
| { |
| return link ? (pn_snd_settle_mode_t)link->remote_snd_settle_mode |
| : PN_SND_MIXED; |
| } |
| |
| pn_rcv_settle_mode_t pn_link_remote_rcv_settle_mode(pn_link_t *link) |
| { |
| return link ? (pn_rcv_settle_mode_t)link->remote_rcv_settle_mode |
| : PN_RCV_FIRST; |
| } |
| void pn_link_set_snd_settle_mode(pn_link_t *link, pn_snd_settle_mode_t mode) |
| { |
| if (link) |
| link->snd_settle_mode = (uint8_t)mode; |
| } |
| void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode) |
| { |
| if (link) |
| link->rcv_settle_mode = (uint8_t)mode; |
| } |
| |
| void pn_delivery_settle(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| if (!delivery->local.settled) { |
| pn_link_t *link = delivery->link; |
| if (pn_delivery_current(delivery)) { |
| pn_link_advance(link); |
| } |
| |
| link->unsettled_count--; |
| delivery->local.settled = true; |
| pni_add_tpwork(delivery); |
| pn_work_update(delivery->link->session->connection, delivery); |
| pn_incref(delivery); |
| pn_decref(delivery); |
| } |
| } |
| |
| void pn_link_offered(pn_link_t *sender, int credit) |
| { |
| sender->available = credit; |
| } |
| |
| ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n) |
| { |
| pn_delivery_t *current = pn_link_current(sender); |
| if (!current) return PN_EOS; |
| if (!bytes || !n) return 0; |
| pn_buffer_append(current->bytes, bytes, n); |
| sender->session->outgoing_bytes += n; |
| pni_add_tpwork(current); |
| return n; |
| } |
| |
| int pn_link_drained(pn_link_t *link) |
| { |
| assert(link); |
| int drained = 0; |
| |
| if (pn_link_is_sender(link)) { |
| if (link->drain && link->credit > 0) { |
| link->drained = link->credit; |
| link->credit = 0; |
| pn_modified(link->session->connection, &link->endpoint, true); |
| drained = link->drained; |
| } |
| } else { |
| drained = link->drained; |
| link->drained = 0; |
| } |
| |
| return drained; |
| } |
| |
| ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n) |
| { |
| if (!receiver) return PN_ARG_ERR; |
| |
| pn_delivery_t *delivery = receiver->current; |
| if (delivery) { |
| size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes); |
| pn_buffer_trim(delivery->bytes, size, 0); |
| if (size) { |
| receiver->session->incoming_bytes -= size; |
| if (!receiver->session->state.incoming_window) { |
| pni_add_tpwork(delivery); |
| } |
| return size; |
| } else { |
| return delivery->done ? PN_EOS : 0; |
| } |
| } else { |
| return PN_STATE_ERR; |
| } |
| } |
| |
| void pn_link_flow(pn_link_t *receiver, int credit) |
| { |
| assert(receiver); |
| assert(pn_link_is_receiver(receiver)); |
| receiver->credit += credit; |
| pn_modified(receiver->session->connection, &receiver->endpoint, true); |
| if (!receiver->drain_flag_mode) { |
| pn_link_set_drain(receiver, false); |
| receiver->drain_flag_mode = false; |
| } |
| } |
| |
| void pn_link_drain(pn_link_t *receiver, int credit) |
| { |
| assert(receiver); |
| assert(pn_link_is_receiver(receiver)); |
| pn_link_set_drain(receiver, true); |
| pn_link_flow(receiver, credit); |
| receiver->drain_flag_mode = false; |
| } |
| |
| void pn_link_set_drain(pn_link_t *receiver, bool drain) |
| { |
| assert(receiver); |
| assert(pn_link_is_receiver(receiver)); |
| receiver->drain = drain; |
| pn_modified(receiver->session->connection, &receiver->endpoint, true); |
| receiver->drain_flag_mode = true; |
| } |
| |
| bool pn_link_draining(pn_link_t *receiver) |
| { |
| assert(receiver); |
| assert(pn_link_is_receiver(receiver)); |
| return receiver->drain && (pn_link_credit(receiver) > pn_link_queued(receiver)); |
| } |
| |
| pn_link_t *pn_delivery_link(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| return delivery->link; |
| } |
| |
| pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| return &delivery->local; |
| } |
| |
| uint64_t pn_delivery_local_state(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| return delivery->local.type; |
| } |
| |
| pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| return &delivery->remote; |
| } |
| |
| uint64_t pn_delivery_remote_state(pn_delivery_t *delivery) |
| { |
| assert(delivery); |
| return delivery->remote.type; |
| } |
| |
| bool pn_delivery_settled(pn_delivery_t *delivery) |
| { |
| return delivery ? delivery->remote.settled : false; |
| } |
| |
| bool pn_delivery_updated(pn_delivery_t *delivery) |
| { |
| return delivery ? delivery->updated : false; |
| } |
| |
| void pn_delivery_clear(pn_delivery_t *delivery) |
| { |
| delivery->updated = false; |
| pn_work_update(delivery->link->session->connection, delivery); |
| } |
| |
| void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) |
| { |
| if (!delivery) return; |
| delivery->local.type = state; |
| pni_add_tpwork(delivery); |
| } |
| |
| bool pn_delivery_writable(pn_delivery_t *delivery) |
| { |
| if (!delivery) return false; |
| |
| pn_link_t *link = delivery->link; |
| return pn_link_is_sender(link) && pn_delivery_current(delivery) && pn_link_credit(link) > 0; |
| } |
| |
| bool pn_delivery_readable(pn_delivery_t *delivery) |
| { |
| if (delivery) { |
| pn_link_t *link = delivery->link; |
| return pn_link_is_receiver(link) && pn_delivery_current(delivery); |
| } else { |
| return false; |
| } |
| } |
| |
| size_t pn_delivery_pending(pn_delivery_t *delivery) |
| { |
| return pn_buffer_size(delivery->bytes); |
| } |
| |
| bool pn_delivery_partial(pn_delivery_t *delivery) |
| { |
| return !delivery->done; |
| } |
| |
| pn_condition_t *pn_connection_condition(pn_connection_t *connection) |
| { |
| assert(connection); |
| return &connection->endpoint.condition; |
| } |
| |
| pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection) |
| { |
| assert(connection); |
| pn_transport_t *transport = connection->transport; |
| return transport ? &transport->remote_condition : NULL; |
| } |
| |
| pn_condition_t *pn_session_condition(pn_session_t *session) |
| { |
| assert(session); |
| return &session->endpoint.condition; |
| } |
| |
| pn_condition_t *pn_session_remote_condition(pn_session_t *session) |
| { |
| assert(session); |
| return &session->endpoint.remote_condition; |
| } |
| |
| pn_condition_t *pn_link_condition(pn_link_t *link) |
| { |
| assert(link); |
| return &link->endpoint.condition; |
| } |
| |
| pn_condition_t *pn_link_remote_condition(pn_link_t *link) |
| { |
| assert(link); |
| return &link->endpoint.remote_condition; |
| } |
| |
| bool pn_condition_is_set(pn_condition_t *condition) |
| { |
| return condition && pn_string_get(condition->name); |
| } |
| |
| void pn_condition_clear(pn_condition_t *condition) |
| { |
| assert(condition); |
| pn_string_clear(condition->name); |
| pn_string_clear(condition->description); |
| pn_data_clear(condition->info); |
| } |
| |
| const char *pn_condition_get_name(pn_condition_t *condition) |
| { |
| assert(condition); |
| return pn_string_get(condition->name); |
| } |
| |
| int pn_condition_set_name(pn_condition_t *condition, const char *name) |
| { |
| assert(condition); |
| return pn_string_set(condition->name, name); |
| } |
| |
| const char *pn_condition_get_description(pn_condition_t *condition) |
| { |
| assert(condition); |
| return pn_string_get(condition->description); |
| } |
| |
| int pn_condition_set_description(pn_condition_t *condition, const char *description) |
| { |
| assert(condition); |
| return pn_string_set(condition->description, description); |
| } |
| |
| pn_data_t *pn_condition_info(pn_condition_t *condition) |
| { |
| assert(condition); |
| return condition->info; |
| } |
| |
| bool pn_condition_is_redirect(pn_condition_t *condition) |
| { |
| const char *name = pn_condition_get_name(condition); |
| return name && (!strcmp(name, "amqp:connection:redirect") || |
| !strcmp(name, "amqp:link:redirect")); |
| } |
| |
| const char *pn_condition_redirect_host(pn_condition_t *condition) |
| { |
| pn_data_t *data = pn_condition_info(condition); |
| pn_data_rewind(data); |
| pn_data_next(data); |
| pn_data_enter(data); |
| pn_data_lookup(data, "network-host"); |
| pn_bytes_t host = pn_data_get_bytes(data); |
| pn_data_rewind(data); |
| return host.start; |
| } |
| |
| int pn_condition_redirect_port(pn_condition_t *condition) |
| { |
| pn_data_t *data = pn_condition_info(condition); |
| pn_data_rewind(data); |
| pn_data_next(data); |
| pn_data_enter(data); |
| pn_data_lookup(data, "port"); |
| int port = pn_data_get_int(data); |
| pn_data_rewind(data); |
| return port; |
| } |
| |
| pn_connection_t *pn_event_connection(pn_event_t *event) |
| { |
| pn_session_t *ssn; |
| pn_transport_t *transport; |
| |
| switch (pn_class_id(pn_event_class(event))) { |
| case CID_pn_connection: |
| return (pn_connection_t *) pn_event_context(event); |
| case CID_pn_transport: |
| transport = pn_event_transport(event); |
| if (transport) |
| return transport->connection; |
| return NULL; |
| default: |
| ssn = pn_event_session(event); |
| if (ssn) |
| return pn_session_connection(ssn); |
| } |
| return NULL; |
| } |
| |
| pn_session_t *pn_event_session(pn_event_t *event) |
| { |
| pn_link_t *link; |
| switch (pn_class_id(pn_event_class(event))) { |
| case CID_pn_session: |
| return (pn_session_t *) pn_event_context(event); |
| default: |
| link = pn_event_link(event); |
| if (link) |
| return pn_link_session(link); |
| } |
| return NULL; |
| } |
| |
| pn_link_t *pn_event_link(pn_event_t *event) |
| { |
| pn_delivery_t *dlv; |
| switch (pn_class_id(pn_event_class(event))) { |
| case CID_pn_link: |
| return (pn_link_t *) pn_event_context(event); |
| default: |
| dlv = pn_event_delivery(event); |
| if (dlv) |
| return pn_delivery_link(dlv); |
| } |
| return NULL; |
| } |
| |
| pn_delivery_t *pn_event_delivery(pn_event_t *event) |
| { |
| switch (pn_class_id(pn_event_class(event))) { |
| case CID_pn_delivery: |
| return (pn_delivery_t *) pn_event_context(event); |
| default: |
| return NULL; |
| } |
| } |
| |
| pn_transport_t *pn_event_transport(pn_event_t *event) |
| { |
| switch (pn_class_id(pn_event_class(event))) { |
| case CID_pn_transport: |
| return (pn_transport_t *) pn_event_context(event); |
| default: |
| { |
| pn_connection_t *conn = pn_event_connection(event); |
| if (conn) |
| return pn_connection_transport(conn); |
| return NULL; |
| } |
| } |
| } |