blob: 439bc0c62befddfd60cc2f6f3b2e4b5a98f15949 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
# Test the HTTP/1.x Adaptor
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import errno
import io
import select
import socket
import sys
from threading import Thread
from time import sleep, time
import uuid
try:
from http.server import HTTPServer, BaseHTTPRequestHandler
from http.client import HTTPConnection
from http.client import HTTPException
except ImportError:
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from httplib import HTTPConnection, HTTPException
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
from system_test import TestCase, unittest, main_module, Qdrouterd, QdManager
from system_test import TIMEOUT, Logger, AsyncTestSender, AsyncTestReceiver
class RequestMsg(object):
    """
    A canned HTTP request.  An instance stores the method, target, headers
    and optional body and can write itself to an HTTPConnection.
    """

    def __init__(self, method, target, headers=None, body=None):
        self.method = method
        self.target = target
        self.headers = headers if headers else {}
        self.body = body

    def send_request(self, conn, extra_headers=None):
        """
        Emit the request on conn: request line, the stored headers followed
        by any extra_headers, the end-of-headers marker, then the body when
        one is set.
        """
        conn.putrequest(self.method, self.target)
        for header_set in (self.headers, extra_headers or {}):
            for name, content in header_set.items():
                conn.putheader(name, content)
        conn.endheaders()
        if self.body:
            conn.send(self.body)
class ResponseMsg(object):
    """
    A canned HTTP response.  The test server's request handler asks an
    instance to write itself back to the client.
    """

    def __init__(self, status, version=None, reason=None,
                 headers=None, body=None, error=False):
        self.status = status
        self.version = version if version else "HTTP/1.1"
        self.reason = reason
        self.headers = headers if headers else {}
        self.body = body
        self.error = error

    def send_response(self, handler, extra_headers=None):
        """
        Write this response through handler (a BaseHTTPRequestHandler-like
        object).  When the error flag is set only handler.send_error() is
        invoked; otherwise the status line, the stored headers, any
        extra_headers and the optional body are written in that order.
        """
        if self.error:
            handler.send_error(self.status, message=self.reason)
            return

        handler.send_response(self.status, self.reason)
        for header_set in (self.headers, extra_headers or {}):
            for name, content in header_set.items():
                handler.send_header(name, content)
        handler.end_headers()

        if self.body:
            out = handler.wfile
            out.write(self.body)
            out.flush()
class ResponseValidator(object):
    """
    Checks a response received by the HTTP client against the expected
    status code, headers and body.
    """

    def __init__(self, status=200, expect_headers=None, expect_body=None):
        self.status = status
        self.expect_headers = {} if expect_headers is None else expect_headers
        self.expect_body = expect_body

    def check_response(self, rsp):
        """
        Validate rsp and return the body that was read from it.

        Raises Exception when the status differs (only checked if a status
        was configured), when an expected header is absent or has a
        different value, or when an expected body was configured and the
        received body is not equal to it.
        """
        wanted_status = self.status
        if wanted_status and wanted_status != rsp.status:
            raise Exception("Bad response code, expected %s got %s"
                            % (wanted_status, rsp.status))

        for name, wanted in self.expect_headers.items():
            if rsp.getheader(name) == wanted:
                continue
            raise Exception("Missing/bad header (%s), expected %s got %s"
                            % (name, wanted, rsp.getheader(name)))

        body = rsp.read()
        wanted_body = self.expect_body
        if wanted_body and wanted_body != body:
            raise Exception("Bad response body expected %s got %s"
                            % (wanted_body, body))
        return body
class RequestHandler(BaseHTTPRequestHandler):
    """
    Dispatches requests received by the HTTPServer based on the method.

    The owning server is expected to provide a ``system_tests`` mapping of
    method name to a list of (request, response, validator) tuples and a
    ``request_count`` counter.
    """
    protocol_version = 'HTTP/1.1'

    def _execute_request(self, tests):
        """
        Find the test whose request target matches the requested path and
        send back its canned response(s).  Replies with 404 when no test
        matches.
        """
        for req, resp, _ in tests:
            if req.target == self.path:
                # echo the client's correlation header back, if it sent one
                xhdrs = None
                if "test-echo" in self.headers:
                    xhdrs = {"test-echo":
                             self.headers["test-echo"]}

                self._consume_body()
                # a test may supply a single response or a list of them
                # (e.g. a 1xx informational response followed by the final
                # response)
                if not isinstance(resp, list):
                    resp = [resp]
                for r in resp:
                    r.send_response(self, extra_headers=xhdrs)
                self.server.request_count += 1
                return
        self.send_error(404, "Not Found")

    def do_GET(self):
        self._execute_request(self.server.system_tests["GET"])

    def do_HEAD(self):
        self._execute_request(self.server.system_tests["HEAD"])

    def do_POST(self):
        # a POST to /SHUTDOWN is the in-band signal used to stop the server
        if self.path == "/SHUTDOWN":
            self.send_response(200, "OK")
            self.send_header("Content-Length", "13")
            self.end_headers()
            self.wfile.write(b'Server Closed')
            self.wfile.flush()
            self.close_connection = True
            self.server.server_killed = True
            return
        self._execute_request(self.server.system_tests["POST"])

    def do_PUT(self):
        self._execute_request(self.server.system_tests["PUT"])

    # these overrides just quiet the test output
    # comment them out to help debug:
    def log_request(self, code=None, size=None):
        pass

    def log_message(self, format=None, *args):
        pass

    def _read_chunked_body(self):
        """
        Read a chunked-encoded body, including the trailer section, off the
        rfile and return the concatenated chunk data.
        """
        chunks = []
        while True:
            # drop any chunk extensions that follow the hex length
            header = self.rfile.readline().strip().split(b';')[0]
            hlen = int(header, base=16)
            if hlen <= 0:
                break
            data = self.rfile.read(hlen + 2)  # 2 = \r\n
            chunks.append(data[:-2])

        # Discard optional trailer fields up to and including the blank line
        # that terminates the message.  Stopping after a single line would
        # leave trailers in the stream to be misread as the next request.
        while True:
            line = self.rfile.readline()
            if line in (b'\r\n', b'\n', b''):
                break
        return b''.join(chunks)

    def _consume_body(self):
        """
        Read the entire body off the rfile.  This must be done to allow
        multiple requests on the same socket.

        Returns the body bytes; b'' when the request carries no body.
        """
        if self.command == 'HEAD':
            return b''

        for key, value in self.headers.items():
            name = key.lower()
            if name == 'content-length':
                return self.rfile.read(int(value))

            if name == 'transfer-encoding' and 'chunked' in value.lower():
                return self._read_chunked_body()

        # A request with neither Content-Length nor chunked encoding has a
        # zero length body.  Reading to EOF here would block for as long as
        # the client keeps a persistent connection open.
        return b''
class RequestHandler10(RequestHandler):
    """
    Variant of RequestHandler that advertises HTTP/1.0, making the server
    apply HTTP version 1.0 semantics
    """
    protocol_version = "HTTP/1.0"
class MyHTTPServer(HTTPServer):
    """
    An HTTPServer that carries the test tables and a request counter for
    its handlers, and that tears down its listening socket promptly so the
    server can exit gracefully
    """

    def __init__(self, addr, handler_cls, testcases):
        # state read and updated by the request handlers
        self.request_count = 0
        self.system_tests = testcases
        HTTPServer.__init__(self, addr, handler_cls)

    def server_close(self):
        listener = self.socket
        try:
            # force immediate close of listening socket
            listener.shutdown(socket.SHUT_RDWR)
        except Exception:
            # best effort only: a failed shutdown must not prevent the close
            pass
        HTTPServer.server_close(self)
class TestServer(object):
    """
    A HTTPServer running in a separate (daemon) thread.

    server_port is the port the HTTP server listens on.  client_port is the
    port wait() connects to in order to deliver the shutdown request.
    tests is the method -> test-tuple mapping handed to the request
    handler, and handler_cls selects the handler class (RequestHandler by
    default).
    """

    def __init__(self, server_port, client_port, tests, handler_cls=None):
        self._logger = Logger(title="TestServer", print_to_console=False)
        self._client_port = client_port
        self._server_addr = ("", server_port)
        self._server = MyHTTPServer(self._server_addr,
                                    handler_cls or RequestHandler,
                                    tests)
        # NOTE(review): the listening socket is already bound by the
        # constructor above, so this assignment cannot affect that bind.
        self._server.allow_reuse_address = True
        self._thread = Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()

    def _run(self):
        """Thread body: serve requests until a handler sets server_killed."""
        self._logger.log("TestServer listening on %s:%s" % self._server_addr)
        try:
            self._server.server_killed = False
            while not self._server.server_killed:
                self._server.handle_request()
        except Exception as exc:
            self._logger.log("TestServer %s crash: %s" %
                             (self._server_addr, exc))
            raise
        self._logger.log("TestServer %s:%s closed" % self._server_addr)

    def wait(self, timeout=TIMEOUT):
        """
        Stop the server and record its request count in self.request_count.

        If the server thread is still running, a POST /SHUTDOWN is sent to
        client_port and the thread is joined.  timeout bounds both the
        shutdown connection and the join.  Calling wait() again after the
        server has been closed only resets request_count to zero.
        """
        self._logger.log("TestServer %s:%s shutting down" % self._server_addr)
        self.request_count = 0
        if self._thread.is_alive():
            client = HTTPConnection("127.0.0.1:%s" % self._client_port,
                                    timeout=timeout)
            try:
                client.putrequest("POST", "/SHUTDOWN")
                client.putheader("Content-Length", "0")
                client.endheaders()
                # 13 == len('Server Closed')
                client.getresponse().read(13)
            finally:
                # never leak the shutdown connection, even on failure
                client.close()
            self._thread.join(timeout=timeout)
        server = self._server
        if server:
            server.server_close()
            self.request_count = server.request_count
            # keep the attribute (as None) so a repeated wait() is harmless
            self._server = None
        sleep(0.5)  # fudge factor allow socket close to complete
