DISPATCH-1961: Enable Q2 flow control for HTTP/1.x adaptor

This closes #1029
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 6ce1ead..9e4eeb6 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -119,6 +119,8 @@
             pn_raw_connection_close(rconn);
         }
 
+        sys_atomic_destroy(&hconn->q2_restart);
+
         free(hconn->cfg.host);
         free(hconn->cfg.port);
         free(hconn->cfg.address);
@@ -415,6 +417,25 @@
 }
 
 
+// Per-message callback to resume receiving after Q2 is unblocked on the
+// incoming link (to HTTP app).  This routine runs on another I/O thread so it
+// must be thread safe!
+//
+void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
+{
+    // prevent the hconn from being deleted while running:
+    sys_mutex_lock(qdr_http1_adaptor->lock);
+
+    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*)qd_alloc_deref_safe_ptr(&context);
+    if (hconn && hconn->raw_conn) {
+        sys_atomic_set(&hconn->q2_restart, 1);
+        pn_raw_connection_wake(hconn->raw_conn);
+    }
+
+    sys_mutex_unlock(qdr_http1_adaptor->lock);
+}
+
+
 //
 // Protocol Adaptor Callbacks
 //
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 27b25cd..ff03380 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -133,6 +133,7 @@
     hconn->adaptor = qdr_http1_adaptor;
     hconn->handler_context.handler = &_handle_connection_events;
     hconn->handler_context.context = hconn;
+    sys_atomic_init(&hconn->q2_restart, 0);
 
     hconn->client.next_msg_id = 1;
 
@@ -364,6 +365,36 @@
 }
 
 
+// handle PN_RAW_CONNECTION_READ
+static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
+{
+    int error = 0;
+    qd_buffer_list_t blist;
+    uintmax_t length;
+    qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+    if (length) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client (%zu buffers)",
+               hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
+        hconn->in_http1_octets += length;
+        error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+    }
+    return error;
+}
+
+
+// handle PN_RAW_CONNECTION_NEED_READ_BUFFERS
+static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn)
+{
+    // @TODO(kgiusti): backpressure if no credit
+    if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) {
+        int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+               hconn->conn_id, granted);
+    }
+}
+
+
 // Proton Connection Event Handler
 //
 static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
@@ -424,31 +455,34 @@
     }
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
-        // @TODO(kgiusti): backpressure if no credit
-        if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) {
-            int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
-            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
-                   hconn->conn_id, granted);
-        }
+        _handle_conn_need_read_buffers(hconn);
         break;
     }
     case PN_RAW_CONNECTION_WAKE: {
+        int error = 0;
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
+
+        if (sys_atomic_set(&hconn->q2_restart, 0)) {
+            // note: unit tests grep for this log!
+            qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", hconn->conn_id);
+            hconn->q2_blocked = false;
+            error = _handle_conn_read_event(hconn);  // restart receiving
+            _handle_conn_need_read_buffers(hconn);
+        }
+
         while (qdr_connection_process(hconn->qdr_conn)) {}
+
+        if (error)
+            qdr_http1_close_connection(hconn, "Incoming request message failed to parse");
+
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Processing done", hconn->conn_id);
         break;
     }
     case PN_RAW_CONNECTION_READ: {
-        qd_buffer_list_t blist;
-        uintmax_t length;
-        qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
-        if (length) {
-            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client",
-                   hconn->conn_id, hconn->in_link_id, length);
-            hconn->in_http1_octets += length;
-            int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+        if (!hconn->q2_blocked) {
+            int error = _handle_conn_read_event(hconn);
             if (error)
-                qdr_http1_close_connection(hconn, "Incoming request message failed to parse");
+                qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
         }
         break;
     }
@@ -581,7 +615,7 @@
     }
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64"] %u request octets encoded",
+           "[C%"PRIu64"][L%"PRIu64"] %u response octets encoded",
            hconn->conn_id, hconn->out_link_id, len);
 
 
@@ -825,6 +859,13 @@
     qd_compose_free(hreq->request_props);
     hreq->request_props = 0;
 
+    // future-proof: ensure the message headers have not caused Q2
+    // blocking.  We only check for Q2 events while adding body data.
+    assert(!qd_message_is_Q2_blocked(hreq->request_msg));
+
+    qd_alloc_safe_ptr_t hconn_sp = QD_SAFE_PTR_INIT(hconn);
+    qd_message_set_q2_unblocked_handler(hreq->request_msg, qdr_http1_q2_unblocked_handler, hconn_sp);
+
     // Use up one credit to obtain a delivery and forward to core.  If no
     // credit is available the request is stalled until the core grants more
     // flow.
