DISPATCH-2073: TCP adaptor end-to-end flow control

This closes #1156
diff --git a/include/qpid/dispatch/delivery_state.h b/include/qpid/dispatch/delivery_state.h
index ec73f8c..8c0b7a7 100644
--- a/include/qpid/dispatch/delivery_state.h
+++ b/include/qpid/dispatch/delivery_state.h
@@ -19,6 +19,8 @@
  * under the License.
  */
 
+#include <proton/disposition.h>
+
 #include <stdbool.h>
 #include <inttypes.h>
 
@@ -61,5 +63,15 @@
 // dispose
 void qd_delivery_state_free(qd_delivery_state_t *ds);
 
+
+// true if the state is final (an outcome). Once a terminal state
+// is reached no further changes are allowed.
+//
+static inline bool qd_delivery_state_is_terminal(uint64_t type)
+{
+    return ((PN_ACCEPTED <= type && type <= PN_MODIFIED) ||
+            type == 0x0033 /* See section 4.5.5 Declared */);
+}
+
 #endif
 
diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index dda7665..654fe74 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -908,13 +908,6 @@
  */
 void qdr_link_set_drained(qdr_core_t *core, qdr_link_t *link);
 
-
-/**
- * Set the remote delivery state for dlv. Ownership of dstate is transferred to
- * the dlv and true is returned. Caller must free dstate if false is returned.
- */
-bool qdr_delivery_set_remote_delivery_state(qdr_delivery_t *dlv, qd_delivery_state_t *dstate);
-
 /**
  * Extract the disposition and delivery state data that is to be sent to the
  * remote endpoint via the delivery. Caller takes ownership of the returned
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 9f34403..ddd7409 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -34,6 +34,19 @@
 #include <inttypes.h>
 #include <stdio.h>
 
+// maximum amount of bytes to read from TCP client before backpressure
+// activates.  Note that the actual number of read bytes may exceed this value
+// by READ_BUFFERS * BUFFER_SIZE since we fetch up to READ_BUFFERs worth of
+// buffers when calling pn_raw_connection_take_read_buffers()
+//
+// Assumptions: 1 Gbps, 1msec latency across a router, 3 hop router path
+//   Effective GigEthernet throughput is 116MB/sec, or ~121635 bytes/msec.
+//   For 3 hops routers with ~1msec latency gives a 6msec round trip
+//   time. Ideally the window would be 1/2 full at most before the ACK
+//   arrives:
+//
+const uint32_t TCP_MAX_CAPACITY = 121635 * 6 * 2;
+
 ALLOC_DEFINE(qd_tcp_listener_t);
 ALLOC_DEFINE(qd_tcp_connector_t);
 
@@ -71,8 +84,9 @@
     qd_server_t          *server;
     char                 *remote_address;
     char                 *global_id;
-    uint64_t              bytes_in;
-    uint64_t              bytes_out;
+    uint64_t              bytes_in;       // read from raw conn
+    uint64_t              bytes_out;      // written to raw conn
+    uint64_t              bytes_unacked;  // not yet acked by outgoing tcp adaptor
     uint64_t              opened_time;
     uint64_t              last_in_time;
     uint64_t              last_out_time;
@@ -211,10 +225,15 @@
 static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t *buffers)
 {
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
+
     size_t n;
     int count = 0;
     int free_count = 0;
-    while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
+    const bool was_open = conn->bytes_unacked < TCP_MAX_CAPACITY;
+
+    while ((count + conn->bytes_unacked < TCP_MAX_CAPACITY)
+           && (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
+
         for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
             qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
             qd_buffer_insert(buf, raw_buffers[i].size);
@@ -233,7 +252,15 @@
     if (count > 0) {
         // account for any incoming bytes just read
         conn->last_in_time = tcp_adaptor->core->uptime_ticks;
-        conn->bytes_in += count;
+        conn->bytes_in      += count;
+        conn->bytes_unacked += count;
+        if (conn->bytes_unacked >= TCP_MAX_CAPACITY) {
+            if (was_open) {
+                qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
+                       "[C%"PRIu64"] TCP RX window CLOSED: bytes in=%"PRIu64" unacked=%"PRIu64,
+                       conn->conn_id, conn->bytes_in, conn->bytes_unacked);
+            }
+        }
     }
 
     qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
@@ -837,6 +864,24 @@
         }
         conn->last_out_time = tcp_adaptor->core->uptime_ticks;
         conn->bytes_out += written;
+
+        if (written > 0) {
+            // Tell the upstream to open its receive window.  Note: this update
+            // is sent to the upstream (ingress) TCP adaptor. Since this update
+            // is internal to the router network (never sent to the client) we
+            // do not need to use the section_number (no section numbers in a
+            // TCP stream!) and use section_offset only.
+            //
+            qd_delivery_state_t *dstate = qd_delivery_state();
+            dstate->section_number = 0;
+            dstate->section_offset = conn->bytes_out;
+            qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->outstream,
+                                              PN_RECEIVED,
+                                              false,  // settled
+                                              dstate,
+                                              false);
+        }
+
         qd_log(log, QD_LOG_DEBUG,
                "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN %s pn_raw_connection_take_written_buffers wrote %zu bytes. Total written %"PRIu64" bytes",
                conn->conn_id, qdr_tcp_connection_role_name(conn), written, conn->bytes_out);
@@ -1350,6 +1395,9 @@
 
 static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
 {
+
+    // @TODO(kgiusti): determine why this is necessary to prevent window full stall:
+    qd_message_Q2_holdoff_disable(qdr_delivery_message(delivery));
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
@@ -1443,6 +1491,42 @@
                    DLV_ARGS(dlv));
             pn_raw_connection_close(tc->pn_raw_conn);
         }
+
+        if (disp == PN_RECEIVED) {
+            //
+            // the consumer of this TCP flow has updated its tx_sequence:
+            //
+            bool window_opened = false;
+            uint64_t ignore;
+            qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);
+
+            if (!dstate) {
+                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
+                       "[C%"PRIu64"] BAD PN_RECEIVED - missing delivery-state!!", tc->conn_id);
+            } else {
+                // note: the PN_RECEIVED is generated by the remote TCP
+                // adaptor, for simplicity we ignore the section_number since
+                // all we really need is a byte offset:
+                //
+                const bool was_closed = tc->bytes_unacked >= TCP_MAX_CAPACITY;
+                tc->bytes_unacked = tc->bytes_in - dstate->section_offset;
+                window_opened = tc->bytes_unacked < TCP_MAX_CAPACITY;
+                if (was_closed && window_opened) {
+                    qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
+                           "[C%"PRIu64"] TCP RX window OPEN: bytes in=%"PRIu64
+                           " unacked=%"PRIu64" remote bytes out=%"PRIu64,
+                           tc->conn_id, tc->bytes_in, tc->bytes_unacked,
+                           dstate->section_offset);
+                }
+            }
+
+            qd_delivery_state_free(dstate);
+
+            if (window_opened) {
+                // now that the window has opened fetch any outstanding read data
+                handle_incoming(tc, "TCP RX window refresh");
+            }
+        }
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_delivery_update: no link context");
         assert(false);
diff --git a/src/delivery_state.c b/src/delivery_state.c
index 4404aee..6b7e3d7 100644
--- a/src/delivery_state.c
+++ b/src/delivery_state.c
@@ -32,7 +32,6 @@
 }
 
 
-
 qd_delivery_state_t *qd_delivery_state_from_error(qdr_error_t *err)
 {
     if (err) {
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 127ba0a..7d60e42 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -616,6 +616,7 @@
     //
     // Adjust the delivery's identity
     //
+
     if (initial_delivery) {
         initial_delivery->conn_id = link->conn->identity;
         initial_delivery->link_id = link->identity;
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index 3aa8cf3..fdecf92 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -147,6 +147,7 @@
     dlv->delivery_id = next_delivery_id();
     dlv->link_id     = endpoint->link->identity;
     dlv->conn_id     = endpoint->link->conn_id;
+    dlv->dispo_lock  = sys_mutex();
     qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdrc_endpoint_delivery_CT", DLV_ARGS(dlv));
     return dlv;
 }
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 8de73e2..a405219 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -29,7 +29,9 @@
 static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 static void qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
-                                           qdr_delivery_t *peer, uint64_t new_disp, bool settled);
+                                           qdr_delivery_t *peer, bool settled);
+static bool qdr_delivery_set_remote_delivery_state_CT(qdr_delivery_t *dlv, uint64_t dispo,
+                                                      qd_delivery_state_t *dstate);
 
 
 void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
@@ -189,11 +191,7 @@
     action->args.delivery.delivery    = delivery;
     action->args.delivery.disposition = disposition;
     action->args.delivery.settled     = settled;
-
-    // handle delivery-state extensions e.g. declared, transactional-state
-    if (!qdr_delivery_set_remote_delivery_state(delivery, dstate)) {
-        qd_delivery_state_free(dstate);
-    }
+    action->args.delivery.dstate      = dstate;
 
     //
     // The delivery's ref_count must be incremented to protect its travels into the
@@ -485,6 +483,7 @@
     qd_bitmask_free(delivery->link_exclusion);
     qd_delivery_state_free(delivery->local_state);
     qd_delivery_state_free(delivery->remote_state);
+    sys_mutex_free(delivery->dispo_lock);
 
     free_qdr_delivery_t(delivery);
 }
@@ -651,23 +650,26 @@
 //
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
+    qdr_delivery_t      *dlv      = action->args.delivery.delivery;
+    qdr_delivery_t      *peer     = qdr_delivery_first_peer_CT(dlv);
+    uint64_t             new_disp = action->args.delivery.disposition;
+    bool                 settled  = action->args.delivery.settled;
+    qd_delivery_state_t *dstate   = action->args.delivery.dstate;
+
     if (discard) {
         qdr_delivery_decref_CT(core, action->args.delivery.delivery,
                                "qdr_update_delivery_CT - remove from action on discard");
+        qd_delivery_state_free(dstate);
         return;
     }
 
-    qdr_delivery_t *dlv      = action->args.delivery.delivery;
-    qdr_delivery_t *peer     = qdr_delivery_first_peer_CT(dlv);
-    uint64_t        new_disp = action->args.delivery.disposition;
-    bool            settled  = action->args.delivery.settled;
-
     if (dlv->multicast) {
         //
         // remote state change for *inbound* multicast delivery,
         // update downstream *outbound* peers
         //
         qdr_delivery_mcast_inbound_update_CT(core, dlv, new_disp, settled);
+        qd_delivery_state_free(dstate);  // kgiusti(TODO): handle propagation!
 
     } else if (peer && peer->multicast) {
         //
@@ -676,12 +678,14 @@
         //
         // coverity[swapped_arguments]
         qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, new_disp, settled);
+        qd_delivery_state_free(dstate);  // kgiusti(TODO): handle propagation!
 
     } else {
         //
-        // Anycast forwarding - note: peer _may_ be freed by this call
+        // Anycast forwarding - note: peer _may_ be freed after this!
         //
-        qdr_delivery_anycast_update_CT(core, dlv, peer, new_disp, settled);
+        qdr_delivery_set_remote_delivery_state_CT(dlv, new_disp, dstate);
+        qdr_delivery_anycast_update_CT(core, dlv, peer, settled);
     }
 
     //
@@ -697,7 +701,7 @@
 // returns true if ownership of error parameter is taken (do not free it)
 //
 static void qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
-                                           qdr_delivery_t *peer, uint64_t new_disp, bool settled)
+                                           qdr_delivery_t *peer, bool settled)
 {
     bool push           = false;
     bool peer_moved     = false;
@@ -713,24 +717,15 @@
     //
     // Non-multicast Logic:
     //
-    // If disposition has changed and there is a peer link, set the disposition
-    // of the peer
+    // If the disposition or extended delivery-state has changed and there is a
+    // peer link, set the disposition/delivery-state of the peer
     // If remote settled, the delivery must be unlinked and freed.
     // If remote settled and there is a peer, the peer shall be settled and
     // unlinked.  It shall not be freed until the connection-side thread
     // settles the PN delivery.
     //
-    if (new_disp != dlv->remote_disposition || new_disp == PN_RECEIVED) {
-        //
-        // Remote disposition has changed, propagate the change to the peer
-        // delivery local disposition.
-        //
-        dlv->remote_disposition = new_disp;
-        if (peer) {
-            peer->disposition = new_disp;
-            push = true;
-            qdr_delivery_move_delivery_state_CT(dlv, peer);
-        }
+    if (peer) {
+        push = qdr_delivery_move_delivery_state_CT(dlv, peer);
     }
 
     if (settled) {
@@ -923,11 +918,6 @@
 }
 
 
-// true if delivery state d is a terminal state as defined by AMQP 1.0
-//
-#define IS_TERMINAL(d) (PN_ACCEPTED <= (d) && (d) <= PN_MODIFIED)
-
-
 // an outbound mcast delivery has changed its remote state (disposition)
 // propagate the change back "upstream" to the inbound delivery
 //
@@ -967,7 +957,7 @@
 
     out_dlv->remote_disposition = new_disp;
 
-    if (IS_TERMINAL(new_disp)) {
+    if (qd_delivery_state_is_terminal(new_disp)) {
         // our mcast impl ignores non-terminal outcomes
 
         qd_log(core->log, QD_LOG_TRACE,
@@ -988,7 +978,7 @@
         //
         qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
         while (peer) {
-            if (!IS_TERMINAL(peer->remote_disposition)) {
+            if (!qd_delivery_state_is_terminal(peer->remote_disposition)) {
                 break;
             }
             peer = qdr_delivery_next_peer_CT(in_dlv);
@@ -1216,27 +1206,38 @@
 }
 
 
-// Set remote delivery state. Ownership of *remote_state is passed to the delivery.
-// Called on I/O thread that reads the delivery state from proton.
-bool qdr_delivery_set_remote_delivery_state(qdr_delivery_t *dlv, qd_delivery_state_t *remote_state)
+// Set remote delivery state when proton indicates that the remote has updated
+// delivery. Ownership of *remote_state is passed to the delivery.
+//
+static bool qdr_delivery_set_remote_delivery_state_CT(qdr_delivery_t *dlv, uint64_t remote_dispo, qd_delivery_state_t *remote_state)
 {
-    // once set the I/O thread cannot overwrite this until the core has forwarded it
-    if (!dlv->remote_state) {
-        dlv->remote_state = remote_state;
-        return true;
+    // old state, has not been transfered to peer?
+    if (dlv->remote_state) {
+        qd_delivery_state_free(dlv->remote_state);
     }
-    return false;
+    dlv->remote_state = remote_state;
+    dlv->remote_disposition = remote_dispo;
+    return true;
 }
 
 
-// Take local delivery state from the delivery.  Caller assumes ownership of state object.
-// Called on the I/O thread that writes the delivery state to proton
+// Called on the I/O thread: take local delivery state from the delivery for writing to proton.
+// Caller assumes ownership of state object.
 //
 qd_delivery_state_t *qdr_delivery_take_local_delivery_state(qdr_delivery_t *dlv, uint64_t *dispo)
 {
+    sys_mutex_lock(dlv->dispo_lock);
+
     qd_delivery_state_t *dstate = dlv->local_state;
     dlv->local_state = 0;
-    if (dispo) *dispo = dlv->disposition;
+    *dispo = dlv->disposition;
+    // if the disposition is not terminal, clear the state to allow another
+    // disposition update.  Terminal dispositions are final and never update.
+    if (!qd_delivery_state_is_terminal(dlv->disposition))
+        dlv->disposition = 0;
+
+    sys_mutex_unlock(dlv->dispo_lock);
+
     return dstate;
 }
 
@@ -1245,12 +1246,29 @@
 // of its peer delivery.  This causes the delivery state data to propagate
 // from one delivery to another.
 //
-void qdr_delivery_move_delivery_state_CT(qdr_delivery_t *dlv, qdr_delivery_t *peer)
+// returns true if the state has changed and peer needs to be written to
+// proton.
+//
+bool qdr_delivery_move_delivery_state_CT(qdr_delivery_t *dlv, qdr_delivery_t *peer)
 {
-    // if state is already present do not overwrite it as the outgoing
-    // I/O thread may be in the process of writing it to proton
-    if (!peer->local_state) {
-        peer->local_state = dlv->remote_state;
-        dlv->remote_state = 0;
+    qd_delivery_state_t *dstate = dlv->remote_state;
+    uint64_t dispo = dlv->remote_disposition;
+    dlv->remote_state = 0;
+
+    if (dispo) {
+        // must lock peer when modifying local state since I/O thread may be reading
+        // it at the same time
+        sys_mutex_lock(peer->dispo_lock);
+
+        peer->disposition = dispo;
+        if (peer->local_state) {
+            // old state not consumed by I/O thread?
+            qd_delivery_state_free(peer->local_state);
+        }
+        peer->local_state = dstate;
+
+        sys_mutex_unlock(peer->dispo_lock);
     }
+
+    return !!dispo;
 }
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 9f79af3..f0bdd2b 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -42,6 +42,7 @@
     qd_message_t           *msg;
     qd_iterator_t          *to_addr;
     qd_iterator_t          *origin;
+    sys_mutex_t            *dispo_lock;          ///< lock disposition and local_state fields
     uint64_t                disposition;         ///< local disposition, will be pushed to remote endpoint
     uint64_t                remote_disposition;  ///< disposition as set by remote endpoint
     uint64_t                mcast_disposition;   ///< temporary terminal disposition while multicast fwding
@@ -108,8 +109,7 @@
 bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
 
 void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label);
-
-void qdr_delivery_move_delivery_state_CT(qdr_delivery_t *from, qdr_delivery_t *to);
+bool qdr_delivery_move_delivery_state_CT(qdr_delivery_t *dlv, qdr_delivery_t *peer);
 
 //
 // I/O thread only functions
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 71acb5b..7354e74 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -154,6 +154,7 @@
     out_dlv->delivery_id = next_delivery_id();
     out_dlv->link_id     = out_link->identity;
     out_dlv->conn_id     = out_link->conn_id;
+    out_dlv->dispo_lock  = sys_mutex();
     qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdr_forward_new_delivery_CT", DLV_ARGS(out_dlv));
 
     if (in_dlv) {
@@ -162,7 +163,6 @@
         out_dlv->ingress_index = in_dlv->ingress_index;
         if (in_dlv->remote_disposition) {
             // propagate disposition state from remote to peer
-            out_dlv->disposition = in_dlv->remote_disposition;
             qdr_delivery_move_delivery_state_CT(in_dlv, out_dlv);
         }
     } else {
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 4852134..e33941f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -144,13 +144,14 @@
         // Arguments for delivery state updates
         //
         struct {
-            qdr_delivery_t *delivery;
-            uint64_t        disposition;
-            uint8_t         tag[32];
-            int             tag_length;
-            bool            settled;
-            bool            presettled;  // true if remote settles while msg is in flight
-            bool            more;  // true if there are more frames arriving, false otherwise
+            qdr_delivery_t      *delivery;
+            qd_delivery_state_t *dstate;
+            uint64_t             disposition;
+            uint8_t              tag[32];
+            int                  tag_length;
+            bool                 settled;
+            bool                 presettled;  // true if remote settles while msg is in flight
+            bool                 more;  // true if there are more frames arriving, false otherwise
         } delivery;
 
         //
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 917276c..cb22e55 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -59,6 +59,7 @@
     dlv->delivery_id        = next_delivery_id();
     dlv->link_id            = link->identity;
     dlv->conn_id            = link->conn_id;
+    dlv->dispo_lock         = sys_mutex();
     qd_log(link->core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdr_link_deliver", DLV_ARGS(dlv));
 
     qdr_delivery_incref(dlv, "qdr_link_deliver - newly created delivery, add to action list");
@@ -94,6 +95,7 @@
     dlv->delivery_id        = next_delivery_id();
     dlv->link_id            = link->identity;
     dlv->conn_id            = link->conn_id;
+    dlv->dispo_lock         = sys_mutex();
     qd_log(link->core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdr_link_deliver_to", DLV_ARGS(dlv));
 
     qdr_delivery_incref(dlv, "qdr_link_deliver_to - newly created delivery, add to action list");
@@ -124,6 +126,7 @@
     dlv->delivery_id        = next_delivery_id();
     dlv->link_id            = link->identity;
     dlv->conn_id            = link->conn_id;
+    dlv->dispo_lock         = sys_mutex();
     qd_log(link->core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdr_link_deliver_to_routed_link", DLV_ARGS(dlv));
 
     qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
diff --git a/src/router_node.c b/src/router_node.c
index 3e17695..bac3729 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -133,63 +133,66 @@
 {
     qd_delivery_state_t *dstate = 0;
     uint64_t            outcome = pn_delivery_remote_state(pnd);
-    if (pnd) {
-        switch (outcome) {
-        case PN_RECEIVED: {
-            pn_disposition_t *disp = pn_delivery_remote(pnd);
-            dstate = qd_delivery_state();
-            dstate->section_number = pn_disposition_get_section_number(disp);
-            dstate->section_offset = pn_disposition_get_section_offset(disp);
-            break;
-        }
-        case PN_ACCEPTED:
-        case PN_RELEASED:
-            // no associated state (that we care about)
-            break;
-        case PN_REJECTED: {
-            // See AMQP 1.0 section 3.4.4 Rejected
-            pn_condition_t *cond = pn_disposition_condition(pn_delivery_remote(pnd));
-            dstate = qd_delivery_state();
-            dstate->error = qdr_error_from_pn(cond);
-            break;
-        }
-        case PN_MODIFIED: {
-            // See AMQP 1.0 section 3.4.5 Modified
-            pn_disposition_t *disp = pn_delivery_remote(pnd);
-            bool failed = pn_disposition_is_failed(disp);
-            bool undeliverable = pn_disposition_is_undeliverable(disp);
-            pn_data_t *anno = pn_disposition_annotations(disp);
 
-            // avoid expensive alloc if only default values found
-            const bool need_anno = (anno && pn_data_size(anno) > 0);
-            if (failed || undeliverable || need_anno) {
-                dstate = qd_delivery_state();
-                dstate->delivery_failed = failed;
-                dstate->undeliverable_here = undeliverable;
-                if (need_anno) {
-                    dstate->annotations = pn_data(0);
-                    pn_data_copy(dstate->annotations, anno);
-                }
-            }
-            break;
-        }
-        default: {
-            // Check for custom state data. Custom outcomes and AMQP 1.0
-            // Transaction defined outcomes will all be numerically >
-            // PN_MODIFIED. See Part 4: Transactions and section 1.5 Descriptor
-            // Values in the AMQP 1.0 spec.
-            if (outcome > PN_MODIFIED) {
-                pn_data_t *data = pn_disposition_data(pn_delivery_remote(pnd));
-                if (data && pn_data_size(data) > 0) {
-                    dstate = qd_delivery_state();
-                    dstate->extension = pn_data(0);
-                    pn_data_copy(dstate->extension, data);
-                }
-            }
+    switch (outcome) {
+    case 0:
+        // not set - no delivery-state
         break;
-        }
-        } // end switch
+    case PN_RECEIVED: {
+        pn_disposition_t *disp = pn_delivery_remote(pnd);
+        dstate = qd_delivery_state();
+        dstate->section_number = pn_disposition_get_section_number(disp);
+        dstate->section_offset = pn_disposition_get_section_offset(disp);
+        break;
     }
+    case PN_ACCEPTED:
+    case PN_RELEASED:
+        // no associated state (that we care about)
+        break;
+    case PN_REJECTED: {
+        // See AMQP 1.0 section 3.4.4 Rejected
+        pn_condition_t *cond = pn_disposition_condition(pn_delivery_remote(pnd));
+        dstate = qd_delivery_state();
+        dstate->error = qdr_error_from_pn(cond);
+        break;
+    }
+    case PN_MODIFIED: {
+        // See AMQP 1.0 section 3.4.5 Modified
+        pn_disposition_t *disp = pn_delivery_remote(pnd);
+        bool failed = pn_disposition_is_failed(disp);
+        bool undeliverable = pn_disposition_is_undeliverable(disp);
+        pn_data_t *anno = pn_disposition_annotations(disp);
+
+        // avoid expensive alloc if only default values found
+        const bool need_anno = (anno && pn_data_size(anno) > 0);
+        if (failed || undeliverable || need_anno) {
+            dstate = qd_delivery_state();
+            dstate->delivery_failed = failed;
+            dstate->undeliverable_here = undeliverable;
+            if (need_anno) {
+                dstate->annotations = pn_data(0);
+                pn_data_copy(dstate->annotations, anno);
+            }
+        }
+        break;
+    }
+    default: {
+        // Check for custom state data. Custom outcomes and AMQP 1.0
+        // Transaction defined outcomes will all be numerically >
+        // PN_MODIFIED. See Part 4: Transactions and section 1.5 Descriptor
+        // Values in the AMQP 1.0 spec.
+        if (outcome > PN_MODIFIED) {
+            pn_data_t *data = pn_disposition_data(pn_delivery_remote(pnd));
+            if (data && pn_data_size(data) > 0) {
+                dstate = qd_delivery_state();
+                dstate->extension = pn_data(0);
+                pn_data_copy(dstate->extension, data);
+            }
+        }
+        break;
+    }
+    } // end switch
+
     return dstate;
 }
 
@@ -926,28 +929,38 @@
     qdr_delivery_t *delivery = qdr_node_delivery_qdr_from_pn(pnd);
 
     //
-    // It's important to not do any processing without a qdr_delivery.  When pre-settled
-    // multi-frame deliveries arrive, it's possible for the settlement to register before
-    // the whole message arrives.  Such premature settlement indications must be ignored.
-    //
-    if (!delivery || !qdr_delivery_receive_complete(delivery))
+    // It's important to not do any processing without a qdr_delivery.
+    if (!delivery)
         return;
 
-    //
-    // Update the disposition of the delivery
-    //
-    qdr_delivery_remote_state_updated(router->router_core, delivery,
-                                      pn_delivery_remote_state(pnd),
-                                      pn_delivery_settled(pnd),
-                                      qd_delivery_read_remote_state(pnd),
-                                      false);
+    uint64_t dstate = pn_delivery_remote_state(pnd);
+    bool settled = pn_delivery_settled(pnd);
 
     //
-    // If settled, close out the delivery
+    // When pre-settled multi-frame deliveries arrive, it's possible for the
+    // settlement to register before the whole message arrives.  Such premature
+    // settlement indications must be ignored.
     //
-    if (pn_delivery_settled(pnd)) {
-        qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
-        pn_delivery_settle(pnd);
+    if (settled && !qdr_delivery_receive_complete(delivery))
+        settled = false;
+
+    if (dstate || settled) {
+        //
+        // Update the disposition of the delivery
+        //
+        qdr_delivery_remote_state_updated(router->router_core, delivery,
+                                          dstate,
+                                          settled,
+                                          qd_delivery_read_remote_state(pnd),
+                                          false);
+
+        //
+        // If settled, close out the delivery
+        //
+        if (settled) {
+            qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
+            pn_delivery_settle(pnd);
+        }
     }
 }
 
@@ -1942,12 +1955,12 @@
 
         // handle any delivery-state on the transfer e.g. transactional-state
         qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &disposition);
-        if (dstate) {
-            qd_delivery_write_local_state(pdlv, disposition, dstate);
-            qd_delivery_state_free(dstate);
-        }
-        if (disposition)
+        if (disposition) {
+            if (dstate)
+                qd_delivery_write_local_state(pdlv, disposition, dstate);
             pn_delivery_update(pdlv, disposition);
+        }
+        qd_delivery_state_free(dstate);
 
         //
         // If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver.
@@ -2040,7 +2053,7 @@
         link = (qd_link_t*) qdr_link_get_context(qlink);
         if (link) {
             qd_conn = qd_link_connection(link);
-            if (qd_conn == 0)
+             if (qd_conn == 0)
                 return;
         }
         else
@@ -2049,33 +2062,19 @@
     else
         return;
 
-    //
-    // DISPATCH-2040: For link routed links, it does not matter if the passed in disp matches the pn_delivery_remote_state(pnd), we will still
-    // call qd_delivery_write_local_state and send out the disposition if the delivery is not already settled.
-    //
-    // For non link routed links, if the disposition has changed and the proton delivery has not already been settled, update the proton delivery.
-    //
-    if ((qdr_link_is_routed(qlink) || disp != pn_delivery_remote_state(pnd)) && !pn_delivery_settled(pnd)) {
-        qd_message_t *msg = qdr_delivery_message(dlv);
-
-        // handle propagation of delivery state from qdr_delivery_t to proton:
-        uint64_t ignore = 0;  // expect same value as 'disp'
+    if (disp && !pn_delivery_settled(pnd)) {
+        uint64_t ignore = 0;
         qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);
-        assert(ignore == disp);
-        qd_delivery_write_local_state(pnd, disp, dstate);
-        qd_delivery_state_free(dstate);
+        assert(ignore == disp); // expected: since both are from the same dlv
 
-        if (disp == PN_MODIFIED)
-            pn_disposition_set_failed(pn_delivery_local(pnd), true);
-
-        //
-        // If the delivery is still arriving, don't push out the disposition change yet.
-        //
-        assert(qdr_delivery_disposition(dlv) == disp) ;
-        if (qd_message_receive_complete(msg)) {
-            if (disp != pn_delivery_local_state(pnd)) {
-                pn_delivery_update(pnd, disp);
-            }
+        // update if the disposition has changed or there is new state associated with it
+        if (disp != pn_delivery_local_state(pnd) || dstate) {
+            // handle propagation of delivery state from qdr_delivery_t to proton:
+            qd_delivery_write_local_state(pnd, disp, dstate);
+            pn_delivery_update(pnd, disp);
+            qd_delivery_state_free(dstate);
+            if (disp == PN_MODIFIED)  // @TODO(kgiusti) why do we need this???
+                pn_disposition_set_failed(pn_delivery_local(pnd), true);
         }
     }
 
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index d5d0bac..b2f1aad 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -838,6 +838,8 @@
     # Q2 holdoff
     @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
     def test_60_q2_holdoff(self):
+        # for now, Q2 is disabled to avoid stalling TCP backpressure
+        self.skipTest("Q2 is disabled on TCP adaptor")
         name = "test_60_q2_holdoff"
         self.logger.log("TCP_TEST Start %s" % name)