class ThreadedTestClient(object):
    """
    An HTTP client running in a separate (daemon) thread.

    The thread starts immediately.  It runs every test in tests, a mapping
    of operation name to a list of (request, response, validator) tuples,
    repeat times over one connection to 127.0.0.1:port.  self.count holds
    the number of completed requests and self.error the first failure
    description (None when everything succeeded).
    """

    def __init__(self, tests, port, repeat=1):
        self._id = uuid.uuid4().hex
        self._conn_addr = ("127.0.0.1:%s" % port)
        self._tests = tests
        self._repeat = repeat
        self._logger = Logger(title="TestClient: %s" % self._id,
                              print_to_console=False)
        self._thread = Thread(target=self._run)
        self._thread.daemon = True
        self.error = None
        self.count = 0
        self._thread.start()

    def _exchange(self, client, loop, op, req, val):
        """
        Send one request and check its response.  Returns True on success;
        on failure records the reason in self.error and returns False.
        """
        self._logger.log("TestClient sending %s %s request" % (op, req.target))
        # the test-echo header lets the server echo back a value that is
        # unique to this client/loop/operation/target
        req.send_request(client,
                         {"test-echo": "%s-%s-%s-%s" % (self._id,
                                                        loop,
                                                        op,
                                                        req.target)})
        self._logger.log("TestClient getting %s response" % op)
        try:
            rsp = client.getresponse()
        except (HTTPException, socket.error) as exc:
            # socket errors (e.g. timeouts) are recorded as well instead of
            # silently killing the thread
            self._logger.log("TestClient response failed: %s" % exc)
            self.error = str(exc)
            return False
        self._logger.log("TestClient response %s received" % op)

        if val:
            try:
                body = val.check_response(rsp)
            except Exception as exc:
                self._logger.log("TestClient response invalid: %s"
                                 % str(exc))
                self.error = "client failed: %s" % str(exc)
                return False
        else:
            # no validator: still drain the response so the connection can
            # be reused for the next request
            body = rsp.read()

        # a response to a HEAD request must never carry a body
        if req.method == "HEAD" and body != b'':
            self._logger.log("TestClient response invalid: %s"
                             % "body present!")
            self.error = "error: body present!"
            return False

        self.count += 1
        self._logger.log("TestClient request %s %s completed!" %
                         (op, req.target))
        return True

    def _run(self):
        """Thread body: run all tests, stopping at the first failure."""
        self._logger.log("TestClient connecting on %s" % self._conn_addr)
        client = HTTPConnection(self._conn_addr, timeout=TIMEOUT)
        self._logger.log("TestClient connected")
        try:
            for loop in range(self._repeat):
                self._logger.log("TestClient start request %d" % loop)
                for op, tests in self._tests.items():
                    for req, _, val in tests:
                        if not self._exchange(client, loop, op, req, val):
                            return
        finally:
            # close the connection on every exit path, not just on success
            client.close()
            self._logger.log("TestClient to %s closed" % self._conn_addr)

    def wait(self, timeout=TIMEOUT):
        """Join the client thread, waiting at most timeout seconds."""
        self._thread.join(timeout=timeout)
        self._logger.log("TestClient %s shut down" % self._conn_addr)
        sleep(0.5)  # fudge factor allow socket close to complete

    def dump_log(self):
        """Dump the client's accumulated log."""
        self._logger.dump()
def http1_ping(sport, cport):
    """
    Exercise the HTTP path with one simple GET request.

    A TestServer is started on sport and a single threaded client sends
    GET /GET/ping to cport.  Returns the tuple (client.count, client.error).
    """
    ping_request = RequestMsg("GET", "/GET/ping",
                              headers={"Content-Length": 0})
    pong_response = ResponseMsg(200, reason="OK",
                                headers={"Content-Length": 4,
                                         "Content-Type": "text/plain;charset=utf-8"},
                                body=b'pong')
    pong_check = ResponseValidator(expect_body=b'pong')
    ping_tests = {"GET": [(ping_request, pong_response, pong_check)]}

    server = TestServer(server_port=sport,
                        client_port=cport,
                        tests=ping_tests)
    client = ThreadedTestClient(tests=ping_tests, port=cport)

    # let the client finish before shutting the server down
    client.wait()
    server.wait()
    return client.count, client.error
class Http1AdaptorManagementTest(TestCase):
    """
    Test Creation and deletion of HTTP1 management entities
    """
    @classmethod
    def setUpClass(cls):
        """Start a standalone router with no HTTP entities configured."""
        super(Http1AdaptorManagementTest, cls).setUpClass()

        cls.http_server_port = cls.tester.get_port()
        cls.http_listener_port = cls.tester.get_port()

        config = [
            ('router', {'mode': 'standalone',
                        'id': 'HTTP1MgmtTest',
                        'allowUnsettledMulticast': 'yes'}),
            ('listener', {'role': 'normal',
                          'port': cls.tester.get_port()}),
            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
        ]

        config = Qdrouterd.Config(config)
        cls.router = cls.tester.qdrouterd('HTTP1MgmtTest', config, wait=True)

    def test_01_mgmt(self):
        """
        Create and delete HTTP1 connectors and listeners
        """
        LISTENER_TYPE = 'org.apache.qpid.dispatch.httpListener'
        CONNECTOR_TYPE = 'org.apache.qpid.dispatch.httpConnector'
        CONNECTION_TYPE = 'org.apache.qpid.dispatch.connection'

        mgmt = self.router.management
        self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
        self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))

        mgmt.create(type=CONNECTOR_TYPE,
                    name="ServerConnector",
                    attributes={'address': 'http1',
                                'port': self.http_server_port,
                                'protocolVersion': 'HTTP1'})

        mgmt.create(type=LISTENER_TYPE,
                    name="ClientListener",
                    attributes={'address': 'http1',
                                'port': self.http_listener_port,
                                'protocolVersion': 'HTTP1'})

        # verify the entities have been created and http traffic works

        self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results))
        self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))

        count, error = http1_ping(sport=self.http_server_port,
                                  cport=self.http_listener_port)
        self.assertIsNone(error)
        self.assertEqual(1, count)

        #
        # delete the connector and wait for the associated connection to be
        # removed
        #

        mgmt.delete(type=CONNECTOR_TYPE, name="ServerConnector")
        self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))

        hconns = 0
        retry = 20  # 20 * 0.25 = 5 sec
        while retry:
            obj = mgmt.query(type=CONNECTION_TYPE,
                             attribute_names=["protocol"])
            # recount from scratch on every poll: a running total could never
            # drop back to zero once any HTTP connection had been seen
            hconns = sum(1 for item in obj.get_dicts()
                         if "http/1.x" in item["protocol"])
            if hconns == 0:
                break
            sleep(0.25)
            retry -= 1

        self.assertEqual(0, hconns, msg="HTTP connection not deleted")

        # When a connector is configured the router will periodically attempt
        # to connect to the server address. To prove that the connector has
        # been completely removed listen for connection attempts on the server
        # port.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(("", self.http_server_port))
            s.setblocking(1)
            s.settimeout(3)  # reconnect attempts every 2.5 seconds
            s.listen(1)
            with self.assertRaises(socket.timeout):
                conn, addr = s.accept()
                # only reached if the router unexpectedly connected
                conn.close()
        finally:
            # release the port even when the assertion fails, since the
            # connector re-created below needs it
            s.close()

        #
        # re-create the connector and verify it works
        #
        mgmt.create(type=CONNECTOR_TYPE,
                    name="ServerConnector",
                    attributes={'address': 'http1',
                                'port': self.http_server_port,
                                'protocolVersion': 'HTTP1'})

        self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))

        count, error = http1_ping(sport=self.http_server_port,
                                  cport=self.http_listener_port)
        self.assertIsNone(error)
        self.assertEqual(1, count)