@@ -848,6 +889,7 @@
 static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t len,
                               bool more)
 {
+    bool               q2_blocked = false;
     _client_request_t       *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
     if (hconn->cfg.event_channel && strcasecmp(h1_codec_request_state_method(hrs), POST_METHOD) != 0) {
@@ -860,8 +902,12 @@
            "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
            hconn->conn_id, hconn->in_link_id, len);
 
-    // @TODO(kgiusti): handle Q2 block event:
-    qd_message_stream_data_append(msg, body, 0);
+    qd_message_stream_data_append(msg, body, &q2_blocked);
+    hconn->q2_blocked = hconn->q2_blocked || q2_blocked;
+    if (q2_blocked) {
+        // note: unit tests grep for this log!
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] client link blocked on Q2 limit", hconn->conn_id);
+    }
 
     //
     // Notify the router that more data is ready to be pushed out on the delivery
@@ -1635,6 +1681,10 @@
 static void _client_request_free(_client_request_t *hreq)
 {
     if (hreq) {
+        // deactivate the Q2 callback
+        qd_message_t *msg = hreq->request_dlv ? qdr_delivery_message(hreq->request_dlv) : hreq->request_msg;
+        qd_message_clear_q2_unblocked_handler(msg);
+
         qdr_http1_request_base_cleanup(&hreq->base);
         qd_message_free(hreq->request_msg);
         if (hreq->request_dlv) {
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index 858efb5..5ecf4fd 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -172,6 +172,8 @@
     qdr_link_t            *in_link;
     uint64_t               in_link_id;
     int                    in_link_credit;  // provided by router
+    sys_atomic_t           q2_restart;      // signal to resume receive
+    bool                   q2_blocked;      // stop reading from raw conn
 
     // Oldest at HEAD
     //
@@ -220,7 +222,7 @@
                               const char *reason);
 void qdr_http1_rejected_response(qdr_http1_request_base_t *hreq,
                                  const qdr_error_t *error);
-
+void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context);
 
 // http1_client.c protocol adaptor callbacks
 //
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 1ef6b2a..c1fb17b 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -152,6 +152,7 @@
     hconn->adaptor = qdr_http1_adaptor;
     hconn->handler_context.handler = &_handle_connection_events;
     hconn->handler_context.context = hconn;
+    sys_atomic_init(&hconn->q2_restart, 0);
     hconn->cfg.host = qd_strdup(bconfig->host);
     hconn->cfg.port = qd_strdup(bconfig->port);
     hconn->cfg.address = qd_strdup(bconfig->address);
@@ -466,6 +467,48 @@
     hreq->request_settled = true;
 }
 
+
+// handle PN_RAW_CONNECTION_READ
+static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
+{
+    int              error = 0;
+    qd_buffer_list_t blist;
+    uintmax_t        length;
+
+    qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+
+    if (HTTP1_DUMP_BUFFERS) {
+        fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
+        qd_buffer_t *bb = DEQ_HEAD(blist);
+        while (bb) {
+            fprintf(stdout, "  buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
+            bb = DEQ_NEXT(bb);
+        }
+        fflush(stdout);
+    }
+
+    if (length) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+               "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server (%zu buffers)",
+               hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
+        hconn->in_http1_octets += length;
+        error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+    }
+    return error;
+}
+
+
+// handle PN_RAW_CONNECTION_NEED_READ_BUFFERS
+static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn)
+{
+    // @TODO(kgiusti): backpressure if no credit
+    // if (hconn->in_link_credit > 0 */)
+    int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+           hconn->conn_id, granted);
+}
+
+
 // Proton Raw Connection Events
 //
 static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
@@ -490,6 +533,7 @@
         // message (response body terminated on connection closed)
         h1_codec_connection_rx_closed(hconn->http_conn);
         pn_raw_connection_close(hconn->raw_conn);
+        hconn->q2_blocked = false;
         break;
     }
 
@@ -557,39 +601,32 @@
         break;
     }
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
-        // @TODO(kgiusti): backpressure if no credit
-        // if (hconn->in_link_credit > 0 */)
-        int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
-               hconn->conn_id, granted);
+        _handle_conn_need_read_buffers(hconn);
         break;
     }
     case PN_RAW_CONNECTION_WAKE: {
+        int error = 0;
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
+
+        if (sys_atomic_set(&hconn->q2_restart, 0)) {
+            // note: unit tests grep for this log!
+            qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] server link unblocked from Q2 limit", hconn->conn_id);
+            hconn->q2_blocked = false;
+            error = _handle_conn_read_event(hconn);  // restart receiving
+            _handle_conn_need_read_buffers(hconn);
+        }
+
         while (qdr_connection_process(hconn->qdr_conn)) {}
