DISPATCH-1961: Enable Q2 flow control for HTTP/1.x adaptor
This closes #1029
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 6ce1ead..9e4eeb6 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -119,6 +119,8 @@
pn_raw_connection_close(rconn);
}
+ sys_atomic_destroy(&hconn->q2_restart);
+
free(hconn->cfg.host);
free(hconn->cfg.port);
free(hconn->cfg.address);
@@ -415,6 +417,25 @@
}
+// Per-message callback to resume receiving after Q2 is unblocked on the
+// incoming link (to HTTP app). This routine runs on another I/O thread so it
+// must be thread safe!
+//
+void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
+{
+ // prevent the hconn from being deleted while running:
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+
+ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*)qd_alloc_deref_safe_ptr(&context);
+ if (hconn && hconn->raw_conn) {
+ sys_atomic_set(&hconn->q2_restart, 1);
+ pn_raw_connection_wake(hconn->raw_conn);
+ }
+
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+}
+
+
//
// Protocol Adaptor Callbacks
//
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 27b25cd..ff03380 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -133,6 +133,7 @@
hconn->adaptor = qdr_http1_adaptor;
hconn->handler_context.handler = &_handle_connection_events;
hconn->handler_context.context = hconn;
+ sys_atomic_init(&hconn->q2_restart, 0);
hconn->client.next_msg_id = 1;
@@ -364,6 +365,36 @@
}
+// handle PN_RAW_CONNECTION_READ
+static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
+{
+ int error = 0;
+ qd_buffer_list_t blist;
+ uintmax_t length;
+ qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+ if (length) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client (%zu buffers)",
+ hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
+ hconn->in_http1_octets += length;
+ error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+ }
+ return error;
+}
+
+
+// handle PN_RAW_CONNECTION_NEED_READ_BUFFERS
+static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn)
+{
+ // @TODO(kgiusti): backpressure if no credit
+ if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) {
+ int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+ hconn->conn_id, granted);
+ }
+}
+
+
// Proton Connection Event Handler
//
static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
@@ -424,31 +455,34 @@
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
- // @TODO(kgiusti): backpressure if no credit
- if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) {
- int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
- hconn->conn_id, granted);
- }
+ _handle_conn_need_read_buffers(hconn);
break;
}
case PN_RAW_CONNECTION_WAKE: {
+ int error = 0;
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
+
+ if (sys_atomic_set(&hconn->q2_restart, 0)) {
+ // note: unit tests grep for this log!
+ qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", hconn->conn_id);
+ hconn->q2_blocked = false;
+ error = _handle_conn_read_event(hconn); // restart receiving
+ _handle_conn_need_read_buffers(hconn);
+ }
+
while (qdr_connection_process(hconn->qdr_conn)) {}
+
+ if (error)
+ qdr_http1_close_connection(hconn, "Incoming request message failed to parse");
+
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Processing done", hconn->conn_id);
break;
}
case PN_RAW_CONNECTION_READ: {
- qd_buffer_list_t blist;
- uintmax_t length;
- qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
- if (length) {
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client",
- hconn->conn_id, hconn->in_link_id, length);
- hconn->in_http1_octets += length;
- int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+ if (!hconn->q2_blocked) {
+ int error = _handle_conn_read_event(hconn);
if (error)
- qdr_http1_close_connection(hconn, "Incoming request message failed to parse");
+ qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
}
break;
}
@@ -581,7 +615,7 @@
}
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] %u request octets encoded",
+ "[C%"PRIu64"][L%"PRIu64"] %u response octets encoded",
hconn->conn_id, hconn->out_link_id, len);
@@ -825,6 +859,13 @@
qd_compose_free(hreq->request_props);
hreq->request_props = 0;
+ // future-proof: ensure the message headers have not caused Q2
+ // blocking. We only check for Q2 events while adding body data.
+ assert(!qd_message_is_Q2_blocked(hreq->request_msg));
+
+ qd_alloc_safe_ptr_t hconn_sp = QD_SAFE_PTR_INIT(hconn);
+ qd_message_set_q2_unblocked_handler(hreq->request_msg, qdr_http1_q2_unblocked_handler, hconn_sp);
+
// Use up one credit to obtain a delivery and forward to core. If no
// credit is available the request is stalled until the core grants more
// flow.
@@ -848,6 +889,7 @@
static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t len,
bool more)
{
+ bool q2_blocked = false;
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (hconn->cfg.event_channel && strcasecmp(h1_codec_request_state_method(hrs), POST_METHOD) != 0) {
@@ -860,8 +902,12 @@
"[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
hconn->conn_id, hconn->in_link_id, len);
- // @TODO(kgiusti): handle Q2 block event:
- qd_message_stream_data_append(msg, body, 0);
+ qd_message_stream_data_append(msg, body, &q2_blocked);
+ hconn->q2_blocked = hconn->q2_blocked || q2_blocked;
+ if (q2_blocked) {
+ // note: unit tests grep for this log!
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] client link blocked on Q2 limit", hconn->conn_id);
+ }
//
// Notify the router that more data is ready to be pushed out on the delivery
@@ -1635,6 +1681,10 @@
static void _client_request_free(_client_request_t *hreq)
{
if (hreq) {
+ // deactivate the Q2 callback
+ qd_message_t *msg = hreq->request_dlv ? qdr_delivery_message(hreq->request_dlv) : hreq->request_msg;
+ qd_message_clear_q2_unblocked_handler(msg);
+
qdr_http1_request_base_cleanup(&hreq->base);
qd_message_free(hreq->request_msg);
if (hreq->request_dlv) {
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index 858efb5..5ecf4fd 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -172,6 +172,8 @@
qdr_link_t *in_link;
uint64_t in_link_id;
int in_link_credit; // provided by router
+ sys_atomic_t q2_restart; // signal to resume receive
+ bool q2_blocked; // stop reading from raw conn
// Oldest at HEAD
//
@@ -220,7 +222,7 @@
const char *reason);
void qdr_http1_rejected_response(qdr_http1_request_base_t *hreq,
const qdr_error_t *error);
-
+void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context);
// http1_client.c protocol adaptor callbacks
//
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 1ef6b2a..c1fb17b 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -152,6 +152,7 @@
hconn->adaptor = qdr_http1_adaptor;
hconn->handler_context.handler = &_handle_connection_events;
hconn->handler_context.context = hconn;
+ sys_atomic_init(&hconn->q2_restart, 0);
hconn->cfg.host = qd_strdup(bconfig->host);
hconn->cfg.port = qd_strdup(bconfig->port);
hconn->cfg.address = qd_strdup(bconfig->address);
@@ -466,6 +467,48 @@
hreq->request_settled = true;
}
+
+// handle PN_RAW_CONNECTION_READ
+static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
+{
+ int error = 0;
+ qd_buffer_list_t blist;
+ uintmax_t length;
+
+ qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+
+ if (HTTP1_DUMP_BUFFERS) {
+ fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
+ qd_buffer_t *bb = DEQ_HEAD(blist);
+ while (bb) {
+ fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
+ bb = DEQ_NEXT(bb);
+ }
+ fflush(stdout);
+ }
+
+ if (length) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server (%zu buffers)",
+ hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
+ hconn->in_http1_octets += length;
+ error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+ }
+ return error;
+}
+
+
+// handle PN_RAW_CONNECTION_NEED_READ_BUFFERS
+static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn)
+{
+ // @TODO(kgiusti): backpressure if no credit
+ // if (hconn->in_link_credit > 0 */)
+ int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+ hconn->conn_id, granted);
+}
+
+
// Proton Raw Connection Events
//
static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
@@ -490,6 +533,7 @@
// message (response body terminated on connection closed)
h1_codec_connection_rx_closed(hconn->http_conn);
pn_raw_connection_close(hconn->raw_conn);
+ hconn->q2_blocked = false;
break;
}
@@ -557,39 +601,32 @@
break;
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
- // @TODO(kgiusti): backpressure if no credit
- // if (hconn->in_link_credit > 0 */)
- int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
- hconn->conn_id, granted);
+ _handle_conn_need_read_buffers(hconn);
break;
}
case PN_RAW_CONNECTION_WAKE: {
+ int error = 0;
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
+
+ if (sys_atomic_set(&hconn->q2_restart, 0)) {
+ // note: unit tests grep for this log!
+ qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] server link unblocked from Q2 limit", hconn->conn_id);
+ hconn->q2_blocked = false;
+ error = _handle_conn_read_event(hconn); // restart receiving
+ _handle_conn_need_read_buffers(hconn);
+ }
+
while (qdr_connection_process(hconn->qdr_conn)) {}
+
+ if (error)
+ qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
+
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Connection processing complete", hconn->conn_id);
break;
}
case PN_RAW_CONNECTION_READ: {
- qd_buffer_list_t blist;
- uintmax_t length;
- qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
-
- if (HTTP1_DUMP_BUFFERS) {
- fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
- qd_buffer_t *bb = DEQ_HEAD(blist);
- while (bb) {
- fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
- bb = DEQ_NEXT(bb);
- }
- fflush(stdout);
- }
-
- if (length) {
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server",
- hconn->conn_id, hconn->in_link_id, length);
- hconn->in_http1_octets += length;
- int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
+ if (!hconn->q2_blocked) {
+ int error = _handle_conn_read_event(hconn);
if (error)
qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
}
@@ -934,6 +971,13 @@
qd_compose_free(rmsg->msg_props);
rmsg->msg_props = 0;
+ // future-proof: ensure the message headers have not caused Q2
+ // blocking. We only check for Q2 events while adding body data.
+ assert(!qd_message_is_Q2_blocked(rmsg->msg));
+
+ qd_alloc_safe_ptr_t hconn_sp = QD_SAFE_PTR_INIT(hconn);
+ qd_message_set_q2_unblocked_handler(rmsg->msg, qdr_http1_q2_unblocked_handler, hconn_sp);
+
// start delivery if possible
if (hconn->in_link_credit > 0 && rmsg == DEQ_HEAD(hreq->responses)) {
hconn->in_link_credit -= 1;
@@ -962,6 +1006,7 @@
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
+ bool q2_blocked = false;
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
@@ -976,8 +1021,13 @@
qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
- // @TODO(kgiusti): handle Q2 block event:
- qd_message_stream_data_append(msg, body, 0);
+
+ qd_message_stream_data_append(msg, body, &q2_blocked);
+ hconn->q2_blocked = hconn->q2_blocked || q2_blocked;
+ if (q2_blocked) {
+ // note: unit tests grep for this log!
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] server link blocked on Q2 limit", hconn->conn_id);
+ }
//
// Notify the router that more data is ready to be pushed out on the delivery
@@ -1518,6 +1568,11 @@
static void _server_response_msg_free(_server_request_t *hreq, _server_response_msg_t *rmsg)
{
DEQ_REMOVE(hreq->responses, rmsg);
+
+ // deactivate the Q2 callback
+ qd_message_t *msg = rmsg->dlv ? qdr_delivery_message(rmsg->dlv) : rmsg->msg;
+ qd_message_clear_q2_unblocked_handler(msg);
+
qd_message_free(rmsg->msg);
qd_compose_free(rmsg->msg_props);
if (rmsg->dlv) {
diff --git a/src/message.c b/src/message.c
index f81d15e..1679407 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2936,14 +2936,16 @@
}
-void qd_message_clear_Q2_unblocked_handler(qd_message_t *msg)
+void qd_message_clear_q2_unblocked_handler(qd_message_t *msg)
{
- qd_message_content_t *content = MSG_CONTENT(msg);
+ if (msg) {
+ qd_message_content_t *content = MSG_CONTENT(msg);
- LOCK(content->lock);
+ LOCK(content->lock);
- content->q2_unblocker.handler = 0;
- qd_nullify_safe_ptr(&content->q2_unblocker.context);
+ content->q2_unblocker.handler = 0;
+ qd_nullify_safe_ptr(&content->q2_unblocker.context);
- UNLOCK(content->lock);
+ UNLOCK(content->lock);
+ }
}
diff --git a/tests/system_test.py b/tests/system_test.py
index bab62c4..5fec091 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -733,6 +733,10 @@
def wait_router_connected(self, router_id, **retry_kwargs):
retry(lambda: self.is_router_connected(router_id), **retry_kwargs)
+ @property
+ def logfile_path(self):
+ return os.path.join(self.outdir, self.logfile)
+
class Tester(object):
"""Tools for use by TestCase
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index c182a3d..9e6776a 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -27,10 +27,13 @@
from __future__ import print_function
+import errno
+import io
+import select
import socket
import sys
from threading import Thread
-from time import sleep
+from time import sleep, time
import uuid
try:
from http.server import HTTPServer, BaseHTTPRequestHandler
@@ -1705,5 +1708,253 @@
self.assertEqual(1, count)
+
+class Http1AdaptorQ2Standalone(TestCase):
+ """
+ Force Q2 blocking/recovery on both client and server endpoints. This test
+ uses a single router to ensure both client facing and server facing
+ Q2 components of the HTTP/1.x adaptor are triggered.
+ """
+ @classmethod
+ def setUpClass(cls):
+ """
+ Single router configuration with one HTTPListener and one
+ HTTPConnector.
+ """
+ super(Http1AdaptorQ2Standalone, cls).setUpClass()
+
+ cls.http_server_port = cls.tester.get_port()
+ cls.http_listener_port = cls.tester.get_port()
+
+ config = [
+ ('router', {'mode': 'standalone',
+ 'id': 'RowdyRoddyRouter',
+ 'allowUnsettledMulticast': 'yes'}),
+ ('listener', {'role': 'normal',
+ 'port': cls.tester.get_port()}),
+ ('httpListener', {'port': cls.http_listener_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'testServer'}),
+ ('httpConnector', {'port': cls.http_server_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'testServer'}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ]
+ config = Qdrouterd.Config(config)
+ cls.INT_A = cls.tester.qdrouterd("TestBadEndpoints", config, wait=True)
+ cls.INT_A.listener = cls.INT_A.addresses[0]
+
+
+ def _write_until_full(self, sock, data, timeout):
+ """
+ Write data to socket until either all data written or timeout.
+ Return the number of bytes written, which will == len(data) if timeout
+ not hit
+ """
+ sock.setblocking(0)
+ sent = 0
+
+ while sent < len(data):
+ try:
+ _, rw, _ = select.select([], [sock], [], timeout)
+ except select.error as serror:
+ if serror[0] == errno.EINTR:
+ print("ignoring interrupt from select(): %s" % str(serror))
+ continue
+ raise # assuming fatal...
+ if rw:
+ sent += sock.send(data[sent:])
+ else:
+ break # timeout
+ return sent
+
+ def _read_until_empty(self, sock, timeout):
+ """
+ Read data from socket until timeout occurs. Return read data.
+ """
+ sock.setblocking(0)
+ data = b''
+
+ while True:
+ try:
+ rd, _, _ = select.select([sock], [], [], timeout)
+ except select.error as serror:
+ if serror[0] == errno.EINTR:
+ print("ignoring interrupt from select(): %s" % str(serror))
+ continue
+ raise # assuming fatal...
+ if rd:
+ data += sock.recv(4096)
+ else:
+ break # timeout
+ return data
+
+ def test_01_backpressure_client(self):
+ """
+ Trigger Q2 backpressure against the HTTP client.
+ """
+
+ # create a listener socket to act as the server service
+ server_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server_listener.settimeout(TIMEOUT)
+ server_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server_listener.bind(('', self.http_server_port))
+ server_listener.listen(1)
+
+ # block until router connects
+ server_sock, host_port = server_listener.accept()
+ server_sock.settimeout(0.5)
+ server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+ # create a client connection to the router
+ client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ client_sock.settimeout(0.5)
+ client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ client_sock.connect((host_port[0], self.http_listener_port))
+
+ # send a Very Large PUSH request, expecting it to block at some point
+
+ push_req_hdr = b'PUSH / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n'
+ count = self._write_until_full(client_sock, push_req_hdr, 1.0)
+ self.assertEqual(len(push_req_hdr), count)
+
+ chunk = b'8000\r\n' + b'X' * 0x8000 + b'\r\n'
+ last_chunk = b'0 \r\n\r\n'
+ count = 0
+ deadline = time() + TIMEOUT
+ while deadline >= time():
+ count = self._write_until_full(client_sock, chunk, 5.0)
+ if count < len(chunk):
+ break
+ self.assertFalse(time() > deadline,
+ "Client never blocked as expected!")
+
+ # client should now be in Q2 block. Drain the server to unblock Q2
+ _ = self._read_until_empty(server_sock, 2.0)
+
+ # finish the PUSH
+ if count:
+ remainder = self._write_until_full(client_sock, chunk[count:], 1.0)
+ self.assertEqual(len(chunk), count + remainder)
+
+ count = self._write_until_full(client_sock, last_chunk, 1.0)
+ self.assertEqual(len(last_chunk), count)
+
+ # receive the request and reply
+ _ = self._read_until_empty(server_sock, 2.0)
+
+ response = b'HTTP/1.1 201 Created\r\nContent-Length: 0\r\n\r\n'
+ count = self._write_until_full(server_sock, response, 1.0)
+ self.assertEqual(len(response), count)
+
+ # complete the response read
+ _ = self._read_until_empty(client_sock, 2.0)
+ self.assertEqual(len(response), len(_))
+
+ client_sock.shutdown(socket.SHUT_RDWR)
+ client_sock.close()
+
+ server_sock.shutdown(socket.SHUT_RDWR)
+ server_sock.close()
+
+ server_listener.shutdown(socket.SHUT_RDWR)
+ server_listener.close()
+
+ # search the router log file to verify Q2 was hit
+
+ block_ct = 0
+ unblock_ct = 0
+ with io.open(self.INT_A.logfile_path) as f:
+ for line in f:
+ if 'client link blocked on Q2 limit' in line:
+ block_ct += 1
+ if 'client link unblocked from Q2 limit' in line:
+ unblock_ct += 1
+ self.assertTrue(block_ct > 0)
+ self.assertEqual(block_ct, unblock_ct)
+
+ def test_02_backpressure_server(self):
+ """
+ Trigger Q2 backpressure against the HTTP server.
+ """
+ small_get_req = b'GET / HTTP/1.1\r\nContent-Length: 0\r\n\r\n'
+
+ # create a listener socket to act as the server service
+ server_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server_listener.settimeout(TIMEOUT)
+ server_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server_listener.bind(('', self.http_server_port))
+ server_listener.listen(1)
+
+ # block until router connects
+ server_sock, host_port = server_listener.accept()
+ server_sock.settimeout(0.5)
+ server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+ # create a client connection to the router
+ client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ client_sock.settimeout(0.5)
+ client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ client_sock.connect((host_port[0], self.http_listener_port))
+
+ # send GET request - expect this to be successful
+ count = self._write_until_full(client_sock, small_get_req, 1.0)
+ self.assertEqual(len(small_get_req), count)
+
+ request = self._read_until_empty(server_sock, 5.0)
+ self.assertEqual(len(small_get_req), len(request))
+
+ # send a Very Long response, expecting it to block at some point
+ chunk = b'8000\r\n' + b'X' * 0x8000 + b'\r\n'
+ last_chunk = b'0 \r\n\r\n'
+ response = b'HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n'
+
+ count = self._write_until_full(server_sock, response, 1.0)
+ self.assertEqual(len(response), count)
+
+ count = 0
+ deadline = time() + TIMEOUT
+ while deadline >= time():
+ count = self._write_until_full(server_sock, chunk, 5.0)
+ if count < len(chunk):
+ break
+ self.assertFalse(time() > deadline,
+ "Server never blocked as expected!")
+
+ # server should now be in Q2 block. Drain the client to unblock Q2
+ _ = self._read_until_empty(client_sock, 2.0)
+
+ # finish the response
+ if count:
+ remainder = self._write_until_full(server_sock, chunk[count:], 1.0)
+ self.assertEqual(len(chunk), count + remainder)
+
+ count = self._write_until_full(server_sock, last_chunk, 1.0)
+ self.assertEqual(len(last_chunk), count)
+ server_sock.shutdown(socket.SHUT_RDWR)
+ server_sock.close()
+
+ _ = self._read_until_empty(client_sock, 1.0)
+ client_sock.shutdown(socket.SHUT_RDWR)
+ client_sock.close()
+
+ server_listener.shutdown(socket.SHUT_RDWR)
+ server_listener.close()
+
+ # search the router log file to verify Q2 was hit
+
+ block_ct = 0
+ unblock_ct = 0
+ with io.open(self.INT_A.logfile_path) as f:
+ for line in f:
+ if 'server link blocked on Q2 limit' in line:
+ block_ct += 1
+ if 'server link unblocked from Q2 limit' in line:
+ unblock_ct += 1
+ self.assertTrue(block_ct > 0)
+ self.assertEqual(block_ct, unblock_ct)
+
+
if __name__ == '__main__':
unittest.main(main_module())