class Http1AdaptorOneRouterTest(TestCase):
    """
    Test HTTP servers and clients attached to a standalone router.

    TESTS_11 and TESTS_10 map an HTTP method to a list of
    (RequestMsg, ResponseMsg or list of ResponseMsg, ResponseValidator)
    tuples.  The same tables drive both the test servers (which response to
    send for a target) and the clients (which request to send and how to
    validate the reply).
    """

    # HTTP/1.1 compliant test cases
    TESTS_11 = {
        #
        # GET
        #
        "GET": [
            (RequestMsg("GET", "/GET/error",
                        headers={"Content-Length": 0}),
             ResponseMsg(400, reason="Bad breath", error=True),
             ResponseValidator(status=400)),

            (RequestMsg("GET", "/GET/no_content",
                        headers={"Content-Length": 0}),
             ResponseMsg(204, reason="No Content"),
             ResponseValidator(status=204)),

            (RequestMsg("GET", "/GET/content_len",
                        headers={"Content-Length": "00"}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Length": 1,
                                  "Content-Type": "text/plain;charset=utf-8"},
                         body=b'?'),
             ResponseValidator(expect_headers={'Content-Length': '1'},
                               expect_body=b'?')),

            (RequestMsg("GET", "/GET/content_len_511",
                        headers={"Content-Length": 0}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Length": 511,
                                  "Content-Type": "text/plain;charset=utf-8"},
                         body=b'X' * 511),
             ResponseValidator(expect_headers={'Content-Length': '511'},
                               expect_body=b'X' * 511)),

            (RequestMsg("GET", "/GET/content_len_4096",
                        headers={"Content-Length": 0}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Length": 4096,
                                  "Content-Type": "text/plain;charset=utf-8"},
                         body=b'X' * 4096),
             ResponseValidator(expect_headers={'Content-Length': '4096'},
                               expect_body=b'X' * 4096)),

            (RequestMsg("GET", "/GET/chunked",
                        headers={"Content-Length": 0}),
             ResponseMsg(200, reason="OK",
                         headers={"transfer-encoding": "chunked",
                                  "Content-Type": "text/plain;charset=utf-8"},
                         # note: the chunk length does not count the trailing CRLF
                         body=b'16\r\n'
                         + b'Mary had a little pug \r\n'
                         + b'1b\r\n'
                         + b'Its name was "Skupper-Jack"\r\n'
                         + b'0\r\n'
                         + b'Optional: Trailer\r\n'
                         + b'Optional: Trailer\r\n'
                         + b'\r\n'),
             ResponseValidator(expect_headers={'transfer-encoding': 'chunked'},
                               expect_body=b'Mary had a little pug Its name was "Skupper-Jack"')),

            (RequestMsg("GET", "/GET/chunked_large",
                        headers={"Content-Length": 0}),
             ResponseMsg(200, reason="OK",
                         headers={"transfer-encoding": "chunked",
                                  "Content-Type": "text/plain;charset=utf-8"},
                         # note: the chunk length does not count the trailing CRLF
                         body=b'1\r\n'
                         + b'?\r\n'
                         + b'800\r\n'
                         + b'X' * 0x800 + b'\r\n'
                         + b'13\r\n'
                         + b'Y' * 0x13 + b'\r\n'
                         + b'0\r\n'
                         + b'Optional: Trailer\r\n'
                         + b'Optional: Trailer\r\n'
                         + b'\r\n'),
             ResponseValidator(expect_headers={'transfer-encoding': 'chunked'},
                               expect_body=b'?' + b'X' * 0x800 + b'Y' * 0x13)),

            (RequestMsg("GET", "/GET/info_content_len",
                        headers={"Content-Length": 0}),
             [ResponseMsg(100, reason="Continue",
                          headers={"Blab": 1, "Blob": "?"}),
              ResponseMsg(200, reason="OK",
                          headers={"Content-Length": 1,
                                   "Content-Type": "text/plain;charset=utf-8"},
                          body=b'?')],
             ResponseValidator(expect_headers={'Content-Type': "text/plain;charset=utf-8"},
                               expect_body=b'?')),

            # NOTE(review): disabled test case, kept as found
            # (RequestMsg("GET", "/GET/no_length",
            #             headers={"Content-Length": "0"}),
            #  ResponseMsg(200, reason="OK",
            #              headers={"Content-Type": "text/plain;charset=utf-8",
            #                       "connection": "close"
            #              },
            #              body=b'Hi! ' * 1024 + b'X'),
            #  ResponseValidator(expect_body=b'Hi! ' * 1024 + b'X')),
        ],
        #
        # HEAD
        #
        "HEAD": [
            (RequestMsg("HEAD", "/HEAD/test_01",
                        headers={"Content-Length": "0"}),
             ResponseMsg(200, headers={"App-Header-1": "Value 01",
                                       "Content-Length": "10",
                                       "App-Header-2": "Value 02"},
                         body=None),
             ResponseValidator(expect_headers={"App-Header-1": "Value 01",
                                               "Content-Length": "10",
                                               "App-Header-2": "Value 02"})
             ),
            (RequestMsg("HEAD", "/HEAD/test_02",
                        headers={"Content-Length": "0"}),
             ResponseMsg(200, headers={"App-Header-1": "Value 01",
                                       "Transfer-Encoding": "chunked",
                                       "App-Header-2": "Value 02"}),
             ResponseValidator(expect_headers={"App-Header-1": "Value 01",
                                               "Transfer-Encoding": "chunked",
                                               "App-Header-2": "Value 02"})),

            (RequestMsg("HEAD", "/HEAD/test_03",
                        headers={"Content-Length": "0"}),
             ResponseMsg(200, headers={"App-Header-3": "Value 03"}),
             ResponseValidator(expect_headers={"App-Header-3": "Value 03"})),
        ],
        #
        # POST
        #
        "POST": [
            (RequestMsg("POST", "/POST/test_01",
                        headers={"App-Header-1": "Value 01",
                                 "Content-Length": "19",
                                 "Content-Type": "application/x-www-form-urlencoded"},
                        body=b'one=1&two=2&three=3'),
             ResponseMsg(200, reason="OK",
                         headers={"Response-Header": "whatever",
                                  "Transfer-Encoding": "chunked"},
                         body=b'8\r\n'
                         + b'12345678\r\n'
                         + b'f\r\n'
                         + b'abcdefghijklmno\r\n'
                         + b'000\r\n'
                         + b'\r\n'),
             ResponseValidator(expect_body=b'12345678abcdefghijklmno')
             ),
            (RequestMsg("POST", "/POST/test_02",
                        headers={"App-Header-1": "Value 01",
                                 "Transfer-Encoding": "chunked"},
                        body=b'01\r\n'
                        + b'!\r\n'
                        + b'0\r\n\r\n'),
             ResponseMsg(200, reason="OK",
                         headers={"Response-Header": "whatever",
                                  "Content-Length": "9"},
                         body=b'Hi There!'),
             ResponseValidator(expect_body=b'Hi There!')
             ),
        ],
        #
        # PUT
        #
        "PUT": [
            (RequestMsg("PUT", "/PUT/test_01",
                        headers={"Put-Header-1": "Value 01",
                                 "Transfer-Encoding": "chunked",
                                 "Content-Type": "text/plain;charset=utf-8"},
                        body=b'80\r\n'
                        + b'$' * 0x80 + b'\r\n'
                        + b'0\r\n\r\n'),
             ResponseMsg(201, reason="Created",
                         headers={"Response-Header": "whatever",
                                  "Content-length": "3"},
                         body=b'ABC'),
             ResponseValidator(status=201, expect_body=b'ABC')
             ),

            (RequestMsg("PUT", "/PUT/test_02",
                        headers={"Put-Header-1": "Value 01",
                                 "Content-length": "0",
                                 "Content-Type": "text/plain;charset=utf-8"}),
             ResponseMsg(201, reason="Created",
                         headers={"Response-Header": "whatever",
                                  "Transfer-Encoding": "chunked"},
                         body=b'1\r\n$\r\n0\r\n\r\n'),
             ResponseValidator(status=201, expect_body=b'$')
             ),
        ]
    }

    # HTTP/1.0 compliant test cases (no chunked, response length unspecified)
    TESTS_10 = {
        #
        # GET
        #
        "GET": [
            (RequestMsg("GET", "/GET/error",
                        headers={"Content-Length": 0}),
             ResponseMsg(400, reason="Bad breath", error=True),
             ResponseValidator(status=400)),

            (RequestMsg("GET", "/GET/no_content",
                        headers={"Content-Length": 0}),
             ResponseMsg(204, reason="No Content"),
             ResponseValidator(status=204)),

            (RequestMsg("GET", "/GET/content_len_511",
                        headers={"Content-Length": 0}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Length": 511,
                                  "Content-Type": "text/plain;charset=utf-8"},
                         body=b'X' * 511),
             ResponseValidator(expect_headers={'Content-Length': '511'},
                               expect_body=b'X' * 511)),

            (RequestMsg("GET", "/GET/content_len_4096",
                        headers={"Content-Length": 0}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Type": "text/plain;charset=utf-8"},
                         body=b'X' * 4096),
             ResponseValidator(expect_headers={"Content-Type": "text/plain;charset=utf-8"},
                               expect_body=b'X' * 4096)),

            (RequestMsg("GET", "/GET/info_content_len",
                        headers={"Content-Length": 0}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Type": "text/plain;charset=utf-8"},
                         body=b'?'),
             ResponseValidator(expect_headers={'Content-Type': "text/plain;charset=utf-8"},
                               expect_body=b'?')),

            # test support for "folded headers"

            (RequestMsg("GET", "/GET/folded_header_01",
                        headers={"Content-Length": 0}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Type": "text/plain;charset=utf-8",
                                  "Content-Length": 1,
                                  "folded-header": "One\r\n \r\n\tTwo"},
                         body=b'X'),
             ResponseValidator(expect_headers={"Content-Type":
                                               "text/plain;charset=utf-8",
                                               "folded-header":
                                               "One \tTwo"},
                               expect_body=b'X')),

            (RequestMsg("GET", "/GET/folded_header_02",
                        headers={"Content-Length": 0}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Type": "text/plain;charset=utf-8",
                                  "Content-Length": 1,
                                  "folded-header": "\r\n \r\n\tTwo",
                                  "another-header": "three"},
                         body=b'X'),
             ResponseValidator(expect_headers={"Content-Type":
                                               "text/plain;charset=utf-8",
                                               # trim leading and
                                               # trailing ws:
                                               "folded-header":
                                               "Two",
                                               "another-header":
                                               "three"},
                               expect_body=b'X')),
        ],
        #
        # HEAD
        #
        "HEAD": [
            (RequestMsg("HEAD", "/HEAD/test_01",
                        headers={"Content-Length": "0"}),
             ResponseMsg(200, headers={"App-Header-1": "Value 01",
                                       "Content-Length": "10",
                                       "App-Header-2": "Value 02"},
                         body=None),
             ResponseValidator(expect_headers={"App-Header-1": "Value 01",
                                               "Content-Length": "10",
                                               "App-Header-2": "Value 02"})
             ),

            (RequestMsg("HEAD", "/HEAD/test_03",
                        headers={"Content-Length": "0"}),
             ResponseMsg(200, headers={"App-Header-3": "Value 03"}),
             ResponseValidator(expect_headers={"App-Header-3": "Value 03"})),
        ],
        #
        # POST
        #
        "POST": [
            (RequestMsg("POST", "/POST/test_01",
                        headers={"App-Header-1": "Value 01",
                                 "Content-Length": "19",
                                 "Content-Type": "application/x-www-form-urlencoded"},
                        body=b'one=1&two=2&three=3'),
             ResponseMsg(200, reason="OK",
                         headers={"Response-Header": "whatever"},
                         body=b'12345678abcdefghijklmno'),
             ResponseValidator(expect_body=b'12345678abcdefghijklmno')
             ),
            (RequestMsg("POST", "/POST/test_02",
                        headers={"App-Header-1": "Value 01",
                                 "Content-Length": "5"},
                        body=b'01234'),
             ResponseMsg(200, reason="OK",
                         headers={"Response-Header": "whatever",
                                  "Content-Length": "9"},
                         body=b'Hi There!'),
             ResponseValidator(expect_body=b'Hi There!')
             ),
        ],
        #
        # PUT
        #
        "PUT": [
            (RequestMsg("PUT", "/PUT/test_01",
                        headers={"Put-Header-1": "Value 01",
                                 "Content-Length": "513",
                                 "Content-Type": "text/plain;charset=utf-8"},
                        body=b'$' * 513),
             ResponseMsg(201, reason="Created",
                         headers={"Response-Header": "whatever",
                                  "Content-length": "3"},
                         body=b'ABC'),
             ResponseValidator(status=201, expect_body=b'ABC')
             ),

            (RequestMsg("PUT", "/PUT/test_02",
                        headers={"Put-Header-1": "Value 01",
                                 "Content-length": "0",
                                 "Content-Type": "text/plain;charset=utf-8"}),
             ResponseMsg(201, reason="Created",
                         headers={"Response-Header": "whatever"},
                         body=b'No Content Length'),
             ResponseValidator(status=201, expect_body=b'No Content Length')
             ),
        ]
    }

    @classmethod
    def setUpClass(cls):
        """Start a router"""
        super(Http1AdaptorOneRouterTest, cls).setUpClass()

        def router(name, mode, extra):
            config = [
                ('router', {'mode': mode,
                            'id': name,
                            'allowUnsettledMulticast': 'yes'}),
                ('listener', {'role': 'normal',
                              'port': cls.tester.get_port()}),
                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
            ]

            if extra:
                config.extend(extra)
            config = Qdrouterd.Config(config)
            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
            return cls.routers[-1]

        # configuration:
        #  One interior router, two servers (one running as HTTP/1.0)
        #
        #  +----------------+
        #  |     INT.A      |
        #  +----------------+
        #      ^         ^
        #      |         |
        #      V         V
        #   clients   servers

        cls.routers = []
        cls.http_server11_port = cls.tester.get_port()
        cls.http_server10_port = cls.tester.get_port()
        cls.http_listener11_port = cls.tester.get_port()
        cls.http_listener10_port = cls.tester.get_port()

        router('INT.A', 'standalone',
               [('httpConnector', {'port': cls.http_server11_port,
                                   'protocolVersion': 'HTTP1',
                                   'address': 'testServer11'}),
                ('httpConnector', {'port': cls.http_server10_port,
                                   'protocolVersion': 'HTTP1',
                                   'address': 'testServer10'}),
                ('httpListener', {'port': cls.http_listener11_port,
                                  'protocolVersion': 'HTTP1',
                                  'address': 'testServer11'}),
                ('httpListener', {'port': cls.http_listener10_port,
                                  'protocolVersion': 'HTTP1',
                                  'address': 'testServer10'})
                ])

        cls.INT_A = cls.routers[0]
        cls.INT_A.listener = cls.INT_A.addresses[0]

        cls.http11_server = TestServer(server_port=cls.http_server11_port,
                                       client_port=cls.http_listener11_port,
                                       tests=cls.TESTS_11)
        cls.http10_server = TestServer(server_port=cls.http_server10_port,
                                       client_port=cls.http_listener10_port,
                                       tests=cls.TESTS_10,
                                       handler_cls=RequestHandler10)
        cls.INT_A.wait_connectors()

    @classmethod
    def tearDownClass(cls):
        """Shut both test servers down before the routers are torn down."""
        if cls.http11_server:
            cls.http11_server.wait()
        if cls.http10_server:
            cls.http10_server.wait()
        super(Http1AdaptorOneRouterTest, cls).tearDownClass()

    def _do_request(self, client, tests):
        """
        Send every request in tests over client and validate each response
        with the test's validator.  Fails the test on the first response
        that does not validate.
        """
        for req, _, val in tests:
            req.send_request(client)
            rsp = client.getresponse()
            try:
                body = val.check_response(rsp)
            except Exception as exc:
                self.fail("request failed: %s" % str(exc))

            # a response to a HEAD request must not carry a body
            if req.method == "HEAD":
                self.assertEqual(b'', body)

    def _run_tests(self, port, *test_lists):
        """
        Open one connection to the router's HTTP listener on port, run each
        of the given test lists over it in order, and always close the
        connection afterwards.
        """
        client = HTTPConnection("127.0.0.1:%s" % port,
                                timeout=TIMEOUT)
        try:
            for tests in test_lists:
                self._do_request(client, tests)
        finally:
            client.close()

    def test_001_get(self):
        self._run_tests(self.http_listener11_port, self.TESTS_11["GET"])

    def test_002_head(self):
        self._run_tests(self.http_listener11_port, self.TESTS_11["HEAD"])

    def test_003_post(self):
        self._run_tests(self.http_listener11_port, self.TESTS_11["POST"])

    def test_004_put(self):
        self._run_tests(self.http_listener11_port, self.TESTS_11["PUT"])

    def test_005_get_10(self):
        self._run_tests(self.http_listener10_port, self.TESTS_10["GET"])

    def test_006_head_10(self):
        self._run_tests(self.http_listener10_port, self.TESTS_10["HEAD"])

    def test_007_post_10(self):
        self._run_tests(self.http_listener10_port, self.TESTS_10["POST"])

    def test_008_put_10(self):
        self._run_tests(self.http_listener10_port, self.TESTS_10["PUT"])

    def test_000_stats(self):
        """
        Run the HTTP/1.1 GET and POST tests, then check the request counts
        and approximate byte counts reported by the router's
        httpRequestInfo entities.
        """
        self._run_tests(self.http_listener11_port,
                        self.TESTS_11["GET"],
                        self.TESTS_11["POST"])
        qd_manager = QdManager(self, address=self.INT_A.listener)
        stats = qd_manager.query('org.apache.qpid.dispatch.httpRequestInfo')
        self.assertEqual(len(stats), 2)
        for s in stats:
            # TESTS_11 holds 8 GET and 2 POST requests
            self.assertEqual(s.get('requests'), 10)
            self.assertEqual(s.get('details').get('GET:400'), 1)
            self.assertEqual(s.get('details').get('GET:200'), 6)
            self.assertEqual(s.get('details').get('GET:204'), 1)
            self.assertEqual(s.get('details').get('POST:200'), 2)

        def assert_approximately_equal(a, b):
            # a is the reported value, b the reference; allow < 10% of a
            self.assertTrue((abs(a - b) / a) < 0.1,
                            msg="%s differs from %s by 10%% or more" % (a, b))

        # the two records may be returned in either order
        if stats[0].get('direction') == 'out':
            self.assertEqual(stats[1].get('direction'), 'in')
            assert_approximately_equal(stats[0].get('bytesOut'), 1059)
            assert_approximately_equal(stats[0].get('bytesIn'), 8849)
            assert_approximately_equal(stats[1].get('bytesOut'), 8830)
            assert_approximately_equal(stats[1].get('bytesIn'), 1059)
        else:
            self.assertEqual(stats[0].get('direction'), 'in')
            self.assertEqual(stats[1].get('direction'), 'out')
            assert_approximately_equal(stats[0].get('bytesOut'), 8849)
            assert_approximately_equal(stats[0].get('bytesIn'), 1059)
            assert_approximately_equal(stats[1].get('bytesOut'), 1059)
            assert_approximately_equal(stats[1].get('bytesIn'), 8830)