+
+        if (error)
+            qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
+
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Connection processing complete", hconn->conn_id);
         break;
     }
     case PN_RAW_CONNECTION_READ: {
-        qd_buffer_list_t blist;
-        uintmax_t length;
-        qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
-
-        if (HTTP1_DUMP_BUFFERS) {
-            fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
-            qd_buffer_t *bb = DEQ_HEAD(blist);
-            while (bb) {
-                fprintf(stdout, "  buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
-                bb = DEQ_NEXT(bb);
-            }
-            fflush(stdout);
-        }
-
-        if (length) {
-            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server",
-                   hconn->conn_id, hconn->in_link_id, length);
-            hconn->in_http1_octets += length;
-            int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+        if (!hconn->q2_blocked) {
+            int error = _handle_conn_read_event(hconn);
             if (error)
                 qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
         }
@@ -934,6 +971,13 @@
     qd_compose_free(rmsg->msg_props);
     rmsg->msg_props = 0;
 
+    // future-proof: ensure the message headers have not caused Q2
+    // blocking.  We only check for Q2 events while adding body data.
+    assert(!qd_message_is_Q2_blocked(rmsg->msg));
+
+    qd_alloc_safe_ptr_t hconn_sp = QD_SAFE_PTR_INIT(hconn);
+    qd_message_set_q2_unblocked_handler(rmsg->msg, qdr_http1_q2_unblocked_handler, hconn_sp);
+
     // start delivery if possible
     if (hconn->in_link_credit > 0 && rmsg == DEQ_HEAD(hreq->responses)) {
         hconn->in_link_credit -= 1;
@@ -962,6 +1006,7 @@
 {
     _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
+    bool                    q2_blocked = false;
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
            "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
@@ -976,8 +1021,13 @@
 
     qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
 
-    // @TODO(kgiusti): handle Q2 block event:
-    qd_message_stream_data_append(msg, body, 0);
+
+    qd_message_stream_data_append(msg, body, &q2_blocked);
+    hconn->q2_blocked = hconn->q2_blocked || q2_blocked;
+    if (q2_blocked) {
+        // note: unit tests grep for this log!
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] server link blocked on Q2 limit", hconn->conn_id);
+    }
 
     //
     // Notify the router that more data is ready to be pushed out on the delivery
@@ -1518,6 +1568,11 @@
 static void _server_response_msg_free(_server_request_t *hreq, _server_response_msg_t *rmsg)
 {
     DEQ_REMOVE(hreq->responses, rmsg);
+
+    // deactivate the Q2 callback
+    qd_message_t *msg = rmsg->dlv ? qdr_delivery_message(rmsg->dlv) : rmsg->msg;
+    qd_message_clear_q2_unblocked_handler(msg);
+
     qd_message_free(rmsg->msg);
     qd_compose_free(rmsg->msg_props);
     if (rmsg->dlv) {
diff --git a/src/message.c b/src/message.c
index f81d15e..1679407 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2936,14 +2936,16 @@
 }
 
 
-void qd_message_clear_Q2_unblocked_handler(qd_message_t *msg)
+void qd_message_clear_q2_unblocked_handler(qd_message_t *msg)
 {
-    qd_message_content_t *content = MSG_CONTENT(msg);
+    if (msg) {
+        qd_message_content_t *content = MSG_CONTENT(msg);
 
-    LOCK(content->lock);
+        LOCK(content->lock);
 
-    content->q2_unblocker.handler = 0;
-    qd_nullify_safe_ptr(&content->q2_unblocker.context);
+        content->q2_unblocker.handler = 0;
+        qd_nullify_safe_ptr(&content->q2_unblocker.context);
 
-    UNLOCK(content->lock);
+        UNLOCK(content->lock);
+    }
 }
diff --git a/tests/system_test.py b/tests/system_test.py
index bab62c4..5fec091 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -733,6 +733,10 @@
     def wait_router_connected(self, router_id, **retry_kwargs):
         retry(lambda: self.is_router_connected(router_id), **retry_kwargs)
 
+    @property
+    def logfile_path(self):
+        return os.path.join(self.outdir, self.logfile)
+
 
 class Tester(object):
     """Tools for use by TestCase
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index c182a3d..9e6776a 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -27,10 +27,13 @@
 from __future__ import print_function
 
 
+import errno
+import io
+import select
 import socket
 import sys
 from threading import Thread
-from time import sleep
+from time import sleep, time
 import uuid
 try:
     from http.server import HTTPServer, BaseHTTPRequestHandler
@@ -1705,5 +1708,253 @@
         self.assertEqual(1, count)
 
 
+
+class Http1AdaptorQ2Standalone(TestCase):
+    """
+    Force Q2 blocking/recovery on both client and server endpoints. This test
+    uses a single router to ensure both client facing and server facing
+    Q2 components of the HTTP/1.x adaptor are triggered.
+    """
+    @classmethod
+    def setUpClass(cls):
+        """
+        Single router configuration with one HTTPListener and one
+        HTTPConnector.
+        """
+        super(Http1AdaptorQ2Standalone, cls).setUpClass()
+
+        cls.http_server_port = cls.tester.get_port()
+        cls.http_listener_port = cls.tester.get_port()
+
+        config = [
+            ('router', {'mode': 'standalone',
+                        'id': 'RowdyRoddyRouter',
+                        'allowUnsettledMulticast': 'yes'}),
+            ('listener', {'role': 'normal',
+                          'port': cls.tester.get_port()}),
+            ('httpListener', {'port': cls.http_listener_port,
+                              'protocolVersion': 'HTTP1',
+                              'address': 'testServer'}),
+            ('httpConnector', {'port': cls.http_server_port,
+                               'protocolVersion': 'HTTP1',
+                               'address': 'testServer'}),
+            ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+        ]
+        config = Qdrouterd.Config(config)
+        cls.INT_A = cls.tester.qdrouterd("TestBadEndpoints", config, wait=True)
+        cls.INT_A.listener = cls.INT_A.addresses[0]
+
+
+    def _write_until_full(self, sock, data, timeout):
+        """
+        Write data to socket until either all data written or timeout.
+        Return the number of bytes written, which will == len(data) if timeout
+        not hit
+        """
+        sock.setblocking(0)
+        sent = 0
+
+        while sent < len(data):
+            try:
+                _, rw, _ = select.select([], [sock], [], timeout)
+            except select.error as serror:
+                if serror[0] == errno.EINTR:
+                    print("ignoring interrupt from select(): %s" % str(serror))
+                    continue
+                raise  # assuming fatal...
+            if rw:
+                sent += sock.send(data[sent:])
+            else:
+                break  # timeout
+        return sent
+
+    def _read_until_empty(self, sock, timeout):
+        """
+        Read data from socket until timeout occurs.  Return read data.
+        """
+        sock.setblocking(0)
+        data = b''
+
+        while True:
+            try:
+                rd, _, _ = select.select([sock], [], [], timeout)
+            except select.error as serror:
+                if serror[0] == errno.EINTR:
+                    print("ignoring interrupt from select(): %s" % str(serror))
+                    continue
+                raise  # assuming fatal...
+            if rd:
+                data += sock.recv(4096)
+            else:
+                break  # timeout
+        return data
+
+    def test_01_backpressure_client(self):
+        """
+        Trigger Q2 backpressure against the HTTP client.
+        """
+
+        # create a listener socket to act as the server service
+        server_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        server_listener.settimeout(TIMEOUT)
+        server_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        server_listener.bind(('', self.http_server_port))
+        server_listener.listen(1)
+
+        # block until router connects
+        server_sock, host_port = server_listener.accept()
+        server_sock.settimeout(0.5)
+        server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+        # create a client connection to the router
+        client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        client_sock.settimeout(0.5)
+        client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        client_sock.connect((host_port[0], self.http_listener_port))
+
+        # send a Very Large PUSH request, expecting it to block at some point
+
+        push_req_hdr = b'PUSH / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n'
+        count = self._write_until_full(client_sock, push_req_hdr, 1.0)
+        self.assertEqual(len(push_req_hdr), count)
+
+        chunk = b'8000\r\n' + b'X' * 0x8000 + b'\r\n'
+        last_chunk = b'0 \r\n\r\n'
+        count = 0
+        deadline = time() + TIMEOUT
+        while deadline >= time():
+            count = self._write_until_full(client_sock, chunk, 5.0)
+            if count < len(chunk):
+                break
+        self.assertFalse(time() > deadline,
+                         "Client never blocked as expected!")
+
+        # client should now be in Q2 block. Drain the server to unblock Q2
+        _ = self._read_until_empty(server_sock, 2.0)
+
+        # finish the PUSH
+        if count:
+            remainder = self._write_until_full(client_sock, chunk[count:], 1.0)
+            self.assertEqual(len(chunk), count + remainder)
+
+        count = self._write_until_full(client_sock, last_chunk, 1.0)
+        self.assertEqual(len(last_chunk), count)
+
+        # receive the request and reply
+        _ = self._read_until_empty(server_sock, 2.0)
+
+        response = b'HTTP/1.1 201 Created\r\nContent-Length: 0\r\n\r\n'
+        count = self._write_until_full(server_sock, response, 1.0)
+        self.assertEqual(len(response), count)
+
+        # complete the response read
+        _ = self._read_until_empty(client_sock, 2.0)
+        self.assertEqual(len(response), len(_))
+
+        client_sock.shutdown(socket.SHUT_RDWR)
+        client_sock.close()
+
+        server_sock.shutdown(socket.SHUT_RDWR)
+        server_sock.close()
+
+        server_listener.shutdown(socket.SHUT_RDWR)
+        server_listener.close()
+
+        # search the router log file to verify Q2 was hit
+
+        block_ct = 0
+        unblock_ct = 0
+        with io.open(self.INT_A.logfile_path) as f:
+            for line in f:
+                if 'client link blocked on Q2 limit' in line:
+                    block_ct += 1
+                if 'client link unblocked from Q2 limit' in line:
+                    unblock_ct += 1
+        self.assertTrue(block_ct > 0)
+        self.assertEqual(block_ct, unblock_ct)
+
+    def test_02_backpressure_server(self):
+        """
+        Trigger Q2 backpressure against the HTTP server.
+        """
+        small_get_req = b'GET / HTTP/1.1\r\nContent-Length: 0\r\n\r\n'
+
+        # create a listener socket to act as the server service
+        server_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        server_listener.settimeout(TIMEOUT)
+        server_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        server_listener.bind(('', self.http_server_port))
+        server_listener.listen(1)
+
+        # block until router connects
+        server_sock, host_port = server_listener.accept()
+        server_sock.settimeout(0.5)
+        server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+        # create a client connection to the router
+        client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        client_sock.settimeout(0.5)
+        client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        client_sock.connect((host_port[0], self.http_listener_port))
+
+        # send GET request - expect this to be successful
+        count = self._write_until_full(client_sock, small_get_req, 1.0)
+        self.assertEqual(len(small_get_req), count)
+
+        request = self._read_until_empty(server_sock, 5.0)
+        self.assertEqual(len(small_get_req), len(request))
+
+        # send a Very Long response, expecting it to block at some point
+        chunk = b'8000\r\n' + b'X' * 0x8000 + b'\r\n'
+        last_chunk = b'0 \r\n\r\n'
+        response = b'HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n'
+
+        count = self._write_until_full(server_sock, response, 1.0)
+        self.assertEqual(len(response), count)
+
+        count = 0
+        deadline = time() + TIMEOUT
+        while deadline >= time():
+            count = self._write_until_full(server_sock, chunk, 5.0)
+            if count < len(chunk):
+                break
+        self.assertFalse(time() > deadline,
+                         "Server never blocked as expected!")
+
+        # server should now be in Q2 block. Drain the client to unblock Q2
+        _ = self._read_until_empty(client_sock, 2.0)
+
+        # finish the response
+        if count:
+            remainder = self._write_until_full(server_sock, chunk[count:], 1.0)
+            self.assertEqual(len(chunk), count + remainder)
+
+        count = self._write_until_full(server_sock, last_chunk, 1.0)
+        self.assertEqual(len(last_chunk), count)
+        server_sock.shutdown(socket.SHUT_RDWR)
+        server_sock.close()
+
+        _ = self._read_until_empty(client_sock, 1.0)
+        client_sock.shutdown(socket.SHUT_RDWR)
+        client_sock.close()
+
+        server_listener.shutdown(socket.SHUT_RDWR)
+        server_listener.close()
+
+        # search the router log file to verify Q2 was hit
+
+        block_ct = 0
+        unblock_ct = 0
+        with io.open(self.INT_A.logfile_path) as f:
+            for line in f:
+                if 'server link blocked on Q2 limit' in line:
+                    block_ct += 1
+                if 'server link unblocked from Q2 limit' in line:
+                    unblock_ct += 1
+        self.assertTrue(block_ct > 0)
+        self.assertEqual(block_ct, unblock_ct)
+
+
 if __name__ == '__main__':
     unittest.main(main_module())