blob: 12599ae761ead66c84d0e7f5954608244bc3b8e5 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "engine-internal.h"
#include <stdlib.h>
#include <string.h>
#include <proton/framing.h>
#include "protocol.h"
#include <inttypes.h>
#include <stdarg.h>
#include <stdio.h>
#include "../sasl/sasl-internal.h"
#include "../ssl/ssl-internal.h"
// delivery buffers
void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
{
// XXX: error handling
db->deliveries = malloc(sizeof(pn_delivery_state_t) * capacity);
db->next = next;
db->capacity = capacity;
db->head = 0;
db->size = 0;
}
void pn_delivery_buffer_free(pn_delivery_buffer_t *db)
{
free(db->deliveries);
}
size_t pn_delivery_buffer_size(pn_delivery_buffer_t *db)
{
return db->size;
}
size_t pn_delivery_buffer_available(pn_delivery_buffer_t *db)
{
return db->capacity - db->size;
}
bool pn_delivery_buffer_empty(pn_delivery_buffer_t *db)
{
return db->size == 0;
}
pn_delivery_state_t *pn_delivery_buffer_get(pn_delivery_buffer_t *db, size_t index)
{
if (index < db->size) return db->deliveries + ((db->head + index) % db->capacity);
else return NULL;
}
pn_delivery_state_t *pn_delivery_buffer_head(pn_delivery_buffer_t *db)
{
if (db->size) return db->deliveries + db->head;
else return NULL;
}
pn_delivery_state_t *pn_delivery_buffer_tail(pn_delivery_buffer_t *db)
{
if (db->size) return pn_delivery_buffer_get(db, db->size - 1);
else return NULL;
}
pn_sequence_t pn_delivery_buffer_lwm(pn_delivery_buffer_t *db)
{
if (db->size) return pn_delivery_buffer_head(db)->id;
else return db->next;
}
static void pn_delivery_state_init(pn_delivery_state_t *ds, pn_delivery_t *delivery, pn_sequence_t id)
{
ds->delivery = delivery;
ds->id = id;
ds->sent = false;
}
pn_delivery_state_t *pn_delivery_buffer_push(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
{
if (!pn_delivery_buffer_available(db))
return NULL;
db->size++;
pn_delivery_state_t *ds = pn_delivery_buffer_tail(db);
pn_delivery_state_init(ds, delivery, db->next++);
return ds;
}
bool pn_delivery_buffer_pop(pn_delivery_buffer_t *db)
{
if (db->size) {
db->head = (db->head + 1) % db->capacity;
db->size--;
return true;
} else {
return false;
}
}
void pn_delivery_buffer_gc(pn_delivery_buffer_t *db)
{
while (db->size && !pn_delivery_buffer_head(db)->delivery) {
pn_delivery_buffer_pop(db);
}
}
// endpoints
pn_connection_t *pn_ep_get_connection(pn_endpoint_t *endpoint)
{
switch (endpoint->type) {
case CONNECTION:
return (pn_connection_t *) endpoint;
case SESSION:
return ((pn_session_t *) endpoint)->connection;
case SENDER:
case RECEIVER:
return ((pn_link_t *) endpoint)->session->connection;
case TRANSPORT:
return ((pn_transport_t *) endpoint)->connection;
}
return NULL;
}
void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
void pn_open(pn_endpoint_t *endpoint)
{
// TODO: do we care about the current state?
PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE);
pn_modified(pn_ep_get_connection(endpoint), endpoint);
}
void pn_close(pn_endpoint_t *endpoint)
{
// TODO: do we care about the current state?
PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED);
pn_modified(pn_ep_get_connection(endpoint), endpoint);
}
void pn_free(pn_endpoint_t *endpoint)
{
switch (endpoint->type)
{
case CONNECTION:
pn_connection_free((pn_connection_t *)endpoint);
break;
case TRANSPORT:
pn_transport_free((pn_transport_t *)endpoint);
break;
case SESSION:
pn_session_free((pn_session_t *)endpoint);
break;
case SENDER:
pn_link_free((pn_link_t *)endpoint);
break;
case RECEIVER:
pn_link_free((pn_link_t *)endpoint);
break;
}
}
void pn_connection_open(pn_connection_t *connection)
{
if (connection) pn_open((pn_endpoint_t *) connection);
}
void pn_connection_close(pn_connection_t *connection)
{
if (connection) pn_close((pn_endpoint_t *) connection);
}
void pn_endpoint_tini(pn_endpoint_t *endpoint);
void pn_connection_free(pn_connection_t *connection)
{
if (!connection) return;
while (connection->session_count)
pn_session_free(connection->sessions[connection->session_count - 1]);
free(connection->sessions);
free(connection->container);
free(connection->hostname);
pn_data_free(connection->offered_capabilities);
pn_data_free(connection->desired_capabilities);
pn_endpoint_tini(&connection->endpoint);
free(connection);
}
void *pn_connection_get_context(pn_connection_t *conn)
{
return conn ? conn->context : 0;
}
void pn_connection_set_context(pn_connection_t *conn, void *context)
{
if (conn)
conn->context = context;
}
void pn_transport_open(pn_transport_t *transport)
{
pn_open((pn_endpoint_t *) transport);
}
void pn_transport_close(pn_transport_t *transport)
{
pn_close((pn_endpoint_t *) transport);
}
void pn_transport_free(pn_transport_t *transport)
{
if (!transport) return;
pn_ssl_free(transport->ssl);
pn_sasl_free(transport->sasl);
pn_dispatcher_free(transport->disp);
for (int i = 0; i < transport->session_capacity; i++) {
pn_delivery_buffer_free(&transport->sessions[i].incoming);
pn_delivery_buffer_free(&transport->sessions[i].outgoing);
free(transport->sessions[i].links);
free(transport->sessions[i].handles);
}
free(transport->remote_container);
free(transport->remote_hostname);
pn_data_free(transport->remote_offered_capabilities);
pn_data_free(transport->remote_desired_capabilities);
pn_error_free(transport->error);
free(transport->sessions);
free(transport->channels);
free(transport);
}
void pn_add_session(pn_connection_t *conn, pn_session_t *ssn)
{
PN_ENSURE(conn->sessions, conn->session_capacity, conn->session_count + 1);
conn->sessions[conn->session_count++] = ssn;
ssn->connection = conn;
ssn->id = conn->session_count;
}
void pn_remove_session(pn_connection_t *conn, pn_session_t *ssn)
{
for (int i = 0; i < conn->session_count; i++)
{
if (conn->sessions[i] == ssn)
{
memmove(&conn->sessions[i], &conn->sessions[i+1], conn->session_count - i - 1);
conn->session_count--;
break;
}
}
ssn->connection = NULL;
}
pn_connection_t *pn_session_connection(pn_session_t *session)
{
return session ? session->connection : NULL;
}
void pn_session_open(pn_session_t *session)
{
if (session) pn_open((pn_endpoint_t *) session);
}
void pn_session_close(pn_session_t *session)
{
if (session) pn_close((pn_endpoint_t *) session);
}
void pn_session_free(pn_session_t *session)
{
if (!session) return;
while (session->link_count)
pn_link_free(session->links[session->link_count - 1]);
if (session->connection)
pn_remove_session(session->connection, session);
free(session->links);
pn_endpoint_tini(&session->endpoint);
free(session);
}
void *pn_session_get_context(pn_session_t *session)
{
return session ? session->context : 0;
}
void pn_session_set_context(pn_session_t *session, void *context)
{
if (session)
session->context = context;
}
void pn_add_link(pn_session_t *ssn, pn_link_t *link)
{
PN_ENSURE(ssn->links, ssn->link_capacity, ssn->link_count + 1);
ssn->links[ssn->link_count++] = link;
link->session = ssn;
link->id = ssn->link_count;
}
void pn_remove_link(pn_session_t *ssn, pn_link_t *link)
{
for (int i = 0; i < ssn->link_count; i++)
{
if (ssn->links[i] == link)
{
memmove(&ssn->links[i], &ssn->links[i+1], ssn->link_count - i - 1);
ssn->link_count--;
break;
}
}
link->session = NULL;
}
void pn_free_delivery(pn_delivery_t *delivery)
{
if (delivery) {
pn_buffer_free(delivery->tag);
pn_buffer_free(delivery->bytes);
free(delivery);
}
}
void pn_link_open(pn_link_t *link)
{
if (link) pn_open((pn_endpoint_t *) link);
}
void pn_link_close(pn_link_t *link)
{
if (link) pn_close((pn_endpoint_t *) link);
}
void pn_terminus_free(pn_terminus_t *terminus)
{
free(terminus->address);
pn_data_free(terminus->properties);
pn_data_free(terminus->capabilities);
pn_data_free(terminus->outcomes);
pn_data_free(terminus->filter);
}
void pn_link_free(pn_link_t *link)
{
if (!link) return;
pn_terminus_free(&link->source);
pn_terminus_free(&link->target);
pn_terminus_free(&link->remote_source);
pn_terminus_free(&link->remote_target);
pn_remove_link(link->session, link);
while (link->settled_head) {
pn_delivery_t *d = link->settled_head;
LL_POP(link, settled);
pn_free_delivery(d);
}
while (link->unsettled_head) {
pn_delivery_t *d = link->unsettled_head;
LL_POP(link, unsettled);
pn_free_delivery(d);
}
free(link->name);
pn_endpoint_tini(&link->endpoint);
free(link);
}
void *pn_link_get_context(pn_link_t *link)
{
return link ? link->context : 0;
}
void pn_link_set_context(pn_link_t *link, void *context)
{
if (link)
link->context = context;
}
void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
{
endpoint->type = type;
endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
endpoint->error = pn_error();
endpoint->endpoint_next = NULL;
endpoint->endpoint_prev = NULL;
endpoint->transport_next = NULL;
endpoint->transport_prev = NULL;
endpoint->modified = false;
LL_ADD(conn, endpoint, endpoint);
}
void pn_endpoint_tini(pn_endpoint_t *endpoint)
{
pn_error_free(endpoint->error);
}
pn_connection_t *pn_connection()
{
pn_connection_t *conn = malloc(sizeof(pn_connection_t));
if (!conn) return NULL;
conn->context = NULL;
conn->endpoint_head = NULL;
conn->endpoint_tail = NULL;
pn_endpoint_init(&conn->endpoint, CONNECTION, conn);
conn->transport_head = NULL;
conn->transport_tail = NULL;
conn->sessions = NULL;
conn->session_capacity = 0;
conn->session_count = 0;
conn->transport = NULL;
conn->work_head = NULL;
conn->work_tail = NULL;
conn->tpwork_head = NULL;
conn->tpwork_tail = NULL;
conn->container = NULL;
conn->hostname = NULL;
conn->offered_capabilities = pn_data(16);
conn->desired_capabilities = pn_data(16);
return conn;
}
pn_state_t pn_connection_state(pn_connection_t *connection)
{
return connection ? connection->endpoint.state : 0;
}
pn_error_t *pn_connection_error(pn_connection_t *connection)
{
return connection ? connection->endpoint.error : NULL;
}
const char *pn_connection_get_container(pn_connection_t *connection)
{
return connection ? connection->container : NULL;
}
void pn_connection_set_container(pn_connection_t *connection, const char *container)
{
if (!connection) return;
if (connection->container) free(connection->container);
connection->container = pn_strdup(container);
}
const char *pn_connection_get_hostname(pn_connection_t *connection)
{
return connection ? connection->hostname : NULL;
}
void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname)
{
if (!connection) return;
if (connection->hostname) free(connection->hostname);
connection->hostname = pn_strdup(hostname);
}
pn_data_t *pn_connection_offered_capabilities(pn_connection_t *connection)
{
return connection->offered_capabilities;
}
pn_data_t *pn_connection_desired_capabilities(pn_connection_t *connection)
{
return connection->desired_capabilities;
}
pn_data_t *pn_connection_remote_offered_capabilities(pn_connection_t *connection)
{
if (!connection) return NULL;
return connection->transport ? connection->transport->remote_offered_capabilities : NULL;
}
pn_data_t *pn_connection_remote_desired_capabilities(pn_connection_t *connection)
{
if (!connection) return NULL;
return connection->transport ? connection->transport->remote_desired_capabilities : NULL;
}
const char *pn_connection_remote_container(pn_connection_t *connection)
{
if (!connection) return NULL;
return connection->transport ? connection->transport->remote_container : NULL;
}
const char *pn_connection_remote_hostname(pn_connection_t *connection)
{
if (!connection) return NULL;
return connection->transport ? connection->transport->remote_hostname : NULL;
}
pn_delivery_t *pn_work_head(pn_connection_t *connection)
{
if (!connection) return NULL;
return connection->work_head;
}
pn_delivery_t *pn_work_next(pn_delivery_t *delivery)
{
if (!delivery) return NULL;
if (delivery->work)
return delivery->work_next;
else
return pn_work_head(delivery->link->session->connection);
}
void pn_add_work(pn_connection_t *connection, pn_delivery_t *delivery)
{
if (!delivery->work)
{
LL_ADD(connection, work, delivery);
delivery->work = true;
}
}
void pn_clear_work(pn_connection_t *connection, pn_delivery_t *delivery)
{
if (delivery->work)
{
LL_REMOVE(connection, work, delivery);
delivery->work = false;
}
}
void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery)
{
pn_link_t *link = pn_delivery_link(delivery);
pn_delivery_t *current = pn_link_current(link);
if (delivery->updated && !delivery->local_settled) {
pn_add_work(connection, delivery);
} else if (delivery == current) {
if (link->endpoint.type == SENDER) {
if (pn_link_credit(link) > 0) {
pn_add_work(connection, delivery);
} else {
pn_clear_work(connection, delivery);
}
} else {
pn_add_work(connection, delivery);
}
} else {
pn_clear_work(connection, delivery);
}
}
void pn_add_tpwork(pn_delivery_t *delivery)
{
pn_connection_t *connection = delivery->link->session->connection;
if (!delivery->tpwork)
{
LL_ADD(connection, tpwork, delivery);
delivery->tpwork = true;
}
pn_modified(connection, &connection->endpoint);
}
void pn_clear_tpwork(pn_delivery_t *delivery)
{
pn_connection_t *connection = delivery->link->session->connection;
if (delivery->tpwork)
{
LL_REMOVE(connection, tpwork, delivery);
delivery->tpwork = false;
}
}
void pn_dump(pn_connection_t *conn)
{
pn_endpoint_t *endpoint = conn->transport_head;
while (endpoint)
{
printf("%p", (void *) endpoint);
endpoint = endpoint->transport_next;
if (endpoint)
printf(" -> ");
}
printf("\n");
}
void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
{
if (!endpoint->modified) {
LL_ADD(connection, transport, endpoint);
endpoint->modified = true;
}
}
void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
{
if (endpoint->modified) {
LL_REMOVE(connection, transport, endpoint);
endpoint->transport_next = NULL;
endpoint->transport_prev = NULL;
endpoint->modified = false;
}
}
bool pn_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
{
if (endpoint->type != type) return false;
if (!state) return true;
int st = endpoint->state;
if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0)
return st & state;
else
return st == state;
}
pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
{
while (endpoint)
{
if (pn_matches(endpoint, type, state))
return endpoint;
endpoint = endpoint->endpoint_next;
}
return NULL;
}
pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state)
{
if (conn)
return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state);
else
return NULL;
}
pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state)
{
if (ssn)
return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state);
else
return NULL;
}
pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state)
{
if (!conn) return NULL;
pn_endpoint_t *endpoint = conn->endpoint_head;
while (endpoint)
{
if (pn_matches(endpoint, SENDER, state) || pn_matches(endpoint, RECEIVER, state))
return (pn_link_t *) endpoint;
endpoint = endpoint->endpoint_next;
}
return NULL;
}
pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state)
{
if (!link) return NULL;
pn_endpoint_t *endpoint = link->endpoint.endpoint_next;
while (endpoint)
{
if (pn_matches(endpoint, SENDER, state) || pn_matches(endpoint, RECEIVER, state))
return (pn_link_t *) endpoint;
endpoint = endpoint->endpoint_next;
}
return NULL;
}
pn_session_t *pn_session(pn_connection_t *conn)
{
if (!conn) return NULL;
pn_session_t *ssn = malloc(sizeof(pn_session_t));
if (!ssn) return NULL;
pn_endpoint_init(&ssn->endpoint, SESSION, conn);
pn_add_session(conn, ssn);
ssn->links = NULL;
ssn->link_capacity = 0;
ssn->link_count = 0;
ssn->context = 0;
return ssn;
}
pn_state_t pn_session_state(pn_session_t *session)
{
return session->endpoint.state;
}
pn_error_t *pn_session_error(pn_session_t *session)
{
return session->endpoint.error;
}
int pn_do_open(pn_dispatcher_t *disp);
int pn_do_begin(pn_dispatcher_t *disp);
int pn_do_attach(pn_dispatcher_t *disp);
int pn_do_transfer(pn_dispatcher_t *disp);
int pn_do_flow(pn_dispatcher_t *disp);
int pn_do_disposition(pn_dispatcher_t *disp);
int pn_do_detach(pn_dispatcher_t *disp);
int pn_do_end(pn_dispatcher_t *disp);
int pn_do_close(pn_dispatcher_t *disp);
static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, const char *bytes, size_t available);
static ssize_t pn_input_read_sasl(pn_transport_t *transport, const char *bytes, size_t available);
static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, const char *bytes, size_t available);
static ssize_t pn_input_read_amqp(pn_transport_t *transport, const char *bytes, size_t available);
static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t available);
static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t available);
void pn_transport_init(pn_transport_t *transport)
{
transport->process_input = pn_input_read_amqp_header;
transport->process_output = pn_output_write_amqp_header;
transport->header_count = 0;
transport->sasl = NULL;
transport->ssl = NULL;
transport->disp = pn_dispatcher(0, transport);
pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
pn_dispatcher_action(transport->disp, BEGIN, "BEGIN", pn_do_begin);
pn_dispatcher_action(transport->disp, ATTACH, "ATTACH", pn_do_attach);
pn_dispatcher_action(transport->disp, TRANSFER, "TRANSFER", pn_do_transfer);
pn_dispatcher_action(transport->disp, FLOW, "FLOW", pn_do_flow);
pn_dispatcher_action(transport->disp, DISPOSITION, "DISPOSITION", pn_do_disposition);
pn_dispatcher_action(transport->disp, DETACH, "DETACH", pn_do_detach);
pn_dispatcher_action(transport->disp, END, "END", pn_do_end);
pn_dispatcher_action(transport->disp, CLOSE, "CLOSE", pn_do_close);
transport->open_sent = false;
transport->open_rcvd = false;
transport->close_sent = false;
transport->close_rcvd = false;
transport->remote_container = NULL;
transport->remote_hostname = NULL;
transport->local_max_frame = 0;
transport->remote_max_frame = 0;
transport->remote_offered_capabilities = pn_data(16);
transport->remote_desired_capabilities = pn_data(16);
transport->error = pn_error();
transport->sessions = NULL;
transport->session_capacity = 0;
transport->channels = NULL;
transport->channel_capacity = 0;
transport->condition = NULL;
}
pn_session_state_t *pn_session_get_state(pn_transport_t *transport, pn_session_t *ssn)
{
int old_capacity = transport->session_capacity;
PN_ENSURE(transport->sessions, transport->session_capacity, ssn->id + 1);
for (int i = old_capacity; i < transport->session_capacity; i++)
{
transport->sessions[i] = (pn_session_state_t) {.session=NULL,
.local_channel=-1,
.remote_channel=-1};
pn_delivery_buffer_init(&transport->sessions[i].incoming, 0, PN_SESSION_WINDOW);
pn_delivery_buffer_init(&transport->sessions[i].outgoing, 0, PN_SESSION_WINDOW);
}
pn_session_state_t *state = &transport->sessions[ssn->id];
state->session = ssn;
return state;
}
pn_session_state_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
{
PN_ENSUREZ(transport->channels, transport->channel_capacity, channel + 1);
return transport->channels[channel];
}
void pn_map_channel(pn_transport_t *transport, uint16_t channel, pn_session_state_t *state)
{
PN_ENSUREZ(transport->channels, transport->channel_capacity, channel + 1);
state->remote_channel = channel;
transport->channels[channel] = state;
}
pn_transport_t *pn_transport()
{
pn_transport_t *transport = malloc(sizeof(pn_transport_t));
if (!transport) return NULL;
transport->connection = NULL;
pn_transport_init(transport);
return transport;
}
void pn_transport_sasl_init(pn_transport_t *transport)
{
transport->process_input = pn_input_read_sasl_header;
transport->process_output = pn_output_write_sasl_header;
}
int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
{
if (!transport) return PN_ARG_ERR;
if (transport->connection) return PN_STATE_ERR;
if (connection->transport) return PN_STATE_ERR;
transport->connection = connection;
connection->transport = transport;
if (transport->open_rcvd) {
PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
if (!pn_error_code(transport->error)) {
transport->disp->halt = false;
}
}
return 0;
}
pn_error_t *pn_transport_error(pn_transport_t *transport)
{
return transport->error;
}
void pn_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type)
{
terminus->type = type;
terminus->address = NULL;
terminus->durability = PN_NONDURABLE;
terminus->expiry_policy = PN_SESSION_CLOSE;
terminus->timeout = 0;
terminus->dynamic = false;
terminus->properties = pn_data(16);
terminus->capabilities = pn_data(16);
terminus->outcomes = pn_data(16);
terminus->filter = pn_data(16);
}
void pn_link_init(pn_link_t *link, int type, pn_session_t *session, const char *name)
{
pn_endpoint_init(&link->endpoint, type, session->connection);
pn_add_link(session, link);
link->name = pn_strdup(name);
pn_terminus_init(&link->source, PN_SOURCE);
pn_terminus_init(&link->target, PN_TARGET);
pn_terminus_init(&link->remote_source, PN_UNSPECIFIED);
pn_terminus_init(&link->remote_target, PN_UNSPECIFIED);
link->settled_head = link->settled_tail = NULL;
link->unsettled_head = link->unsettled_tail = link->current = NULL;
link->unsettled_count = 0;
link->available = 0;
link->credit = 0;
link->queued = 0;
link->drain = false;
link->drained = false;
link->context = 0;
}
pn_terminus_t *pn_link_source(pn_link_t *link)
{
return link ? &link->source : NULL;
}
pn_terminus_t *pn_link_target(pn_link_t *link)
{
return link ? &link->target : NULL;
}
pn_terminus_t *pn_link_remote_source(pn_link_t *link)
{
return link ? &link->remote_source : NULL;
}
pn_terminus_t *pn_link_remote_target(pn_link_t *link)
{
return link ? &link->remote_target : NULL;
}
int pn_terminus_set_type(pn_terminus_t *terminus, pn_terminus_type_t type)
{
if (!terminus) return PN_ARG_ERR;
terminus->type = type;
return 0;
}
pn_terminus_type_t pn_terminus_get_type(pn_terminus_t *terminus)
{
return terminus ? terminus->type : 0;
}
const char *pn_terminus_get_address(pn_terminus_t *terminus)
{
return terminus ? terminus->address : NULL;
}
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
{
if (!terminus) return PN_ARG_ERR;
if (terminus->address) free(terminus->address);
terminus->address = pn_strdup(address);
return 0;
}
char *pn_bytes_strdup(pn_bytes_t str)
{
return pn_strndup(str.start, str.size);
}
int pn_terminus_set_address_bytes(pn_terminus_t *terminus, pn_bytes_t address)
{
if (!terminus) return PN_ARG_ERR;
if (terminus->address) free(terminus->address);
terminus->address = pn_bytes_strdup(address);
return 0;
}
pn_durability_t pn_terminus_get_durability(pn_terminus_t *terminus)
{
return terminus ? terminus->durability : 0;
}
int pn_terminus_set_durability(pn_terminus_t *terminus, pn_durability_t durability)
{
if (!terminus) return PN_ARG_ERR;
terminus->durability = durability;
return 0;
}
pn_expiry_policy_t pn_terminus_get_expiry_policy(pn_terminus_t *terminus)
{
return terminus ? terminus->expiry_policy : 0;
}
int pn_terminus_set_expiry_policy(pn_terminus_t *terminus, pn_expiry_policy_t expiry_policy)
{
if (!terminus) return PN_ARG_ERR;
terminus->expiry_policy = expiry_policy;
return 0;
}
pn_seconds_t pn_terminus_get_timeout(pn_terminus_t *terminus)
{
return terminus ? terminus->timeout : 0;
}
int pn_terminus_set_timeout(pn_terminus_t *terminus, pn_seconds_t timeout)
{
if (!terminus) return PN_ARG_ERR;
terminus->timeout = timeout;
return 0;
}
bool pn_terminus_is_dynamic(pn_terminus_t *terminus)
{
return terminus ? terminus->dynamic : false;
}
int pn_terminus_set_dynamic(pn_terminus_t *terminus, bool dynamic)
{
if (!terminus) return PN_ARG_ERR;
terminus->dynamic = dynamic;
return 0;
}
pn_data_t *pn_terminus_properties(pn_terminus_t *terminus)
{
return terminus ? terminus->properties : NULL;
}
pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus)
{
return terminus ? terminus->capabilities : NULL;
}
pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus)
{
return terminus ? terminus->outcomes : NULL;
}
pn_data_t *pn_terminus_filter(pn_terminus_t *terminus)
{
return terminus ? terminus->filter : NULL;
}
int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src)
{
if (!terminus || !src) {
return PN_ARG_ERR;
}
terminus->type = src->type;
int err = pn_terminus_set_address(terminus, pn_terminus_get_address(src));
if (err) return err;
terminus->durability = src->durability;
terminus->expiry_policy = src->expiry_policy;
terminus->timeout = src->timeout;
terminus->dynamic = src->dynamic;
err = pn_data_copy(terminus->properties, src->properties);
if (err) return err;
err = pn_data_copy(terminus->capabilities, src->capabilities);
if (err) return err;
err = pn_data_copy(terminus->outcomes, src->outcomes);
if (err) return err;
err = pn_data_copy(terminus->filter, src->filter);
if (err) return err;
return 0;
}
pn_link_state_t *pn_link_get_state(pn_session_state_t *ssn_state, pn_link_t *link)
{
int old_capacity = ssn_state->link_capacity;
PN_ENSURE(ssn_state->links, ssn_state->link_capacity, link->id + 1);
for (int i = old_capacity; i < ssn_state->link_capacity; i++)
{
ssn_state->links[i] = (pn_link_state_t) {.link=NULL, .local_handle = -1,
.remote_handle=-1};
}
pn_link_state_t *state = &ssn_state->links[link->id];
state->link = link;
return state;
}
void pn_map_handle(pn_session_state_t *ssn_state, uint32_t handle, pn_link_state_t *state)
{
PN_ENSUREZ(ssn_state->handles, ssn_state->handle_capacity, handle + 1);
state->remote_handle = handle;
ssn_state->handles[handle] = state;
}
pn_link_state_t *pn_handle_state(pn_session_state_t *ssn_state, uint32_t handle)
{
PN_ENSUREZ(ssn_state->handles, ssn_state->handle_capacity, handle + 1);
return ssn_state->handles[handle];
}
pn_link_t *pn_sender(pn_session_t *session, const char *name)
{
if (!session) return NULL;
pn_link_t *snd = malloc(sizeof(pn_link_t));
if (!snd) return NULL;
pn_link_init(snd, SENDER, session, name);
return snd;
}
pn_link_t *pn_receiver(pn_session_t *session, const char *name)
{
if (!session) return NULL;
pn_link_t *rcv = malloc(sizeof(pn_link_t));
if (!rcv) return NULL;
pn_link_init(rcv, RECEIVER, session, name);
return rcv;
}
pn_state_t pn_link_state(pn_link_t *link)
{
return link->endpoint.state;
}
pn_error_t *pn_link_error(pn_link_t *link)
{
return link->endpoint.error;
}
const char *pn_link_name(pn_link_t *link)
{
return link ? link->name : NULL;
}
bool pn_link_is_sender(pn_link_t *link)
{
return link->endpoint.type == SENDER;
}
bool pn_link_is_receiver(pn_link_t *link)
{
return link->endpoint.type == RECEIVER;
}
pn_session_t *pn_link_session(pn_link_t *link)
{
return link->session;
}
pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
{
if (!link) return NULL;
pn_delivery_t *delivery = link->settled_head;
LL_POP(link, settled);
if (!delivery) {
delivery = malloc(sizeof(pn_delivery_t));
if (!delivery) return NULL;
delivery->tag = pn_buffer(16);
delivery->bytes = pn_buffer(64);
}
delivery->link = link;
pn_buffer_clear(delivery->tag);
pn_buffer_append(delivery->tag, tag.bytes, tag.size);
delivery->local_state = 0;
delivery->remote_state = 0;
delivery->local_settled = false;
delivery->remote_settled = false;
delivery->updated = false;
delivery->settled = false;
LL_ADD(link, unsettled, delivery);
delivery->work_next = NULL;
delivery->work_prev = NULL;
delivery->work = false;
delivery->tpwork_next = NULL;
delivery->tpwork_prev = NULL;
delivery->tpwork = false;
pn_buffer_clear(delivery->bytes);
delivery->done = false;
delivery->transport_context = NULL;
delivery->context = NULL;
if (!link->current)
link->current = delivery;
link->unsettled_count++;
pn_work_update(link->session->connection, delivery);
return delivery;
}
int pn_link_unsettled(pn_link_t *link)
{
return link->unsettled_count;
}
pn_delivery_t *pn_unsettled_head(pn_link_t *link)
{
return link->unsettled_head;
}
pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery)
{
return delivery->unsettled_next;
}
bool pn_is_current(pn_delivery_t *delivery)
{
pn_link_t *link = delivery->link;
return pn_link_current(link) == delivery;
}
void pn_delivery_dump(pn_delivery_t *d)
{
char tag[1024];
pn_bytes_t bytes = pn_buffer_bytes(d->tag);
pn_quote_data(tag, 1024, bytes.start, bytes.size);
printf("{tag=%s, local_state=%u, remote_state=%u, local_settled=%u, "
"remote_settled=%u, updated=%u, current=%u, writable=%u, readable=%u, "
"work=%u}",
tag, d->local_state, d->remote_state, d->local_settled,
d->remote_settled, d->updated, pn_is_current(d),
pn_delivery_writable(d), pn_delivery_readable(d), d->work);
}
void *pn_delivery_get_context(pn_delivery_t *delivery)
{
return delivery ? delivery->context : NULL;
}
void pn_delivery_set_context(pn_delivery_t *delivery, void *context)
{
if (delivery)
delivery->context = context;
}
pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
{
if (delivery) {
pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
return pn_dtag(tag.start, tag.size);
} else {
return (pn_delivery_tag_t) {0};
}
}
pn_delivery_t *pn_link_current(pn_link_t *link)
{
if (!link) return NULL;
return link->current;
}
void pn_advance_sender(pn_link_t *link)
{
link->current->done = true;
link->queued++;
link->credit--;
pn_add_tpwork(link->current);
link->current = link->current->unsettled_next;
}
void pn_advance_receiver(pn_link_t *link)
{
link->credit--;
link->queued--;
link->current = link->current->unsettled_next;
}
bool pn_link_advance(pn_link_t *link)
{
if (link && link->current) {
pn_delivery_t *prev = link->current;
if (link->endpoint.type == SENDER) {
pn_advance_sender(link);
} else {
pn_advance_receiver(link);
}
pn_delivery_t *next = link->current;
pn_work_update(link->session->connection, prev);
if (next) pn_work_update(link->session->connection, next);
return prev != next;
} else {
return false;
}
}
int pn_link_credit(pn_link_t *link)
{
return link ? link->credit : 0;
}
int pn_link_available(pn_link_t *link)
{
return link ? link->available : 0;
}
int pn_link_queued(pn_link_t *link)
{
return link ? link->queued : 0;
}
void pn_real_settle(pn_delivery_t *delivery)
{
pn_link_t *link = delivery->link;
LL_REMOVE(link, unsettled, delivery);
// TODO: what if we settle the current delivery?
LL_ADD(link, settled, delivery);
pn_buffer_clear(delivery->tag);
pn_buffer_clear(delivery->bytes);
delivery->settled = true;
}
void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
{
pn_delivery_state_t *state = delivery->transport_context;
delivery->transport_context = NULL;
if (state) state->delivery = NULL;
pn_real_settle(delivery);
if (state) pn_delivery_buffer_gc(db);
pn_clear_tpwork(delivery);
}
void pn_delivery_settle(pn_delivery_t *delivery)
{
if (!delivery) return;
pn_link_t *link = delivery->link;
if (pn_is_current(delivery)) {
pn_link_advance(link);
}
link->unsettled_count--;
delivery->local_settled = true;
pn_add_tpwork(delivery);
pn_work_update(delivery->link->session->connection, delivery);
}
int pn_post_close(pn_transport_t *transport)
{
const char *condition = transport->condition;
return pn_post_frame(transport->disp, 0, "DL[?DL[s]]", CLOSE, (bool) condition, ERROR, condition);
}
int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
{
va_list ap;
transport->condition = condition;
va_start(ap, fmt);
char buf[1024];
// XXX: result
vsnprintf(buf, 1024, fmt, ap);
va_end(ap);
pn_error_set(transport->error, PN_ERR, buf);
if (!transport->close_sent) {
pn_post_close(transport);
transport->close_sent = true;
}
transport->disp->halt = true;
fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->error));
return PN_ERR;
}
int pn_do_open(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
pn_connection_t *conn = transport->connection;
bool container_q, hostname_q;
pn_bytes_t remote_container, remote_hostname;
pn_data_clear(transport->remote_offered_capabilities);
pn_data_clear(transport->remote_desired_capabilities);
int err = pn_scan_args(disp, "D.[?S?SI....CC]", &container_q,
&remote_container, &hostname_q, &remote_hostname,
&transport->remote_max_frame,
transport->remote_offered_capabilities,
transport->remote_desired_capabilities);
if (err) return err;
if (transport->remote_max_frame > 0) {
if (transport->remote_max_frame < AMQP_MIN_MAX_FRAME_SIZE) {
fprintf(stderr, "Peer advertised bad max-frame (%u), forcing to %u\n",
transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE);
transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE;
}
disp->remote_max_frame = transport->remote_max_frame;
pn_buffer_clear( disp->frame );
pn_buffer_ensure( disp->frame, disp->remote_max_frame );
}
if (container_q) {
transport->remote_container = pn_bytes_strdup(remote_container);
} else {
transport->remote_container = NULL;
}
if (hostname_q) {
transport->remote_hostname = pn_bytes_strdup(remote_hostname);
} else {
transport->remote_hostname = NULL;
}
if (conn) {
PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
} else {
transport->disp->halt = true;
}
transport->open_rcvd = true;
return 0;
}
int pn_do_begin(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
bool reply;
uint16_t remote_channel;
pn_sequence_t next;
int err = pn_scan_args(disp, "D.[?HI]", &reply, &remote_channel, &next);
if (err) return err;
pn_session_state_t *state;
if (reply) {
// XXX: what if session is NULL?
state = &transport->sessions[remote_channel];
} else {
pn_session_t *ssn = pn_session(transport->connection);
state = pn_session_get_state(transport, ssn);
}
state->incoming_transfer_count = next;
pn_map_channel(transport, disp->channel, state);
PN_SET_REMOTE(state->session->endpoint.state, PN_REMOTE_ACTIVE);
return 0;
}
pn_link_state_t *pn_find_link(pn_session_state_t *ssn_state, pn_bytes_t name, bool is_sender)
{
pn_endpoint_type_t type = is_sender ? SENDER : RECEIVER;
for (int i = 0; i < ssn_state->session->link_count; i++)
{
pn_link_t *link = ssn_state->session->links[i];
if (link->endpoint.type == type &&
!strncmp(name.start, link->name, name.size))
{
return pn_link_get_state(ssn_state, link);
}
}
return NULL;
}
static pn_expiry_policy_t symbol2policy(pn_bytes_t symbol)
{
if (!symbol.start)
return PN_SESSION_CLOSE;
if (!strncmp(symbol.start, "link-detach", symbol.size))
return PN_LINK_CLOSE;
if (!strncmp(symbol.start, "session-end", symbol.size))
return PN_SESSION_CLOSE;
if (!strncmp(symbol.start, "connection-close", symbol.size))
return PN_CONNECTION_CLOSE;
if (!strncmp(symbol.start, "never", symbol.size))
return PN_NEVER;
return PN_SESSION_CLOSE;
}
int pn_do_attach(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
pn_bytes_t name;
uint32_t handle;
bool is_sender;
pn_bytes_t source, target;
pn_durability_t src_dr, tgt_dr;
pn_bytes_t src_exp, tgt_exp;
pn_seconds_t src_timeout, tgt_timeout;
bool src_dynamic, tgt_dynamic;
pn_sequence_t idc;
int err = pn_scan_args(disp, "D.[SIo..D.[SIsIo]D.[SIsIo]..I]", &name, &handle,
&is_sender,
&source, &src_dr, &src_exp, &src_timeout, &src_dynamic,
&target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
&idc);
if (err) return err;
char strname[name.size + 1];
strncpy(strname, name.start, name.size);
strname[name.size] = '\0';
pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
pn_link_state_t *link_state = pn_find_link(ssn_state, name, is_sender);
pn_link_t *link;
if (!link_state) {
if (is_sender) {
link = (pn_link_t *) pn_sender(ssn_state->session, strname);
} else {
link = (pn_link_t *) pn_receiver(ssn_state->session, strname);
}
link_state = pn_link_get_state(ssn_state, link);
} else {
link = link_state->link;
}
pn_map_handle(ssn_state, handle, link_state);
PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
pn_terminus_t *rsrc = &link_state->link->remote_source;
if (source.start) {
pn_terminus_set_type(rsrc, PN_SOURCE);
pn_terminus_set_address_bytes(rsrc, source);
pn_terminus_set_durability(rsrc, src_dr);
pn_terminus_set_expiry_policy(rsrc, symbol2policy(src_exp));
pn_terminus_set_timeout(rsrc, src_timeout);
pn_terminus_set_dynamic(rsrc, src_dynamic);
} else {
pn_terminus_set_type(rsrc, PN_UNSPECIFIED);
}
pn_terminus_t *rtgt = &link_state->link->remote_target;
if (target.start) {
pn_terminus_set_type(rtgt, PN_TARGET);
pn_terminus_set_address_bytes(rtgt, target);
pn_terminus_set_durability(rtgt, tgt_dr);
pn_terminus_set_expiry_policy(rtgt, symbol2policy(tgt_exp));
pn_terminus_set_timeout(rtgt, tgt_timeout);
pn_terminus_set_dynamic(rtgt, tgt_dynamic);
} else {
pn_terminus_set_type(rtgt, PN_UNSPECIFIED);
}
pn_data_clear(link->remote_source.properties);
pn_data_clear(link->remote_source.filter);
pn_data_clear(link->remote_source.outcomes);
pn_data_clear(link->remote_source.capabilities);
pn_data_clear(link->remote_target.properties);
pn_data_clear(link->remote_target.capabilities);
err = pn_scan_args(disp, "D.[.....D.[.....C.C.CC]D.[.....CC]",
link->remote_source.properties,
link->remote_source.filter,
link->remote_source.outcomes,
link->remote_source.capabilities,
link->remote_target.properties,
link->remote_target.capabilities);
if (err) return err;
pn_data_rewind(link->remote_source.properties);
pn_data_rewind(link->remote_source.filter);
pn_data_rewind(link->remote_source.outcomes);
pn_data_rewind(link->remote_source.capabilities);
pn_data_rewind(link->remote_target.properties);
pn_data_rewind(link->remote_target.capabilities);
if (!is_sender) {
link_state->delivery_count = idc;
}
return 0;
}
int pn_post_flow(pn_transport_t *transport, pn_session_state_t *ssn_state, pn_link_state_t *state);
int pn_do_transfer(pn_dispatcher_t *disp)
{
// XXX: multi transfer
pn_transport_t *transport = disp->context;
uint32_t handle;
pn_bytes_t tag;
bool id_present;
pn_sequence_t id;
bool more;
int err = pn_scan_args(disp, "D.[I?Iz..o]", &handle, &id_present, &id, &tag,
&more);
if (err) return err;
pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
pn_link_t *link = link_state->link;
pn_delivery_t *delivery;
if (link->unsettled_tail && !link->unsettled_tail->done) {
delivery = link->unsettled_tail;
} else {
pn_delivery_buffer_t *incoming = &ssn_state->incoming;
if (!pn_delivery_buffer_available(incoming)) {
return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded");
}
if (!ssn_state->incoming_init) {
incoming->next = id;
ssn_state->incoming_init = true;
}
delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
pn_delivery_state_t *state = pn_delivery_buffer_push(incoming, delivery);
delivery->transport_context = state;
if (id_present && id != state->id) {
int err = pn_do_error(transport, "amqp:session:invalid-field",
"sequencing error, expected delivery-id %u, got %u",
state->id, id);
// XXX: this will probably leave delivery buffer state messed up
pn_full_settle(incoming, delivery);
return err;
}
link_state->delivery_count++;
link_state->link_credit--;
link->queued++;
}
pn_buffer_append(delivery->bytes, disp->payload, disp->size);
delivery->done = !more;
ssn_state->incoming_transfer_count++;
ssn_state->incoming_window--;
if (!ssn_state->incoming_window && (int32_t) link_state->local_handle >= 0) {
pn_post_flow(transport, ssn_state, link_state);
}
return 0;
}
int pn_do_flow(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
pn_sequence_t onext, inext, delivery_count;
uint32_t iwin, owin, link_credit;
uint32_t handle;
bool inext_init, handle_init, dcount_init, drain;
int err = pn_scan_args(disp, "D.[?IIII?I?II.o]", &inext_init, &inext, &iwin,
&onext, &owin, &handle_init, &handle, &dcount_init,
&delivery_count, &link_credit, &drain);
if (err) return err;
pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
if (inext_init) {
ssn_state->outgoing_window = inext + iwin - ssn_state->outgoing_transfer_count;
} else {
ssn_state->outgoing_window = iwin;
}
if (handle_init) {
pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
pn_link_t *link = link_state->link;
if (link->endpoint.type == SENDER) {
pn_sequence_t receiver_count;
if (dcount_init) {
receiver_count = delivery_count;
} else {
// our initial delivery count
receiver_count = 0;
}
pn_sequence_t old = link_state->link_credit;
link_state->link_credit = receiver_count + link_credit - link_state->delivery_count;
link->credit += link_state->link_credit - old;
link->drain = drain;
pn_delivery_t *delivery = pn_link_current(link);
if (delivery) pn_work_update(transport->connection, delivery);
} else {
pn_sequence_t delta = delivery_count - link_state->delivery_count;
if (delta > 0) {
link_state->delivery_count += delta;
link_state->link_credit -= delta;
link->credit -= delta;
}
}
}
return 0;
}
int pn_do_disposition(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
bool role;
pn_sequence_t first, last;
uint64_t code;
bool last_init, settled, code_init;
int err = pn_scan_args(disp, "D.[oI?IoD?L[]]", &role, &first, &last_init,
&last, &settled, &code_init, &code);
if (err) return err;
if (!last_init) last = first;
pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
pn_disposition_t dispo = 0;
if (code_init) {
switch (code)
{
case ACCEPTED:
dispo = PN_ACCEPTED;
break;
case REJECTED:
dispo = PN_REJECTED;
break;
default:
// XXX
fprintf(stderr, "default %" PRIu64 "\n", code);
break;
}
}
pn_delivery_buffer_t *deliveries;
if (role) {
deliveries = &ssn_state->outgoing;
} else {
deliveries = &ssn_state->incoming;
}
pn_sequence_t lwm = pn_delivery_buffer_lwm(deliveries);
for (pn_sequence_t id = first; id <= last; id++) {
if (id < lwm) continue;
pn_delivery_state_t *state = pn_delivery_buffer_get(deliveries, id - lwm);
pn_delivery_t *delivery = state->delivery;
delivery->remote_state = dispo;
delivery->remote_settled = settled;
delivery->updated = true;
pn_work_update(transport->connection, delivery);
}
return 0;
}
int pn_do_detach(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
uint32_t handle;
bool closed;
int err = pn_scan_args(disp, "D.[Io]", &handle, &closed);
if (err) return err;
pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
if (!ssn_state) {
return pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", disp->channel);
}
pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
pn_link_t *link = link_state->link;
link_state->remote_handle = -2;
if (closed)
{
PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED);
} else {
// TODO: implement
}
return 0;
}
int pn_do_end(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
pn_session_t *session = ssn_state->session;
ssn_state->remote_channel = -2;
PN_SET_REMOTE(session->endpoint.state, PN_REMOTE_CLOSED);
return 0;
}
int pn_do_close(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
transport->close_rcvd = true;
PN_SET_REMOTE(transport->connection->endpoint.state, PN_REMOTE_CLOSED);
return 0;
}
ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available)
{
if (!transport) return PN_ARG_ERR;
size_t consumed = 0;
const bool use_ssl = transport->ssl != NULL;
while (true) {
ssize_t n;
if (use_ssl)
n = pn_ssl_input( transport->ssl, bytes + consumed, available - consumed);
else
n = transport->process_input(transport, bytes + consumed, available - consumed);
if (n > 0) {
consumed += n;
if (consumed >= available) {
break;
}
} else if (n == 0) {
break;
} else {
if (n != PN_EOS) {
pn_dispatcher_trace(transport->disp, 0, "ERROR[%i] %s\n",
pn_error_code(transport->error),
pn_error_text(transport->error));
}
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
return n;
}
}
return consumed;
}
#define SASL_HEADER ("AMQP\x03\x01\x00\x00")
static ssize_t pn_input_read_header(pn_transport_t *transport, const char *bytes, size_t available,
const char *header, size_t size, const char *protocol,
ssize_t (*next)(pn_transport_t *, const char *, size_t))
{
const char *point = header + transport->header_count;
int delta = pn_min(available, size - transport->header_count);
if (!available || memcmp(bytes, point, delta)) {
char quoted[1024];
pn_quote_data(quoted, 1024, bytes, available);
return pn_error_format(transport->error, PN_ERR,
"%s header mismatch: '%s'", protocol, quoted);
} else {
transport->header_count += delta;
if (transport->header_count == size) {
transport->header_count = 0;
transport->process_input = next;
if (transport->disp->trace & PN_TRACE_FRM)
fprintf(stderr, " <- %s\n", protocol);
}
return delta;
}
}
static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, const char *bytes, size_t available)
{
return pn_input_read_header(transport, bytes, available, SASL_HEADER, 8, "SASL", pn_input_read_sasl);
}
static ssize_t pn_input_read_sasl(pn_transport_t *transport, const char *bytes, size_t available)
{
pn_sasl_t *sasl = transport->sasl;
ssize_t n = pn_sasl_input(sasl, bytes, available);
if (n == PN_EOS) {
transport->process_input = pn_input_read_amqp_header;
return transport->process_input(transport, bytes, available);
} else {
return n;
}
}
#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, const char *bytes, size_t available)
{
return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8,
"AMQP", pn_input_read_amqp);
}
static ssize_t pn_input_read_amqp(pn_transport_t *transport, const char *bytes, size_t available)
{
if (transport->close_rcvd) {
if (available > 0) {
pn_do_error(transport, "amqp:connection:framing-error", "data after close");
return PN_ERR;
} else {
return PN_EOS;
}
}
if (!available) {
pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
return PN_ERR;
}
ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
if (n < 0) {
return pn_error_set(transport->error, n, "dispatch error");
} else if (transport->close_rcvd) {
return PN_EOS;
} else {
return n;
}
}
bool pn_delivery_buffered(pn_delivery_t *delivery)
{
if (delivery->settled) return false;
if (pn_link_is_sender(delivery->link)) {
pn_delivery_state_t *state = delivery->transport_context;
if (state) {
return (delivery->done && !state->sent) || pn_buffer_size(delivery->bytes) > 0;
} else {
return delivery->done;
}
} else {
return false;
}
}
int pn_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == CONNECTION)
{
if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent)
{
pn_connection_t *connection = (pn_connection_t *) endpoint;
int err = pn_post_frame(transport->disp, 0, "DL[SS?InnnnCC]", OPEN,
connection->container,
connection->hostname,
// if not zero, advertise our max frame size
(bool)transport->local_max_frame, transport->local_max_frame,
connection->offered_capabilities,
connection->desired_capabilities);
if (err) return err;
transport->open_sent = true;
}
}
return 0;
}
int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SESSION && transport->open_sent)
{
pn_session_t *ssn = (pn_session_t *) endpoint;
pn_session_state_t *state = pn_session_get_state(transport, ssn);
if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
{
// XXX: we use the session id as the outgoing channel, we depend
// on this for looking up via remote channel
uint16_t channel = ssn->id;
pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN,
((int16_t) state->remote_channel >= 0), state->remote_channel,
state->outgoing_transfer_count,
pn_delivery_buffer_available(&state->incoming),
pn_delivery_buffer_available(&state->outgoing));
state->local_channel = channel;
}
}
return 0;
}
static const char *expiry_symbol(pn_expiry_policy_t policy)
{
switch (policy)
{
case PN_LINK_CLOSE:
return "link-detach";
case PN_SESSION_CLOSE:
return NULL;
case PN_CONNECTION_CLOSE:
return "connection-close";
case PN_NEVER:
return "never";
}
return NULL;
}
int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (transport->open_sent && (endpoint->type == SENDER ||
endpoint->type == RECEIVER))
{
pn_link_t *link = (pn_link_t *) endpoint;
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
pn_link_state_t *state = pn_link_get_state(ssn_state, link);
if (((int16_t) ssn_state->local_channel >= 0) &&
!(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1)
{
// XXX
state->local_handle = link->id;
int err = pn_post_frame(transport->disp, ssn_state->local_channel,
"DL[SIonn?DL[SIsIoCnCnCC]?DL[SIsIoCC]nnI]", ATTACH,
link->name,
state->local_handle,
endpoint->type == RECEIVER,
(bool) link->source.type, SOURCE,
link->source.address,
link->source.durability,
expiry_symbol(link->source.expiry_policy),
link->source.timeout,
link->source.dynamic,
link->source.properties,
link->source.filter,
link->source.outcomes,
link->source.capabilities,
(bool) link->target.type, TARGET,
link->target.address,
link->target.durability,
expiry_symbol(link->target.expiry_policy),
link->target.timeout,
link->target.dynamic,
link->target.properties,
link->target.capabilities,
0);
if (err) return err;
}
}
return 0;
}
int pn_post_flow(pn_transport_t *transport, pn_session_state_t *ssn_state, pn_link_state_t *state)
{
ssn_state->incoming_window = pn_delivery_buffer_available(&ssn_state->incoming);
bool link = (bool) state;
return pn_post_frame(transport->disp, ssn_state->local_channel, "DL[?IIII?I?I?In?o]", FLOW,
(int16_t) ssn_state->remote_channel >= 0, ssn_state->incoming_transfer_count,
ssn_state->incoming_window,
ssn_state->outgoing.next,
pn_delivery_buffer_available(&ssn_state->outgoing),
link, state ? state->local_handle : 0,
link, state ? state->delivery_count : 0,
link, state ? state->link_credit : 0,
link, state ? state->link->drain : false);
}
int pn_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE)
{
pn_link_t *rcv = (pn_link_t *) endpoint;
pn_session_state_t *ssn_state = pn_session_get_state(transport, rcv->session);
pn_link_state_t *state = pn_link_get_state(ssn_state, rcv);
if ((int16_t) ssn_state->local_channel >= 0 &&
(int32_t) state->local_handle >= 0 &&
((rcv->drain || state->link_credit != rcv->credit - rcv->queued) || !ssn_state->incoming_window)) {
state->link_credit = rcv->credit - rcv->queued;
return pn_post_flow(transport, ssn_state, state);
}
}
return 0;
}
int pn_post_disp(pn_transport_t *transport, pn_delivery_t *delivery)
{
pn_link_t *link = delivery->link;
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
// XXX: check for null state
pn_delivery_state_t *state = delivery->transport_context;
uint64_t code;
switch(delivery->local_state) {
case PN_ACCEPTED:
code = ACCEPTED;
break;
case PN_RELEASED:
code = RELEASED;
break;
//TODO: rejected and modified (both take extra data which may need to be passed through somehow) e.g. change from enum to discriminated union?
default:
code = 0;
}
if (code || delivery->local_settled)
return pn_post_frame(transport->disp, ssn_state->local_channel, "DL[oIIo?DL[]]", DISPOSITION,
link->endpoint.type == RECEIVER, state->id, state->id,
delivery->local_settled,
(bool)code, code);
return 0;
}
int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery)
{
pn_link_t *link = delivery->link;
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
pn_delivery_state_t *state = delivery->transport_context;
if (!state && pn_delivery_buffer_available(&ssn_state->outgoing)) {
state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
delivery->transport_context = state;
}
if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
ssn_state->outgoing_window > 0 && link_state->link_credit > 0) {
pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
pn_set_payload(transport->disp, bytes.start, bytes.size);
pn_buffer_clear(delivery->bytes);
pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
int err = pn_post_transfer_frame(transport->disp,
ssn_state->local_channel,
link_state->local_handle,
state->id, &tag,
0, // message-format
delivery->local_settled,
!delivery->done);
if (err) return err;
ssn_state->outgoing_transfer_count++;
ssn_state->outgoing_window--;
if (delivery->done) {
state->sent = true;
link_state->delivery_count++;
link_state->link_credit--;
link->queued--;
}
}
}
pn_delivery_state_t *state = delivery->transport_context;
// XXX: need to prevent duplicate disposition sending
if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled
&& state && state->sent) {
int err = pn_post_disp(transport, delivery);
if (err) return err;
}
if (delivery->local_settled && state && state->sent) {
pn_full_settle(&ssn_state->outgoing, delivery);
}
return 0;
}
int pn_process_tpwork_receiver(pn_transport_t *transport, pn_delivery_t *delivery)
{
pn_link_t *link = delivery->link;
// XXX: need to prevent duplicate disposition sending
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && delivery->transport_context) {
int err = pn_post_disp(transport, delivery);
if (err) return err;
}
if (delivery->local_settled) {
size_t available = pn_delivery_buffer_available(&ssn_state->incoming);
pn_full_settle(&ssn_state->incoming, delivery);
if (!ssn_state->incoming_window &&
pn_delivery_buffer_available(&ssn_state->incoming) > available) {
int err = pn_post_flow(transport, ssn_state, NULL);
if (err) return err;
}
}
return 0;
}
int pn_process_tpwork(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == CONNECTION && !transport->close_sent)
{
pn_connection_t *conn = (pn_connection_t *) endpoint;
pn_delivery_t *delivery = conn->tpwork_head;
while (delivery)
{
pn_link_t *link = delivery->link;
if (pn_link_is_sender(link)) {
int err = pn_process_tpwork_sender(transport, delivery);
if (err) return err;
} else {
int err = pn_process_tpwork_receiver(transport, delivery);
if (err) return err;
}
if (!pn_delivery_buffered(delivery)) {
pn_clear_tpwork(delivery);
}
delivery = delivery->tpwork_next;
}
}
return 0;
}
int pn_process_flow_sender(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SENDER && endpoint->state & PN_LOCAL_ACTIVE)
{
pn_link_t *snd = (pn_link_t *) endpoint;
pn_session_state_t *ssn_state = pn_session_get_state(transport, snd->session);
pn_link_state_t *state = pn_link_get_state(ssn_state, snd);
if ((int16_t) ssn_state->local_channel >= 0 &&
(int32_t) state->local_handle >= 0 &&
snd->drain && snd->drained) {
pn_delivery_t *tail = state->link->unsettled_tail;
if (!tail || !pn_delivery_buffered(tail)) {
state->delivery_count += state->link_credit;
state->link_credit = 0;
snd->drained = false;
return pn_post_flow(transport, ssn_state, state);
}
}
}
return 0;
}
int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SENDER || endpoint->type == RECEIVER)
{
pn_link_t *link = (pn_link_t *) endpoint;
pn_session_t *session = link->session;
pn_session_state_t *ssn_state = pn_session_get_state(transport, session);
pn_link_state_t *state = pn_link_get_state(ssn_state, link);
if (endpoint->state & PN_LOCAL_CLOSED && (int32_t) state->local_handle >= 0 &&
(int16_t) ssn_state->local_channel >= 0 && !transport->close_sent) {
if (pn_link_is_sender(link) && pn_link_queued(link) &&
(int32_t) state->remote_handle != -2 &&
(int16_t) ssn_state->remote_channel != -2 &&
!transport->close_rcvd) return 0;
int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io]", DETACH,
state->local_handle, true);
if (err) return err;
state->local_handle = -2;
}
pn_clear_modified(transport->connection, endpoint);
}
return 0;
}
bool pn_pointful_buffering(pn_transport_t *transport, pn_session_t *session)
{
if (!transport->open_rcvd) return true;
if (transport->close_rcvd) return false;
pn_connection_t *conn = transport->connection;
pn_link_t *link = pn_link_head(conn, 0);
while (link) {
if (pn_link_is_sender(link) && pn_link_queued(link) > 0) {
pn_session_t *ssn = link->session;
if (session && session == ssn) {
pn_session_state_t *ssn_state = pn_session_get_state(transport, session);
pn_link_state_t *state = pn_link_get_state(ssn_state, link);
if ((int32_t) state->remote_handle != -2 &&
(int16_t) ssn_state->remote_channel != -2) {
return true;
}
}
}
link = pn_link_next(link, 0);
}
return false;
}
int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SESSION)
{
pn_session_t *session = (pn_session_t *) endpoint;
pn_session_state_t *state = pn_session_get_state(transport, session);
if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0
&& !transport->close_sent)
{
if (pn_pointful_buffering(transport, session)) return 0;
int err = pn_post_frame(transport->disp, state->local_channel, "DL[]", END);
if (err) return err;
state->local_channel = -2;
}
pn_clear_modified(transport->connection, endpoint);
}
return 0;
}
int pn_process_conn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == CONNECTION)
{
if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
if (pn_pointful_buffering(transport, NULL)) return 0;
int err = pn_post_close(transport);
if (err) return err;
transport->close_sent = true;
}
pn_clear_modified(transport->connection, endpoint);
}
return 0;
}
int pn_phase(pn_transport_t *transport, int (*phase)(pn_transport_t *, pn_endpoint_t *))
{
pn_connection_t *conn = transport->connection;
pn_endpoint_t *endpoint = conn->transport_head;
while (endpoint)
{
pn_endpoint_t *next = endpoint->transport_next;
int err = phase(transport, endpoint);
if (err) return err;
endpoint = next;
}
return 0;
}
int pn_process(pn_transport_t *transport)
{
int err;
if ((err = pn_phase(transport, pn_process_conn_setup))) return err;
if ((err = pn_phase(transport, pn_process_ssn_setup))) return err;
if ((err = pn_phase(transport, pn_process_link_setup))) return err;
if ((err = pn_phase(transport, pn_process_flow_receiver))) return err;
// XXX: this has to happen two times because we might settle stuff
// on the first pass and create space for more work to be done on the
// second pass
if ((err = pn_phase(transport, pn_process_tpwork))) return err;
if ((err = pn_phase(transport, pn_process_tpwork))) return err;
if ((err = pn_phase(transport, pn_process_flow_sender))) return err;
if ((err = pn_phase(transport, pn_process_link_teardown))) return err;
if ((err = pn_phase(transport, pn_process_ssn_teardown))) return err;
if ((err = pn_phase(transport, pn_process_conn_teardown))) return err;
if (transport->connection->tpwork_head) {
pn_modified(transport->connection, &transport->connection->endpoint);
}
return 0;
}
static ssize_t pn_output_write_header(pn_transport_t *transport,
char *bytes, size_t size,
const char *header, size_t hdrsize,
const char *protocol,
ssize_t (*next)(pn_transport_t *, char *, size_t))
{
if (transport->disp->trace & PN_TRACE_FRM)
fprintf(stderr, " -> %s\n", protocol);
if (size >= hdrsize) {
memmove(bytes, header, hdrsize);
transport->process_output = next;
return hdrsize;
} else {
return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
}
}
static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t size)
{
return pn_output_write_header(transport, bytes, size, SASL_HEADER, 8, "SASL",
pn_output_write_sasl);
}
static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t size)
{
pn_sasl_t *sasl = transport->sasl;
ssize_t n = pn_sasl_output(sasl, bytes, size);
if (n == PN_EOS) {
transport->process_output = pn_output_write_amqp_header;
return 0;
} else {
return n;
}
}
static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t size)
{
return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP",
pn_output_write_amqp);
}
static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t size)
{
if (!transport->connection) {
return 0;
}
if (!pn_error_code(transport->error)) {
pn_error_set(transport->error, pn_process(transport), "process error");
}
if (!transport->disp->available && (transport->close_sent || pn_error_code(transport->error))) {
if (pn_error_code(transport->error))
return pn_error_code(transport->error);
else
return PN_EOS;
}
return pn_dispatcher_output(transport->disp, bytes, size);
}
ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size)
{
if (!transport) return PN_ARG_ERR;
size_t total = 0;
const bool use_ssl = transport->ssl != NULL;
while (size - total > 0) {
ssize_t n;
if (use_ssl)
n = pn_ssl_output( transport->ssl, bytes + total, size - total);
else
n = transport->process_output(transport, bytes + total, size - total);
if (n > 0) {
total += n;
} else if (n == 0) {
break;
} else if (n == PN_EOS) {
if (total > 0) {
return total;
} else {
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
return PN_EOS;
}
} else {
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_dispatcher_trace(transport->disp, 0, "-> EOS (%zi) %s\n", n,
pn_error_text(transport->error));
return n;
}
}
return total;
}
void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace)
{
if (transport->sasl) pn_sasl_trace(transport->sasl, trace);
if (transport->ssl) pn_ssl_trace(transport->ssl, trace);
transport->disp->trace = trace;
}
uint32_t pn_transport_get_max_frame(pn_transport_t *transport)
{
return transport->local_max_frame;
}
void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
{
if (size && size < AMQP_MIN_MAX_FRAME_SIZE)
size = AMQP_MIN_MAX_FRAME_SIZE;
transport->local_max_frame = size;
}
uint32_t pn_transport_get_remote_max_frame(pn_transport_t *transport)
{
return transport->remote_max_frame;
}
void pn_link_offered(pn_link_t *sender, int credit)
{
sender->available = credit;
}
ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n)
{
pn_delivery_t *current = pn_link_current(sender);
if (!current) return PN_EOS;
pn_buffer_append(current->bytes, bytes, n);
pn_add_tpwork(current);
return n;
}
void pn_link_drained(pn_link_t *sender)
{
if (sender && sender->drain && sender->credit > 0) {
sender->credit = 0;
sender->drained = true;
pn_modified(sender->session->connection, &sender->endpoint);
}
}
ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
{
if (!receiver) return PN_ARG_ERR;
pn_delivery_t *delivery = receiver->current;
if (delivery) {
size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
pn_buffer_trim(delivery->bytes, size, 0);
if (size) {
return size;
} else {
return delivery->done ? PN_EOS : 0;
}
} else {
return PN_STATE_ERR;
}
}
void pn_link_flow(pn_link_t *receiver, int credit)
{
if (receiver && pn_link_is_receiver(receiver)) {
receiver->credit += credit;
receiver->drain = false;
pn_modified(receiver->session->connection, &receiver->endpoint);
}
}
void pn_link_drain(pn_link_t *receiver, int credit)
{
if (receiver && pn_link_is_receiver(receiver)) {
pn_link_flow(receiver, credit);
receiver->drain = true;
}
}
time_t pn_transport_tick(pn_transport_t *engine, time_t now)
{
return 0;
}
pn_link_t *pn_delivery_link(pn_delivery_t *delivery)
{
if (!delivery) return NULL;
return delivery->link;
}
pn_disposition_t pn_delivery_local_state(pn_delivery_t *delivery)
{
return delivery->local_state;
}
pn_disposition_t pn_delivery_remote_state(pn_delivery_t *delivery)
{
return delivery->remote_state;
}
bool pn_delivery_settled(pn_delivery_t *delivery)
{
return delivery ? delivery->remote_settled : false;
}
bool pn_delivery_updated(pn_delivery_t *delivery)
{
return delivery ? delivery->updated : false;
}
void pn_delivery_clear(pn_delivery_t *delivery)
{
delivery->updated = false;
pn_work_update(delivery->link->session->connection, delivery);
}
void pn_delivery_update(pn_delivery_t *delivery, pn_disposition_t disposition)
{
if (!delivery) return;
delivery->local_state = disposition;
pn_add_tpwork(delivery);
}
bool pn_delivery_writable(pn_delivery_t *delivery)
{
if (!delivery) return false;
pn_link_t *link = delivery->link;
return pn_link_is_sender(link) && pn_is_current(delivery) && pn_link_credit(link) > 0;
}
bool pn_delivery_readable(pn_delivery_t *delivery)
{
if (delivery) {
pn_link_t *link = delivery->link;
return pn_link_is_receiver(link) && pn_is_current(delivery);
} else {
return false;
}
}
size_t pn_delivery_pending(pn_delivery_t *delivery)
{
return pn_buffer_size(delivery->bytes);
}
bool pn_delivery_partial(pn_delivery_t *delivery)
{
return !delivery->done;
}