class Http1AdaptorEdge2EdgeTest(TestCase):
"""
Test an HTTP servers and clients attached to edge routers separated by an
interior router
"""
@classmethod
def setUpClass(cls):
    """Bring up one interior router with an edge router on either side"""
    super(Http1AdaptorEdge2EdgeTest, cls).setUpClass()

    def router(name, mode, extra):
        # common settings first, then whatever the caller adds for this router
        settings = [
            ('router', {'mode': mode,
                        'id': name,
                        'allowUnsettledMulticast': 'yes'}),
            ('listener', {'role': 'normal',
                          'port': cls.tester.get_port()}),
            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
        ]
        if extra:
            settings.extend(extra)

        started = cls.tester.qdrouterd(name,
                                       Qdrouterd.Config(settings),
                                       wait=True)
        cls.routers.append(started)
        return started

    # topology:
    #
    #   +-------+    +---------+    +-------+
    #   |  EA1  |<==>|  INT.A  |<==>|  EA2  |
    #   +-------+    +---------+    +-------+
    #       ^                           ^
    #       |                           |
    #       V                           V
    #    clients                     servers
    #
    # EA1 carries the HTTP listeners, EA2 the HTTP connectors.

    cls.routers = []
    cls.INTA_edge1_port = cls.tester.get_port()
    cls.INTA_edge2_port = cls.tester.get_port()
    cls.http_server11_port = cls.tester.get_port()
    cls.http_listener11_port = cls.tester.get_port()
    cls.http_server10_port = cls.tester.get_port()
    cls.http_listener10_port = cls.tester.get_port()

    interior_extra = [
        ('listener', {'role': 'edge', 'port': cls.INTA_edge1_port}),
        ('listener', {'role': 'edge', 'port': cls.INTA_edge2_port}),
    ]
    cls.INT_A = router('INT.A', 'interior', interior_extra)
    cls.INT_A.listener = cls.INT_A.addresses[0]

    client_edge_extra = [
        ('connector', {'name': 'uplink', 'role': 'edge',
                       'port': cls.INTA_edge1_port}),
        ('httpListener', {'port': cls.http_listener11_port,
                          'protocolVersion': 'HTTP1',
                          'address': 'testServer11'}),
        ('httpListener', {'port': cls.http_listener10_port,
                          'protocolVersion': 'HTTP1',
                          'address': 'testServer10'}),
    ]
    cls.EA1 = router('EA1', 'edge', client_edge_extra)
    cls.EA1.listener = cls.EA1.addresses[0]

    server_edge_extra = [
        ('connector', {'name': 'uplink', 'role': 'edge',
                       'port': cls.INTA_edge2_port}),
        ('httpConnector', {'port': cls.http_server11_port,
                           'protocolVersion': 'HTTP1',
                           'address': 'testServer11'}),
        ('httpConnector', {'port': cls.http_server10_port,
                           'protocolVersion': 'HTTP1',
                           'address': 'testServer10'}),
    ]
    cls.EA2 = router('EA2', 'edge', server_edge_extra)
    cls.EA2.listener = cls.EA2.addresses[0]

    # wait for the interior router to see both edge routers
    for edge_id in ('EA1', 'EA2'):
        cls.INT_A.wait_address(edge_id)
