DISPATCH-2073: TCP adaptor end-to-end flow control
This closes #1156
diff --git a/include/qpid/dispatch/delivery_state.h b/include/qpid/dispatch/delivery_state.h
index ec73f8c..8c0b7a7 100644
--- a/include/qpid/dispatch/delivery_state.h
+++ b/include/qpid/dispatch/delivery_state.h
@@ -19,6 +19,8 @@
* under the License.
*/
+#include <proton/disposition.h>
+
#include <stdbool.h>
#include <inttypes.h>
@@ -61,5 +63,15 @@
// dispose
void qd_delivery_state_free(qd_delivery_state_t *ds);
+
+// Return true if delivery-state code 'type' is terminal (an outcome), i.e. it lies in
+// PN_ACCEPTED..PN_MODIFIED inclusive or equals 0x0033. Once a terminal state is
+// reached no further changes are allowed.
+static inline bool qd_delivery_state_is_terminal(uint64_t type)
+{
+ return ((PN_ACCEPTED <= type && type <= PN_MODIFIED) ||
+ type == 0x0033 /* transactional "Declared" outcome, see section 4.5.5 */);
+}
+
#endif
diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index dda7665..654fe74 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -908,13 +908,6 @@
*/
void qdr_link_set_drained(qdr_core_t *core, qdr_link_t *link);
-
-/**
- * Set the remote delivery state for dlv. Ownership of dstate is transferred to
- * the dlv and true is returned. Caller must free dstate if false is returned.
- */
-bool qdr_delivery_set_remote_delivery_state(qdr_delivery_t *dlv, qd_delivery_state_t *dstate);
-
/**
* Extract the disposition and delivery state data that is to be sent to the
* remote endpoint via the delivery. Caller takes ownership of the returned
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 9f34403..ddd7409 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -34,6 +34,19 @@
#include <inttypes.h>
#include <stdio.h>
+// Maximum number of unacknowledged bytes read from the TCP client before
+// backpressure activates. Note that the actual number of bytes read may exceed
+// this value by READ_BUFFERS * BUFFER_SIZE since we fetch up to READ_BUFFERS
+// worth of buffers per call to pn_raw_connection_take_read_buffers().
+//
+// Assumptions: 1 Gbps, 1 msec latency across a router, 3-hop router path.
+// Effective GigEthernet throughput is 116MB/sec, or ~121635 bytes/msec.
+// Three router hops at ~1 msec each give a 6 msec round-trip time. Ideally
+// the window is at most 1/2 full when the ACK arrives, hence the window is
+// (bytes/msec) * (round-trip msec) * 2:
+//
+const uint32_t TCP_MAX_CAPACITY = 121635 * 6 * 2;
+
ALLOC_DEFINE(qd_tcp_listener_t);
ALLOC_DEFINE(qd_tcp_connector_t);
@@ -71,8 +84,9 @@
qd_server_t *server;
char *remote_address;
char *global_id;
- uint64_t bytes_in;
- uint64_t bytes_out;
+ uint64_t bytes_in; // read from raw conn
+ uint64_t bytes_out; // written to raw conn
+ uint64_t bytes_unacked; // not yet acked by outgoing tcp adaptor
uint64_t opened_time;
uint64_t last_in_time;
uint64_t last_out_time;
@@ -211,10 +225,15 @@
static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t *buffers)
{
pn_raw_buffer_t raw_buffers[READ_BUFFERS];
+
size_t n;
int count = 0;
int free_count = 0;
- while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
+ const bool was_open = conn->bytes_unacked < TCP_MAX_CAPACITY;
+
+ while ((count + conn->bytes_unacked < TCP_MAX_CAPACITY)
+ && (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
+
for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
qd_buffer_insert(buf, raw_buffers[i].size);
@@ -233,7 +252,15 @@
if (count > 0) {
// account for any incoming bytes just read
conn->last_in_time = tcp_adaptor->core->uptime_ticks;
- conn->bytes_in += count;
+ conn->bytes_in += count;
+ conn->bytes_unacked += count;
+ if (conn->bytes_unacked >= TCP_MAX_CAPACITY) {
+ if (was_open) {
+ qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
+ "[C%"PRIu64"] TCP RX window CLOSED: bytes in=%"PRIu64" unacked=%"PRIu64,
+ conn->conn_id, conn->bytes_in, conn->bytes_unacked);
+ }
+ }
}
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
@@ -837,6 +864,24 @@
}
conn->last_out_time = tcp_adaptor->core->uptime_ticks;
conn->bytes_out += written;
+
+ if (written > 0) {
+ // Tell the upstream to open its receive window. Note: this update
+ // is sent to the upstream (ingress) TCP adaptor. Since this update
+ // is internal to the router network (never sent to the client) we
+ // do not need to use the section_number (no section numbers in a
+ // TCP stream!) and use section_offset only.
+ //
+ qd_delivery_state_t *dstate = qd_delivery_state();
+ dstate->section_number = 0;
+ dstate->section_offset = conn->bytes_out;
+ qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->outstream,
+ PN_RECEIVED,
+ false, // settled
+ dstate,
+ false);
+ }
+
qd_log(log, QD_LOG_DEBUG,
"[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN %s pn_raw_connection_take_written_buffers wrote %zu bytes. Total written %"PRIu64" bytes",
conn->conn_id, qdr_tcp_connection_role_name(conn), written, conn->bytes_out);
@@ -1350,6 +1395,9 @@
static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
{
+
+ // @TODO(kgiusti): determine why this is necessary to prevent window full stall:
+ qd_message_Q2_holdoff_disable(qdr_delivery_message(delivery));
void* link_context = qdr_link_get_context(link);
if (link_context) {
qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
@@ -1443,6 +1491,42 @@
DLV_ARGS(dlv));
pn_raw_connection_close(tc->pn_raw_conn);
}
+
+ if (disp == PN_RECEIVED) {
+ //
+ // the consumer of this TCP flow has updated its tx_sequence:
+ //
+ bool window_opened = false;
+ uint64_t ignore;
+ qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);
+
+ if (!dstate) {
+ qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
+ "[C%"PRIu64"] BAD PN_RECEIVED - missing delivery-state!!", tc->conn_id);
+ } else {
+ // note: the PN_RECEIVED is generated by the remote TCP
+ // adaptor, for simplicity we ignore the section_number since
+ // all we really need is a byte offset:
+ //
+ const bool was_closed = tc->bytes_unacked >= TCP_MAX_CAPACITY;
+ tc->bytes_unacked = tc->bytes_in - dstate->section_offset;
+ window_opened = tc->bytes_unacked < TCP_MAX_CAPACITY;
+ if (was_closed && window_opened) {
+ qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
+ "[C%"PRIu64"] TCP RX window OPEN: bytes in=%"PRIu64
+ " unacked=%"PRIu64" remote bytes out=%"PRIu64,
+ tc->conn_id, tc->bytes_in, tc->bytes_unacked,
+ dstate->section_offset);
+ }
+ }
+
+ qd_delivery_state_free(dstate);
+
+ if (window_opened) {
+ // now that the window has opened fetch any outstanding read data
+ handle_incoming(tc, "TCP RX window refresh");
+ }
+ }
} else {
qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_delivery_update: no link context");
assert(false);
diff --git a/src/delivery_state.c b/src/delivery_state.c
index 4404aee..6b7e3d7 100644
--- a/src/delivery_state.c
+++ b/src/delivery_state.c
@@ -32,7 +32,6 @@
}
-
qd_delivery_state_t *qd_delivery_state_from_error(qdr_error_t *err)
{
if (err) {
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 127ba0a..7d60e42 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -616,6 +616,7 @@
//
// Adjust the delivery's identity
//
+
if (initial_delivery) {
initial_delivery->conn_id = link->conn->identity;
initial_delivery->link_id = link->identity;
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index 3aa8cf3..fdecf92 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -147,6 +147,7 @@
dlv->delivery_id = next_delivery_id();
dlv->link_id = endpoint->link->identity;
dlv->conn_id = endpoint->link->conn_id;
+ dlv->dispo_lock = sys_mutex();
qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdrc_endpoint_delivery_CT", DLV_ARGS(dlv));
return dlv;
}
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 8de73e2..a405219 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -29,7 +29,9 @@
static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
static void qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
- qdr_delivery_t *peer, uint64_t new_disp, bool settled);
+ qdr_delivery_t *peer, bool settled);
+static bool qdr_delivery_set_remote_delivery_state_CT(qdr_delivery_t *dlv, uint64_t dispo,
+ qd_delivery_state_t *dstate);
void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
@@ -189,11 +191,7 @@
action->args.delivery.delivery = delivery;
action->args.delivery.disposition = disposition;
action->args.delivery.settled = settled;
-
- // handle delivery-state extensions e.g. declared, transactional-state
- if (!qdr_delivery_set_remote_delivery_state(delivery, dstate)) {
- qd_delivery_state_free(dstate);
- }
+ action->args.delivery.dstate = dstate;
//
// The delivery's ref_count must be incremented to protect its travels into the
@@ -485,6 +483,7 @@
qd_bitmask_free(delivery->link_exclusion);
qd_delivery_state_free(delivery->local_state);
qd_delivery_state_free(delivery->remote_state);
+ sys_mutex_free(delivery->dispo_lock);
free_qdr_delivery_t(delivery);
}
@@ -651,23 +650,26 @@
//
static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
+ qdr_delivery_t *dlv = action->args.delivery.delivery;
+ qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv);
+ uint64_t new_disp = action->args.delivery.disposition;
+ bool settled = action->args.delivery.settled;
+ qd_delivery_state_t *dstate = action->args.delivery.dstate;
+
if (discard) {
qdr_delivery_decref_CT(core, action->args.delivery.delivery,
"qdr_update_delivery_CT - remove from action on discard");
+ qd_delivery_state_free(dstate);
return;
}
- qdr_delivery_t *dlv = action->args.delivery.delivery;
- qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv);
- uint64_t new_disp = action->args.delivery.disposition;
- bool settled = action->args.delivery.settled;
-
if (dlv->multicast) {
//
// remote state change for *inbound* multicast delivery,
// update downstream *outbound* peers
//
qdr_delivery_mcast_inbound_update_CT(core, dlv, new_disp, settled);
+ qd_delivery_state_free(dstate); // kgiusti(TODO): handle propagation!
} else if (peer && peer->multicast) {
//
@@ -676,12 +678,14 @@
//
// coverity[swapped_arguments]
qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, new_disp, settled);
+ qd_delivery_state_free(dstate); // kgiusti(TODO): handle propagation!
} else {
//
- // Anycast forwarding - note: peer _may_ be freed by this call
+ // Anycast forwarding - note: peer _may_ be freed after this!
//
- qdr_delivery_anycast_update_CT(core, dlv, peer, new_disp, settled);
+ qdr_delivery_set_remote_delivery_state_CT(dlv, new_disp, dstate);
+ qdr_delivery_anycast_update_CT(core, dlv, peer, settled);
}
//
@@ -697,7 +701,7 @@
// returns true if ownership of error parameter is taken (do not free it)
//
static void qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
- qdr_delivery_t *peer, uint64_t new_disp, bool settled)
+ qdr_delivery_t *peer, bool settled)
{
bool push = false;
bool peer_moved = false;
@@ -713,24 +717,15 @@
//
// Non-multicast Logic:
//
- // If disposition has changed and there is a peer link, set the disposition
- // of the peer
+ // If the disposition or extended delivery-state has changed and there is a
+ // peer link, set the disposition/delivery-state of the peer
// If remote settled, the delivery must be unlinked and freed.
// If remote settled and there is a peer, the peer shall be settled and
// unlinked. It shall not be freed until the connection-side thread
// settles the PN delivery.
//
- if (new_disp != dlv->remote_disposition || new_disp == PN_RECEIVED) {
- //
- // Remote disposition has changed, propagate the change to the peer
- // delivery local disposition.
- //
- dlv->remote_disposition = new_disp;
- if (peer) {
- peer->disposition = new_disp;
- push = true;
- qdr_delivery_move_delivery_state_CT(dlv, peer);
- }
+ if (peer) {
+ push = qdr_delivery_move_delivery_state_CT(dlv, peer);
}
if (settled) {
@@ -923,11 +918,6 @@
}
-// true if delivery state d is a terminal state as defined by AMQP 1.0
-//
-#define IS_TERMINAL(d) (PN_ACCEPTED <= (d) && (d) <= PN_MODIFIED)
-
-
// an outbound mcast delivery has changed its remote state (disposition)
// propagate the change back "upstream" to the inbound delivery
//
@@ -967,7 +957,7 @@
out_dlv->remote_disposition = new_disp;
- if (IS_TERMINAL(new_disp)) {
+ if (qd_delivery_state_is_terminal(new_disp)) {
// our mcast impl ignores non-terminal outcomes
qd_log(core->log, QD_LOG_TRACE,
@@ -988,7 +978,7 @@
//
qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
while (peer) {
- if (!IS_TERMINAL(peer->remote_disposition)) {
+ if (!qd_delivery_state_is_terminal(peer->remote_disposition)) {
break;
}
peer = qdr_delivery_next_peer_CT(in_dlv);
@@ -1216,27 +1206,38 @@
}
-// Set remote delivery state. Ownership of *remote_state is passed to the delivery.
-// Called on I/O thread that reads the delivery state from proton.
-bool qdr_delivery_set_remote_delivery_state(qdr_delivery_t *dlv, qd_delivery_state_t *remote_state)
+// Record the remote endpoint's latest disposition and delivery state on dlv.
+// Ownership of *remote_state passes to the delivery; any earlier state still
+// held by dlv is freed and replaced. Always returns true.
+static bool qdr_delivery_set_remote_delivery_state_CT(qdr_delivery_t *dlv, uint64_t remote_dispo, qd_delivery_state_t *remote_state)
{
- // once set the I/O thread cannot overwrite this until the core has forwarded it
- if (!dlv->remote_state) {
- dlv->remote_state = remote_state;
- return true;
+ // discard stale state that was never transferred to the peer
+ if (dlv->remote_state) {
+ qd_delivery_state_free(dlv->remote_state);
}
- return false;
+ dlv->remote_state = remote_state;
+ dlv->remote_disposition = remote_dispo;
+ return true;
}
-// Take local delivery state from the delivery. Caller assumes ownership of state object.
-// Called on the I/O thread that writes the delivery state to proton
+// Called on the I/O thread: take the local delivery state from the delivery for
+// writing to proton. The caller assumes ownership of the returned state object
+// (which may be null). If dispo is non-null it receives the local disposition
+// held by the delivery at the time of the call; passing null remains allowed,
+// as it was before the dispo_lock was introduced. A non-terminal disposition
+// is cleared by this call whether or not dispo is supplied.
//
qd_delivery_state_t *qdr_delivery_take_local_delivery_state(qdr_delivery_t *dlv, uint64_t *dispo)
{
+ // dispo_lock guards disposition and local_state while they are handed over
+ sys_mutex_lock(dlv->dispo_lock);
+
qd_delivery_state_t *dstate = dlv->local_state;
dlv->local_state = 0;
if (dispo) *dispo = dlv->disposition;
+ // if the disposition is not terminal, clear the state to allow another
+ // disposition update. Terminal dispositions are final and never update.
+ if (!qd_delivery_state_is_terminal(dlv->disposition))
+ dlv->disposition = 0;
+
+ sys_mutex_unlock(dlv->dispo_lock);
+
return dstate;
}
@@ -1245,12 +1246,29 @@
// of its peer delivery. This causes the delivery state data to propagate
// from one delivery to another.
//
-void qdr_delivery_move_delivery_state_CT(qdr_delivery_t *dlv, qdr_delivery_t *peer)
+// Returns true if dlv carried a non-zero remote disposition, in which case the
+// disposition and state were copied to peer and peer needs to be written to
+// proton. Returns false, leaving peer untouched, when there is nothing to
+// propagate. In both cases dlv no longer owns its remote state afterwards.
+//
+bool qdr_delivery_move_delivery_state_CT(qdr_delivery_t *dlv, qdr_delivery_t *peer)
{
- // if state is already present do not overwrite it as the outgoing
- // I/O thread may be in the process of writing it to proton
- if (!peer->local_state) {
- peer->local_state = dlv->remote_state;
- dlv->remote_state = 0;
+ // detach the remote state from dlv up front; it is either handed to the
+ // peer or released below
+ qd_delivery_state_t *dstate = dlv->remote_state;
+ uint64_t dispo = dlv->remote_disposition;
+ dlv->remote_state = 0;
+
+ if (dispo) {
+ // must lock peer when modifying local state since I/O thread may be reading
+ // it at the same time
+ sys_mutex_lock(peer->dispo_lock);
+
+ peer->disposition = dispo;
+ if (peer->local_state) {
+ // old state not consumed by I/O thread?
+ qd_delivery_state_free(peer->local_state);
+ }
+ peer->local_state = dstate;
+
+ sys_mutex_unlock(peer->dispo_lock);
+ } else {
+ // no disposition to propagate: dlv has already dropped its reference,
+ // so release the detached state here rather than leak it
+ qd_delivery_state_free(dstate);
}
+
+ return !!dispo;
}
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 9f79af3..f0bdd2b 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -42,6 +42,7 @@
qd_message_t *msg;
qd_iterator_t *to_addr;
qd_iterator_t *origin;
+ sys_mutex_t *dispo_lock; ///< lock disposition and local_state fields
uint64_t disposition; ///< local disposition, will be pushed to remote endpoint
uint64_t remote_disposition; ///< disposition as set by remote endpoint
uint64_t mcast_disposition; ///< temporary terminal disposition while multicast fwding
@@ -108,8 +109,7 @@
bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label);
-
-void qdr_delivery_move_delivery_state_CT(qdr_delivery_t *from, qdr_delivery_t *to);
+bool qdr_delivery_move_delivery_state_CT(qdr_delivery_t *dlv, qdr_delivery_t *peer);
//
// I/O thread only functions
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 71acb5b..7354e74 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -154,6 +154,7 @@
out_dlv->delivery_id = next_delivery_id();
out_dlv->link_id = out_link->identity;
out_dlv->conn_id = out_link->conn_id;
+ out_dlv->dispo_lock = sys_mutex();
qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdr_forward_new_delivery_CT", DLV_ARGS(out_dlv));
if (in_dlv) {
@@ -162,7 +163,6 @@
out_dlv->ingress_index = in_dlv->ingress_index;
if (in_dlv->remote_disposition) {
// propagate disposition state from remote to peer
- out_dlv->disposition = in_dlv->remote_disposition;
qdr_delivery_move_delivery_state_CT(in_dlv, out_dlv);
}
} else {
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 4852134..e33941f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -144,13 +144,14 @@
// Arguments for delivery state updates
//
struct {
- qdr_delivery_t *delivery;
- uint64_t disposition;
- uint8_t tag[32];
- int tag_length;
- bool settled;
- bool presettled; // true if remote settles while msg is in flight
- bool more; // true if there are more frames arriving, false otherwise
+ qdr_delivery_t *delivery;
+ qd_delivery_state_t *dstate;
+ uint64_t disposition;
+ uint8_t tag[32];
+ int tag_length;
+ bool settled;
+ bool presettled; // true if remote settles while msg is in flight
+ bool more; // true if there are more frames arriving, false otherwise
} delivery;
//
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 917276c..cb22e55 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -59,6 +59,7 @@
dlv->delivery_id = next_delivery_id();
dlv->link_id = link->identity;
dlv->conn_id = link->conn_id;
+ dlv->dispo_lock = sys_mutex();
qd_log(link->core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdr_link_deliver", DLV_ARGS(dlv));
qdr_delivery_incref(dlv, "qdr_link_deliver - newly created delivery, add to action list");
@@ -94,6 +95,7 @@
dlv->delivery_id = next_delivery_id();
dlv->link_id = link->identity;
dlv->conn_id = link->conn_id;
+ dlv->dispo_lock = sys_mutex();
qd_log(link->core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdr_link_deliver_to", DLV_ARGS(dlv));
qdr_delivery_incref(dlv, "qdr_link_deliver_to - newly created delivery, add to action list");
@@ -124,6 +126,7 @@
dlv->delivery_id = next_delivery_id();
dlv->link_id = link->identity;
dlv->conn_id = link->conn_id;
+ dlv->dispo_lock = sys_mutex();
qd_log(link->core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdr_link_deliver_to_routed_link", DLV_ARGS(dlv));
qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
diff --git a/src/router_node.c b/src/router_node.c
index 3e17695..bac3729 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -133,63 +133,66 @@
{
qd_delivery_state_t *dstate = 0;
uint64_t outcome = pn_delivery_remote_state(pnd);
- if (pnd) {
- switch (outcome) {
- case PN_RECEIVED: {
- pn_disposition_t *disp = pn_delivery_remote(pnd);
- dstate = qd_delivery_state();
- dstate->section_number = pn_disposition_get_section_number(disp);
- dstate->section_offset = pn_disposition_get_section_offset(disp);
- break;
- }
- case PN_ACCEPTED:
- case PN_RELEASED:
- // no associated state (that we care about)
- break;
- case PN_REJECTED: {
- // See AMQP 1.0 section 3.4.4 Rejected
- pn_condition_t *cond = pn_disposition_condition(pn_delivery_remote(pnd));
- dstate = qd_delivery_state();
- dstate->error = qdr_error_from_pn(cond);
- break;
- }
- case PN_MODIFIED: {
- // See AMQP 1.0 section 3.4.5 Modified
- pn_disposition_t *disp = pn_delivery_remote(pnd);
- bool failed = pn_disposition_is_failed(disp);
- bool undeliverable = pn_disposition_is_undeliverable(disp);
- pn_data_t *anno = pn_disposition_annotations(disp);
- // avoid expensive alloc if only default values found
- const bool need_anno = (anno && pn_data_size(anno) > 0);
- if (failed || undeliverable || need_anno) {
- dstate = qd_delivery_state();
- dstate->delivery_failed = failed;
- dstate->undeliverable_here = undeliverable;
- if (need_anno) {
- dstate->annotations = pn_data(0);
- pn_data_copy(dstate->annotations, anno);
- }
- }
- break;
- }
- default: {
- // Check for custom state data. Custom outcomes and AMQP 1.0
- // Transaction defined outcomes will all be numerically >
- // PN_MODIFIED. See Part 4: Transactions and section 1.5 Descriptor
- // Values in the AMQP 1.0 spec.
- if (outcome > PN_MODIFIED) {
- pn_data_t *data = pn_disposition_data(pn_delivery_remote(pnd));
- if (data && pn_data_size(data) > 0) {
- dstate = qd_delivery_state();
- dstate->extension = pn_data(0);
- pn_data_copy(dstate->extension, data);
- }
- }
+ switch (outcome) {
+ case 0:
+ // not set - no delivery-state
break;
- }
- } // end switch
+ case PN_RECEIVED: {
+ pn_disposition_t *disp = pn_delivery_remote(pnd);
+ dstate = qd_delivery_state();
+ dstate->section_number = pn_disposition_get_section_number(disp);
+ dstate->section_offset = pn_disposition_get_section_offset(disp);
+ break;
}
+ case PN_ACCEPTED:
+ case PN_RELEASED:
+ // no associated state (that we care about)
+ break;
+ case PN_REJECTED: {
+ // See AMQP 1.0 section 3.4.4 Rejected
+ pn_condition_t *cond = pn_disposition_condition(pn_delivery_remote(pnd));
+ dstate = qd_delivery_state();
+ dstate->error = qdr_error_from_pn(cond);
+ break;
+ }
+ case PN_MODIFIED: {
+ // See AMQP 1.0 section 3.4.5 Modified
+ pn_disposition_t *disp = pn_delivery_remote(pnd);
+ bool failed = pn_disposition_is_failed(disp);
+ bool undeliverable = pn_disposition_is_undeliverable(disp);
+ pn_data_t *anno = pn_disposition_annotations(disp);
+
+ // avoid expensive alloc if only default values found
+ const bool need_anno = (anno && pn_data_size(anno) > 0);
+ if (failed || undeliverable || need_anno) {
+ dstate = qd_delivery_state();
+ dstate->delivery_failed = failed;
+ dstate->undeliverable_here = undeliverable;
+ if (need_anno) {
+ dstate->annotations = pn_data(0);
+ pn_data_copy(dstate->annotations, anno);
+ }
+ }
+ break;
+ }
+ default: {
+ // Check for custom state data. Custom outcomes and AMQP 1.0
+ // Transaction defined outcomes will all be numerically >
+ // PN_MODIFIED. See Part 4: Transactions and section 1.5 Descriptor
+ // Values in the AMQP 1.0 spec.
+ if (outcome > PN_MODIFIED) {
+ pn_data_t *data = pn_disposition_data(pn_delivery_remote(pnd));
+ if (data && pn_data_size(data) > 0) {
+ dstate = qd_delivery_state();
+ dstate->extension = pn_data(0);
+ pn_data_copy(dstate->extension, data);
+ }
+ }
+ break;
+ }
+ } // end switch
+
return dstate;
}
@@ -926,28 +929,38 @@
qdr_delivery_t *delivery = qdr_node_delivery_qdr_from_pn(pnd);
//
- // It's important to not do any processing without a qdr_delivery. When pre-settled
- // multi-frame deliveries arrive, it's possible for the settlement to register before
- // the whole message arrives. Such premature settlement indications must be ignored.
- //
- if (!delivery || !qdr_delivery_receive_complete(delivery))
+ // It's important to not do any processing without a qdr_delivery.
+ if (!delivery)
return;
- //
- // Update the disposition of the delivery
- //
- qdr_delivery_remote_state_updated(router->router_core, delivery,
- pn_delivery_remote_state(pnd),
- pn_delivery_settled(pnd),
- qd_delivery_read_remote_state(pnd),
- false);
+ uint64_t dstate = pn_delivery_remote_state(pnd);
+ bool settled = pn_delivery_settled(pnd);
//
- // If settled, close out the delivery
+ // When pre-settled multi-frame deliveries arrive, it's possible for the
+ // settlement to register before the whole message arrives. Such premature
+ // settlement indications must be ignored.
//
- if (pn_delivery_settled(pnd)) {
- qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
- pn_delivery_settle(pnd);
+ if (settled && !qdr_delivery_receive_complete(delivery))
+ settled = false;
+
+ if (dstate || settled) {
+ //
+ // Update the disposition of the delivery
+ //
+ qdr_delivery_remote_state_updated(router->router_core, delivery,
+ dstate,
+ settled,
+ qd_delivery_read_remote_state(pnd),
+ false);
+
+ //
+ // If settled, close out the delivery
+ //
+ if (settled) {
+ qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
+ pn_delivery_settle(pnd);
+ }
}
}
@@ -1942,12 +1955,12 @@
// handle any delivery-state on the transfer e.g. transactional-state
qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &disposition);
- if (dstate) {
- qd_delivery_write_local_state(pdlv, disposition, dstate);
- qd_delivery_state_free(dstate);
- }
- if (disposition)
+ if (disposition) {
+ if (dstate)
+ qd_delivery_write_local_state(pdlv, disposition, dstate);
pn_delivery_update(pdlv, disposition);
+ }
+ qd_delivery_state_free(dstate);
//
// If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver.
@@ -2040,7 +2053,7 @@
link = (qd_link_t*) qdr_link_get_context(qlink);
if (link) {
qd_conn = qd_link_connection(link);
- if (qd_conn == 0)
+ if (qd_conn == 0)
return;
}
else
@@ -2049,33 +2062,19 @@
else
return;
- //
- // DISPATCH-2040: For link routed links, it does not matter if the passed in disp matches the pn_delivery_remote_state(pnd), we will still
- // call qd_delivery_write_local_state and send out the disposition if the delivery is not already settled.
- //
- // For non link routed links, if the disposition has changed and the proton delivery has not already been settled, update the proton delivery.
- //
- if ((qdr_link_is_routed(qlink) || disp != pn_delivery_remote_state(pnd)) && !pn_delivery_settled(pnd)) {
- qd_message_t *msg = qdr_delivery_message(dlv);
-
- // handle propagation of delivery state from qdr_delivery_t to proton:
- uint64_t ignore = 0; // expect same value as 'disp'
+ if (disp && !pn_delivery_settled(pnd)) {
+ uint64_t ignore = 0;
qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);
- assert(ignore == disp);
- qd_delivery_write_local_state(pnd, disp, dstate);
- qd_delivery_state_free(dstate);
+ assert(ignore == disp); // expected: since both are from the same dlv
- if (disp == PN_MODIFIED)
- pn_disposition_set_failed(pn_delivery_local(pnd), true);
-
- //
- // If the delivery is still arriving, don't push out the disposition change yet.
- //
- assert(qdr_delivery_disposition(dlv) == disp) ;
- if (qd_message_receive_complete(msg)) {
- if (disp != pn_delivery_local_state(pnd)) {
- pn_delivery_update(pnd, disp);
- }
+ // update if the disposition has changed or there is new state associated with it
+ if (disp != pn_delivery_local_state(pnd) || dstate) {
+ // handle propagation of delivery state from qdr_delivery_t to proton:
+ qd_delivery_write_local_state(pnd, disp, dstate);
+ pn_delivery_update(pnd, disp);
+ qd_delivery_state_free(dstate);
+ if (disp == PN_MODIFIED) // @TODO(kgiusti) why do we need this???
+ pn_disposition_set_failed(pn_delivery_local(pnd), true);
}
}
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index d5d0bac..b2f1aad 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -838,6 +838,8 @@
# Q2 holdoff
@SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
def test_60_q2_holdoff(self):
+ # for now, Q2 is disabled to avoid stalling TCP backpressure
+ self.skipTest("Q2 is disabled on TCP adaptor")
name = "test_60_q2_holdoff"
self.logger.log("TCP_TEST Start %s" % name)