def test_01_concurrent_requests(self):
    """
    Run several HTTP/1.1 and HTTP/1.0 clients at the same time, each
    sending large (streaming) messages
    """

    REQ_CT = 3  # 3 requests per TEST_*

    # ~384K to trigger Q2: three 0x20000 byte chunks, a short final chunk
    # and the terminating zero-length chunk
    big_chunked = (b''.join(b'20000\r\n' + fill * 0x20000 + b'\r\n'
                            for fill in (b'1', b'2', b'3'))
                   + b'13\r\nEND OF TRANSMISSION\r\n'
                   + b'0\r\n\r\n')

    put_11 = (
        RequestMsg("PUT", "/PUT/test_01_concurrent_requests_11",
                   headers={
                       "Transfer-encoding": "chunked",
                       "Content-Type": "text/plain;charset=utf-8"
                   },
                   body=big_chunked),
        ResponseMsg(201, reason="Created",
                    headers={"Test-Header": "/PUT/test_01_concurrent_requests_11",
                             "Content-Length": "0"}),
        ResponseValidator(status=201)
    )
    get_small_11 = (
        RequestMsg("GET", "/GET/test_01_concurrent_requests_11_small",
                   headers={"Content-Length": "000"}),
        ResponseMsg(200, reason="OK",
                    headers={
                        "Content-Length": "19",
                        "Content-Type": "text/plain;charset=utf-8",
                        "Test-Header": "/GET/test_01_concurrent_requests_11_small"
                    },
                    body=b'END OF TRANSMISSION'),
        ResponseValidator(status=200)
    )
    get_large_11 = (
        RequestMsg("GET", "/GET/test_01_concurrent_requests_11",
                   headers={"Content-Length": "000"}),
        ResponseMsg(200, reason="OK",
                    headers={
                        "transfer-Encoding": "chunked",
                        "Content-Type": "text/plain;charset=utf-8",
                        "Test-Header": "/GET/test_01_concurrent_requests_11"
                    },
                    body=big_chunked),
        ResponseValidator(status=200)
    )
    TESTS_11 = {
        "PUT": [put_11],
        "GET": [get_small_11, get_large_11],
    }

    post_10 = (
        RequestMsg("POST", "/POST/test_01_concurrent_requests_10",
                   headers={"Content-Type": "text/plain;charset=utf-8",
                            "Content-Length": "393216"},
                   body=b'P' * 393197
                   + b'END OF TRANSMISSION'),
        ResponseMsg(201, reason="Created",
                    headers={"Test-Header": "/POST/test_01_concurrent_requests_10",
                             "Content-Length": "0"}),
        ResponseValidator(status=201)
    )
    get_small_10 = (
        RequestMsg("GET", "/GET/test_01_concurrent_requests_10_small",
                   headers={"Content-Length": "000"}),
        ResponseMsg(200, reason="OK",
                    # no content-length, server must close conn when done
                    headers={"Test-Header": "/GET/test_01_concurrent_requests_10_small",
                             "Content-Type": "text/plain;charset=utf-8"},
                    body=b'END OF TRANSMISSION'),
        ResponseValidator(status=200)
    )
    get_large_10 = (
        RequestMsg("GET", "/GET/test_01_concurrent_requests_10",
                   headers={"Content-Length": "000"}),
        ResponseMsg(200, reason="OK",
                    headers={"Test-Header": "/GET/test_01_concurrent_requests_10",
                             "Content-Length": "393215",
                             "Content-Type": "text/plain;charset=utf-8"},
                    body=b'G' * 393196
                    + b'END OF TRANSMISSION'),
        ResponseValidator(status=200)
    )
    TESTS_10 = {
        "POST": [post_10],
        "GET": [get_small_10, get_large_10],
    }

    server11 = TestServer(server_port=self.http_server11_port,
                          client_port=self.http_listener11_port,
                          tests=TESTS_11)
    server10 = TestServer(server_port=self.http_server10_port,
                          client_port=self.http_listener10_port,
                          tests=TESTS_10,
                          handler_cls=RequestHandler10)

    self.EA2.wait_connectors()

    repeat_ct = 10
    client_ct = 4  # per version
    clients = []
    for _ in range(client_ct):
        # one HTTP/1.1 client and one HTTP/1.0 client per round
        for tests, port in ((TESTS_11, self.http_listener11_port),
                            (TESTS_10, self.http_listener10_port)):
            clients.append(ThreadedTestClient(tests,
                                              port,
                                              repeat=repeat_ct))

    per_client_total = repeat_ct * REQ_CT
    for client in clients:
        client.wait()
        try:
            self.assertIsNone(client.error)
            self.assertEqual(per_client_total, client.count)
        except Exception:
            # show what this client did before the failure propagates
            client.dump_log()
            raise

    per_server_total = client_ct * repeat_ct * REQ_CT
    server11.wait()
    self.assertEqual(per_server_total,
                     server11.request_count)
    server10.wait()
    self.assertEqual(per_server_total,
                     server10.request_count)
def test_02_credit_replenish(self):
    """
    Push more requests across the routers than fit into one default
    credit window (250) in order to prove that credit is replenished.
    """
    # must be larger than the default credit window of 250
    request_ct = 300

    get_request = RequestMsg("GET", "/GET/test_02_credit_replenish",
                             headers={"Content-Length": "000"})
    get_response = ResponseMsg(200, reason="OK",
                               headers={"Content-Length": "24",
                                        "Content-Type": "text/plain;charset=utf-8"},
                               body=b'test_02_credit_replenish')
    scenarios = {
        "GET": [(get_request, get_response, ResponseValidator(status=200))],
    }

    http_server = TestServer(server_port=self.http_server11_port,
                             client_port=self.http_listener11_port,
                             tests=scenarios)
    self.EA2.wait_connectors()

    # a single client issues every request, one after the other
    http_client = ThreadedTestClient(scenarios,
                                     self.http_listener11_port,
                                     repeat=request_ct)
    http_client.wait()
    self.assertIsNone(http_client.error)
    self.assertEqual(request_ct, http_client.count)

    http_server.wait()
def test_03_server_reconnect(self):
    """
    Check that the adaptor recovers when the HTTP server goes away and
    then returns while a client still has requests outstanding.
    """
    request_ct = 2
    scenarios = {
        "GET": [
            (RequestMsg("GET", "/GET/test_03_server_reconnect",
                        headers={"Content-Length": "000"}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Length": "24",
                                  "Content-Type": "text/plain;charset=utf-8"},
                         body=b'test_03_server_reconnect'),
             ResponseValidator(status=200)),
        ],
    }

    def start_server():
        return TestServer(server_port=self.http_server11_port,
                          client_port=self.http_listener11_port,
                          tests=scenarios)

    def start_client():
        return ThreadedTestClient(scenarios,
                                  self.http_listener11_port,
                                  repeat=request_ct)

    def finish_client(http_client):
        http_client.wait()
        self.assertIsNone(http_client.error)
        self.assertEqual(request_ct, http_client.count)

    # Phase 1: with the server up, push a few requests through so that
    # the router grants credit for clients.
    http_server = start_server()
    self.EA2.wait_connectors()
    finish_client(start_client())

    # Phase 2: simulate loss of the server, then start a client.  The
    # adaptor does not tear down the server links right away, so the
    # client should still be granted credit.  This makes the adaptor run
    # qdr_connection_process without a raw connection available to wake
    # the I/O thread.
    http_server.wait()
    pending_client = start_client()

    # Phase 3: the adaptor detaches the server links when the connection
    # cannot be re-established within 2.5 seconds.  Bring the server back
    # before that happens so the client's messages are not released with
    # a 503 status.
    http_server = start_server()
    finish_client(pending_client)
    http_server.wait()
def test_04_server_pining_for_the_fjords(self):
    """
    Exercise permanent loss of the server: requests issued while it is
    gone must fail with 503, and service must resume once a server
    re-appears.
    """
    def scenario(expected_status):
        # The exchange is the same every time; only the status the
        # client is told to expect differs.
        return {
            "GET": [
                (RequestMsg("GET", "/GET/test_04_fjord_pining",
                            headers={"Content-Length": "000"}),
                 ResponseMsg(200, reason="OK",
                             headers={"Content-Length": "20",
                                      "Content-Type": "text/plain;charset=utf-8"},
                             body=b'test_04_fjord_pining'),
                 ResponseValidator(status=expected_status)),
            ]
        }

    def start_server(tests):
        return TestServer(server_port=self.http_server11_port,
                          client_port=self.http_listener11_port,
                          tests=tests)

    def single_request(tests):
        http_client = ThreadedTestClient(tests, self.http_listener11_port)
        http_client.wait()
        self.assertIsNone(http_client.error)
        self.assertEqual(1, http_client.count)

    # Start the server and send a request.  This causes the router to
    # grant credit for clients.
    ok_tests = scenario(200)
    http_server = start_server(ok_tests)
    self.EA2.wait_connectors()
    single_request(ok_tests)

    # Take the server down, then issue a request.  It is held on the
    # server's outgoing links until it expires (2.5 seconds), at which
    # point the client gets a 503 response.
    unavailable_tests = scenario(503)
    http_server.wait()
    single_request(unavailable_tests)

    # The links must recover as soon as a server is available again.
    http_server = start_server(ok_tests)
    self.EA2.wait_connectors()
    single_request(ok_tests)
    http_server.wait()
def test_05_large_streaming_msg(self):
    """
    Transfer multi-megabyte request and response bodies, both chunked
    (HTTP/1.1) and Content-Length delimited (HTTP/1.0).
    """
    text_plain = "text/plain;charset=utf-8"

    # chunked PUT body: 4 chunks each ~= 600K (two carry one extra byte)
    put_body = b''.join([
        b'927C1\r\n', b'0' * 0x927C0, b'X\r\n',
        b'927C0\r\n', b'1' * 0x927C0, b'\r\n',
        b'927C1\r\n', b'2' * 0x927C0, b'X\r\n',
        b'927C0\r\n', b'3' * 0x927C0, b'\r\n',
        b'0\r\n\r\n',
    ])
    # chunked GET response body: two 1.2MB chunks
    get_body = b''.join([
        b'124f80\r\n', b'4' * 0x124F80, b'\r\n',
        b'124f80\r\n', b'5' * 0x124F80, b'\r\n',
        b'0\r\n\r\n',
    ])

    tests_11 = {
        "PUT": [
            (RequestMsg("PUT", "/PUT/streaming_test_11",
                        headers={"Transfer-encoding": "chunked",
                                 "Content-Type": text_plain},
                        body=put_body),
             ResponseMsg(201, reason="Created",
                         headers={"Response-Header": "data",
                                  "Content-Length": "0"}),
             ResponseValidator(status=201)),
        ],
        "GET": [
            (RequestMsg("GET", "/GET/streaming_test_11",
                        headers={"Content-Length": "000"}),
             ResponseMsg(200, reason="OK",
                         headers={"transfer-Encoding": "chunked",
                                  "Content-Type": text_plain},
                         body=get_body),
             ResponseValidator(status=200)),
        ],
    }

    tests_10 = {
        "POST": [
            (RequestMsg("POST", "/POST/streaming_test_10",
                        headers={"Header-1": "H" * 2048,
                                 "Content-Length": "2097155",
                                 "Content-Type": text_plain},
                        body=b'P' * 2097155),
             ResponseMsg(201, reason="Created",
                         headers={"Response-Header": "data",
                                  "Content-Length": "0"}),
             ResponseValidator(status=201)),
        ],
        "GET": [
            (RequestMsg("GET", "/GET/streaming_test_10",
                        headers={"Content-Length": "000"}),
             ResponseMsg(200, reason="OK",
                         headers={"Content-Length": "1999999",
                                  "Content-Type": text_plain},
                         body=b'G' * 1999999),
             ResponseValidator(status=200)),
        ],
    }

    http11_server = TestServer(server_port=self.http_server11_port,
                               client_port=self.http_listener11_port,
                               tests=tests_11)
    http10_server = TestServer(server_port=self.http_server10_port,
                               client_port=self.http_listener10_port,
                               tests=tests_10,
                               handler_cls=RequestHandler10)
    self.EA2.wait_connectors()

    def run_twice(tests, listener_port):
        # every table holds two requests, so two passes == 4 requests
        http_client = ThreadedTestClient(tests, listener_port, repeat=2)
        http_client.wait()
        self.assertIsNone(http_client.error)
        self.assertEqual(4, http_client.count)

    run_twice(tests_11, self.http_listener11_port)
    run_twice(tests_10, self.http_listener10_port)

    http11_server.wait()
    http10_server.wait()
class FakeHttpServerBase(object):
    """
    A very basic single-connection socket server used to simulate HTTP
    server behaviors.

    All of the work happens synchronously inside the constructor: it binds
    and listens on (host, port), accepts exactly one connection, calls
    do_connect(), hands every block of received data to do_data() until the
    peer closes the connection, and finally calls do_close().  Subclasses
    override the do_*() hooks to inject the behavior under test.
    """

    def __init__(self, host='', port=80, bufsize=1024):
        """
        Serve a single connection to completion.

        :param host: local address to bind to ('' binds all interfaces)
        :param port: local TCP port to listen on
        :param bufsize: maximum number of bytes passed to each recv() call
        :raises: whatever the socket calls or the hooks raise (for example
                 a timeout when nothing connects within TIMEOUT).  In that
                 case any socket opened so far is closed before the
                 exception propagates.
        """
        super(FakeHttpServerBase, self).__init__()
        self.host = host
        self.port = port
        self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.listener.settimeout(TIMEOUT)
            self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.listener.bind((host, port))
            self.listener.listen(1)
            self.conn, self.addr = self.listener.accept()
            self.do_connect()
            while True:
                data = self.conn.recv(bufsize)
                if not data:
                    break  # peer closed the connection
                self.do_data(data)
        except Exception:
            # Do not leave the port bound or the connection open when the
            # exchange fails part way: later tests reuse the same port.
            self._release_sockets()
            raise
        self.do_close()

    def _release_sockets(self):
        """
        Best-effort close of whichever sockets currently exist.  Only used
        on the failure path, where the original error is the one that
        matters, so errors raised by close() itself are ignored.
        """
        for attr in ('conn', 'listener'):
            sock = getattr(self, attr, None)
            if sock is None:
                continue
            try:
                sock.close()
            except socket.error:
                pass  # nothing more can be done for this socket

    def do_connect(self):
        """Hook: called once, right after the connection is accepted."""
        pass

    def do_data(self, data):
        """Hook: called with each non-empty block of received data."""
        pass

    def do_close(self):
        """
        Hook: called after the peer has closed.  Shuts down and closes the
        listener and the accepted connection and drops both attributes.
        """
        self.listener.shutdown(socket.SHUT_RDWR)
        self.listener.close()
        del self.listener
        self.conn.shutdown(socket.SHUT_RDWR)
        self.conn.close()
        del self.conn
        sleep(0.5)  # fudge factor allow socket close to complete
class Http1AdaptorBadEndpointsTest(TestCase):
"""
Subject the router to mis-behaving HTTP endpoints.
"""
@classmethod
def setUpClass(cls):
    """
    Start one standalone router with an HTTPConnector and an HTTPListener
    on address 'testServer', plus a second HTTPListener on address
    'fakeServer'.
    """
    super(Http1AdaptorBadEndpointsTest, cls).setUpClass()

    # ports are allocated in this order: server, listener, fake, AMQP
    cls.http_server_port = cls.tester.get_port()
    cls.http_listener_port = cls.tester.get_port()
    cls.http_fake_port = cls.tester.get_port()
    amqp_port = cls.tester.get_port()

    router_cfg = Qdrouterd.Config([
        ('router', {'mode': 'standalone',
                    'id': 'TestBadEndpoints',
                    'allowUnsettledMulticast': 'yes'}),
        ('listener', {'role': 'normal',
                      'port': amqp_port}),
        ('httpConnector', {'port': cls.http_server_port,
                           'protocolVersion': 'HTTP1',
                           'address': 'testServer'}),
        ('httpListener', {'port': cls.http_listener_port,
                          'protocolVersion': 'HTTP1',
                          'address': 'testServer'}),
        ('httpListener', {'port': cls.http_fake_port,
                          'protocolVersion': 'HTTP1',
                          'address': 'fakeServer'}),
        ('address', {'prefix': 'closest', 'distribution': 'closest'}),
        ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
    ])

    cls.INT_A = cls.tester.qdrouterd("TestBadEndpoints", router_cfg, wait=True)
    # the tests reach the router's AMQP listener through this attribute
    cls.INT_A.listener = cls.INT_A.addresses[0]
def test_01_unsolicited_response(self):
    """
    Run a server that pushes a Request Timeout response as soon as the
    router connects, without waiting for any request to arrive.  The
    router is pinged before and after to check that it still works.
    """
    class EagerResponder(FakeHttpServerBase):
        def __init__(self, host, port):
            # must exist before the base constructor runs: the base
            # class serves the whole connection from inside __init__
            self.response_pushed = False
            super(EagerResponder, self).__init__(host, port)

        def do_connect(self):
            self.conn.sendall(b'HTTP/1.1 408 Request Timeout\r\n'
                              b'Content-Length: 10\r\n'
                              b'\r\n'
                              b'Bad Server')
            self.response_pushed = True

    def check_ping():
        ping_ct, ping_error = http1_ping(self.http_server_port,
                                         self.http_listener_port)
        self.assertIsNone(ping_error)
        self.assertEqual(1, ping_ct)

    check_ping()

    # constructing the server blocks until its connection has closed
    bad_server = EagerResponder('', self.http_server_port)
    self.assertTrue(bad_server.response_pushed)

    check_ping()
def test_02_bad_request_message(self):
    """
    Send a series of malformed request messages to the server-facing
    address and verify that each one is rejected.
    """
    http_server = TestServer(server_port=self.http_server_port,
                             client_port=self.http_listener_port,
                             tests={})

    filler = "?" * 1024 * 300  # Q2

    def expect_rejected(bad_msg):
        sender = AsyncTestSender(address=self.INT_A.listener,
                                 target="testServer",
                                 message=bad_msg)
        sender.wait()
        self.assertEqual(1, sender.rejected)

    # each message below carries one more required field than the last

    request = Message(body="NOMSGID " + filler)
    expect_rejected(request)

    request = Message(body="NO REPLY TO " + filler)
    request.id = 1
    expect_rejected(request)

    request = Message(body="NO SUBJECT " + filler)
    request.id = 1
    request.reply_to = "amqp://fake/reply_to"
    expect_rejected(request)

    request = Message(body="NO APP PROPERTIES " + filler)
    request.id = 1
    request.reply_to = "amqp://fake/reply_to"
    request.subject = "GET"
    expect_rejected(request)

    # TODO: fix body parsing (returns NEED_MORE)
    # msg = Message(body="INVALID BODY " + body_filler)
    # msg.id = 1
    # msg.reply_to = "amqp://fake/reply_to"
    # msg.subject = "GET"
    # msg.properties = {"http:target": "/Some/target"}
    # ts = AsyncTestSender(address=self.INT_A.listener,
    #                      target="testServer",
    #                      message=msg)
    # ts.wait()
    # self.assertEqual(1, ts.rejected);

    http_server.wait()

    # verify router is still sane:
    ping_ct, ping_error = http1_ping(self.http_server_port,
                                     self.http_listener_port)
    self.assertIsNone(ping_error)
    self.assertEqual(1, ping_ct)
def test_03_bad_response_message(self):
    """
    Answer HTTP requests with a series of malformed response messages.
    Each response must be rejected and the waiting HTTP client must end
    up with an error.
    """
    dummy_tests = {
        "GET": [
            (RequestMsg("GET", "/GET/test_03_bad_response_message",
                        headers={"Content-Length": "000"}),
             None,
             None),
        ]
    }
    filler = "?" * 1024 * 300  # Q2

    # an AMQP receiver plays the part of the server
    fake_server = AsyncTestReceiver(self.INT_A.listener,
                                    source="fakeServer")

    def nothing_more(reply, request):
        pass

    def correlation_only(reply, request):
        reply.correlation_id = request.id

    def useless_properties(reply, request):
        reply.correlation_id = request.id
        reply.properties = {"stuff": "value"}

    def run_case(body_tag, complete_reply):
        http_client = ThreadedTestClient(dummy_tests,
                                         self.http_fake_port)
        request = fake_server.queue.get(timeout=TIMEOUT)
        reply = Message(body=body_tag + filler)
        reply.to = request.reply_to
        complete_reply(reply, request)
        sender = AsyncTestSender(address=self.INT_A.listener,
                                 target=request.reply_to,
                                 message=reply)
        sender.wait()
        self.assertEqual(1, sender.rejected)
        http_client.wait()
        self.assertIsNotNone(http_client.error)

    # no correlation id:
    run_case("NO CORRELATION ID ", nothing_more)
    # missing application properties
    run_case("NO APPLICATION PROPS ", correlation_only)
    # no status application property
    run_case("MISSING STATUS HEADER ", useless_properties)

    # TODO: fix body parsing (returns NEED_MORE)
    # # invalid body format
    # client = ThreadedTestClient(DUMMY_TESTS,
    #                             self.http_fake_port)
    # req = rx.queue.get(timeout=TIMEOUT)
    # resp = Message(body="INVALID BODY FORMAT " + body_filler)
    # resp.to = req.reply_to
    # resp.correlation_id = req.id
    # resp.properties = {"http:status": 200}
    # ts = AsyncTestSender(address=self.INT_A.listener,
    #                      target=req.reply_to,
    #                      message=resp)
    # ts.wait()
    # self.assertEqual(1, ts.rejected);
    # client.wait()
    # self.assertIsNotNone(client.error)

    fake_server.stop()
    sleep(0.5)  # fudge factor allow socket close to complete

    # verify router is still sane:
    ping_ct, ping_error = http1_ping(self.http_server_port,
                                     self.http_listener_port)
    self.assertIsNone(ping_error)
    self.assertEqual(1, ping_ct)
class Http1AdaptorQ2Standalone(TestCase):
"""
Force Q2 blocking/recovery on both client and server endpoints. This test
uses a single router to ensure both client facing and server facing
Q2 components of the HTTP/1.x adaptor are triggered.
"""
@classmethod
def setUpClass(cls):
    """
    Start one standalone router with a single HTTPListener and a single
    HTTPConnector, both attached to address 'testServer'.
    """
    super(Http1AdaptorQ2Standalone, cls).setUpClass()

    # ports are allocated in this order: server, listener, AMQP
    cls.http_server_port = cls.tester.get_port()
    cls.http_listener_port = cls.tester.get_port()
    amqp_port = cls.tester.get_port()

    router_cfg = Qdrouterd.Config([
        ('router', {'mode': 'standalone',
                    'id': 'RowdyRoddyRouter',
                    'allowUnsettledMulticast': 'yes'}),
        ('listener', {'role': 'normal',
                      'port': amqp_port}),
        ('httpListener', {'port': cls.http_listener_port,
                          'protocolVersion': 'HTTP1',
                          'address': 'testServer'}),
        ('httpConnector', {'port': cls.http_server_port,
                           'protocolVersion': 'HTTP1',
                           'address': 'testServer'}),
        ('address', {'prefix': 'closest', 'distribution': 'closest'}),
        ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
    ])

    cls.INT_A = cls.tester.qdrouterd("TestBadEndpoints", router_cfg, wait=True)
    # the router's first address is kept for use as its AMQP listener
    cls.INT_A.listener = cls.INT_A.addresses[0]
def _write_until_full(self, sock, data, timeout):
    """
    Write data to sock until either all of it has been written or the
    socket stays unwritable for timeout seconds.

    The socket is switched to non-blocking mode and left that way.

    :param sock: connected socket to write to
    :param data: bytes to send
    :param timeout: seconds to wait for the socket to become writable
                    before giving up
    :return: the number of bytes written, which == len(data) unless the
             timeout was hit
    :raises: any select() failure other than an interrupted call
    """
    sock.setblocking(0)
    sent = 0
    while sent < len(data):
        try:
            _, writable, _ = select.select([], [sock], [], timeout)
        except select.error as serror:
            # select.error is an alias of OSError on Python 3, which
            # cannot be indexed.  args[0] holds the errno on both
            # Python 2 and Python 3.
            if serror.args and serror.args[0] == errno.EINTR:
                print("ignoring interrupt from select(): %s" % str(serror))
                continue
            raise  # assuming fatal...
        if not writable:
            break  # timeout
        sent += sock.send(data[sent:])
    return sent
def _read_until_empty(self, sock, timeout):
    """
    Read from sock until no data arrives for timeout seconds or the peer
    closes the connection.

    The socket is switched to non-blocking mode and left that way.

    :param sock: connected socket to read from
    :param timeout: seconds to wait for more data before giving up
    :return: all of the data read (b'' when nothing arrived)
    :raises: any select() failure other than an interrupted call
    """
    sock.setblocking(0)
    chunks = []
    while True:
        try:
            readable, _, _ = select.select([sock], [], [], timeout)
        except select.error as serror:
            # select.error is an alias of OSError on Python 3, which
            # cannot be indexed.  args[0] holds the errno on both
            # Python 2 and Python 3.
            if serror.args and serror.args[0] == errno.EINTR:
                print("ignoring interrupt from select(): %s" % str(serror))
                continue
            raise  # assuming fatal...
        if not readable:
            break  # timeout
        chunk = sock.recv(4096)
        if not chunk:
            # Peer closed.  A closed socket stays readable forever, so
            # without this check the loop would never terminate.
            break
        chunks.append(chunk)
    return b''.join(chunks)
def test_01_backpressure_client(self):
    """
    Drive the client-facing link into Q2 backpressure and check that it
    recovers.

    A raw socket stands in for the HTTP server and does not read while a
    raw client socket streams a chunked request through the router.  Once
    the client can no longer write, the server side is drained, the
    exchange is completed, and the router log is searched for matching
    Q2 blocked/unblocked entries.
    """
    # listening socket that acts as the server service
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.settimeout(TIMEOUT)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(('', self.http_server_port))
    listener.listen(1)

    # wait here for the router to connect
    to_router, router_addr = listener.accept()
    to_router.settimeout(0.5)
    to_router.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    # client connection to the router's HTTP listener
    http_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    http_client.settimeout(0.5)
    http_client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    http_client.connect((router_addr[0], self.http_listener_port))

    # start a Very Large PUSH request; it is expected to block eventually
    request_hdr = b'PUSH / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n'
    written = self._write_until_full(http_client, request_hdr, 1.0)
    self.assertEqual(len(request_hdr), written)

    body_chunk = b'8000\r\n' + b'X' * 0x8000 + b'\r\n'
    final_chunk = b'0 \r\n\r\n'
    written = 0
    deadline = time() + TIMEOUT
    while time() <= deadline:
        written = self._write_until_full(http_client, body_chunk, 5.0)
        if written < len(body_chunk):
            break  # a short write means the client is blocked
    self.assertFalse(time() > deadline,
                     "Client never blocked as expected!")

    # The client should now be blocked by Q2.  Draining the server side
    # unblocks it.
    self._read_until_empty(to_router, 2.0)

    # send the rest of the interrupted chunk, then terminate the body
    if written:
        rest = self._write_until_full(http_client, body_chunk[written:], 1.0)
        self.assertEqual(len(body_chunk), written + rest)
    written = self._write_until_full(http_client, final_chunk, 1.0)
    self.assertEqual(len(final_chunk), written)

    # consume what is left of the request and answer it
    self._read_until_empty(to_router, 2.0)
    reply = b'HTTP/1.1 201 Created\r\nContent-Length: 0\r\n\r\n'
    written = self._write_until_full(to_router, reply, 1.0)
    self.assertEqual(len(reply), written)

    # the client must receive the complete response
    received = self._read_until_empty(http_client, 2.0)
    self.assertEqual(len(reply), len(received))

    for sock in (http_client, to_router, listener):
        sock.shutdown(socket.SHUT_RDWR)
        sock.close()

    # the router log must show that Q2 was hit and released again
    blocked = 0
    unblocked = 0
    with io.open(self.INT_A.logfile_path) as logfile:
        for entry in logfile:
            if 'client link blocked on Q2 limit' in entry:
                blocked += 1
            if 'client link unblocked from Q2 limit' in entry:
                unblocked += 1
    self.assertTrue(blocked > 0)
    self.assertEqual(blocked, unblocked)
def test_02_backpressure_server(self):
    """
    Drive the server-facing link into Q2 backpressure and check that it
    recovers.

    A raw client socket sends a small GET and then does not read while a
    raw socket standing in for the HTTP server streams a chunked response
    through the router.  Once the server can no longer write, the client
    side is drained, the response is completed, and the router log is
    searched for matching Q2 blocked/unblocked entries.
    """
    get_request = b'GET / HTTP/1.1\r\nContent-Length: 0\r\n\r\n'

    # listening socket that acts as the server service
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.settimeout(TIMEOUT)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(('', self.http_server_port))
    listener.listen(1)

    # wait here for the router to connect
    to_router, router_addr = listener.accept()
    to_router.settimeout(0.5)
    to_router.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    # client connection to the router's HTTP listener
    http_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    http_client.settimeout(0.5)
    http_client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    http_client.connect((router_addr[0], self.http_listener_port))

    # the GET request itself is expected to go straight through
    written = self._write_until_full(http_client, get_request, 1.0)
    self.assertEqual(len(get_request), written)
    arrived = self._read_until_empty(to_router, 5.0)
    self.assertEqual(len(get_request), len(arrived))

    # start a Very Long response; it is expected to block eventually
    body_chunk = b'8000\r\n' + b'X' * 0x8000 + b'\r\n'
    final_chunk = b'0 \r\n\r\n'
    reply_hdr = b'HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n'
    written = self._write_until_full(to_router, reply_hdr, 1.0)
    self.assertEqual(len(reply_hdr), written)

    written = 0
    deadline = time() + TIMEOUT
    while time() <= deadline:
        written = self._write_until_full(to_router, body_chunk, 5.0)
        if written < len(body_chunk):
            break  # a short write means the server is blocked
    self.assertFalse(time() > deadline,
                     "Server never blocked as expected!")

    # The server should now be blocked by Q2.  Draining the client side
    # unblocks it.
    self._read_until_empty(http_client, 2.0)

    # send the rest of the interrupted chunk, then terminate the body
    if written:
        rest = self._write_until_full(to_router, body_chunk[written:], 1.0)
        self.assertEqual(len(body_chunk), written + rest)
    written = self._write_until_full(to_router, final_chunk, 1.0)
    self.assertEqual(len(final_chunk), written)

    to_router.shutdown(socket.SHUT_RDWR)
    to_router.close()

    # read whatever is still on its way to the client before closing it
    self._read_until_empty(http_client, 1.0)
    http_client.shutdown(socket.SHUT_RDWR)
    http_client.close()

    listener.shutdown(socket.SHUT_RDWR)
    listener.close()

    # the router log must show that Q2 was hit and released again
    blocked = 0
    unblocked = 0
    with io.open(self.INT_A.logfile_path) as logfile:
        for entry in logfile:
            if 'server link blocked on Q2 limit' in entry:
                blocked += 1
            if 'server link unblocked from Q2 limit' in entry:
                unblocked += 1
    self.assertTrue(blocked > 0)
    self.assertEqual(blocked, unblocked)
# Script entry point: when this file is executed directly, hand the result of
# main_module() to the unittest runner.
if __name__ == '__main__':
    unittest.main(main_module())